This is an automated email from the ASF dual-hosted git repository.

zhongqiangchen pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.3 by this push:
     new 1a8d87f39 [CELEBORN-660][FLINK] Gen unique app id for Celeborn
1a8d87f39 is described below

commit 1a8d87f39c797a7dfc71f70abe6fee3579a2b155
Author: Shuang <[email protected]>
AuthorDate: Tue Jun 13 11:15:16 2023 +0800

    [CELEBORN-660][FLINK] Gen unique app id for Celeborn
    
    ### What changes were proposed in this pull request?
    Use System.currentTimeMillis() + JobID.generate() as the CelebornAppId when the
    Flink JobID is the all-zero id; any other JobID is still used as-is.
    
    ### Why are the changes needed?
    Flink Application mode with HA may use a fixed id
    (00000000000000000000000000000000) as the jobId; see
    [FLINK-19358](https://issues.apache.org/jira/browse/FLINK-19358).
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Manual
    
    Closes #1572 from RexXiong/CELEBORN-660.
    
    Authored-by: Shuang <[email protected]>
    Signed-off-by: zhongqiang.czq <[email protected]>
    (cherry picked from commit e284f72c9526d1757b95032173979a3cd1343e22)
    Signed-off-by: zhongqiang.czq <[email protected]>
---
 .../org/apache/celeborn/plugin/flink/RemoteShuffleMaster.java  |  4 +---
 .../org/apache/celeborn/plugin/flink/utils/FlinkUtils.java     | 10 ++++++++--
 2 files changed, 9 insertions(+), 5 deletions(-)

diff --git 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleMaster.java
 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleMaster.java
index 4323a5d5c..26b3ed63f 100644
--- 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleMaster.java
+++ 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleMaster.java
@@ -44,9 +44,7 @@ public class RemoteShuffleMaster implements 
ShuffleMaster<RemoteShuffleDescripto
   private String celebornAppId;
   private volatile LifecycleManager lifecycleManager;
   private ShuffleTaskInfo shuffleTaskInfo = new ShuffleTaskInfo();
-
   private ShuffleResourceTracker shuffleResourceTracker;
-
   private final ScheduledThreadPoolExecutor executor =
       new ScheduledThreadPoolExecutor(
           1,
@@ -68,8 +66,8 @@ public class RemoteShuffleMaster implements 
ShuffleMaster<RemoteShuffleDescripto
     if (lifecycleManager == null) {
       synchronized (RemoteShuffleMaster.class) {
         if (lifecycleManager == null) {
-          // use first jobID as celeborn shared appId for all other flink jobs
           celebornAppId = FlinkUtils.toCelebornAppId(rssMetaServiceTimestamp, 
jobID);
+          LOG.info("CelebornAppId: {}", celebornAppId);
           CelebornConf celebornConf =
               
FlinkUtils.toCelebornConf(shuffleMasterContext.getConfiguration());
           // if not set, set to true as default for flink
diff --git 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/utils/FlinkUtils.java
 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/utils/FlinkUtils.java
index 2bcde62ab..7b30d79dd 100644
--- 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/utils/FlinkUtils.java
+++ 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/utils/FlinkUtils.java
@@ -29,7 +29,7 @@ import 
org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.celeborn.common.CelebornConf;
 
 public class FlinkUtils {
-
+  private static final JobID ZERO_JOB_ID = new JobID(0, 0);
   public static final Set<String> pluginConfNames =
       new HashSet<String>() {
         {
@@ -59,7 +59,13 @@ public class FlinkUtils {
   }
 
   public static String toCelebornAppId(long rssMetaServiceTimestamp, JobID 
jobID) {
-    return rssMetaServiceTimestamp + "-" + jobID.toString();
+    // Workaround for FLINK-19358, use first none ZERO_JOB_ID as celeborn 
shared appId for all
+    // other flink jobs
+    if (!ZERO_JOB_ID.equals(jobID)) {
+      return rssMetaServiceTimestamp + "-" + jobID.toString();
+    }
+
+    return rssMetaServiceTimestamp + "-" + JobID.generate();
   }
 
   public static String toShuffleId(JobID jobID, IntermediateDataSetID 
dataSetID) {

Reply via email to