This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new eb49ed74e [CELEBORN-1648] Refine AppUniqueId with UUID suffix
eb49ed74e is described below
commit eb49ed74eef386c7457e354d096724a45f88bbde
Author: Chongchen Chen <[email protected]>
AuthorDate: Thu Oct 17 17:38:03 2024 +0800
[CELEBORN-1648] Refine AppUniqueId with UUID suffix
### What changes were proposed in this pull request?
We can add a random UUID as a suffix to guarantee that the application id is unique.
### Why are the changes needed?
Currently, we cannot guarantee that the application id is truly unique; this may
lead to data issues.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Tested locally.
Closes #2810 from chenkovsky/feature/uuid_appid.
Authored-by: Chongchen Chen <[email protected]>
Signed-off-by: Shuang <[email protected]>
---
.../mapreduce/v2/app/MRAppMasterWithCeleborn.java | 8 ++++----
.../spark/shuffle/celeborn/SparkShuffleManager.java | 7 ++++---
.../spark/shuffle/celeborn/SparkShuffleManager.java | 7 ++++---
.../org/apache/celeborn/common/CelebornConf.scala | 19 ++++++++++++++++++-
docs/configuration/client.md | 1 +
5 files changed, 31 insertions(+), 11 deletions(-)
diff --git
a/client-mr/mr/src/main/java/org/apache/celeborn/mapreduce/v2/app/MRAppMasterWithCeleborn.java
b/client-mr/mr/src/main/java/org/apache/celeborn/mapreduce/v2/app/MRAppMasterWithCeleborn.java
index 98653cb93..a23eab494 100644
---
a/client-mr/mr/src/main/java/org/apache/celeborn/mapreduce/v2/app/MRAppMasterWithCeleborn.java
+++
b/client-mr/mr/src/main/java/org/apache/celeborn/mapreduce/v2/app/MRAppMasterWithCeleborn.java
@@ -66,16 +66,16 @@ public class MRAppMasterWithCeleborn extends MRAppMaster {
int numReducers = jobConf.getInt(MRJobConfig.NUM_REDUCES, 0);
if (numReducers > 0) {
CelebornConf conf = HadoopUtils.fromYarnConf(jobConf);
- LifecycleManager lifecycleManager =
- new LifecycleManager(applicationAttemptId.toString(), conf);
+ String appUniqueId =
conf.appUniqueIdWithUUIDSuffix(applicationAttemptId.toString());
+ LifecycleManager lifecycleManager = new LifecycleManager(appUniqueId,
conf);
String lmHost = lifecycleManager.getHost();
int lmPort = lifecycleManager.getPort();
- logger.info("MRAppMaster initialized with {} {} {}", lmHost, lmPort,
applicationAttemptId);
+ logger.info("MRAppMaster initialized with {} {} {}", lmHost, lmPort,
appUniqueId);
JobConf lmConf = new JobConf();
lmConf.clear();
lmConf.set(HadoopUtils.MR_CELEBORN_LM_HOST, lmHost);
lmConf.set(HadoopUtils.MR_CELEBORN_LM_PORT, lmPort + "");
- lmConf.set(HadoopUtils.MR_CELEBORN_APPLICATION_ID,
applicationAttemptId.toString());
+ lmConf.set(HadoopUtils.MR_CELEBORN_APPLICATION_ID, appUniqueId);
writeLifecycleManagerConfToTask(jobConf, lmConf);
}
}
diff --git
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
index 861070830..96d28acf4 100644
---
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
+++
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
@@ -95,7 +95,8 @@ public class SparkShuffleManager implements ShuffleManager {
if (isDriver && lifecycleManager == null) {
synchronized (this) {
if (lifecycleManager == null) {
- lifecycleManager = new LifecycleManager(appId, celebornConf);
+ appUniqueId = celebornConf.appUniqueIdWithUUIDSuffix(appId);
+ lifecycleManager = new LifecycleManager(appUniqueId, celebornConf);
if (celebornConf.clientFetchThrowsFetchFailure()) {
MapOutputTrackerMaster mapOutputTracker =
(MapOutputTrackerMaster) SparkEnv.get().mapOutputTracker();
@@ -113,8 +114,8 @@ public class SparkShuffleManager implements ShuffleManager {
// Note: generate app unique id at driver side, make sure
dependency.rdd.context
// is the same SparkContext among different shuffleIds.
// This method may be called many times.
- appUniqueId = SparkUtils.appUniqueId(dependency.rdd().context());
- initializeLifecycleManager(appUniqueId);
+ String appId = SparkUtils.appUniqueId(dependency.rdd().context());
+ initializeLifecycleManager(appId);
lifecycleManager.registerAppShuffleDeterminate(
shuffleId,
diff --git
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
index a2a6f7a37..516246c13 100644
---
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
+++
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
@@ -129,7 +129,7 @@ public class SparkShuffleManager implements ShuffleManager {
return _sortShuffleManager;
}
- private void initializeLifecycleManager() {
+ private void initializeLifecycleManager(String appId) {
// Only create LifecycleManager singleton in Driver. When register shuffle
multiple times, we
// need to ensure that LifecycleManager will only be created once.
Parallelism needs to be
// considered in this place, because if there is one RDD that depends on
multiple RDDs
@@ -137,6 +137,7 @@ public class SparkShuffleManager implements ShuffleManager {
if (isDriver && lifecycleManager == null) {
synchronized (this) {
if (lifecycleManager == null) {
+ appUniqueId = celebornConf.appUniqueIdWithUUIDSuffix(appId);
lifecycleManager = new LifecycleManager(appUniqueId, celebornConf);
if (celebornConf.clientFetchThrowsFetchFailure()) {
MapOutputTrackerMaster mapOutputTracker =
@@ -156,8 +157,8 @@ public class SparkShuffleManager implements ShuffleManager {
// Note: generate app unique id at driver side, make sure
dependency.rdd.context
// is the same SparkContext among different shuffleIds.
// This method may be called many times.
- appUniqueId = SparkUtils.appUniqueId(dependency.rdd().context());
- initializeLifecycleManager();
+ String appId = SparkUtils.appUniqueId(dependency.rdd().context());
+ initializeLifecycleManager(appId);
lifecycleManager.registerAppShuffleDeterminate(
shuffleId,
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 955b46076..ee33e26e6 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -18,7 +18,7 @@
package org.apache.celeborn.common
import java.io.{File, IOException}
-import java.util.{Collection => JCollection, Collections, HashMap => JHashMap,
Locale, Map => JMap}
+import java.util.{Collection => JCollection, Collections, HashMap => JHashMap,
Locale, Map => JMap, UUID}
import java.util.concurrent.TimeUnit
import scala.collection.JavaConverters._
@@ -907,6 +907,15 @@ class CelebornConf(loadDefaults: Boolean) extends
Cloneable with Logging with Se
def clientExcludeReplicaOnFailureEnabled: Boolean =
get(CLIENT_EXCLUDE_PEER_WORKER_ON_FAILURE_ENABLED)
def clientMrMaxPushData: Long = get(CLIENT_MR_PUSH_DATA_MAX)
+ def clientApplicationUUIDSuffixEnabled: Boolean =
get(CLIENT_APPLICATION_UUID_SUFFIX_ENABLED)
+
+ def appUniqueIdWithUUIDSuffix(appId: String): String = {
+ if (clientApplicationUUIDSuffixEnabled) {
+ appId + "-" + UUID.randomUUID().toString.replaceAll("-", "")
+ } else {
+ appId
+ }
+ }
// //////////////////////////////////////////////////////
// Shuffle Compression //
@@ -5065,6 +5074,14 @@ object CelebornConf extends Logging {
.checkValue(v => v > 0.0 && v <= 1.0, "Value must be between 0 and 1
(inclusive)")
.createWithDefault(0.4)
+ val CLIENT_APPLICATION_UUID_SUFFIX_ENABLED: ConfigEntry[Boolean] =
+ buildConf("celeborn.client.application.uuidSuffix.enabled")
+ .categories("client")
+ .version("0.6.0")
+ .doc("Whether to add UUID suffix for application id for unique. When
`true`, add UUID suffix for unique application id. Currently, this only applies
to Spark and MR.")
+ .booleanConf
+ .createWithDefault(false)
+
val TEST_ALTERNATIVE: OptionalConfigEntry[String] =
buildConf("celeborn.test.alternative.key")
.withAlternative("celeborn.test.alternative.deprecatedKey")
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index 80cd1bef3..91d1faac5 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -21,6 +21,7 @@ license: |
| --- | ------- | --------- | ----------- | ----- | ---------- |
| celeborn.client.application.heartbeatInterval | 10s | false | Interval for
client to send heartbeat message to master. | 0.3.0 |
celeborn.application.heartbeatInterval |
| celeborn.client.application.unregister.enabled | true | false | When true,
Celeborn client will inform celeborn master the application is already shutdown
during client exit, this allows the cluster to release resources immediately,
resulting in resource savings. | 0.3.2 | |
+| celeborn.client.application.uuidSuffix.enabled | false | false | Whether to
add UUID suffix for application id for unique. When `true`, add UUID suffix for
unique application id. Currently, this only applies to Spark and MR. | 0.6.0 |
|
| celeborn.client.chunk.prefetch.enabled | false | false | Whether to enable
chunk prefetch when creating CelebornInputStream. | 0.6.0 | |
| celeborn.client.closeIdleConnections | true | false | Whether client will
close idle connections. | 0.3.0 | |
| celeborn.client.commitFiles.ignoreExcludedWorker | false | false | When
true, LifecycleManager will skip workers which are in the excluded list. |
0.3.0 | |