Helt Long created SPARK-35914:
---------------------------------
Summary: Driver can't distribute task to executor because
NullPointerException
Key: SPARK-35914
URL: https://issues.apache.org/jira/browse/SPARK-35914
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 3.1.2, 3.1.1, 3.0.1
Environment: CDH 5.7.1: Hadoop 2.6.5
Spark 3.0.1, 3.1.1, 3.1.2
Reporter: Helt Long
When I use Spark 3 to submit a job to a YARN cluster, I hit a problem: once in a
while the driver cannot distribute any tasks to any executors, the stage gets
stuck, and the whole Spark job hangs. Checking the driver log, I found a
NullPointerException. It looks like a Netty problem. I can confirm this problem
only exists in Spark 3; it never happened when I used Spark 2.
{code:java}
// Error message
21/06/28 14:42:43 INFO TaskSetManager: Starting task 2592.0 in stage 1.0 (TID 3494) (worker39.hadoop, executor 84, partition 2592, RACK_LOCAL, 5006 bytes) taskResourceAssignments Map()
21/06/28 14:42:43 INFO TaskSetManager: Finished task 4155.0 in stage 1.0 (TID 3367) in 36670 ms on worker39.hadoop (executor 84) (3278/4249)
21/06/28 14:42:43 INFO TaskSetManager: Finished task 2283.0 in stage 1.0 (TID 3422) in 22371 ms on worker15.hadoop (executor 109) (3279/4249)
21/06/28 14:42:43 ERROR Inbox: Ignoring error
java.lang.NullPointerException
    at java.lang.String.length(String.java:623)
    at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:420)
    at java.lang.StringBuilder.append(StringBuilder.java:136)
    at org.apache.spark.scheduler.TaskSetManager.$anonfun$resourceOffer$5(TaskSetManager.scala:483)
    at org.apache.spark.internal.Logging.logInfo(Logging.scala:57)
    at org.apache.spark.internal.Logging.logInfo$(Logging.scala:56)
    at org.apache.spark.scheduler.TaskSetManager.logInfo(TaskSetManager.scala:54)
    at org.apache.spark.scheduler.TaskSetManager.$anonfun$resourceOffer$2(TaskSetManager.scala:484)
    at scala.Option.map(Option.scala:230)
    at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:444)
    at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOfferSingleTaskSet$2(TaskSchedulerImpl.scala:397)
    at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOfferSingleTaskSet$2$adapted(TaskSchedulerImpl.scala:392)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOfferSingleTaskSet$1(TaskSchedulerImpl.scala:392)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
    at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:383)
    at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$20(TaskSchedulerImpl.scala:581)
    at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$20$adapted(TaskSchedulerImpl.scala:576)
    at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
    at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
    at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$16(TaskSchedulerImpl.scala:576)
    at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$16$adapted(TaskSchedulerImpl.scala:547)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:547)
    at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint.$anonfun$makeOffers$5(CoarseGrainedSchedulerBackend.scala:340)
    at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.org$apache$spark$scheduler$cluster$CoarseGrainedSchedulerBackend$$withLock(CoarseGrainedSchedulerBackend.scala:904)
    at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint.org$apache$spark$scheduler$cluster$CoarseGrainedSchedulerBackend$DriverEndpoint$$makeOffers(CoarseGrainedSchedulerBackend.scala:332)
    at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint$$anonfun$receive$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:157)
    at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
    at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
    at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
    at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
    at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
21/06/28 14:42:43 INFO TaskSetManager: Finished task 2255.0 in stage 1.0 (TID 3419) in 23035 ms on worker15.hadoop (executor 116) (3280/4249)
21/06/28 14:42:43 ERROR Inbox: Ignoring error
java.lang.NullPointerException
    at java.lang.String.length(String.java:623)
    at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:420)
    at java.lang.StringBuilder.append(StringBuilder.java:136)
    at org.apache.spark.scheduler.TaskSetManager.$anonfun$resourceOffer$5(TaskSetManager.scala:483)
    at org.apache.spark.internal.Logging.logInfo(Logging.scala:57)
    at org.apache.spark.internal.Logging.logInfo$(Logging.scala:56)
    at org.apache.spark.scheduler.TaskSetManager.logInfo(TaskSetManager.scala:54)
    at org.apache.spark.scheduler.TaskSetManager.$anonfun$resourceOffer$2(TaskSetManager.scala:484)
    at scala.Option.map(Option.scala:230)
    at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:444)
    at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOfferSingleTaskSet$2(TaskSchedulerImpl.scala:397)
    at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOfferSingleTaskSet$2$adapted(TaskSchedulerImpl.scala:392)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOfferSingleTaskSet$1(TaskSchedulerImpl.scala:392)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
    at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:383)
    at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$20(TaskSchedulerImpl.scala:581)
    at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$20$adapted(TaskSchedulerImpl.scala:576)
    at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
    at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
    at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$16(TaskSchedulerImpl.scala:576)
    at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$16$adapted(TaskSchedulerImpl.scala:547)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:547)
    at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint.$anonfun$makeOffers$5(CoarseGrainedSchedulerBackend.scala:340)
    at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.org$apache$spark$scheduler$cluster$CoarseGrainedSchedulerBackend$$withLock(CoarseGrainedSchedulerBackend.scala:904)
    at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint.org$apache$spark$scheduler$cluster$CoarseGrainedSchedulerBackend$DriverEndpoint$$makeOffers(CoarseGrainedSchedulerBackend.scala:332)
    at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint$$anonfun$receive$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:157)
    at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
    at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
    at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
    at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
    at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
.....
21/06/28 14:43:44 INFO TaskSetManager: Finished task 155.0 in stage 1.0 (TID 3486) in 64503 ms on worker02.hadoop (executor 59) (3427/4249)
21/06/28 14:43:44 ERROR Inbox: Ignoring error
java.lang.NullPointerException
21/06/28 14:43:45 ERROR Inbox: Ignoring error
java.lang.NullPointerException
21/06/28 14:43:46 ERROR Inbox: Ignoring error
java.lang.NullPointerException
21/06/28 14:43:47 ERROR Inbox: Ignoring error
java.lang.NullPointerException
21/06/28 14:43:48 ERROR Inbox: Ignoring error
java.lang.NullPointerException
21/06/28 14:43:49 ERROR Inbox: Ignoring error
java.lang.NullPointerException
21/06/28 14:43:49 INFO TaskSetManager: Finished task 189.0 in stage 1.0 (TID 3491) in 66320 ms on worker02.hadoop (executor 62) (3428/4249)
21/06/28 14:43:49 ERROR Inbox: Ignoring error
java.lang.NullPointerException
21/06/28 14:43:50 ERROR Inbox: Ignoring error
java.lang.NullPointerException
21/06/28 14:43:51 ERROR Inbox: Ignoring error
java.lang.NullPointerException{code}
I first found this problem on Spark 3.0.1. I checked the Spark source code; the
NPE happens in *org.apache.spark.scheduler.TaskSetManager.logInfo*, in the code
below:
{code:java}
logInfo(s"Starting $taskName (TID $taskId, $host, executor ${info.executorId}, " +
  s"partition ${task.partitionId}, $taskLocality, ${serializedTask.limit()} bytes)")
{code}
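As a side note, appending a null String does not normally throw in the JDK; StringBuilder and string concatenation both substitute the literal "null", which makes an NPE deep inside append all the more suspicious. A minimal plain-Java check (not Spark code, names are mine):
{code:java}
public class NullAppend {
    // Builds a message roughly the way string interpolation does under
    // the hood; `host` stands in for the value that turned out null.
    static String format(String host) {
        StringBuilder sb = new StringBuilder();
        sb.append("host=").append(host); // a null String is appended as "null"
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(format(null));             // prints: host=null
        System.out.println("host=" + (String) null);  // prints: host=null
    }
}
{code}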
To confirm which variable was null, I added log statements and rebuilt
spark-core like below:
{code:java}
// add log:
logInfo(s"zyh Starting $taskName")
logInfo(s"zyh (TID $taskId")
logInfo(s"zyh $host")
logInfo(s"zyh executor ${info.executorId}")
logInfo(s"zyh ${task.partitionId}")
logInfo(s"zyh $taskLocality,")
logInfo(s"zyh ${serializedTask.limit()} bytes)")
// repack command
./build/mvn -DskipTests -pl :spark-core_2.12 clean install{code}
This showed that *host* was the null value, so I printed it directly:
{code:java}
println(host) // result is Some(null)
println(host.getClass.getName) // result is scala.Some
{code}
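For comparison, Java's Optional cannot end up in this state: Optional.of rejects null and Optional.ofNullable maps it to empty. Scala's Some.apply does neither, which is why a Some(null) can leak through. A plain-Java illustration (not Spark code):
{code:java}
import java.util.Optional;

public class OptNull {
    public static void main(String[] args) {
        // ofNullable turns a null payload into an empty Optional ...
        System.out.println(Optional.ofNullable(null).isPresent()); // false
        // ... and of() refuses null outright:
        try {
            Optional.of(null);
            System.out.println("no exception");
        } catch (NullPointerException e) {
            System.out.println("Optional.of(null) throws NPE");
        }
        // Scala's Some(null), by contrast, is a legal value wrapping
        // null, matching the "Some(null)" printed above.
    }
}
{code}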
To track *host*, I added logging everywhere along the call path:
{{TaskSchedulerImpl.resourceOfferSingleTaskSet}} ->
{{TaskSetManager.resourceOffer}} -> {{TaskSetManager.dequeueTask}} ->
{{TaskSetManager.logInfo}}
Strangely, *host* was then normal, but *executorId* became null... Both
variables come from the same case class, *WorkerOffer*.
So I added logging to print executorId everywhere as well, and the problem
disappeared!
{code:java}
// TaskSchedulerImpl.resourceOfferSingleTaskSet
private def resourceOfferSingleTaskSet(
    taskSet: TaskSetManager,
    maxLocality: TaskLocality,
    shuffledOffers: Seq[WorkerOffer],
    availableCpus: Array[Int],
    availableResources: Array[Map[String, Buffer[String]]],
    tasks: IndexedSeq[ArrayBuffer[TaskDescription]],
    addressesWithDescs: ArrayBuffer[(String, TaskDescription)]): Boolean = {
  var launchedTask = false
  // nodes and executors that are blacklisted for the entire application
  // have already been filtered out by this point
  for (i <- 0 until shuffledOffers.size) {
    val execId = shuffledOffers(i).executorId
    val host = shuffledOffers(i).host
    if (availableCpus(i) >= CPUS_PER_TASK &&
        resourcesMeetTaskRequirements(availableResources(i))) {
      try {
        val time = System.currentTimeMillis()
        // the first print
        // scalastyle:off println
        println(s"==zyh1== $time" + host.getClass.getName)
        println(s"==zyh1== $time" + execId.getClass.getName)
        for (task <- taskSet.resourceOffer(execId, host, maxLocality,
            availableResources(i))) {

// TaskSetManager.resourceOffer
def resourceOffer(
    execId: String,
    host: String,
    maxLocality: TaskLocality.TaskLocality,
    availableResources: Map[String, Seq[String]] = Map.empty)
  : Option[TaskDescription] = {
  val time = System.currentTimeMillis()
  // the second print
  // scalastyle:off println
  println(s"==zyh2== $time" + host.getClass.getName)
  println(s"==zyh2== $time" + execId.getClass.getName)
  val offerBlacklisted = taskSetBlacklistHelperOpt.exists { blacklist =>
    blacklist.isNodeBlacklistedForTaskSet(host) ||
      blacklist.isExecutorBlacklistedForTaskSet(execId)
  }
  if (!isZombie && !offerBlacklisted) {
    val curTime = clock.getTimeMillis()
    var allowedLocality = maxLocality
    if (maxLocality != TaskLocality.NO_PREF) {
      allowedLocality = getAllowedLocalityLevel(curTime)
      if (allowedLocality > maxLocality) {
        // We're not allowed to search for farther-away tasks
        allowedLocality = maxLocality
      }
    }
    dequeueTask(execId, host, allowedLocality).map {
      case ((index, taskLocality, speculative)) =>

// TaskSetManager.dequeueTask
private def dequeueTask(
    execId: String,
    host: String,
    maxLocality: TaskLocality.Value): Option[(Int, TaskLocality.Value, Boolean)] = {
  // Tries to schedule a regular task first; if it returns None, then
  // schedules a speculative task
  val time = System.currentTimeMillis()
  // the third print
  // scalastyle:off println
  println(s"==zyh3== $time" + host.getClass.getName)
  println(s"==zyh3== $time" + execId.getClass.getName)
  dequeueTaskHelper(execId, host, maxLocality, false).orElse(
    dequeueTaskHelper(execId, host, maxLocality, true))
}

// TaskSetManager.resourceOffer.dequeueTask: printed once more when
// entering the map after the call above returns
dequeueTask(execId, host, allowedLocality).map {
  case ((index, taskLocality, speculative)) =>
    // Found a task; do some bookkeeping and return a task description
    val time = System.currentTimeMillis()
    // the fourth print
    // scalastyle:off println
    println(s"==zyh4== $time" + host.getClass.getName)
    println(s"==zyh4== $time" + execId.getClass.getName)

// TaskSetManager.logInfo
// try/catch around the NPE
val taskName = s"task ${info.id} in stage ${taskSet.id}"
try {
  logInfo(s"Starting $taskName (TID $taskId, $host, executor ${info.executorId}, " +
    s"partition ${task.partitionId}, $taskLocality, ${serializedTask.limit()} bytes)")
} catch {
  case e: NullPointerException =>
    val time = System.currentTimeMillis()
    e.printStackTrace()
    // scalastyle:off println
    println("+++zyh+++" + host.getClass.getName + " " + taskName.getClass.getName)
    println("+++zyh+++" + taskId.getClass.getName + " " + info.executorId.getClass.getName)
    println("+++zyh+++" + task.partitionId.getClass.getName)
    println("+++zyh+++" + taskLocality.getClass.getName)
    println("+++zyh+++" + serializedTask.limit().getClass.getName)
}
{code}
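For what it's worth, the fact that adding extra println calls made the failure vanish is the classic signature of a timing-sensitive bug: the added I/O changes thread scheduling. Purely as a hypothetical sketch (single-threaded here for determinism, and not Spark's actual code), this shows how a value can be non-null at one read and null at a slightly later one when something mutates it in between:
{code:java}
public class RaceSketch {
    // A mutable field standing in for a value like WorkerOffer.executorId.
    static volatile String executorId;

    // Reads the field twice with a write in between, simulating a
    // concurrent writer deterministically in a single thread.
    static String[] readTwiceWithWriteBetween() {
        executorId = "84";
        String first = executorId;   // first read: still "84"
        executorId = null;           // stands in for a concurrent writer
        String second = executorId;  // second read: now null
        return new String[] { first, second };
    }

    public static void main(String[] args) {
        String[] r = readTwiceWithWriteBetween();
        System.out.println(r[0] + " / " + r[1]); // prints: 84 / null
    }
}
{code}
In the real driver the writer would be another thread, which is why changing the timing with extra logging can make the symptom disappear.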
This problem makes it hard to trust whether a Spark job is really running; only
when I check the Spark web UI and see that no tasks are running can I kill the
job by hand. I tested Spark 3.0.1, 3.1.1, and 3.1.2, and all of them have the
same problem.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]