This is an automated email from the ASF dual-hosted git repository.
zhongqiangchen pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.3 by this push:
new 1a8d87f39 [CELEBORN-660][FLINK] Gen unique app id for Celeborn
1a8d87f39 is described below
commit 1a8d87f39c797a7dfc71f70abe6fee3579a2b155
Author: Shuang <[email protected]>
AuthorDate: Tue Jun 13 11:15:16 2023 +0800
[CELEBORN-660][FLINK] Gen unique app id for Celeborn
### What changes were proposed in this pull request?
Use System.currentTimeMillis() + JobID.generate() as CelebornAppId.
### Why are the changes needed?
Flink Application mode with HA may use a fixed
ID (00000000000000000000000000000000) as the jobId; see
[FLINK-19358](https://issues.apache.org/jira/browse/FLINK-19358).
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Manual
Closes #1572 from RexXiong/CELEBORN-660.
Authored-by: Shuang <[email protected]>
Signed-off-by: zhongqiang.czq <[email protected]>
(cherry picked from commit e284f72c9526d1757b95032173979a3cd1343e22)
Signed-off-by: zhongqiang.czq <[email protected]>
---
.../org/apache/celeborn/plugin/flink/RemoteShuffleMaster.java | 4 +---
.../org/apache/celeborn/plugin/flink/utils/FlinkUtils.java | 10 ++++++++--
2 files changed, 9 insertions(+), 5 deletions(-)
diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleMaster.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleMaster.java
index 4323a5d5c..26b3ed63f 100644
--- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleMaster.java
+++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleMaster.java
@@ -44,9 +44,7 @@ public class RemoteShuffleMaster implements
ShuffleMaster<RemoteShuffleDescripto
private String celebornAppId;
private volatile LifecycleManager lifecycleManager;
private ShuffleTaskInfo shuffleTaskInfo = new ShuffleTaskInfo();
-
private ShuffleResourceTracker shuffleResourceTracker;
-
private final ScheduledThreadPoolExecutor executor =
new ScheduledThreadPoolExecutor(
1,
@@ -68,8 +66,8 @@ public class RemoteShuffleMaster implements
ShuffleMaster<RemoteShuffleDescripto
if (lifecycleManager == null) {
synchronized (RemoteShuffleMaster.class) {
if (lifecycleManager == null) {
- // use first jobID as celeborn shared appId for all other flink jobs
celebornAppId = FlinkUtils.toCelebornAppId(rssMetaServiceTimestamp,
jobID);
+ LOG.info("CelebornAppId: {}", celebornAppId);
CelebornConf celebornConf =
FlinkUtils.toCelebornConf(shuffleMasterContext.getConfiguration());
// if not set, set to true as default for flink
diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/utils/FlinkUtils.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/utils/FlinkUtils.java
index 2bcde62ab..7b30d79dd 100644
--- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/utils/FlinkUtils.java
+++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/utils/FlinkUtils.java
@@ -29,7 +29,7 @@ import
org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.celeborn.common.CelebornConf;
public class FlinkUtils {
-
+ private static final JobID ZERO_JOB_ID = new JobID(0, 0);
public static final Set<String> pluginConfNames =
new HashSet<String>() {
{
@@ -59,7 +59,13 @@ public class FlinkUtils {
}
public static String toCelebornAppId(long rssMetaServiceTimestamp, JobID
jobID) {
- return rssMetaServiceTimestamp + "-" + jobID.toString();
+ // Workaround for FLINK-19358, use first none ZERO_JOB_ID as celeborn shared appId for all
+ // other flink jobs
+ if (!ZERO_JOB_ID.equals(jobID)) {
+ return rssMetaServiceTimestamp + "-" + jobID.toString();
+ }
+
+ return rssMetaServiceTimestamp + "-" + JobID.generate();
}
public static String toShuffleId(JobID jobID, IntermediateDataSetID
dataSetID) {