This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 9ef37471 [#477] feat(spark): Fix `rss.resubmit.stage` does not support 
dynamic client conf. (#1050)
9ef37471 is described below

commit 9ef374716269399d4d25acee33a2881ddeef3539
Author: Xianming Lei <[email protected]>
AuthorDate: Mon Jul 31 11:37:21 2023 +0800

    [#477] feat(spark): Fix `rss.resubmit.stage` does not support dynamic 
client conf. (#1050)
    
    ### What changes were proposed in this pull request?
    
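    Fix `rss.resubmit.stage` not being honored when it is set through the
    dynamic client conf. The Spark 2 and Spark 3 shuffle managers now read the
    flag from `sparkConf` via the new `RssSparkConfig.RSS_RESUBMIT_STAGE`
    entry instead of reading it from `rssConf`.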
    
    ### Why are the changes needed?
    
    For #477.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
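    UT: `RSSStageResubmitTest` now enables `rss.resubmit.stage` through the
    coordinator's dynamic conf instead of setting it on the `SparkConf`.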
    
    Co-authored-by: leixianming <[email protected]>
---
 .../src/main/java/org/apache/spark/shuffle/RssSparkConfig.java     | 7 +++++++
 .../src/main/java/org/apache/spark/shuffle/RssShuffleManager.java  | 3 +--
 .../src/main/java/org/apache/spark/shuffle/RssShuffleManager.java  | 3 +--
 .../org/apache/uniffle/coordinator/SimpleClusterManagerTest.java   | 2 ++
 .../test/java/org/apache/uniffle/test/RSSStageResubmitTest.java    | 6 +++---
 5 files changed, 14 insertions(+), 7 deletions(-)

diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
index 283122f0..f0f25816 100644
--- 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
@@ -366,6 +366,13 @@ public class RssSparkConfig {
                   .doc(RssClientConf.SHUFFLE_MANAGER_GRPC_PORT.description()))
           .createWithDefault(-1);
 
+  public static final ConfigEntry<Boolean> RSS_RESUBMIT_STAGE =
+      createBooleanBuilder(
+              new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX + 
RssClientConfig.RSS_RESUBMIT_STAGE)
+                  .internal()
+                  .doc("Whether to enable the resubmit stage."))
+          .createWithDefault(false);
+
   // spark2 doesn't have this key defined
   public static final String SPARK_SHUFFLE_COMPRESS_KEY = 
"spark.shuffle.compress";
 
diff --git 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index a2c7edbb..d20fca58 100644
--- 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -54,7 +54,6 @@ import org.slf4j.LoggerFactory;
 import org.apache.uniffle.client.api.ShuffleWriteClient;
 import org.apache.uniffle.client.factory.ShuffleClientFactory;
 import org.apache.uniffle.client.util.ClientUtils;
-import org.apache.uniffle.client.util.RssClientConfig;
 import org.apache.uniffle.common.PartitionRange;
 import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.common.ShuffleAssignmentsInfo;
@@ -183,7 +182,7 @@ public class RssShuffleManager extends 
RssShuffleManagerBase {
       if (isDriver) {
         heartBeatScheduledExecutorService =
             
ThreadUtils.getDaemonSingleThreadScheduledExecutor("rss-heartbeat");
-        if (rssConf.getBoolean(RssClientConfig.RSS_RESUBMIT_STAGE, false)
+        if (sparkConf.get(RssSparkConfig.RSS_RESUBMIT_STAGE)
             && RssSparkShuffleUtils.isStageResubmitSupported()) {
           LOG.info("stage resubmit is supported and enabled");
           // start shuffle manager server
diff --git 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index e1d535d3..d6919697 100644
--- 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -61,7 +61,6 @@ import org.slf4j.LoggerFactory;
 import org.apache.uniffle.client.api.ShuffleWriteClient;
 import org.apache.uniffle.client.factory.ShuffleClientFactory;
 import org.apache.uniffle.client.util.ClientUtils;
-import org.apache.uniffle.client.util.RssClientConfig;
 import org.apache.uniffle.common.PartitionRange;
 import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.common.ShuffleAssignmentsInfo;
@@ -207,7 +206,7 @@ public class RssShuffleManager extends 
RssShuffleManagerBase {
     if (isDriver) {
       heartBeatScheduledExecutorService =
           ThreadUtils.getDaemonSingleThreadScheduledExecutor("rss-heartbeat");
-      if (rssConf.getBoolean(RssClientConfig.RSS_RESUBMIT_STAGE, false)
+      if (sparkConf.get(RssSparkConfig.RSS_RESUBMIT_STAGE)
           && RssSparkShuffleUtils.isStageResubmitSupported()) {
         LOG.info("stage resubmit is supported and enabled");
         // start shuffle manager server
diff --git 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
index 1391308f..4464d749 100644
--- 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
+++ 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
@@ -512,6 +512,8 @@ public class SimpleClusterManagerTest {
       assertEquals(4, scm.getNodesNum());
       assertEquals(2, scm.getExcludeNodes().size());
     }
+    File blacklistFile = new File(excludeNodesPath);
+    assertTrue(blacklistFile.delete());
   }
 
   private void writeExcludeHosts(String path, Set<String> values) throws 
Exception {
diff --git 
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RSSStageResubmitTest.java
 
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RSSStageResubmitTest.java
index 282243da..419efb44 100644
--- 
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RSSStageResubmitTest.java
+++ 
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RSSStageResubmitTest.java
@@ -41,10 +41,12 @@ public class RSSStageResubmitTest extends 
SparkIntegrationTestBase {
 
   @BeforeAll
   public static void setupServers() throws Exception {
-    CoordinatorConf coordinatorConf = getCoordinatorConf();
+    final CoordinatorConf coordinatorConf = getCoordinatorConf();
     Map<String, String> dynamicConf = Maps.newHashMap();
     dynamicConf.put(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key(), 
HDFS_URI + "rss/test");
     dynamicConf.put(RssSparkConfig.RSS_STORAGE_TYPE.key(), 
StorageType.MEMORY_LOCALFILE.name());
+    dynamicConf.put(
+        RssSparkConfig.SPARK_RSS_CONFIG_PREFIX + 
RssClientConfig.RSS_RESUBMIT_STAGE, "true");
     addDynamicConf(coordinatorConf, dynamicConf);
     createCoordinatorServer(coordinatorConf);
     ShuffleServerConf shuffleServerConf = getShuffleServerConf();
@@ -79,8 +81,6 @@ public class RSSStageResubmitTest extends 
SparkIntegrationTestBase {
 
   @Override
   public void updateSparkConfCustomer(SparkConf sparkConf) {
-    sparkConf.set(
-        RssSparkConfig.SPARK_RSS_CONFIG_PREFIX + 
RssClientConfig.RSS_RESUBMIT_STAGE, "true");
     sparkConf.set("spark.task.maxFailures", String.valueOf(maxTaskFailures));
   }
 

Reply via email to