This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new f92f9b84a [CELEBORN-1856][FOLLOWUP] Check `isCelebornSkewedShuffle` 
before `registerCelebornSkewedShuffle` for stage rollback
f92f9b84a is described below

commit f92f9b84a0d058fd9a6f970d167cfeac226baac1
Author: Wang, Fei <[email protected]>
AuthorDate: Fri Apr 11 11:45:44 2025 -0700

    [CELEBORN-1856][FOLLOWUP] Check `isCelebornSkewedShuffle` before 
`registerCelebornSkewedShuffle` for stage rollback
    
    ### What changes were proposed in this pull request?
    Followup for https://github.com/apache/celeborn/pull/3118
    
    Add a condition check (`isCelebornShuffleIndeterminate`) before 
`registerCelebornSkewedShuffle` for stage rollback.
    ### Why are the changes needed?
    
    Fix the logic.
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    Minor change.
    
    Closes #3209 from turboFei/spark_celeborn_patch.
    
    Authored-by: Wang, Fei <[email protected]>
    Signed-off-by: Wang, Fei <[email protected]>
---
 .../Celeborn-Optimize-Skew-Partitions-spark3_2.patch  | 19 +++++++++++--------
 .../Celeborn-Optimize-Skew-Partitions-spark3_3.patch  | 19 +++++++++++--------
 .../Celeborn-Optimize-Skew-Partitions-spark3_4.patch  | 19 +++++++++++--------
 .../Celeborn-Optimize-Skew-Partitions-spark3_5.patch  | 19 +++++++++++--------
 4 files changed, 44 insertions(+), 32 deletions(-)

diff --git 
a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_2.patch 
b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_2.patch
index 0e3be7a8e..509105cf1 100644
--- a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_2.patch
+++ b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_2.patch
@@ -135,7 +135,7 @@ index 00000000000..5e190c512df
 +
 +}
 diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
-index b950c07f3d8..9e339db4fb4 100644
+index b950c07f3d8..6875582e0aa 100644
 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
 +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
 @@ -33,6 +33,7 @@ import com.google.common.util.concurrent.{Futures, 
SettableFuture}
@@ -190,27 +190,30 @@ index b950c07f3d8..9e339db4fb4 100644
          if (failedStage.latestInfo.attemptNumber != task.stageAttemptId) {
            logInfo(s"Ignoring fetch failure from $task as it's from 
$failedStage attempt" +
              s" ${task.stageAttemptId} and there is a more recent attempt for 
that stage " +
-@@ -1850,7 +1870,7 @@ private[spark] class DAGScheduler(
+@@ -1850,7 +1870,8 @@ private[spark] class DAGScheduler(
                // Note that, if map stage is UNORDERED, we are fine. The 
shuffle partitioner is
                // guaranteed to be determinate, so the input data of the 
reducers will not change
                // even if the map tasks are re-tried.
 -              if (mapStage.isIndeterminate) {
-+              if (mapStage.isIndeterminate || 
CelebornShuffleState.isCelebornSkewedShuffle(shuffleId)) {
++              val isCelebornShuffleIndeterminate = 
CelebornShuffleState.isCelebornSkewedShuffle(shuffleId)
++              if (mapStage.isIndeterminate || isCelebornShuffleIndeterminate) 
{
                  // It's a little tricky to find all the succeeding stages of 
`mapStage`, because
                  // each stage only know its parents not children. Here we 
traverse the stages from
                  // the leaf nodes (the result stages of active jobs), and 
rollback all the stages
-@@ -1861,7 +1881,15 @@ private[spark] class DAGScheduler(
+@@ -1861,7 +1882,17 @@ private[spark] class DAGScheduler(
  
                  def collectStagesToRollback(stageChain: List[Stage]): Unit = {
                    if (stagesToRollback.contains(stageChain.head)) {
 -                    stageChain.drop(1).foreach(s => stagesToRollback += s)
 +                    stageChain.drop(1).foreach(s => {
 +                      stagesToRollback += s
-+                      s match {
-+                        case currentMapStage: ShuffleMapStage =>
-+                          
CelebornShuffleState.registerCelebornSkewedShuffle(currentMapStage.shuffleDep.shuffleId)
-+                        case _: ResultStage =>
++                      if (isCelebornShuffleIndeterminate) {
++                        s match {
++                          case currentMapStage: ShuffleMapStage =>
++                            
CelebornShuffleState.registerCelebornSkewedShuffle(currentMapStage.shuffleDep.shuffleId)
++                          case _: ResultStage =>
 +                          // do nothing, should abort celeborn skewed read 
stage
++                        }
 +                      }
 +                    })
                    } else {
diff --git 
a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_3.patch 
b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_3.patch
index 6bb8be966..44c3f8a97 100644
--- a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_3.patch
+++ b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_3.patch
@@ -135,7 +135,7 @@ index 00000000000..5e190c512df
 +
 +}
 diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
-index bd2823bcac1..e97218b046b 100644
+index bd2823bcac1..996cf8662f2 100644
 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
 +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
 @@ -33,6 +33,7 @@ import com.google.common.util.concurrent.{Futures, 
SettableFuture}
@@ -190,27 +190,30 @@ index bd2823bcac1..e97218b046b 100644
          if (failedStage.latestInfo.attemptNumber != task.stageAttemptId) {
            logInfo(s"Ignoring fetch failure from $task as it's from 
$failedStage attempt" +
              s" ${task.stageAttemptId} and there is a more recent attempt for 
that stage " +
-@@ -1921,7 +1941,7 @@ private[spark] class DAGScheduler(
+@@ -1921,7 +1941,8 @@ private[spark] class DAGScheduler(
                // Note that, if map stage is UNORDERED, we are fine. The 
shuffle partitioner is
                // guaranteed to be determinate, so the input data of the 
reducers will not change
                // even if the map tasks are re-tried.
 -              if (mapStage.isIndeterminate) {
-+              if (mapStage.isIndeterminate || 
CelebornShuffleState.isCelebornSkewedShuffle(shuffleId)) {
++              val isCelebornShuffleIndeterminate = 
CelebornShuffleState.isCelebornSkewedShuffle(shuffleId)
++              if (mapStage.isIndeterminate || isCelebornShuffleIndeterminate) 
{
                  // It's a little tricky to find all the succeeding stages of 
`mapStage`, because
                  // each stage only know its parents not children. Here we 
traverse the stages from
                  // the leaf nodes (the result stages of active jobs), and 
rollback all the stages
-@@ -1932,7 +1952,15 @@ private[spark] class DAGScheduler(
+@@ -1932,7 +1953,17 @@ private[spark] class DAGScheduler(
  
                  def collectStagesToRollback(stageChain: List[Stage]): Unit = {
                    if (stagesToRollback.contains(stageChain.head)) {
 -                    stageChain.drop(1).foreach(s => stagesToRollback += s)
 +                    stageChain.drop(1).foreach(s => {
 +                      stagesToRollback += s
-+                      s match {
-+                        case currentMapStage: ShuffleMapStage =>
-+                          
CelebornShuffleState.registerCelebornSkewedShuffle(currentMapStage.shuffleDep.shuffleId)
-+                        case _: ResultStage =>
++                      if (isCelebornShuffleIndeterminate) {
++                        s match {
++                          case currentMapStage: ShuffleMapStage =>
++                            
CelebornShuffleState.registerCelebornSkewedShuffle(currentMapStage.shuffleDep.shuffleId)
++                          case _: ResultStage =>
 +                          // do nothing, should abort celeborn skewed read 
stage
++                        }
 +                      }
 +                    })
                    } else {
diff --git 
a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_4.patch 
b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_4.patch
index 9f38d8026..27f7f4188 100644
--- a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_4.patch
+++ b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_4.patch
@@ -135,7 +135,7 @@ index 00000000000..5e190c512df
 +
 +}
 diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
-index 26be8c72bbc..4323b6d1a75 100644
+index 26be8c72bbc..b29692e8f12 100644
 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
 +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
 @@ -34,6 +34,7 @@ import com.google.common.util.concurrent.{Futures, 
SettableFuture}
@@ -190,27 +190,30 @@ index 26be8c72bbc..4323b6d1a75 100644
          if (failedStage.latestInfo.attemptNumber != task.stageAttemptId) {
            logInfo(s"Ignoring fetch failure from $task as it's from 
$failedStage attempt" +
              s" ${task.stageAttemptId} and there is a more recent attempt for 
that stage " +
-@@ -1977,7 +1997,7 @@ private[spark] class DAGScheduler(
+@@ -1977,7 +1997,8 @@ private[spark] class DAGScheduler(
                // Note that, if map stage is UNORDERED, we are fine. The 
shuffle partitioner is
                // guaranteed to be determinate, so the input data of the 
reducers will not change
                // even if the map tasks are re-tried.
 -              if (mapStage.isIndeterminate) {
-+              if (mapStage.isIndeterminate || 
CelebornShuffleState.isCelebornSkewedShuffle(shuffleId)) {
++              val isCelebornShuffleIndeterminate = 
CelebornShuffleState.isCelebornSkewedShuffle(shuffleId)
++              if (mapStage.isIndeterminate || isCelebornShuffleIndeterminate) 
{
                  // It's a little tricky to find all the succeeding stages of 
`mapStage`, because
                  // each stage only know its parents not children. Here we 
traverse the stages from
                  // the leaf nodes (the result stages of active jobs), and 
rollback all the stages
-@@ -1988,7 +2008,15 @@ private[spark] class DAGScheduler(
+@@ -1988,7 +2009,17 @@ private[spark] class DAGScheduler(
  
                  def collectStagesToRollback(stageChain: List[Stage]): Unit = {
                    if (stagesToRollback.contains(stageChain.head)) {
 -                    stageChain.drop(1).foreach(s => stagesToRollback += s)
 +                    stageChain.drop(1).foreach(s => {
 +                      stagesToRollback += s
-+                      s match {
-+                        case currentMapStage: ShuffleMapStage =>
-+                          
CelebornShuffleState.registerCelebornSkewedShuffle(currentMapStage.shuffleDep.shuffleId)
-+                        case _: ResultStage =>
++                      if (isCelebornShuffleIndeterminate) {
++                        s match {
++                          case currentMapStage: ShuffleMapStage =>
++                            
CelebornShuffleState.registerCelebornSkewedShuffle(currentMapStage.shuffleDep.shuffleId)
++                          case _: ResultStage =>
 +                          // do nothing, should abort celeborn skewed read 
stage
++                        }
 +                      }
 +                    })
                    } else {
diff --git 
a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_5.patch 
b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_5.patch
index 71d0f9859..d49a6c2c4 100644
--- a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_5.patch
+++ b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_5.patch
@@ -135,7 +135,7 @@ index 00000000000..5e190c512df
 +
 +}
 diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
-index 89d16e57934..36ce50093c0 100644
+index 89d16e57934..8b9ae779be2 100644
 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
 +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
 @@ -34,6 +34,7 @@ import com.google.common.util.concurrent.{Futures, 
SettableFuture}
@@ -202,27 +202,30 @@ index 89d16e57934..36ce50093c0 100644
          if (failedStage.latestInfo.attemptNumber != task.stageAttemptId) {
            logInfo(s"Ignoring fetch failure from $task as it's from 
$failedStage attempt" +
              s" ${task.stageAttemptId} and there is a more recent attempt for 
that stage " +
-@@ -2042,7 +2065,7 @@ private[spark] class DAGScheduler(
+@@ -2042,7 +2065,8 @@ private[spark] class DAGScheduler(
                // Note that, if map stage is UNORDERED, we are fine. The 
shuffle partitioner is
                // guaranteed to be determinate, so the input data of the 
reducers will not change
                // even if the map tasks are re-tried.
 -              if (mapStage.isIndeterminate) {
-+              if (mapStage.isIndeterminate || 
CelebornShuffleState.isCelebornSkewedShuffle(shuffleId)) {
++              val isCelebornShuffleIndeterminate = 
CelebornShuffleState.isCelebornSkewedShuffle(shuffleId)
++              if (mapStage.isIndeterminate || isCelebornShuffleIndeterminate) 
{
                  // It's a little tricky to find all the succeeding stages of 
`mapStage`, because
                  // each stage only know its parents not children. Here we 
traverse the stages from
                  // the leaf nodes (the result stages of active jobs), and 
rollback all the stages
-@@ -2053,7 +2076,15 @@ private[spark] class DAGScheduler(
+@@ -2053,7 +2077,17 @@ private[spark] class DAGScheduler(
  
                  def collectStagesToRollback(stageChain: List[Stage]): Unit = {
                    if (stagesToRollback.contains(stageChain.head)) {
 -                    stageChain.drop(1).foreach(s => stagesToRollback += s)
 +                    stageChain.drop(1).foreach(s => {
 +                      stagesToRollback += s
-+                      s match {
-+                        case currentMapStage: ShuffleMapStage =>
-+                          
CelebornShuffleState.registerCelebornSkewedShuffle(currentMapStage.shuffleDep.shuffleId)
-+                        case _: ResultStage =>
++                      if (isCelebornShuffleIndeterminate) {
++                        s match {
++                          case currentMapStage: ShuffleMapStage =>
++                            
CelebornShuffleState.registerCelebornSkewedShuffle(currentMapStage.shuffleDep.shuffleId)
++                          case _: ResultStage =>
 +                          // do nothing, should abort celeborn skewed read 
stage
++                        }
 +                      }
 +                    })
                    } else {

Reply via email to