This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 9ef37471 [#477] feat(spark): Fix `rss.resubmit.stage` does not support
dynamic client conf. (#1050)
9ef37471 is described below
commit 9ef374716269399d4d25acee33a2881ddeef3539
Author: Xianming Lei <[email protected]>
AuthorDate: Mon Jul 31 11:37:21 2023 +0800
[#477] feat(spark): Fix `rss.resubmit.stage` does not support dynamic
client conf. (#1050)
### What changes were proposed in this pull request?
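Fix `rss.resubmit.stage` not supporting dynamic client conf. The option is now declared as a Spark config entry (`RssSparkConfig.RSS_RESUBMIT_STAGE`, default `false`). Both the Spark 2 and Spark 3 `RssShuffleManager` read it from `sparkConf` instead of `rssConf`, so the coordinator's dynamic client configuration can enable it.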
### Why are the changes needed?
For #477.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
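Unit tests and an integration test. `RSSStageResubmitTest` now enables the resubmit-stage option through the coordinator's dynamic conf instead of setting it directly on `SparkConf`.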
Co-authored-by: leixianming <[email protected]>
---
.../src/main/java/org/apache/spark/shuffle/RssSparkConfig.java | 7 +++++++
.../src/main/java/org/apache/spark/shuffle/RssShuffleManager.java | 3 +--
.../src/main/java/org/apache/spark/shuffle/RssShuffleManager.java | 3 +--
.../org/apache/uniffle/coordinator/SimpleClusterManagerTest.java | 2 ++
.../test/java/org/apache/uniffle/test/RSSStageResubmitTest.java | 6 +++---
5 files changed, 14 insertions(+), 7 deletions(-)
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
index 283122f0..f0f25816 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
@@ -366,6 +366,13 @@ public class RssSparkConfig {
.doc(RssClientConf.SHUFFLE_MANAGER_GRPC_PORT.description()))
.createWithDefault(-1);
+ public static final ConfigEntry<Boolean> RSS_RESUBMIT_STAGE =
+ createBooleanBuilder(
+ new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX +
RssClientConfig.RSS_RESUBMIT_STAGE)
+ .internal()
+ .doc("Whether to enable the resubmit stage."))
+ .createWithDefault(false);
+
// spark2 doesn't have this key defined
public static final String SPARK_SHUFFLE_COMPRESS_KEY =
"spark.shuffle.compress";
diff --git
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index a2c7edbb..d20fca58 100644
---
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -54,7 +54,6 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.client.api.ShuffleWriteClient;
import org.apache.uniffle.client.factory.ShuffleClientFactory;
import org.apache.uniffle.client.util.ClientUtils;
-import org.apache.uniffle.client.util.RssClientConfig;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleAssignmentsInfo;
@@ -183,7 +182,7 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
if (isDriver) {
heartBeatScheduledExecutorService =
ThreadUtils.getDaemonSingleThreadScheduledExecutor("rss-heartbeat");
- if (rssConf.getBoolean(RssClientConfig.RSS_RESUBMIT_STAGE, false)
+ if (sparkConf.get(RssSparkConfig.RSS_RESUBMIT_STAGE)
&& RssSparkShuffleUtils.isStageResubmitSupported()) {
LOG.info("stage resubmit is supported and enabled");
// start shuffle manager server
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index e1d535d3..d6919697 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -61,7 +61,6 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.client.api.ShuffleWriteClient;
import org.apache.uniffle.client.factory.ShuffleClientFactory;
import org.apache.uniffle.client.util.ClientUtils;
-import org.apache.uniffle.client.util.RssClientConfig;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleAssignmentsInfo;
@@ -207,7 +206,7 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
if (isDriver) {
heartBeatScheduledExecutorService =
ThreadUtils.getDaemonSingleThreadScheduledExecutor("rss-heartbeat");
- if (rssConf.getBoolean(RssClientConfig.RSS_RESUBMIT_STAGE, false)
+ if (sparkConf.get(RssSparkConfig.RSS_RESUBMIT_STAGE)
&& RssSparkShuffleUtils.isStageResubmitSupported()) {
LOG.info("stage resubmit is supported and enabled");
// start shuffle manager server
diff --git
a/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
b/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
index 1391308f..4464d749 100644
---
a/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
+++
b/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
@@ -512,6 +512,8 @@ public class SimpleClusterManagerTest {
assertEquals(4, scm.getNodesNum());
assertEquals(2, scm.getExcludeNodes().size());
}
+ File blacklistFile = new File(excludeNodesPath);
+ assertTrue(blacklistFile.delete());
}
private void writeExcludeHosts(String path, Set<String> values) throws
Exception {
diff --git
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RSSStageResubmitTest.java
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RSSStageResubmitTest.java
index 282243da..419efb44 100644
---
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RSSStageResubmitTest.java
+++
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RSSStageResubmitTest.java
@@ -41,10 +41,12 @@ public class RSSStageResubmitTest extends
SparkIntegrationTestBase {
@BeforeAll
public static void setupServers() throws Exception {
- CoordinatorConf coordinatorConf = getCoordinatorConf();
+ final CoordinatorConf coordinatorConf = getCoordinatorConf();
Map<String, String> dynamicConf = Maps.newHashMap();
dynamicConf.put(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key(),
HDFS_URI + "rss/test");
dynamicConf.put(RssSparkConfig.RSS_STORAGE_TYPE.key(),
StorageType.MEMORY_LOCALFILE.name());
+ dynamicConf.put(
+ RssSparkConfig.SPARK_RSS_CONFIG_PREFIX +
RssClientConfig.RSS_RESUBMIT_STAGE, "true");
addDynamicConf(coordinatorConf, dynamicConf);
createCoordinatorServer(coordinatorConf);
ShuffleServerConf shuffleServerConf = getShuffleServerConf();
@@ -79,8 +81,6 @@ public class RSSStageResubmitTest extends
SparkIntegrationTestBase {
@Override
public void updateSparkConfCustomer(SparkConf sparkConf) {
- sparkConf.set(
- RssSparkConfig.SPARK_RSS_CONFIG_PREFIX +
RssClientConfig.RSS_RESUBMIT_STAGE, "true");
sparkConf.set("spark.task.maxFailures", String.valueOf(maxTaskFailures));
}