[ https://issues.apache.org/jira/browse/SPARK-35914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Helt Long updated SPARK-35914: ------------------------------ Attachment: stuck log.png > Driver can't distribute task to executor because NullPointerException > --------------------------------------------------------------------- > > Key: SPARK-35914 > URL: https://issues.apache.org/jira/browse/SPARK-35914 > Project: Spark > Issue Type: Bug > Components: Spark Core > Affects Versions: 3.0.1, 3.1.1, 3.1.2 > Environment: CDH 5.7.1: Hadoop 2.6.5 > Spark 3.0.1, 3.1.1, 3.1.2 > Reporter: Helt Long > Priority: Major > Attachments: stuck log.png, webui stuck.png > > > When using spark3 to submit a spark job to a yarn cluster, I hit a problem. Once in > a while, the driver can't distribute any tasks to any executors, and the stage > gets stuck; the whole spark job gets stuck. Checking the driver log, I found a > NullPointerException. It looks like a netty problem; I can confirm this problem > only exists in spark3, because with spark2 it never happened. > > {code:java} > // Error message > 21/06/28 14:42:43 INFO TaskSetManager: Starting task 2592.0 in stage 1.0 (TID > 3494) (worker39.hadoop, executor 84, partition 2592, RACK_LOCAL, 5006 bytes) > taskResourceAssignments Map() > 21/06/28 14:42:43 INFO TaskSetManager: Finished task 4155.0 in stage 1.0 (TID > 3367) in 36670 ms on worker39.hadoop (executor 84) (3278/4249) > 21/06/28 14:42:43 INFO TaskSetManager: Finished task 2283.0 in stage 1.0 (TID > 3422) in 22371 ms on worker15.hadoop (executor 109) (3279/4249) > 21/06/28 14:42:43 ERROR Inbox: Ignoring error > java.lang.NullPointerException > at java.lang.String.length(String.java:623) > at > java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:420) > at java.lang.StringBuilder.append(StringBuilder.java:136) > at > org.apache.spark.scheduler.TaskSetManager.$anonfun$resourceOffer$5(TaskSetManager.scala:483) > at org.apache.spark.internal.Logging.logInfo(Logging.scala:57) > at org.apache.spark.internal.Logging.logInfo$(Logging.scala:56) > at > 
org.apache.spark.scheduler.TaskSetManager.logInfo(TaskSetManager.scala:54) > at > org.apache.spark.scheduler.TaskSetManager.$anonfun$resourceOffer$2(TaskSetManager.scala:484) > at scala.Option.map(Option.scala:230) > at > org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:444) > at > org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOfferSingleTaskSet$2(TaskSchedulerImpl.scala:397) > at > org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOfferSingleTaskSet$2$adapted(TaskSchedulerImpl.scala:392) > at scala.Option.foreach(Option.scala:407) > at > org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOfferSingleTaskSet$1(TaskSchedulerImpl.scala:392) > at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158) > at > org.apache.spark.scheduler.TaskSchedulerImpl.resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:383) > at > org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$20(TaskSchedulerImpl.scala:581) > at > org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$20$adapted(TaskSchedulerImpl.scala:576) > at > scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36) > at > scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33) > at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198) > at > org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$16(TaskSchedulerImpl.scala:576) > at > org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$16$adapted(TaskSchedulerImpl.scala:547) > at > scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) > at > scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) > at > org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:547) > at > 
org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint.$anonfun$makeOffers$5(CoarseGrainedSchedulerBackend.scala:340) > at > org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.org$apache$spark$scheduler$cluster$CoarseGrainedSchedulerBackend$$withLock(CoarseGrainedSchedulerBackend.scala:904) > at > org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint.org$apache$spark$scheduler$cluster$CoarseGrainedSchedulerBackend$DriverEndpoint$$makeOffers(CoarseGrainedSchedulerBackend.scala:332) > at > org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint$$anonfun$receive$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:157) > at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115) > at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213) > at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100) > at > org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75) > at > org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > 21/06/28 14:42:43 INFO TaskSetManager: Finished task 2255.0 in stage 1.0 (TID > 3419) in 23035 ms on worker15.hadoop (executor 116) (3280/4249) > 21/06/28 14:42:43 ERROR Inbox: Ignoring error > java.lang.NullPointerException > at java.lang.String.length(String.java:623) > at > java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:420) > at java.lang.StringBuilder.append(StringBuilder.java:136) > at > org.apache.spark.scheduler.TaskSetManager.$anonfun$resourceOffer$5(TaskSetManager.scala:483) > at 
org.apache.spark.internal.Logging.logInfo(Logging.scala:57) > at org.apache.spark.internal.Logging.logInfo$(Logging.scala:56) > at > org.apache.spark.scheduler.TaskSetManager.logInfo(TaskSetManager.scala:54) > at > org.apache.spark.scheduler.TaskSetManager.$anonfun$resourceOffer$2(TaskSetManager.scala:484) > at scala.Option.map(Option.scala:230) > at > org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:444) > at > org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOfferSingleTaskSet$2(TaskSchedulerImpl.scala:397) > at > org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOfferSingleTaskSet$2$adapted(TaskSchedulerImpl.scala:392) > at scala.Option.foreach(Option.scala:407) > at > org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOfferSingleTaskSet$1(TaskSchedulerImpl.scala:392) > at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158) > at > org.apache.spark.scheduler.TaskSchedulerImpl.resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:383) > at > org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$20(TaskSchedulerImpl.scala:581) > at > org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$20$adapted(TaskSchedulerImpl.scala:576) > at > scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36) > at > scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33) > at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198) > at > org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$16(TaskSchedulerImpl.scala:576) > at > org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$16$adapted(TaskSchedulerImpl.scala:547) > at > scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) > at > scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) > at > 
org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:547) > at > org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint.$anonfun$makeOffers$5(CoarseGrainedSchedulerBackend.scala:340) > at > org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.org$apache$spark$scheduler$cluster$CoarseGrainedSchedulerBackend$$withLock(CoarseGrainedSchedulerBackend.scala:904) > at > org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint.org$apache$spark$scheduler$cluster$CoarseGrainedSchedulerBackend$DriverEndpoint$$makeOffers(CoarseGrainedSchedulerBackend.scala:332) > at > org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint$$anonfun$receive$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:157) > at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115) > at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213) > at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100) > at > org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75) > at > org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > ..... 
> 21/06/28 14:43:44 INFO TaskSetManager: Finished task 155.0 in stage 1.0 (TID > 3486) in 64503 ms on worker02.hadoop (executor 59) (3427/4249) > 21/06/28 14:43:44 ERROR Inbox: Ignoring error > java.lang.NullPointerException > 21/06/28 14:43:45 ERROR Inbox: Ignoring error > java.lang.NullPointerException > 21/06/28 14:43:46 ERROR Inbox: Ignoring error > java.lang.NullPointerException > 21/06/28 14:43:47 ERROR Inbox: Ignoring error > java.lang.NullPointerException > 21/06/28 14:43:48 ERROR Inbox: Ignoring error > java.lang.NullPointerException > 21/06/28 14:43:49 ERROR Inbox: Ignoring error > java.lang.NullPointerException > 21/06/28 14:43:49 INFO TaskSetManager: Finished task 189.0 in stage 1.0 (TID > 3491) in 66320 ms on worker02.hadoop (executor 62) (3428/4249) > 21/06/28 14:43:49 ERROR Inbox: Ignoring error > java.lang.NullPointerException > 21/06/28 14:43:50 ERROR Inbox: Ignoring error > java.lang.NullPointerException > 21/06/28 14:43:51 ERROR Inbox: Ignoring error > java.lang.NullPointerException{code} > > > The first time I found this problem was in spark3.0.1. I checked the spark source > code; the NPE happened in > *org.apache.spark.scheduler.TaskSetManager.logInfo*, source code like below: > {code:java} > logInfo(s"Starting $taskName (TID $taskId, $host, executor > ${info.executorId}, " + > s"partition ${task.partitionId}, $taskLocality, ${serializedTask.limit()} > bytes)") > {code} > I tried to confirm which variable was null, so I added logging and repacked spark-core > like below: > {code:java} > // add log: > logInfo(s"zyh Starting $taskName") > logInfo(s"zyh (TID $taskId") > logInfo(s"zyh $host") > logInfo(s"zyh executor ${info.executorId}") > logInfo(s"zyh ${task.partitionId}") > logInfo(s"zyh $taskLocality,") > logInfo(s"zyh ${serializedTask.limit()} bytes)") > // repack command > ./build/mvn -DskipTests -pl :spark-core_2.12 clean install{code} > Then I found *host* was null, so I tried printing it: > {code:java} > println(host) // result is Some(null) > 
println(host.getClass.getName) // result is scala.Some > {code} > To track host, I added logging everywhere to print it: > {{TaskSchedulerImpl.resourceOfferSingleTaskSet}} -> > {{TaskSetManager.resourceOffer}}-> {{TaskSetManager.dequeueTask}} -> > {{TaskSetManager.logInfo}} > Strangely, host was normal! But executorId became null... These two variables came > from one case class *WorkerOffer* > So I added logging to print executorId everywhere, and the problem disappeared!!! > {code:java} > // TaskSchedulerImpl.resourceOfferSingleTaskSet > private def resourceOfferSingleTaskSet( > taskSet: TaskSetManager, > maxLocality: TaskLocality, > shuffledOffers: Seq[WorkerOffer], > availableCpus: Array[Int], > availableResources: Array[Map[String, Buffer[String]]], > tasks: IndexedSeq[ArrayBuffer[TaskDescription]], > addressesWithDescs: ArrayBuffer[(String, TaskDescription)]) : Boolean = > { > var launchedTask = false // nodes and executors that are blacklisted > for the entire application have already been > // filtered out by this point > for (i <- 0 until shuffledOffers.size) { > val execId = shuffledOffers(i).executorId > val host = shuffledOffers(i).host > if (availableCpus(i) >= CPUS_PER_TASK && > resourcesMeetTaskRequirements(availableResources(i))) { > try { > val time = System.currentTimeMillis() > // the first print > // scalastyle:off println > println(s"==zyh1== $time" + host.getClass.getName) > println(s"==zyh1== $time" + execId.getClass.getName) > for (task <- taskSet.resourceOffer(execId, host, maxLocality, > availableResources(i))) {// TaskSetManager.resourceOffer > def resourceOffer( > execId: String, > host: String, > maxLocality: TaskLocality.TaskLocality, > availableResources: Map[String, Seq[String]] = Map.empty) > : Option[TaskDescription] = > { > val time = System.currentTimeMillis() > // the second print > // scalastyle:off println > println(s"==zyh2== $time" + host.getClass.getName) > println(s"==zyh2== $time" + execId.getClass.getName) > > val offerBlacklisted = 
taskSetBlacklistHelperOpt.exists { blacklist => > blacklist.isNodeBlacklistedForTaskSet(host) || > blacklist.isExecutorBlacklistedForTaskSet(execId) > } > if (!isZombie && !offerBlacklisted) { > val curTime = clock.getTimeMillis() var allowedLocality = > maxLocality if (maxLocality != TaskLocality.NO_PREF) { > allowedLocality = getAllowedLocalityLevel(curTime) > if (allowedLocality > maxLocality) { > // We're not allowed to search for farther-away tasks > allowedLocality = maxLocality > } > } dequeueTask(execId, host, allowedLocality).map { case ((index, > taskLocality, speculative)) => > > // TaskSetManager.dequeueTask > private def dequeueTask( > execId: String, > host: String, > maxLocality: TaskLocality.Value): Option[(Int, TaskLocality.Value, > Boolean)] = { > // Tries to schedule a regular task first; if it returns None, then > schedules > // a speculative task > val time = System.currentTimeMillis() > // the third print > // scalastyle:off println > println(s"==zyh3== $time" + host.getClass.getName) > println(s"==zyh3== $time" + execId.getClass.getName) > dequeueTaskHelper(execId, host, maxLocality, false).orElse( > dequeueTaskHelper(execId, host, maxLocality, true)) > } > > // TaskSetManager.resourceOffer.dequeueTask — printed again when entering map after the call above returns > dequeueTask(execId, host, allowedLocality).map { case ((index, taskLocality, > speculative)) => > // Found a task; do some bookkeeping and return a task description > val time = System.currentTimeMillis() > // // the fourth print > // scalastyle:off println > println(s"==zyh4== $time" + host.getClass.getName) > println(s"==zyh4== $time" + execId.getClass.getName) > > // TaskSetManager.logInfo > // try catch NPE > val taskName = s"task ${info.id} in stage ${taskSet.id}" > try { > logInfo(s"Starting $taskName (TID $taskId, $host, executor > ${info.executorId}, " + > s"partition ${task.partitionId}, $taskLocality, > ${serializedTask.limit()} bytes)") > } catch { > case e: NullPointerException => > val time = 
System.currentTimeMillis() > e.printStackTrace() > // scalastyle:off println > println("+++zyh+++" + host.getClass.getName + " " + > taskName.getClass.getName) > println("+++zyh+++" + taskId.getClass.getName + " " + > info.executorId.getClass.getName) > println("+++zyh+++" + task.partitionId.getClass.getName) > println("+++zyh+++" + taskLocality.getClass.getName) > println("+++zyh+++" + serializedTask.limit().getClass.getName) > } > {code} > This problem makes it difficult to trust whether a spark job is running or not; only > when I check the spark web UI and find there are no running tasks can I kill it > by hand. I tested on spark 3.0.1, 3.1.1 and 3.1.2; it's the same problem. -- This message was sent by Atlassian Jira (v8.3.4#803005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org