This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new eb49ed74e [CELEBORN-1648] Refine AppUniqueId with UUID suffix
eb49ed74e is described below

commit eb49ed74eef386c7457e354d096724a45f88bbde
Author: Chongchen Chen <[email protected]>
AuthorDate: Thu Oct 17 17:38:03 2024 +0800

    [CELEBORN-1648] Refine AppUniqueId with UUID suffix
    
    ### What changes were proposed in this pull request?
    
    We can add a random UUID as a suffix to the application id to solve this.
    
    ### Why are the changes needed?
    
    Currently, we cannot guarantee that the application id is really unique. This may 
lead to data issues.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    Tested locally.
    
    Closes #2810 from chenkovsky/feature/uuid_appid.
    
    Authored-by: Chongchen Chen <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 .../mapreduce/v2/app/MRAppMasterWithCeleborn.java     |  8 ++++----
 .../spark/shuffle/celeborn/SparkShuffleManager.java   |  7 ++++---
 .../spark/shuffle/celeborn/SparkShuffleManager.java   |  7 ++++---
 .../org/apache/celeborn/common/CelebornConf.scala     | 19 ++++++++++++++++++-
 docs/configuration/client.md                          |  1 +
 5 files changed, 31 insertions(+), 11 deletions(-)

diff --git 
a/client-mr/mr/src/main/java/org/apache/celeborn/mapreduce/v2/app/MRAppMasterWithCeleborn.java
 
b/client-mr/mr/src/main/java/org/apache/celeborn/mapreduce/v2/app/MRAppMasterWithCeleborn.java
index 98653cb93..a23eab494 100644
--- 
a/client-mr/mr/src/main/java/org/apache/celeborn/mapreduce/v2/app/MRAppMasterWithCeleborn.java
+++ 
b/client-mr/mr/src/main/java/org/apache/celeborn/mapreduce/v2/app/MRAppMasterWithCeleborn.java
@@ -66,16 +66,16 @@ public class MRAppMasterWithCeleborn extends MRAppMaster {
     int numReducers = jobConf.getInt(MRJobConfig.NUM_REDUCES, 0);
     if (numReducers > 0) {
       CelebornConf conf = HadoopUtils.fromYarnConf(jobConf);
-      LifecycleManager lifecycleManager =
-          new LifecycleManager(applicationAttemptId.toString(), conf);
+      String appUniqueId = 
conf.appUniqueIdWithUUIDSuffix(applicationAttemptId.toString());
+      LifecycleManager lifecycleManager = new LifecycleManager(appUniqueId, 
conf);
       String lmHost = lifecycleManager.getHost();
       int lmPort = lifecycleManager.getPort();
-      logger.info("MRAppMaster initialized with {} {} {}", lmHost, lmPort, 
applicationAttemptId);
+      logger.info("MRAppMaster initialized with {} {} {}", lmHost, lmPort, 
appUniqueId);
       JobConf lmConf = new JobConf();
       lmConf.clear();
       lmConf.set(HadoopUtils.MR_CELEBORN_LM_HOST, lmHost);
       lmConf.set(HadoopUtils.MR_CELEBORN_LM_PORT, lmPort + "");
-      lmConf.set(HadoopUtils.MR_CELEBORN_APPLICATION_ID, 
applicationAttemptId.toString());
+      lmConf.set(HadoopUtils.MR_CELEBORN_APPLICATION_ID, appUniqueId);
       writeLifecycleManagerConfToTask(jobConf, lmConf);
     }
   }
diff --git 
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
 
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
index 861070830..96d28acf4 100644
--- 
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
+++ 
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
@@ -95,7 +95,8 @@ public class SparkShuffleManager implements ShuffleManager {
     if (isDriver && lifecycleManager == null) {
       synchronized (this) {
         if (lifecycleManager == null) {
-          lifecycleManager = new LifecycleManager(appId, celebornConf);
+          appUniqueId = celebornConf.appUniqueIdWithUUIDSuffix(appId);
+          lifecycleManager = new LifecycleManager(appUniqueId, celebornConf);
           if (celebornConf.clientFetchThrowsFetchFailure()) {
             MapOutputTrackerMaster mapOutputTracker =
                 (MapOutputTrackerMaster) SparkEnv.get().mapOutputTracker();
@@ -113,8 +114,8 @@ public class SparkShuffleManager implements ShuffleManager {
     // Note: generate app unique id at driver side, make sure 
dependency.rdd.context
     // is the same SparkContext among different shuffleIds.
     // This method may be called many times.
-    appUniqueId = SparkUtils.appUniqueId(dependency.rdd().context());
-    initializeLifecycleManager(appUniqueId);
+    String appId = SparkUtils.appUniqueId(dependency.rdd().context());
+    initializeLifecycleManager(appId);
 
     lifecycleManager.registerAppShuffleDeterminate(
         shuffleId,
diff --git 
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
 
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
index a2a6f7a37..516246c13 100644
--- 
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
+++ 
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
@@ -129,7 +129,7 @@ public class SparkShuffleManager implements ShuffleManager {
     return _sortShuffleManager;
   }
 
-  private void initializeLifecycleManager() {
+  private void initializeLifecycleManager(String appId) {
     // Only create LifecycleManager singleton in Driver. When register shuffle 
multiple times, we
     // need to ensure that LifecycleManager will only be created once. 
Parallelism needs to be
     // considered in this place, because if there is one RDD that depends on 
multiple RDDs
@@ -137,6 +137,7 @@ public class SparkShuffleManager implements ShuffleManager {
     if (isDriver && lifecycleManager == null) {
       synchronized (this) {
         if (lifecycleManager == null) {
+          appUniqueId = celebornConf.appUniqueIdWithUUIDSuffix(appId);
           lifecycleManager = new LifecycleManager(appUniqueId, celebornConf);
           if (celebornConf.clientFetchThrowsFetchFailure()) {
             MapOutputTrackerMaster mapOutputTracker =
@@ -156,8 +157,8 @@ public class SparkShuffleManager implements ShuffleManager {
     // Note: generate app unique id at driver side, make sure 
dependency.rdd.context
     // is the same SparkContext among different shuffleIds.
     // This method may be called many times.
-    appUniqueId = SparkUtils.appUniqueId(dependency.rdd().context());
-    initializeLifecycleManager();
+    String appId = SparkUtils.appUniqueId(dependency.rdd().context());
+    initializeLifecycleManager(appId);
 
     lifecycleManager.registerAppShuffleDeterminate(
         shuffleId,
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 955b46076..ee33e26e6 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -18,7 +18,7 @@
 package org.apache.celeborn.common
 
 import java.io.{File, IOException}
-import java.util.{Collection => JCollection, Collections, HashMap => JHashMap, 
Locale, Map => JMap}
+import java.util.{Collection => JCollection, Collections, HashMap => JHashMap, 
Locale, Map => JMap, UUID}
 import java.util.concurrent.TimeUnit
 
 import scala.collection.JavaConverters._
@@ -907,6 +907,15 @@ class CelebornConf(loadDefaults: Boolean) extends 
Cloneable with Logging with Se
   def clientExcludeReplicaOnFailureEnabled: Boolean =
     get(CLIENT_EXCLUDE_PEER_WORKER_ON_FAILURE_ENABLED)
   def clientMrMaxPushData: Long = get(CLIENT_MR_PUSH_DATA_MAX)
+  def clientApplicationUUIDSuffixEnabled: Boolean = 
get(CLIENT_APPLICATION_UUID_SUFFIX_ENABLED)
+
+  def appUniqueIdWithUUIDSuffix(appId: String): String = {
+    if (clientApplicationUUIDSuffixEnabled) {
+      appId + "-" + UUID.randomUUID().toString.replaceAll("-", "")
+    } else {
+      appId
+    }
+  }
 
   // //////////////////////////////////////////////////////
   //               Shuffle Compression                   //
@@ -5065,6 +5074,14 @@ object CelebornConf extends Logging {
       .checkValue(v => v > 0.0 && v <= 1.0, "Value must be between 0 and 1 
(inclusive)")
       .createWithDefault(0.4)
 
+  val CLIENT_APPLICATION_UUID_SUFFIX_ENABLED: ConfigEntry[Boolean] =
+    buildConf("celeborn.client.application.uuidSuffix.enabled")
+      .categories("client")
+      .version("0.6.0")
+      .doc("Whether to add UUID suffix for application id for unique. When 
`true`, add UUID suffix for unique application id. Currently, this only applies 
to Spark and MR.")
+      .booleanConf
+      .createWithDefault(false)
+
   val TEST_ALTERNATIVE: OptionalConfigEntry[String] =
     buildConf("celeborn.test.alternative.key")
       .withAlternative("celeborn.test.alternative.deprecatedKey")
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index 80cd1bef3..91d1faac5 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -21,6 +21,7 @@ license: |
 | --- | ------- | --------- | ----------- | ----- | ---------- |
 | celeborn.client.application.heartbeatInterval | 10s | false | Interval for 
client to send heartbeat message to master. | 0.3.0 | 
celeborn.application.heartbeatInterval | 
 | celeborn.client.application.unregister.enabled | true | false | When true, 
Celeborn client will inform celeborn master the application is already shutdown 
during client exit, this allows the cluster to release resources immediately, 
resulting in resource savings. | 0.3.2 |  | 
+| celeborn.client.application.uuidSuffix.enabled | false | false | Whether to 
add UUID suffix for application id for unique. When `true`, add UUID suffix for 
unique application id. Currently, this only applies to Spark and MR. | 0.6.0 |  
| 
 | celeborn.client.chunk.prefetch.enabled | false | false | Whether to enable 
chunk prefetch when creating CelebornInputStream. | 0.6.0 |  | 
 | celeborn.client.closeIdleConnections | true | false | Whether client will 
close idle connections. | 0.3.0 |  | 
 | celeborn.client.commitFiles.ignoreExcludedWorker | false | false | When 
true, LifecycleManager will skip workers which are in the excluded list. | 
0.3.0 |  | 

Reply via email to