This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new c8794168f [CELEBORN-1715] RemoteShuffleMaster should check
celeborn.client.push.replicate.enabled in constructor
c8794168f is described below
commit c8794168f4237d860e28006997d30bc2b2242ae0
Author: SteNicholas <[email protected]>
AuthorDate: Wed Nov 20 11:29:00 2024 +0800
[CELEBORN-1715] RemoteShuffleMaster should check
celeborn.client.push.replicate.enabled in constructor
### What changes were proposed in this pull request?
`RemoteShuffleMaster` now checks `celeborn.client.push.replicate.enabled` in
its constructor instead of in `registerJob`.
### Why are the changes needed?
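`RemoteShuffleMaster` currently verifies that
`celeborn.client.push.replicate.enabled` is false in `registerJob`. This check
can be performed earlier, in the constructor, so that an unsupported
configuration is rejected at construction time.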
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
`RemoteShuffleMasterSuiteJ#testInvalidShuffleConfig`
Closes #2911 from SteNicholas/CELEBORN-1715.
Authored-by: SteNicholas <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
---
.../celeborn/plugin/flink/RemoteShuffleMaster.java | 19 ++++++-------------
.../celeborn/plugin/flink/utils/FlinkUtils.java | 14 +++++++++++++-
.../plugin/flink/RemoteShuffleMasterSuiteJ.java | 8 ++++++++
.../plugin/flink/RemoteShuffleMasterSuiteJ.java | 8 ++++++++
.../plugin/flink/RemoteShuffleMasterSuiteJ.java | 8 ++++++++
.../plugin/flink/RemoteShuffleMasterSuiteJ.java | 8 ++++++++
.../plugin/flink/RemoteShuffleMasterSuiteJ.java | 8 ++++++++
.../plugin/flink/RemoteShuffleMasterSuiteJ.java | 8 ++++++++
.../plugin/flink/tiered/CelebornTierMasterAgent.java | 5 -----
.../plugin/flink/RemoteShuffleMasterSuiteJ.java | 8 ++++++++
10 files changed, 75 insertions(+), 19 deletions(-)
diff --git
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleMaster.java
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleMaster.java
index 219988542..45c5f91dc 100644
---
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleMaster.java
+++
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleMaster.java
@@ -49,6 +49,7 @@ import org.apache.celeborn.plugin.flink.utils.FlinkUtils;
public class RemoteShuffleMaster implements
ShuffleMaster<RemoteShuffleDescriptor> {
private static final Logger LOG =
LoggerFactory.getLogger(RemoteShuffleMaster.class);
+ private final CelebornConf conf;
private final ShuffleMasterContext shuffleMasterContext;
// Flink JobId -> Celeborn register shuffleIds
private final Map<JobID, Set<Integer>> jobShuffleIds =
JavaUtils.newConcurrentHashMap();
@@ -64,7 +65,9 @@ public class RemoteShuffleMaster implements
ShuffleMaster<RemoteShuffleDescripto
public RemoteShuffleMaster(
ShuffleMasterContext shuffleMasterContext, ResultPartitionAdapter
resultPartitionDelegation) {
- checkShuffleConfig(shuffleMasterContext.getConfiguration());
+ Configuration configuration = shuffleMasterContext.getConfiguration();
+ checkShuffleConfig(configuration);
+ this.conf = FlinkUtils.toCelebornConf(configuration);
this.shuffleMasterContext = shuffleMasterContext;
this.resultPartitionDelegation = resultPartitionDelegation;
this.lifecycleManagerTimestamp = System.currentTimeMillis();
@@ -78,16 +81,7 @@ public class RemoteShuffleMaster implements
ShuffleMaster<RemoteShuffleDescripto
if (lifecycleManager == null) {
celebornAppId =
FlinkUtils.toCelebornAppId(lifecycleManagerTimestamp, jobID);
LOG.info("CelebornAppId: {}", celebornAppId);
- CelebornConf celebornConf =
-
FlinkUtils.toCelebornConf(shuffleMasterContext.getConfiguration());
- // if not set, set to true as default for flink
-
celebornConf.setIfMissing(CelebornConf.CLIENT_CHECKED_USE_ALLOCATED_WORKERS(),
true);
- lifecycleManager = new LifecycleManager(celebornAppId, celebornConf);
- if (celebornConf.clientPushReplicateEnabled()) {
- shuffleMasterContext.onFatalError(
- new RuntimeException("Currently replicate shuffle data is
unsupported for flink."));
- return;
- }
+ lifecycleManager = new LifecycleManager(celebornAppId, conf);
this.shuffleResourceTracker = new ShuffleResourceTracker(executor,
lifecycleManager);
}
}
@@ -95,10 +89,10 @@ public class RemoteShuffleMaster implements
ShuffleMaster<RemoteShuffleDescripto
Set<Integer> previousShuffleIds = jobShuffleIds.putIfAbsent(jobID, new
HashSet<>());
LOG.info("Register job: {}.", jobID);
- shuffleResourceTracker.registerJob(context);
if (previousShuffleIds != null) {
throw new RuntimeException("Duplicated registration job: " + jobID);
}
+ shuffleResourceTracker.registerJob(context);
}
@Override
@@ -212,7 +206,6 @@ public class RemoteShuffleMaster implements
ShuffleMaster<RemoteShuffleDescripto
}
int numResultPartitions =
taskInputsOutputsDescriptor.getSubpartitionNums().size();
- CelebornConf conf =
FlinkUtils.toCelebornConf(shuffleMasterContext.getConfiguration());
long numBytesPerPartition = conf.clientFlinkMemoryPerResultPartition();
long numBytesForOutput = numBytesPerPartition * numResultPartitions;
diff --git
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/utils/FlinkUtils.java
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/utils/FlinkUtils.java
index 90f75946f..7c7d1cf81 100644
---
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/utils/FlinkUtils.java
+++
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/utils/FlinkUtils.java
@@ -43,6 +43,15 @@ public class FlinkUtils {
"remote-shuffle.job.compression.codec");
public static CelebornConf toCelebornConf(Configuration configuration) {
+ if (Boolean.parseBoolean(
+ configuration.getString(
+ CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED().key(),
+
CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED().defaultValueString()))) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Flink does not support replicate shuffle data. Please check the
config %s.",
+ CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED().key()));
+ }
CelebornConf tmpCelebornConf = new CelebornConf();
Map<String, String> confMap = configuration.toMap();
for (Map.Entry<String, String> entry : confMap.entrySet()) {
@@ -52,7 +61,10 @@ public class FlinkUtils {
}
}
- return tmpCelebornConf;
+ // The default value of this config option is false. If set to true,
Celeborn will use local
+ // allocated workers as candidate being checked workers, this is more
useful for map partition
+ // to regenerate the lost data. So if not set, set to true as default for
flink.
+ return
tmpCelebornConf.setIfMissing(CelebornConf.CLIENT_CHECKED_USE_ALLOCATED_WORKERS(),
true);
}
public static String toCelebornAppId(long lifecycleManagerTimestamp, JobID
jobID) {
diff --git
a/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java
b/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java
index 73c08d2d3..da63a185e 100644
---
a/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java
+++
b/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java
@@ -218,6 +218,14 @@ public class RemoteShuffleMasterSuiteJ {
.set(
ExecutionOptions.BATCH_SHUFFLE_MODE,
BatchShuffleMode.ALL_EXCHANGES_PIPELINED)));
+ Configuration configuration = new Configuration();
+
configuration.setString(CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED().key(),
"true");
+ Assert.assertThrows(
+ String.format(
+ "Flink does not support replicate shuffle data. Please check the
config %s.",
+ CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED().key()),
+ IllegalArgumentException.class,
+ () -> createShuffleMaster(configuration));
}
@After
diff --git
a/client-flink/flink-1.15/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java
b/client-flink/flink-1.15/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java
index 73c08d2d3..da63a185e 100644
---
a/client-flink/flink-1.15/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java
+++
b/client-flink/flink-1.15/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java
@@ -218,6 +218,14 @@ public class RemoteShuffleMasterSuiteJ {
.set(
ExecutionOptions.BATCH_SHUFFLE_MODE,
BatchShuffleMode.ALL_EXCHANGES_PIPELINED)));
+ Configuration configuration = new Configuration();
+
configuration.setString(CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED().key(),
"true");
+ Assert.assertThrows(
+ String.format(
+ "Flink does not support replicate shuffle data. Please check the
config %s.",
+ CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED().key()),
+ IllegalArgumentException.class,
+ () -> createShuffleMaster(configuration));
}
@After
diff --git
a/client-flink/flink-1.16/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java
b/client-flink/flink-1.16/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java
index 95eb47538..4305fceff 100644
---
a/client-flink/flink-1.16/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java
+++
b/client-flink/flink-1.16/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java
@@ -228,6 +228,14 @@ public class RemoteShuffleMasterSuiteJ {
.set(
ExecutionOptions.BATCH_SHUFFLE_MODE,
BatchShuffleMode.ALL_EXCHANGES_PIPELINED)));
+ Configuration configuration = new Configuration();
+
configuration.setString(CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED().key(),
"true");
+ Assert.assertThrows(
+ String.format(
+ "Flink does not support replicate shuffle data. Please check the
config %s.",
+ CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED().key()),
+ IllegalArgumentException.class,
+ () -> createShuffleMaster(configuration));
}
@After
diff --git
a/client-flink/flink-1.17/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java
b/client-flink/flink-1.17/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java
index ed6fc18e4..980c9b269 100644
---
a/client-flink/flink-1.17/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java
+++
b/client-flink/flink-1.17/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java
@@ -240,6 +240,14 @@ public class RemoteShuffleMasterSuiteJ {
.set(
ExecutionOptions.BATCH_SHUFFLE_MODE,
BatchShuffleMode.ALL_EXCHANGES_PIPELINED)));
+ Configuration configuration = new Configuration();
+
configuration.setString(CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED().key(),
"true");
+ Assert.assertThrows(
+ String.format(
+ "Flink does not support replicate shuffle data. Please check the
config %s.",
+ CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED().key()),
+ IllegalArgumentException.class,
+ () -> createShuffleMaster(configuration));
}
@After
diff --git
a/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java
b/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java
index ed6fc18e4..980c9b269 100644
---
a/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java
+++
b/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java
@@ -240,6 +240,14 @@ public class RemoteShuffleMasterSuiteJ {
.set(
ExecutionOptions.BATCH_SHUFFLE_MODE,
BatchShuffleMode.ALL_EXCHANGES_PIPELINED)));
+ Configuration configuration = new Configuration();
+
configuration.setString(CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED().key(),
"true");
+ Assert.assertThrows(
+ String.format(
+ "Flink does not support replicate shuffle data. Please check the
config %s.",
+ CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED().key()),
+ IllegalArgumentException.class,
+ () -> createShuffleMaster(configuration));
}
@After
diff --git
a/client-flink/flink-1.19/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java
b/client-flink/flink-1.19/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java
index bd50c87b1..fbcb9efc3 100644
---
a/client-flink/flink-1.19/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java
+++
b/client-flink/flink-1.19/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java
@@ -240,6 +240,14 @@ public class RemoteShuffleMasterSuiteJ {
.set(
ExecutionOptions.BATCH_SHUFFLE_MODE,
BatchShuffleMode.ALL_EXCHANGES_PIPELINED)));
+ Configuration configuration = new Configuration();
+
configuration.setString(CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED().key(),
"true");
+ Assert.assertThrows(
+ String.format(
+ "Flink does not support replicate shuffle data. Please check the
config %s.",
+ CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED().key()),
+ IllegalArgumentException.class,
+ () -> createShuffleMaster(configuration));
}
@After
diff --git
a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierMasterAgent.java
b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierMasterAgent.java
index 05a5b45e7..e0de46ae3 100644
---
a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierMasterAgent.java
+++
b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierMasterAgent.java
@@ -87,11 +87,6 @@ public class CelebornTierMasterAgent implements
TierMasterAgent {
if (lifecycleManager == null) {
celebornAppId =
FlinkUtils.toCelebornAppId(lifecycleManagerTimestamp, jobID);
LOG.info("CelebornAppId: {}", celebornAppId);
- // The default value of this config option is false. If set to true,
Celeborn will use
- // local allocated workers as candidate being checked workers, this
is more useful for
- // map partition to regenerate the lost data. So if not set, set to
true as default for
- // flink.
-
conf.setIfMissing(CelebornConf.CLIENT_CHECKED_USE_ALLOCATED_WORKERS(), true);
lifecycleManager = new LifecycleManager(celebornAppId, conf);
this.shuffleResourceTracker = new ShuffleResourceTracker(executor,
lifecycleManager);
}
diff --git
a/client-flink/flink-1.20/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java
b/client-flink/flink-1.20/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java
index db5da0f38..821f8d827 100644
---
a/client-flink/flink-1.20/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java
+++
b/client-flink/flink-1.20/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java
@@ -243,6 +243,14 @@ public class RemoteShuffleMasterSuiteJ {
.set(
ExecutionOptions.BATCH_SHUFFLE_MODE,
BatchShuffleMode.ALL_EXCHANGES_PIPELINED)));
+ Configuration configuration = new Configuration();
+
configuration.setString(CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED().key(),
"true");
+ Assert.assertThrows(
+ String.format(
+ "Flink does not support replicate shuffle data. Please check the
config %s.",
+ CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED().key()),
+ IllegalArgumentException.class,
+ () -> createShuffleMaster(configuration));
}
@After