This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new b76260127 [INLONG-6909][Manager] Rename Flink Job to avoid conflicts
(#6937)
b76260127 is described below
commit b762601276daba1cfa9e5f5b33d0591558c301ef
Author: ZuoFengZhang <[email protected]>
AuthorDate: Mon Dec 19 15:49:54 2022 +0800
[INLONG-6909][Manager] Rename Flink Job to avoid conflicts (#6937)
---
.../inlong/manager/plugin/flink/FlinkService.java | 2 ++
.../manager/plugin/flink/enums/Constants.java | 22 +++++++++++++++++++++-
.../plugin/listener/RestartSortListener.java | 2 +-
.../plugin/listener/RestartStreamListener.java | 2 +-
.../plugin/listener/StartupSortListener.java | 3 ++-
5 files changed, 27 insertions(+), 4 deletions(-)
diff --git
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
index 2bb46ae27..b1cb70c4d 100644
---
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
+++
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
@@ -257,6 +257,8 @@ public class FlinkService {
List<String> list = new ArrayList<>();
list.add("-cluster-id");
list.add(flinkInfo.getJobName());
+ list.add("-job.name");
+ list.add(flinkInfo.getJobName());
list.add("-group.info.file");
list.add(flinkInfo.getLocalConfPath());
list.add("-checkpoint.interval");
diff --git
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/Constants.java
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/Constants.java
index 41a3ae913..a594fd6bc 100644
---
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/Constants.java
+++
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/Constants.java
@@ -17,6 +17,11 @@
package org.apache.inlong.manager.plugin.flink.enums;
+import java.util.Optional;
+import java.util.function.Function;
+import
org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
+import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
+
/**
* Constants info, including properties, dataflow info and rest api url info.
*/
@@ -46,7 +51,11 @@ public class Constants {
public static final String ENTRYPOINT_CLASS =
"org.apache.inlong.sort.Entrance";
- public static final String INLONG = "INLONG_";
+ public static final String SORT_JOB_NAME_PREFIX = "InLong-Sort-";
+
+ public static final String SORT_JOB_NAME_TEMPLATE = SORT_JOB_NAME_PREFIX +
"%s";
+
+ public static final String DEFAULT_SORT_JOB_NAME = SORT_JOB_NAME_PREFIX +
"Job";
public static final String RESOURCE_ID = "resource_id";
@@ -69,4 +78,15 @@ public class Constants {
public static final String SEPARATOR = ":";
+ /**
+ * Generate the Sort job name from the InLong group ID of a {@link ProcessForm}: <br/>
+ * When {@link ProcessForm#getInlongGroupId()} returns a non-null group ID, the job name is
+ * 'InLong-Sort-{Group ID}' (see {@link Constants#SORT_JOB_NAME_TEMPLATE}); when it returns null,
+ * the name falls back to {@link Constants#DEFAULT_SORT_JOB_NAME}: 'InLong-Sort-Job'. <br/>
+ * No type check is performed here: the group ID is read from whatever ProcessForm is passed in.
+ * NOTE(review): presumably only forms such as {@link GroupResourceProcessForm} carry a group ID - confirm.
+ * <br/>
+ * The form itself must not be null: {@code Optional.of} throws a NullPointerException for a null argument.
+ */
+ public static Function<ProcessForm, String> SORT_JOB_NAME_GENERATOR =
+ (ProcessForm processForm) -> Optional.of(processForm)
+ .map(ProcessForm::getInlongGroupId)
+ .map(groupId ->
String.format(Constants.SORT_JOB_NAME_TEMPLATE, groupId))
+ .orElse(DEFAULT_SORT_JOB_NAME);
+
}
diff --git
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
index 83abc7d57..0f1fbe4ec 100644
---
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
+++
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
@@ -115,7 +115,7 @@ public class RestartSortListener implements
SortOperateListener {
FlinkInfo flinkInfo = new FlinkInfo();
flinkInfo.setJobId(jobId);
- String jobName = Constants.INLONG +
context.getProcessForm().getInlongGroupId();
+ String jobName = Constants.SORT_JOB_NAME_GENERATOR.apply(processForm);
flinkInfo.setJobName(jobName);
String sortUrl = kvConf.get(InlongConstants.SORT_URL);
flinkInfo.setEndpoint(sortUrl);
diff --git
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
index dc56ab157..5ca86cc53 100644
---
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
+++
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
@@ -125,7 +125,7 @@ public class RestartStreamListener implements
SortOperateListener {
FlinkInfo flinkInfo = new FlinkInfo();
flinkInfo.setJobId(jobId);
- String jobName = Constants.INLONG +
context.getProcessForm().getInlongGroupId();
+ String jobName = Constants.SORT_JOB_NAME_GENERATOR.apply(processForm);
flinkInfo.setJobName(jobName);
String sortUrl = kvConf.get(InlongConstants.SORT_URL);
flinkInfo.setEndpoint(sortUrl);
diff --git
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
index 00eb784d2..573d28ecc 100644
---
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
+++
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
@@ -116,7 +116,8 @@ public class StartupSortListener implements
SortOperateListener {
}
FlinkInfo flinkInfo = new FlinkInfo();
- String jobName = Constants.INLONG +
context.getProcessForm().getInlongGroupId();
+
+ String jobName = Constants.SORT_JOB_NAME_GENERATOR.apply(processForm);
flinkInfo.setJobName(jobName);
String sortUrl = kvConf.get(InlongConstants.SORT_URL);
flinkInfo.setEndpoint(sortUrl);