This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 5e378831ee [Feature][Connector-V2][CDC] Support flink running cdc job (#4918)
5e378831ee is described below

commit 5e378831ee16414619e0db0be0776ef305512de8
Author: Carl-Zhou-CN <[email protected]>
AuthorDate: Tue Sep 12 13:39:43 2023 +0800

    [Feature][Connector-V2][CDC] Support flink running cdc job (#4918)
    
    * [feature][Connector-V2][cdc] Support flink running cdc job
    
    * [feature][Connector-V2][cdc] Support flink running cdc job
    
    * [feature][Connector-V2][cdc] mongo e2e delete
    
    ---------
    
    Co-authored-by: zhouyao <[email protected]>
---
 .../cdc/base/source/IncrementalSource.java         |  3 +-
 .../flink/execution/FlinkRuntimeEnvironment.java   | 17 ++++++----
 .../FlinkAbstractPluginExecuteProcessor.java       | 37 ++++++++++++++++++++--
 .../flink/execution/FlinkRuntimeEnvironment.java   | 17 ++++++----
 .../flink/execution/SourceExecuteProcessor.java    |  3 +-
 .../core/starter/flink/utils/TableUtil.java        |  8 ++---
 .../connectors/seatunnel/cdc/mysql/MysqlCDCIT.java |  8 ++++-
 .../connector/cdc/sqlserver/SqlServerCDCIT.java    |  2 +-
 8 files changed, 69 insertions(+), 26 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
index c10ab3e061..ed04fb0f5d 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
@@ -27,6 +27,7 @@ import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.source.SupportCoordinate;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
@@ -76,7 +77,7 @@ import java.util.stream.Stream;
 
 @NoArgsConstructor
 public abstract class IncrementalSource<T, C extends SourceConfig>
-        implements SeaTunnelSource<T, SourceSplitBase, PendingSplitsState> {
+        implements SeaTunnelSource<T, SourceSplitBase, PendingSplitsState>, 
SupportCoordinate {
 
     protected ReadonlyConfig readonlyConfig;
     protected SourceConfig.Factory<C> configFactory;
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
index 34aa7ee4f2..996c9698fb 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
@@ -316,19 +316,22 @@ public class FlinkRuntimeEnvironment implements 
RuntimeEnvironment {
         }
     }
 
-    public void registerResultTable(Config config, DataStream<Row> dataStream) 
{
-        if (config.hasPath(RESULT_TABLE_NAME)) {
-            String name = config.getString(RESULT_TABLE_NAME);
-            StreamTableEnvironment tableEnvironment = 
this.getStreamTableEnvironment();
-            if (!TableUtil.tableExists(tableEnvironment, name)) {
+    public void registerResultTable(
+            Config config, DataStream<Row> dataStream, String name, Boolean 
isAppend) {
+        StreamTableEnvironment tableEnvironment = 
this.getStreamTableEnvironment();
+        if (!TableUtil.tableExists(tableEnvironment, name)) {
+            if (isAppend) {
                 if (config.hasPath("field_name")) {
                     String fieldName = config.getString("field_name");
                     tableEnvironment.registerDataStream(name, dataStream, 
fieldName);
-                } else {
-                    tableEnvironment.registerDataStream(name, dataStream);
+                    return;
                 }
+                tableEnvironment.registerDataStream(name, dataStream);
+                return;
             }
         }
+        tableEnvironment.createTemporaryView(
+                name, tableEnvironment.fromChangelogStream(dataStream));
     }
 
     public static FlinkRuntimeEnvironment getInstance(Config config) {
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
index e9d36ba068..6c61f61b95 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
@@ -31,15 +31,19 @@ import org.apache.flink.types.Row;
 
 import java.net.URL;
 import java.net.URLClassLoader;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Optional;
 import java.util.function.BiConsumer;
 
+import static org.apache.seatunnel.api.common.CommonOptions.RESULT_TABLE_NAME;
+
 public abstract class FlinkAbstractPluginExecuteProcessor<T>
         implements PluginExecuteProcessor<DataStream<Row>, 
FlinkRuntimeEnvironment> {
     protected static final String ENGINE_TYPE = "seatunnel";
     protected static final String PLUGIN_NAME = "plugin_name";
     protected static final String SOURCE_TABLE_NAME = "source_table_name";
+    protected static HashMap<String, Boolean> isAppendMap = new HashMap<>();
 
     protected static final BiConsumer<ClassLoader, URL> ADD_URL_TO_CLASSLOADER 
=
             (classLoader, url) -> {
@@ -76,14 +80,41 @@ public abstract class FlinkAbstractPluginExecuteProcessor<T>
         if (pluginConfig.hasPath(SOURCE_TABLE_NAME)) {
             StreamTableEnvironment tableEnvironment =
                     flinkRuntimeEnvironment.getStreamTableEnvironment();
-            Table table = 
tableEnvironment.from(pluginConfig.getString(SOURCE_TABLE_NAME));
-            return 
Optional.ofNullable(TableUtil.tableToDataStream(tableEnvironment, table, true));
+            String tableName = pluginConfig.getString(SOURCE_TABLE_NAME);
+            Table table = tableEnvironment.from(tableName);
+            return Optional.ofNullable(
+                    TableUtil.tableToDataStream(
+                            tableEnvironment, table, 
isAppendMap.getOrDefault(tableName, true)));
         }
         return Optional.empty();
     }
 
     protected void registerResultTable(Config pluginConfig, DataStream<Row> 
dataStream) {
-        flinkRuntimeEnvironment.registerResultTable(pluginConfig, dataStream);
+        if (pluginConfig.hasPath(RESULT_TABLE_NAME.key())) {
+            String resultTable = 
pluginConfig.getString(RESULT_TABLE_NAME.key());
+            if (pluginConfig.hasPath(SOURCE_TABLE_NAME)) {
+                String sourceTable = pluginConfig.getString(SOURCE_TABLE_NAME);
+                flinkRuntimeEnvironment.registerResultTable(
+                        pluginConfig,
+                        dataStream,
+                        resultTable,
+                        isAppendMap.getOrDefault(sourceTable, true));
+                registerAppendStream(pluginConfig);
+                return;
+            }
+            flinkRuntimeEnvironment.registerResultTable(
+                    pluginConfig,
+                    dataStream,
+                    resultTable,
+                    isAppendMap.getOrDefault(resultTable, true));
+        }
+    }
+
+    protected void registerAppendStream(Config pluginConfig) {
+        if (pluginConfig.hasPath(RESULT_TABLE_NAME.key())) {
+            String tableName = pluginConfig.getString(RESULT_TABLE_NAME.key());
+            isAppendMap.put(tableName, false);
+        }
     }
 
     protected abstract List<T> initializePlugins(
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
index 583a1cf3e5..12168921d8 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
@@ -316,19 +316,22 @@ public class FlinkRuntimeEnvironment implements 
RuntimeEnvironment {
         }
     }
 
-    public void registerResultTable(Config config, DataStream<Row> dataStream) 
{
-        if (config.hasPath(RESULT_TABLE_NAME)) {
-            String name = config.getString(RESULT_TABLE_NAME);
-            StreamTableEnvironment tableEnvironment = 
this.getStreamTableEnvironment();
-            if (!TableUtil.tableExists(tableEnvironment, name)) {
+    public void registerResultTable(
+            Config config, DataStream<Row> dataStream, String name, Boolean 
isAppend) {
+        StreamTableEnvironment tableEnvironment = 
this.getStreamTableEnvironment();
+        if (!TableUtil.tableExists(tableEnvironment, name)) {
+            if (isAppend) {
                 if (config.hasPath("field_name")) {
                     String fieldName = config.getString("field_name");
                     tableEnvironment.registerDataStream(name, dataStream, 
fieldName);
-                } else {
-                    tableEnvironment.registerDataStream(name, dataStream);
+                    return;
                 }
+                tableEnvironment.registerDataStream(name, dataStream);
+                return;
             }
         }
+        tableEnvironment.createTemporaryView(
+                name, tableEnvironment.fromChangelogStream(dataStream));
     }
 
     public static FlinkRuntimeEnvironment getInstance(Config config) {
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
index 6bcc5fe893..f3ebdd0437 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
@@ -65,9 +65,11 @@ public class SourceExecuteProcessor extends 
FlinkAbstractPluginExecuteProcessor<
         List<DataStream<Row>> sources = new ArrayList<>();
         for (int i = 0; i < plugins.size(); i++) {
             SeaTunnelSource internalSource = plugins.get(i);
+            Config pluginConfig = pluginConfigs.get(i);
             BaseSeaTunnelSourceFunction sourceFunction;
             if (internalSource instanceof SupportCoordinate) {
                 sourceFunction = new 
SeaTunnelCoordinatedSource(internalSource);
+                registerAppendStream(pluginConfig);
             } else {
                 sourceFunction = new SeaTunnelParallelSource(internalSource);
             }
@@ -80,7 +82,6 @@ public class SourceExecuteProcessor extends 
FlinkAbstractPluginExecuteProcessor<
                             sourceFunction,
                             "SeaTunnel " + 
internalSource.getClass().getSimpleName(),
                             bounded);
-            Config pluginConfig = pluginConfigs.get(i);
             if (pluginConfig.hasPath(CommonOptions.PARALLELISM.key())) {
                 int parallelism = 
pluginConfig.getInt(CommonOptions.PARALLELISM.key());
                 sourceStream.setParallelism(parallelism);
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/utils/TableUtil.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/utils/TableUtil.java
index ca1603cdf9..aad97518f4 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/utils/TableUtil.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/utils/TableUtil.java
@@ -37,11 +37,9 @@ public final class TableUtil {
         if (isAppend) {
             return tableEnvironment.toAppendStream(table, typeInfo);
         }
-        return tableEnvironment
-                .toRetractStream(table, typeInfo)
-                .filter(row -> row.f0)
-                .map(row -> row.f1)
-                .returns(typeInfo);
+        DataStream<Row> dataStream = tableEnvironment.toChangelogStream(table);
+        dataStream.getTransformation().setOutputType(typeInfo);
+        return dataStream;
     }
 
     public static boolean tableExists(TableEnvironment tableEnvironment, 
String name) {
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
index 1d0d90853f..b648febd7d 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
@@ -55,7 +55,7 @@ import static org.awaitility.Awaitility.await;
 @Slf4j
 @DisabledOnContainer(
         value = {},
-        type = {EngineType.SPARK, EngineType.FLINK},
+        type = {EngineType.SPARK},
         disabledReason = "Currently SPARK and FLINK do not support cdc")
 public class MysqlCDCIT extends TestSuiteBase implements TestResource {
 
@@ -88,6 +88,9 @@ public class MysqlCDCIT extends TestSuiteBase implements 
TestResource {
                     + " f_enum, cast(f_mediumblob as char) as f_mediumblob, 
f_long_varchar, f_real, f_time, f_tinyint, f_tinyint_unsigned,"
                     + " f_json, cast(f_year as year) from 
mysql_cdc_e2e_sink_table";
 
+    private static final String CLEAN_SOURCE = "truncate table 
mysql_cdc_e2e_source_table";
+    private static final String CLEAN_SINK = "truncate table 
mysql_cdc_e2e_sink_table";
+
     private static MySqlContainer createMySqlContainer(MySqlVersion version) {
         MySqlContainer mySqlContainer =
                 new MySqlContainer(version)
@@ -134,6 +137,9 @@ public class MysqlCDCIT extends TestSuiteBase implements 
TestResource {
     @TestTemplate
     public void testMysqlCdcCheckDataE2e(TestContainer container)
             throws IOException, InterruptedException {
+        // Clear related content to ensure that multiple operations are not 
affected
+        executeSql(CLEAN_SOURCE);
+        executeSql(CLEAN_SINK);
 
         CompletableFuture<Void> executeJobFuture =
                 CompletableFuture.supplyAsync(
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
index 8bca3e3b03..bfe2a35888 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
@@ -65,7 +65,7 @@ import static org.awaitility.Awaitility.await;
 @Slf4j
 @DisabledOnContainer(
         value = {},
-        type = {EngineType.SPARK, EngineType.FLINK},
+        type = {EngineType.SPARK},
         disabledReason = "Currently SPARK and FLINK do not support cdc")
 public class SqlServerCDCIT extends TestSuiteBase implements TestResource {
 

Reply via email to