This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3c1aeb3785 [Chore] remove useless interface (#6746)
3c1aeb3785 is described below

commit 3c1aeb37859d625e77adbb3b5156552f758470a5
Author: Tyrantlucifer <[email protected]>
AuthorDate: Sat Apr 27 16:58:29 2024 +0800

    [Chore] remove useless interface (#6746)
---
 .../cdc/base/source/IncrementalSource.java         |  3 +-
 .../flink/execution/FlinkRuntimeEnvironment.java   | 16 ++---------
 .../FlinkAbstractPluginExecuteProcessor.java       | 33 +++-------------------
 .../flink/execution/FlinkRuntimeEnvironment.java   | 16 ++---------
 .../flink/execution/SourceExecuteProcessor.java    |  4 ---
 .../core/starter/flink/utils/TableUtil.java        |  5 +---
 6 files changed, 12 insertions(+), 65 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
index 087dc17776..003ff64e8f 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
@@ -23,7 +23,6 @@ import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
-import org.apache.seatunnel.api.source.SupportCoordinate;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -79,7 +78,7 @@ import java.util.stream.Stream;
 
 @NoArgsConstructor
 public abstract class IncrementalSource<T, C extends SourceConfig>
-        implements SeaTunnelSource<T, SourceSplitBase, PendingSplitsState>, 
SupportCoordinate {
+        implements SeaTunnelSource<T, SourceSplitBase, PendingSplitsState> {
 
     protected ReadonlyConfig readonlyConfig;
     protected SourceConfig.Factory<C> configFactory;
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
index e6286b2b9e..964d8825e1 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
@@ -331,22 +331,12 @@ public class FlinkRuntimeEnvironment implements 
RuntimeEnvironment {
         }
     }
 
-    public void registerResultTable(
-            Config config, DataStream<Row> dataStream, String name, Boolean 
isAppend) {
+    public void registerResultTable(Config config, DataStream<Row> dataStream, 
String name) {
         StreamTableEnvironment tableEnvironment = 
this.getStreamTableEnvironment();
         if (!TableUtil.tableExists(tableEnvironment, name)) {
-            if (isAppend) {
-                if (config.hasPath("field_name")) {
-                    String fieldName = config.getString("field_name");
-                    tableEnvironment.registerDataStream(name, dataStream, 
fieldName);
-                    return;
-                }
-                tableEnvironment.registerDataStream(name, dataStream);
-                return;
-            }
+            tableEnvironment.createTemporaryView(
+                    name, tableEnvironment.fromChangelogStream(dataStream));
         }
-        tableEnvironment.createTemporaryView(
-                name, tableEnvironment.fromChangelogStream(dataStream));
     }
 
     public static FlinkRuntimeEnvironment getInstance(Config config) {
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
index 6b72ed3a42..565b7379bf 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
@@ -32,7 +32,6 @@ import org.apache.flink.types.Row;
 
 import java.net.URL;
 import java.net.URLClassLoader;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Optional;
 import java.util.function.BiConsumer;
@@ -41,10 +40,10 @@ import static 
org.apache.seatunnel.api.common.CommonOptions.RESULT_TABLE_NAME;
 
 public abstract class FlinkAbstractPluginExecuteProcessor<T>
         implements PluginExecuteProcessor<DataStreamTableInfo, 
FlinkRuntimeEnvironment> {
+
     protected static final String ENGINE_TYPE = "seatunnel";
-    protected static final String PLUGIN_NAME_KEY = "plugin_name";
+
     protected static final String SOURCE_TABLE_NAME = "source_table_name";
-    protected static HashMap<String, Boolean> IS_APPEND_STREAM_MAP = new 
HashMap<>();
 
     protected static final BiConsumer<ClassLoader, URL> ADD_URL_TO_CLASSLOADER 
=
             (classLoader, url) -> {
@@ -100,10 +99,7 @@ public abstract class FlinkAbstractPluginExecuteProcessor<T>
                                                             "table %s not 
found", tableName)));
             return Optional.of(
                     new DataStreamTableInfo(
-                            TableUtil.tableToDataStream(
-                                    tableEnvironment,
-                                    table,
-                                    
IS_APPEND_STREAM_MAP.getOrDefault(tableName, true)),
+                            TableUtil.tableToDataStream(tableEnvironment, 
table),
                             dataStreamTableInfo.getCatalogTable(),
                             tableName));
         }
@@ -113,28 +109,7 @@ public abstract class 
FlinkAbstractPluginExecuteProcessor<T>
     protected void registerResultTable(Config pluginConfig, DataStream<Row> 
dataStream) {
         if (pluginConfig.hasPath(RESULT_TABLE_NAME.key())) {
             String resultTable = 
pluginConfig.getString(RESULT_TABLE_NAME.key());
-            if (pluginConfig.hasPath(SOURCE_TABLE_NAME)) {
-                String sourceTable = pluginConfig.getString(SOURCE_TABLE_NAME);
-                flinkRuntimeEnvironment.registerResultTable(
-                        pluginConfig,
-                        dataStream,
-                        resultTable,
-                        IS_APPEND_STREAM_MAP.getOrDefault(sourceTable, true));
-                registerAppendStream(pluginConfig);
-                return;
-            }
-            flinkRuntimeEnvironment.registerResultTable(
-                    pluginConfig,
-                    dataStream,
-                    resultTable,
-                    IS_APPEND_STREAM_MAP.getOrDefault(resultTable, true));
-        }
-    }
-
-    protected void registerAppendStream(Config pluginConfig) {
-        if (pluginConfig.hasPath(RESULT_TABLE_NAME.key())) {
-            String tableName = pluginConfig.getString(RESULT_TABLE_NAME.key());
-            IS_APPEND_STREAM_MAP.put(tableName, false);
+            flinkRuntimeEnvironment.registerResultTable(pluginConfig, 
dataStream, resultTable);
         }
     }
 
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
index d5be751ed3..09640d222e 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
@@ -333,22 +333,12 @@ public class FlinkRuntimeEnvironment implements 
RuntimeEnvironment {
         }
     }
 
-    public void registerResultTable(
-            Config config, DataStream<Row> dataStream, String name, Boolean 
isAppend) {
+    public void registerResultTable(Config config, DataStream<Row> dataStream, 
String name) {
         StreamTableEnvironment tableEnvironment = 
this.getStreamTableEnvironment();
         if (!TableUtil.tableExists(tableEnvironment, name)) {
-            if (isAppend) {
-                if (config.hasPath("field_name")) {
-                    String fieldName = config.getString("field_name");
-                    tableEnvironment.registerDataStream(name, dataStream, 
fieldName);
-                    return;
-                }
-                tableEnvironment.registerDataStream(name, dataStream);
-                return;
-            }
+            tableEnvironment.createTemporaryView(
+                    name, tableEnvironment.fromChangelogStream(dataStream));
         }
-        tableEnvironment.createTemporaryView(
-                name, tableEnvironment.fromChangelogStream(dataStream));
     }
 
     public static FlinkRuntimeEnvironment getInstance(Config config) {
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
index d017b1e204..20b74f4b71 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
@@ -22,7 +22,6 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.api.common.CommonOptions;
 import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.source.SupportCoordinate;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
 import org.apache.seatunnel.core.starter.enums.PluginType;
 import org.apache.seatunnel.core.starter.execution.PluginUtil;
@@ -70,9 +69,6 @@ public class SourceExecuteProcessor extends 
FlinkAbstractPluginExecuteProcessor<
             SourceTableInfo sourceTableInfo = plugins.get(i);
             SeaTunnelSource internalSource = sourceTableInfo.getSource();
             Config pluginConfig = pluginConfigs.get(i);
-            if (internalSource instanceof SupportCoordinate) {
-                registerAppendStream(pluginConfig);
-            }
             FlinkSource flinkSource = new FlinkSource<>(internalSource, 
envConfig);
 
             DataStreamSource sourceStream =
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/utils/TableUtil.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/utils/TableUtil.java
index aad97518f4..fc0ea66934 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/utils/TableUtil.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/utils/TableUtil.java
@@ -31,12 +31,9 @@ public final class TableUtil {
     private TableUtil() {}
 
     public static DataStream<Row> tableToDataStream(
-            StreamTableEnvironment tableEnvironment, Table table, boolean 
isAppend) {
+            StreamTableEnvironment tableEnvironment, Table table) {
 
         TypeInformation<Row> typeInfo = table.getSchema().toRowType();
-        if (isAppend) {
-            return tableEnvironment.toAppendStream(table, typeInfo);
-        }
         DataStream<Row> dataStream = tableEnvironment.toChangelogStream(table);
         dataStream.getTransformation().setOutputType(typeInfo);
         return dataStream;

Reply via email to