Helt Long created SPARK-35914:
---------------------------------

             Summary: Driver can't distribute tasks to executors because of NullPointerException
                 Key: SPARK-35914
                 URL: https://issues.apache.org/jira/browse/SPARK-35914
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 3.1.2, 3.1.1, 3.0.1
         Environment: CDH 5.7.1: Hadoop 2.6.5
                      Spark 3.0.1, 3.1.1, 3.1.2
            Reporter: Helt Long


When submitting a Spark 3 job to a YARN cluster, I run into a problem: once in a while the driver can't distribute any tasks to any executors, the stage gets stuck, and the whole Spark job hangs. Checking the driver log, I found a NullPointerException. It looks like a netty problem at first, but I can confirm this problem only exists in Spark 3, because with Spark 2 it never happened.

{code:java}
// Error message
21/06/28 14:42:43 INFO TaskSetManager: Starting task 2592.0 in stage 1.0 (TID 3494) (worker39.hadoop, executor 84, partition 2592, RACK_LOCAL, 5006 bytes) taskResourceAssignments Map()
21/06/28 14:42:43 INFO TaskSetManager: Finished task 4155.0 in stage 1.0 (TID 3367) in 36670 ms on worker39.hadoop (executor 84) (3278/4249)
21/06/28 14:42:43 INFO TaskSetManager: Finished task 2283.0 in stage 1.0 (TID 3422) in 22371 ms on worker15.hadoop (executor 109) (3279/4249)
21/06/28 14:42:43 ERROR Inbox: Ignoring error
java.lang.NullPointerException
    at java.lang.String.length(String.java:623)
    at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:420)
    at java.lang.StringBuilder.append(StringBuilder.java:136)
    at org.apache.spark.scheduler.TaskSetManager.$anonfun$resourceOffer$5(TaskSetManager.scala:483)
    at org.apache.spark.internal.Logging.logInfo(Logging.scala:57)
    at org.apache.spark.internal.Logging.logInfo$(Logging.scala:56)
    at org.apache.spark.scheduler.TaskSetManager.logInfo(TaskSetManager.scala:54)
    at org.apache.spark.scheduler.TaskSetManager.$anonfun$resourceOffer$2(TaskSetManager.scala:484)
    at scala.Option.map(Option.scala:230)
    at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:444)
    at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOfferSingleTaskSet$2(TaskSchedulerImpl.scala:397)
    at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOfferSingleTaskSet$2$adapted(TaskSchedulerImpl.scala:392)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOfferSingleTaskSet$1(TaskSchedulerImpl.scala:392)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
    at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:383)
    at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$20(TaskSchedulerImpl.scala:581)
    at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$20$adapted(TaskSchedulerImpl.scala:576)
    at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
    at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
    at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$16(TaskSchedulerImpl.scala:576)
    at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$16$adapted(TaskSchedulerImpl.scala:547)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:547)
    at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint.$anonfun$makeOffers$5(CoarseGrainedSchedulerBackend.scala:340)
    at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.org$apache$spark$scheduler$cluster$CoarseGrainedSchedulerBackend$$withLock(CoarseGrainedSchedulerBackend.scala:904)
    at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint.org$apache$spark$scheduler$cluster$CoarseGrainedSchedulerBackend$DriverEndpoint$$makeOffers(CoarseGrainedSchedulerBackend.scala:332)
    at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint$$anonfun$receive$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:157)
    at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
    at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
    at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
    at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
    at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
21/06/28 14:42:43 INFO TaskSetManager: Finished task 2255.0 in stage 1.0 (TID 3419) in 23035 ms on worker15.hadoop (executor 116) (3280/4249)
21/06/28 14:42:43 ERROR Inbox: Ignoring error
java.lang.NullPointerException
    at java.lang.String.length(String.java:623)
    at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:420)
    at java.lang.StringBuilder.append(StringBuilder.java:136)
    at org.apache.spark.scheduler.TaskSetManager.$anonfun$resourceOffer$5(TaskSetManager.scala:483)
    at org.apache.spark.internal.Logging.logInfo(Logging.scala:57)
    at org.apache.spark.internal.Logging.logInfo$(Logging.scala:56)
    at org.apache.spark.scheduler.TaskSetManager.logInfo(TaskSetManager.scala:54)
    at org.apache.spark.scheduler.TaskSetManager.$anonfun$resourceOffer$2(TaskSetManager.scala:484)
    at scala.Option.map(Option.scala:230)
    at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:444)
    at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOfferSingleTaskSet$2(TaskSchedulerImpl.scala:397)
    at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOfferSingleTaskSet$2$adapted(TaskSchedulerImpl.scala:392)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOfferSingleTaskSet$1(TaskSchedulerImpl.scala:392)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
    at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:383)
    at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$20(TaskSchedulerImpl.scala:581)
    at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$20$adapted(TaskSchedulerImpl.scala:576)
    at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
    at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
    at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$16(TaskSchedulerImpl.scala:576)
    at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$16$adapted(TaskSchedulerImpl.scala:547)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:547)
    at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint.$anonfun$makeOffers$5(CoarseGrainedSchedulerBackend.scala:340)
    at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.org$apache$spark$scheduler$cluster$CoarseGrainedSchedulerBackend$$withLock(CoarseGrainedSchedulerBackend.scala:904)
    at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint.org$apache$spark$scheduler$cluster$CoarseGrainedSchedulerBackend$DriverEndpoint$$makeOffers(CoarseGrainedSchedulerBackend.scala:332)
    at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint$$anonfun$receive$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:157)
    at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
    at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
    at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
    at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
    at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
.....
21/06/28 14:43:44 INFO TaskSetManager: Finished task 155.0 in stage 1.0 (TID 3486) in 64503 ms on worker02.hadoop (executor 59) (3427/4249)
21/06/28 14:43:44 ERROR Inbox: Ignoring error
java.lang.NullPointerException
21/06/28 14:43:45 ERROR Inbox: Ignoring error
java.lang.NullPointerException
21/06/28 14:43:46 ERROR Inbox: Ignoring error
java.lang.NullPointerException
21/06/28 14:43:47 ERROR Inbox: Ignoring error
java.lang.NullPointerException
21/06/28 14:43:48 ERROR Inbox: Ignoring error
java.lang.NullPointerException
21/06/28 14:43:49 ERROR Inbox: Ignoring error
java.lang.NullPointerException
21/06/28 14:43:49 INFO TaskSetManager: Finished task 189.0 in stage 1.0 (TID 3491) in 66320 ms on worker02.hadoop (executor 62) (3428/4249)
21/06/28 14:43:49 ERROR Inbox: Ignoring error
java.lang.NullPointerException
21/06/28 14:43:50 ERROR Inbox: Ignoring error
java.lang.NullPointerException
21/06/28 14:43:51 ERROR Inbox: Ignoring error
java.lang.NullPointerException
{code}

I first hit this problem on Spark 3.0.1. Checking the Spark source code, the NPE happens in *org.apache.spark.scheduler.TaskSetManager.logInfo*; the relevant source looks like this:

{code:java}
logInfo(s"Starting $taskName (TID $taskId, $host, executor ${info.executorId}, " +
  s"partition ${task.partitionId}, $taskLocality, ${serializedTask.limit()} bytes)")
{code}

To confirm which variable is null, I added logging and repackaged spark-core like this:

{code:java}
// added logs:
logInfo(s"zyh Starting $taskName")
logInfo(s"zyh (TID $taskId")
logInfo(s"zyh $host")
logInfo(s"zyh executor ${info.executorId}")
logInfo(s"zyh ${task.partitionId}")
logInfo(s"zyh $taskLocality,")
logInfo(s"zyh ${serializedTask.limit()} bytes)")

// repack command
./build/mvn -DskipTests -pl :spark-core_2.12 clean install
{code}

This showed that *host* is null, so I tried printing it directly:

{code:java}
println(host)                   // result is Some(null)
println(host.getClass.getName)  // result is scala.Some
{code}

To track *host*, I added logging everywhere along the call chain: {{TaskSchedulerImpl.resourceOfferSingleTaskSet}} -> {{TaskSetManager.resourceOffer}} -> {{TaskSetManager.dequeueTask}} -> {{TaskSetManager.logInfo}}. Strangely, *host* is normal there, but *executorId* becomes null... Both variables come from the same case class *WorkerOffer*. Then, once I added logging to print *executorId* everywhere, the problem disappeared!
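The `Some(null)` result above is the suspicious part: a null reference ended up wrapped inside a `Some`, so `Option`-based code still believes a value is present. A minimal standalone sketch (plain Scala, not Spark code) of the distinction:

```scala
// Standalone sketch: how a null can hide inside Some.
// Some(null) is a legal value; Option(null) normalizes to None.
object SomeNullDemo {
  def main(args: Array[String]): Unit = {
    val bad: Option[String] = Some(null)    // wraps the null as-is
    val good: Option[String] = Option(null) // normalized to None
    println(bad)            // Some(null)
    println(good)           // None
    println(bad.isDefined)  // true -- callers believe a host is present
    // String interpolation itself tolerates null (s"${bad.get}" gives "null");
    // an NPE only appears once something calls a method on the null value.
  }
}
```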
{code:java}
// TaskSchedulerImpl.resourceOfferSingleTaskSet
private def resourceOfferSingleTaskSet(
    taskSet: TaskSetManager,
    maxLocality: TaskLocality,
    shuffledOffers: Seq[WorkerOffer],
    availableCpus: Array[Int],
    availableResources: Array[Map[String, Buffer[String]]],
    tasks: IndexedSeq[ArrayBuffer[TaskDescription]],
    addressesWithDescs: ArrayBuffer[(String, TaskDescription)]): Boolean = {
  var launchedTask = false
  // nodes and executors that are blacklisted for the entire application have already been
  // filtered out by this point
  for (i <- 0 until shuffledOffers.size) {
    val execId = shuffledOffers(i).executorId
    val host = shuffledOffers(i).host
    if (availableCpus(i) >= CPUS_PER_TASK &&
        resourcesMeetTaskRequirements(availableResources(i))) {
      try {
        val time = System.currentTimeMillis()
        // the first print
        // scalastyle:off println
        println(s"==zyh1== $time" + host.getClass.getName)
        println(s"==zyh1== $time" + execId.getClass.getName)
        for (task <- taskSet.resourceOffer(execId, host, maxLocality, availableResources(i))) {

// TaskSetManager.resourceOffer
def resourceOffer(
    execId: String,
    host: String,
    maxLocality: TaskLocality.TaskLocality,
    availableResources: Map[String, Seq[String]] = Map.empty): Option[TaskDescription] = {
  val time = System.currentTimeMillis()
  // the second print
  // scalastyle:off println
  println(s"==zyh2== $time" + host.getClass.getName)
  println(s"==zyh2== $time" + execId.getClass.getName)
  val offerBlacklisted = taskSetBlacklistHelperOpt.exists { blacklist =>
    blacklist.isNodeBlacklistedForTaskSet(host) ||
      blacklist.isExecutorBlacklistedForTaskSet(execId)
  }
  if (!isZombie && !offerBlacklisted) {
    val curTime = clock.getTimeMillis()
    var allowedLocality = maxLocality
    if (maxLocality != TaskLocality.NO_PREF) {
      allowedLocality = getAllowedLocalityLevel(curTime)
      if (allowedLocality > maxLocality) {
        // We're not allowed to search for farther-away tasks
        allowedLocality = maxLocality
      }
    }
    dequeueTask(execId, host, allowedLocality).map { case ((index, taskLocality, speculative)) =>

// TaskSetManager.dequeueTask
private def dequeueTask(
    execId: String,
    host: String,
    maxLocality: TaskLocality.Value): Option[(Int, TaskLocality.Value, Boolean)] = {
  // Tries to schedule a regular task first; if it returns None, then schedules
  // a speculative task
  val time = System.currentTimeMillis()
  // the third print
  // scalastyle:off println
  println(s"==zyh3== $time" + host.getClass.getName)
  println(s"==zyh3== $time" + execId.getClass.getName)
  dequeueTaskHelper(execId, host, maxLocality, false).orElse(
    dequeueTaskHelper(execId, host, maxLocality, true))
}

// TaskSetManager.resourceOffer.dequeueTask: printed again when entering map after the call above returns
dequeueTask(execId, host, allowedLocality).map { case ((index, taskLocality, speculative)) =>
  // Found a task; do some bookkeeping and return a task description
  val time = System.currentTimeMillis()
  // the fourth print
  // scalastyle:off println
  println(s"==zyh4== $time" + host.getClass.getName)
  println(s"==zyh4== $time" + execId.getClass.getName)

// TaskSetManager.logInfo
// try catch NPE
val taskName = s"task ${info.id} in stage ${taskSet.id}"
try {
  logInfo(s"Starting $taskName (TID $taskId, $host, executor ${info.executorId}, " +
    s"partition ${task.partitionId}, $taskLocality, ${serializedTask.limit()} bytes)")
} catch {
  case e: NullPointerException =>
    val time = System.currentTimeMillis()
    e.printStackTrace()
    // scalastyle:off println
    println("+++zyh+++" + host.getClass.getName + " " + taskName.getClass.getName)
    println("+++zyh+++" + taskId.getClass.getName + " " + info.executorId.getClass.getName)
    println("+++zyh+++" + task.partitionId.getClass.getName)
    println("+++zyh+++" + taskLocality.getClass.getName)
    println("+++zyh+++" + serializedTask.limit().getClass.getName)
}
{code}

This problem makes it hard to trust whether a Spark job is really running: only when I check the Spark web UI and see that there are no running tasks can I kill it by hand. I tested Spark 3.0.1, 3.1.1, and 3.1.2, and they all show the same problem.
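The try/catch above only observes the NPE. As a defensive sketch (this is not the actual Spark fix, and `orUnknown` is a hypothetical helper name), the same idea can be taken one step further by normalizing possibly-null values before they reach the log interpolation, so one bad field cannot repeatedly kill the scheduling loop:

```scala
// Hypothetical defensive logging helper; a sketch, not the real Spark change.
object SafeLogDemo {
  // Render any possibly-null value for logging instead of propagating the null.
  def orUnknown(v: Any): String =
    Option(v).map(_.toString).getOrElse("<unknown>")

  def main(args: Array[String]): Unit = {
    val host: String = null // simulate the bad field observed in WorkerOffer
    val execId = "84"
    val line = s"Starting task on host ${orUnknown(host)}, executor ${orUnknown(execId)}"
    println(line) // Starting task on host <unknown>, executor 84
  }
}
```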
-- This message was sent by Atlassian Jira (v8.3.4#803005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org