This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new de4b26112 [#1608][part-7] improvement(doc): add doc and optimize
reassign config options (#1693)
de4b26112 is described below
commit de4b261122152c4d8a00d2f92da90086d50c1a39
Author: Junfan Zhang <[email protected]>
AuthorDate: Wed May 15 17:20:45 2024 +0800
[#1608][part-7] improvement(doc): add doc and optimize reassign config
options (#1693)
### What changes were proposed in this pull request?
1. add docs about reassign mechanism
2. rename the config from "rss.client.blockSendFailureRetry.enabled" to
"rss.client.reassign.enabled"
### Why are the changes needed?
Fix: #1608
### Does this PR introduce _any_ user-facing change?
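Yes. The client config key "rss.client.blockSendFailureRetry.enabled" is renamed to "rss.client.reassign.enabled".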
### How was this patch tested?
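No new tests are needed; the existing partition reassign integration tests were updated to use the renamed option.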
---
.../org/apache/spark/shuffle/RssShuffleManager.java | 3 +--
.../org/apache/spark/shuffle/RssShuffleManager.java | 2 +-
.../apache/spark/shuffle/writer/RssShuffleWriter.java | 4 ++--
.../apache/uniffle/common/config/RssClientConf.java | 4 ++--
docs/client_guide/spark_client_guide.md | 18 +++++++++++++++++-
.../test/PartitionBlockDataReassignBasicTest.java | 4 ++--
.../test/PartitionBlockDataReassignMultiTimesTest.java | 4 ++--
7 files changed, 27 insertions(+), 12 deletions(-)
diff --git
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index ba2d275a1..6f5b255be 100644
---
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -211,8 +211,7 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
this.rssResubmitStage =
rssConf.getBoolean(RssClientConfig.RSS_RESUBMIT_STAGE, false)
&& RssSparkShuffleUtils.isStageResubmitSupported();
- this.taskBlockSendFailureRetry =
-
rssConf.getBoolean(RssClientConf.RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED);
+ this.taskBlockSendFailureRetry =
rssConf.getBoolean(RssClientConf.RSS_CLIENT_REASSIGN_ENABLED);
this.blockIdSelfManagedEnabled =
rssConf.getBoolean(RSS_BLOCK_ID_SELF_MANAGEMENT_ENABLED);
this.shuffleManagerRpcServiceEnabled =
taskBlockSendFailureRetry || rssResubmitStage ||
blockIdSelfManagedEnabled;
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 983a2a069..aeb29d16d 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -229,7 +229,7 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
rssConf.getBoolean(RssClientConfig.RSS_RESUBMIT_STAGE, false)
&& RssSparkShuffleUtils.isStageResubmitSupported();
this.taskBlockSendFailureRetryEnabled =
-
rssConf.getBoolean(RssClientConf.RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED);
+ rssConf.getBoolean(RssClientConf.RSS_CLIENT_REASSIGN_ENABLED);
// The feature of partition reassign is exclusive with multiple replicas
and stage retry.
if (taskBlockSendFailureRetryEnabled) {
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 85d325d63..5a8d83a6c 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -208,8 +208,8 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
this.blockFailSentRetryEnabled =
sparkConf.getBoolean(
RssSparkConfig.SPARK_RSS_CONFIG_PREFIX
- +
RssClientConf.RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED.key(),
-
RssClientConf.RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED.defaultValue());
+ + RssClientConf.RSS_CLIENT_REASSIGN_ENABLED.key(),
+ RssClientConf.RSS_CLIENT_REASSIGN_ENABLED.defaultValue());
this.blockFailSentRetryMaxTimes =
RssSparkConfig.toRssConf(sparkConf).get(RSS_PARTITION_REASSIGN_BLOCK_RETRY_MAX_TIMES);
}
diff --git
a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
index 4ecd103c9..b45abc871 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
@@ -204,8 +204,8 @@ public class RssClientConf {
"This option is only valid when the remote storage path is
specified. If ture, "
+ "the remote storage conf will use the client side hadoop
configuration loaded from the classpath.");
- public static final ConfigOption<Boolean>
RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED =
- ConfigOptions.key("rss.client.blockSendFailureRetry.enabled")
+ public static final ConfigOption<Boolean> RSS_CLIENT_REASSIGN_ENABLED =
+ ConfigOptions.key("rss.client.reassign.enabled")
.booleanType()
.defaultValue(false)
.withDescription(
diff --git a/docs/client_guide/spark_client_guide.md
b/docs/client_guide/spark_client_guide.md
index 80783f78f..dc6f9a9d9 100644
--- a/docs/client_guide/spark_client_guide.md
+++ b/docs/client_guide/spark_client_guide.md
@@ -149,4 +149,20 @@ Other configuration:
|---|---|---|
|spark.rss.access.timeout.ms|10000|The timeout to access Uniffle coordinator|
|spark.rss.client.access.retry.interval.ms|20000|The interval between retries
fallback to SortShuffleManager|
-|spark.rss.client.access.retry.times|0|The number of retries fallback to
SortShuffleManager|
\ No newline at end of file
+|spark.rss.client.access.retry.times|0|The number of retries fallback to
SortShuffleManager|
+
+### Partition reassign in one shuffle attempt
+
+To achieve better task stability, the partition reassignment mechanism has
been introduced, which requests new replacement shuffle servers to overcome
server instability caused by unhealthy conditions or high memory pressure in a
single shuffle attempt.
+Currently, this feature is not compatible with stage retry and multiple
replica mechanisms (additional testing is required).
+
+Use the following configs to enable this feature:
+
+```bash
+# whether to enable reassign mechanism
+spark.rss.client.reassign.enabled true
+# The max reassign server num for one partition when using partition reassign
mechanism.
+spark.rss.client.reassign.maxReassignServerNum 10
+# The block retry max times when partition reassign is enabled.
+spark.rss.client.reassign.blockRetryMaxTimes 1
+```
\ No newline at end of file
diff --git
a/integration-test/spark3/src/test/java/org/apache/uniffle/test/PartitionBlockDataReassignBasicTest.java
b/integration-test/spark3/src/test/java/org/apache/uniffle/test/PartitionBlockDataReassignBasicTest.java
index 0a1bd1579..4dd2bab8e 100644
---
a/integration-test/spark3/src/test/java/org/apache/uniffle/test/PartitionBlockDataReassignBasicTest.java
+++
b/integration-test/spark3/src/test/java/org/apache/uniffle/test/PartitionBlockDataReassignBasicTest.java
@@ -37,7 +37,7 @@ import org.apache.uniffle.storage.util.StorageType;
import static
org.apache.uniffle.client.util.RssClientConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER;
import static
org.apache.uniffle.client.util.RssClientConfig.RSS_CLIENT_RETRY_MAX;
-import static
org.apache.uniffle.common.config.RssClientConf.RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED;
+import static
org.apache.uniffle.common.config.RssClientConf.RSS_CLIENT_REASSIGN_ENABLED;
/** This class is to basic test the mechanism of partition block data
reassignment */
public class PartitionBlockDataReassignBasicTest extends SparkSQLTest {
@@ -105,7 +105,7 @@ public class PartitionBlockDataReassignBasicTest extends
SparkSQLTest {
sparkConf.set(
"spark." + RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER,
String.valueOf(grpcShuffleServers.size()));
- sparkConf.set("spark." +
RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED.key(), "true");
+ sparkConf.set("spark." + RSS_CLIENT_REASSIGN_ENABLED.key(), "true");
}
@Override
diff --git
a/integration-test/spark3/src/test/java/org/apache/uniffle/test/PartitionBlockDataReassignMultiTimesTest.java
b/integration-test/spark3/src/test/java/org/apache/uniffle/test/PartitionBlockDataReassignMultiTimesTest.java
index 5ee3d7b62..a01b695e3 100644
---
a/integration-test/spark3/src/test/java/org/apache/uniffle/test/PartitionBlockDataReassignMultiTimesTest.java
+++
b/integration-test/spark3/src/test/java/org/apache/uniffle/test/PartitionBlockDataReassignMultiTimesTest.java
@@ -38,7 +38,7 @@ import org.apache.uniffle.storage.util.StorageType;
import static
org.apache.spark.shuffle.RssSparkConfig.RSS_PARTITION_REASSIGN_BLOCK_RETRY_MAX_TIMES;
import static
org.apache.uniffle.client.util.RssClientConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER;
import static
org.apache.uniffle.client.util.RssClientConfig.RSS_CLIENT_RETRY_MAX;
-import static
org.apache.uniffle.common.config.RssClientConf.RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED;
+import static
org.apache.uniffle.common.config.RssClientConf.RSS_CLIENT_REASSIGN_ENABLED;
import static
org.apache.uniffle.coordinator.CoordinatorConf.COORDINATOR_ASSIGNMENT_STRATEGY;
/** This class is to test the partition reassign mechanism of multiple
retries. */
@@ -86,7 +86,7 @@ public class PartitionBlockDataReassignMultiTimesTest extends
PartitionBlockData
sparkConf.set("spark.sql.shuffle.partitions", "4");
sparkConf.set("spark." + RSS_CLIENT_RETRY_MAX, "2");
sparkConf.set("spark." + RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER, "1");
- sparkConf.set("spark." +
RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED.key(), "true");
+ sparkConf.set("spark." + RSS_CLIENT_REASSIGN_ENABLED.key(), "true");
sparkConf.set("spark." +
RSS_PARTITION_REASSIGN_BLOCK_RETRY_MAX_TIMES.key(), "10");
// simulate the grpc servers has different free memory