This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new edeeb4b30 [CELEBORN-1719][FOLLOWUP] Rename throwsFetchFailure to 
stageRerunEnabled
edeeb4b30 is described below

commit edeeb4b30a2f0fc31b1a2298410ca5ef8144a3e2
Author: Xianming Lei <[email protected]>
AuthorDate: Wed Jun 11 19:33:19 2025 +0800

    [CELEBORN-1719][FOLLOWUP] Rename throwsFetchFailure to stageRerunEnabled
    
    ### What changes were proposed in this pull request?
    Rename throwsFetchFailure to stageRerunEnabled
    
    ### Why are the changes needed?
    Make the code cleaner.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes.
    
    ### How was this patch tested?
    existing UTs.
    
    Closes #3324 from leixm/CELEBORN-2035.
    
    Authored-by: Xianming Lei <[email protected]>
    Signed-off-by: SteNicholas <[email protected]>
---
 .../spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_2.patch  | 4 ++--
 .../spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_3.patch  | 4 ++--
 .../spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_4.patch  | 4 ++--
 .../spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_5.patch  | 4 ++--
 .../org/apache/spark/shuffle/celeborn/SparkShuffleManager.java    | 2 +-
 .../main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java   | 2 +-
 .../org/apache/spark/shuffle/celeborn/CelebornShuffleHandle.scala | 6 +++---
 .../org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala | 6 +++---
 .../org/apache/spark/shuffle/celeborn/SparkShuffleManager.java    | 2 +-
 .../main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java   | 2 +-
 .../org/apache/spark/shuffle/celeborn/CelebornShuffleHandle.scala | 6 +++---
 .../org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala | 8 ++++----
 .../java/org/apache/celeborn/client/read/CelebornInputStream.java | 2 +-
 docs/migration.md                                                 | 2 ++
 .../apache/celeborn/tests/spark/CelebornFetchFailureSuite.scala   | 8 +++++---
 .../scala/org/apache/spark/shuffle/celeborn/SparkUtilsSuite.scala | 6 +++---
 16 files changed, 36 insertions(+), 32 deletions(-)

diff --git 
a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_2.patch 
b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_2.patch
index 509105cf1..f02902d64 100644
--- a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_2.patch
+++ b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_2.patch
@@ -345,8 +345,8 @@ index 3609548f374..d34f43bf064 100644
 +      val celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled =
 +        
CelebornShuffleState.celebornAdaptiveOptimizeSkewedPartitionReadEnabled && 
isCelebornShuffle
 +
-+      val throwsFetchFailure = CelebornShuffleState.celebornStageRerunEnabled
-+      if (throwsFetchFailure && 
celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled) {
++      val stageRerunEnabled = CelebornShuffleState.celebornStageRerunEnabled
++      if (stageRerunEnabled && 
celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled) {
 +        logInfo(s"Celeborn shuffle retry enabled and shuffle $shuffleId is 
skewed")
 +        CelebornShuffleState.registerCelebornSkewedShuffle(shuffleId)
 +      }
diff --git 
a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_3.patch 
b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_3.patch
index 44c3f8a97..2b0aecf0b 100644
--- a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_3.patch
+++ b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_3.patch
@@ -349,8 +349,8 @@ index af689db3379..39d0b3132ee 100644
 +      val celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled =
 +        
CelebornShuffleState.celebornAdaptiveOptimizeSkewedPartitionReadEnabled && 
isCelebornShuffle
 +
-+      val throwsFetchFailure = CelebornShuffleState.celebornStageRerunEnabled
-+      if (throwsFetchFailure && 
celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled) {
++      val stageRerunEnabled = CelebornShuffleState.celebornStageRerunEnabled
++      if (stageRerunEnabled && 
celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled) {
 +        logInfo(s"Celeborn shuffle retry enabled and shuffle $shuffleId is 
skewed")
 +        CelebornShuffleState.registerCelebornSkewedShuffle(shuffleId)
 +      }
diff --git 
a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_4.patch 
b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_4.patch
index 27f7f4188..03b26e07a 100644
--- a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_4.patch
+++ b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_4.patch
@@ -349,8 +349,8 @@ index dbed66683b0..d656c8af6b7 100644
 +      val celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled =
 +        
CelebornShuffleState.celebornAdaptiveOptimizeSkewedPartitionReadEnabled && 
isCelebornShuffle
 +
-+      val throwsFetchFailure = CelebornShuffleState.celebornStageRerunEnabled
-+      if (throwsFetchFailure && 
celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled) {
++      val stageRerunEnabled = CelebornShuffleState.celebornStageRerunEnabled
++      if (stageRerunEnabled && 
celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled) {
 +        logInfo(s"Celeborn shuffle retry enabled and shuffle $shuffleId is 
skewed")
 +        CelebornShuffleState.registerCelebornSkewedShuffle(shuffleId)
 +      }
diff --git 
a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_5.patch 
b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_5.patch
index d49a6c2c4..b9fc88b79 100644
--- a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_5.patch
+++ b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_5.patch
@@ -361,8 +361,8 @@ index 9370b3d8d1d..d36e26a1376 100644
 +      val celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled =
 +        
CelebornShuffleState.celebornAdaptiveOptimizeSkewedPartitionReadEnabled && 
isCelebornShuffle
 +
-+      val throwsFetchFailure = CelebornShuffleState.celebornStageRerunEnabled
-+      if (throwsFetchFailure && 
celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled) {
++      val stageRerunEnabled = CelebornShuffleState.celebornStageRerunEnabled
++      if (stageRerunEnabled && 
celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled) {
 +        logInfo(s"Celeborn shuffle retry enabled and shuffle $shuffleId is 
skewed")
 +        CelebornShuffleState.registerCelebornSkewedShuffle(shuffleId)
 +      }
diff --git 
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
 
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
index 97c393f3b..563f00b91 100644
--- 
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
+++ 
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
@@ -209,7 +209,7 @@ public class SparkShuffleManager implements ShuffleManager {
                 celebornConf,
                 h.userIdentifier(),
                 h.extension());
-        if (h.throwsFetchFailure()) {
+        if (h.stageRerunEnabled()) {
           SparkUtils.addFailureListenerIfBarrierTask(client, context, h);
         }
         int shuffleId = SparkUtils.celebornShuffleId(client, h, context, true);
diff --git 
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
 
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
index cff05deb2..9629fabfa 100644
--- 
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
+++ 
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
@@ -160,7 +160,7 @@ public class SparkUtils {
       CelebornShuffleHandle<?, ?, ?> handle,
       TaskContext context,
       Boolean isWriter) {
-    if (handle.throwsFetchFailure()) {
+    if (handle.stageRerunEnabled()) {
       String appShuffleIdentifier = 
getAppShuffleIdentifier(handle.shuffleId(), context);
       Tuple2<Integer, Boolean> res =
           client.getShuffleId(
diff --git 
a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleHandle.scala
 
b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleHandle.scala
index 4ae52720c..6b925c21f 100644
--- 
a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleHandle.scala
+++ 
b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleHandle.scala
@@ -28,7 +28,7 @@ class CelebornShuffleHandle[K, V, C](
     val lifecycleManagerPort: Int,
     val userIdentifier: UserIdentifier,
     shuffleId: Int,
-    val throwsFetchFailure: Boolean,
+    val stageRerunEnabled: Boolean,
     numMappers: Int,
     dependency: ShuffleDependency[K, V, C],
     val extension: Array[Byte])
@@ -39,7 +39,7 @@ class CelebornShuffleHandle[K, V, C](
       lifecycleManagerPort: Int,
       userIdentifier: UserIdentifier,
       shuffleId: Int,
-      throwsFetchFailure: Boolean,
+      stageRerunEnabled: Boolean,
       numMappers: Int,
       dependency: ShuffleDependency[K, V, C]) = this(
     appUniqueId,
@@ -47,7 +47,7 @@ class CelebornShuffleHandle[K, V, C](
     lifecycleManagerPort,
     userIdentifier,
     shuffleId,
-    throwsFetchFailure,
+    stageRerunEnabled,
     numMappers,
     dependency,
     null)
diff --git 
a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
 
b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
index df63a94b1..2372305c5 100644
--- 
a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
+++ 
b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
@@ -69,7 +69,7 @@ class CelebornShuffleReader[K, C](
       } catch {
         case e: CelebornRuntimeException =>
           logError(s"Failed to get shuffleId for appShuffleId 
${handle.shuffleId}", e)
-          if (handle.throwsFetchFailure) {
+          if (handle.stageRerunEnabled) {
             throw new FetchFailedException(
               null,
               handle.shuffleId,
@@ -142,7 +142,7 @@ class CelebornShuffleReader[K, C](
           if (exceptionRef.get() != null) {
             exceptionRef.get() match {
               case ce @ (_: CelebornIOException | _: 
PartitionUnRetryAbleException) =>
-                if (handle.throwsFetchFailure &&
+                if (handle.stageRerunEnabled &&
                   shuffleClient.reportShuffleFetchFailure(
                     handle.shuffleId,
                     shuffleId,
@@ -179,7 +179,7 @@ class CelebornShuffleReader[K, C](
         iter
       } catch {
         case e @ (_: CelebornIOException | _: PartitionUnRetryAbleException) =>
-          if (handle.throwsFetchFailure &&
+          if (handle.stageRerunEnabled &&
             shuffleClient.reportShuffleFetchFailure(
               handle.shuffleId,
               shuffleId,
diff --git 
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
 
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
index 151ce6e41..b6555aa61 100644
--- 
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
+++ 
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
@@ -288,7 +288,7 @@ public class SparkShuffleManager implements ShuffleManager {
                 celebornConf,
                 h.userIdentifier(),
                 h.extension());
-        if (h.throwsFetchFailure()) {
+        if (h.stageRerunEnabled()) {
           SparkUtils.addFailureListenerIfBarrierTask(shuffleClient, context, 
h);
         }
         int shuffleId = SparkUtils.celebornShuffleId(shuffleClient, h, 
context, true);
diff --git 
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
 
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
index 4a5bfdbb7..c5dfb81b1 100644
--- 
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
+++ 
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
@@ -133,7 +133,7 @@ public class SparkUtils {
       CelebornShuffleHandle<?, ?, ?> handle,
       TaskContext context,
       Boolean isWriter) {
-    if (handle.throwsFetchFailure()) {
+    if (handle.stageRerunEnabled()) {
       String appShuffleIdentifier =
           SparkCommonUtils.encodeAppShuffleIdentifier(handle.shuffleId(), 
context);
       Tuple2<Integer, Boolean> res =
diff --git 
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleHandle.scala
 
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleHandle.scala
index 3d0180a26..46f2ef8a4 100644
--- 
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleHandle.scala
+++ 
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleHandle.scala
@@ -28,7 +28,7 @@ class CelebornShuffleHandle[K, V, C](
     val lifecycleManagerPort: Int,
     val userIdentifier: UserIdentifier,
     shuffleId: Int,
-    val throwsFetchFailure: Boolean,
+    val stageRerunEnabled: Boolean,
     val numMappers: Int,
     dependency: ShuffleDependency[K, V, C],
     val extension: Array[Byte])
@@ -39,7 +39,7 @@ class CelebornShuffleHandle[K, V, C](
       lifecycleManagerPort: Int,
       userIdentifier: UserIdentifier,
       shuffleId: Int,
-      throwsFetchFailure: Boolean,
+      stageRerunEnabled: Boolean,
       numMappers: Int,
       dependency: ShuffleDependency[K, V, C]) = this(
     appUniqueId,
@@ -47,7 +47,7 @@ class CelebornShuffleHandle[K, V, C](
     lifecycleManagerPort,
     userIdentifier,
     shuffleId,
-    throwsFetchFailure,
+    stageRerunEnabled,
     numMappers,
     dependency,
     null)
diff --git 
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
 
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
index d202d3779..772576617 100644
--- 
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
+++ 
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
@@ -94,7 +94,7 @@ class CelebornShuffleReader[K, C](
     handle.extension)
 
   private val exceptionRef = new AtomicReference[IOException]
-  private val throwsFetchFailure = handle.throwsFetchFailure
+  private val stageRerunEnabled = handle.stageRerunEnabled
   private val encodedAttemptId = 
SparkCommonUtils.getEncodedAttemptNumber(context)
 
   override def read(): Iterator[Product2[K, C]] = {
@@ -107,7 +107,7 @@ class CelebornShuffleReader[K, C](
       } catch {
         case e: CelebornRuntimeException =>
           logError(s"Failed to get shuffleId for appShuffleId 
${handle.shuffleId}", e)
-          if (throwsFetchFailure) {
+          if (stageRerunEnabled) {
             throw new FetchFailedException(
               null,
               handle.shuffleId,
@@ -329,7 +329,7 @@ class CelebornShuffleReader[K, C](
             context.taskAttemptId(),
             startMapIndex,
             endMapIndex,
-            if (throwsFetchFailure) 
ExceptionMakerHelper.SHUFFLE_FETCH_FAILURE_EXCEPTION_MAKER
+            if (stageRerunEnabled) 
ExceptionMakerHelper.SHUFFLE_FETCH_FAILURE_EXCEPTION_MAKER
             else null,
             locationList,
             streamHandlers,
@@ -513,7 +513,7 @@ class CelebornShuffleReader[K, C](
       shuffleId: Int,
       partitionId: Int,
       ce: Throwable) = {
-    if (throwsFetchFailure &&
+    if (stageRerunEnabled &&
       shuffleClient.reportShuffleFetchFailure(appShuffleId, shuffleId, 
context.taskAttemptId())) {
       logWarning(s"Handle fetch exceptions for ${shuffleId}-${partitionId}", 
ce)
       throw new FetchFailedException(
diff --git 
a/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java 
b/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
index 5b6f8ef70..ad91fc381 100644
--- 
a/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
+++ 
b/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
@@ -853,7 +853,7 @@ public abstract class CelebornInputStream extends 
InputStream {
         if (exceptionMaker != null) {
           if (shuffleClient.reportShuffleFetchFailure(appShuffleId, shuffleId, 
taskId)) {
             /*
-             * [[ExceptionMaker.makeException]], for spark applications with 
celeborn.client.spark.fetch.throwsFetchFailure enabled will result in creating
+             * [[ExceptionMaker.makeException]], for spark applications with 
celeborn.client.spark.stageRerun.enabled enabled will result in creating
              * a FetchFailedException; and that will make the TaskContext as 
failed with shuffle fetch issues - see SPARK-19276 for more.
              * Given this, Celeborn can wrap the FetchFailedException with our 
CelebornIOException
              */
diff --git a/docs/migration.md b/docs/migration.md
index b3105e3e1..a88904701 100644
--- a/docs/migration.md
+++ b/docs/migration.md
@@ -23,6 +23,8 @@ license: |
 
 # Upgrading from 0.5 to 0.6
 
+- Since 0.6.0, Celeborn deprecate 
`celeborn.client.spark.fetch.throwsFetchFailure`. Please use 
`celeborn.client.spark.stageRerun.enabled` instead.
+- 
 - Since 0.6.0, Celeborn modified `celeborn.quota.tenant.diskBytesWritten` to 
`celeborn.quota.user.diskBytesWritten`. Please use 
`celeborn.quota.user.diskBytesWritten` if you want to set user level quota.
 
 - Since 0.6.0, Celeborn modified `celeborn.quota.tenant.diskFileCount` to 
`celeborn.quota.user.diskFileCount`. Please use 
`celeborn.quota.user.diskFileCount` if you want to set user level quota.
diff --git 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornFetchFailureSuite.scala
 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornFetchFailureSuite.scala
index 9db3912a7..58cfe2759 100644
--- 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornFetchFailureSuite.scala
+++ 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornFetchFailureSuite.scala
@@ -85,7 +85,7 @@ class CelebornFetchFailureSuite extends AnyFunSuite
     }
   }
 
-  test("celeborn spark integration test - unregister shuffle with 
throwsFetchFailure disabled") {
+  test("celeborn spark integration test - unregister shuffle with stageRerun 
disabled") {
     if (Spark3OrNewer) {
       val sparkConf = new 
SparkConf().setAppName("rss-demo").setMaster("local[2,3]")
       val sparkSession = SparkSession.builder()
@@ -241,7 +241,8 @@ class CelebornFetchFailureSuite extends AnyFunSuite
     }
   }
 
-  test(s"celeborn spark integration test - resubmit an unordered barrier stage 
with throwsFetchFailure enabled") {
+  test(
+    s"celeborn spark integration test - resubmit an unordered barrier stage 
with stageRerun enabled") {
     val sparkConf = new 
SparkConf().setAppName("rss-demo").setMaster("local[2]")
     val sparkSession = SparkSession.builder()
       .config(updateSparkConf(sparkConf, ShuffleMode.HASH))
@@ -285,7 +286,8 @@ class CelebornFetchFailureSuite extends AnyFunSuite
     }
   }
 
-  test(s"celeborn spark integration test - fetch failure in child of an 
unordered barrier stage with throwsFetchFailure enabled") {
+  test(s"celeborn spark integration test - fetch failure in child of an 
unordered " +
+    s"barrier stage with stageRerun enabled") {
     val sparkConf = new 
SparkConf().setAppName("rss-demo").setMaster("local[2]")
     val sparkSession = SparkSession.builder()
       .config(updateSparkConf(sparkConf, ShuffleMode.HASH))
diff --git 
a/tests/spark-it/src/test/scala/org/apache/spark/shuffle/celeborn/SparkUtilsSuite.scala
 
b/tests/spark-it/src/test/scala/org/apache/spark/shuffle/celeborn/SparkUtilsSuite.scala
index d4e57a47c..58ca1b011 100644
--- 
a/tests/spark-it/src/test/scala/org/apache/spark/shuffle/celeborn/SparkUtilsSuite.scala
+++ 
b/tests/spark-it/src/test/scala/org/apache/spark/shuffle/celeborn/SparkUtilsSuite.scala
@@ -54,7 +54,7 @@ class SparkUtilsSuite extends AnyFunSuite
         .config(updateSparkConf(sparkConf, ShuffleMode.HASH))
         .config("spark.sql.shuffle.partitions", 2)
         .config("spark.celeborn.shuffle.forceFallback.partition.enabled", 
false)
-        .config("spark.celeborn.client.spark.fetch.throwsFetchFailure", "true")
+        .config("spark.celeborn.client.spark.stageRerun.enabled", "true")
         .config(
           "spark.shuffle.manager",
           "org.apache.spark.shuffle.celeborn.TestCelebornShuffleManager")
@@ -115,7 +115,7 @@ class SparkUtilsSuite extends AnyFunSuite
       .config(updateSparkConf(sparkConf, ShuffleMode.HASH))
       .config("spark.sql.shuffle.partitions", 2)
       .config("spark.celeborn.shuffle.forceFallback.partition.enabled", false)
-      .config("spark.celeborn.client.spark.fetch.throwsFetchFailure", "true")
+      .config("spark.celeborn.client.spark.stageRerun.enabled", "true")
       .config(
         "spark.shuffle.manager",
         "org.apache.spark.shuffle.celeborn.TestCelebornShuffleManager")
@@ -167,7 +167,7 @@ class SparkUtilsSuite extends AnyFunSuite
       .config(updateSparkConf(sparkConf, ShuffleMode.HASH))
       .config("spark.sql.shuffle.partitions", 2)
       .config("spark.celeborn.shuffle.forceFallback.partition.enabled", false)
-      .config("spark.celeborn.client.spark.fetch.throwsFetchFailure", "true")
+      .config("spark.celeborn.client.spark.stageRerun.enabled", "true")
       .config(
         "spark.shuffle.manager",
         "org.apache.spark.shuffle.celeborn.TestCelebornShuffleManager")

Reply via email to