This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 1d0032b92 [CELEBORN-1719] Introduce 
celeborn.client.spark.stageRerun.enabled with alternative 
celeborn.client.spark.fetch.throwsFetchFailure to enable spark stage rerun
1d0032b92 is described below

commit 1d0032b925f4872d6158514a715f4dde08a02f69
Author: SteNicholas <[email protected]>
AuthorDate: Wed Nov 20 19:30:26 2024 +0800

    [CELEBORN-1719] Introduce celeborn.client.spark.stageRerun.enabled with 
alternative celeborn.client.spark.fetch.throwsFetchFailure to enable spark 
stage rerun
    
    ### What changes were proposed in this pull request?
    
    1. Introduce `celeborn.client.spark.stageRerun.enabled` with alternative 
`celeborn.client.spark.fetch.throwsFetchFailure` to enable spark stage rerun.
    2. Change the default value of 
`celeborn.client.spark.fetch.throwsFetchFailure` from `false` to `true`, which 
enables Spark stage rerun by default.
    
    ### Why are the changes needed?
    
    Users cannot readily tell from the name 
`celeborn.client.spark.fetch.throwsFetchFailure` that it controls whether stage 
rerun is enabled: the name only says that the client throws 
`FetchFailedException` instead of `CelebornIOException`. This change therefore 
introduces `celeborn.client.spark.stageRerun.enabled`, keeping 
`celeborn.client.spark.fetch.throwsFetchFailure` as an alternative key, to enable Spark stage rerun.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    CI.
    
    Closes #2920 from SteNicholas/CELEBORN-1719.
    
    Authored-by: SteNicholas <[email protected]>
    Signed-off-by: SteNicholas <[email protected]>
---
 .../spark/shuffle/celeborn/SparkShuffleManager.java    |  7 +++----
 .../spark/shuffle/celeborn/SparkShuffleManager.java    |  7 +++----
 .../org/apache/celeborn/common/CelebornConf.scala      | 11 ++++++-----
 docs/configuration/client.md                           |  2 +-
 docs/migration.md                                      |  2 ++
 .../tests/spark/CelebornFetchFailureSuite.scala        | 18 +++++++++---------
 .../tests/spark/CelebornShuffleLostSuite.scala         |  2 +-
 7 files changed, 25 insertions(+), 24 deletions(-)

diff --git 
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
 
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
index d450071f4..ce0093bd3 100644
--- 
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
+++ 
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
@@ -98,7 +98,7 @@ public class SparkShuffleManager implements ShuffleManager {
           appUniqueId = celebornConf.appUniqueIdWithUUIDSuffix(appId);
           lifecycleManager = new LifecycleManager(appUniqueId, celebornConf);
           
lifecycleManager.registerCancelShuffleCallback(SparkUtils::cancelShuffle);
-          if (celebornConf.clientFetchThrowsFetchFailure()) {
+          if (celebornConf.clientStageRerunEnabled()) {
             MapOutputTrackerMaster mapOutputTracker =
                 (MapOutputTrackerMaster) SparkEnv.get().mapOutputTracker();
             lifecycleManager.registerShuffleTrackerCallback(
@@ -135,7 +135,7 @@ public class SparkShuffleManager implements ShuffleManager {
           lifecycleManager.getPort(),
           lifecycleManager.getUserIdentifier(),
           shuffleId,
-          celebornConf.clientFetchThrowsFetchFailure(),
+          celebornConf.clientStageRerunEnabled(),
           numMaps,
           dependency);
     }
@@ -148,8 +148,7 @@ public class SparkShuffleManager implements ShuffleManager {
     }
     // For Spark driver side trigger unregister shuffle.
     if (lifecycleManager != null) {
-      lifecycleManager.unregisterAppShuffle(
-          appShuffleId, celebornConf.clientFetchThrowsFetchFailure());
+      lifecycleManager.unregisterAppShuffle(appShuffleId, 
celebornConf.clientStageRerunEnabled());
     }
     // For Spark executor side cleanup shuffle related info.
     if (shuffleClient != null) {
diff --git 
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
 
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
index 8541ad223..af3c400ec 100644
--- 
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
+++ 
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
@@ -140,7 +140,7 @@ public class SparkShuffleManager implements ShuffleManager {
           appUniqueId = celebornConf.appUniqueIdWithUUIDSuffix(appId);
           lifecycleManager = new LifecycleManager(appUniqueId, celebornConf);
           
lifecycleManager.registerCancelShuffleCallback(SparkUtils::cancelShuffle);
-          if (celebornConf.clientFetchThrowsFetchFailure()) {
+          if (celebornConf.clientStageRerunEnabled()) {
             MapOutputTrackerMaster mapOutputTracker =
                 (MapOutputTrackerMaster) SparkEnv.get().mapOutputTracker();
 
@@ -187,7 +187,7 @@ public class SparkShuffleManager implements ShuffleManager {
           lifecycleManager.getPort(),
           lifecycleManager.getUserIdentifier(),
           shuffleId,
-          celebornConf.clientFetchThrowsFetchFailure(),
+          celebornConf.clientStageRerunEnabled(),
           dependency.rdd().getNumPartitions(),
           dependency);
     }
@@ -200,8 +200,7 @@ public class SparkShuffleManager implements ShuffleManager {
     }
     // For Spark driver side trigger unregister shuffle.
     if (lifecycleManager != null) {
-      lifecycleManager.unregisterAppShuffle(
-          appShuffleId, celebornConf.clientFetchThrowsFetchFailure());
+      lifecycleManager.unregisterAppShuffle(appShuffleId, 
celebornConf.clientStageRerunEnabled());
     }
     // For Spark executor side cleanup shuffle related info.
     if (shuffleClient != null) {
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 32b718dc5..9bf3ea4d4 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -973,7 +973,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Se
   def clientFetchBufferSize: Int = get(CLIENT_FETCH_BUFFER_SIZE).toInt
   def clientFetchMaxReqsInFlight: Int = get(CLIENT_FETCH_MAX_REQS_IN_FLIGHT)
   def clientFetchMaxRetriesForEachReplica: Int = 
get(CLIENT_FETCH_MAX_RETRIES_FOR_EACH_REPLICA)
-  def clientFetchThrowsFetchFailure: Boolean = 
get(CLIENT_FETCH_THROWS_FETCH_FAILURE)
+  def clientStageRerunEnabled: Boolean = get(CLIENT_STAGE_RERUN_ENABLED)
   def clientFetchExcludeWorkerOnFailureEnabled: Boolean =
     get(CLIENT_FETCH_EXCLUDE_WORKER_ON_FAILURE_ENABLED)
   def clientFetchExcludedWorkerExpireTimeout: Long =
@@ -4586,13 +4586,14 @@ object CelebornConf extends Logging {
       .intConf
       .createWithDefault(3)
 
-  val CLIENT_FETCH_THROWS_FETCH_FAILURE: ConfigEntry[Boolean] =
-    buildConf("celeborn.client.spark.fetch.throwsFetchFailure")
+  val CLIENT_STAGE_RERUN_ENABLED: ConfigEntry[Boolean] =
+    buildConf("celeborn.client.spark.stageRerun.enabled")
+      .withAlternative("celeborn.client.spark.fetch.throwsFetchFailure")
       .categories("client")
       .version("0.4.0")
-      .doc("client throws FetchFailedException instead of CelebornIOException")
+      .doc("Whether to enable stage rerun. If true, client throws 
FetchFailedException instead of CelebornIOException.")
       .booleanConf
-      .createWithDefault(false)
+      .createWithDefault(true)
 
   val CLIENT_FETCH_EXCLUDE_WORKER_ON_FAILURE_ENABLED: ConfigEntry[Boolean] =
     buildConf("celeborn.client.fetch.excludeWorkerOnFailure.enabled")
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index f8b075f80..bec9c56ae 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -107,7 +107,6 @@ license: |
 | celeborn.client.shuffle.register.filterExcludedWorker.enabled | false | 
false | Whether to filter excluded worker when register shuffle. | 0.4.0 |  | 
 | celeborn.client.shuffle.reviseLostShuffles.enabled | false | false | Whether 
to revise lost shuffles. | 0.6.0 |  | 
 | celeborn.client.slot.assign.maxWorkers | 10000 | false | Max workers that 
slots of one shuffle can be allocated on. Will choose the smaller positive one 
from Master side and Client side, see `celeborn.master.slot.assign.maxWorkers`. 
| 0.3.1 |  | 
-| celeborn.client.spark.fetch.throwsFetchFailure | false | false | client 
throws FetchFailedException instead of CelebornIOException | 0.4.0 |  | 
 | celeborn.client.spark.push.dynamicWriteMode.enabled | false | false | 
Whether to dynamically switch push write mode based on conditions.If true, 
shuffle mode will be only determined by partition count | 0.5.0 |  | 
 | celeborn.client.spark.push.dynamicWriteMode.partitionNum.threshold | 2000 | 
false | Threshold of shuffle partition number for dynamically switching push 
writer mode. When the shuffle partition number is greater than this value, use 
the sort-based shuffle writer for memory efficiency; otherwise use the 
hash-based shuffle writer for speed. This configuration only takes effect when 
celeborn.client.spark.push.dynamicWriteMode.enabled is true. | 0.5.0 |  | 
 | celeborn.client.spark.push.sort.memory.maxMemoryFactor | 0.4 | false | the 
max portion of executor memory which can be used for SortBasedWriter buffer 
(only valid when celeborn.client.spark.push.sort.memory.useAdaptiveThreshold is 
enabled | 0.5.0 |  | 
@@ -120,6 +119,7 @@ license: |
 | celeborn.client.spark.shuffle.fallback.policy | AUTO | false | Celeborn 
supports the following kind of fallback policies. 1. ALWAYS: always use spark 
built-in shuffle implementation; 2. AUTO: prefer to use celeborn shuffle 
implementation, and fallback to use spark built-in shuffle implementation based 
on certain factors, e.g. availability of enough workers and quota, shuffle 
partition number; 3. NEVER: always use celeborn shuffle implementation, and 
fail fast when it it is concluded th [...]
 | celeborn.client.spark.shuffle.forceFallback.enabled | false | false | Always 
use spark built-in shuffle implementation. This configuration is deprecated, 
consider configuring `celeborn.client.spark.shuffle.fallback.policy` instead. | 
0.3.0 | celeborn.shuffle.forceFallback.enabled | 
 | celeborn.client.spark.shuffle.writer | HASH | false | Celeborn supports the 
following kind of shuffle writers. 1. hash: hash-based shuffle writer works 
fine when shuffle partition count is normal; 2. sort: sort-based shuffle writer 
works fine when memory pressure is high or shuffle partition count is huge. 
This configuration only takes effect when 
celeborn.client.spark.push.dynamicWriteMode.enabled is false. | 0.3.0 | 
celeborn.shuffle.writer | 
+| celeborn.client.spark.stageRerun.enabled | true | false | Whether to enable 
stage rerun. If true, client throws FetchFailedException instead of 
CelebornIOException. | 0.4.0 | celeborn.client.spark.fetch.throwsFetchFailure | 
 | celeborn.client.tagsExpr |  | false | Expression to filter workers by tags. 
The expression is a comma-separated list of tags. The expression is evaluated 
as a logical AND of all tags. For example, `prod,high-io` filters workers that 
have both the `prod` and `high-io` tags. | 0.6.0 |  | 
 | celeborn.master.endpoints | &lt;localhost&gt;:9097 | false | Endpoints of 
master nodes for celeborn clients to connect. Client uses resolver provided by 
celeborn.master.endpoints.resolver to resolve the master endpoints. By default 
Celeborn uses `org.apache.celeborn.common.client.StaticMasterEndpointResolver` 
which take static master endpoints as input. Allowed pattern: 
`<host1>:<port1>[,<host2>:<port2>]*`, e.g. `clb1:9097,clb2:9098,clb3:9099`. If 
the port is omitted, 9097 will be used [...]
 | celeborn.master.endpoints.resolver | 
org.apache.celeborn.common.client.StaticMasterEndpointResolver | false | 
Resolver class that can be used for discovering and updating the master 
endpoints. This allows users to provide a custom master endpoint resolver 
implementation. This is useful in environments where the master nodes might 
change due to scaling operations or infrastructure updates. Clients need to 
ensure that provided resolver class should be present in the classpath. | 0.6.0 
|  | 
diff --git a/docs/migration.md b/docs/migration.md
index 13a4592b1..728e63a41 100644
--- a/docs/migration.md
+++ b/docs/migration.md
@@ -22,10 +22,12 @@ license: |
 # Migration Guide
 
 # Upgrading from 0.5 to 0.6
+
 - Since 0.6.0, Celeborn deprecate 
`celeborn.worker.congestionControl.low.watermark`. Please use 
`celeborn.worker.congestionControl.diskBuffer.low.watermark` instead.
 
 - Since 0.6.0, Celeborn deprecate 
`celeborn.worker.congestionControl.high.watermark`. Please use 
`celeborn.worker.congestionControl.diskBuffer.high.watermark` instead.
 
+- Since 0.6.0, Celeborn changed the default value of 
`celeborn.client.spark.fetch.throwsFetchFailure` from `false` to `true`, which 
means Celeborn will enable Spark stage rerun by default.
 
 - Since 0.6.0, Celeborn has introduced a new RESTful API namespace: /api/v1, 
which uses the application/json media type for requests and responses.
    The `celeborn-openapi-client` SDK is also available to help users interact 
with the new RESTful APIs.
diff --git 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornFetchFailureSuite.scala
 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornFetchFailureSuite.scala
index f3cd38211..1703ad0b8 100644
--- 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornFetchFailureSuite.scala
+++ 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornFetchFailureSuite.scala
@@ -104,7 +104,7 @@ class CelebornFetchFailureSuite extends AnyFunSuite
         .config(updateSparkConf(sparkConf, ShuffleMode.HASH))
         .config("spark.sql.shuffle.partitions", 2)
         .config("spark.celeborn.shuffle.forceFallback.partition.enabled", 
false)
-        .config("spark.celeborn.client.spark.fetch.throwsFetchFailure", "true")
+        .config("spark.celeborn.client.spark.stageRerun.enabled", "true")
         .config(
           "spark.shuffle.manager",
           "org.apache.spark.shuffle.celeborn.TestCelebornShuffleManager")
@@ -145,7 +145,7 @@ class CelebornFetchFailureSuite extends AnyFunSuite
         .config(updateSparkConf(sparkConf, ShuffleMode.HASH))
         .config("spark.sql.shuffle.partitions", 2)
         .config("spark.celeborn.shuffle.forceFallback.partition.enabled", 
false)
-        .config("spark.celeborn.client.spark.fetch.throwsFetchFailure", 
"false")
+        .config("spark.celeborn.client.spark.stageRerun.enabled", "false")
         .getOrCreate()
 
       val value = Range(1, 10000).mkString(",")
@@ -177,7 +177,7 @@ class CelebornFetchFailureSuite extends AnyFunSuite
         .config(updateSparkConf(sparkConf, ShuffleMode.HASH))
         .config("spark.sql.shuffle.partitions", 2)
         .config("spark.celeborn.shuffle.forceFallback.partition.enabled", 
false)
-        .config("spark.celeborn.client.spark.fetch.throwsFetchFailure", "true")
+        .config("spark.celeborn.client.spark.stageRerun.enabled", "true")
         .config(
           "spark.shuffle.manager",
           "org.apache.spark.shuffle.celeborn.TestCelebornShuffleManager")
@@ -208,7 +208,7 @@ class CelebornFetchFailureSuite extends AnyFunSuite
         .config(updateSparkConf(sparkConf, ShuffleMode.HASH))
         .config("spark.sql.shuffle.partitions", 2)
         .config("spark.celeborn.shuffle.forceFallback.partition.enabled", 
false)
-        .config("spark.celeborn.client.spark.fetch.throwsFetchFailure", "true")
+        .config("spark.celeborn.client.spark.stageRerun.enabled", "true")
         .config(
           "spark.shuffle.manager",
           "org.apache.spark.shuffle.celeborn.TestCelebornShuffleManager")
@@ -248,7 +248,7 @@ class CelebornFetchFailureSuite extends AnyFunSuite
         .config(updateSparkConf(sparkConf, ShuffleMode.HASH))
         .config("spark.sql.shuffle.partitions", 2)
         .config("spark.celeborn.shuffle.forceFallback.partition.enabled", 
false)
-        .config("spark.celeborn.client.spark.fetch.throwsFetchFailure", "true")
+        .config("spark.celeborn.client.spark.stageRerun.enabled", "true")
         .config(
           "spark.shuffle.manager",
           "org.apache.spark.shuffle.celeborn.TestCelebornShuffleManager")
@@ -279,7 +279,7 @@ class CelebornFetchFailureSuite extends AnyFunSuite
         .config(updateSparkConf(sparkConf, ShuffleMode.HASH))
         .config("spark.sql.shuffle.partitions", 2)
         .config("spark.celeborn.shuffle.forceFallback.partition.enabled", 
false)
-        .config("spark.celeborn.client.spark.fetch.throwsFetchFailure", "true")
+        .config("spark.celeborn.client.spark.stageRerun.enabled", "true")
         .getOrCreate()
 
       sparkSession.sql("create table if not exists t_1 (a bigint) using 
parquet")
@@ -300,7 +300,7 @@ class CelebornFetchFailureSuite extends AnyFunSuite
       .config(updateSparkConf(sparkConf, ShuffleMode.HASH))
       .config("spark.sql.shuffle.partitions", 2)
       .config("spark.celeborn.shuffle.forceFallback.partition.enabled", false)
-      .config("spark.celeborn.client.spark.fetch.throwsFetchFailure", "true")
+      .config("spark.celeborn.client.spark.stageRerun.enabled", "true")
       .config("spark.celeborn.client.push.buffer.max.size", 0)
       .config(
         "spark.shuffle.manager",
@@ -344,7 +344,7 @@ class CelebornFetchFailureSuite extends AnyFunSuite
       .config(updateSparkConf(sparkConf, ShuffleMode.HASH))
       .config("spark.sql.shuffle.partitions", 2)
       .config("spark.celeborn.shuffle.forceFallback.partition.enabled", false)
-      .config("spark.celeborn.client.spark.fetch.throwsFetchFailure", "true")
+      .config("spark.celeborn.client.spark.stageRerun.enabled", "true")
       .config("spark.celeborn.client.push.buffer.max.size", 0)
       .config(
         "spark.shuffle.manager",
@@ -390,7 +390,7 @@ class CelebornFetchFailureSuite extends AnyFunSuite
       .config(updateSparkConf(sparkConf, ShuffleMode.HASH))
       .config("spark.sql.shuffle.partitions", 2)
       .config("spark.celeborn.shuffle.forceFallback.partition.enabled", false)
-      .config("spark.celeborn.client.spark.fetch.throwsFetchFailure", "true")
+      .config("spark.celeborn.client.spark.stageRerun.enabled", "true")
       .config("spark.celeborn.client.push.buffer.max.size", 0)
       .config("spark.stage.maxConsecutiveAttempts", "1")
       .config("spark.stage.maxAttempts", "1")
diff --git 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornShuffleLostSuite.scala
 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornShuffleLostSuite.scala
index 8c0e8b101..a281196c7 100644
--- 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornShuffleLostSuite.scala
+++ 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornShuffleLostSuite.scala
@@ -49,7 +49,7 @@ class CelebornShuffleLostSuite extends AnyFunSuite
     sparkSession.stop()
 
     val conf = updateSparkConf(sparkConf, ShuffleMode.HASH)
-    conf.set("spark.celeborn.client.spark.fetch.throwsFetchFailure", "true")
+    conf.set("spark.celeborn.client.spark.stageRerun.enabled", "true")
     conf.set("spark.celeborn.test.client.mockShuffleLost", "true")
 
     val celebornSparkSession = SparkSession.builder()

Reply via email to