This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new f92f9b84a [CELEBORN-1856][FOLLOWUP] Check `isCelebornSkewedShuffle`
before `registerCelebornSkewedShuffle` for stage rollback
f92f9b84a is described below
commit f92f9b84a0d058fd9a6f970d167cfeac226baac1
Author: Wang, Fei <[email protected]>
AuthorDate: Fri Apr 11 11:45:44 2025 -0700
[CELEBORN-1856][FOLLOWUP] Check `isCelebornSkewedShuffle` before
`registerCelebornSkewedShuffle` for stage rollback
### What changes were proposed in this pull request?
Followup for https://github.com/apache/celeborn/pull/3118
Add a condition check (`isCelebornShuffleIndeterminate`) before
`registerCelebornSkewedShuffle` for stage rollback.
### Why are the changes needed?
Fix the logic.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Minor change.
Closes #3209 from turboFei/spark_celeborn_patch.
Authored-by: Wang, Fei <[email protected]>
Signed-off-by: Wang, Fei <[email protected]>
---
.../Celeborn-Optimize-Skew-Partitions-spark3_2.patch | 19 +++++++++++--------
.../Celeborn-Optimize-Skew-Partitions-spark3_3.patch | 19 +++++++++++--------
.../Celeborn-Optimize-Skew-Partitions-spark3_4.patch | 19 +++++++++++--------
.../Celeborn-Optimize-Skew-Partitions-spark3_5.patch | 19 +++++++++++--------
4 files changed, 44 insertions(+), 32 deletions(-)
diff --git
a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_2.patch
b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_2.patch
index 0e3be7a8e..509105cf1 100644
--- a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_2.patch
+++ b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_2.patch
@@ -135,7 +135,7 @@ index 00000000000..5e190c512df
+
+}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
-index b950c07f3d8..9e339db4fb4 100644
+index b950c07f3d8..6875582e0aa 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -33,6 +33,7 @@ import com.google.common.util.concurrent.{Futures,
SettableFuture}
@@ -190,27 +190,30 @@ index b950c07f3d8..9e339db4fb4 100644
if (failedStage.latestInfo.attemptNumber != task.stageAttemptId) {
logInfo(s"Ignoring fetch failure from $task as it's from
$failedStage attempt" +
s" ${task.stageAttemptId} and there is a more recent attempt for
that stage " +
-@@ -1850,7 +1870,7 @@ private[spark] class DAGScheduler(
+@@ -1850,7 +1870,8 @@ private[spark] class DAGScheduler(
// Note that, if map stage is UNORDERED, we are fine. The
shuffle partitioner is
// guaranteed to be determinate, so the input data of the
reducers will not change
// even if the map tasks are re-tried.
- if (mapStage.isIndeterminate) {
-+ if (mapStage.isIndeterminate ||
CelebornShuffleState.isCelebornSkewedShuffle(shuffleId)) {
++ val isCelebornShuffleIndeterminate =
CelebornShuffleState.isCelebornSkewedShuffle(shuffleId)
++ if (mapStage.isIndeterminate || isCelebornShuffleIndeterminate)
{
// It's a little tricky to find all the succeeding stages of
`mapStage`, because
// each stage only know its parents not children. Here we
traverse the stages from
// the leaf nodes (the result stages of active jobs), and
rollback all the stages
-@@ -1861,7 +1881,15 @@ private[spark] class DAGScheduler(
+@@ -1861,7 +1882,17 @@ private[spark] class DAGScheduler(
def collectStagesToRollback(stageChain: List[Stage]): Unit = {
if (stagesToRollback.contains(stageChain.head)) {
- stageChain.drop(1).foreach(s => stagesToRollback += s)
+ stageChain.drop(1).foreach(s => {
+ stagesToRollback += s
-+ s match {
-+ case currentMapStage: ShuffleMapStage =>
-+
CelebornShuffleState.registerCelebornSkewedShuffle(currentMapStage.shuffleDep.shuffleId)
-+ case _: ResultStage =>
++ if (isCelebornShuffleIndeterminate) {
++ s match {
++ case currentMapStage: ShuffleMapStage =>
++
CelebornShuffleState.registerCelebornSkewedShuffle(currentMapStage.shuffleDep.shuffleId)
++ case _: ResultStage =>
+ // do nothing, should abort celeborn skewed read
stage
++ }
+ }
+ })
} else {
diff --git
a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_3.patch
b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_3.patch
index 6bb8be966..44c3f8a97 100644
--- a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_3.patch
+++ b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_3.patch
@@ -135,7 +135,7 @@ index 00000000000..5e190c512df
+
+}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
-index bd2823bcac1..e97218b046b 100644
+index bd2823bcac1..996cf8662f2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -33,6 +33,7 @@ import com.google.common.util.concurrent.{Futures,
SettableFuture}
@@ -190,27 +190,30 @@ index bd2823bcac1..e97218b046b 100644
if (failedStage.latestInfo.attemptNumber != task.stageAttemptId) {
logInfo(s"Ignoring fetch failure from $task as it's from
$failedStage attempt" +
s" ${task.stageAttemptId} and there is a more recent attempt for
that stage " +
-@@ -1921,7 +1941,7 @@ private[spark] class DAGScheduler(
+@@ -1921,7 +1941,8 @@ private[spark] class DAGScheduler(
// Note that, if map stage is UNORDERED, we are fine. The
shuffle partitioner is
// guaranteed to be determinate, so the input data of the
reducers will not change
// even if the map tasks are re-tried.
- if (mapStage.isIndeterminate) {
-+ if (mapStage.isIndeterminate ||
CelebornShuffleState.isCelebornSkewedShuffle(shuffleId)) {
++ val isCelebornShuffleIndeterminate =
CelebornShuffleState.isCelebornSkewedShuffle(shuffleId)
++ if (mapStage.isIndeterminate || isCelebornShuffleIndeterminate)
{
// It's a little tricky to find all the succeeding stages of
`mapStage`, because
// each stage only know its parents not children. Here we
traverse the stages from
// the leaf nodes (the result stages of active jobs), and
rollback all the stages
-@@ -1932,7 +1952,15 @@ private[spark] class DAGScheduler(
+@@ -1932,7 +1953,17 @@ private[spark] class DAGScheduler(
def collectStagesToRollback(stageChain: List[Stage]): Unit = {
if (stagesToRollback.contains(stageChain.head)) {
- stageChain.drop(1).foreach(s => stagesToRollback += s)
+ stageChain.drop(1).foreach(s => {
+ stagesToRollback += s
-+ s match {
-+ case currentMapStage: ShuffleMapStage =>
-+
CelebornShuffleState.registerCelebornSkewedShuffle(currentMapStage.shuffleDep.shuffleId)
-+ case _: ResultStage =>
++ if (isCelebornShuffleIndeterminate) {
++ s match {
++ case currentMapStage: ShuffleMapStage =>
++
CelebornShuffleState.registerCelebornSkewedShuffle(currentMapStage.shuffleDep.shuffleId)
++ case _: ResultStage =>
+ // do nothing, should abort celeborn skewed read
stage
++ }
+ }
+ })
} else {
diff --git
a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_4.patch
b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_4.patch
index 9f38d8026..27f7f4188 100644
--- a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_4.patch
+++ b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_4.patch
@@ -135,7 +135,7 @@ index 00000000000..5e190c512df
+
+}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
-index 26be8c72bbc..4323b6d1a75 100644
+index 26be8c72bbc..b29692e8f12 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -34,6 +34,7 @@ import com.google.common.util.concurrent.{Futures,
SettableFuture}
@@ -190,27 +190,30 @@ index 26be8c72bbc..4323b6d1a75 100644
if (failedStage.latestInfo.attemptNumber != task.stageAttemptId) {
logInfo(s"Ignoring fetch failure from $task as it's from
$failedStage attempt" +
s" ${task.stageAttemptId} and there is a more recent attempt for
that stage " +
-@@ -1977,7 +1997,7 @@ private[spark] class DAGScheduler(
+@@ -1977,7 +1997,8 @@ private[spark] class DAGScheduler(
// Note that, if map stage is UNORDERED, we are fine. The
shuffle partitioner is
// guaranteed to be determinate, so the input data of the
reducers will not change
// even if the map tasks are re-tried.
- if (mapStage.isIndeterminate) {
-+ if (mapStage.isIndeterminate ||
CelebornShuffleState.isCelebornSkewedShuffle(shuffleId)) {
++ val isCelebornShuffleIndeterminate =
CelebornShuffleState.isCelebornSkewedShuffle(shuffleId)
++ if (mapStage.isIndeterminate || isCelebornShuffleIndeterminate)
{
// It's a little tricky to find all the succeeding stages of
`mapStage`, because
// each stage only know its parents not children. Here we
traverse the stages from
// the leaf nodes (the result stages of active jobs), and
rollback all the stages
-@@ -1988,7 +2008,15 @@ private[spark] class DAGScheduler(
+@@ -1988,7 +2009,17 @@ private[spark] class DAGScheduler(
def collectStagesToRollback(stageChain: List[Stage]): Unit = {
if (stagesToRollback.contains(stageChain.head)) {
- stageChain.drop(1).foreach(s => stagesToRollback += s)
+ stageChain.drop(1).foreach(s => {
+ stagesToRollback += s
-+ s match {
-+ case currentMapStage: ShuffleMapStage =>
-+
CelebornShuffleState.registerCelebornSkewedShuffle(currentMapStage.shuffleDep.shuffleId)
-+ case _: ResultStage =>
++ if (isCelebornShuffleIndeterminate) {
++ s match {
++ case currentMapStage: ShuffleMapStage =>
++
CelebornShuffleState.registerCelebornSkewedShuffle(currentMapStage.shuffleDep.shuffleId)
++ case _: ResultStage =>
+ // do nothing, should abort celeborn skewed read
stage
++ }
+ }
+ })
} else {
diff --git
a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_5.patch
b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_5.patch
index 71d0f9859..d49a6c2c4 100644
--- a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_5.patch
+++ b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_5.patch
@@ -135,7 +135,7 @@ index 00000000000..5e190c512df
+
+}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
-index 89d16e57934..36ce50093c0 100644
+index 89d16e57934..8b9ae779be2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -34,6 +34,7 @@ import com.google.common.util.concurrent.{Futures,
SettableFuture}
@@ -202,27 +202,30 @@ index 89d16e57934..36ce50093c0 100644
if (failedStage.latestInfo.attemptNumber != task.stageAttemptId) {
logInfo(s"Ignoring fetch failure from $task as it's from
$failedStage attempt" +
s" ${task.stageAttemptId} and there is a more recent attempt for
that stage " +
-@@ -2042,7 +2065,7 @@ private[spark] class DAGScheduler(
+@@ -2042,7 +2065,8 @@ private[spark] class DAGScheduler(
// Note that, if map stage is UNORDERED, we are fine. The
shuffle partitioner is
// guaranteed to be determinate, so the input data of the
reducers will not change
// even if the map tasks are re-tried.
- if (mapStage.isIndeterminate) {
-+ if (mapStage.isIndeterminate ||
CelebornShuffleState.isCelebornSkewedShuffle(shuffleId)) {
++ val isCelebornShuffleIndeterminate =
CelebornShuffleState.isCelebornSkewedShuffle(shuffleId)
++ if (mapStage.isIndeterminate || isCelebornShuffleIndeterminate)
{
// It's a little tricky to find all the succeeding stages of
`mapStage`, because
// each stage only know its parents not children. Here we
traverse the stages from
// the leaf nodes (the result stages of active jobs), and
rollback all the stages
-@@ -2053,7 +2076,15 @@ private[spark] class DAGScheduler(
+@@ -2053,7 +2077,17 @@ private[spark] class DAGScheduler(
def collectStagesToRollback(stageChain: List[Stage]): Unit = {
if (stagesToRollback.contains(stageChain.head)) {
- stageChain.drop(1).foreach(s => stagesToRollback += s)
+ stageChain.drop(1).foreach(s => {
+ stagesToRollback += s
-+ s match {
-+ case currentMapStage: ShuffleMapStage =>
-+
CelebornShuffleState.registerCelebornSkewedShuffle(currentMapStage.shuffleDep.shuffleId)
-+ case _: ResultStage =>
++ if (isCelebornShuffleIndeterminate) {
++ s match {
++ case currentMapStage: ShuffleMapStage =>
++
CelebornShuffleState.registerCelebornSkewedShuffle(currentMapStage.shuffleDep.shuffleId)
++ case _: ResultStage =>
+ // do nothing, should abort celeborn skewed read
stage
++ }
+ }
+ })
} else {