[
https://issues.apache.org/jira/browse/SPARK-35914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17371009#comment-17371009
]
Helt Long commented on SPARK-35914:
-----------------------------------
I suspect this problem is related to the Hadoop version. I use
CDH 5.7.1 (Hadoop 2.6.5), while Spark 3 is built against Hadoop 2.7. Another
problem I found in the Spark web UI turned out to be caused by the version
mismatch. I will try a newer Hadoop version to confirm this.
[SPARK-35802] Error loading the stages/stage/<id> page in Spark UI - ASF JIRA (apache.org)
> Driver can't distribute task to executor because NullPointerException
> ---------------------------------------------------------------------
>
> Key: SPARK-35914
> URL: https://issues.apache.org/jira/browse/SPARK-35914
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 3.0.1, 3.1.1, 3.1.2
> Environment: CDH 5.7.1: Hadoop 2.6.5
> Spark 3.0.1, 3.1.1, 3.1.2
> Reporter: Helt Long
> Priority: Major
> Attachments: stuck log.png, webui stuck.png
>
>
> When using Spark 3 to submit a job to a YARN cluster, I hit a problem: once
> in a while the driver cannot distribute any tasks to any executor, the stage
> gets stuck, and the whole job hangs. Checking the driver log, I found a
> NullPointerException. It looks like a Netty-related problem. I can confirm
> this problem only exists in Spark 3; with Spark 2 it never happened.
>
> {code:java}
> // Error message
> 21/06/28 14:42:43 INFO TaskSetManager: Starting task 2592.0 in stage 1.0 (TID 3494) (worker39.hadoop, executor 84, partition 2592, RACK_LOCAL, 5006 bytes) taskResourceAssignments Map()
> 21/06/28 14:42:43 INFO TaskSetManager: Finished task 4155.0 in stage 1.0 (TID 3367) in 36670 ms on worker39.hadoop (executor 84) (3278/4249)
> 21/06/28 14:42:43 INFO TaskSetManager: Finished task 2283.0 in stage 1.0 (TID 3422) in 22371 ms on worker15.hadoop (executor 109) (3279/4249)
> 21/06/28 14:42:43 ERROR Inbox: Ignoring error
> java.lang.NullPointerException
>     at java.lang.String.length(String.java:623)
>     at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:420)
>     at java.lang.StringBuilder.append(StringBuilder.java:136)
>     at org.apache.spark.scheduler.TaskSetManager.$anonfun$resourceOffer$5(TaskSetManager.scala:483)
>     at org.apache.spark.internal.Logging.logInfo(Logging.scala:57)
>     at org.apache.spark.internal.Logging.logInfo$(Logging.scala:56)
>     at org.apache.spark.scheduler.TaskSetManager.logInfo(TaskSetManager.scala:54)
>     at org.apache.spark.scheduler.TaskSetManager.$anonfun$resourceOffer$2(TaskSetManager.scala:484)
>     at scala.Option.map(Option.scala:230)
>     at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:444)
>     at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOfferSingleTaskSet$2(TaskSchedulerImpl.scala:397)
>     at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOfferSingleTaskSet$2$adapted(TaskSchedulerImpl.scala:392)
>     at scala.Option.foreach(Option.scala:407)
>     at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOfferSingleTaskSet$1(TaskSchedulerImpl.scala:392)
>     at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
>     at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:383)
>     at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$20(TaskSchedulerImpl.scala:581)
>     at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$20$adapted(TaskSchedulerImpl.scala:576)
>     at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
>     at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
>     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
>     at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$16(TaskSchedulerImpl.scala:576)
>     at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$16$adapted(TaskSchedulerImpl.scala:547)
>     at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>     at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>     at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:547)
>     at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint.$anonfun$makeOffers$5(CoarseGrainedSchedulerBackend.scala:340)
>     at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.org$apache$spark$scheduler$cluster$CoarseGrainedSchedulerBackend$$withLock(CoarseGrainedSchedulerBackend.scala:904)
>     at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint.org$apache$spark$scheduler$cluster$CoarseGrainedSchedulerBackend$DriverEndpoint$$makeOffers(CoarseGrainedSchedulerBackend.scala:332)
>     at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint$$anonfun$receive$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:157)
>     at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
>     at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
>     at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
>     at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
>     at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> 21/06/28 14:42:43 INFO TaskSetManager: Finished task 2255.0 in stage 1.0 (TID 3419) in 23035 ms on worker15.hadoop (executor 116) (3280/4249)
> 21/06/28 14:42:43 ERROR Inbox: Ignoring error
> java.lang.NullPointerException
>     at java.lang.String.length(String.java:623)
>     at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:420)
>     at java.lang.StringBuilder.append(StringBuilder.java:136)
>     at org.apache.spark.scheduler.TaskSetManager.$anonfun$resourceOffer$5(TaskSetManager.scala:483)
>     at org.apache.spark.internal.Logging.logInfo(Logging.scala:57)
>     at org.apache.spark.internal.Logging.logInfo$(Logging.scala:56)
>     at org.apache.spark.scheduler.TaskSetManager.logInfo(TaskSetManager.scala:54)
>     at org.apache.spark.scheduler.TaskSetManager.$anonfun$resourceOffer$2(TaskSetManager.scala:484)
>     at scala.Option.map(Option.scala:230)
>     at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:444)
>     at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOfferSingleTaskSet$2(TaskSchedulerImpl.scala:397)
>     at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOfferSingleTaskSet$2$adapted(TaskSchedulerImpl.scala:392)
>     at scala.Option.foreach(Option.scala:407)
>     at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOfferSingleTaskSet$1(TaskSchedulerImpl.scala:392)
>     at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
>     at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:383)
>     at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$20(TaskSchedulerImpl.scala:581)
>     at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$20$adapted(TaskSchedulerImpl.scala:576)
>     at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
>     at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
>     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
>     at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$16(TaskSchedulerImpl.scala:576)
>     at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$16$adapted(TaskSchedulerImpl.scala:547)
>     at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>     at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>     at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:547)
>     at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint.$anonfun$makeOffers$5(CoarseGrainedSchedulerBackend.scala:340)
>     at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.org$apache$spark$scheduler$cluster$CoarseGrainedSchedulerBackend$$withLock(CoarseGrainedSchedulerBackend.scala:904)
>     at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint.org$apache$spark$scheduler$cluster$CoarseGrainedSchedulerBackend$DriverEndpoint$$makeOffers(CoarseGrainedSchedulerBackend.scala:332)
>     at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint$$anonfun$receive$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:157)
>     at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
>     at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
>     at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
>     at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
>     at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> .....
> 21/06/28 14:43:44 INFO TaskSetManager: Finished task 155.0 in stage 1.0 (TID 3486) in 64503 ms on worker02.hadoop (executor 59) (3427/4249)
> 21/06/28 14:43:44 ERROR Inbox: Ignoring error
> java.lang.NullPointerException
> 21/06/28 14:43:45 ERROR Inbox: Ignoring error
> java.lang.NullPointerException
> 21/06/28 14:43:46 ERROR Inbox: Ignoring error
> java.lang.NullPointerException
> 21/06/28 14:43:47 ERROR Inbox: Ignoring error
> java.lang.NullPointerException
> 21/06/28 14:43:48 ERROR Inbox: Ignoring error
> java.lang.NullPointerException
> 21/06/28 14:43:49 ERROR Inbox: Ignoring error
> java.lang.NullPointerException
> 21/06/28 14:43:49 INFO TaskSetManager: Finished task 189.0 in stage 1.0 (TID 3491) in 66320 ms on worker02.hadoop (executor 62) (3428/4249)
> 21/06/28 14:43:49 ERROR Inbox: Ignoring error
> java.lang.NullPointerException
> 21/06/28 14:43:50 ERROR Inbox: Ignoring error
> java.lang.NullPointerException
> 21/06/28 14:43:51 ERROR Inbox: Ignoring error
> java.lang.NullPointerException{code}
>
>
> I first hit this problem on Spark 3.0.1. Checking the Spark source code,
> the NPE happens in *org.apache.spark.scheduler.TaskSetManager.logInfo*;
> the relevant source is below:
> {code:java}
> logInfo(s"Starting $taskName (TID $taskId, $host, executor ${info.executorId}, " +
>   s"partition ${task.partitionId}, $taskLocality, ${serializedTask.limit()} bytes)")
> {code}
> To determine which variable was null, I added log statements and rebuilt
> spark-core as shown below:
> {code:java}
> // add log:
> logInfo(s"zyh Starting $taskName")
> logInfo(s"zyh (TID $taskId")
> logInfo(s"zyh $host")
> logInfo(s"zyh executor ${info.executorId}")
> logInfo(s"zyh ${task.partitionId}")
> logInfo(s"zyh $taskLocality,")
> logInfo(s"zyh ${serializedTask.limit()} bytes)")
> // repack command
> ./build/mvn -DskipTests -pl :spark-core_2.12 clean install{code}
> This showed that *host* was null. Printing it directly:
> {code:java}
> println(host) // result is Some(null)
> println(host.getClass.getName) // result is scala.Some
> {code}
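The `Some(null)` printed above is a well-known Scala pitfall: `Some(...)` wraps whatever it is given, including null, whereas the `Option(...)` factory converts null to `None`. A minimal standalone sketch of the difference (illustrative only, not Spark code):

```scala
object SomeNullDemo {
  def main(args: Array[String]): Unit = {
    val raw: String = null

    // Some(...) wraps null as-is, so later dereferences blow up.
    val unsafe: Option[String] = Some(raw)
    // Option(...) is the null-safe factory: it turns null into None.
    val safe: Option[String] = Option(raw)

    println(unsafe)              // Some(null)
    println(safe)                // None
    println(safe.map(_.length))  // None, no NPE
    // unsafe.map(_.length) here would throw NullPointerException
  }
}
```

This is why a `host` of type `Option[String]` can print as `Some(null)`: somewhere a null String was wrapped with `Some(...)` rather than `Option(...)`.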
> To track *host*, I added logging at every step along the call chain:
> {{TaskSchedulerImpl.resourceOfferSingleTaskSet}} ->
> {{TaskSetManager.resourceOffer}} -> {{TaskSetManager.dequeueTask}} ->
> {{TaskSetManager.logInfo}}
> Strangely, *host* was then normal, but *executorId* became null. Both
> variables come from the same case class, *WorkerOffer*.
> So I also added logging to print *executorId* everywhere, and the problem disappeared!
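A bug that vanishes as soon as extra prints are added is a classic heisenbug signature: `println` synchronizes on `System.out`, which changes thread timing and memory visibility. One hypothesis, not confirmed by this ticket, is that the offer's fields are published across threads without sufficient synchronization. A minimal, hypothetical sketch of field publication; the `Offer` class here is illustrative only, not Spark's `WorkerOffer`:

```scala
// Hypothetical illustration: a field shared between a writer thread and a
// reader. Marking it @volatile guarantees the reader observes the write;
// without @volatile (and without other synchronization), the JVM memory
// model permits a stale null read, which is one way a value such as
// executorId could intermittently appear null.
class Offer {
  @volatile var executorId: String = _
}

object PublishSketch {
  def main(args: Array[String]): Unit = {
    val offer  = new Offer
    val writer = new Thread(() => { offer.executorId = "exec-1" })
    writer.start()
    writer.join() // join() also establishes a happens-before edge
    println(offer.executorId) // exec-1
  }
}
```

Under this hypothesis, the added `println` calls would have masked the race rather than fixed it, which matches the problem "disappearing" during debugging.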
> {code:java}
> // TaskSchedulerImpl.resourceOfferSingleTaskSet
> private def resourceOfferSingleTaskSet(
>     taskSet: TaskSetManager,
>     maxLocality: TaskLocality,
>     shuffledOffers: Seq[WorkerOffer],
>     availableCpus: Array[Int],
>     availableResources: Array[Map[String, Buffer[String]]],
>     tasks: IndexedSeq[ArrayBuffer[TaskDescription]],
>     addressesWithDescs: ArrayBuffer[(String, TaskDescription)]): Boolean = {
>   var launchedTask = false
>   // nodes and executors that are blacklisted for the entire application
>   // have already been filtered out by this point
>   for (i <- 0 until shuffledOffers.size) {
>     val execId = shuffledOffers(i).executorId
>     val host = shuffledOffers(i).host
>     if (availableCpus(i) >= CPUS_PER_TASK &&
>         resourcesMeetTaskRequirements(availableResources(i))) {
>       try {
>         val time = System.currentTimeMillis()
>         // the first print
>         // scalastyle:off println
>         println(s"==zyh1== $time" + host.getClass.getName)
>         println(s"==zyh1== $time" + execId.getClass.getName)
>         for (task <- taskSet.resourceOffer(execId, host, maxLocality,
>             availableResources(i))) {
>
> // TaskSetManager.resourceOffer
> def resourceOffer(
>     execId: String,
>     host: String,
>     maxLocality: TaskLocality.TaskLocality,
>     availableResources: Map[String, Seq[String]] = Map.empty)
>   : Option[TaskDescription] = {
>   val time = System.currentTimeMillis()
>   // the second print
>   // scalastyle:off println
>   println(s"==zyh2== $time" + host.getClass.getName)
>   println(s"==zyh2== $time" + execId.getClass.getName)
>
>   val offerBlacklisted = taskSetBlacklistHelperOpt.exists { blacklist =>
>     blacklist.isNodeBlacklistedForTaskSet(host) ||
>       blacklist.isExecutorBlacklistedForTaskSet(execId)
>   }
>   if (!isZombie && !offerBlacklisted) {
>     val curTime = clock.getTimeMillis()
>     var allowedLocality = maxLocality
>     if (maxLocality != TaskLocality.NO_PREF) {
>       allowedLocality = getAllowedLocalityLevel(curTime)
>       if (allowedLocality > maxLocality) {
>         // We're not allowed to search for farther-away tasks
>         allowedLocality = maxLocality
>       }
>     }
>     dequeueTask(execId, host, allowedLocality).map {
>       case ((index, taskLocality, speculative)) =>
>
> // TaskSetManager.dequeueTask
> private def dequeueTask(
>     execId: String,
>     host: String,
>     maxLocality: TaskLocality.Value): Option[(Int, TaskLocality.Value, Boolean)] = {
>   // Tries to schedule a regular task first; if it returns None, then
>   // schedules a speculative task
>   val time = System.currentTimeMillis()
>   // the third print
>   // scalastyle:off println
>   println(s"==zyh3== $time" + host.getClass.getName)
>   println(s"==zyh3== $time" + execId.getClass.getName)
>   dequeueTaskHelper(execId, host, maxLocality, false).orElse(
>     dequeueTaskHelper(execId, host, maxLocality, true))
> }
>
> // TaskSetManager.resourceOffer.dequeueTask: print again when entering
> // the map after the call above returns
> dequeueTask(execId, host, allowedLocality).map {
>   case ((index, taskLocality, speculative)) =>
>     // Found a task; do some bookkeeping and return a task description
>     val time = System.currentTimeMillis()
>     // the fourth print
>     // scalastyle:off println
>     println(s"==zyh4== $time" + host.getClass.getName)
>     println(s"==zyh4== $time" + execId.getClass.getName)
>
> // TaskSetManager.logInfo
> // try to catch the NPE
> val taskName = s"task ${info.id} in stage ${taskSet.id}"
> try {
>   logInfo(s"Starting $taskName (TID $taskId, $host, executor ${info.executorId}, " +
>     s"partition ${task.partitionId}, $taskLocality, ${serializedTask.limit()} bytes)")
> } catch {
>   case e: NullPointerException =>
>     val time = System.currentTimeMillis()
>     e.printStackTrace()
>     // scalastyle:off println
>     println("+++zyh+++" + host.getClass.getName + " " + taskName.getClass.getName)
>     println("+++zyh+++" + taskId.getClass.getName + " " + info.executorId.getClass.getName)
>     println("+++zyh+++" + task.partitionId.getClass.getName)
>     println("+++zyh+++" + taskLocality.getClass.getName)
>     println("+++zyh+++" + serializedTask.limit().getClass.getName)
> }
> {code}
> This problem makes it hard to trust whether a Spark job is actually
> running; only after checking the Spark web UI and finding no running tasks
> can I kill it by hand. I tested Spark 3.0.1, 3.1.1, and 3.1.2, and the
> problem is the same in all of them.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)