This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 4341c7cdb7 [Improve][Flink] Remove useless stageType. (#5650)
4341c7cdb7 is described below

commit 4341c7cdb79b71f98ecb1014a11a5933c28311ec
Author: Chengyu Yan <[email protected]>
AuthorDate: Fri Oct 20 13:12:01 2023 +0800

    [Improve][Flink] Remove useless stageType. (#5650)
---
 release-note.md                                    |  1 +
 .../flink/execution/FlinkRuntimeEnvironment.java   | 24 ----------------
 .../flink/execution/SinkExecuteProcessor.java      |  2 +-
 .../FlinkAbstractPluginExecuteProcessor.java       | 32 ----------------------
 .../flink/execution/FlinkRuntimeEnvironment.java   | 25 -----------------
 .../flink/execution/SinkExecuteProcessor.java      |  2 +-
 .../flink/execution/SourceExecuteProcessor.java    |  2 --
 .../flink/execution/TransformExecuteProcessor.java |  3 +-
 8 files changed, 4 insertions(+), 87 deletions(-)

diff --git a/release-note.md b/release-note.md
index bfb7d03856..545039062b 100644
--- a/release-note.md
+++ b/release-note.md
@@ -97,6 +97,7 @@
 - [Core] [Starter] Optimize code structure & remove redundant code (#4525)
 - [Core] [Translation] [Flink] Optimize code structure & remove redundant code (#4527)
 - [Core] [Starter] Add check of sink and source config to avoid null pointer exception. (#4734)
+- [Core] [Flink] Remove useless stage type related codes. (#5650)
 
 ### Connector-V2
 
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
index 4bd81769fb..e4cc21a305 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
@@ -20,7 +20,6 @@ package org.apache.seatunnel.core.starter.flink.execution;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.env.EnvCommonOptions;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.Constants;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.JobMode;
@@ -52,11 +51,8 @@ import lombok.extern.slf4j.Slf4j;
 
 import java.net.URL;
 import java.util.ArrayList;
-import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.stream.Collectors;
 
 @Slf4j
@@ -68,8 +64,6 @@ public class FlinkRuntimeEnvironment implements RuntimeEnvironment {
     private StreamExecutionEnvironment environment;
 
     private StreamTableEnvironment tableEnvironment;
-    private Map<String, SeaTunnelRowType> stagedTypes = new LinkedHashMap<>();
-    private Optional<SeaTunnelRowType> defaultType = Optional.empty();
     private JobMode jobMode;
 
     private String jobName = Constants.LOGO;
@@ -339,24 +333,6 @@ public class FlinkRuntimeEnvironment implements RuntimeEnvironment {
                 name, tableEnvironment.fromChangelogStream(dataStream));
     }
 
-    public void stageType(String tblName, SeaTunnelRowType type) {
-        stagedTypes.put(tblName, type);
-    }
-
-    public void stageDefaultType(SeaTunnelRowType type) {
-        this.defaultType = Optional.of(type);
-    }
-
-    public Optional<SeaTunnelRowType> type(String tblName) {
-        return stagedTypes.containsKey(tblName)
-                ? Optional.of(stagedTypes.get(tblName))
-                : Optional.empty();
-    }
-
-    public Optional<SeaTunnelRowType> defaultType() {
-        return this.defaultType;
-    }
-
     public static FlinkRuntimeEnvironment getInstance(Config config) {
         if (INSTANCE == null) {
             synchronized (FlinkRuntimeEnvironment.class) {
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index 8ef78a9762..86caa6939a 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -101,7 +101,7 @@ public class SinkExecuteProcessor
                                         sinkConfig.getString(PLUGIN_NAME.key())),
                                 sinkConfig);
                 sink.setJobContext(jobContext);
-                SeaTunnelRowType sourceType = initSourceType(sinkConfig, stream.getDataStream());
+                SeaTunnelRowType sourceType = stream.getCatalogTable().getSeaTunnelRowType();
                 sink.setTypeInfo(sourceType);
             } else {
                 TableSinkFactoryContext context =
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
index 44b2f914fc..683ff77449 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
@@ -20,12 +20,10 @@ package org.apache.seatunnel.core.starter.flink.execution;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.common.JobContext;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.utils.ReflectionUtils;
 import org.apache.seatunnel.common.utils.SeaTunnelException;
 import org.apache.seatunnel.core.starter.execution.PluginExecuteProcessor;
 import org.apache.seatunnel.core.starter.flink.utils.TableUtil;
-import org.apache.seatunnel.translation.flink.utils.TypeConverterUtils;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.table.api.Table;
@@ -135,36 +133,6 @@ public abstract class FlinkAbstractPluginExecuteProcessor<T>
         }
     }
 
-    protected void stageType(Config pluginConfig, SeaTunnelRowType type) {
-        if (!flinkRuntimeEnvironment.defaultType().isPresent()) {
-            flinkRuntimeEnvironment.stageDefaultType(type);
-        }
-
-        if (pluginConfig.hasPath("result_table_name")) {
-            String tblName = pluginConfig.getString("result_table_name");
-            flinkRuntimeEnvironment.stageType(tblName, type);
-        }
-    }
-
-    protected Optional<SeaTunnelRowType> sourceType(Config pluginConfig) {
-        if (pluginConfig.hasPath(SOURCE_TABLE_NAME)) {
-            String tblName = pluginConfig.getString(SOURCE_TABLE_NAME);
-            return flinkRuntimeEnvironment.type(tblName);
-        } else {
-            return flinkRuntimeEnvironment.defaultType();
-        }
-    }
-
-    protected SeaTunnelRowType initSourceType(Config sinkConfig, DataStream<Row> stream) {
-        SeaTunnelRowType sourceType =
-                sourceType(sinkConfig)
-                        .orElseGet(
-                                () ->
-                                        (SeaTunnelRowType)
-                                                TypeConverterUtils.convert(stream.getType()));
-        return sourceType;
-    }
-
     protected abstract List<T> initializePlugins(
             List<URL> jarPaths, List<? extends Config> pluginConfigs);
 }
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
index d8ff813f12..12168921d8 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
@@ -20,7 +20,6 @@ package org.apache.seatunnel.core.starter.flink.execution;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.env.EnvCommonOptions;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.Constants;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.JobMode;
@@ -52,11 +51,8 @@ import lombok.extern.slf4j.Slf4j;
 
 import java.net.URL;
 import java.util.ArrayList;
-import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.stream.Collectors;
 
 @Slf4j
@@ -69,9 +65,6 @@ public class FlinkRuntimeEnvironment implements RuntimeEnvironment {
 
     private StreamTableEnvironment tableEnvironment;
 
-    private Map<String, SeaTunnelRowType> stagedTypes = new LinkedHashMap<>();
-    private Optional<SeaTunnelRowType> defaultType = Optional.empty();
-
     private JobMode jobMode;
 
     private String jobName = Constants.LOGO;
@@ -341,24 +334,6 @@ public class FlinkRuntimeEnvironment implements RuntimeEnvironment {
                 name, tableEnvironment.fromChangelogStream(dataStream));
     }
 
-    public void stageType(String tblName, SeaTunnelRowType type) {
-        stagedTypes.put(tblName, type);
-    }
-
-    public void stageDefaultType(SeaTunnelRowType type) {
-        this.defaultType = Optional.of(type);
-    }
-
-    public Optional<SeaTunnelRowType> type(String tblName) {
-        return stagedTypes.containsKey(tblName)
-                ? Optional.of(stagedTypes.get(tblName))
-                : Optional.empty();
-    }
-
-    public Optional<SeaTunnelRowType> defaultType() {
-        return this.defaultType;
-    }
-
     public static FlinkRuntimeEnvironment getInstance(Config config) {
         if (INSTANCE == null) {
             synchronized (FlinkRuntimeEnvironment.class) {
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index 2e6b742d67..2109ffe88f 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -102,7 +102,7 @@ public class SinkExecuteProcessor
                                         sinkConfig.getString(PLUGIN_NAME.key())),
                                 sinkConfig);
                 sink.setJobContext(jobContext);
-                SeaTunnelRowType sourceType = initSourceType(sinkConfig, stream.getDataStream());
+                SeaTunnelRowType sourceType = stream.getCatalogTable().getSeaTunnelRowType();
                 sink.setTypeInfo(sourceType);
             } else {
                 TableSinkFactoryContext context =
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
index 5636d231c5..ebe1c9427a 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
@@ -24,7 +24,6 @@ import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SupportCoordinate;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.Constants;
 import org.apache.seatunnel.core.starter.enums.PluginType;
 import org.apache.seatunnel.core.starter.execution.PluginUtil;
@@ -95,7 +94,6 @@ public class SourceExecuteProcessor extends FlinkAbstractPluginExecuteProcessor<
                             sourceFunction,
                             "SeaTunnel " + internalSource.getClass().getSimpleName(),
                             bounded);
-            stageType(pluginConfig, (SeaTunnelRowType) internalSource.getProducedType());
 
             if (pluginConfig.hasPath(CommonOptions.PARALLELISM.key())) {
                 int parallelism = pluginConfig.getInt(CommonOptions.PARALLELISM.key());
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
index 810f6ba32b..6c145418b4 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
@@ -91,11 +91,10 @@ public class TransformExecuteProcessor
                 ConfigValidator.of(context.getOptions()).validate(factory.optionRule());
                 SeaTunnelTransform transform = factory.createTransform(context).createTransform();
 
-                SeaTunnelRowType sourceType = initSourceType(pluginConfig, stream.getDataStream());
+                SeaTunnelRowType sourceType = stream.getCatalogTable().getSeaTunnelRowType();
                 transform.setJobContext(jobContext);
                 DataStream<Row> inputStream =
                         flinkTransform(sourceType, transform, stream.getDataStream());
-                stageType(pluginConfig, transform.getProducedCatalogTable().getSeaTunnelRowType());
                 registerResultTable(pluginConfig, inputStream);
                 upstreamDataStreams.add(
                         new DataStreamTableInfo(

Reply via email to