[ 
https://issues.apache.org/jira/browse/SPARK-35914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Helt Long updated SPARK-35914:
------------------------------
    Attachment: stuck log.png

> Driver can't distribute task to executor because NullPointerException
> ---------------------------------------------------------------------
>
>                 Key: SPARK-35914
>                 URL: https://issues.apache.org/jira/browse/SPARK-35914
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.0.1, 3.1.1, 3.1.2
>         Environment: CDH 5.7.1: Hadoop 2.6.5
> Spark 3.0.1, 3.1.1, 3.1.2
>            Reporter: Helt Long
>            Priority: Major
>         Attachments: stuck log.png, webui stuck.png
>
>
> When I use Spark 3 to submit a job to a YARN cluster, I occasionally hit a problem: the driver stops distributing tasks to any executor, the stage gets stuck, and the whole job hangs. Checking the driver log, I found a NullPointerException. It looks like it could be a netty problem. I can confirm this problem only exists in Spark 3; it never happened when I used Spark 2.
>  
> {code:java}
> // Error message
> 21/06/28 14:42:43 INFO TaskSetManager: Starting task 2592.0 in stage 1.0 (TID 3494) (worker39.hadoop, executor 84, partition 2592, RACK_LOCAL, 5006 bytes) taskResourceAssignments Map()
> 21/06/28 14:42:43 INFO TaskSetManager: Finished task 4155.0 in stage 1.0 (TID 3367) in 36670 ms on worker39.hadoop (executor 84) (3278/4249)
> 21/06/28 14:42:43 INFO TaskSetManager: Finished task 2283.0 in stage 1.0 (TID 3422) in 22371 ms on worker15.hadoop (executor 109) (3279/4249)
> 21/06/28 14:42:43 ERROR Inbox: Ignoring error
> java.lang.NullPointerException
>       at java.lang.String.length(String.java:623)
>       at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:420)
>       at java.lang.StringBuilder.append(StringBuilder.java:136)
>       at org.apache.spark.scheduler.TaskSetManager.$anonfun$resourceOffer$5(TaskSetManager.scala:483)
>       at org.apache.spark.internal.Logging.logInfo(Logging.scala:57)
>       at org.apache.spark.internal.Logging.logInfo$(Logging.scala:56)
>       at org.apache.spark.scheduler.TaskSetManager.logInfo(TaskSetManager.scala:54)
>       at org.apache.spark.scheduler.TaskSetManager.$anonfun$resourceOffer$2(TaskSetManager.scala:484)
>       at scala.Option.map(Option.scala:230)
>       at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:444)
>       at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOfferSingleTaskSet$2(TaskSchedulerImpl.scala:397)
>       at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOfferSingleTaskSet$2$adapted(TaskSchedulerImpl.scala:392)
>       at scala.Option.foreach(Option.scala:407)
>       at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOfferSingleTaskSet$1(TaskSchedulerImpl.scala:392)
>       at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
>       at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:383)
>       at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$20(TaskSchedulerImpl.scala:581)
>       at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$20$adapted(TaskSchedulerImpl.scala:576)
>       at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
>       at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
>       at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
>       at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$16(TaskSchedulerImpl.scala:576)
>       at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$16$adapted(TaskSchedulerImpl.scala:547)
>       at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>       at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>       at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:547)
>       at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint.$anonfun$makeOffers$5(CoarseGrainedSchedulerBackend.scala:340)
>       at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.org$apache$spark$scheduler$cluster$CoarseGrainedSchedulerBackend$$withLock(CoarseGrainedSchedulerBackend.scala:904)
>       at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint.org$apache$spark$scheduler$cluster$CoarseGrainedSchedulerBackend$DriverEndpoint$$makeOffers(CoarseGrainedSchedulerBackend.scala:332)
>       at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint$$anonfun$receive$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:157)
>       at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
>       at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
>       at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
>       at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
>       at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
>       at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> 21/06/28 14:42:43 INFO TaskSetManager: Finished task 2255.0 in stage 1.0 (TID 3419) in 23035 ms on worker15.hadoop (executor 116) (3280/4249)
> 21/06/28 14:42:43 ERROR Inbox: Ignoring error
> java.lang.NullPointerException
>       at java.lang.String.length(String.java:623)
>       at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:420)
>       at java.lang.StringBuilder.append(StringBuilder.java:136)
>       at org.apache.spark.scheduler.TaskSetManager.$anonfun$resourceOffer$5(TaskSetManager.scala:483)
>       at org.apache.spark.internal.Logging.logInfo(Logging.scala:57)
>       at org.apache.spark.internal.Logging.logInfo$(Logging.scala:56)
>       at org.apache.spark.scheduler.TaskSetManager.logInfo(TaskSetManager.scala:54)
>       at org.apache.spark.scheduler.TaskSetManager.$anonfun$resourceOffer$2(TaskSetManager.scala:484)
>       at scala.Option.map(Option.scala:230)
>       at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:444)
>       at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOfferSingleTaskSet$2(TaskSchedulerImpl.scala:397)
>       at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOfferSingleTaskSet$2$adapted(TaskSchedulerImpl.scala:392)
>       at scala.Option.foreach(Option.scala:407)
>       at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOfferSingleTaskSet$1(TaskSchedulerImpl.scala:392)
>       at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
>       at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:383)
>       at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$20(TaskSchedulerImpl.scala:581)
>       at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$20$adapted(TaskSchedulerImpl.scala:576)
>       at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
>       at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
>       at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
>       at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$16(TaskSchedulerImpl.scala:576)
>       at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$16$adapted(TaskSchedulerImpl.scala:547)
>       at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>       at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>       at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:547)
>       at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint.$anonfun$makeOffers$5(CoarseGrainedSchedulerBackend.scala:340)
>       at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.org$apache$spark$scheduler$cluster$CoarseGrainedSchedulerBackend$$withLock(CoarseGrainedSchedulerBackend.scala:904)
>       at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint.org$apache$spark$scheduler$cluster$CoarseGrainedSchedulerBackend$DriverEndpoint$$makeOffers(CoarseGrainedSchedulerBackend.scala:332)
>       at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint$$anonfun$receive$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:157)
>       at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
>       at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
>       at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
>       at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
>       at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
>       at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> .....
> 21/06/28 14:43:44 INFO TaskSetManager: Finished task 155.0 in stage 1.0 (TID 3486) in 64503 ms on worker02.hadoop (executor 59) (3427/4249)
> 21/06/28 14:43:44 ERROR Inbox: Ignoring error
> java.lang.NullPointerException
> 21/06/28 14:43:45 ERROR Inbox: Ignoring error
> java.lang.NullPointerException
> 21/06/28 14:43:46 ERROR Inbox: Ignoring error
> java.lang.NullPointerException
> 21/06/28 14:43:47 ERROR Inbox: Ignoring error
> java.lang.NullPointerException
> 21/06/28 14:43:48 ERROR Inbox: Ignoring error
> java.lang.NullPointerException
> 21/06/28 14:43:49 ERROR Inbox: Ignoring error
> java.lang.NullPointerException
> 21/06/28 14:43:49 INFO TaskSetManager: Finished task 189.0 in stage 1.0 (TID 3491) in 66320 ms on worker02.hadoop (executor 62) (3428/4249)
> 21/06/28 14:43:49 ERROR Inbox: Ignoring error
> java.lang.NullPointerException
> 21/06/28 14:43:50 ERROR Inbox: Ignoring error
> java.lang.NullPointerException
> 21/06/28 14:43:51 ERROR Inbox: Ignoring error
> java.lang.NullPointerException
> {code}
>  
>  
> I first hit this problem on Spark 3.0.1. Checking the Spark source code, the NPE happens in *org.apache.spark.scheduler.TaskSetManager.logInfo*; the relevant source looks like this:
> {code:java}
> logInfo(s"Starting $taskName (TID $taskId, $host, executor ${info.executorId}, " +
>     s"partition ${task.partitionId}, $taskLocality, ${serializedTask.limit()} bytes)")
> {code}
> To confirm which variable was null, I added a log statement for each variable and rebuilt spark-core as follows:
> {code:java}
> // add log:
> logInfo(s"zyh Starting $taskName")
> logInfo(s"zyh (TID $taskId")
> logInfo(s"zyh $host")
> logInfo(s"zyh executor ${info.executorId}")
> logInfo(s"zyh ${task.partitionId}")
> logInfo(s"zyh $taskLocality,")
> logInfo(s"zyh ${serializedTask.limit()} bytes)")
> // repack command
> ./build/mvn -DskipTests -pl :spark-core_2.12 clean install
> {code}
> That showed *host* was null, so I tried printing it:
> {code:java}
> println(host) // result is Some(null)
> println(host.getClass.getName) // result is scala.Some
> {code}
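> For anyone reading along, the {{Some(null)}} result matters. This is plain Scala behaviour (nothing Spark-specific): {{Some(null)}} wraps the null instead of filtering it out, so it survives {{map}}/{{foreach}} until something finally dereferences it. A small illustration:
> {code:java}
> // Plain Scala illustration (not Spark code): Some(null) vs Option(null).
> val viaSome: Option[String]   = Some(null)    // keeps the null inside
> val viaOption: Option[String] = Option(null)  // normalizes null to None
>
> println(viaSome.isDefined)    // true  -> downstream .map/.foreach still run and receive null
> println(viaOption.isDefined)  // false -> the null is filtered out up front
>
> // e.g. viaSome.map(_.length) would only throw a NullPointerException here,
> // far away from where the null was produced.
> {code}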
> To track down *host*, I added logging to print it at every step along the call chain:
> {{TaskSchedulerImpl.resourceOfferSingleTaskSet}} -> {{TaskSetManager.resourceOffer}} -> {{TaskSetManager.dequeueTask}} -> {{TaskSetManager.logInfo}}
> Strangely, *host* was normal there, but *executorId* became null... These two variables come from the same case class, *WorkerOffer* (sketched further below).
> Once I also added logging to print *executorId* everywhere, the problem disappeared! The instrumented code is below:
> {code:java}
> // TaskSchedulerImpl.resourceOfferSingleTaskSet
>   private def resourceOfferSingleTaskSet(
>       taskSet: TaskSetManager,
>       maxLocality: TaskLocality,
>       shuffledOffers: Seq[WorkerOffer],
>       availableCpus: Array[Int],
>       availableResources: Array[Map[String, Buffer[String]]],
>       tasks: IndexedSeq[ArrayBuffer[TaskDescription]],
>       addressesWithDescs: ArrayBuffer[(String, TaskDescription)]) : Boolean = {
>     var launchedTask = false
>     // nodes and executors that are blacklisted for the entire application have already been
>     // filtered out by this point
>     for (i <- 0 until shuffledOffers.size) {
>       val execId = shuffledOffers(i).executorId
>       val host = shuffledOffers(i).host
>       if (availableCpus(i) >= CPUS_PER_TASK &&
>         resourcesMeetTaskRequirements(availableResources(i))) {
>         try {
>           val time = System.currentTimeMillis()
>           // the first print
>           // scalastyle:off println
>           println(s"==zyh1== $time" + host.getClass.getName)
>           println(s"==zyh1== $time" + execId.getClass.getName)
>           for (task <- taskSet.resourceOffer(execId, host, maxLocality, availableResources(i))) {
>
> // TaskSetManager.resourceOffer
>   def resourceOffer(
>       execId: String,
>       host: String,
>       maxLocality: TaskLocality.TaskLocality,
>       availableResources: Map[String, Seq[String]] = Map.empty)
>     : Option[TaskDescription] =
>   {
>     val time = System.currentTimeMillis()
>     // the second print
>     // scalastyle:off println
>     println(s"==zyh2== $time" + host.getClass.getName)
>     println(s"==zyh2== $time" + execId.getClass.getName)
>
>     val offerBlacklisted = taskSetBlacklistHelperOpt.exists { blacklist =>
>       blacklist.isNodeBlacklistedForTaskSet(host) ||
>         blacklist.isExecutorBlacklistedForTaskSet(execId)
>     }
>     if (!isZombie && !offerBlacklisted) {
>       val curTime = clock.getTimeMillis()
>       var allowedLocality = maxLocality
>       if (maxLocality != TaskLocality.NO_PREF) {
>         allowedLocality = getAllowedLocalityLevel(curTime)
>         if (allowedLocality > maxLocality) {
>           // We're not allowed to search for farther-away tasks
>           allowedLocality = maxLocality
>         }
>       }
>       dequeueTask(execId, host, allowedLocality).map { case ((index, taskLocality, speculative)) =>
>
> // TaskSetManager.dequeueTask
>   private def dequeueTask(
>       execId: String,
>       host: String,
>       maxLocality: TaskLocality.Value): Option[(Int, TaskLocality.Value, Boolean)] = {
>     // Tries to schedule a regular task first; if it returns None, then schedules
>     // a speculative task
>     val time = System.currentTimeMillis()
>     // the third print
>     // scalastyle:off println
>     println(s"==zyh3== $time" + host.getClass.getName)
>     println(s"==zyh3== $time" + execId.getClass.getName)
>     dequeueTaskHelper(execId, host, maxLocality, false).orElse(
>       dequeueTaskHelper(execId, host, maxLocality, true))
>   }
>
> // TaskSetManager.resourceOffer: print again inside the map, after dequeueTask returns
>       dequeueTask(execId, host, allowedLocality).map { case ((index, taskLocality, speculative)) =>
>         // Found a task; do some bookkeeping and return a task description
>         val time = System.currentTimeMillis()
>         // the fourth print
>         // scalastyle:off println
>         println(s"==zyh4== $time" + host.getClass.getName)
>         println(s"==zyh4== $time" + execId.getClass.getName)
>
> // TaskSetManager.logInfo
> // wrap the logInfo call in try/catch to catch the NPE
>         val taskName = s"task ${info.id} in stage ${taskSet.id}"
>         try {
>           logInfo(s"Starting $taskName (TID $taskId, $host, executor ${info.executorId}, " +
>             s"partition ${task.partitionId}, $taskLocality, ${serializedTask.limit()} bytes)")
>         } catch {
>           case e: NullPointerException =>
>             val time = System.currentTimeMillis()
>             e.printStackTrace()
>             // scalastyle:off println
>             println("+++zyh+++" + host.getClass.getName + "  " + taskName.getClass.getName)
>             println("+++zyh+++" + taskId.getClass.getName + "  " + info.executorId.getClass.getName)
>             println("+++zyh+++" + task.partitionId.getClass.getName)
>             println("+++zyh+++" + taskLocality.getClass.getName)
>             println("+++zyh+++" + serializedTask.limit().getClass.getName)
>         }
> {code}
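> For reference, the two suspicious fields above both come from {{WorkerOffer}}. A simplified sketch of that case class (paraphrased from the Spark 3.x sources; exact fields and defaults may differ between versions):
> {code:java}
> // Simplified sketch of org.apache.spark.scheduler.WorkerOffer (Spark 3.x, paraphrased).
> // executorId and host are the two fields that intermittently show up as null in the driver log.
> case class WorkerOffer(
>     executorId: String,
>     host: String,
>     cores: Int,
>     address: Option[String] = None,                   // optional hostPort string
>     resources: Map[String, Seq[String]] = Map.empty)  // resource name -> addresses
> {code}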
> This problem makes it hard to trust whether a Spark job is still actually running; only when I check the Spark web UI and see there are no running tasks can I kill the job by hand. I tested Spark 3.0.1, 3.1.1 and 3.1.2, and the problem is the same on all of them.
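> Until this is fixed, the only reliable signal I have is the one from the web UI (an active stage with no running tasks). The same check can be scripted against the standard Spark monitoring REST API; the sketch below is only an outline, with a placeholder host and application id and no JSON parsing:
> {code:java}
> // Outline of a watchdog that polls the driver's monitoring REST API
> // (http://<driver-host>:4040/api/v1/...) for active stages.
> // Host, application id and the "stuck" criterion are placeholders.
> import scala.io.Source
>
> object StuckStageWatchdog {
>   def activeStagesJson(driverUi: String, appId: String): String =
>     Source.fromURL(s"$driverUi/api/v1/applications/$appId/stages?status=active").mkString
>
>   def main(args: Array[String]): Unit = {
>     val driverUi = "http://driver-host:4040"    // placeholder driver UI address
>     val appId    = "application_1234_5678"      // placeholder YARN application id
>     // A real check would parse the JSON and alert when numActiveTasks stays at 0
>     // for an active stage over several polls; here we only fetch the raw payload.
>     println(activeStagesJson(driverUi, appId).take(500))
>   }
> }
> {code}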



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
