This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new cfc3f1b13 [CELEBORN-1319][FOLLOWUP] Support celeborn optimize skew
partitions patch for Spark v3.5.6 and v4.0.0
cfc3f1b13 is described below
commit cfc3f1b13a2584678a322ae8fa055fc263ee8f71
Author: SteNicholas <[email protected]>
AuthorDate: Thu Jun 12 11:04:17 2025 -0700
[CELEBORN-1319][FOLLOWUP] Support celeborn optimize skew partitions patch
for Spark v3.5.6 and v4.0.0
### What changes were proposed in this pull request?
Support celeborn optimize skew partitions patch for Spark v3.5.6 and v4.0.0.
### Why are the changes needed?
There is no celeborn optimize skew partitions patch for Spark v4.0.0.
Meanwhile, the existing
`Celeborn-Optimize-Skew-Partitions-spark3_5.patch` no longer applies to
Spark v3.5.6 because of
https://github.com/apache/spark/pull/50946.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
```
$ git checkout v3.5.6
Previous HEAD position was fa33ea000a0 Preparing Spark release v4.0.0-rc7
HEAD is now at 303c18c7466 Preparing Spark release v3.5.6-rc1
$ git apply --check
/celeborn/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_5_6.patch
$ git checkout v4.0.0
Previous HEAD position was 303c18c7466 Preparing Spark release v3.5.6-rc1
HEAD is now at fa33ea000a0 Preparing Spark release v4.0.0-rc7
$ git apply --check
/celeborn/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark4_0.patch
```
Closes #3329 from SteNicholas/CELEBORN-1319.
Authored-by: SteNicholas <[email protected]>
Signed-off-by: Wang, Fei <[email protected]>
---
...eleborn-Optimize-Skew-Partitions-spark3_2.patch | 2 +-
...eleborn-Optimize-Skew-Partitions-spark3_3.patch | 2 +-
...eleborn-Optimize-Skew-Partitions-spark3_4.patch | 2 +-
...eleborn-Optimize-Skew-Partitions-spark3_5.patch | 2 +-
...born-Optimize-Skew-Partitions-spark3_5_6.patch} | 84 ++++++++-----
...leborn-Optimize-Skew-Partitions-spark4_0.patch} | 140 ++++++++++++---------
6 files changed, 136 insertions(+), 96 deletions(-)
diff --git
a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_2.patch
b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_2.patch
index f02902d64..257a8fe83 100644
--- a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_2.patch
+++ b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_2.patch
@@ -95,7 +95,7 @@ index 00000000000..5e190c512df
+ ConfigBuilder("spark.celeborn.client.spark.stageRerun.enabled")
+ .withAlternative("spark.celeborn.client.spark.fetch.throwsFetchFailure")
+ .booleanConf
-+ .createWithDefault(false)
++ .createWithDefault(true)
+
+ private val celebornOptimizeSkewedPartitionReadEnabled = new AtomicBoolean()
+ private val stageRerunEnabled = new AtomicBoolean()
diff --git
a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_3.patch
b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_3.patch
index 2b0aecf0b..ef0cdc318 100644
--- a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_3.patch
+++ b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_3.patch
@@ -95,7 +95,7 @@ index 00000000000..5e190c512df
+ ConfigBuilder("spark.celeborn.client.spark.stageRerun.enabled")
+ .withAlternative("spark.celeborn.client.spark.fetch.throwsFetchFailure")
+ .booleanConf
-+ .createWithDefault(false)
++ .createWithDefault(true)
+
+ private val celebornOptimizeSkewedPartitionReadEnabled = new AtomicBoolean()
+ private val stageRerunEnabled = new AtomicBoolean()
diff --git
a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_4.patch
b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_4.patch
index 03b26e07a..ddb33ad4f 100644
--- a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_4.patch
+++ b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_4.patch
@@ -95,7 +95,7 @@ index 00000000000..5e190c512df
+ ConfigBuilder("spark.celeborn.client.spark.stageRerun.enabled")
+ .withAlternative("spark.celeborn.client.spark.fetch.throwsFetchFailure")
+ .booleanConf
-+ .createWithDefault(false)
++ .createWithDefault(true)
+
+ private val celebornOptimizeSkewedPartitionReadEnabled = new AtomicBoolean()
+ private val stageRerunEnabled = new AtomicBoolean()
diff --git
a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_5.patch
b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_5.patch
index b9fc88b79..67f53889f 100644
--- a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_5.patch
+++ b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_5.patch
@@ -95,7 +95,7 @@ index 00000000000..5e190c512df
+ ConfigBuilder("spark.celeborn.client.spark.stageRerun.enabled")
+ .withAlternative("spark.celeborn.client.spark.fetch.throwsFetchFailure")
+ .booleanConf
-+ .createWithDefault(false)
++ .createWithDefault(true)
+
+ private val celebornOptimizeSkewedPartitionReadEnabled = new AtomicBoolean()
+ private val stageRerunEnabled = new AtomicBoolean()
diff --git
a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_5.patch
b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_5_6.patch
similarity index 85%
copy from assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_5.patch
copy to assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_5_6.patch
index b9fc88b79..362af6b08 100644
--- a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_5.patch
+++ b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_5_6.patch
@@ -95,7 +95,7 @@ index 00000000000..5e190c512df
+ ConfigBuilder("spark.celeborn.client.spark.stageRerun.enabled")
+ .withAlternative("spark.celeborn.client.spark.fetch.throwsFetchFailure")
+ .booleanConf
-+ .createWithDefault(false)
++ .createWithDefault(true)
+
+ private val celebornOptimizeSkewedPartitionReadEnabled = new AtomicBoolean()
+ private val stageRerunEnabled = new AtomicBoolean()
@@ -135,7 +135,7 @@ index 00000000000..5e190c512df
+
+}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
-index 89d16e57934..8b9ae779be2 100644
+index 5ae29a5cd02..b54265fa6e7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -34,6 +34,7 @@ import com.google.common.util.concurrent.{Futures,
SettableFuture}
@@ -146,7 +146,7 @@ index 89d16e57934..8b9ae779be2 100644
import org.apache.spark.errors.SparkCoreErrors
import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
import org.apache.spark.internal.Logging
-@@ -1480,7 +1481,10 @@ private[spark] class DAGScheduler(
+@@ -1486,7 +1487,10 @@ private[spark] class DAGScheduler(
// The operation here can make sure for the partially completed
intermediate stage,
// `findMissingPartitions()` returns all partitions every time.
stage match {
@@ -155,10 +155,19 @@ index 89d16e57934..8b9ae779be2 100644
+
CelebornShuffleState.isCelebornSkewedShuffle(sms.shuffleDep.shuffleId)) &&
!sms.isAvailable =>
+ logInfo(s"Unregistering shuffle output for stage ${stage.id}" +
+ s" shuffle ${sms.shuffleDep.shuffleId}")
-
mapOutputTracker.unregisterAllMapAndMergeOutput(sms.shuffleDep.shuffleId)
- sms.shuffleDep.newShuffleMergeState()
- case _ =>
-@@ -1854,7 +1858,18 @@ private[spark] class DAGScheduler(
+ // already executed at least once
+ if (sms.getNextAttemptId > 0) {
+ // While we previously validated possible rollbacks during the
handling of a FetchFailure,
+@@ -1495,7 +1499,7 @@ private[spark] class DAGScheduler(
+ // loss. Moreover, because this check occurs later in the process,
if a result stage task
+ // has successfully completed, we can detect this and abort the
job, as rolling back a
+ // result stage is not possible.
+- val stagesToRollback = collectSucceedingStages(sms)
++ val stagesToRollback = collectSucceedingStages(sms,
CelebornShuffleState.isCelebornSkewedShuffle(sms.shuffleDep.shuffleId))
+ abortStageWithInvalidRollBack(stagesToRollback)
+ // stages which cannot be rolled back were aborted which leads to
removing the
+ // the dependant job(s) from the active jobs set
+@@ -1880,7 +1884,18 @@ private[spark] class DAGScheduler(
// tasks complete, they still count and we can mark the corresponding
partitions as
// finished if the stage is determinate. Here we notify the task
scheduler to skip running
// tasks for the same partition to save resource.
@@ -178,7 +187,7 @@ index 89d16e57934..8b9ae779be2 100644
taskScheduler.notifyPartitionCompletion(stageId, task.partitionId)
}
-@@ -1909,7 +1924,7 @@ private[spark] class DAGScheduler(
+@@ -1935,7 +1950,7 @@ private[spark] class DAGScheduler(
case smt: ShuffleMapTask =>
val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
// Ignore task completion for old attempt of indeterminate stage
@@ -187,7 +196,7 @@ index 89d16e57934..8b9ae779be2 100644
task.stageAttemptId < stage.latestInfo.attemptNumber()
if (!ignoreIndeterminate) {
shuffleStage.pendingPartitions -= task.partitionId
-@@ -1944,6 +1959,14 @@ private[spark] class DAGScheduler(
+@@ -1970,6 +1985,14 @@ private[spark] class DAGScheduler(
val failedStage = stageIdToStage(task.stageId)
val mapStage = shuffleIdToMapStage(shuffleId)
@@ -202,35 +211,46 @@ index 89d16e57934..8b9ae779be2 100644
if (failedStage.latestInfo.attemptNumber != task.stageAttemptId) {
logInfo(s"Ignoring fetch failure from $task as it's from
$failedStage attempt" +
s" ${task.stageAttemptId} and there is a more recent attempt for
that stage " +
-@@ -2042,7 +2065,8 @@ private[spark] class DAGScheduler(
+@@ -2068,8 +2091,9 @@ private[spark] class DAGScheduler(
// Note that, if map stage is UNORDERED, we are fine. The
shuffle partitioner is
// guaranteed to be determinate, so the input data of the
reducers will not change
// even if the map tasks are re-tried.
- if (mapStage.isIndeterminate) {
+- val stagesToRollback = collectSucceedingStages(mapStage)
+ val isCelebornShuffleIndeterminate =
CelebornShuffleState.isCelebornSkewedShuffle(shuffleId)
+ if (mapStage.isIndeterminate || isCelebornShuffleIndeterminate)
{
- // It's a little tricky to find all the succeeding stages of
`mapStage`, because
- // each stage only know its parents not children. Here we
traverse the stages from
- // the leaf nodes (the result stages of active jobs), and
rollback all the stages
-@@ -2053,7 +2077,17 @@ private[spark] class DAGScheduler(
++ val stagesToRollback = collectSucceedingStages(mapStage,
isCelebornShuffleIndeterminate)
+ val rollingBackStages =
abortStageWithInvalidRollBack(stagesToRollback)
+ logInfo(s"The shuffle map stage $mapStage with indeterminate
output was failed, " +
+ s"we will roll back and rerun below stages which include
itself and all its " +
+@@ -2233,7 +2257,7 @@ private[spark] class DAGScheduler(
+ }
+ }
+
+- private def collectSucceedingStages(mapStage: ShuffleMapStage):
HashSet[Stage] = {
++ private def collectSucceedingStages(mapStage: ShuffleMapStage,
isCelebornShuffleIndeterminate: Boolean): HashSet[Stage] = {
+ // TODO: perhaps materialize this if we are going to compute it often
enough ?
+ // It's a little tricky to find all the succeeding stages of `mapStage`,
because
+ // each stage only know its parents not children. Here we traverse the
stages from
+@@ -2245,7 +2269,17 @@ private[spark] class DAGScheduler(
- def collectStagesToRollback(stageChain: List[Stage]): Unit = {
- if (stagesToRollback.contains(stageChain.head)) {
-- stageChain.drop(1).foreach(s => stagesToRollback += s)
-+ stageChain.drop(1).foreach(s => {
-+ stagesToRollback += s
-+ if (isCelebornShuffleIndeterminate) {
-+ s match {
-+ case currentMapStage: ShuffleMapStage =>
-+
CelebornShuffleState.registerCelebornSkewedShuffle(currentMapStage.shuffleDep.shuffleId)
-+ case _: ResultStage =>
-+ // do nothing, should abort celeborn skewed read
stage
-+ }
-+ }
-+ })
- } else {
- stageChain.head.parents.foreach { s =>
- collectStagesToRollback(s :: stageChain)
+ def collectSucceedingStagesInternal(stageChain: List[Stage]): Unit = {
+ if (succeedingStages.contains(stageChain.head)) {
+- stageChain.drop(1).foreach(s => succeedingStages += s)
++ stageChain.drop(1).foreach(s => {
++ succeedingStages += s
++ if (isCelebornShuffleIndeterminate) {
++ s match {
++ case currentMapStage: ShuffleMapStage =>
++
CelebornShuffleState.registerCelebornSkewedShuffle(currentMapStage.shuffleDep.shuffleId)
++ case _: ResultStage =>
++ // do nothing, should abort celeborn skewed read stage
++ }
++ }
++ })
+ } else {
+ stageChain.head.parents.foreach { s =>
+ collectSucceedingStagesInternal(s :: stageChain)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CelebornShuffleUtil.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CelebornShuffleUtil.scala
new file mode 100644
index 00000000000..3dc60678461
@@ -334,7 +354,7 @@ index 37cdea084d8..4694a06919e 100644
logDebug(s"Right side partition $partitionIndex " +
s"(${FileUtils.byteCountToDisplaySize(rightSize)}) is skewed, " +
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
-index 9370b3d8d1d..d36e26a1376 100644
+index 9370b3d8d1d..599eb976591 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.adaptive
diff --git
a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_5.patch
b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark4_0.patch
similarity index 77%
copy from assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_5.patch
copy to assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark4_0.patch
index b9fc88b79..1bd62f687 100644
--- a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_5.patch
+++ b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark4_0.patch
@@ -14,7 +14,7 @@
# limitations under the License.
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
-index 9a7a3b0c0e7..543423dadd9 100644
+index a660bccd2e6..a231ecfe72f 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -34,6 +34,7 @@ import org.apache.commons.io.output.{ByteArrayOutputStream
=> ApacheByteArrayOut
@@ -22,30 +22,30 @@ index 9a7a3b0c0e7..543423dadd9 100644
import org.apache.spark.broadcast.{Broadcast, BroadcastManager}
+import org.apache.spark.celeborn.CelebornShuffleState
- import org.apache.spark.internal.Logging
+ import org.apache.spark.internal.{Logging, MDC, MessageWithContext}
+ import org.apache.spark.internal.LogKeys._
import org.apache.spark.internal.config._
- import org.apache.spark.io.CompressionCodec
-@@ -916,6 +917,7 @@ private[spark] class MapOutputTrackerMaster(
- shuffleStatus.invalidateSerializedMergeOutputStatusCache()
- }
+@@ -929,6 +930,7 @@ private[spark] class MapOutputTrackerMaster(
+ shuffleStatus.invalidateSerializedMapOutputStatusCache()
+ shuffleStatus.invalidateSerializedMergeOutputStatusCache()
}
+ CelebornShuffleState.unregisterCelebornSkewedShuffle(shuffleId)
}
/**
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala
b/core/src/main/scala/org/apache/spark/SparkEnv.scala
-index edad91a0c6f..76b377729a0 100644
+index bf6e30f5afa..15dd28475a1 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
-@@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configuration
+@@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.spark.annotation.DeveloperApi
- import org.apache.spark.api.python.PythonWorkerFactory
+ import org.apache.spark.api.python.{PythonWorker, PythonWorkerFactory}
import org.apache.spark.broadcast.BroadcastManager
+import org.apache.spark.celeborn.CelebornShuffleState
import org.apache.spark.executor.ExecutorBackend
- import org.apache.spark.internal.{config, Logging}
- import org.apache.spark.internal.config._
-@@ -419,6 +420,7 @@ object SparkEnv extends Logging {
+ import org.apache.spark.internal.{config, Logging, MDC}
+ import org.apache.spark.internal.LogKeys
+@@ -494,6 +495,7 @@ object SparkEnv extends Logging {
if (isDriver) {
val sparkFilesDir = Utils.createTempDir(Utils.getLocalDir(conf),
"userFiles").getAbsolutePath
envInstance.driverTmpDir = Some(sparkFilesDir)
@@ -95,7 +95,7 @@ index 00000000000..5e190c512df
+ ConfigBuilder("spark.celeborn.client.spark.stageRerun.enabled")
+ .withAlternative("spark.celeborn.client.spark.fetch.throwsFetchFailure")
+ .booleanConf
-+ .createWithDefault(false)
++ .createWithDefault(true)
+
+ private val celebornOptimizeSkewedPartitionReadEnabled = new AtomicBoolean()
+ private val stageRerunEnabled = new AtomicBoolean()
@@ -135,30 +135,39 @@ index 00000000000..5e190c512df
+
+}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
-index 89d16e57934..8b9ae779be2 100644
+index baf0ed4df53..2d7e36344a0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
-@@ -34,6 +34,7 @@ import com.google.common.util.concurrent.{Futures,
SettableFuture}
+@@ -35,6 +35,7 @@ import com.google.common.util.concurrent.{Futures,
SettableFuture}
import org.apache.spark._
import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.celeborn.CelebornShuffleState
import org.apache.spark.errors.SparkCoreErrors
import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
- import org.apache.spark.internal.Logging
-@@ -1480,7 +1481,10 @@ private[spark] class DAGScheduler(
+ import org.apache.spark.internal.{config, Logging, LogKeys, MDC}
+@@ -1551,7 +1552,10 @@ private[spark] class DAGScheduler(
// The operation here can make sure for the partially completed
intermediate stage,
// `findMissingPartitions()` returns all partitions every time.
stage match {
- case sms: ShuffleMapStage if stage.isIndeterminate && !sms.isAvailable
=>
+ case sms: ShuffleMapStage if (stage.isIndeterminate ||
+
CelebornShuffleState.isCelebornSkewedShuffle(sms.shuffleDep.shuffleId)) &&
!sms.isAvailable =>
-+ logInfo(s"Unregistering shuffle output for stage ${stage.id}" +
-+ s" shuffle ${sms.shuffleDep.shuffleId}")
-
mapOutputTracker.unregisterAllMapAndMergeOutput(sms.shuffleDep.shuffleId)
- sms.shuffleDep.newShuffleMergeState()
- case _ =>
-@@ -1854,7 +1858,18 @@ private[spark] class DAGScheduler(
++ logInfo(log"Unregistering shuffle output for stage ${MDC(STAGE_ID,
stage.id)}" +
++ log" shuffle ${MDC(SHUFFLE_ID, sms.shuffleDep.shuffleId)}")
+ // already executed at least once
+ if (sms.getNextAttemptId > 0) {
+ // While we previously validated possible rollbacks during the
handling of a FetchFailure,
+@@ -1560,7 +1564,7 @@ private[spark] class DAGScheduler(
+ // loss. Moreover, because this check occurs later in the process,
if a result stage task
+ // has successfully completed, we can detect this and abort the
job, as rolling back a
+ // result stage is not possible.
+- val stagesToRollback = collectSucceedingStages(sms)
++ val stagesToRollback = collectSucceedingStages(sms,
CelebornShuffleState.isCelebornSkewedShuffle(sms.shuffleDep.shuffleId))
+ abortStageWithInvalidRollBack(stagesToRollback)
+ // stages which cannot be rolled back were aborted which leads to
removing the
+ // the dependant job(s) from the active jobs set
+@@ -1951,7 +1955,18 @@ private[spark] class DAGScheduler(
// tasks complete, they still count and we can mark the corresponding
partitions as
// finished if the stage is determinate. Here we notify the task
scheduler to skip running
// tasks for the same partition to save resource.
@@ -178,7 +187,7 @@ index 89d16e57934..8b9ae779be2 100644
taskScheduler.notifyPartitionCompletion(stageId, task.partitionId)
}
-@@ -1909,7 +1924,7 @@ private[spark] class DAGScheduler(
+@@ -2007,7 +2022,7 @@ private[spark] class DAGScheduler(
case smt: ShuffleMapTask =>
val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
// Ignore task completion for old attempt of indeterminate stage
@@ -187,7 +196,7 @@ index 89d16e57934..8b9ae779be2 100644
task.stageAttemptId < stage.latestInfo.attemptNumber()
if (!ignoreIndeterminate) {
shuffleStage.pendingPartitions -= task.partitionId
-@@ -1944,6 +1959,14 @@ private[spark] class DAGScheduler(
+@@ -2043,6 +2058,14 @@ private[spark] class DAGScheduler(
val failedStage = stageIdToStage(task.stageId)
val mapStage = shuffleIdToMapStage(shuffleId)
@@ -199,38 +208,49 @@ index 89d16e57934..8b9ae779be2 100644
+ abortStage(failedStage, shuffleFailedReason, None)
+ }
+
- if (failedStage.latestInfo.attemptNumber != task.stageAttemptId) {
- logInfo(s"Ignoring fetch failure from $task as it's from
$failedStage attempt" +
- s" ${task.stageAttemptId} and there is a more recent attempt for
that stage " +
-@@ -2042,7 +2065,8 @@ private[spark] class DAGScheduler(
+ if (failedStage.latestInfo.attemptNumber() != task.stageAttemptId) {
+ logInfo(log"Ignoring fetch failure from " +
+ log"${MDC(TASK_ID, task)} as it's from " +
+@@ -2148,8 +2171,9 @@ private[spark] class DAGScheduler(
// Note that, if map stage is UNORDERED, we are fine. The
shuffle partitioner is
// guaranteed to be determinate, so the input data of the
reducers will not change
// even if the map tasks are re-tried.
- if (mapStage.isIndeterminate) {
+- val stagesToRollback = collectSucceedingStages(mapStage)
+ val isCelebornShuffleIndeterminate =
CelebornShuffleState.isCelebornSkewedShuffle(shuffleId)
+ if (mapStage.isIndeterminate || isCelebornShuffleIndeterminate)
{
- // It's a little tricky to find all the succeeding stages of
`mapStage`, because
- // each stage only know its parents not children. Here we
traverse the stages from
- // the leaf nodes (the result stages of active jobs), and
rollback all the stages
-@@ -2053,7 +2077,17 @@ private[spark] class DAGScheduler(
++ val stagesToRollback = collectSucceedingStages(mapStage,
isCelebornShuffleIndeterminate)
+ val rollingBackStages =
abortStageWithInvalidRollBack(stagesToRollback)
+ logInfo(log"The shuffle map stage ${MDC(SHUFFLE_ID,
mapStage)} with indeterminate output was failed, " +
+ log"we will roll back and rerun below stages which include
itself and all its " +
+@@ -2314,7 +2338,7 @@ private[spark] class DAGScheduler(
+ }
+ }
+
+- private def collectSucceedingStages(mapStage: ShuffleMapStage):
HashSet[Stage] = {
++ private def collectSucceedingStages(mapStage: ShuffleMapStage,
isCelebornShuffleIndeterminate: Boolean): HashSet[Stage] = {
+ // TODO: perhaps materialize this if we are going to compute it often
enough ?
+ // It's a little tricky to find all the succeeding stages of `mapStage`,
because
+ // each stage only know its parents not children. Here we traverse the
stages from
+@@ -2326,7 +2350,17 @@ private[spark] class DAGScheduler(
- def collectStagesToRollback(stageChain: List[Stage]): Unit = {
- if (stagesToRollback.contains(stageChain.head)) {
-- stageChain.drop(1).foreach(s => stagesToRollback += s)
-+ stageChain.drop(1).foreach(s => {
-+ stagesToRollback += s
-+ if (isCelebornShuffleIndeterminate) {
-+ s match {
-+ case currentMapStage: ShuffleMapStage =>
-+
CelebornShuffleState.registerCelebornSkewedShuffle(currentMapStage.shuffleDep.shuffleId)
-+ case _: ResultStage =>
-+ // do nothing, should abort celeborn skewed read
stage
-+ }
-+ }
-+ })
- } else {
- stageChain.head.parents.foreach { s =>
- collectStagesToRollback(s :: stageChain)
+ def collectSucceedingStagesInternal(stageChain: List[Stage]): Unit = {
+ if (succeedingStages.contains(stageChain.head)) {
+- stageChain.drop(1).foreach(s => succeedingStages += s)
++ stageChain.drop(1).foreach(s => {
++ succeedingStages += s
++ if (isCelebornShuffleIndeterminate) {
++ s match {
++ case currentMapStage: ShuffleMapStage =>
++
CelebornShuffleState.registerCelebornSkewedShuffle(currentMapStage.shuffleDep.shuffleId)
++ case _: ResultStage =>
++ // do nothing, should abort celeborn skewed read stage
++ }
++ }
++ })
+ } else {
+ stageChain.head.parents.foreach { s =>
+ collectSucceedingStagesInternal(s :: stageChain)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CelebornShuffleUtil.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CelebornShuffleUtil.scala
new file mode 100644
index 00000000000..3dc60678461
@@ -306,10 +326,10 @@ index abd096b9c7c..ff0363f87d8 100644
if (newPartitionsSpec.length == mapStats.get.bytesByPartitionId.length) {
shuffle
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
-index 37cdea084d8..4694a06919e 100644
+index c256b3fcb6b..c366ff346e2 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
-@@ -152,8 +152,10 @@ case class OptimizeSkewedJoin(ensureRequirements:
EnsureRequirements)
+@@ -151,8 +151,10 @@ case class OptimizeSkewedJoin(ensureRequirements:
EnsureRequirements)
Seq(CoalescedPartitionSpec(partitionIndex, partitionIndex + 1,
rightSize))
val leftParts = if (isLeftSkew) {
@@ -320,8 +340,8 @@ index 37cdea084d8..4694a06919e 100644
+ isCelebornShuffle = isCelebornShuffle)
if (skewSpecs.isDefined) {
logDebug(s"Left side partition $partitionIndex " +
- s"(${FileUtils.byteCountToDisplaySize(leftSize)}) is skewed, " +
-@@ -166,8 +168,10 @@ case class OptimizeSkewedJoin(ensureRequirements:
EnsureRequirements)
+ s"(${Utils.bytesToString(leftSize)}) is skewed, " +
+@@ -165,8 +167,10 @@ case class OptimizeSkewedJoin(ensureRequirements:
EnsureRequirements)
}
val rightParts = if (isRightSkew) {
@@ -332,9 +352,9 @@ index 37cdea084d8..4694a06919e 100644
+ isCelebornShuffle = isCelebornShuffle)
if (skewSpecs.isDefined) {
logDebug(s"Right side partition $partitionIndex " +
- s"(${FileUtils.byteCountToDisplaySize(rightSize)}) is skewed, " +
+ s"(${Utils.bytesToString(rightSize)}) is skewed, " +
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
-index 9370b3d8d1d..d36e26a1376 100644
+index 1ea4df02546..22bc98f4f86 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.adaptive
@@ -342,10 +362,10 @@ index 9370b3d8d1d..d36e26a1376 100644
import org.apache.spark.{MapOutputStatistics, MapOutputTrackerMaster,
SparkEnv}
+import org.apache.spark.celeborn.CelebornShuffleState
- import org.apache.spark.internal.Logging
+ import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.sql.execution.{CoalescedPartitionSpec,
PartialReducerPartitionSpec, ShufflePartitionSpec}
-@@ -382,13 +383,21 @@ object ShufflePartitionsUtil extends Logging {
+@@ -384,13 +385,21 @@ object ShufflePartitionsUtil extends Logging {
shuffleId: Int,
reducerId: Int,
targetSize: Long,
@@ -363,13 +383,13 @@ index 9370b3d8d1d..d36e26a1376 100644
+
+ val stageRerunEnabled = CelebornShuffleState.celebornStageRerunEnabled
+ if (stageRerunEnabled &&
celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled) {
-+ logInfo(s"Celeborn shuffle retry enabled and shuffle $shuffleId is
skewed")
++ logInfo(log"Celeborn shuffle retry enabled and shuffle
${MDC(LogKeys.SHUFFLE_ID, shuffleId)} is skewed")
+ CelebornShuffleState.registerCelebornSkewedShuffle(shuffleId)
+ }
Some(mapStartIndices.indices.map { i =>
val startMapIndex = mapStartIndices(i)
val endMapIndex = if (i == mapStartIndices.length - 1) {
-@@ -402,7 +411,15 @@ object ShufflePartitionsUtil extends Logging {
+@@ -404,7 +413,15 @@ object ShufflePartitionsUtil extends Logging {
dataSize += mapPartitionSizes(mapIndex)
mapIndex += 1
}