This is an automated email from the ASF dual-hosted git repository.
ic4y pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 7825830484 [Bug][flink-runtime][connectors-v2] Flink register table
environment: the running mode is set to `job.mode` (#4826)
7825830484 is described below
commit 7825830484662e7f96dbf682d639a27bb3acd98b
Author: ZhilinLi <[email protected]>
AuthorDate: Mon Aug 14 15:54:56 2023 +0800
[Bug][flink-runtime][connectors-v2] Flink register table environment: the
running mode is set to `job.mode` (#4826)
---
.../seatunnel/core/starter/flink/execution/FlinkExecution.java | 8 ++++++++
.../core/starter/flink/execution/SourceExecuteProcessor.java | 6 ++++--
2 files changed, 12 insertions(+), 2 deletions(-)
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
index a3282cc4a1..5a4050d884 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
@@ -26,6 +26,7 @@ import org.apache.seatunnel.api.env.EnvCommonOptions;
import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.core.starter.execution.PluginExecuteProcessor;
@@ -33,6 +34,7 @@ import
org.apache.seatunnel.core.starter.execution.RuntimeEnvironment;
import org.apache.seatunnel.core.starter.execution.TaskExecution;
import org.apache.seatunnel.core.starter.flink.FlinkStarter;
+import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;
@@ -111,6 +113,12 @@ public class FlinkExecution implements TaskExecution {
"Flink Execution Plan: {}",
flinkRuntimeEnvironment.getStreamExecutionEnvironment().getExecutionPlan());
log.info("Flink job name: {}", flinkRuntimeEnvironment.getJobName());
+ if (!flinkRuntimeEnvironment.isStreaming()) {
+ flinkRuntimeEnvironment
+ .getStreamExecutionEnvironment()
+ .setRuntimeMode(RuntimeExecutionMode.BATCH);
+ log.info("Flink job Mode: {}", JobMode.BATCH);
+ }
try {
flinkRuntimeEnvironment
.getStreamExecutionEnvironment()
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
index a3897a526e..6bcc5fe893 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
@@ -71,13 +71,15 @@ public class SourceExecuteProcessor extends
FlinkAbstractPluginExecuteProcessor<
} else {
sourceFunction = new SeaTunnelParallelSource(internalSource);
}
+ boolean bounded =
+ internalSource.getBoundedness()
+ ==
org.apache.seatunnel.api.source.Boundedness.BOUNDED;
DataStreamSource<Row> sourceStream =
addSource(
executionEnvironment,
sourceFunction,
"SeaTunnel " +
internalSource.getClass().getSimpleName(),
- internalSource.getBoundedness()
- ==
org.apache.seatunnel.api.source.Boundedness.BOUNDED);
+ bounded);
Config pluginConfig = pluginConfigs.get(i);
if (pluginConfig.hasPath(CommonOptions.PARALLELISM.key())) {
int parallelism =
pluginConfig.getInt(CommonOptions.PARALLELISM.key());