Helt Long created SPARK-35914:
---------------------------------

             Summary: Driver can't distribute tasks to executors because of NullPointerException
                 Key: SPARK-35914
                 URL: https://issues.apache.org/jira/browse/SPARK-35914
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 3.1.2, 3.1.1, 3.0.1
         Environment: CDH 5.7.1: Hadoop 2.6.5

Spark 3.0.1, 3.1.1, 3.1.2
            Reporter: Helt Long


When using Spark 3 to submit a job to a YARN cluster, I hit a problem: once in a
while, the driver can't distribute any tasks to any executors, the stage gets
stuck, and the whole Spark job hangs. Checking the driver log, I found a
NullPointerException. It looks like a netty problem. I can confirm this problem
only exists in Spark 3, because it never happened when I used Spark 2.

 
{code:java}
// Error message
21/06/28 14:42:43 INFO TaskSetManager: Starting task 2592.0 in stage 1.0 (TID 3494) (worker39.hadoop, executor 84, partition 2592, RACK_LOCAL, 5006 bytes) taskResourceAssignments Map()
21/06/28 14:42:43 INFO TaskSetManager: Finished task 4155.0 in stage 1.0 (TID 3367) in 36670 ms on worker39.hadoop (executor 84) (3278/4249)
21/06/28 14:42:43 INFO TaskSetManager: Finished task 2283.0 in stage 1.0 (TID 3422) in 22371 ms on worker15.hadoop (executor 109) (3279/4249)
21/06/28 14:42:43 ERROR Inbox: Ignoring error
java.lang.NullPointerException
        at java.lang.String.length(String.java:623)
        at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:420)
        at java.lang.StringBuilder.append(StringBuilder.java:136)
        at org.apache.spark.scheduler.TaskSetManager.$anonfun$resourceOffer$5(TaskSetManager.scala:483)
        at org.apache.spark.internal.Logging.logInfo(Logging.scala:57)
        at org.apache.spark.internal.Logging.logInfo$(Logging.scala:56)
        at org.apache.spark.scheduler.TaskSetManager.logInfo(TaskSetManager.scala:54)
        at org.apache.spark.scheduler.TaskSetManager.$anonfun$resourceOffer$2(TaskSetManager.scala:484)
        at scala.Option.map(Option.scala:230)
        at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:444)
        at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOfferSingleTaskSet$2(TaskSchedulerImpl.scala:397)
        at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOfferSingleTaskSet$2$adapted(TaskSchedulerImpl.scala:392)
        at scala.Option.foreach(Option.scala:407)
        at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOfferSingleTaskSet$1(TaskSchedulerImpl.scala:392)
        at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
        at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:383)
        at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$20(TaskSchedulerImpl.scala:581)
        at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$20$adapted(TaskSchedulerImpl.scala:576)
        at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
        at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
        at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$16(TaskSchedulerImpl.scala:576)
        at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$16$adapted(TaskSchedulerImpl.scala:547)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:547)
        at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint.$anonfun$makeOffers$5(CoarseGrainedSchedulerBackend.scala:340)
        at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.org$apache$spark$scheduler$cluster$CoarseGrainedSchedulerBackend$$withLock(CoarseGrainedSchedulerBackend.scala:904)
        at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint.org$apache$spark$scheduler$cluster$CoarseGrainedSchedulerBackend$DriverEndpoint$$makeOffers(CoarseGrainedSchedulerBackend.scala:332)
        at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint$$anonfun$receive$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:157)
        at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
        at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
        at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
        at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
        at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
21/06/28 14:42:43 INFO TaskSetManager: Finished task 2255.0 in stage 1.0 (TID 3419) in 23035 ms on worker15.hadoop (executor 116) (3280/4249)
21/06/28 14:42:43 ERROR Inbox: Ignoring error
java.lang.NullPointerException
        ... (same stack trace as above)
.....

21/06/28 14:43:44 INFO TaskSetManager: Finished task 155.0 in stage 1.0 (TID 3486) in 64503 ms on worker02.hadoop (executor 59) (3427/4249)
21/06/28 14:43:44 ERROR Inbox: Ignoring error
java.lang.NullPointerException
21/06/28 14:43:45 ERROR Inbox: Ignoring error
java.lang.NullPointerException
21/06/28 14:43:46 ERROR Inbox: Ignoring error
java.lang.NullPointerException
21/06/28 14:43:47 ERROR Inbox: Ignoring error
java.lang.NullPointerException
21/06/28 14:43:48 ERROR Inbox: Ignoring error
java.lang.NullPointerException
21/06/28 14:43:49 ERROR Inbox: Ignoring error
java.lang.NullPointerException
21/06/28 14:43:49 INFO TaskSetManager: Finished task 189.0 in stage 1.0 (TID 3491) in 66320 ms on worker02.hadoop (executor 62) (3428/4249)
21/06/28 14:43:49 ERROR Inbox: Ignoring error
java.lang.NullPointerException
21/06/28 14:43:50 ERROR Inbox: Ignoring error
java.lang.NullPointerException
21/06/28 14:43:51 ERROR Inbox: Ignoring error
java.lang.NullPointerException{code}
 

 

The first time I hit this problem was on Spark 3.0.1. I checked the Spark source
code: the NPE happens in *org.apache.spark.scheduler.TaskSetManager.logInfo*,
whose call site looks like this:
{code:java}
logInfo(s"Starting $taskName (TID $taskId, $host, executor ${info.executorId}, 
" +
    s"partition ${task.partitionId}, $taskLocality, ${serializedTask.limit()} 
bytes)")
{code}
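For context, the concatenation in that log call compiles down to java.lang.StringBuilder.append, which is exactly where the trace above dies (AbstractStringBuilder.append -> String.length). A minimal sketch in plain Scala (not Spark code) shows that a stable null String would not even trigger this NPE:
{code:java}
// Plain Scala sketch, not Spark code: string concatenation goes through
// java.lang.StringBuilder, matching the top frames of the trace above.
val host: String = null
val sb = new java.lang.StringBuilder
sb.append("Starting task (").append(host).append(")")
// Appending a stable null String yields the literal text "null" and does
// not throw, so the NPE above points at something stranger than a plain
// null reference (e.g. a value that changes while it is being appended).
println(sb) // Starting task (null)
{code}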
To confirm which variable was null, I added logging and rebuilt spark-core as
follows:
{code:java}
// add log:
logInfo(s"zyh Starting $taskName")
logInfo(s"zyh (TID $taskId")
logInfo(s"zyh $host")
logInfo(s"zyh executor ${info.executorId}")
logInfo(s"zyh ${task.partitionId}")
logInfo(s"zyh $taskLocality,")
logInfo(s"zyh ${serializedTask.limit()} bytes)")

// repack command
./build/mvn -DskipTests -pl :spark-core_2.12 clean install{code}
This showed that *host* is the null one. Printing it directly:
{code:java}
println(host) // result is Some(null)
println(host.getClass.getName) // result is scala.Some
{code}
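For illustration (plain Scala, not Spark code), {{Some(null)}} is a legal but treacherous value: it passes definedness checks and only blows up on use, unlike {{Option(null)}}:
{code:java}
// Plain Scala sketch, not Spark code.
val host: Option[String] = Some(null)
println(host)                  // Some(null)
println(host.getClass.getName) // scala.Some
println(Option(null))          // None -- Option(_) guards against null, Some(_) does not
host.foreach(h => println(h.length)) // throws java.lang.NullPointerException
{code}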
To track *host*, I added logging everywhere along the call path:

{{TaskSchedulerImpl.resourceOfferSingleTaskSet}} -> {{TaskSetManager.resourceOffer}} -> {{TaskSetManager.dequeueTask}} -> {{TaskSetManager.logInfo}}

Strangely, *host* is normal! But *executorId* becomes null... These two
variables come from the same case class, *WorkerOffer* (sketched below).
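For reference, a rough sketch of that case class, abridged from the Spark 3.x source (exact fields vary by version, so treat this as an approximation):
{code:java}
// Abridged sketch of the Spark 3.x case class; later fields
// (address, resources, ...) are omitted here.
private[spark] case class WorkerOffer(
    executorId: String,
    host: String,
    cores: Int)
{code}
Both values are immutable fields on the same instance, which makes it all the stranger for one of them to read as null.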

So I added logging to print *executorId* everywhere, and the problem disappeared!!!
{code:java}
// TaskSchedulerImpl.resourceOfferSingleTaskSet
private def resourceOfferSingleTaskSet(
    taskSet: TaskSetManager,
    maxLocality: TaskLocality,
    shuffledOffers: Seq[WorkerOffer],
    availableCpus: Array[Int],
    availableResources: Array[Map[String, Buffer[String]]],
    tasks: IndexedSeq[ArrayBuffer[TaskDescription]],
    addressesWithDescs: ArrayBuffer[(String, TaskDescription)]) : Boolean = {
  var launchedTask = false
  // nodes and executors that are blacklisted for the entire application
  // have already been filtered out by this point
  for (i <- 0 until shuffledOffers.size) {
    val execId = shuffledOffers(i).executorId
    val host = shuffledOffers(i).host
    if (availableCpus(i) >= CPUS_PER_TASK &&
      resourcesMeetTaskRequirements(availableResources(i))) {
      try {
        val time = System.currentTimeMillis()
        // the first print
        // scalastyle:off println
        println(s"==zyh1== $time" + host.getClass.getName)
        println(s"==zyh1== $time" + execId.getClass.getName)
        for (task <- taskSet.resourceOffer(execId, host, maxLocality, availableResources(i))) {

// TaskSetManager.resourceOffer
def resourceOffer(
    execId: String,
    host: String,
    maxLocality: TaskLocality.TaskLocality,
    availableResources: Map[String, Seq[String]] = Map.empty)
  : Option[TaskDescription] =
{
  val time = System.currentTimeMillis()
  // the second print
  // scalastyle:off println
  println(s"==zyh2== $time" + host.getClass.getName)
  println(s"==zyh2== $time" + execId.getClass.getName)

  val offerBlacklisted = taskSetBlacklistHelperOpt.exists { blacklist =>
    blacklist.isNodeBlacklistedForTaskSet(host) ||
      blacklist.isExecutorBlacklistedForTaskSet(execId)
  }
  if (!isZombie && !offerBlacklisted) {
    val curTime = clock.getTimeMillis()
    var allowedLocality = maxLocality
    if (maxLocality != TaskLocality.NO_PREF) {
      allowedLocality = getAllowedLocalityLevel(curTime)
      if (allowedLocality > maxLocality) {
        // We're not allowed to search for farther-away tasks
        allowedLocality = maxLocality
      }
    }
    dequeueTask(execId, host, allowedLocality).map { case ((index, taskLocality, speculative)) =>

// TaskSetManager.dequeueTask
private def dequeueTask(
    execId: String,
    host: String,
    maxLocality: TaskLocality.Value): Option[(Int, TaskLocality.Value, Boolean)] = {
  // Tries to schedule a regular task first; if it returns None, then schedules
  // a speculative task
  val time = System.currentTimeMillis()
  // the third print
  // scalastyle:off println
  println(s"==zyh3== $time" + host.getClass.getName)
  println(s"==zyh3== $time" + execId.getClass.getName)
  dequeueTaskHelper(execId, host, maxLocality, false).orElse(
    dequeueTaskHelper(execId, host, maxLocality, true))
}

// TaskSetManager.resourceOffer.dequeueTask: printed once more on entering
// the map after dequeueTask returns
dequeueTask(execId, host, allowedLocality).map { case ((index, taskLocality, speculative)) =>
  // Found a task; do some bookkeeping and return a task description
  val time = System.currentTimeMillis()
  // the fourth print
  // scalastyle:off println
  println(s"==zyh4== $time" + host.getClass.getName)
  println(s"==zyh4== $time" + execId.getClass.getName)

// TaskSetManager.logInfo
// try/catch added around the NPE site
  val taskName = s"task ${info.id} in stage ${taskSet.id}"
  try {
    logInfo(s"Starting $taskName (TID $taskId, $host, executor ${info.executorId}, " +
      s"partition ${task.partitionId}, $taskLocality, ${serializedTask.limit()} bytes)")
  } catch {
    case e: NullPointerException =>
      val time = System.currentTimeMillis()
      e.printStackTrace()
      // scalastyle:off println
      println("+++zyh+++" + host.getClass.getName + "  " + taskName.getClass.getName)
      println("+++zyh+++" + taskId.getClass.getName + "  " + info.executorId.getClass.getName)
      println("+++zyh+++" + task.partitionId.getClass.getName)
      println("+++zyh+++" + taskLocality.getClass.getName)
      println("+++zyh+++" + serializedTask.limit().getClass.getName)
  }
{code}
This problem makes it difficult to trust whether a Spark job is really running:
only after checking the Spark web UI and finding no running tasks can I kill it
by hand. I tested on Spark 3.0.1, 3.1.1 and 3.1.2, and the problem is the same
on all of them.
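As a stopgap, instead of watching the web UI by hand, a listener can record when the driver last managed to start a task, so an external watchdog can flag the hang. A minimal sketch (my own workaround idea, not a fix for the NPE):
{code:java}
import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}

// Minimal stall-watchdog sketch: records the last time the driver
// started a task; an external monitor can kill the job if nothing
// has started for longer than a chosen threshold.
class StallDetector extends SparkListener {
  private val lastTaskStart = new AtomicLong(System.currentTimeMillis())

  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit =
    lastTaskStart.set(System.currentTimeMillis())

  def stalledFor(thresholdMs: Long): Boolean =
    System.currentTimeMillis() - lastTaskStart.get() > thresholdMs
}

// usage: sc.addSparkListener(new StallDetector)
{code}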


