This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.6 by this push:
new 3199111d6 [CELEBORN-1719][FOLLOWUP] Rename throwsFetchFailure to
stageRerunEnabled
3199111d6 is described below
commit 3199111d62d25e25751a9f0043107887952d567a
Author: Xianming Lei <[email protected]>
AuthorDate: Wed Jun 11 19:33:19 2025 +0800
[CELEBORN-1719][FOLLOWUP] Rename throwsFetchFailure to stageRerunEnabled
### What changes were proposed in this pull request?
Rename throwsFetchFailure to stageRerunEnabled
### Why are the changes needed?
Make the code cleaner.
### Does this PR introduce _any_ user-facing change?
Yes.
### How was this patch tested?
Existing UTs.
Closes #3324 from leixm/CELEBORN-2035.
Authored-by: Xianming Lei <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
(cherry picked from commit edeeb4b30a2f0fc31b1a2298410ca5ef8144a3e2)
Signed-off-by: SteNicholas <[email protected]>
---
.../spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_2.patch | 4 ++--
.../spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_3.patch | 4 ++--
.../spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_4.patch | 4 ++--
.../spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_5.patch | 4 ++--
.../org/apache/spark/shuffle/celeborn/SparkShuffleManager.java | 2 +-
.../main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java | 2 +-
.../org/apache/spark/shuffle/celeborn/CelebornShuffleHandle.scala | 6 +++---
.../org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala | 6 +++---
.../org/apache/spark/shuffle/celeborn/SparkShuffleManager.java | 2 +-
.../main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java | 2 +-
.../org/apache/spark/shuffle/celeborn/CelebornShuffleHandle.scala | 6 +++---
.../org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala | 8 ++++----
.../java/org/apache/celeborn/client/read/CelebornInputStream.java | 2 +-
docs/migration.md | 2 ++
.../apache/celeborn/tests/spark/CelebornFetchFailureSuite.scala | 8 +++++---
.../scala/org/apache/spark/shuffle/celeborn/SparkUtilsSuite.scala | 6 +++---
16 files changed, 36 insertions(+), 32 deletions(-)
diff --git
a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_2.patch
b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_2.patch
index 509105cf1..f02902d64 100644
--- a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_2.patch
+++ b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_2.patch
@@ -345,8 +345,8 @@ index 3609548f374..d34f43bf064 100644
+ val celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled =
+
CelebornShuffleState.celebornAdaptiveOptimizeSkewedPartitionReadEnabled &&
isCelebornShuffle
+
-+ val throwsFetchFailure = CelebornShuffleState.celebornStageRerunEnabled
-+ if (throwsFetchFailure &&
celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled) {
++ val stageRerunEnabled = CelebornShuffleState.celebornStageRerunEnabled
++ if (stageRerunEnabled &&
celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled) {
+ logInfo(s"Celeborn shuffle retry enabled and shuffle $shuffleId is
skewed")
+ CelebornShuffleState.registerCelebornSkewedShuffle(shuffleId)
+ }
diff --git
a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_3.patch
b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_3.patch
index 44c3f8a97..2b0aecf0b 100644
--- a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_3.patch
+++ b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_3.patch
@@ -349,8 +349,8 @@ index af689db3379..39d0b3132ee 100644
+ val celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled =
+
CelebornShuffleState.celebornAdaptiveOptimizeSkewedPartitionReadEnabled &&
isCelebornShuffle
+
-+ val throwsFetchFailure = CelebornShuffleState.celebornStageRerunEnabled
-+ if (throwsFetchFailure &&
celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled) {
++ val stageRerunEnabled = CelebornShuffleState.celebornStageRerunEnabled
++ if (stageRerunEnabled &&
celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled) {
+ logInfo(s"Celeborn shuffle retry enabled and shuffle $shuffleId is
skewed")
+ CelebornShuffleState.registerCelebornSkewedShuffle(shuffleId)
+ }
diff --git
a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_4.patch
b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_4.patch
index 27f7f4188..03b26e07a 100644
--- a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_4.patch
+++ b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_4.patch
@@ -349,8 +349,8 @@ index dbed66683b0..d656c8af6b7 100644
+ val celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled =
+
CelebornShuffleState.celebornAdaptiveOptimizeSkewedPartitionReadEnabled &&
isCelebornShuffle
+
-+ val throwsFetchFailure = CelebornShuffleState.celebornStageRerunEnabled
-+ if (throwsFetchFailure &&
celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled) {
++ val stageRerunEnabled = CelebornShuffleState.celebornStageRerunEnabled
++ if (stageRerunEnabled &&
celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled) {
+ logInfo(s"Celeborn shuffle retry enabled and shuffle $shuffleId is
skewed")
+ CelebornShuffleState.registerCelebornSkewedShuffle(shuffleId)
+ }
diff --git
a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_5.patch
b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_5.patch
index d49a6c2c4..b9fc88b79 100644
--- a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_5.patch
+++ b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_5.patch
@@ -361,8 +361,8 @@ index 9370b3d8d1d..d36e26a1376 100644
+ val celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled =
+
CelebornShuffleState.celebornAdaptiveOptimizeSkewedPartitionReadEnabled &&
isCelebornShuffle
+
-+ val throwsFetchFailure = CelebornShuffleState.celebornStageRerunEnabled
-+ if (throwsFetchFailure &&
celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled) {
++ val stageRerunEnabled = CelebornShuffleState.celebornStageRerunEnabled
++ if (stageRerunEnabled &&
celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled) {
+ logInfo(s"Celeborn shuffle retry enabled and shuffle $shuffleId is
skewed")
+ CelebornShuffleState.registerCelebornSkewedShuffle(shuffleId)
+ }
diff --git
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
index 97c393f3b..563f00b91 100644
---
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
+++
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
@@ -209,7 +209,7 @@ public class SparkShuffleManager implements ShuffleManager {
celebornConf,
h.userIdentifier(),
h.extension());
- if (h.throwsFetchFailure()) {
+ if (h.stageRerunEnabled()) {
SparkUtils.addFailureListenerIfBarrierTask(client, context, h);
}
int shuffleId = SparkUtils.celebornShuffleId(client, h, context, true);
diff --git
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
index cff05deb2..9629fabfa 100644
---
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
+++
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
@@ -160,7 +160,7 @@ public class SparkUtils {
CelebornShuffleHandle<?, ?, ?> handle,
TaskContext context,
Boolean isWriter) {
- if (handle.throwsFetchFailure()) {
+ if (handle.stageRerunEnabled()) {
String appShuffleIdentifier =
getAppShuffleIdentifier(handle.shuffleId(), context);
Tuple2<Integer, Boolean> res =
client.getShuffleId(
diff --git
a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleHandle.scala
b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleHandle.scala
index 4ae52720c..6b925c21f 100644
---
a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleHandle.scala
+++
b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleHandle.scala
@@ -28,7 +28,7 @@ class CelebornShuffleHandle[K, V, C](
val lifecycleManagerPort: Int,
val userIdentifier: UserIdentifier,
shuffleId: Int,
- val throwsFetchFailure: Boolean,
+ val stageRerunEnabled: Boolean,
numMappers: Int,
dependency: ShuffleDependency[K, V, C],
val extension: Array[Byte])
@@ -39,7 +39,7 @@ class CelebornShuffleHandle[K, V, C](
lifecycleManagerPort: Int,
userIdentifier: UserIdentifier,
shuffleId: Int,
- throwsFetchFailure: Boolean,
+ stageRerunEnabled: Boolean,
numMappers: Int,
dependency: ShuffleDependency[K, V, C]) = this(
appUniqueId,
@@ -47,7 +47,7 @@ class CelebornShuffleHandle[K, V, C](
lifecycleManagerPort,
userIdentifier,
shuffleId,
- throwsFetchFailure,
+ stageRerunEnabled,
numMappers,
dependency,
null)
diff --git
a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
index df63a94b1..2372305c5 100644
---
a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
+++
b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
@@ -69,7 +69,7 @@ class CelebornShuffleReader[K, C](
} catch {
case e: CelebornRuntimeException =>
logError(s"Failed to get shuffleId for appShuffleId
${handle.shuffleId}", e)
- if (handle.throwsFetchFailure) {
+ if (handle.stageRerunEnabled) {
throw new FetchFailedException(
null,
handle.shuffleId,
@@ -142,7 +142,7 @@ class CelebornShuffleReader[K, C](
if (exceptionRef.get() != null) {
exceptionRef.get() match {
case ce @ (_: CelebornIOException | _:
PartitionUnRetryAbleException) =>
- if (handle.throwsFetchFailure &&
+ if (handle.stageRerunEnabled &&
shuffleClient.reportShuffleFetchFailure(
handle.shuffleId,
shuffleId,
@@ -179,7 +179,7 @@ class CelebornShuffleReader[K, C](
iter
} catch {
case e @ (_: CelebornIOException | _: PartitionUnRetryAbleException) =>
- if (handle.throwsFetchFailure &&
+ if (handle.stageRerunEnabled &&
shuffleClient.reportShuffleFetchFailure(
handle.shuffleId,
shuffleId,
diff --git
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
index 151ce6e41..b6555aa61 100644
---
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
+++
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
@@ -288,7 +288,7 @@ public class SparkShuffleManager implements ShuffleManager {
celebornConf,
h.userIdentifier(),
h.extension());
- if (h.throwsFetchFailure()) {
+ if (h.stageRerunEnabled()) {
SparkUtils.addFailureListenerIfBarrierTask(shuffleClient, context,
h);
}
int shuffleId = SparkUtils.celebornShuffleId(shuffleClient, h,
context, true);
diff --git
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
index 4a5bfdbb7..c5dfb81b1 100644
---
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
+++
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
@@ -133,7 +133,7 @@ public class SparkUtils {
CelebornShuffleHandle<?, ?, ?> handle,
TaskContext context,
Boolean isWriter) {
- if (handle.throwsFetchFailure()) {
+ if (handle.stageRerunEnabled()) {
String appShuffleIdentifier =
SparkCommonUtils.encodeAppShuffleIdentifier(handle.shuffleId(),
context);
Tuple2<Integer, Boolean> res =
diff --git
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleHandle.scala
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleHandle.scala
index 3d0180a26..46f2ef8a4 100644
---
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleHandle.scala
+++
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleHandle.scala
@@ -28,7 +28,7 @@ class CelebornShuffleHandle[K, V, C](
val lifecycleManagerPort: Int,
val userIdentifier: UserIdentifier,
shuffleId: Int,
- val throwsFetchFailure: Boolean,
+ val stageRerunEnabled: Boolean,
val numMappers: Int,
dependency: ShuffleDependency[K, V, C],
val extension: Array[Byte])
@@ -39,7 +39,7 @@ class CelebornShuffleHandle[K, V, C](
lifecycleManagerPort: Int,
userIdentifier: UserIdentifier,
shuffleId: Int,
- throwsFetchFailure: Boolean,
+ stageRerunEnabled: Boolean,
numMappers: Int,
dependency: ShuffleDependency[K, V, C]) = this(
appUniqueId,
@@ -47,7 +47,7 @@ class CelebornShuffleHandle[K, V, C](
lifecycleManagerPort,
userIdentifier,
shuffleId,
- throwsFetchFailure,
+ stageRerunEnabled,
numMappers,
dependency,
null)
diff --git
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
index d202d3779..772576617 100644
---
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
+++
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
@@ -94,7 +94,7 @@ class CelebornShuffleReader[K, C](
handle.extension)
private val exceptionRef = new AtomicReference[IOException]
- private val throwsFetchFailure = handle.throwsFetchFailure
+ private val stageRerunEnabled = handle.stageRerunEnabled
private val encodedAttemptId =
SparkCommonUtils.getEncodedAttemptNumber(context)
override def read(): Iterator[Product2[K, C]] = {
@@ -107,7 +107,7 @@ class CelebornShuffleReader[K, C](
} catch {
case e: CelebornRuntimeException =>
logError(s"Failed to get shuffleId for appShuffleId
${handle.shuffleId}", e)
- if (throwsFetchFailure) {
+ if (stageRerunEnabled) {
throw new FetchFailedException(
null,
handle.shuffleId,
@@ -329,7 +329,7 @@ class CelebornShuffleReader[K, C](
context.taskAttemptId(),
startMapIndex,
endMapIndex,
- if (throwsFetchFailure)
ExceptionMakerHelper.SHUFFLE_FETCH_FAILURE_EXCEPTION_MAKER
+ if (stageRerunEnabled)
ExceptionMakerHelper.SHUFFLE_FETCH_FAILURE_EXCEPTION_MAKER
else null,
locationList,
streamHandlers,
@@ -513,7 +513,7 @@ class CelebornShuffleReader[K, C](
shuffleId: Int,
partitionId: Int,
ce: Throwable) = {
- if (throwsFetchFailure &&
+ if (stageRerunEnabled &&
shuffleClient.reportShuffleFetchFailure(appShuffleId, shuffleId,
context.taskAttemptId())) {
logWarning(s"Handle fetch exceptions for ${shuffleId}-${partitionId}",
ce)
throw new FetchFailedException(
diff --git
a/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
b/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
index 5b6f8ef70..ad91fc381 100644
---
a/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
+++
b/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
@@ -853,7 +853,7 @@ public abstract class CelebornInputStream extends
InputStream {
if (exceptionMaker != null) {
if (shuffleClient.reportShuffleFetchFailure(appShuffleId, shuffleId,
taskId)) {
/*
- * [[ExceptionMaker.makeException]], for spark applications with
celeborn.client.spark.fetch.throwsFetchFailure enabled will result in creating
+ * [[ExceptionMaker.makeException]], for spark applications with
celeborn.client.spark.stageRerun.enabled enabled will result in creating
* a FetchFailedException; and that will make the TaskContext as
failed with shuffle fetch issues - see SPARK-19276 for more.
* Given this, Celeborn can wrap the FetchFailedException with our
CelebornIOException
*/
diff --git a/docs/migration.md b/docs/migration.md
index b3105e3e1..a88904701 100644
--- a/docs/migration.md
+++ b/docs/migration.md
@@ -23,6 +23,8 @@ license: |
# Upgrading from 0.5 to 0.6
+- Since 0.6.0, Celeborn deprecates
`celeborn.client.spark.fetch.throwsFetchFailure`. Please use
`celeborn.client.spark.stageRerun.enabled` instead.
+-
- Since 0.6.0, Celeborn modified `celeborn.quota.tenant.diskBytesWritten` to
`celeborn.quota.user.diskBytesWritten`. Please use
`celeborn.quota.user.diskBytesWritten` if you want to set user level quota.
- Since 0.6.0, Celeborn modified `celeborn.quota.tenant.diskFileCount` to
`celeborn.quota.user.diskFileCount`. Please use
`celeborn.quota.user.diskFileCount` if you want to set user level quota.
diff --git
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornFetchFailureSuite.scala
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornFetchFailureSuite.scala
index 9db3912a7..58cfe2759 100644
---
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornFetchFailureSuite.scala
+++
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornFetchFailureSuite.scala
@@ -85,7 +85,7 @@ class CelebornFetchFailureSuite extends AnyFunSuite
}
}
- test("celeborn spark integration test - unregister shuffle with
throwsFetchFailure disabled") {
+ test("celeborn spark integration test - unregister shuffle with stageRerun
disabled") {
if (Spark3OrNewer) {
val sparkConf = new
SparkConf().setAppName("rss-demo").setMaster("local[2,3]")
val sparkSession = SparkSession.builder()
@@ -241,7 +241,8 @@ class CelebornFetchFailureSuite extends AnyFunSuite
}
}
- test(s"celeborn spark integration test - resubmit an unordered barrier stage
with throwsFetchFailure enabled") {
+ test(
+ s"celeborn spark integration test - resubmit an unordered barrier stage
with stageRerun enabled") {
val sparkConf = new
SparkConf().setAppName("rss-demo").setMaster("local[2]")
val sparkSession = SparkSession.builder()
.config(updateSparkConf(sparkConf, ShuffleMode.HASH))
@@ -285,7 +286,8 @@ class CelebornFetchFailureSuite extends AnyFunSuite
}
}
- test(s"celeborn spark integration test - fetch failure in child of an
unordered barrier stage with throwsFetchFailure enabled") {
+ test(s"celeborn spark integration test - fetch failure in child of an
unordered " +
+ s"barrier stage with stageRerun enabled") {
val sparkConf = new
SparkConf().setAppName("rss-demo").setMaster("local[2]")
val sparkSession = SparkSession.builder()
.config(updateSparkConf(sparkConf, ShuffleMode.HASH))
diff --git
a/tests/spark-it/src/test/scala/org/apache/spark/shuffle/celeborn/SparkUtilsSuite.scala
b/tests/spark-it/src/test/scala/org/apache/spark/shuffle/celeborn/SparkUtilsSuite.scala
index d4e57a47c..58ca1b011 100644
---
a/tests/spark-it/src/test/scala/org/apache/spark/shuffle/celeborn/SparkUtilsSuite.scala
+++
b/tests/spark-it/src/test/scala/org/apache/spark/shuffle/celeborn/SparkUtilsSuite.scala
@@ -54,7 +54,7 @@ class SparkUtilsSuite extends AnyFunSuite
.config(updateSparkConf(sparkConf, ShuffleMode.HASH))
.config("spark.sql.shuffle.partitions", 2)
.config("spark.celeborn.shuffle.forceFallback.partition.enabled",
false)
- .config("spark.celeborn.client.spark.fetch.throwsFetchFailure", "true")
+ .config("spark.celeborn.client.spark.stageRerun.enabled", "true")
.config(
"spark.shuffle.manager",
"org.apache.spark.shuffle.celeborn.TestCelebornShuffleManager")
@@ -115,7 +115,7 @@ class SparkUtilsSuite extends AnyFunSuite
.config(updateSparkConf(sparkConf, ShuffleMode.HASH))
.config("spark.sql.shuffle.partitions", 2)
.config("spark.celeborn.shuffle.forceFallback.partition.enabled", false)
- .config("spark.celeborn.client.spark.fetch.throwsFetchFailure", "true")
+ .config("spark.celeborn.client.spark.stageRerun.enabled", "true")
.config(
"spark.shuffle.manager",
"org.apache.spark.shuffle.celeborn.TestCelebornShuffleManager")
@@ -167,7 +167,7 @@ class SparkUtilsSuite extends AnyFunSuite
.config(updateSparkConf(sparkConf, ShuffleMode.HASH))
.config("spark.sql.shuffle.partitions", 2)
.config("spark.celeborn.shuffle.forceFallback.partition.enabled", false)
- .config("spark.celeborn.client.spark.fetch.throwsFetchFailure", "true")
+ .config("spark.celeborn.client.spark.stageRerun.enabled", "true")
.config(
"spark.shuffle.manager",
"org.apache.spark.shuffle.celeborn.TestCelebornShuffleManager")