This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 4341c7cdb7 [Improve][Flink] Remove useless stageType. (#5650)
4341c7cdb7 is described below
commit 4341c7cdb79b71f98ecb1014a11a5933c28311ec
Author: Chengyu Yan <[email protected]>
AuthorDate: Fri Oct 20 13:12:01 2023 +0800
[Improve][Flink] Remove useless stageType. (#5650)
---
release-note.md | 1 +
.../flink/execution/FlinkRuntimeEnvironment.java | 24 ----------------
.../flink/execution/SinkExecuteProcessor.java | 2 +-
.../FlinkAbstractPluginExecuteProcessor.java | 32 ----------------------
.../flink/execution/FlinkRuntimeEnvironment.java | 25 -----------------
.../flink/execution/SinkExecuteProcessor.java | 2 +-
.../flink/execution/SourceExecuteProcessor.java | 2 --
.../flink/execution/TransformExecuteProcessor.java | 3 +-
8 files changed, 4 insertions(+), 87 deletions(-)
diff --git a/release-note.md b/release-note.md
index bfb7d03856..545039062b 100644
--- a/release-note.md
+++ b/release-note.md
@@ -97,6 +97,7 @@
- [Core] [Starter] Optimize code structure & remove redundant code (#4525)
- [Core] [Translation] [Flink] Optimize code structure & remove redundant code (#4527)
- [Core] [Starter] Add check of sink and source config to avoid null pointer exception. (#4734)
+- [Core] [Flink] Remove useless stage type related codes. (#5650)
### Connector-V2
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
index 4bd81769fb..e4cc21a305 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
@@ -20,7 +20,6 @@ package org.apache.seatunnel.core.starter.flink.execution;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.env.EnvCommonOptions;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.JobMode;
@@ -52,11 +51,8 @@ import lombok.extern.slf4j.Slf4j;
import java.net.URL;
import java.util.ArrayList;
-import java.util.LinkedHashMap;
import java.util.List;
-import java.util.Map;
import java.util.Objects;
-import java.util.Optional;
import java.util.stream.Collectors;
@Slf4j
@@ -68,8 +64,6 @@ public class FlinkRuntimeEnvironment implements RuntimeEnvironment {
private StreamExecutionEnvironment environment;
private StreamTableEnvironment tableEnvironment;
- private Map<String, SeaTunnelRowType> stagedTypes = new LinkedHashMap<>();
- private Optional<SeaTunnelRowType> defaultType = Optional.empty();
private JobMode jobMode;
private String jobName = Constants.LOGO;
@@ -339,24 +333,6 @@ public class FlinkRuntimeEnvironment implements RuntimeEnvironment {
name, tableEnvironment.fromChangelogStream(dataStream));
}
- public void stageType(String tblName, SeaTunnelRowType type) {
- stagedTypes.put(tblName, type);
- }
-
- public void stageDefaultType(SeaTunnelRowType type) {
- this.defaultType = Optional.of(type);
- }
-
- public Optional<SeaTunnelRowType> type(String tblName) {
- return stagedTypes.containsKey(tblName)
- ? Optional.of(stagedTypes.get(tblName))
- : Optional.empty();
- }
-
- public Optional<SeaTunnelRowType> defaultType() {
- return this.defaultType;
- }
-
public static FlinkRuntimeEnvironment getInstance(Config config) {
if (INSTANCE == null) {
synchronized (FlinkRuntimeEnvironment.class) {
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index 8ef78a9762..86caa6939a 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -101,7 +101,7 @@ public class SinkExecuteProcessor
sinkConfig.getString(PLUGIN_NAME.key())),
sinkConfig);
sink.setJobContext(jobContext);
- SeaTunnelRowType sourceType = initSourceType(sinkConfig, stream.getDataStream());
+ SeaTunnelRowType sourceType = stream.getCatalogTable().getSeaTunnelRowType();
sink.setTypeInfo(sourceType);
} else {
TableSinkFactoryContext context =
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
index 44b2f914fc..683ff77449 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
@@ -20,12 +20,10 @@ package org.apache.seatunnel.core.starter.flink.execution;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.common.JobContext;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.utils.ReflectionUtils;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.core.starter.execution.PluginExecuteProcessor;
import org.apache.seatunnel.core.starter.flink.utils.TableUtil;
-import org.apache.seatunnel.translation.flink.utils.TypeConverterUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
@@ -135,36 +133,6 @@ public abstract class FlinkAbstractPluginExecuteProcessor<T>
}
}
- protected void stageType(Config pluginConfig, SeaTunnelRowType type) {
- if (!flinkRuntimeEnvironment.defaultType().isPresent()) {
- flinkRuntimeEnvironment.stageDefaultType(type);
- }
-
- if (pluginConfig.hasPath("result_table_name")) {
- String tblName = pluginConfig.getString("result_table_name");
- flinkRuntimeEnvironment.stageType(tblName, type);
- }
- }
-
- protected Optional<SeaTunnelRowType> sourceType(Config pluginConfig) {
- if (pluginConfig.hasPath(SOURCE_TABLE_NAME)) {
- String tblName = pluginConfig.getString(SOURCE_TABLE_NAME);
- return flinkRuntimeEnvironment.type(tblName);
- } else {
- return flinkRuntimeEnvironment.defaultType();
- }
- }
-
- protected SeaTunnelRowType initSourceType(Config sinkConfig, DataStream<Row> stream) {
- SeaTunnelRowType sourceType =
- sourceType(sinkConfig)
- .orElseGet(
- () ->
- (SeaTunnelRowType)
- TypeConverterUtils.convert(stream.getType()));
- return sourceType;
- }
-
protected abstract List<T> initializePlugins(
List<URL> jarPaths, List<? extends Config> pluginConfigs);
}
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
index d8ff813f12..12168921d8 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
@@ -20,7 +20,6 @@ package org.apache.seatunnel.core.starter.flink.execution;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.env.EnvCommonOptions;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.JobMode;
@@ -52,11 +51,8 @@ import lombok.extern.slf4j.Slf4j;
import java.net.URL;
import java.util.ArrayList;
-import java.util.LinkedHashMap;
import java.util.List;
-import java.util.Map;
import java.util.Objects;
-import java.util.Optional;
import java.util.stream.Collectors;
@Slf4j
@@ -69,9 +65,6 @@ public class FlinkRuntimeEnvironment implements RuntimeEnvironment {
private StreamTableEnvironment tableEnvironment;
- private Map<String, SeaTunnelRowType> stagedTypes = new LinkedHashMap<>();
- private Optional<SeaTunnelRowType> defaultType = Optional.empty();
-
private JobMode jobMode;
private String jobName = Constants.LOGO;
@@ -341,24 +334,6 @@ public class FlinkRuntimeEnvironment implements RuntimeEnvironment {
name, tableEnvironment.fromChangelogStream(dataStream));
}
- public void stageType(String tblName, SeaTunnelRowType type) {
- stagedTypes.put(tblName, type);
- }
-
- public void stageDefaultType(SeaTunnelRowType type) {
- this.defaultType = Optional.of(type);
- }
-
- public Optional<SeaTunnelRowType> type(String tblName) {
- return stagedTypes.containsKey(tblName)
- ? Optional.of(stagedTypes.get(tblName))
- : Optional.empty();
- }
-
- public Optional<SeaTunnelRowType> defaultType() {
- return this.defaultType;
- }
-
public static FlinkRuntimeEnvironment getInstance(Config config) {
if (INSTANCE == null) {
synchronized (FlinkRuntimeEnvironment.class) {
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index 2e6b742d67..2109ffe88f 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -102,7 +102,7 @@ public class SinkExecuteProcessor
sinkConfig.getString(PLUGIN_NAME.key())),
sinkConfig);
sink.setJobContext(jobContext);
- SeaTunnelRowType sourceType = initSourceType(sinkConfig, stream.getDataStream());
+ SeaTunnelRowType sourceType = stream.getCatalogTable().getSeaTunnelRowType();
sink.setTypeInfo(sourceType);
} else {
TableSinkFactoryContext context =
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
index 5636d231c5..ebe1c9427a 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
@@ -24,7 +24,6 @@ import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SupportCoordinate;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.core.starter.enums.PluginType;
import org.apache.seatunnel.core.starter.execution.PluginUtil;
@@ -95,7 +94,6 @@ public class SourceExecuteProcessor extends FlinkAbstractPluginExecuteProcessor<
sourceFunction,
"SeaTunnel " +
internalSource.getClass().getSimpleName(),
bounded);
- stageType(pluginConfig, (SeaTunnelRowType) internalSource.getProducedType());
if (pluginConfig.hasPath(CommonOptions.PARALLELISM.key())) {
int parallelism =
pluginConfig.getInt(CommonOptions.PARALLELISM.key());
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
index 810f6ba32b..6c145418b4 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
@@ -91,11 +91,10 @@ public class TransformExecuteProcessor
ConfigValidator.of(context.getOptions()).validate(factory.optionRule());
SeaTunnelTransform transform =
factory.createTransform(context).createTransform();
- SeaTunnelRowType sourceType = initSourceType(pluginConfig, stream.getDataStream());
+ SeaTunnelRowType sourceType = stream.getCatalogTable().getSeaTunnelRowType();
transform.setJobContext(jobContext);
DataStream<Row> inputStream =
flinkTransform(sourceType, transform,
stream.getDataStream());
- stageType(pluginConfig, transform.getProducedCatalogTable().getSeaTunnelRowType());
registerResultTable(pluginConfig, inputStream);
upstreamDataStreams.add(
new DataStreamTableInfo(