This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 5e378831ee [Feature][Connector-V2][CDC] Support flink running cdc job (#4918)
5e378831ee is described below
commit 5e378831ee16414619e0db0be0776ef305512de8
Author: Carl-Zhou-CN <[email protected]>
AuthorDate: Tue Sep 12 13:39:43 2023 +0800
[Feature][Connector-V2][CDC] Support flink running cdc job (#4918)
* [feature][Connector-V2][cdc] Support flink running cdc job
* [feature][Connector-V2][cdc] Support flink running cdc job
* [feature][Connector-V2][cdc] mongo e2e delete
---------
Co-authored-by: zhouyao <[email protected]>
---
.../cdc/base/source/IncrementalSource.java | 3 +-
.../flink/execution/FlinkRuntimeEnvironment.java | 17 ++++++----
.../FlinkAbstractPluginExecuteProcessor.java | 37 ++++++++++++++++++++--
.../flink/execution/FlinkRuntimeEnvironment.java | 17 ++++++----
.../flink/execution/SourceExecuteProcessor.java | 3 +-
.../core/starter/flink/utils/TableUtil.java | 8 ++---
.../connectors/seatunnel/cdc/mysql/MysqlCDCIT.java | 8 ++++-
.../connector/cdc/sqlserver/SqlServerCDCIT.java | 2 +-
8 files changed, 69 insertions(+), 26 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
index c10ab3e061..ed04fb0f5d 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
@@ -27,6 +27,7 @@ import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.source.SupportCoordinate;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
@@ -76,7 +77,7 @@ import java.util.stream.Stream;
@NoArgsConstructor
public abstract class IncrementalSource<T, C extends SourceConfig>
- implements SeaTunnelSource<T, SourceSplitBase, PendingSplitsState> {
+ implements SeaTunnelSource<T, SourceSplitBase, PendingSplitsState>,
SupportCoordinate {
protected ReadonlyConfig readonlyConfig;
protected SourceConfig.Factory<C> configFactory;
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
index 34aa7ee4f2..996c9698fb 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
@@ -316,19 +316,22 @@ public class FlinkRuntimeEnvironment implements
RuntimeEnvironment {
}
}
- public void registerResultTable(Config config, DataStream<Row> dataStream)
{
- if (config.hasPath(RESULT_TABLE_NAME)) {
- String name = config.getString(RESULT_TABLE_NAME);
- StreamTableEnvironment tableEnvironment =
this.getStreamTableEnvironment();
- if (!TableUtil.tableExists(tableEnvironment, name)) {
+ public void registerResultTable(
+ Config config, DataStream<Row> dataStream, String name, Boolean
isAppend) {
+ StreamTableEnvironment tableEnvironment =
this.getStreamTableEnvironment();
+ if (!TableUtil.tableExists(tableEnvironment, name)) {
+ if (isAppend) {
if (config.hasPath("field_name")) {
String fieldName = config.getString("field_name");
tableEnvironment.registerDataStream(name, dataStream,
fieldName);
- } else {
- tableEnvironment.registerDataStream(name, dataStream);
+ return;
}
+ tableEnvironment.registerDataStream(name, dataStream);
+ return;
}
}
+ tableEnvironment.createTemporaryView(
+ name, tableEnvironment.fromChangelogStream(dataStream));
}
public static FlinkRuntimeEnvironment getInstance(Config config) {
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
index e9d36ba068..6c61f61b95 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
@@ -31,15 +31,19 @@ import org.apache.flink.types.Row;
import java.net.URL;
import java.net.URLClassLoader;
+import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.function.BiConsumer;
+import static org.apache.seatunnel.api.common.CommonOptions.RESULT_TABLE_NAME;
+
public abstract class FlinkAbstractPluginExecuteProcessor<T>
implements PluginExecuteProcessor<DataStream<Row>,
FlinkRuntimeEnvironment> {
protected static final String ENGINE_TYPE = "seatunnel";
protected static final String PLUGIN_NAME = "plugin_name";
protected static final String SOURCE_TABLE_NAME = "source_table_name";
+ protected static HashMap<String, Boolean> isAppendMap = new HashMap<>();
protected static final BiConsumer<ClassLoader, URL> ADD_URL_TO_CLASSLOADER
=
(classLoader, url) -> {
@@ -76,14 +80,41 @@ public abstract class FlinkAbstractPluginExecuteProcessor<T>
if (pluginConfig.hasPath(SOURCE_TABLE_NAME)) {
StreamTableEnvironment tableEnvironment =
flinkRuntimeEnvironment.getStreamTableEnvironment();
- Table table =
tableEnvironment.from(pluginConfig.getString(SOURCE_TABLE_NAME));
- return
Optional.ofNullable(TableUtil.tableToDataStream(tableEnvironment, table, true));
+ String tableName = pluginConfig.getString(SOURCE_TABLE_NAME);
+ Table table = tableEnvironment.from(tableName);
+ return Optional.ofNullable(
+ TableUtil.tableToDataStream(
+ tableEnvironment, table,
isAppendMap.getOrDefault(tableName, true)));
}
return Optional.empty();
}
protected void registerResultTable(Config pluginConfig, DataStream<Row>
dataStream) {
- flinkRuntimeEnvironment.registerResultTable(pluginConfig, dataStream);
+ if (pluginConfig.hasPath(RESULT_TABLE_NAME.key())) {
+ String resultTable =
pluginConfig.getString(RESULT_TABLE_NAME.key());
+ if (pluginConfig.hasPath(SOURCE_TABLE_NAME)) {
+ String sourceTable = pluginConfig.getString(SOURCE_TABLE_NAME);
+ flinkRuntimeEnvironment.registerResultTable(
+ pluginConfig,
+ dataStream,
+ resultTable,
+ isAppendMap.getOrDefault(sourceTable, true));
+ registerAppendStream(pluginConfig);
+ return;
+ }
+ flinkRuntimeEnvironment.registerResultTable(
+ pluginConfig,
+ dataStream,
+ resultTable,
+ isAppendMap.getOrDefault(resultTable, true));
+ }
+ }
+
+ protected void registerAppendStream(Config pluginConfig) {
+ if (pluginConfig.hasPath(RESULT_TABLE_NAME.key())) {
+ String tableName = pluginConfig.getString(RESULT_TABLE_NAME.key());
+ isAppendMap.put(tableName, false);
+ }
}
protected abstract List<T> initializePlugins(
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
index 583a1cf3e5..12168921d8 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
@@ -316,19 +316,22 @@ public class FlinkRuntimeEnvironment implements
RuntimeEnvironment {
}
}
- public void registerResultTable(Config config, DataStream<Row> dataStream)
{
- if (config.hasPath(RESULT_TABLE_NAME)) {
- String name = config.getString(RESULT_TABLE_NAME);
- StreamTableEnvironment tableEnvironment =
this.getStreamTableEnvironment();
- if (!TableUtil.tableExists(tableEnvironment, name)) {
+ public void registerResultTable(
+ Config config, DataStream<Row> dataStream, String name, Boolean
isAppend) {
+ StreamTableEnvironment tableEnvironment =
this.getStreamTableEnvironment();
+ if (!TableUtil.tableExists(tableEnvironment, name)) {
+ if (isAppend) {
if (config.hasPath("field_name")) {
String fieldName = config.getString("field_name");
tableEnvironment.registerDataStream(name, dataStream,
fieldName);
- } else {
- tableEnvironment.registerDataStream(name, dataStream);
+ return;
}
+ tableEnvironment.registerDataStream(name, dataStream);
+ return;
}
}
+ tableEnvironment.createTemporaryView(
+ name, tableEnvironment.fromChangelogStream(dataStream));
}
public static FlinkRuntimeEnvironment getInstance(Config config) {
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
index 6bcc5fe893..f3ebdd0437 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
@@ -65,9 +65,11 @@ public class SourceExecuteProcessor extends
FlinkAbstractPluginExecuteProcessor<
List<DataStream<Row>> sources = new ArrayList<>();
for (int i = 0; i < plugins.size(); i++) {
SeaTunnelSource internalSource = plugins.get(i);
+ Config pluginConfig = pluginConfigs.get(i);
BaseSeaTunnelSourceFunction sourceFunction;
if (internalSource instanceof SupportCoordinate) {
sourceFunction = new
SeaTunnelCoordinatedSource(internalSource);
+ registerAppendStream(pluginConfig);
} else {
sourceFunction = new SeaTunnelParallelSource(internalSource);
}
@@ -80,7 +82,6 @@ public class SourceExecuteProcessor extends
FlinkAbstractPluginExecuteProcessor<
sourceFunction,
"SeaTunnel " +
internalSource.getClass().getSimpleName(),
bounded);
- Config pluginConfig = pluginConfigs.get(i);
if (pluginConfig.hasPath(CommonOptions.PARALLELISM.key())) {
int parallelism =
pluginConfig.getInt(CommonOptions.PARALLELISM.key());
sourceStream.setParallelism(parallelism);
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/utils/TableUtil.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/utils/TableUtil.java
index ca1603cdf9..aad97518f4 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/utils/TableUtil.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/utils/TableUtil.java
@@ -37,11 +37,9 @@ public final class TableUtil {
if (isAppend) {
return tableEnvironment.toAppendStream(table, typeInfo);
}
- return tableEnvironment
- .toRetractStream(table, typeInfo)
- .filter(row -> row.f0)
- .map(row -> row.f1)
- .returns(typeInfo);
+ DataStream<Row> dataStream = tableEnvironment.toChangelogStream(table);
+ dataStream.getTransformation().setOutputType(typeInfo);
+ return dataStream;
}
public static boolean tableExists(TableEnvironment tableEnvironment,
String name) {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
index 1d0d90853f..b648febd7d 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
@@ -55,7 +55,7 @@ import static org.awaitility.Awaitility.await;
@Slf4j
@DisabledOnContainer(
value = {},
- type = {EngineType.SPARK, EngineType.FLINK},
+ type = {EngineType.SPARK},
disabledReason = "Currently SPARK and FLINK do not support cdc")
public class MysqlCDCIT extends TestSuiteBase implements TestResource {
@@ -88,6 +88,9 @@ public class MysqlCDCIT extends TestSuiteBase implements
TestResource {
+ " f_enum, cast(f_mediumblob as char) as f_mediumblob,
f_long_varchar, f_real, f_time, f_tinyint, f_tinyint_unsigned,"
+ " f_json, cast(f_year as year) from
mysql_cdc_e2e_sink_table";
+ private static final String CLEAN_SOURCE = "truncate table
mysql_cdc_e2e_source_table";
+ private static final String CLEAN_SINK = "truncate table
mysql_cdc_e2e_sink_table";
+
private static MySqlContainer createMySqlContainer(MySqlVersion version) {
MySqlContainer mySqlContainer =
new MySqlContainer(version)
@@ -134,6 +137,9 @@ public class MysqlCDCIT extends TestSuiteBase implements
TestResource {
@TestTemplate
public void testMysqlCdcCheckDataE2e(TestContainer container)
throws IOException, InterruptedException {
+ // Clear related content to ensure that multiple operations are not affected
+ executeSql(CLEAN_SOURCE);
+ executeSql(CLEAN_SINK);
CompletableFuture<Void> executeJobFuture =
CompletableFuture.supplyAsync(
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
index 8bca3e3b03..bfe2a35888 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
@@ -65,7 +65,7 @@ import static org.awaitility.Awaitility.await;
@Slf4j
@DisabledOnContainer(
value = {},
- type = {EngineType.SPARK, EngineType.FLINK},
+ type = {EngineType.SPARK},
disabledReason = "Currently SPARK and FLINK do not support cdc")
public class SqlServerCDCIT extends TestSuiteBase implements TestResource {