This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 65312ff73e [Feature][Flink] Support multiple tables read and write (#7713)
65312ff73e is described below
commit 65312ff73ef51e7087f935bfa1d486331027c9ec
Author: Sirui-Li <[email protected]>
AuthorDate: Wed Sep 25 15:02:44 2024 +0800
[Feature][Flink] Support multiple tables read and write (#7713)
---
.github/workflows/backend.yml | 8 +-
docs/en/start-v2/locally/quick-start-flink.md | 2 +-
docs/zh/start-v2/locally/quick-start-flink.md | 2 +-
.../core/starter/execution/PluginUtil.java | 6 --
.../flink/execution/SinkExecuteProcessor.java | 106 +++++++++++++--------
.../flink/execution/SinkExecuteProcessor.java | 106 +++++++++++++--------
.../connectors/seatunnel/cdc/mysql/MysqlCDCIT.java | 12 ++-
.../seatunnel/cdc/postgres/OpengaussCDCIT.java | 8 +-
.../seatunnel/cdc/oracle/OracleCDCIT.java | 8 +-
.../seatunnel/cdc/postgres/PostgresCDCIT.java | 8 +-
.../connector/elasticsearch/ElasticsearchIT.java | 10 --
.../e2e/connector/email/EmailWithMultiIT.java | 6 --
.../file/local/LocalFileWithMultipleTableIT.java | 5 +-
.../seatunnel/e2e/connector/hbase/HbaseIT.java | 5 +-
.../seatunnel/e2e/connector/http/HttpIT.java | 6 --
.../e2e/connector/influxdb/InfluxdbIT.java | 6 --
.../seatunnel/jdbc/JdbcMysqlMultipleTablesIT.java | 6 --
.../seatunnel/jdbc/JdbcOracleMultipleTablesIT.java | 8 +-
.../seatunnel/e2e/connector/kudu/KuduIT.java | 4 -
.../connector/redis/RedisTestCaseTemplateIT.java | 6 --
.../e2e/common/container/TestContainerId.java | 2 +
.../common/container/flink/Flink17Container.java | 71 ++++++++++++++
.../common/container/flink/Flink18Container.java | 71 ++++++++++++++
.../seatunnel/engine/e2e/CheckpointEnableIT.java | 3 +-
.../translation/flink/sink/FlinkSink.java | 32 +++++--
.../translation/flink/sink/FlinkSinkWriter.java | 2 -
26 files changed, 329 insertions(+), 180 deletions(-)
diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml
index 1bc424b297..ad4390a048 100644
--- a/.github/workflows/backend.yml
+++ b/.github/workflows/backend.yml
@@ -544,7 +544,7 @@ jobs:
matrix:
java: [ '8', '11' ]
os: [ 'ubuntu-latest' ]
- timeout-minutes: 45
+ timeout-minutes: 90
steps:
- uses: actions/checkout@v2
- name: Set up JDK ${{ matrix.java }}
@@ -654,7 +654,7 @@ jobs:
matrix:
java: [ '8', '11' ]
os: [ 'ubuntu-latest' ]
- timeout-minutes: 90
+ timeout-minutes: 150
steps:
- uses: actions/checkout@v2
- name: Set up JDK ${{ matrix.java }}
@@ -683,7 +683,7 @@ jobs:
matrix:
java: [ '8', '11' ]
os: [ 'ubuntu-latest' ]
- timeout-minutes: 90
+ timeout-minutes: 150
steps:
- uses: actions/checkout@v2
- name: Set up JDK ${{ matrix.java }}
@@ -828,7 +828,7 @@ jobs:
matrix:
java: [ '8', '11' ]
os: [ 'ubuntu-latest' ]
- timeout-minutes: 90
+ timeout-minutes: 150
steps:
- uses: actions/checkout@v2
- name: Set up JDK ${{ matrix.java }}
diff --git a/docs/en/start-v2/locally/quick-start-flink.md b/docs/en/start-v2/locally/quick-start-flink.md
index 1ccc5a3001..244dfd8c9e 100644
--- a/docs/en/start-v2/locally/quick-start-flink.md
+++ b/docs/en/start-v2/locally/quick-start-flink.md
@@ -70,7 +70,7 @@ cd "apache-seatunnel-${version}"
./bin/start-seatunnel-flink-13-connector-v2.sh --config
./config/v2.streaming.conf.template
```
-Flink version between `1.15.x` and `1.16.x`
+Flink version between `1.15.x` and `1.18.x`
```shell
cd "apache-seatunnel-${version}"
diff --git a/docs/zh/start-v2/locally/quick-start-flink.md b/docs/zh/start-v2/locally/quick-start-flink.md
index 87de2808c4..d6a5a794bd 100644
--- a/docs/zh/start-v2/locally/quick-start-flink.md
+++ b/docs/zh/start-v2/locally/quick-start-flink.md
@@ -70,7 +70,7 @@ cd "apache-seatunnel-${version}"
./bin/start-seatunnel-flink-13-connector-v2.sh --config
./config/v2.streaming.conf.template
```
-Flink版本`1.15.x`到`1.16.x`
+Flink版本`1.15.x`到`1.18.x`
```shell
cd "apache-seatunnel-${version}"
diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java
index e8a682062e..4630a4fbf4 100644
--- a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java
+++ b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java
@@ -102,12 +102,6 @@ public class PluginUtil {
catalogTables =
CatalogTableUtil.convertDataTypeToCatalogTables(seaTunnelDataType, tableId);
}
-
- // if (catalogTables.size() != 1) {
- // throw new SeaTunnelException(
- // String.format("Unsupported table number: %d on flink",
- // catalogTables.size()));
- // }
return new SourceTableInfo(source, catalogTables);
}
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index c5eabc6303..3b0a7db8e8 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -26,11 +26,15 @@ import
org.apache.seatunnel.api.configuration.util.ConfigValidator;
import org.apache.seatunnel.api.sink.SaveModeExecuteWrapper;
import org.apache.seatunnel.api.sink.SaveModeHandler;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SupportMultiTableSink;
import org.apache.seatunnel.api.sink.SupportSaveMode;
-import org.apache.seatunnel.api.sink.multitablesink.MultiTableSink;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.FactoryUtil;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.core.starter.enums.PluginType;
@@ -42,9 +46,11 @@ import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscov
import org.apache.seatunnel.translation.flink.sink.FlinkSink;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.types.Row;
+
+import lombok.extern.slf4j.Slf4j;
import java.net.URL;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -54,6 +60,7 @@ import static
org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
import static
org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED;
@SuppressWarnings({"unchecked", "rawtypes"})
+@Slf4j
public class SinkExecuteProcessor
extends FlinkAbstractPluginExecuteProcessor<Optional<? extends
Factory>> {
@@ -99,35 +106,48 @@ public class SinkExecuteProcessor
fromSourceTable(sinkConfig,
upstreamDataStreams).orElse(input);
Optional<? extends Factory> factory = plugins.get(i);
boolean fallBack = !factory.isPresent() ||
isFallback(factory.get());
- SeaTunnelSink sink;
+ Map<String, SeaTunnelSink> sinks = new HashMap<>();
if (fallBack) {
- sink =
- fallbackCreateSink(
- sinkPluginDiscovery,
- PluginIdentifier.of(
- ENGINE_TYPE,
- PLUGIN_TYPE,
-
sinkConfig.getString(PLUGIN_NAME.key())),
- sinkConfig);
- sink.setJobContext(jobContext);
- // TODO support sink multi sink
- SeaTunnelRowType sourceType =
- stream.getCatalogTables().get(0).getSeaTunnelRowType();
- sink.setTypeInfo(sourceType);
+ for (CatalogTable catalogTable : stream.getCatalogTables()) {
+ SeaTunnelSink fallBackSink =
+ fallbackCreateSink(
+ sinkPluginDiscovery,
+ PluginIdentifier.of(
+ ENGINE_TYPE,
+ PLUGIN_TYPE,
+
sinkConfig.getString(PLUGIN_NAME.key())),
+ sinkConfig);
+ fallBackSink.setJobContext(jobContext);
+ SeaTunnelRowType sourceType =
catalogTable.getSeaTunnelRowType();
+ fallBackSink.setTypeInfo(sourceType);
+ handleSaveMode(fallBackSink);
+ TableIdentifier tableId = catalogTable.getTableId();
+ String tableIdName = tableId.toTablePath().toString();
+ sinks.put(tableIdName, fallBackSink);
+ }
} else {
- // TODO support sink multi sink
- TableSinkFactoryContext context =
- TableSinkFactoryContext.replacePlaceholderAndCreate(
- stream.getCatalogTables().get(0),
- ReadonlyConfig.fromConfig(sinkConfig),
- classLoader,
- ((TableSinkFactory) factory.get())
- .excludeTablePlaceholderReplaceKeys());
-
ConfigValidator.of(context.getOptions()).validate(factory.get().optionRule());
- sink = ((TableSinkFactory)
factory.get()).createSink(context).createSink();
- sink.setJobContext(jobContext);
+ for (CatalogTable catalogTable : stream.getCatalogTables()) {
+ SeaTunnelSink seaTunnelSink;
+ TableSinkFactoryContext context =
+
TableSinkFactoryContext.replacePlaceholderAndCreate(
+ catalogTable,
+ ReadonlyConfig.fromConfig(sinkConfig),
+ classLoader,
+ ((TableSinkFactory) factory.get())
+
.excludeTablePlaceholderReplaceKeys());
+
ConfigValidator.of(context.getOptions()).validate(factory.get().optionRule());
+ seaTunnelSink =
+ ((TableSinkFactory)
factory.get()).createSink(context).createSink();
+ seaTunnelSink.setJobContext(jobContext);
+ handleSaveMode(seaTunnelSink);
+ TableIdentifier tableId = catalogTable.getTableId();
+ String tableIdName = tableId.toTablePath().toString();
+ sinks.put(tableIdName, seaTunnelSink);
+ }
}
- handleSaveMode(sink);
+ SeaTunnelSink sink =
+ tryGenerateMultiTableSink(
+ sinks, ReadonlyConfig.fromConfig(sinkConfig),
classLoader);
boolean sinkParallelism =
sinkConfig.hasPath(CommonOptions.PARALLELISM.key());
boolean envParallelism =
envConfig.hasPath(CommonOptions.PARALLELISM.key());
int parallelism =
@@ -136,11 +156,9 @@ public class SinkExecuteProcessor
: envParallelism
?
envConfig.getInt(CommonOptions.PARALLELISM.key())
: 1;
- DataStreamSink<Row> dataStreamSink =
+ DataStreamSink<SeaTunnelRow> dataStreamSink =
stream.getDataStream()
- .sinkTo(
- new FlinkSink<>(
- sink,
stream.getCatalogTables().get(0), parallelism))
+ .sinkTo(new FlinkSink<>(sink,
stream.getCatalogTables(), parallelism))
.name(String.format("%s-Sink",
sink.getPluginName()));
dataStreamSink.setParallelism(parallelism);
}
@@ -148,6 +166,17 @@ public class SinkExecuteProcessor
return null;
}
+ // if not support multi table, rollback
+ public SeaTunnelSink tryGenerateMultiTableSink(
+ Map<String, SeaTunnelSink> sinks, ReadonlyConfig sinkConfig,
ClassLoader classLoader) {
+ if (sinks.values().stream().anyMatch(sink -> !(sink instanceof
SupportMultiTableSink))) {
+ log.info("Unsupported multi table sink api, rollback to sink
template");
+ // choose the first sink
+ return sinks.values().iterator().next();
+ }
+ return FactoryUtil.createMultiTableSink(sinks, sinkConfig,
classLoader);
+ }
+
public boolean isFallback(Factory factory) {
try {
((TableSinkFactory) factory).createSink(null);
@@ -170,10 +199,10 @@ public class SinkExecuteProcessor
return source;
}
- public void handleSaveMode(SeaTunnelSink sink) {
- if (sink instanceof SupportSaveMode) {
- Optional<SaveModeHandler> saveModeHandler =
- ((SupportSaveMode) sink).getSaveModeHandler();
+ public void handleSaveMode(SeaTunnelSink seaTunnelSink) {
+ if (seaTunnelSink instanceof SupportSaveMode) {
+ SupportSaveMode saveModeSink = (SupportSaveMode) seaTunnelSink;
+ Optional<SaveModeHandler> saveModeHandler =
saveModeSink.getSaveModeHandler();
if (saveModeHandler.isPresent()) {
try (SaveModeHandler handler = saveModeHandler.get()) {
handler.open();
@@ -182,11 +211,6 @@ public class SinkExecuteProcessor
throw new
SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
}
}
- } else if (sink instanceof MultiTableSink) {
- Map<String, SeaTunnelSink> sinks = ((MultiTableSink)
sink).getSinks();
- for (SeaTunnelSink seaTunnelSink : sinks.values()) {
- handleSaveMode(seaTunnelSink);
- }
}
}
}
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index 5e6bd9173c..cc16723929 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -26,11 +26,15 @@ import
org.apache.seatunnel.api.configuration.util.ConfigValidator;
import org.apache.seatunnel.api.sink.SaveModeExecuteWrapper;
import org.apache.seatunnel.api.sink.SaveModeHandler;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SupportMultiTableSink;
import org.apache.seatunnel.api.sink.SupportSaveMode;
-import org.apache.seatunnel.api.sink.multitablesink.MultiTableSink;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.FactoryUtil;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.core.starter.enums.PluginType;
@@ -43,9 +47,11 @@ import org.apache.seatunnel.translation.flink.sink.FlinkSink;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.transformations.SinkV1Adapter;
-import org.apache.flink.types.Row;
+
+import lombok.extern.slf4j.Slf4j;
import java.net.URL;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -54,6 +60,7 @@ import java.util.stream.Collectors;
import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
import static
org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED;
+@Slf4j
@SuppressWarnings("unchecked,rawtypes")
public class SinkExecuteProcessor
extends FlinkAbstractPluginExecuteProcessor<Optional<? extends
Factory>> {
@@ -100,35 +107,48 @@ public class SinkExecuteProcessor
fromSourceTable(sinkConfig,
upstreamDataStreams).orElse(input);
Optional<? extends Factory> factory = plugins.get(i);
boolean fallBack = !factory.isPresent() ||
isFallback(factory.get());
- SeaTunnelSink sink;
+ Map<String, SeaTunnelSink> sinks = new HashMap<>();
if (fallBack) {
- sink =
- fallbackCreateSink(
- sinkPluginDiscovery,
- PluginIdentifier.of(
- ENGINE_TYPE,
- PLUGIN_TYPE,
-
sinkConfig.getString(PLUGIN_NAME.key())),
- sinkConfig);
- sink.setJobContext(jobContext);
- // TODO sink support multi table
- SeaTunnelRowType sourceType =
- stream.getCatalogTables().get(0).getSeaTunnelRowType();
- sink.setTypeInfo(sourceType);
+ for (CatalogTable catalogTable : stream.getCatalogTables()) {
+ SeaTunnelSink fallBackSink =
+ fallbackCreateSink(
+ sinkPluginDiscovery,
+ PluginIdentifier.of(
+ ENGINE_TYPE,
+ PLUGIN_TYPE,
+
sinkConfig.getString(PLUGIN_NAME.key())),
+ sinkConfig);
+ fallBackSink.setJobContext(jobContext);
+ SeaTunnelRowType sourceType =
catalogTable.getSeaTunnelRowType();
+ fallBackSink.setTypeInfo(sourceType);
+ handleSaveMode(fallBackSink);
+ TableIdentifier tableId = catalogTable.getTableId();
+ String tableIdName = tableId.toTablePath().toString();
+ sinks.put(tableIdName, fallBackSink);
+ }
} else {
- // TODO sink support multi table
- TableSinkFactoryContext context =
- TableSinkFactoryContext.replacePlaceholderAndCreate(
- stream.getCatalogTables().get(0),
- ReadonlyConfig.fromConfig(sinkConfig),
- classLoader,
- ((TableSinkFactory) factory.get())
- .excludeTablePlaceholderReplaceKeys());
-
ConfigValidator.of(context.getOptions()).validate(factory.get().optionRule());
- sink = ((TableSinkFactory)
factory.get()).createSink(context).createSink();
- sink.setJobContext(jobContext);
+ for (CatalogTable catalogTable : stream.getCatalogTables()) {
+ SeaTunnelSink seaTunnelSink;
+ TableSinkFactoryContext context =
+
TableSinkFactoryContext.replacePlaceholderAndCreate(
+ catalogTable,
+ ReadonlyConfig.fromConfig(sinkConfig),
+ classLoader,
+ ((TableSinkFactory) factory.get())
+
.excludeTablePlaceholderReplaceKeys());
+
ConfigValidator.of(context.getOptions()).validate(factory.get().optionRule());
+ seaTunnelSink =
+ ((TableSinkFactory)
factory.get()).createSink(context).createSink();
+ seaTunnelSink.setJobContext(jobContext);
+ handleSaveMode(seaTunnelSink);
+ TableIdentifier tableId = catalogTable.getTableId();
+ String tableIdName = tableId.toTablePath().toString();
+ sinks.put(tableIdName, seaTunnelSink);
+ }
}
- handleSaveMode(sink);
+ SeaTunnelSink sink =
+ tryGenerateMultiTableSink(
+ sinks, ReadonlyConfig.fromConfig(sinkConfig),
classLoader);
boolean sinkParallelism =
sinkConfig.hasPath(CommonOptions.PARALLELISM.key());
boolean envParallelism =
envConfig.hasPath(CommonOptions.PARALLELISM.key());
int parallelism =
@@ -137,14 +157,12 @@ public class SinkExecuteProcessor
: envParallelism
?
envConfig.getInt(CommonOptions.PARALLELISM.key())
: 1;
- DataStreamSink<Row> dataStreamSink =
+ DataStreamSink<SeaTunnelRow> dataStreamSink =
stream.getDataStream()
.sinkTo(
SinkV1Adapter.wrap(
new FlinkSink<>(
- sink,
-
stream.getCatalogTables().get(0),
- parallelism)))
+ sink,
stream.getCatalogTables(), parallelism)))
.name(String.format("%s-Sink",
sink.getPluginName()));
if (sinkParallelism || envParallelism) {
dataStreamSink.setParallelism(parallelism);
@@ -154,6 +172,17 @@ public class SinkExecuteProcessor
return null;
}
+ // if not support multi table, rollback
+ public SeaTunnelSink tryGenerateMultiTableSink(
+ Map<String, SeaTunnelSink> sinks, ReadonlyConfig sinkConfig,
ClassLoader classLoader) {
+ if (sinks.values().stream().anyMatch(sink -> !(sink instanceof
SupportMultiTableSink))) {
+ log.info("Unsupported multi table sink api, rollback to sink
template");
+ // choose the first sink
+ return sinks.values().iterator().next();
+ }
+ return FactoryUtil.createMultiTableSink(sinks, sinkConfig,
classLoader);
+ }
+
public boolean isFallback(Factory factory) {
try {
((TableSinkFactory) factory).createSink(null);
@@ -176,10 +205,10 @@ public class SinkExecuteProcessor
return source;
}
- public void handleSaveMode(SeaTunnelSink sink) {
- if (sink instanceof SupportSaveMode) {
- Optional<SaveModeHandler> saveModeHandler =
- ((SupportSaveMode) sink).getSaveModeHandler();
+ public void handleSaveMode(SeaTunnelSink seaTunnelSink) {
+ if (seaTunnelSink instanceof SupportSaveMode) {
+ SupportSaveMode saveModeSink = (SupportSaveMode) seaTunnelSink;
+ Optional<SaveModeHandler> saveModeHandler =
saveModeSink.getSaveModeHandler();
if (saveModeHandler.isPresent()) {
try (SaveModeHandler handler = saveModeHandler.get()) {
handler.open();
@@ -188,11 +217,6 @@ public class SinkExecuteProcessor
throw new
SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
}
}
- } else if (sink instanceof MultiTableSink) {
- Map<String, SeaTunnelSink> sinks = ((MultiTableSink)
sink).getSinks();
- for (SeaTunnelSink seaTunnelSink : sinks.values()) {
- handleSaveMode(seaTunnelSink);
- }
}
}
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
index a98a5cd4d2..f5057a4fd0 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
@@ -264,8 +264,8 @@ public class MysqlCDCIT extends TestSuiteBase implements
TestResource {
@TestTemplate
@DisabledOnContainer(
value = {},
- type = {EngineType.SPARK, EngineType.FLINK},
- disabledReason = "Currently SPARK and FLINK do not support multi
table")
+ type = {EngineType.SPARK},
+ disabledReason = "Currently SPARK do not support cdc")
public void testMysqlCdcMultiTableE2e(TestContainer container) {
// Clear related content to ensure that multiple operations are not
affected
clearTable(MYSQL_DATABASE, SOURCE_TABLE_1);
@@ -320,7 +320,7 @@ public class MysqlCDCIT extends TestSuiteBase implements
TestResource {
@DisabledOnContainer(
value = {},
type = {EngineType.SPARK, EngineType.FLINK},
- disabledReason = "Currently SPARK and FLINK do not support multi
table")
+ disabledReason = "Currently SPARK and FLINK do not support
restore")
public void testMultiTableWithRestore(TestContainer container)
throws IOException, InterruptedException {
// Clear related content to ensure that multiple operations are not
affected
@@ -437,10 +437,12 @@ public class MysqlCDCIT extends TestSuiteBase implements
TestResource {
@TestTemplate
@DisabledOnContainer(
value = {},
- type = {EngineType.SPARK, EngineType.FLINK},
- disabledReason = "Currently SPARK and FLINK do not support multi
table")
+ type = {EngineType.SPARK},
+ disabledReason = "Currently SPARK do not support cdc")
public void testMysqlCdcMultiTableWithCustomPrimaryKey(TestContainer
container) {
// Clear related content to ensure that multiple operations are not
affected
+ clearTable(MYSQL_DATABASE, SOURCE_TABLE_1_CUSTOM_PRIMARY_KEY);
+ clearTable(MYSQL_DATABASE, SOURCE_TABLE_2_CUSTOM_PRIMARY_KEY);
clearTable(MYSQL_DATABASE2, SOURCE_TABLE_1_CUSTOM_PRIMARY_KEY);
clearTable(MYSQL_DATABASE2, SOURCE_TABLE_2_CUSTOM_PRIMARY_KEY);
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/OpengaussCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/OpengaussCDCIT.java
index dc80a083a7..5529c82396 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/OpengaussCDCIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/OpengaussCDCIT.java
@@ -201,8 +201,8 @@ public class OpengaussCDCIT extends TestSuiteBase
implements TestResource {
@TestTemplate
@DisabledOnContainer(
value = {},
- type = {EngineType.SPARK, EngineType.FLINK},
- disabledReason = "Currently SPARK and FLINK do not support multi
table")
+ type = {EngineType.SPARK},
+ disabledReason = "Currently SPARK do not support cdc")
public void testOpengaussCdcMultiTableE2e(TestContainer container) {
try {
CompletableFuture.supplyAsync(
@@ -285,7 +285,7 @@ public class OpengaussCDCIT extends TestSuiteBase
implements TestResource {
@DisabledOnContainer(
value = {},
type = {EngineType.SPARK, EngineType.FLINK},
- disabledReason = "Currently SPARK and FLINK do not support multi
table")
+ disabledReason = "Currently SPARK and FLINK do not support
restore")
public void testMultiTableWithRestore(TestContainer container)
throws IOException, InterruptedException {
try {
@@ -394,7 +394,7 @@ public class OpengaussCDCIT extends TestSuiteBase
implements TestResource {
@DisabledOnContainer(
value = {},
type = {EngineType.SPARK, EngineType.FLINK},
- disabledReason = "Currently SPARK and FLINK do not support multi
table")
+ disabledReason = "Currently SPARK and FLINK do not support
restore")
public void testAddFiledWithRestore(TestContainer container)
throws IOException, InterruptedException {
try {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java
index 0192fae3f7..03cd2039b0 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java
@@ -61,7 +61,7 @@ import static org.junit.Assert.assertNotNull;
@Slf4j
@DisabledOnContainer(
value = {},
- type = {EngineType.SPARK, EngineType.FLINK},
+ type = {EngineType.SPARK},
disabledReason =
"Currently SPARK do not support cdc,Flink is prone to time
out, temporarily disable")
public class OracleCDCIT extends TestSuiteBase implements TestResource {
@@ -301,8 +301,8 @@ public class OracleCDCIT extends TestSuiteBase implements
TestResource {
@TestTemplate
@DisabledOnContainer(
value = {},
- type = {EngineType.SPARK, EngineType.FLINK},
- disabledReason = "Currently SPARK and FLINK do not support multi
table")
+ type = {EngineType.SPARK},
+ disabledReason = "Currently SPARK do not support cdc")
public void testOracleCdcMultiTableE2e(TestContainer container)
throws IOException, InterruptedException {
@@ -378,7 +378,7 @@ public class OracleCDCIT extends TestSuiteBase implements
TestResource {
@DisabledOnContainer(
value = {},
type = {EngineType.SPARK, EngineType.FLINK},
- disabledReason = "Currently SPARK and FLINK do not support multi
table")
+ disabledReason = "Currently SPARK and FLINK do not support
restore")
public void testMultiTableWithRestore(TestContainer container)
throws IOException, InterruptedException {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java
index 0ea8f593ee..6aec6034d3 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java
@@ -186,8 +186,8 @@ public class PostgresCDCIT extends TestSuiteBase implements
TestResource {
@TestTemplate
@DisabledOnContainer(
value = {},
- type = {EngineType.SPARK, EngineType.FLINK},
- disabledReason = "Currently SPARK and FLINK do not support multi
table")
+ type = {EngineType.SPARK},
+ disabledReason = "Currently SPARK do not support cdc")
public void testPostgresCdcMultiTableE2e(TestContainer container) {
try {
@@ -271,7 +271,7 @@ public class PostgresCDCIT extends TestSuiteBase implements
TestResource {
@DisabledOnContainer(
value = {},
type = {EngineType.SPARK, EngineType.FLINK},
- disabledReason = "Currently SPARK and FLINK do not support multi
table")
+ disabledReason = "Currently SPARK and FLINK do not support
restore")
public void testMultiTableWithRestore(TestContainer container)
throws IOException, InterruptedException {
try {
@@ -379,7 +379,7 @@ public class PostgresCDCIT extends TestSuiteBase implements
TestResource {
@DisabledOnContainer(
value = {},
type = {EngineType.SPARK, EngineType.FLINK},
- disabledReason = "Currently SPARK and FLINK do not support multi
table")
+ disabledReason = "Currently SPARK and FLINK do not support
restore")
public void testAddFiledWithRestore(TestContainer container)
throws IOException, InterruptedException {
try {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
index 1c54c7eb3e..7080f003e1 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
@@ -35,9 +35,7 @@ import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.BulkResponse;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.ScrollResult;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
-import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
-import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.e2e.common.util.ContainerUtil;
import org.apache.commons.io.IOUtils;
@@ -203,10 +201,6 @@ public class ElasticsearchIT extends TestSuiteBase
implements TestResource {
}
@TestTemplate
- @DisabledOnContainer(
- value = {},
- type = {EngineType.FLINK},
- disabledReason = "Currently FLINK do not support multiple table
read")
public void testElasticsSearchWithMultiSourceByFilter(TestContainer
container)
throws InterruptedException, IOException {
// read read_filter_index1,read_filter_index2
@@ -307,10 +301,6 @@ public class ElasticsearchIT extends TestSuiteBase
implements TestResource {
Assertions.assertEquals(0, sinkData2.size());
}
- @DisabledOnContainer(
- value = {},
- type = {EngineType.FLINK},
- disabledReason = "Currently FLINK do not support multiple table
read")
@TestTemplate
public void testElasticsearchWithMultiSink(TestContainer container)
throws IOException, InterruptedException {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-email-e2e/src/test/java/org/apache/seatunnel/e2e/connector/email/EmailWithMultiIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-email-e2e/src/test/java/org/apache/seatunnel/e2e/connector/email/EmailWithMultiIT.java
index 2822231874..790a9dc36a 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-email-e2e/src/test/java/org/apache/seatunnel/e2e/connector/email/EmailWithMultiIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-email-e2e/src/test/java/org/apache/seatunnel/e2e/connector/email/EmailWithMultiIT.java
@@ -19,9 +19,7 @@ package org.apache.seatunnel.e2e.connector.email;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
-import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
-import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
@@ -84,10 +82,6 @@ public class EmailWithMultiIT extends TestSuiteBase
implements TestResource {
}
@TestTemplate
- @DisabledOnContainer(
- value = {},
- type = {EngineType.FLINK},
- disabledReason = "Currently FLINK do not support multi-table")
public void testMultipleTableEmailSink(TestContainer container) throws
Exception {
Container.ExecResult textWriteResult =
container.executeJob("/fake_to_multiemailsink.conf");
testEMailSuccess(2, "[email protected]",
"[email protected]");
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileWithMultipleTableIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileWithMultipleTableIT.java
index 77e635c2f7..4c63b7e335 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileWithMultipleTableIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileWithMultipleTableIT.java
@@ -19,7 +19,6 @@ package org.apache.seatunnel.e2e.connector.file.local;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
-import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.container.TestContainerId;
import org.apache.seatunnel.e2e.common.container.TestHelper;
@@ -33,8 +32,8 @@ import java.io.IOException;
@DisabledOnContainer(
value = {TestContainerId.SPARK_2_4},
- type = {EngineType.FLINK},
- disabledReason = "Currently FLINK do not support multi table")
+ type = {},
+ disabledReason = "")
public class LocalFileWithMultipleTableIT extends TestSuiteBase {
/** Copy data files to container */
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
index d174c13f53..5274c1a8c9 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
@@ -295,10 +295,7 @@ public class HbaseIT extends TestSuiteBase implements
TestResource {
Assertions.assertEquals(cf2Count, 5);
}
- @DisabledOnContainer(
- value = {},
- type = {EngineType.FLINK},
- disabledReason = "Currently FLINK does not support multiple table
write")
+ @TestTemplate
public void testHbaseMultiTableSink(TestContainer container)
throws IOException, InterruptedException {
TableName multiTable1 = TableName.valueOf(MULTI_TABLE_ONE_NAME);
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
index 9e9bed031d..c3d4cf936c 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
@@ -23,9 +23,7 @@ import
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
-import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
-import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
@@ -235,10 +233,6 @@ public class HttpIT extends TestSuiteBase implements
TestResource {
Assertions.assertEquals(0, execResult19.getExitCode());
}
- @DisabledOnContainer(
- value = {},
- type = {EngineType.FLINK},
- disabledReason = "Currently FLINK do not support multiple table
read")
@TestTemplate
public void testMultiTableHttp(TestContainer container)
throws IOException, InterruptedException {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java
index 74310f7868..5edeba5c60 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java
@@ -25,9 +25,7 @@ import
org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient;
import
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
-import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
-import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.influxdb.InfluxDB;
import org.influxdb.dto.BatchPoints;
@@ -247,10 +245,6 @@ public class InfluxdbIT extends TestSuiteBase implements
TestResource {
}
@TestTemplate
- @DisabledOnContainer(
- value = {},
- type = {EngineType.FLINK},
- disabledReason = "Currently FLINK do not support multiple table
read")
public void testInfluxdbMultipleWrite(TestContainer container)
throws IOException, InterruptedException {
Container.ExecResult execResult =
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlMultipleTablesIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlMultipleTablesIT.java
index 494d273879..272199eceb 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlMultipleTablesIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlMultipleTablesIT.java
@@ -22,9 +22,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
-import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
-import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
import org.apache.commons.lang3.tuple.Pair;
@@ -169,10 +167,6 @@ public class JdbcMysqlMultipleTablesIT extends
TestSuiteBase implements TestReso
query(String.format("SELECT * FROM %s.%s", SINK_DATABASE,
"table1")));
}
- @DisabledOnContainer(
- value = {},
- type = {EngineType.FLINK},
- disabledReason = "Currently SPARK and FLINK do not support
multiple tables")
@TestTemplate
public void testMysqlJdbcMultipleTableE2e(TestContainer container)
throws IOException, InterruptedException, SQLException {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleMultipleTablesIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleMultipleTablesIT.java
index 2b0bd05d5a..60b0155d9a 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleMultipleTablesIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleMultipleTablesIT.java
@@ -22,9 +22,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
-import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
-import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
import org.apache.commons.lang3.tuple.Pair;
@@ -149,12 +147,8 @@ public class JdbcOracleMultipleTablesIT extends
TestSuiteBase implements TestRes
initSourceTablesData();
}
- @DisabledOnContainer(
- value = {},
- type = {EngineType.FLINK},
- disabledReason = "Currently FLINK do not support multiple tables")
@TestTemplate
- public void testMysqlJdbcMultipleTableE2e(TestContainer container)
+ public void testOracleJdbcMultipleTableE2e(TestContainer container)
throws IOException, InterruptedException, SQLException {
clearSinkTables();
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduIT.java
index e8e61d3931..b6f0a701d1 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduIT.java
@@ -385,10 +385,6 @@ public class KuduIT extends TestSuiteBase implements
TestResource {
kuduClient.deleteTable("kudu_cdc_sink_table");
}
- @DisabledOnContainer(
- value = {},
- type = {EngineType.FLINK},
- disabledReason = "Currently FLINK do not support multiple table
read")
@TestTemplate
public void testKuduMultipleRead(TestContainer container)
throws IOException, InterruptedException {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
index 66288bbb15..60bffba6f4 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
@@ -27,9 +27,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
-import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
-import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.format.json.JsonSerializationSchema;
import org.junit.jupiter.api.AfterAll;
@@ -348,10 +346,6 @@ public abstract class RedisTestCaseTemplateIT extends
TestSuiteBase implements T
}
@TestTemplate
- @DisabledOnContainer(
- value = {},
- type = {EngineType.FLINK},
- disabledReason = "Currently FLINK do not support multiple table
read")
public void testMultipletableRedisSink(TestContainer container)
throws IOException, InterruptedException {
Container.ExecResult execResult =
diff --git
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainerId.java
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainerId.java
index c507128200..db8306bfa2 100644
---
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainerId.java
+++
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainerId.java
@@ -30,6 +30,8 @@ public enum TestContainerId {
FLINK_1_14(FLINK, "1.14.6"),
FLINK_1_15(FLINK, "1.15.3"),
FLINK_1_16(FLINK, "1.16.0"),
+ FLINK_1_17(FLINK, "1.17.2"),
+ FLINK_1_18(FLINK, "1.18.0"),
SPARK_2_4(SPARK, "2.4.6"),
SPARK_3_3(SPARK, "3.3.0"),
SEATUNNEL(EngineType.SEATUNNEL, "dev");
diff --git
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/Flink17Container.java
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/Flink17Container.java
new file mode 100644
index 0000000000..387bed9bf8
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/Flink17Container.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.common.container.flink;
+
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.container.TestContainerId;
+
+import com.google.auto.service.AutoService;
+import lombok.NoArgsConstructor;
+
+import java.io.File;
+
+/**
+ * Concrete {@link TestContainer} for Flink 1.17, registered with {@code @AutoService}.
+ * It only supplies the identifier, docker image, starter module/script and connector layout;
+ * everything else (e.g. {@code executeJob}) is inherited via {@link AbstractTestFlinkContainer}.
+ */
+@NoArgsConstructor
+@AutoService(TestContainer.class)
+public class Flink17Container extends AbstractTestFlinkContainer {
+
+ @Override
+ public TestContainerId identifier() {
+ return TestContainerId.FLINK_1_17;
+ }
+
+ @Override
+ protected String getDockerImage() {
+ return "tyrantlucifer/flink:1.17.2-scala_2.12_hadoop27";
+ }
+
+ @Override
+ protected String getStartModuleName() {
+ return "seatunnel-flink-starter" + File.separator +
"seatunnel-flink-15-starter";
+ }
+
+ @Override
+ protected String getStartShellName() {
+ return "start-seatunnel-flink-15-connector-v2.sh"; // NOTE(review): flink-15 starter script reused for 1.17 - confirm intended
+ }
+
+ @Override
+ protected String getConnectorType() {
+ return "seatunnel";
+ }
+
+ @Override
+ protected String getConnectorModulePath() {
+ return "seatunnel-connectors-v2";
+ }
+
+ @Override
+ protected String getConnectorNamePrefix() {
+ return "connector-";
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/Flink18Container.java
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/Flink18Container.java
new file mode 100644
index 0000000000..ab3743cdfc
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/Flink18Container.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.common.container.flink;
+
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.container.TestContainerId;
+
+import com.google.auto.service.AutoService;
+import lombok.NoArgsConstructor;
+
+import java.io.File;
+
+/**
+ * Concrete {@link TestContainer} for Flink 1.18, registered with {@code @AutoService}.
+ * It only supplies the identifier, docker image, starter module/script and connector layout;
+ * everything else (e.g. {@code executeJob}) is inherited via {@link AbstractTestFlinkContainer}.
+ */
+@NoArgsConstructor
+@AutoService(TestContainer.class)
+public class Flink18Container extends AbstractTestFlinkContainer {
+
+ @Override
+ public TestContainerId identifier() {
+ return TestContainerId.FLINK_1_18;
+ }
+
+ @Override
+ protected String getDockerImage() {
+ return "tyrantlucifer/flink:1.18.0-scala_2.12_hadoop27";
+ }
+
+ @Override
+ protected String getStartModuleName() {
+ return "seatunnel-flink-starter" + File.separator +
"seatunnel-flink-15-starter";
+ }
+
+ @Override
+ protected String getStartShellName() {
+ return "start-seatunnel-flink-15-connector-v2.sh"; // NOTE(review): flink-15 starter script reused for 1.18 - confirm intended
+ }
+
+ @Override
+ protected String getConnectorType() {
+ return "seatunnel";
+ }
+
+ @Override
+ protected String getConnectorModulePath() {
+ return "seatunnel-connectors-v2";
+ }
+
+ @Override
+ protected String getConnectorNamePrefix() {
+ return "connector-";
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java
index 66785d5b64..5ee0a1e7bd 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.container.TestContainerId;
import
org.apache.seatunnel.e2e.common.container.flink.AbstractTestFlinkContainer;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
@@ -225,7 +226,7 @@ public class CheckpointEnableIT extends TestSuiteBase {
@TestTemplate
@DisabledOnContainer(
- value = {},
+ value = {TestContainerId.FLINK_1_17, TestContainerId.FLINK_1_18},
type = {EngineType.SEATUNNEL, EngineType.SPARK},
disabledReason =
"depending on the engine, the logic for determining
whether a checkpoint is enabled is different")
diff --git
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
index cab4b6f225..022ef3224c 100644
---
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
+++
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
@@ -48,16 +48,16 @@ public class FlinkSink<InputT, CommT, WriterStateT,
GlobalCommT>
private final SeaTunnelSink<SeaTunnelRow, WriterStateT, CommT,
GlobalCommT> sink;
- private final CatalogTable catalogTable;
+ private final List<CatalogTable> catalogTables;
private final int parallelism;
public FlinkSink(
SeaTunnelSink<SeaTunnelRow, WriterStateT, CommT, GlobalCommT> sink,
- CatalogTable catalogTable,
+ List<CatalogTable> catalogTables,
int parallelism) {
this.sink = sink;
- this.catalogTable = catalogTable;
+ this.catalogTables = catalogTables;
this.parallelism = parallelism;
}
@@ -68,15 +68,13 @@ public class FlinkSink<InputT, CommT, WriterStateT,
GlobalCommT>
org.apache.seatunnel.api.sink.SinkWriter.Context stContext =
new FlinkSinkWriterContext(context, parallelism);
if (states == null || states.isEmpty()) {
- return new FlinkSinkWriter<>(
- sink.createWriter(stContext), 1,
catalogTable.getSeaTunnelRowType(), stContext);
+ return new FlinkSinkWriter<>(sink.createWriter(stContext), 1,
stContext);
} else {
List<WriterStateT> restoredState =
states.stream().map(FlinkWriterState::getState).collect(Collectors.toList());
return new FlinkSinkWriter<>(
sink.restoreWriter(stContext, restoredState),
states.get(0).getCheckpointId() + 1,
- catalogTable.getSeaTunnelRowType(),
stContext);
}
}
@@ -94,12 +92,30 @@ public class FlinkSink<InputT, CommT, WriterStateT,
GlobalCommT>
@Override
public Optional<SimpleVersionedSerializer<CommitWrapper<CommT>>>
getCommittableSerializer() {
- return
sink.getCommitInfoSerializer().map(CommitWrapperSerializer::new);
+ try {
+ if (sink.createCommitter().isPresent()
+ || sink.createAggregatedCommitter().isPresent()) {
+ return
sink.getCommitInfoSerializer().map(CommitWrapperSerializer::new);
+ } else {
+ return Optional.empty();
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to create Committer or
AggregatedCommitter", e);
+ }
}
@Override
public Optional<SimpleVersionedSerializer<GlobalCommT>>
getGlobalCommittableSerializer() {
- return
sink.getAggregatedCommitInfoSerializer().map(FlinkSimpleVersionedSerializer::new);
+ try {
+ if (sink.createAggregatedCommitter().isPresent()) {
+ return sink.getAggregatedCommitInfoSerializer()
+ .map(FlinkSimpleVersionedSerializer::new);
+ } else {
+ return Optional.empty();
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to create AggregatedCommitter",
e);
+ }
}
@Override
diff --git
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
index 8de831aee1..7a47052c01 100644
---
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
+++
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
@@ -24,7 +24,6 @@ import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.api.sink.MultiTableResourceManager;
import org.apache.seatunnel.api.sink.SupportResourceShare;
import org.apache.seatunnel.api.sink.event.WriterCloseEvent;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.flink.api.connector.sink.Sink;
@@ -69,7 +68,6 @@ public class FlinkSinkWriter<InputT, CommT, WriterStateT>
FlinkSinkWriter(
org.apache.seatunnel.api.sink.SinkWriter<SeaTunnelRow, CommT,
WriterStateT> sinkWriter,
long checkpointId,
- SeaTunnelDataType<?> dataType,
org.apache.seatunnel.api.sink.SinkWriter.Context context) {
this.context = context;
this.sinkWriter = sinkWriter;