wuyi created SPARK-36558:
----------------------------
Summary: Stage has all tasks finished but with ongoing
finalization can cause job hang
Key: SPARK-36558
URL: https://issues.apache.org/jira/browse/SPARK-36558
Project: Spark
Issue Type: Sub-task
Components: Spark Core
Affects Versions: 3.2.0, 3.3.0
Reporter: wuyi
A stage whose tasks have all finished but whose finalization is still ongoing
can cause the job to hang. The problem is that such a stage is considered a
"missing" stage (see
[https://github.com/apache/spark/blob/a47ceaf5492040063e31e17570678dc06846c36c/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L719-L721]).
This breaks the original assumption that a "missing" stage must have tasks
to run.
Normally, if stage A is the parent of (result) stage B and all tasks in stage A
have finished, stage A is skipped when stage B is submitted. With this bug,
however, stage A is submitted even though it has no tasks left to run, which
eventually causes the job to hang.
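To make the broken assumption concrete, here is a minimal sketch of the "missing" check as described above. It is a simplified model, not Spark's actual code: `MapStage`, `isMissing`, and `partitionsToCompute` are hypothetical names invented for illustration, and the condition is only assumed to mirror what the linked DAGScheduler lines do.

```scala
// Hypothetical, simplified model of a shuffle map stage (not Spark's classes).
final case class MapStage(
    name: String,
    numPartitions: Int,
    finishedPartitions: Int,
    mergeFinalized: Boolean) {
  // True once every map output is present.
  def isAvailable: Boolean = finishedPartitions == numPartitions
  // Number of tasks the scheduler would still have to launch.
  def partitionsToCompute: Int = numPartitions - finishedPartitions
}

object MissingStageSketch {
  // Assumed form of the check: a parent stage counts as "missing" when its
  // outputs are incomplete OR its finalization has not completed yet.
  def isMissing(stage: MapStage): Boolean =
    !stage.isAvailable || !stage.mergeFinalized

  def main(args: Array[String]): Unit = {
    // Stage A: all 20 tasks finished, finalization still ongoing.
    val stageA = MapStage("A", numPartitions = 20, finishedPartitions = 20,
      mergeFinalized = false)
    // Prints: missing = true, tasks to run = 0
    println(s"missing = ${isMissing(stageA)}, tasks to run = ${stageA.partitionsToCompute}")
  }
}
```

In this model, stage A is reported as missing and would be submitted, yet it has zero tasks to run, which is the combination the scheduler does not expect.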
An example that reproduces the issue:
{code:java}
test("Job hang") {
  initPushBasedShuffleConfs(conf)
  conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 5)
  DAGSchedulerSuite.clearMergerLocs
  DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5"))
  val latch = new CountDownLatch(1)
  val myDAGScheduler = new MyDAGScheduler(
    sc,
    sc.dagScheduler.taskScheduler,
    sc.listenerBus,
    sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
    sc.env.blockManager.master,
    sc.env) {
    override def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): Unit = {
      // Skip the real finalization to mimic a stage whose tasks have all
      // finished while its finalization is still incomplete.
      latch.countDown()
    }
  }
  sc.dagScheduler = myDAGScheduler
  sc.taskScheduler.setDAGScheduler(myDAGScheduler)
  val parts = 20
  val shuffleMapRdd = new MyRDD(sc, parts, Nil)
  val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts))
  val reduceRdd1 = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker)
  reduceRdd1.countAsync()
  latch.await()
  // Setting _shuffleMergedFinalized to true avoids the hang:
  // shuffleDep._shuffleMergedFinalized = true
  val reduceRdd2 = new MyRDD(sc, parts, List(shuffleDep))
  reduceRdd2.count()
}
{code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)