This is an automated email from the ASF dual-hosted git repository.

ic4y pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 7825830484 [Bug][flink-runtime][connectors-v2] Flink register table 
environment: the running mode is set to `job.mode` (#4826)
7825830484 is described below

commit 7825830484662e7f96dbf682d639a27bb3acd98b
Author: ZhilinLi <[email protected]>
AuthorDate: Mon Aug 14 15:54:56 2023 +0800

    [Bug][flink-runtime][connectors-v2] Flink register table environment: the 
running mode is set to `job.mode` (#4826)
---
 .../seatunnel/core/starter/flink/execution/FlinkExecution.java    | 8 ++++++++
 .../core/starter/flink/execution/SourceExecuteProcessor.java      | 6 ++++--
 2 files changed, 12 insertions(+), 2 deletions(-)

diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
index a3282cc4a1..5a4050d884 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
@@ -26,6 +26,7 @@ import org.apache.seatunnel.api.env.EnvCommonOptions;
 import org.apache.seatunnel.common.Constants;
 import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.common.constants.JobMode;
 import org.apache.seatunnel.common.utils.SeaTunnelException;
 import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
 import org.apache.seatunnel.core.starter.execution.PluginExecuteProcessor;
@@ -33,6 +34,7 @@ import 
org.apache.seatunnel.core.starter.execution.RuntimeEnvironment;
 import org.apache.seatunnel.core.starter.execution.TaskExecution;
 import org.apache.seatunnel.core.starter.flink.FlinkStarter;
 
+import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.types.Row;
 
@@ -111,6 +113,12 @@ public class FlinkExecution implements TaskExecution {
                 "Flink Execution Plan: {}",
                 
flinkRuntimeEnvironment.getStreamExecutionEnvironment().getExecutionPlan());
         log.info("Flink job name: {}", flinkRuntimeEnvironment.getJobName());
+        if (!flinkRuntimeEnvironment.isStreaming()) {
+            flinkRuntimeEnvironment
+                    .getStreamExecutionEnvironment()
+                    .setRuntimeMode(RuntimeExecutionMode.BATCH);
+            log.info("Flink job Mode: {}", JobMode.BATCH);
+        }
         try {
             flinkRuntimeEnvironment
                     .getStreamExecutionEnvironment()
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
index a3897a526e..6bcc5fe893 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
@@ -71,13 +71,15 @@ public class SourceExecuteProcessor extends 
FlinkAbstractPluginExecuteProcessor<
             } else {
                 sourceFunction = new SeaTunnelParallelSource(internalSource);
             }
+            boolean bounded =
+                    internalSource.getBoundedness()
+                            == 
org.apache.seatunnel.api.source.Boundedness.BOUNDED;
             DataStreamSource<Row> sourceStream =
                     addSource(
                             executionEnvironment,
                             sourceFunction,
                             "SeaTunnel " + 
internalSource.getClass().getSimpleName(),
-                            internalSource.getBoundedness()
-                                    == 
org.apache.seatunnel.api.source.Boundedness.BOUNDED);
+                            bounded);
             Config pluginConfig = pluginConfigs.get(i);
             if (pluginConfig.hasPath(CommonOptions.PARALLELISM.key())) {
                 int parallelism = 
pluginConfig.getInt(CommonOptions.PARALLELISM.key());

Reply via email to