This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 3c1aeb3785 [Chore] remove useless interface (#6746)
3c1aeb3785 is described below
commit 3c1aeb37859d625e77adbb3b5156552f758470a5
Author: Tyrantlucifer <[email protected]>
AuthorDate: Sat Apr 27 16:58:29 2024 +0800
[Chore] remove useless interface (#6746)
---
.../cdc/base/source/IncrementalSource.java | 3 +-
.../flink/execution/FlinkRuntimeEnvironment.java | 16 ++---------
.../FlinkAbstractPluginExecuteProcessor.java | 33 +++-------------------
.../flink/execution/FlinkRuntimeEnvironment.java | 16 ++---------
.../flink/execution/SourceExecuteProcessor.java | 4 ---
.../core/starter/flink/utils/TableUtil.java | 5 +---
6 files changed, 12 insertions(+), 65 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
index 087dc17776..003ff64e8f 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
@@ -23,7 +23,6 @@ import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
-import org.apache.seatunnel.api.source.SupportCoordinate;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -79,7 +78,7 @@ import java.util.stream.Stream;
@NoArgsConstructor
public abstract class IncrementalSource<T, C extends SourceConfig>
- implements SeaTunnelSource<T, SourceSplitBase, PendingSplitsState>, SupportCoordinate {
+ implements SeaTunnelSource<T, SourceSplitBase, PendingSplitsState> {
protected ReadonlyConfig readonlyConfig;
protected SourceConfig.Factory<C> configFactory;
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
index e6286b2b9e..964d8825e1 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
@@ -331,22 +331,12 @@ public class FlinkRuntimeEnvironment implements
RuntimeEnvironment {
}
}
- public void registerResultTable(
- Config config, DataStream<Row> dataStream, String name, Boolean isAppend) {
+ public void registerResultTable(Config config, DataStream<Row> dataStream, String name) {
StreamTableEnvironment tableEnvironment =
this.getStreamTableEnvironment();
if (!TableUtil.tableExists(tableEnvironment, name)) {
- if (isAppend) {
- if (config.hasPath("field_name")) {
- String fieldName = config.getString("field_name");
- tableEnvironment.registerDataStream(name, dataStream, fieldName);
- return;
- }
- tableEnvironment.registerDataStream(name, dataStream);
- return;
- }
+ tableEnvironment.createTemporaryView(
+ name, tableEnvironment.fromChangelogStream(dataStream));
}
- tableEnvironment.createTemporaryView(
- name, tableEnvironment.fromChangelogStream(dataStream));
}
public static FlinkRuntimeEnvironment getInstance(Config config) {
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
index 6b72ed3a42..565b7379bf 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
@@ -32,7 +32,6 @@ import org.apache.flink.types.Row;
import java.net.URL;
import java.net.URLClassLoader;
-import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.function.BiConsumer;
@@ -41,10 +40,10 @@ import static
org.apache.seatunnel.api.common.CommonOptions.RESULT_TABLE_NAME;
public abstract class FlinkAbstractPluginExecuteProcessor<T>
implements PluginExecuteProcessor<DataStreamTableInfo,
FlinkRuntimeEnvironment> {
+
protected static final String ENGINE_TYPE = "seatunnel";
- protected static final String PLUGIN_NAME_KEY = "plugin_name";
+
protected static final String SOURCE_TABLE_NAME = "source_table_name";
- protected static HashMap<String, Boolean> IS_APPEND_STREAM_MAP = new HashMap<>();
protected static final BiConsumer<ClassLoader, URL> ADD_URL_TO_CLASSLOADER
=
(classLoader, url) -> {
@@ -100,10 +99,7 @@ public abstract class FlinkAbstractPluginExecuteProcessor<T>
"table %s not
found", tableName)));
return Optional.of(
new DataStreamTableInfo(
- TableUtil.tableToDataStream(
- tableEnvironment,
- table,
- IS_APPEND_STREAM_MAP.getOrDefault(tableName, true)),
+ TableUtil.tableToDataStream(tableEnvironment, table),
dataStreamTableInfo.getCatalogTable(),
tableName));
}
@@ -113,28 +109,7 @@ public abstract class
FlinkAbstractPluginExecuteProcessor<T>
protected void registerResultTable(Config pluginConfig, DataStream<Row>
dataStream) {
if (pluginConfig.hasPath(RESULT_TABLE_NAME.key())) {
String resultTable =
pluginConfig.getString(RESULT_TABLE_NAME.key());
- if (pluginConfig.hasPath(SOURCE_TABLE_NAME)) {
- String sourceTable = pluginConfig.getString(SOURCE_TABLE_NAME);
- flinkRuntimeEnvironment.registerResultTable(
- pluginConfig,
- dataStream,
- resultTable,
- IS_APPEND_STREAM_MAP.getOrDefault(sourceTable, true));
- registerAppendStream(pluginConfig);
- return;
- }
- flinkRuntimeEnvironment.registerResultTable(
- pluginConfig,
- dataStream,
- resultTable,
- IS_APPEND_STREAM_MAP.getOrDefault(resultTable, true));
- }
- }
-
- protected void registerAppendStream(Config pluginConfig) {
- if (pluginConfig.hasPath(RESULT_TABLE_NAME.key())) {
- String tableName = pluginConfig.getString(RESULT_TABLE_NAME.key());
- IS_APPEND_STREAM_MAP.put(tableName, false);
+ flinkRuntimeEnvironment.registerResultTable(pluginConfig, dataStream, resultTable);
}
}
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
index d5be751ed3..09640d222e 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
@@ -333,22 +333,12 @@ public class FlinkRuntimeEnvironment implements
RuntimeEnvironment {
}
}
- public void registerResultTable(
- Config config, DataStream<Row> dataStream, String name, Boolean isAppend) {
+ public void registerResultTable(Config config, DataStream<Row> dataStream, String name) {
StreamTableEnvironment tableEnvironment =
this.getStreamTableEnvironment();
if (!TableUtil.tableExists(tableEnvironment, name)) {
- if (isAppend) {
- if (config.hasPath("field_name")) {
- String fieldName = config.getString("field_name");
- tableEnvironment.registerDataStream(name, dataStream, fieldName);
- return;
- }
- tableEnvironment.registerDataStream(name, dataStream);
- return;
- }
+ tableEnvironment.createTemporaryView(
+ name, tableEnvironment.fromChangelogStream(dataStream));
}
- tableEnvironment.createTemporaryView(
- name, tableEnvironment.fromChangelogStream(dataStream));
}
public static FlinkRuntimeEnvironment getInstance(Config config) {
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
index d017b1e204..20b74f4b71 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
@@ -22,7 +22,6 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.source.SupportCoordinate;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.core.starter.enums.PluginType;
import org.apache.seatunnel.core.starter.execution.PluginUtil;
@@ -70,9 +69,6 @@ public class SourceExecuteProcessor extends
FlinkAbstractPluginExecuteProcessor<
SourceTableInfo sourceTableInfo = plugins.get(i);
SeaTunnelSource internalSource = sourceTableInfo.getSource();
Config pluginConfig = pluginConfigs.get(i);
- if (internalSource instanceof SupportCoordinate) {
- registerAppendStream(pluginConfig);
- }
FlinkSource flinkSource = new FlinkSource<>(internalSource,
envConfig);
DataStreamSource sourceStream =
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/utils/TableUtil.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/utils/TableUtil.java
index aad97518f4..fc0ea66934 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/utils/TableUtil.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/utils/TableUtil.java
@@ -31,12 +31,9 @@ public final class TableUtil {
private TableUtil() {}
public static DataStream<Row> tableToDataStream(
- StreamTableEnvironment tableEnvironment, Table table, boolean isAppend) {
+ StreamTableEnvironment tableEnvironment, Table table) {
TypeInformation<Row> typeInfo = table.getSchema().toRowType();
- if (isAppend) {
- return tableEnvironment.toAppendStream(table, typeInfo);
- }
DataStream<Row> dataStream = tableEnvironment.toChangelogStream(table);
dataStream.getTransformation().setOutputType(typeInfo);
return dataStream;