This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new de4b26112 [#1608][part-7] improvement(doc): add doc and optimize 
reassign config options (#1693)
de4b26112 is described below

commit de4b261122152c4d8a00d2f92da90086d50c1a39
Author: Junfan Zhang <[email protected]>
AuthorDate: Wed May 15 17:20:45 2024 +0800

    [#1608][part-7] improvement(doc): add doc and optimize reassign config 
options (#1693)
    
    ### What changes were proposed in this pull request?
    
    1. add docs about reassign mechanism
    2. rename the config from "rss.client.blockSendFailureRetry.enabled" to 
"rss.client.reassign.enabled"
    
    ### Why are the changes needed?
    
    Fix: #1608
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Not needed (documentation and config rename only).
---
 .../org/apache/spark/shuffle/RssShuffleManager.java    |  3 +--
 .../org/apache/spark/shuffle/RssShuffleManager.java    |  2 +-
 .../apache/spark/shuffle/writer/RssShuffleWriter.java  |  4 ++--
 .../apache/uniffle/common/config/RssClientConf.java    |  4 ++--
 docs/client_guide/spark_client_guide.md                | 18 +++++++++++++++++-
 .../test/PartitionBlockDataReassignBasicTest.java      |  4 ++--
 .../test/PartitionBlockDataReassignMultiTimesTest.java |  4 ++--
 7 files changed, 27 insertions(+), 12 deletions(-)

diff --git 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index ba2d275a1..6f5b255be 100644
--- 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -211,8 +211,7 @@ public class RssShuffleManager extends 
RssShuffleManagerBase {
     this.rssResubmitStage =
         rssConf.getBoolean(RssClientConfig.RSS_RESUBMIT_STAGE, false)
             && RssSparkShuffleUtils.isStageResubmitSupported();
-    this.taskBlockSendFailureRetry =
-        
rssConf.getBoolean(RssClientConf.RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED);
+    this.taskBlockSendFailureRetry = 
rssConf.getBoolean(RssClientConf.RSS_CLIENT_REASSIGN_ENABLED);
     this.blockIdSelfManagedEnabled = 
rssConf.getBoolean(RSS_BLOCK_ID_SELF_MANAGEMENT_ENABLED);
     this.shuffleManagerRpcServiceEnabled =
         taskBlockSendFailureRetry || rssResubmitStage || 
blockIdSelfManagedEnabled;
diff --git 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 983a2a069..aeb29d16d 100644
--- 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -229,7 +229,7 @@ public class RssShuffleManager extends 
RssShuffleManagerBase {
         rssConf.getBoolean(RssClientConfig.RSS_RESUBMIT_STAGE, false)
             && RssSparkShuffleUtils.isStageResubmitSupported();
     this.taskBlockSendFailureRetryEnabled =
-        
rssConf.getBoolean(RssClientConf.RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED);
+        rssConf.getBoolean(RssClientConf.RSS_CLIENT_REASSIGN_ENABLED);
 
     // The feature of partition reassign is exclusive with multiple replicas 
and stage retry.
     if (taskBlockSendFailureRetryEnabled) {
diff --git 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 85d325d63..5a8d83a6c 100644
--- 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++ 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -208,8 +208,8 @@ public class RssShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
     this.blockFailSentRetryEnabled =
         sparkConf.getBoolean(
             RssSparkConfig.SPARK_RSS_CONFIG_PREFIX
-                + 
RssClientConf.RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED.key(),
-            
RssClientConf.RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED.defaultValue());
+                + RssClientConf.RSS_CLIENT_REASSIGN_ENABLED.key(),
+            RssClientConf.RSS_CLIENT_REASSIGN_ENABLED.defaultValue());
     this.blockFailSentRetryMaxTimes =
         
RssSparkConfig.toRssConf(sparkConf).get(RSS_PARTITION_REASSIGN_BLOCK_RETRY_MAX_TIMES);
   }
diff --git 
a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java 
b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
index 4ecd103c9..b45abc871 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
@@ -204,8 +204,8 @@ public class RssClientConf {
               "This option is only valid when the remote storage path is 
specified. If ture, "
                   + "the remote storage conf will use the client side hadoop 
configuration loaded from the classpath.");
 
-  public static final ConfigOption<Boolean> 
RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED =
-      ConfigOptions.key("rss.client.blockSendFailureRetry.enabled")
+  public static final ConfigOption<Boolean> RSS_CLIENT_REASSIGN_ENABLED =
+      ConfigOptions.key("rss.client.reassign.enabled")
           .booleanType()
           .defaultValue(false)
           .withDescription(
diff --git a/docs/client_guide/spark_client_guide.md 
b/docs/client_guide/spark_client_guide.md
index 80783f78f..dc6f9a9d9 100644
--- a/docs/client_guide/spark_client_guide.md
+++ b/docs/client_guide/spark_client_guide.md
@@ -149,4 +149,20 @@ Other configuration:
 |---|---|---|
 |spark.rss.access.timeout.ms|10000|The timeout to access Uniffle coordinator|
 |spark.rss.client.access.retry.interval.ms|20000|The interval between retries 
fallback to SortShuffleManager|
-|spark.rss.client.access.retry.times|0|The number of retries fallback to 
SortShuffleManager|
\ No newline at end of file
+|spark.rss.client.access.retry.times|0|The number of retries fallback to 
SortShuffleManager|
+
+### Partition reassign in one shuffle attempt
+
+To achieve better task stability, the partition reassignment mechanism has 
been introduced, which requests new replacement shuffle servers to overcome 
server instability caused by unhealthy conditions or high memory pressure in a 
single shuffle attempt. 
+Currently, this feature is not compatible with stage retry and multiple 
replica mechanisms (additional testing is required).
+
+Use the following configs to enable this feature:
+
+```bash
+# Whether to enable the partition reassign mechanism.
+spark.rss.client.reassign.enabled                  true
+# The max reassign server num for one partition when using partition reassign 
mechanism.
+spark.rss.client.reassign.maxReassignServerNum     10
+# The block retry max times when partition reassign is enabled. 
+spark.rss.client.reassign.blockRetryMaxTimes       1
+```
\ No newline at end of file
diff --git 
a/integration-test/spark3/src/test/java/org/apache/uniffle/test/PartitionBlockDataReassignBasicTest.java
 
b/integration-test/spark3/src/test/java/org/apache/uniffle/test/PartitionBlockDataReassignBasicTest.java
index 0a1bd1579..4dd2bab8e 100644
--- 
a/integration-test/spark3/src/test/java/org/apache/uniffle/test/PartitionBlockDataReassignBasicTest.java
+++ 
b/integration-test/spark3/src/test/java/org/apache/uniffle/test/PartitionBlockDataReassignBasicTest.java
@@ -37,7 +37,7 @@ import org.apache.uniffle.storage.util.StorageType;
 
 import static 
org.apache.uniffle.client.util.RssClientConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER;
 import static 
org.apache.uniffle.client.util.RssClientConfig.RSS_CLIENT_RETRY_MAX;
-import static 
org.apache.uniffle.common.config.RssClientConf.RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED;
+import static 
org.apache.uniffle.common.config.RssClientConf.RSS_CLIENT_REASSIGN_ENABLED;
 
 /** This class is to basic test the mechanism of partition block data 
reassignment */
 public class PartitionBlockDataReassignBasicTest extends SparkSQLTest {
@@ -105,7 +105,7 @@ public class PartitionBlockDataReassignBasicTest extends 
SparkSQLTest {
     sparkConf.set(
         "spark." + RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER,
         String.valueOf(grpcShuffleServers.size()));
-    sparkConf.set("spark." + 
RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED.key(), "true");
+    sparkConf.set("spark." + RSS_CLIENT_REASSIGN_ENABLED.key(), "true");
   }
 
   @Override
diff --git 
a/integration-test/spark3/src/test/java/org/apache/uniffle/test/PartitionBlockDataReassignMultiTimesTest.java
 
b/integration-test/spark3/src/test/java/org/apache/uniffle/test/PartitionBlockDataReassignMultiTimesTest.java
index 5ee3d7b62..a01b695e3 100644
--- 
a/integration-test/spark3/src/test/java/org/apache/uniffle/test/PartitionBlockDataReassignMultiTimesTest.java
+++ 
b/integration-test/spark3/src/test/java/org/apache/uniffle/test/PartitionBlockDataReassignMultiTimesTest.java
@@ -38,7 +38,7 @@ import org.apache.uniffle.storage.util.StorageType;
 import static 
org.apache.spark.shuffle.RssSparkConfig.RSS_PARTITION_REASSIGN_BLOCK_RETRY_MAX_TIMES;
 import static 
org.apache.uniffle.client.util.RssClientConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER;
 import static 
org.apache.uniffle.client.util.RssClientConfig.RSS_CLIENT_RETRY_MAX;
-import static 
org.apache.uniffle.common.config.RssClientConf.RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED;
+import static 
org.apache.uniffle.common.config.RssClientConf.RSS_CLIENT_REASSIGN_ENABLED;
 import static 
org.apache.uniffle.coordinator.CoordinatorConf.COORDINATOR_ASSIGNMENT_STRATEGY;
 
 /** This class is to test the partition reassign mechanism of multiple 
retries. */
@@ -86,7 +86,7 @@ public class PartitionBlockDataReassignMultiTimesTest extends 
PartitionBlockData
     sparkConf.set("spark.sql.shuffle.partitions", "4");
     sparkConf.set("spark." + RSS_CLIENT_RETRY_MAX, "2");
     sparkConf.set("spark." + RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER, "1");
-    sparkConf.set("spark." + 
RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED.key(), "true");
+    sparkConf.set("spark." + RSS_CLIENT_REASSIGN_ENABLED.key(), "true");
     sparkConf.set("spark." + 
RSS_PARTITION_REASSIGN_BLOCK_RETRY_MAX_TIMES.key(), "10");
 
     // simulate the grpc servers has different free memory

Reply via email to