This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new c8794168f [CELEBORN-1715] RemoteShuffleMaster should check 
celeborn.client.push.replicate.enabled in constructor
c8794168f is described below

commit c8794168f4237d860e28006997d30bc2b2242ae0
Author: SteNicholas <[email protected]>
AuthorDate: Wed Nov 20 11:29:00 2024 +0800

    [CELEBORN-1715] RemoteShuffleMaster should check 
celeborn.client.push.replicate.enabled in constructor
    
    ### What changes were proposed in this pull request?
    
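    `RemoteShuffleMaster` checks `celeborn.client.push.replicate.enabled` in the
    constructor instead of in `registerJob`. The Flink-specific default for
    `CelebornConf.CLIENT_CHECKED_USE_ALLOCATED_WORKERS` is also moved into
    `FlinkUtils.toCelebornConf`, so every caller of that helper gets it.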
    
    ### Why are the changes needed?
    
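    `RemoteShuffleMaster` validates that `celeborn.client.push.replicate.enabled`
    is disabled only when `registerJob` is called. This validation can instead be
    performed once in the constructor, so an unsupported configuration is rejected
    before any job is registered.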
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    `RemoteShuffleMasterSuiteJ#testInvalidShuffleConfig`
    
    Closes #2911 from SteNicholas/CELEBORN-1715.
    
    Authored-by: SteNicholas <[email protected]>
    Signed-off-by: SteNicholas <[email protected]>
---
 .../celeborn/plugin/flink/RemoteShuffleMaster.java    | 19 ++++++-------------
 .../celeborn/plugin/flink/utils/FlinkUtils.java       | 14 +++++++++++++-
 .../plugin/flink/RemoteShuffleMasterSuiteJ.java       |  8 ++++++++
 .../plugin/flink/RemoteShuffleMasterSuiteJ.java       |  8 ++++++++
 .../plugin/flink/RemoteShuffleMasterSuiteJ.java       |  8 ++++++++
 .../plugin/flink/RemoteShuffleMasterSuiteJ.java       |  8 ++++++++
 .../plugin/flink/RemoteShuffleMasterSuiteJ.java       |  8 ++++++++
 .../plugin/flink/RemoteShuffleMasterSuiteJ.java       |  8 ++++++++
 .../plugin/flink/tiered/CelebornTierMasterAgent.java  |  5 -----
 .../plugin/flink/RemoteShuffleMasterSuiteJ.java       |  8 ++++++++
 10 files changed, 75 insertions(+), 19 deletions(-)

diff --git 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleMaster.java
 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleMaster.java
index 219988542..45c5f91dc 100644
--- 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleMaster.java
+++ 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleMaster.java
@@ -49,6 +49,7 @@ import org.apache.celeborn.plugin.flink.utils.FlinkUtils;
 
 public class RemoteShuffleMaster implements 
ShuffleMaster<RemoteShuffleDescriptor> {
   private static final Logger LOG = 
LoggerFactory.getLogger(RemoteShuffleMaster.class);
+  private final CelebornConf conf;
   private final ShuffleMasterContext shuffleMasterContext;
   // Flink JobId -> Celeborn register shuffleIds
   private final Map<JobID, Set<Integer>> jobShuffleIds = 
JavaUtils.newConcurrentHashMap();
@@ -64,7 +65,9 @@ public class RemoteShuffleMaster implements 
ShuffleMaster<RemoteShuffleDescripto
 
   public RemoteShuffleMaster(
       ShuffleMasterContext shuffleMasterContext, ResultPartitionAdapter 
resultPartitionDelegation) {
-    checkShuffleConfig(shuffleMasterContext.getConfiguration());
+    Configuration configuration = shuffleMasterContext.getConfiguration();
+    checkShuffleConfig(configuration);
+    this.conf = FlinkUtils.toCelebornConf(configuration);
     this.shuffleMasterContext = shuffleMasterContext;
     this.resultPartitionDelegation = resultPartitionDelegation;
     this.lifecycleManagerTimestamp = System.currentTimeMillis();
@@ -78,16 +81,7 @@ public class RemoteShuffleMaster implements 
ShuffleMaster<RemoteShuffleDescripto
         if (lifecycleManager == null) {
           celebornAppId = 
FlinkUtils.toCelebornAppId(lifecycleManagerTimestamp, jobID);
           LOG.info("CelebornAppId: {}", celebornAppId);
-          CelebornConf celebornConf =
-              
FlinkUtils.toCelebornConf(shuffleMasterContext.getConfiguration());
-          // if not set, set to true as default for flink
-          
celebornConf.setIfMissing(CelebornConf.CLIENT_CHECKED_USE_ALLOCATED_WORKERS(), 
true);
-          lifecycleManager = new LifecycleManager(celebornAppId, celebornConf);
-          if (celebornConf.clientPushReplicateEnabled()) {
-            shuffleMasterContext.onFatalError(
-                new RuntimeException("Currently replicate shuffle data is 
unsupported for flink."));
-            return;
-          }
+          lifecycleManager = new LifecycleManager(celebornAppId, conf);
           this.shuffleResourceTracker = new ShuffleResourceTracker(executor, 
lifecycleManager);
         }
       }
@@ -95,10 +89,10 @@ public class RemoteShuffleMaster implements 
ShuffleMaster<RemoteShuffleDescripto
 
     Set<Integer> previousShuffleIds = jobShuffleIds.putIfAbsent(jobID, new 
HashSet<>());
     LOG.info("Register job: {}.", jobID);
-    shuffleResourceTracker.registerJob(context);
     if (previousShuffleIds != null) {
       throw new RuntimeException("Duplicated registration job: " + jobID);
     }
+    shuffleResourceTracker.registerJob(context);
   }
 
   @Override
@@ -212,7 +206,6 @@ public class RemoteShuffleMaster implements 
ShuffleMaster<RemoteShuffleDescripto
     }
 
     int numResultPartitions = 
taskInputsOutputsDescriptor.getSubpartitionNums().size();
-    CelebornConf conf = 
FlinkUtils.toCelebornConf(shuffleMasterContext.getConfiguration());
     long numBytesPerPartition = conf.clientFlinkMemoryPerResultPartition();
     long numBytesForOutput = numBytesPerPartition * numResultPartitions;
 
diff --git 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/utils/FlinkUtils.java
 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/utils/FlinkUtils.java
index 90f75946f..7c7d1cf81 100644
--- 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/utils/FlinkUtils.java
+++ 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/utils/FlinkUtils.java
@@ -43,6 +43,15 @@ public class FlinkUtils {
           "remote-shuffle.job.compression.codec");
 
   public static CelebornConf toCelebornConf(Configuration configuration) {
+    if (Boolean.parseBoolean(
+        configuration.getString(
+            CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED().key(),
+            
CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED().defaultValueString()))) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Flink does not support replicate shuffle data. Please check the 
config %s.",
+              CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED().key()));
+    }
     CelebornConf tmpCelebornConf = new CelebornConf();
     Map<String, String> confMap = configuration.toMap();
     for (Map.Entry<String, String> entry : confMap.entrySet()) {
@@ -52,7 +61,10 @@ public class FlinkUtils {
       }
     }
 
-    return tmpCelebornConf;
+    // The default value of this config option is false. If set to true, 
Celeborn will use local
+    // allocated workers as candidate being checked workers, this is more 
useful for map partition
+    // to regenerate the lost data. So if not set, set to true as default for 
flink.
+    return 
tmpCelebornConf.setIfMissing(CelebornConf.CLIENT_CHECKED_USE_ALLOCATED_WORKERS(),
 true);
   }
 
   public static String toCelebornAppId(long lifecycleManagerTimestamp, JobID 
jobID) {
diff --git 
a/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java
 
b/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java
index 73c08d2d3..da63a185e 100644
--- 
a/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java
+++ 
b/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java
@@ -218,6 +218,14 @@ public class RemoteShuffleMasterSuiteJ {
                     .set(
                         ExecutionOptions.BATCH_SHUFFLE_MODE,
                         BatchShuffleMode.ALL_EXCHANGES_PIPELINED)));
+    Configuration configuration = new Configuration();
+    
configuration.setString(CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED().key(), 
"true");
+    Assert.assertThrows(
+        String.format(
+            "Flink does not support replicate shuffle data. Please check the 
config %s.",
+            CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED().key()),
+        IllegalArgumentException.class,
+        () -> createShuffleMaster(configuration));
   }
 
   @After
diff --git 
a/client-flink/flink-1.15/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java
 
b/client-flink/flink-1.15/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java
index 73c08d2d3..da63a185e 100644
--- 
a/client-flink/flink-1.15/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java
+++ 
b/client-flink/flink-1.15/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java
@@ -218,6 +218,14 @@ public class RemoteShuffleMasterSuiteJ {
                     .set(
                         ExecutionOptions.BATCH_SHUFFLE_MODE,
                         BatchShuffleMode.ALL_EXCHANGES_PIPELINED)));
+    Configuration configuration = new Configuration();
+    
configuration.setString(CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED().key(), 
"true");
+    Assert.assertThrows(
+        String.format(
+            "Flink does not support replicate shuffle data. Please check the 
config %s.",
+            CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED().key()),
+        IllegalArgumentException.class,
+        () -> createShuffleMaster(configuration));
   }
 
   @After
diff --git 
a/client-flink/flink-1.16/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java
 
b/client-flink/flink-1.16/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java
index 95eb47538..4305fceff 100644
--- 
a/client-flink/flink-1.16/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java
+++ 
b/client-flink/flink-1.16/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java
@@ -228,6 +228,14 @@ public class RemoteShuffleMasterSuiteJ {
                     .set(
                         ExecutionOptions.BATCH_SHUFFLE_MODE,
                         BatchShuffleMode.ALL_EXCHANGES_PIPELINED)));
+    Configuration configuration = new Configuration();
+    
configuration.setString(CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED().key(), 
"true");
+    Assert.assertThrows(
+        String.format(
+            "Flink does not support replicate shuffle data. Please check the 
config %s.",
+            CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED().key()),
+        IllegalArgumentException.class,
+        () -> createShuffleMaster(configuration));
   }
 
   @After
diff --git 
a/client-flink/flink-1.17/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java
 
b/client-flink/flink-1.17/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java
index ed6fc18e4..980c9b269 100644
--- 
a/client-flink/flink-1.17/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java
+++ 
b/client-flink/flink-1.17/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java
@@ -240,6 +240,14 @@ public class RemoteShuffleMasterSuiteJ {
                     .set(
                         ExecutionOptions.BATCH_SHUFFLE_MODE,
                         BatchShuffleMode.ALL_EXCHANGES_PIPELINED)));
+    Configuration configuration = new Configuration();
+    
configuration.setString(CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED().key(), 
"true");
+    Assert.assertThrows(
+        String.format(
+            "Flink does not support replicate shuffle data. Please check the 
config %s.",
+            CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED().key()),
+        IllegalArgumentException.class,
+        () -> createShuffleMaster(configuration));
   }
 
   @After
diff --git 
a/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java
 
b/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java
index ed6fc18e4..980c9b269 100644
--- 
a/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java
+++ 
b/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java
@@ -240,6 +240,14 @@ public class RemoteShuffleMasterSuiteJ {
                     .set(
                         ExecutionOptions.BATCH_SHUFFLE_MODE,
                         BatchShuffleMode.ALL_EXCHANGES_PIPELINED)));
+    Configuration configuration = new Configuration();
+    
configuration.setString(CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED().key(), 
"true");
+    Assert.assertThrows(
+        String.format(
+            "Flink does not support replicate shuffle data. Please check the 
config %s.",
+            CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED().key()),
+        IllegalArgumentException.class,
+        () -> createShuffleMaster(configuration));
   }
 
   @After
diff --git 
a/client-flink/flink-1.19/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java
 
b/client-flink/flink-1.19/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java
index bd50c87b1..fbcb9efc3 100644
--- 
a/client-flink/flink-1.19/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java
+++ 
b/client-flink/flink-1.19/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java
@@ -240,6 +240,14 @@ public class RemoteShuffleMasterSuiteJ {
                     .set(
                         ExecutionOptions.BATCH_SHUFFLE_MODE,
                         BatchShuffleMode.ALL_EXCHANGES_PIPELINED)));
+    Configuration configuration = new Configuration();
+    
configuration.setString(CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED().key(), 
"true");
+    Assert.assertThrows(
+        String.format(
+            "Flink does not support replicate shuffle data. Please check the 
config %s.",
+            CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED().key()),
+        IllegalArgumentException.class,
+        () -> createShuffleMaster(configuration));
   }
 
   @After
diff --git 
a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierMasterAgent.java
 
b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierMasterAgent.java
index 05a5b45e7..e0de46ae3 100644
--- 
a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierMasterAgent.java
+++ 
b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierMasterAgent.java
@@ -87,11 +87,6 @@ public class CelebornTierMasterAgent implements 
TierMasterAgent {
         if (lifecycleManager == null) {
           celebornAppId = 
FlinkUtils.toCelebornAppId(lifecycleManagerTimestamp, jobID);
           LOG.info("CelebornAppId: {}", celebornAppId);
-          // The default value of this config option is false. If set to true, 
Celeborn will use
-          // local allocated workers as candidate being checked workers, this 
is more useful for
-          // map partition to regenerate the lost data. So if not set, set to 
true as default for
-          // flink.
-          
conf.setIfMissing(CelebornConf.CLIENT_CHECKED_USE_ALLOCATED_WORKERS(), true);
           lifecycleManager = new LifecycleManager(celebornAppId, conf);
           this.shuffleResourceTracker = new ShuffleResourceTracker(executor, 
lifecycleManager);
         }
diff --git 
a/client-flink/flink-1.20/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java
 
b/client-flink/flink-1.20/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java
index db5da0f38..821f8d827 100644
--- 
a/client-flink/flink-1.20/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java
+++ 
b/client-flink/flink-1.20/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java
@@ -243,6 +243,14 @@ public class RemoteShuffleMasterSuiteJ {
                     .set(
                         ExecutionOptions.BATCH_SHUFFLE_MODE,
                         BatchShuffleMode.ALL_EXCHANGES_PIPELINED)));
+    Configuration configuration = new Configuration();
+    
configuration.setString(CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED().key(), 
"true");
+    Assert.assertThrows(
+        String.format(
+            "Flink does not support replicate shuffle data. Please check the 
config %s.",
+            CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED().key()),
+        IllegalArgumentException.class,
+        () -> createShuffleMaster(configuration));
   }
 
   @After

Reply via email to