This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch dev-offline-sync
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/dev-offline-sync by this push:
new 0ce3d755da [INLONG-9822][Manager] Support flink job runtime execution
mode configuration (#9823)
0ce3d755da is described below
commit 0ce3d755da0c4f000b439894366bae9abdd5d514
Author: AloysZhang <[email protected]>
AuthorDate: Fri Mar 15 15:12:53 2024 +0800
[INLONG-9822][Manager] Support flink job runtime execution mode
configuration (#9823)
---
.../inlong/manager/common/consts/InlongConstants.java | 3 +++
.../apache/inlong/manager/plugin/flink/FlinkService.java | 2 ++
.../apache/inlong/manager/plugin/flink/dto/FlinkInfo.java | 2 ++
.../inlong/manager/plugin/listener/DeleteSortListener.java | 1 +
.../manager/plugin/listener/RestartSortListener.java | 8 +++++++-
.../manager/plugin/listener/StartupSortListener.java | 14 ++++++++++----
.../org/apache/inlong/sort/configuration/Constants.java | 8 ++++++++
.../src/main/java/org/apache/inlong/sort/Entrance.java | 11 ++++++++++-
8 files changed, 43 insertions(+), 6 deletions(-)
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
index 581ebb3098..e84dcc6602 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
@@ -90,6 +90,9 @@ public class InlongConstants {
public static final Integer DATASYNC_REALTIME_MODE = 1;
public static final Integer DATASYNC_OFFLINE_MODE = 2;
+ public static final String RUNTIME_EXECUTION_MODE_STREAMING = "streaming";
+ public static final String RUNTIME_EXECUTION_MODE_BATCH = "batch";
+
public static final Integer DISABLE_ZK = 0;
public static final Integer ENABLE_ZK = 1;
diff --git
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
index 69a32b73fa..33a7ca59cd 100644
---
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
+++
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
@@ -258,6 +258,8 @@ public class FlinkService {
list.add(flinkInfo.getLocalConfPath());
list.add("-checkpoint.interval");
list.add("60000");
+ list.add("-runtime.execution.mode");
+ list.add(flinkInfo.getRuntimeExecutionMode());
list.add("-metrics.audit.proxy.hosts");
list.add(flinkConfig.getAuditProxyHosts());
return list.toArray(new String[0]);
diff --git
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java
index f7071ceb4b..4c3c75f855 100644
---
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java
+++
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java
@@ -52,4 +52,6 @@ public class FlinkInfo {
private boolean isException = false;
private String exceptionMsg;
+
+ private String runtimeExecutionMode;
}
diff --git
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
index 009374e2a6..5e30ad8cb5 100644
---
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
+++
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
@@ -119,6 +119,7 @@ public class DeleteSortListener implements
SortOperateListener {
FlinkOperation flinkOperation = FlinkOperation.getInstance();
try {
flinkOperation.delete(flinkInfo);
+ // TODO if the job is OFFLINE, should delete the scheduler information
log.info("job delete success for jobId={}", jobId);
} catch (Exception e) {
flinkInfo.setException(true);
diff --git
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
index 0c6828c984..242138c1e4 100644
---
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
+++
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
@@ -86,7 +86,13 @@ public class RestartSortListener implements
SortOperateListener {
}
GroupResourceProcessForm groupResourceProcessForm =
(GroupResourceProcessForm) processForm;
-
+ if (InlongConstants.DATASYNC_OFFLINE_MODE.equals(
+ groupResourceProcessForm.getGroupInfo().getInlongGroupMode()))
{
+ String message = String.format("offline data sync job should be scheduled by the "
+ + "scheduler system only for groupId [%s]", groupId);
+ log.error(message);
+ return ListenerResult.fail(message);
+ }
List<InlongStreamInfo> streamInfos =
groupResourceProcessForm.getStreamInfos();
for (InlongStreamInfo streamInfo : streamInfos) {
List<StreamSink> sinkList = streamInfo.getSinkList();
diff --git
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
index fae5faa278..f6b3b6061a 100644
---
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
+++
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
@@ -131,20 +131,26 @@ public class StartupSortListener implements
SortOperateListener {
return ListenerResult.fail(message);
}
+ boolean isRealTimeSync = InlongConstants.DATASYNC_REALTIME_MODE
+
.equals(groupResourceForm.getGroupInfo().getInlongGroupMode());
+
FlinkInfo flinkInfo = new FlinkInfo();
String jobName =
Constants.SORT_JOB_NAME_GENERATOR.apply(processForm) + InlongConstants.HYPHEN
+ streamInfo.getInlongStreamId();
flinkInfo.setJobName(jobName);
- String sortUrl = kvConf.get(InlongConstants.SORT_URL);
- flinkInfo.setEndpoint(sortUrl);
+ flinkInfo.setEndpoint(kvConf.get(InlongConstants.SORT_URL));
flinkInfo.setInlongStreamInfoList(Collections.singletonList(streamInfo));
+ if (isRealTimeSync) {
+
flinkInfo.setRuntimeExecutionMode(InlongConstants.RUNTIME_EXECUTION_MODE_STREAMING);
+ } else {
+
flinkInfo.setRuntimeExecutionMode(InlongConstants.RUNTIME_EXECUTION_MODE_BATCH);
+ }
FlinkOperation flinkOperation = FlinkOperation.getInstance();
try {
flinkOperation.genPath(flinkInfo, dataflow);
// only start job for real-time mode
- if (InlongConstants.DATASYNC_REALTIME_MODE
-
.equals(groupResourceForm.getGroupInfo().getInlongGroupMode())) {
+ if (isRealTimeSync) {
flinkOperation.start(flinkInfo);
log.info("job submit success for groupId = {}, streamId = {}, jobId = {}", groupId,
streamInfo.getInlongStreamId(),
flinkInfo.getJobId());
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
index 702dbef53f..a5717242a5 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
@@ -291,6 +291,14 @@ public class Constants {
public static final ConfigOption<Integer> CHECKPOINT_TIMEOUT_MS =
key("checkpoint.timeout.ms").defaultValue(600000);
+ // ------------------------------------------------------------------------
+ // Flink runtime execution mode, including stream and batch, default is stream
+ // ------------------------------------------------------------------------
+
+ public static final ConfigOption<String> RUNTIME_EXECUTION_MODE =
key("runtime.execution.mode")
+ .defaultValue("stream")
+ .withDescription("The runtime execution mode of Flink, including stream and batch, default is stream");
+
// ------------------------------------------------------------------------
// Metrics related
// ------------------------------------------------------------------------
diff --git
a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java
b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java
index ea3f3f95be..4aa3902cae 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java
@@ -40,6 +40,8 @@ import java.nio.charset.StandardCharsets;
public class Entrance {
+ public static final String BATCH_MODE = "batch";
+
public static void main(String[] args) throws Exception {
final ParameterTool parameterTool = ParameterTool.fromArgs(args);
final Configuration config = parameterTool.getConfiguration();
@@ -50,7 +52,14 @@ public class Entrance {
config.getInteger(Constants.MIN_PAUSE_BETWEEN_CHECKPOINTS_MS));
env.getCheckpointConfig().setCheckpointTimeout(config.getInteger(Constants.CHECKPOINT_TIMEOUT_MS));
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
- EnvironmentSettings settings =
EnvironmentSettings.newInstance().inStreamingMode().build();
+
+ String runtimeExecutionMode =
config.getString(Constants.RUNTIME_EXECUTION_MODE);
+ EnvironmentSettings settings;
+ if (BATCH_MODE.equalsIgnoreCase(runtimeExecutionMode)) {
+ settings = EnvironmentSettings.newInstance().inBatchMode().build();
+ } else {
+ settings =
EnvironmentSettings.newInstance().inStreamingMode().build();
+ }
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
settings);
tableEnv.getConfig().getConfiguration().setString(Constants.PIPELINE_NAME,
config.getString(Constants.JOB_NAME));