This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.6 by this push:
     new 1dc229f31 [CELEBORN-1319][FOLLOWUP] Support celeborn optimize skew 
partitions patch for Spark v3.5.6 and v4.0.0
1dc229f31 is described below

commit 1dc229f31926b013fd1790a3a7f6dd26dd11d0c1
Author: SteNicholas <[email protected]>
AuthorDate: Thu Jun 12 11:04:17 2025 -0700

    [CELEBORN-1319][FOLLOWUP] Support celeborn optimize skew partitions patch 
for Spark v3.5.6 and v4.0.0
    
    ### What changes were proposed in this pull request?
    
    Add Celeborn optimize-skew-partitions patches for Spark v3.5.6 and v4.0.0.
    
    ### Why are the changes needed?
    
    There is no Celeborn optimize-skew-partitions patch for Spark v4.0.0. 
Meanwhile, `Celeborn-Optimize-Skew-Partitions-spark3_5.patch` cannot be applied 
to Spark v3.5.6 because of 
https://github.com/apache/spark/pull/50946.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    ```
    $ git checkout v3.5.6
    Previous HEAD position was fa33ea000a0 Preparing Spark release v4.0.0-rc7
    HEAD is now at 303c18c7466 Preparing Spark release v3.5.6-rc1
    $ git apply --check 
/celeborn/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_5_6.patch
    $ git checkout v4.0.0
    Previous HEAD position was 303c18c7466 Preparing Spark release v3.5.6-rc1
    HEAD is now at fa33ea000a0 Preparing Spark release v4.0.0-rc7
    $ git apply --check 
/celeborn/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark4_0.patch
    ```
    
    Closes #3329 from SteNicholas/CELEBORN-1319.
    
    Authored-by: SteNicholas <[email protected]>
    Signed-off-by: Wang, Fei <[email protected]>
    (cherry picked from commit cfc3f1b13a2584678a322ae8fa055fc263ee8f71)
    Signed-off-by: Wang, Fei <[email protected]>
---
 ...eleborn-Optimize-Skew-Partitions-spark3_2.patch |   2 +-
 ...eleborn-Optimize-Skew-Partitions-spark3_3.patch |   2 +-
 ...eleborn-Optimize-Skew-Partitions-spark3_4.patch |   2 +-
 ...eleborn-Optimize-Skew-Partitions-spark3_5.patch |   2 +-
 ...born-Optimize-Skew-Partitions-spark3_5_6.patch} |  84 ++++++++-----
 ...leborn-Optimize-Skew-Partitions-spark4_0.patch} | 140 ++++++++++++---------
 6 files changed, 136 insertions(+), 96 deletions(-)

diff --git 
a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_2.patch 
b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_2.patch
index f02902d64..257a8fe83 100644
--- a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_2.patch
+++ b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_2.patch
@@ -95,7 +95,7 @@ index 00000000000..5e190c512df
 +    ConfigBuilder("spark.celeborn.client.spark.stageRerun.enabled")
 +      .withAlternative("spark.celeborn.client.spark.fetch.throwsFetchFailure")
 +      .booleanConf
-+      .createWithDefault(false)
++      .createWithDefault(true)
 +
 +  private val celebornOptimizeSkewedPartitionReadEnabled = new AtomicBoolean()
 +  private val stageRerunEnabled = new AtomicBoolean()
diff --git 
a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_3.patch 
b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_3.patch
index 2b0aecf0b..ef0cdc318 100644
--- a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_3.patch
+++ b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_3.patch
@@ -95,7 +95,7 @@ index 00000000000..5e190c512df
 +    ConfigBuilder("spark.celeborn.client.spark.stageRerun.enabled")
 +      .withAlternative("spark.celeborn.client.spark.fetch.throwsFetchFailure")
 +      .booleanConf
-+      .createWithDefault(false)
++      .createWithDefault(true)
 +
 +  private val celebornOptimizeSkewedPartitionReadEnabled = new AtomicBoolean()
 +  private val stageRerunEnabled = new AtomicBoolean()
diff --git 
a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_4.patch 
b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_4.patch
index 03b26e07a..ddb33ad4f 100644
--- a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_4.patch
+++ b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_4.patch
@@ -95,7 +95,7 @@ index 00000000000..5e190c512df
 +    ConfigBuilder("spark.celeborn.client.spark.stageRerun.enabled")
 +      .withAlternative("spark.celeborn.client.spark.fetch.throwsFetchFailure")
 +      .booleanConf
-+      .createWithDefault(false)
++      .createWithDefault(true)
 +
 +  private val celebornOptimizeSkewedPartitionReadEnabled = new AtomicBoolean()
 +  private val stageRerunEnabled = new AtomicBoolean()
diff --git 
a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_5.patch 
b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_5.patch
index b9fc88b79..67f53889f 100644
--- a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_5.patch
+++ b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_5.patch
@@ -95,7 +95,7 @@ index 00000000000..5e190c512df
 +    ConfigBuilder("spark.celeborn.client.spark.stageRerun.enabled")
 +      .withAlternative("spark.celeborn.client.spark.fetch.throwsFetchFailure")
 +      .booleanConf
-+      .createWithDefault(false)
++      .createWithDefault(true)
 +
 +  private val celebornOptimizeSkewedPartitionReadEnabled = new AtomicBoolean()
 +  private val stageRerunEnabled = new AtomicBoolean()
diff --git 
a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_5.patch 
b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_5_6.patch
similarity index 85%
copy from assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_5.patch
copy to assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_5_6.patch
index b9fc88b79..362af6b08 100644
--- a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_5.patch
+++ b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_5_6.patch
@@ -95,7 +95,7 @@ index 00000000000..5e190c512df
 +    ConfigBuilder("spark.celeborn.client.spark.stageRerun.enabled")
 +      .withAlternative("spark.celeborn.client.spark.fetch.throwsFetchFailure")
 +      .booleanConf
-+      .createWithDefault(false)
++      .createWithDefault(true)
 +
 +  private val celebornOptimizeSkewedPartitionReadEnabled = new AtomicBoolean()
 +  private val stageRerunEnabled = new AtomicBoolean()
@@ -135,7 +135,7 @@ index 00000000000..5e190c512df
 +
 +}
 diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
-index 89d16e57934..8b9ae779be2 100644
+index 5ae29a5cd02..b54265fa6e7 100644
 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
 +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
 @@ -34,6 +34,7 @@ import com.google.common.util.concurrent.{Futures, 
SettableFuture}
@@ -146,7 +146,7 @@ index 89d16e57934..8b9ae779be2 100644
  import org.apache.spark.errors.SparkCoreErrors
  import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
  import org.apache.spark.internal.Logging
-@@ -1480,7 +1481,10 @@ private[spark] class DAGScheduler(
+@@ -1486,7 +1487,10 @@ private[spark] class DAGScheduler(
      // The operation here can make sure for the partially completed 
intermediate stage,
      // `findMissingPartitions()` returns all partitions every time.
      stage match {
@@ -155,10 +155,19 @@ index 89d16e57934..8b9ae779be2 100644
 +        
CelebornShuffleState.isCelebornSkewedShuffle(sms.shuffleDep.shuffleId)) && 
!sms.isAvailable =>
 +        logInfo(s"Unregistering shuffle output for stage ${stage.id}" +
 +          s" shuffle ${sms.shuffleDep.shuffleId}")
-         
mapOutputTracker.unregisterAllMapAndMergeOutput(sms.shuffleDep.shuffleId)
-         sms.shuffleDep.newShuffleMergeState()
-       case _ =>
-@@ -1854,7 +1858,18 @@ private[spark] class DAGScheduler(
+         // already executed at least once
+         if (sms.getNextAttemptId > 0) {
+           // While we previously validated possible rollbacks during the 
handling of a FetchFailure,
+@@ -1495,7 +1499,7 @@ private[spark] class DAGScheduler(
+           // loss. Moreover, because this check occurs later in the process, 
if a result stage task
+           // has successfully completed, we can detect this and abort the 
job, as rolling back a
+           // result stage is not possible.
+-          val stagesToRollback = collectSucceedingStages(sms)
++          val stagesToRollback = collectSucceedingStages(sms, 
CelebornShuffleState.isCelebornSkewedShuffle(sms.shuffleDep.shuffleId))
+           abortStageWithInvalidRollBack(stagesToRollback)
+           // stages which cannot be rolled back were aborted which leads to 
removing the
+           // the dependant job(s) from the active jobs set
+@@ -1880,7 +1884,18 @@ private[spark] class DAGScheduler(
          // tasks complete, they still count and we can mark the corresponding 
partitions as
          // finished if the stage is determinate. Here we notify the task 
scheduler to skip running
          // tasks for the same partition to save resource.
@@ -178,7 +187,7 @@ index 89d16e57934..8b9ae779be2 100644
            taskScheduler.notifyPartitionCompletion(stageId, task.partitionId)
          }
  
-@@ -1909,7 +1924,7 @@ private[spark] class DAGScheduler(
+@@ -1935,7 +1950,7 @@ private[spark] class DAGScheduler(
            case smt: ShuffleMapTask =>
              val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
              // Ignore task completion for old attempt of indeterminate stage
@@ -187,7 +196,7 @@ index 89d16e57934..8b9ae779be2 100644
                task.stageAttemptId < stage.latestInfo.attemptNumber()
              if (!ignoreIndeterminate) {
                shuffleStage.pendingPartitions -= task.partitionId
-@@ -1944,6 +1959,14 @@ private[spark] class DAGScheduler(
+@@ -1970,6 +1985,14 @@ private[spark] class DAGScheduler(
          val failedStage = stageIdToStage(task.stageId)
          val mapStage = shuffleIdToMapStage(shuffleId)
  
@@ -202,35 +211,46 @@ index 89d16e57934..8b9ae779be2 100644
          if (failedStage.latestInfo.attemptNumber != task.stageAttemptId) {
            logInfo(s"Ignoring fetch failure from $task as it's from 
$failedStage attempt" +
              s" ${task.stageAttemptId} and there is a more recent attempt for 
that stage " +
-@@ -2042,7 +2065,8 @@ private[spark] class DAGScheduler(
+@@ -2068,8 +2091,9 @@ private[spark] class DAGScheduler(
                // Note that, if map stage is UNORDERED, we are fine. The 
shuffle partitioner is
                // guaranteed to be determinate, so the input data of the 
reducers will not change
                // even if the map tasks are re-tried.
 -              if (mapStage.isIndeterminate) {
+-                val stagesToRollback = collectSucceedingStages(mapStage)
 +              val isCelebornShuffleIndeterminate = 
CelebornShuffleState.isCelebornSkewedShuffle(shuffleId)
 +              if (mapStage.isIndeterminate || isCelebornShuffleIndeterminate) 
{
-                 // It's a little tricky to find all the succeeding stages of 
`mapStage`, because
-                 // each stage only know its parents not children. Here we 
traverse the stages from
-                 // the leaf nodes (the result stages of active jobs), and 
rollback all the stages
-@@ -2053,7 +2077,17 @@ private[spark] class DAGScheduler(
++                val stagesToRollback = collectSucceedingStages(mapStage, 
isCelebornShuffleIndeterminate)
+                 val rollingBackStages = 
abortStageWithInvalidRollBack(stagesToRollback)
+                 logInfo(s"The shuffle map stage $mapStage with indeterminate 
output was failed, " +
+                   s"we will roll back and rerun below stages which include 
itself and all its " +
+@@ -2233,7 +2257,7 @@ private[spark] class DAGScheduler(
+     }
+   }
+ 
+-  private def collectSucceedingStages(mapStage: ShuffleMapStage): 
HashSet[Stage] = {
++  private def collectSucceedingStages(mapStage: ShuffleMapStage, 
isCelebornShuffleIndeterminate: Boolean): HashSet[Stage] = {
+     // TODO: perhaps materialize this if we are going to compute it often 
enough ?
+     // It's a little tricky to find all the succeeding stages of `mapStage`, 
because
+     // each stage only know its parents not children. Here we traverse the 
stages from
+@@ -2245,7 +2269,17 @@ private[spark] class DAGScheduler(
  
-                 def collectStagesToRollback(stageChain: List[Stage]): Unit = {
-                   if (stagesToRollback.contains(stageChain.head)) {
--                    stageChain.drop(1).foreach(s => stagesToRollback += s)
-+                    stageChain.drop(1).foreach(s => {
-+                      stagesToRollback += s
-+                      if (isCelebornShuffleIndeterminate) {
-+                        s match {
-+                          case currentMapStage: ShuffleMapStage =>
-+                            
CelebornShuffleState.registerCelebornSkewedShuffle(currentMapStage.shuffleDep.shuffleId)
-+                          case _: ResultStage =>
-+                          // do nothing, should abort celeborn skewed read 
stage
-+                        }
-+                      }
-+                    })
-                   } else {
-                     stageChain.head.parents.foreach { s =>
-                       collectStagesToRollback(s :: stageChain)
+     def collectSucceedingStagesInternal(stageChain: List[Stage]): Unit = {
+       if (succeedingStages.contains(stageChain.head)) {
+-        stageChain.drop(1).foreach(s => succeedingStages += s)
++        stageChain.drop(1).foreach(s => {
++          succeedingStages += s
++          if (isCelebornShuffleIndeterminate) {
++            s match {
++              case currentMapStage: ShuffleMapStage =>
++                
CelebornShuffleState.registerCelebornSkewedShuffle(currentMapStage.shuffleDep.shuffleId)
++              case _: ResultStage =>
++              // do nothing, should abort celeborn skewed read stage
++            }
++          }
++        })
+       } else {
+         stageChain.head.parents.foreach { s =>
+           collectSucceedingStagesInternal(s :: stageChain)
 diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CelebornShuffleUtil.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CelebornShuffleUtil.scala
 new file mode 100644
 index 00000000000..3dc60678461
@@ -334,7 +354,7 @@ index 37cdea084d8..4694a06919e 100644
            logDebug(s"Right side partition $partitionIndex " +
              s"(${FileUtils.byteCountToDisplaySize(rightSize)}) is skewed, " +
 diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
-index 9370b3d8d1d..d36e26a1376 100644
+index 9370b3d8d1d..599eb976591 100644
 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
 +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
 @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.adaptive
diff --git 
a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_5.patch 
b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark4_0.patch
similarity index 77%
copy from assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_5.patch
copy to assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark4_0.patch
index b9fc88b79..1bd62f687 100644
--- a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_5.patch
+++ b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark4_0.patch
@@ -14,7 +14,7 @@
 # limitations under the License.
 
 diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala 
b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
-index 9a7a3b0c0e7..543423dadd9 100644
+index a660bccd2e6..a231ecfe72f 100644
 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
 +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
 @@ -34,6 +34,7 @@ import org.apache.commons.io.output.{ByteArrayOutputStream 
=> ApacheByteArrayOut
@@ -22,30 +22,30 @@ index 9a7a3b0c0e7..543423dadd9 100644
  
  import org.apache.spark.broadcast.{Broadcast, BroadcastManager}
 +import org.apache.spark.celeborn.CelebornShuffleState
- import org.apache.spark.internal.Logging
+ import org.apache.spark.internal.{Logging, MDC, MessageWithContext}
+ import org.apache.spark.internal.LogKeys._
  import org.apache.spark.internal.config._
- import org.apache.spark.io.CompressionCodec
-@@ -916,6 +917,7 @@ private[spark] class MapOutputTrackerMaster(
-         shuffleStatus.invalidateSerializedMergeOutputStatusCache()
-       }
+@@ -929,6 +930,7 @@ private[spark] class MapOutputTrackerMaster(
+       shuffleStatus.invalidateSerializedMapOutputStatusCache()
+       shuffleStatus.invalidateSerializedMergeOutputStatusCache()
      }
 +    CelebornShuffleState.unregisterCelebornSkewedShuffle(shuffleId)
    }
  
    /**
 diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala 
b/core/src/main/scala/org/apache/spark/SparkEnv.scala
-index edad91a0c6f..76b377729a0 100644
+index bf6e30f5afa..15dd28475a1 100644
 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
 +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
-@@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configuration
+@@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration
  import org.apache.spark.annotation.DeveloperApi
- import org.apache.spark.api.python.PythonWorkerFactory
+ import org.apache.spark.api.python.{PythonWorker, PythonWorkerFactory}
  import org.apache.spark.broadcast.BroadcastManager
 +import org.apache.spark.celeborn.CelebornShuffleState
  import org.apache.spark.executor.ExecutorBackend
- import org.apache.spark.internal.{config, Logging}
- import org.apache.spark.internal.config._
-@@ -419,6 +420,7 @@ object SparkEnv extends Logging {
+ import org.apache.spark.internal.{config, Logging, MDC}
+ import org.apache.spark.internal.LogKeys
+@@ -494,6 +495,7 @@ object SparkEnv extends Logging {
      if (isDriver) {
        val sparkFilesDir = Utils.createTempDir(Utils.getLocalDir(conf), 
"userFiles").getAbsolutePath
        envInstance.driverTmpDir = Some(sparkFilesDir)
@@ -95,7 +95,7 @@ index 00000000000..5e190c512df
 +    ConfigBuilder("spark.celeborn.client.spark.stageRerun.enabled")
 +      .withAlternative("spark.celeborn.client.spark.fetch.throwsFetchFailure")
 +      .booleanConf
-+      .createWithDefault(false)
++      .createWithDefault(true)
 +
 +  private val celebornOptimizeSkewedPartitionReadEnabled = new AtomicBoolean()
 +  private val stageRerunEnabled = new AtomicBoolean()
@@ -135,30 +135,39 @@ index 00000000000..5e190c512df
 +
 +}
 diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
-index 89d16e57934..8b9ae779be2 100644
+index baf0ed4df53..2d7e36344a0 100644
 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
 +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
-@@ -34,6 +34,7 @@ import com.google.common.util.concurrent.{Futures, 
SettableFuture}
+@@ -35,6 +35,7 @@ import com.google.common.util.concurrent.{Futures, 
SettableFuture}
  
  import org.apache.spark._
  import org.apache.spark.broadcast.Broadcast
 +import org.apache.spark.celeborn.CelebornShuffleState
  import org.apache.spark.errors.SparkCoreErrors
  import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
- import org.apache.spark.internal.Logging
-@@ -1480,7 +1481,10 @@ private[spark] class DAGScheduler(
+ import org.apache.spark.internal.{config, Logging, LogKeys, MDC}
+@@ -1551,7 +1552,10 @@ private[spark] class DAGScheduler(
      // The operation here can make sure for the partially completed 
intermediate stage,
      // `findMissingPartitions()` returns all partitions every time.
      stage match {
 -      case sms: ShuffleMapStage if stage.isIndeterminate && !sms.isAvailable 
=>
 +      case sms: ShuffleMapStage if (stage.isIndeterminate ||
 +        
CelebornShuffleState.isCelebornSkewedShuffle(sms.shuffleDep.shuffleId)) && 
!sms.isAvailable =>
-+        logInfo(s"Unregistering shuffle output for stage ${stage.id}" +
-+          s" shuffle ${sms.shuffleDep.shuffleId}")
-         
mapOutputTracker.unregisterAllMapAndMergeOutput(sms.shuffleDep.shuffleId)
-         sms.shuffleDep.newShuffleMergeState()
-       case _ =>
-@@ -1854,7 +1858,18 @@ private[spark] class DAGScheduler(
++        logInfo(log"Unregistering shuffle output for stage ${MDC(STAGE_ID, 
stage.id)}" +
++          log" shuffle ${MDC(SHUFFLE_ID, sms.shuffleDep.shuffleId)}")
+         // already executed at least once
+         if (sms.getNextAttemptId > 0) {
+           // While we previously validated possible rollbacks during the 
handling of a FetchFailure,
+@@ -1560,7 +1564,7 @@ private[spark] class DAGScheduler(
+           // loss. Moreover, because this check occurs later in the process, 
if a result stage task
+           // has successfully completed, we can detect this and abort the 
job, as rolling back a
+           // result stage is not possible.
+-          val stagesToRollback = collectSucceedingStages(sms)
++          val stagesToRollback = collectSucceedingStages(sms, 
CelebornShuffleState.isCelebornSkewedShuffle(sms.shuffleDep.shuffleId))
+           abortStageWithInvalidRollBack(stagesToRollback)
+           // stages which cannot be rolled back were aborted which leads to 
removing the
+           // the dependant job(s) from the active jobs set
+@@ -1951,7 +1955,18 @@ private[spark] class DAGScheduler(
          // tasks complete, they still count and we can mark the corresponding 
partitions as
          // finished if the stage is determinate. Here we notify the task 
scheduler to skip running
          // tasks for the same partition to save resource.
@@ -178,7 +187,7 @@ index 89d16e57934..8b9ae779be2 100644
            taskScheduler.notifyPartitionCompletion(stageId, task.partitionId)
          }
  
-@@ -1909,7 +1924,7 @@ private[spark] class DAGScheduler(
+@@ -2007,7 +2022,7 @@ private[spark] class DAGScheduler(
            case smt: ShuffleMapTask =>
              val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
              // Ignore task completion for old attempt of indeterminate stage
@@ -187,7 +196,7 @@ index 89d16e57934..8b9ae779be2 100644
                task.stageAttemptId < stage.latestInfo.attemptNumber()
              if (!ignoreIndeterminate) {
                shuffleStage.pendingPartitions -= task.partitionId
-@@ -1944,6 +1959,14 @@ private[spark] class DAGScheduler(
+@@ -2043,6 +2058,14 @@ private[spark] class DAGScheduler(
          val failedStage = stageIdToStage(task.stageId)
          val mapStage = shuffleIdToMapStage(shuffleId)
  
@@ -199,38 +208,49 @@ index 89d16e57934..8b9ae779be2 100644
 +          abortStage(failedStage, shuffleFailedReason, None)
 +        }
 +
-         if (failedStage.latestInfo.attemptNumber != task.stageAttemptId) {
-           logInfo(s"Ignoring fetch failure from $task as it's from 
$failedStage attempt" +
-             s" ${task.stageAttemptId} and there is a more recent attempt for 
that stage " +
-@@ -2042,7 +2065,8 @@ private[spark] class DAGScheduler(
+         if (failedStage.latestInfo.attemptNumber() != task.stageAttemptId) {
+           logInfo(log"Ignoring fetch failure from " +
+             log"${MDC(TASK_ID, task)} as it's from " +
+@@ -2148,8 +2171,9 @@ private[spark] class DAGScheduler(
                // Note that, if map stage is UNORDERED, we are fine. The 
shuffle partitioner is
                // guaranteed to be determinate, so the input data of the 
reducers will not change
                // even if the map tasks are re-tried.
 -              if (mapStage.isIndeterminate) {
+-                val stagesToRollback = collectSucceedingStages(mapStage)
 +              val isCelebornShuffleIndeterminate = 
CelebornShuffleState.isCelebornSkewedShuffle(shuffleId)
 +              if (mapStage.isIndeterminate || isCelebornShuffleIndeterminate) 
{
-                 // It's a little tricky to find all the succeeding stages of 
`mapStage`, because
-                 // each stage only know its parents not children. Here we 
traverse the stages from
-                 // the leaf nodes (the result stages of active jobs), and 
rollback all the stages
-@@ -2053,7 +2077,17 @@ private[spark] class DAGScheduler(
++                val stagesToRollback = collectSucceedingStages(mapStage, 
isCelebornShuffleIndeterminate)
+                 val rollingBackStages = 
abortStageWithInvalidRollBack(stagesToRollback)
+                 logInfo(log"The shuffle map stage ${MDC(SHUFFLE_ID, 
mapStage)} with indeterminate output was failed, " +
+                   log"we will roll back and rerun below stages which include 
itself and all its " +
+@@ -2314,7 +2338,7 @@ private[spark] class DAGScheduler(
+     }
+   }
+ 
+-  private def collectSucceedingStages(mapStage: ShuffleMapStage): 
HashSet[Stage] = {
++  private def collectSucceedingStages(mapStage: ShuffleMapStage, 
isCelebornShuffleIndeterminate: Boolean): HashSet[Stage] = {
+     // TODO: perhaps materialize this if we are going to compute it often 
enough ?
+     // It's a little tricky to find all the succeeding stages of `mapStage`, 
because
+     // each stage only know its parents not children. Here we traverse the 
stages from
+@@ -2326,7 +2350,17 @@ private[spark] class DAGScheduler(
  
-                 def collectStagesToRollback(stageChain: List[Stage]): Unit = {
-                   if (stagesToRollback.contains(stageChain.head)) {
--                    stageChain.drop(1).foreach(s => stagesToRollback += s)
-+                    stageChain.drop(1).foreach(s => {
-+                      stagesToRollback += s
-+                      if (isCelebornShuffleIndeterminate) {
-+                        s match {
-+                          case currentMapStage: ShuffleMapStage =>
-+                            
CelebornShuffleState.registerCelebornSkewedShuffle(currentMapStage.shuffleDep.shuffleId)
-+                          case _: ResultStage =>
-+                          // do nothing, should abort celeborn skewed read 
stage
-+                        }
-+                      }
-+                    })
-                   } else {
-                     stageChain.head.parents.foreach { s =>
-                       collectStagesToRollback(s :: stageChain)
+     def collectSucceedingStagesInternal(stageChain: List[Stage]): Unit = {
+       if (succeedingStages.contains(stageChain.head)) {
+-        stageChain.drop(1).foreach(s => succeedingStages += s)
++        stageChain.drop(1).foreach(s => {
++          succeedingStages += s
++          if (isCelebornShuffleIndeterminate) {
++            s match {
++              case currentMapStage: ShuffleMapStage =>
++                
CelebornShuffleState.registerCelebornSkewedShuffle(currentMapStage.shuffleDep.shuffleId)
++              case _: ResultStage =>
++              // do nothing, should abort celeborn skewed read stage
++            }
++          }
++        })
+       } else {
+         stageChain.head.parents.foreach { s =>
+           collectSucceedingStagesInternal(s :: stageChain)
 diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CelebornShuffleUtil.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CelebornShuffleUtil.scala
 new file mode 100644
 index 00000000000..3dc60678461
@@ -306,10 +326,10 @@ index abd096b9c7c..ff0363f87d8 100644
      if (newPartitionsSpec.length == mapStats.get.bytesByPartitionId.length) {
        shuffle
 diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
-index 37cdea084d8..4694a06919e 100644
+index c256b3fcb6b..c366ff346e2 100644
 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
 +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
-@@ -152,8 +152,10 @@ case class OptimizeSkewedJoin(ensureRequirements: 
EnsureRequirements)
+@@ -151,8 +151,10 @@ case class OptimizeSkewedJoin(ensureRequirements: 
EnsureRequirements)
          Seq(CoalescedPartitionSpec(partitionIndex, partitionIndex + 1, 
rightSize))
  
        val leftParts = if (isLeftSkew) {
@@ -320,8 +340,8 @@ index 37cdea084d8..4694a06919e 100644
 +          isCelebornShuffle = isCelebornShuffle)
          if (skewSpecs.isDefined) {
            logDebug(s"Left side partition $partitionIndex " +
-             s"(${FileUtils.byteCountToDisplaySize(leftSize)}) is skewed, " +
-@@ -166,8 +168,10 @@ case class OptimizeSkewedJoin(ensureRequirements: 
EnsureRequirements)
+             s"(${Utils.bytesToString(leftSize)}) is skewed, " +
+@@ -165,8 +167,10 @@ case class OptimizeSkewedJoin(ensureRequirements: 
EnsureRequirements)
        }
  
        val rightParts = if (isRightSkew) {
@@ -332,9 +352,9 @@ index 37cdea084d8..4694a06919e 100644
 +          isCelebornShuffle = isCelebornShuffle)
          if (skewSpecs.isDefined) {
            logDebug(s"Right side partition $partitionIndex " +
-             s"(${FileUtils.byteCountToDisplaySize(rightSize)}) is skewed, " +
+             s"(${Utils.bytesToString(rightSize)}) is skewed, " +
 diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
-index 9370b3d8d1d..d36e26a1376 100644
+index 1ea4df02546..22bc98f4f86 100644
 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
 +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
 @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.adaptive
@@ -342,10 +362,10 @@ index 9370b3d8d1d..d36e26a1376 100644
  
  import org.apache.spark.{MapOutputStatistics, MapOutputTrackerMaster, 
SparkEnv}
 +import org.apache.spark.celeborn.CelebornShuffleState
- import org.apache.spark.internal.Logging
+ import org.apache.spark.internal.{Logging, LogKeys, MDC}
  import org.apache.spark.sql.execution.{CoalescedPartitionSpec, 
PartialReducerPartitionSpec, ShufflePartitionSpec}
  
-@@ -382,13 +383,21 @@ object ShufflePartitionsUtil extends Logging {
+@@ -384,13 +385,21 @@ object ShufflePartitionsUtil extends Logging {
        shuffleId: Int,
        reducerId: Int,
        targetSize: Long,
@@ -363,13 +383,13 @@ index 9370b3d8d1d..d36e26a1376 100644
 +
 +      val stageRerunEnabled = CelebornShuffleState.celebornStageRerunEnabled
 +      if (stageRerunEnabled && 
celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled) {
-+        logInfo(s"Celeborn shuffle retry enabled and shuffle $shuffleId is 
skewed")
++        logInfo(log"Celeborn shuffle retry enabled and shuffle 
${MDC(LogKeys.SHUFFLE_ID, shuffleId)} is skewed")
 +        CelebornShuffleState.registerCelebornSkewedShuffle(shuffleId)
 +      }
        Some(mapStartIndices.indices.map { i =>
          val startMapIndex = mapStartIndices(i)
          val endMapIndex = if (i == mapStartIndices.length - 1) {
-@@ -402,7 +411,15 @@ object ShufflePartitionsUtil extends Logging {
+@@ -404,7 +413,15 @@ object ShufflePartitionsUtil extends Logging {
            dataSize += mapPartitionSizes(mapIndex)
            mapIndex += 1
          }

Reply via email to