This is an automated email from the ASF dual-hosted git repository.

wuyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 5334494b00fc [SPARK-50648][CORE] Cleanup zombie tasks in non-running stages when the job is cancelled
5334494b00fc is described below

commit 5334494b00fc77cc26d9fe43677ce348c0ff979f
Author: chenliang.lu <[email protected]>
AuthorDate: Tue Dec 31 20:20:29 2024 +0800

    [SPARK-50648][CORE] Cleanup zombie tasks in non-running stages when the job is cancelled
    
    ### What changes were proposed in this pull request?
    
    This is a long-standing problem in Spark. See the following section for the
    scenario in which it occurs.
    
    When a job is cancelled, some of its tasks may still be running.
    The reason is that when `DAGScheduler#handleTaskCompletion` encounters a
    FetchFailed, it calls `markStageAsFinished` to remove the stage from
    `DAGScheduler#runningStages` (see
    https://github.com/apache/spark/blob/7cd5c4a1d1eb56fa92c10696bdbd8450d357b128/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L2059)
    without calling `killAllTaskAttempts`.
    But `DAGScheduler#cancelRunningIndependentStages` only looks at
    `runningStages`, so cancellation leaves zombie shuffle tasks behind,
    occupying cluster resources.
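
    To make the before/after condition concrete, here is a minimal standalone
    sketch of the kill decision in `cancelRunningIndependentStages` (plain
    Scala with simplified stand-in types, not the real `DAGScheduler`
    internals):

    ```scala
    // Hedged sketch: Stage, runningStages and killAllTaskAttempts are
    // simplified stand-ins for illustration, not Spark's actual classes.
    import scala.collection.mutable

    case class Stage(id: Int, failedAttemptIds: Set[Int] = Set.empty)

    object CancelSketch {
      val runningStages = mutable.HashSet[Stage]()

      def killAllTaskAttempts(stageId: Int): Unit =
        println(s"killing all task attempts of stage $stageId")

      // Before the fix: only stages still in runningStages get their tasks
      // killed, so a stage removed on FetchFailed is skipped even though
      // some of its tasks may still be running.
      def cancelBefore(stage: Stage): Unit =
        if (runningStages.contains(stage)) killAllTaskAttempts(stage.id)

      // After the fix: a stage with failed attempts may also have running
      // tasks, so its tasks are killed as well.
      def cancelAfter(stage: Stage): Unit =
        if (runningStages.contains(stage) || stage.failedAttemptIds.nonEmpty)
          killAllTaskAttempts(stage.id)
    }
    ```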
    
    ### Why are the changes needed?
    
    Assume a job consists of stage1 -> stage2. When a FetchFailed occurs during
    stage2, both stage1 and stage2 are resubmitted (stage2 may still have some
    tasks running even after it is resubmitted; this is expected, because those
    tasks may eventually succeed and avoid a retry).

    But if the SQL query is cancelled while stage1-retry is executing, the tasks
    in stage1 and stage1-retry are all killed, whereas the tasks previously
    running in stage2 keep running and cannot be killed. These zombie tasks
    occupy resources and can greatly affect cluster stability.
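
    A hedged, self-contained toy timeline of this sequence (plain Scala with
    illustrative names only, no Spark APIs):

    ```scala
    // Toy model: tracks per-stage running-task counts through the scenario.
    import scala.collection.mutable

    object ZombieScenario extends App {
      val runningStages  = mutable.Set[Int]()
      val failedAttempts = mutable.Map[Int, Int]().withDefaultValue(0)
      val runningTasks   = mutable.Map[Int, Int]().withDefaultValue(0)

      runningStages += 2; runningTasks(2) = 2 // stage2 is executing two tasks
      // FetchFailed: stage2 leaves runningStages, but one task keeps running.
      runningStages -= 2; failedAttempts(2) += 1; runningTasks(2) = 1
      runningStages += 1; runningTasks(1) = 2 // stage1-retry starts

      // A cancellation that only looks at runningStages would kill stage1's
      // tasks and leave stage2's surviving task as a zombie. The fixed
      // condition also covers stages that have failed attempts:
      for (s <- Seq(1, 2) if runningStages(s) || failedAttempts(s) > 0)
        runningTasks(s) = 0

      assert(runningTasks.values.forall(_ == 0), "no zombie tasks should remain")
    }
    ```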
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #49270 from yabola/zombie-task-when-shuffle-retry.
    
    Authored-by: chenliang.lu <[email protected]>
    Signed-off-by: Yi Wu <[email protected]>
---
 .../org/apache/spark/scheduler/DAGScheduler.scala  |  3 +-
 .../apache/spark/scheduler/DAGSchedulerSuite.scala | 52 ++++++++++++++++++++++
 2 files changed, 54 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 4f7338f74e29..e06b7d86e1db 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -2937,7 +2937,8 @@ private[spark] class DAGScheduler(
         } else {
           // This stage is only used by the job, so finish the stage if it is running.
           val stage = stageIdToStage(stageId)
-          if (runningStages.contains(stage)) {
+          // Stages with failedAttemptIds may have tasks that are running
+          if (runningStages.contains(stage) || stage.failedAttemptIds.nonEmpty) {
             try { // killAllTaskAttempts will fail if a SchedulerBackend does not implement killTask
               taskScheduler.killAllTaskAttempts(stageId, shouldInterruptTaskThread(job), reason)
               if (legacyAbortStageAfterKillTasks) {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 243d33fe55a7..3e507df706ba 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -185,6 +185,8 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
   private var firstInit: Boolean = _
   /** Set of TaskSets the DAGScheduler has requested executed. */
   val taskSets = scala.collection.mutable.Buffer[TaskSet]()
+  /** Track running tasks: the key is the stageId, the value is the set of the tasks' partitionIds. */
+  var runningTaskInfos = new HashMap[Int, HashSet[Int]]()
 
   /** Stages for which the DAGScheduler has called TaskScheduler.killAllTaskAttempts(). */
   val cancelledStages = new HashSet[Int]()
@@ -206,12 +208,14 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
       // normally done by TaskSetManager
       taskSet.tasks.foreach(_.epoch = mapOutputTracker.getEpoch)
       taskSets += taskSet
+      runningTaskInfos.put(taskSet.stageId, new HashSet[Int]() ++ taskSet.tasks.map(_.partitionId))
     }
     override def killTaskAttempt(
       taskId: Long, interruptThread: Boolean, reason: String): Boolean = false
     override def killAllTaskAttempts(
       stageId: Int, interruptThread: Boolean, reason: String): Unit = {
       cancelledStages += stageId
+      runningTaskInfos.remove(stageId)
     }
    override def notifyPartitionCompletion(stageId: Int, partitionId: Int): Unit = {
       taskSets.filter(_.stageId == stageId).lastOption.foreach { ts =>
@@ -393,6 +397,14 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
        handleShuffleMergeFinalized(shuffleMapStage, shuffleMapStage.shuffleDep.shuffleMergeId)
       }
     }
+
+    override private[scheduler] def handleTaskCompletion(event: CompletionEvent): Unit = {
+      super.handleTaskCompletion(event)
+      runningTaskInfos.get(event.task.stageId).foreach{ partitions =>
+        partitions -= event.task.partitionId
+        if (partitions.isEmpty) runningTaskInfos.remove(event.task.stageId)
+      }
+    }
   }
 
   override def beforeEach(): Unit = {
@@ -2252,6 +2264,46 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     assert(scheduler.activeJobs.isEmpty)
   }
 
+  test("SPARK-50648: when job is cancelled during shuffle retry in parent 
stage, " +
+    "should kill all running tasks") {
+    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
+    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
+    submit(reduceRdd, Array(0, 1))
+    completeShuffleMapStageSuccessfully(0, 0, 2)
+    sc.listenerBus.waitUntilEmpty()
+
+    val info = new TaskInfo(
+      3, index = 1, attemptNumber = 1,
+      partitionId = taskSets(1).tasks(0).partitionId, 0L, "", "", TaskLocality.ANY, true)
+    // result task 0.0 fetch failed, but result task 1.0 is still running
+    runEvent(makeCompletionEvent(taskSets(1).tasks(0),
+      FetchFailed(makeBlockManagerId("hostA"), shuffleDep.shuffleId, 0L, 0, 1, "ignored"),
+      null,
+      Seq.empty,
+      Array.empty,
+      info))
+    sc.listenerBus.waitUntilEmpty()
+
+    Thread.sleep(DAGScheduler.RESUBMIT_TIMEOUT * 2)
+    // map stage is running after being resubmitted, result stage is waiting
+    // map tasks and the original result task 1.0 are running
+    assert(scheduler.runningStages.size == 1, "Map stage should be running")
+    val mapStage = scheduler.runningStages.head
+    assert(mapStage.id === 0)
+    assert(mapStage.latestInfo.failureReason.isEmpty)
+    assert(scheduler.waitingStages.size == 1, "Result stage should be waiting")
+    assert(runningTaskInfos.size == 2)
+    assert(runningTaskInfos(taskSets(1).stageId).size == 1,
+      "origin result task 1.0 should be running")
+
+    scheduler.doCancelAllJobs()
+    // all tasks should be killed
+    assert(runningTaskInfos.isEmpty)
+    assert(scheduler.runningStages.isEmpty)
+    assert(scheduler.waitingStages.isEmpty)
+  }
+
   test("misbehaved accumulator should not crash DAGScheduler and 
SparkContext") {
     val acc = new LongAccumulator {
      override def add(v: java.lang.Long): Unit = throw new DAGSchedulerSuiteDummyException

