This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 65312ff73e [Feature][Flink] Support multiple tables read and write 
(#7713)
65312ff73e is described below

commit 65312ff73ef51e7087f935bfa1d486331027c9ec
Author: Sirui-Li <[email protected]>
AuthorDate: Wed Sep 25 15:02:44 2024 +0800

    [Feature][Flink] Support multiple tables read and write (#7713)
---
 .github/workflows/backend.yml                      |   8 +-
 docs/en/start-v2/locally/quick-start-flink.md      |   2 +-
 docs/zh/start-v2/locally/quick-start-flink.md      |   2 +-
 .../core/starter/execution/PluginUtil.java         |   6 --
 .../flink/execution/SinkExecuteProcessor.java      | 106 +++++++++++++--------
 .../flink/execution/SinkExecuteProcessor.java      | 106 +++++++++++++--------
 .../connectors/seatunnel/cdc/mysql/MysqlCDCIT.java |  12 ++-
 .../seatunnel/cdc/postgres/OpengaussCDCIT.java     |   8 +-
 .../seatunnel/cdc/oracle/OracleCDCIT.java          |   8 +-
 .../seatunnel/cdc/postgres/PostgresCDCIT.java      |   8 +-
 .../connector/elasticsearch/ElasticsearchIT.java   |  10 --
 .../e2e/connector/email/EmailWithMultiIT.java      |   6 --
 .../file/local/LocalFileWithMultipleTableIT.java   |   5 +-
 .../seatunnel/e2e/connector/hbase/HbaseIT.java     |   5 +-
 .../seatunnel/e2e/connector/http/HttpIT.java       |   6 --
 .../e2e/connector/influxdb/InfluxdbIT.java         |   6 --
 .../seatunnel/jdbc/JdbcMysqlMultipleTablesIT.java  |   6 --
 .../seatunnel/jdbc/JdbcOracleMultipleTablesIT.java |   8 +-
 .../seatunnel/e2e/connector/kudu/KuduIT.java       |   4 -
 .../connector/redis/RedisTestCaseTemplateIT.java   |   6 --
 .../e2e/common/container/TestContainerId.java      |   2 +
 .../common/container/flink/Flink17Container.java   |  71 ++++++++++++++
 .../common/container/flink/Flink18Container.java   |  71 ++++++++++++++
 .../seatunnel/engine/e2e/CheckpointEnableIT.java   |   3 +-
 .../translation/flink/sink/FlinkSink.java          |  32 +++++--
 .../translation/flink/sink/FlinkSinkWriter.java    |   2 -
 26 files changed, 329 insertions(+), 180 deletions(-)

diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml
index 1bc424b297..ad4390a048 100644
--- a/.github/workflows/backend.yml
+++ b/.github/workflows/backend.yml
@@ -544,7 +544,7 @@ jobs:
       matrix:
         java: [ '8', '11' ]
         os: [ 'ubuntu-latest' ]
-    timeout-minutes: 45
+    timeout-minutes: 90
     steps:
       - uses: actions/checkout@v2
       - name: Set up JDK ${{ matrix.java }}
@@ -654,7 +654,7 @@ jobs:
       matrix:
         java: [ '8', '11' ]
         os: [ 'ubuntu-latest' ]
-    timeout-minutes: 90
+    timeout-minutes: 150
     steps:
       - uses: actions/checkout@v2
       - name: Set up JDK ${{ matrix.java }}
@@ -683,7 +683,7 @@ jobs:
       matrix:
         java: [ '8', '11' ]
         os: [ 'ubuntu-latest' ]
-    timeout-minutes: 90
+    timeout-minutes: 150
     steps:
       - uses: actions/checkout@v2
       - name: Set up JDK ${{ matrix.java }}
@@ -828,7 +828,7 @@ jobs:
       matrix:
         java: [ '8', '11' ]
         os: [ 'ubuntu-latest' ]
-    timeout-minutes: 90
+    timeout-minutes: 150
     steps:
       - uses: actions/checkout@v2
       - name: Set up JDK ${{ matrix.java }}
diff --git a/docs/en/start-v2/locally/quick-start-flink.md 
b/docs/en/start-v2/locally/quick-start-flink.md
index 1ccc5a3001..244dfd8c9e 100644
--- a/docs/en/start-v2/locally/quick-start-flink.md
+++ b/docs/en/start-v2/locally/quick-start-flink.md
@@ -70,7 +70,7 @@ cd "apache-seatunnel-${version}"
 ./bin/start-seatunnel-flink-13-connector-v2.sh --config 
./config/v2.streaming.conf.template
 ```
 
-Flink version between `1.15.x` and `1.16.x`
+Flink version between `1.15.x` and `1.18.x`
 
 ```shell
 cd "apache-seatunnel-${version}"
diff --git a/docs/zh/start-v2/locally/quick-start-flink.md 
b/docs/zh/start-v2/locally/quick-start-flink.md
index 87de2808c4..d6a5a794bd 100644
--- a/docs/zh/start-v2/locally/quick-start-flink.md
+++ b/docs/zh/start-v2/locally/quick-start-flink.md
@@ -70,7 +70,7 @@ cd "apache-seatunnel-${version}"
 ./bin/start-seatunnel-flink-13-connector-v2.sh --config 
./config/v2.streaming.conf.template
 ```
 
-Flink版本`1.15.x`到`1.16.x`
+Flink版本`1.15.x`到`1.18.x`
 
 ```shell
 cd "apache-seatunnel-${version}"
diff --git 
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java
 
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java
index e8a682062e..4630a4fbf4 100644
--- 
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java
+++ 
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java
@@ -102,12 +102,6 @@ public class PluginUtil {
             catalogTables =
                     
CatalogTableUtil.convertDataTypeToCatalogTables(seaTunnelDataType, tableId);
         }
-
-        //  if (catalogTables.size() != 1) {
-        //      throw new SeaTunnelException(
-        //              String.format("Unsupported table number: %d on flink",
-        // catalogTables.size()));
-        //  }
         return new SourceTableInfo(source, catalogTables);
     }
 
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index c5eabc6303..3b0a7db8e8 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -26,11 +26,15 @@ import 
org.apache.seatunnel.api.configuration.util.ConfigValidator;
 import org.apache.seatunnel.api.sink.SaveModeExecuteWrapper;
 import org.apache.seatunnel.api.sink.SaveModeHandler;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SupportMultiTableSink;
 import org.apache.seatunnel.api.sink.SupportSaveMode;
-import org.apache.seatunnel.api.sink.multitablesink.MultiTableSink;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.FactoryUtil;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
 import org.apache.seatunnel.core.starter.enums.PluginType;
@@ -42,9 +46,11 @@ import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscov
 import org.apache.seatunnel.translation.flink.sink.FlinkSink;
 
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.types.Row;
+
+import lombok.extern.slf4j.Slf4j;
 
 import java.net.URL;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -54,6 +60,7 @@ import static 
org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
 import static 
org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED;
 
 @SuppressWarnings({"unchecked", "rawtypes"})
+@Slf4j
 public class SinkExecuteProcessor
         extends FlinkAbstractPluginExecuteProcessor<Optional<? extends 
Factory>> {
 
@@ -99,35 +106,48 @@ public class SinkExecuteProcessor
                     fromSourceTable(sinkConfig, 
upstreamDataStreams).orElse(input);
             Optional<? extends Factory> factory = plugins.get(i);
             boolean fallBack = !factory.isPresent() || 
isFallback(factory.get());
-            SeaTunnelSink sink;
+            Map<String, SeaTunnelSink> sinks = new HashMap<>();
             if (fallBack) {
-                sink =
-                        fallbackCreateSink(
-                                sinkPluginDiscovery,
-                                PluginIdentifier.of(
-                                        ENGINE_TYPE,
-                                        PLUGIN_TYPE,
-                                        
sinkConfig.getString(PLUGIN_NAME.key())),
-                                sinkConfig);
-                sink.setJobContext(jobContext);
-                // TODO support sink multi sink
-                SeaTunnelRowType sourceType =
-                        stream.getCatalogTables().get(0).getSeaTunnelRowType();
-                sink.setTypeInfo(sourceType);
+                for (CatalogTable catalogTable : stream.getCatalogTables()) {
+                    SeaTunnelSink fallBackSink =
+                            fallbackCreateSink(
+                                    sinkPluginDiscovery,
+                                    PluginIdentifier.of(
+                                            ENGINE_TYPE,
+                                            PLUGIN_TYPE,
+                                            
sinkConfig.getString(PLUGIN_NAME.key())),
+                                    sinkConfig);
+                    fallBackSink.setJobContext(jobContext);
+                    SeaTunnelRowType sourceType = 
catalogTable.getSeaTunnelRowType();
+                    fallBackSink.setTypeInfo(sourceType);
+                    handleSaveMode(fallBackSink);
+                    TableIdentifier tableId = catalogTable.getTableId();
+                    String tableIdName = tableId.toTablePath().toString();
+                    sinks.put(tableIdName, fallBackSink);
+                }
             } else {
-                // TODO support sink multi sink
-                TableSinkFactoryContext context =
-                        TableSinkFactoryContext.replacePlaceholderAndCreate(
-                                stream.getCatalogTables().get(0),
-                                ReadonlyConfig.fromConfig(sinkConfig),
-                                classLoader,
-                                ((TableSinkFactory) factory.get())
-                                        .excludeTablePlaceholderReplaceKeys());
-                
ConfigValidator.of(context.getOptions()).validate(factory.get().optionRule());
-                sink = ((TableSinkFactory) 
factory.get()).createSink(context).createSink();
-                sink.setJobContext(jobContext);
+                for (CatalogTable catalogTable : stream.getCatalogTables()) {
+                    SeaTunnelSink seaTunnelSink;
+                    TableSinkFactoryContext context =
+                            
TableSinkFactoryContext.replacePlaceholderAndCreate(
+                                    catalogTable,
+                                    ReadonlyConfig.fromConfig(sinkConfig),
+                                    classLoader,
+                                    ((TableSinkFactory) factory.get())
+                                            
.excludeTablePlaceholderReplaceKeys());
+                    
ConfigValidator.of(context.getOptions()).validate(factory.get().optionRule());
+                    seaTunnelSink =
+                            ((TableSinkFactory) 
factory.get()).createSink(context).createSink();
+                    seaTunnelSink.setJobContext(jobContext);
+                    handleSaveMode(seaTunnelSink);
+                    TableIdentifier tableId = catalogTable.getTableId();
+                    String tableIdName = tableId.toTablePath().toString();
+                    sinks.put(tableIdName, seaTunnelSink);
+                }
             }
-            handleSaveMode(sink);
+            SeaTunnelSink sink =
+                    tryGenerateMultiTableSink(
+                            sinks, ReadonlyConfig.fromConfig(sinkConfig), 
classLoader);
             boolean sinkParallelism = 
sinkConfig.hasPath(CommonOptions.PARALLELISM.key());
             boolean envParallelism = 
envConfig.hasPath(CommonOptions.PARALLELISM.key());
             int parallelism =
@@ -136,11 +156,9 @@ public class SinkExecuteProcessor
                             : envParallelism
                                     ? 
envConfig.getInt(CommonOptions.PARALLELISM.key())
                                     : 1;
-            DataStreamSink<Row> dataStreamSink =
+            DataStreamSink<SeaTunnelRow> dataStreamSink =
                     stream.getDataStream()
-                            .sinkTo(
-                                    new FlinkSink<>(
-                                            sink, 
stream.getCatalogTables().get(0), parallelism))
+                            .sinkTo(new FlinkSink<>(sink, 
stream.getCatalogTables(), parallelism))
                             .name(String.format("%s-Sink", 
sink.getPluginName()));
             dataStreamSink.setParallelism(parallelism);
         }
@@ -148,6 +166,17 @@ public class SinkExecuteProcessor
         return null;
     }
 
+    // if not support multi table, rollback
+    public SeaTunnelSink tryGenerateMultiTableSink(
+            Map<String, SeaTunnelSink> sinks, ReadonlyConfig sinkConfig, 
ClassLoader classLoader) {
+        if (sinks.values().stream().anyMatch(sink -> !(sink instanceof 
SupportMultiTableSink))) {
+            log.info("Unsupported multi table sink api, rollback to sink 
template");
+            // choose the first sink
+            return sinks.values().iterator().next();
+        }
+        return FactoryUtil.createMultiTableSink(sinks, sinkConfig, 
classLoader);
+    }
+
     public boolean isFallback(Factory factory) {
         try {
             ((TableSinkFactory) factory).createSink(null);
@@ -170,10 +199,10 @@ public class SinkExecuteProcessor
         return source;
     }
 
-    public void handleSaveMode(SeaTunnelSink sink) {
-        if (sink instanceof SupportSaveMode) {
-            Optional<SaveModeHandler> saveModeHandler =
-                    ((SupportSaveMode) sink).getSaveModeHandler();
+    public void handleSaveMode(SeaTunnelSink seaTunnelSink) {
+        if (seaTunnelSink instanceof SupportSaveMode) {
+            SupportSaveMode saveModeSink = (SupportSaveMode) seaTunnelSink;
+            Optional<SaveModeHandler> saveModeHandler = 
saveModeSink.getSaveModeHandler();
             if (saveModeHandler.isPresent()) {
                 try (SaveModeHandler handler = saveModeHandler.get()) {
                     handler.open();
@@ -182,11 +211,6 @@ public class SinkExecuteProcessor
                     throw new 
SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
                 }
             }
-        } else if (sink instanceof MultiTableSink) {
-            Map<String, SeaTunnelSink> sinks = ((MultiTableSink) 
sink).getSinks();
-            for (SeaTunnelSink seaTunnelSink : sinks.values()) {
-                handleSaveMode(seaTunnelSink);
-            }
         }
     }
 }
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index 5e6bd9173c..cc16723929 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -26,11 +26,15 @@ import 
org.apache.seatunnel.api.configuration.util.ConfigValidator;
 import org.apache.seatunnel.api.sink.SaveModeExecuteWrapper;
 import org.apache.seatunnel.api.sink.SaveModeHandler;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SupportMultiTableSink;
 import org.apache.seatunnel.api.sink.SupportSaveMode;
-import org.apache.seatunnel.api.sink.multitablesink.MultiTableSink;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.FactoryUtil;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
 import org.apache.seatunnel.core.starter.enums.PluginType;
@@ -43,9 +47,11 @@ import org.apache.seatunnel.translation.flink.sink.FlinkSink;
 
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.transformations.SinkV1Adapter;
-import org.apache.flink.types.Row;
+
+import lombok.extern.slf4j.Slf4j;
 
 import java.net.URL;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -54,6 +60,7 @@ import java.util.stream.Collectors;
 import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
 import static 
org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED;
 
+@Slf4j
 @SuppressWarnings("unchecked,rawtypes")
 public class SinkExecuteProcessor
         extends FlinkAbstractPluginExecuteProcessor<Optional<? extends 
Factory>> {
@@ -100,35 +107,48 @@ public class SinkExecuteProcessor
                     fromSourceTable(sinkConfig, 
upstreamDataStreams).orElse(input);
             Optional<? extends Factory> factory = plugins.get(i);
             boolean fallBack = !factory.isPresent() || 
isFallback(factory.get());
-            SeaTunnelSink sink;
+            Map<String, SeaTunnelSink> sinks = new HashMap<>();
             if (fallBack) {
-                sink =
-                        fallbackCreateSink(
-                                sinkPluginDiscovery,
-                                PluginIdentifier.of(
-                                        ENGINE_TYPE,
-                                        PLUGIN_TYPE,
-                                        
sinkConfig.getString(PLUGIN_NAME.key())),
-                                sinkConfig);
-                sink.setJobContext(jobContext);
-                // TODO sink support multi table
-                SeaTunnelRowType sourceType =
-                        stream.getCatalogTables().get(0).getSeaTunnelRowType();
-                sink.setTypeInfo(sourceType);
+                for (CatalogTable catalogTable : stream.getCatalogTables()) {
+                    SeaTunnelSink fallBackSink =
+                            fallbackCreateSink(
+                                    sinkPluginDiscovery,
+                                    PluginIdentifier.of(
+                                            ENGINE_TYPE,
+                                            PLUGIN_TYPE,
+                                            
sinkConfig.getString(PLUGIN_NAME.key())),
+                                    sinkConfig);
+                    fallBackSink.setJobContext(jobContext);
+                    SeaTunnelRowType sourceType = 
catalogTable.getSeaTunnelRowType();
+                    fallBackSink.setTypeInfo(sourceType);
+                    handleSaveMode(fallBackSink);
+                    TableIdentifier tableId = catalogTable.getTableId();
+                    String tableIdName = tableId.toTablePath().toString();
+                    sinks.put(tableIdName, fallBackSink);
+                }
             } else {
-                // TODO sink support multi table
-                TableSinkFactoryContext context =
-                        TableSinkFactoryContext.replacePlaceholderAndCreate(
-                                stream.getCatalogTables().get(0),
-                                ReadonlyConfig.fromConfig(sinkConfig),
-                                classLoader,
-                                ((TableSinkFactory) factory.get())
-                                        .excludeTablePlaceholderReplaceKeys());
-                
ConfigValidator.of(context.getOptions()).validate(factory.get().optionRule());
-                sink = ((TableSinkFactory) 
factory.get()).createSink(context).createSink();
-                sink.setJobContext(jobContext);
+                for (CatalogTable catalogTable : stream.getCatalogTables()) {
+                    SeaTunnelSink seaTunnelSink;
+                    TableSinkFactoryContext context =
+                            
TableSinkFactoryContext.replacePlaceholderAndCreate(
+                                    catalogTable,
+                                    ReadonlyConfig.fromConfig(sinkConfig),
+                                    classLoader,
+                                    ((TableSinkFactory) factory.get())
+                                            
.excludeTablePlaceholderReplaceKeys());
+                    
ConfigValidator.of(context.getOptions()).validate(factory.get().optionRule());
+                    seaTunnelSink =
+                            ((TableSinkFactory) 
factory.get()).createSink(context).createSink();
+                    seaTunnelSink.setJobContext(jobContext);
+                    handleSaveMode(seaTunnelSink);
+                    TableIdentifier tableId = catalogTable.getTableId();
+                    String tableIdName = tableId.toTablePath().toString();
+                    sinks.put(tableIdName, seaTunnelSink);
+                }
             }
-            handleSaveMode(sink);
+            SeaTunnelSink sink =
+                    tryGenerateMultiTableSink(
+                            sinks, ReadonlyConfig.fromConfig(sinkConfig), 
classLoader);
             boolean sinkParallelism = 
sinkConfig.hasPath(CommonOptions.PARALLELISM.key());
             boolean envParallelism = 
envConfig.hasPath(CommonOptions.PARALLELISM.key());
             int parallelism =
@@ -137,14 +157,12 @@ public class SinkExecuteProcessor
                             : envParallelism
                                     ? 
envConfig.getInt(CommonOptions.PARALLELISM.key())
                                     : 1;
-            DataStreamSink<Row> dataStreamSink =
+            DataStreamSink<SeaTunnelRow> dataStreamSink =
                     stream.getDataStream()
                             .sinkTo(
                                     SinkV1Adapter.wrap(
                                             new FlinkSink<>(
-                                                    sink,
-                                                    
stream.getCatalogTables().get(0),
-                                                    parallelism)))
+                                                    sink, 
stream.getCatalogTables(), parallelism)))
                             .name(String.format("%s-Sink", 
sink.getPluginName()));
             if (sinkParallelism || envParallelism) {
                 dataStreamSink.setParallelism(parallelism);
@@ -154,6 +172,17 @@ public class SinkExecuteProcessor
         return null;
     }
 
+    // if not support multi table, rollback
+    public SeaTunnelSink tryGenerateMultiTableSink(
+            Map<String, SeaTunnelSink> sinks, ReadonlyConfig sinkConfig, 
ClassLoader classLoader) {
+        if (sinks.values().stream().anyMatch(sink -> !(sink instanceof 
SupportMultiTableSink))) {
+            log.info("Unsupported multi table sink api, rollback to sink 
template");
+            // choose the first sink
+            return sinks.values().iterator().next();
+        }
+        return FactoryUtil.createMultiTableSink(sinks, sinkConfig, 
classLoader);
+    }
+
     public boolean isFallback(Factory factory) {
         try {
             ((TableSinkFactory) factory).createSink(null);
@@ -176,10 +205,10 @@ public class SinkExecuteProcessor
         return source;
     }
 
-    public void handleSaveMode(SeaTunnelSink sink) {
-        if (sink instanceof SupportSaveMode) {
-            Optional<SaveModeHandler> saveModeHandler =
-                    ((SupportSaveMode) sink).getSaveModeHandler();
+    public void handleSaveMode(SeaTunnelSink seaTunnelSink) {
+        if (seaTunnelSink instanceof SupportSaveMode) {
+            SupportSaveMode saveModeSink = (SupportSaveMode) seaTunnelSink;
+            Optional<SaveModeHandler> saveModeHandler = 
saveModeSink.getSaveModeHandler();
             if (saveModeHandler.isPresent()) {
                 try (SaveModeHandler handler = saveModeHandler.get()) {
                     handler.open();
@@ -188,11 +217,6 @@ public class SinkExecuteProcessor
                     throw new 
SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
                 }
             }
-        } else if (sink instanceof MultiTableSink) {
-            Map<String, SeaTunnelSink> sinks = ((MultiTableSink) 
sink).getSinks();
-            for (SeaTunnelSink seaTunnelSink : sinks.values()) {
-                handleSaveMode(seaTunnelSink);
-            }
         }
     }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
index a98a5cd4d2..f5057a4fd0 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
@@ -264,8 +264,8 @@ public class MysqlCDCIT extends TestSuiteBase implements 
TestResource {
     @TestTemplate
     @DisabledOnContainer(
             value = {},
-            type = {EngineType.SPARK, EngineType.FLINK},
-            disabledReason = "Currently SPARK and FLINK do not support multi 
table")
+            type = {EngineType.SPARK},
+            disabledReason = "Currently SPARK do not support cdc")
     public void testMysqlCdcMultiTableE2e(TestContainer container) {
         // Clear related content to ensure that multiple operations are not 
affected
         clearTable(MYSQL_DATABASE, SOURCE_TABLE_1);
@@ -320,7 +320,7 @@ public class MysqlCDCIT extends TestSuiteBase implements 
TestResource {
     @DisabledOnContainer(
             value = {},
             type = {EngineType.SPARK, EngineType.FLINK},
-            disabledReason = "Currently SPARK and FLINK do not support multi 
table")
+            disabledReason = "Currently SPARK and FLINK do not support 
restore")
     public void testMultiTableWithRestore(TestContainer container)
             throws IOException, InterruptedException {
         // Clear related content to ensure that multiple operations are not 
affected
@@ -437,10 +437,12 @@ public class MysqlCDCIT extends TestSuiteBase implements 
TestResource {
     @TestTemplate
     @DisabledOnContainer(
             value = {},
-            type = {EngineType.SPARK, EngineType.FLINK},
-            disabledReason = "Currently SPARK and FLINK do not support multi 
table")
+            type = {EngineType.SPARK},
+            disabledReason = "Currently SPARK do not support cdc")
     public void testMysqlCdcMultiTableWithCustomPrimaryKey(TestContainer 
container) {
         // Clear related content to ensure that multiple operations are not 
affected
+        clearTable(MYSQL_DATABASE, SOURCE_TABLE_1_CUSTOM_PRIMARY_KEY);
+        clearTable(MYSQL_DATABASE, SOURCE_TABLE_2_CUSTOM_PRIMARY_KEY);
         clearTable(MYSQL_DATABASE2, SOURCE_TABLE_1_CUSTOM_PRIMARY_KEY);
         clearTable(MYSQL_DATABASE2, SOURCE_TABLE_2_CUSTOM_PRIMARY_KEY);
 
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/OpengaussCDCIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/OpengaussCDCIT.java
index dc80a083a7..5529c82396 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/OpengaussCDCIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/OpengaussCDCIT.java
@@ -201,8 +201,8 @@ public class OpengaussCDCIT extends TestSuiteBase 
implements TestResource {
     @TestTemplate
     @DisabledOnContainer(
             value = {},
-            type = {EngineType.SPARK, EngineType.FLINK},
-            disabledReason = "Currently SPARK and FLINK do not support multi 
table")
+            type = {EngineType.SPARK},
+            disabledReason = "Currently SPARK do not support cdc")
     public void testOpengaussCdcMultiTableE2e(TestContainer container) {
         try {
             CompletableFuture.supplyAsync(
@@ -285,7 +285,7 @@ public class OpengaussCDCIT extends TestSuiteBase 
implements TestResource {
     @DisabledOnContainer(
             value = {},
             type = {EngineType.SPARK, EngineType.FLINK},
-            disabledReason = "Currently SPARK and FLINK do not support multi 
table")
+            disabledReason = "Currently SPARK and FLINK do not support 
restore")
     public void testMultiTableWithRestore(TestContainer container)
             throws IOException, InterruptedException {
         try {
@@ -394,7 +394,7 @@ public class OpengaussCDCIT extends TestSuiteBase 
implements TestResource {
     @DisabledOnContainer(
             value = {},
             type = {EngineType.SPARK, EngineType.FLINK},
-            disabledReason = "Currently SPARK and FLINK do not support multi 
table")
+            disabledReason = "Currently SPARK and FLINK do not support 
restore")
     public void testAddFiledWithRestore(TestContainer container)
             throws IOException, InterruptedException {
         try {
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java
index 0192fae3f7..03cd2039b0 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java
@@ -61,7 +61,7 @@ import static org.junit.Assert.assertNotNull;
 @Slf4j
 @DisabledOnContainer(
         value = {},
-        type = {EngineType.SPARK, EngineType.FLINK},
+        type = {EngineType.SPARK},
         disabledReason =
                 "Currently SPARK do not support cdc,Flink is prone to time 
out, temporarily disable")
 public class OracleCDCIT extends TestSuiteBase implements TestResource {
@@ -301,8 +301,8 @@ public class OracleCDCIT extends TestSuiteBase implements 
TestResource {
     @TestTemplate
     @DisabledOnContainer(
             value = {},
-            type = {EngineType.SPARK, EngineType.FLINK},
-            disabledReason = "Currently SPARK and FLINK do not support multi 
table")
+            type = {EngineType.SPARK},
+            disabledReason = "Currently SPARK do not support cdc")
     public void testOracleCdcMultiTableE2e(TestContainer container)
             throws IOException, InterruptedException {
 
@@ -378,7 +378,7 @@ public class OracleCDCIT extends TestSuiteBase implements 
TestResource {
     @DisabledOnContainer(
             value = {},
             type = {EngineType.SPARK, EngineType.FLINK},
-            disabledReason = "Currently SPARK and FLINK do not support multi 
table")
+            disabledReason = "Currently SPARK and FLINK do not support 
restore")
     public void testMultiTableWithRestore(TestContainer container)
             throws IOException, InterruptedException {
 
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java
index 0ea8f593ee..6aec6034d3 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java
@@ -186,8 +186,8 @@ public class PostgresCDCIT extends TestSuiteBase implements 
TestResource {
     @TestTemplate
     @DisabledOnContainer(
             value = {},
-            type = {EngineType.SPARK, EngineType.FLINK},
-            disabledReason = "Currently SPARK and FLINK do not support multi 
table")
+            type = {EngineType.SPARK},
+            disabledReason = "Currently SPARK do not support cdc")
     public void testPostgresCdcMultiTableE2e(TestContainer container) {
 
         try {
@@ -271,7 +271,7 @@ public class PostgresCDCIT extends TestSuiteBase implements 
TestResource {
     @DisabledOnContainer(
             value = {},
             type = {EngineType.SPARK, EngineType.FLINK},
-            disabledReason = "Currently SPARK and FLINK do not support multi 
table")
+            disabledReason = "Currently SPARK and FLINK do not support 
restore")
     public void testMultiTableWithRestore(TestContainer container)
             throws IOException, InterruptedException {
         try {
@@ -379,7 +379,7 @@ public class PostgresCDCIT extends TestSuiteBase implements 
TestResource {
     @DisabledOnContainer(
             value = {},
             type = {EngineType.SPARK, EngineType.FLINK},
-            disabledReason = "Currently SPARK and FLINK do not support multi 
table")
+            disabledReason = "Currently SPARK and FLINK do not support 
restore")
     public void testAddFiledWithRestore(TestContainer container)
             throws IOException, InterruptedException {
         try {
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
index 1c54c7eb3e..7080f003e1 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
@@ -35,9 +35,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.BulkResponse;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.ScrollResult;
 import org.apache.seatunnel.e2e.common.TestResource;
 import org.apache.seatunnel.e2e.common.TestSuiteBase;
-import org.apache.seatunnel.e2e.common.container.EngineType;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
-import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
 import org.apache.seatunnel.e2e.common.util.ContainerUtil;
 
 import org.apache.commons.io.IOUtils;
@@ -203,10 +201,6 @@ public class ElasticsearchIT extends TestSuiteBase 
implements TestResource {
     }
 
     @TestTemplate
-    @DisabledOnContainer(
-            value = {},
-            type = {EngineType.FLINK},
-            disabledReason = "Currently FLINK do not support multiple table 
read")
     public void testElasticsSearchWithMultiSourceByFilter(TestContainer 
container)
             throws InterruptedException, IOException {
         // read read_filter_index1,read_filter_index2
@@ -307,10 +301,6 @@ public class ElasticsearchIT extends TestSuiteBase 
implements TestResource {
         Assertions.assertEquals(0, sinkData2.size());
     }
 
-    @DisabledOnContainer(
-            value = {},
-            type = {EngineType.FLINK},
-            disabledReason = "Currently FLINK do not support multiple table 
read")
     @TestTemplate
     public void testElasticsearchWithMultiSink(TestContainer container)
             throws IOException, InterruptedException {
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-email-e2e/src/test/java/org/apache/seatunnel/e2e/connector/email/EmailWithMultiIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-email-e2e/src/test/java/org/apache/seatunnel/e2e/connector/email/EmailWithMultiIT.java
index 2822231874..790a9dc36a 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-email-e2e/src/test/java/org/apache/seatunnel/e2e/connector/email/EmailWithMultiIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-email-e2e/src/test/java/org/apache/seatunnel/e2e/connector/email/EmailWithMultiIT.java
@@ -19,9 +19,7 @@ package org.apache.seatunnel.e2e.connector.email;
 
 import org.apache.seatunnel.e2e.common.TestResource;
 import org.apache.seatunnel.e2e.common.TestSuiteBase;
-import org.apache.seatunnel.e2e.common.container.EngineType;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
-import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
@@ -84,10 +82,6 @@ public class EmailWithMultiIT extends TestSuiteBase 
implements TestResource {
     }
 
     @TestTemplate
-    @DisabledOnContainer(
-            value = {},
-            type = {EngineType.FLINK},
-            disabledReason = "Currently FLINK do not support multi-table")
     public void testMultipleTableEmailSink(TestContainer container) throws 
Exception {
         Container.ExecResult textWriteResult = 
container.executeJob("/fake_to_multiemailsink.conf");
         testEMailSuccess(2, "[email protected]", 
"[email protected]");
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileWithMultipleTableIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileWithMultipleTableIT.java
index 77e635c2f7..4c63b7e335 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileWithMultipleTableIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileWithMultipleTableIT.java
@@ -19,7 +19,6 @@ package org.apache.seatunnel.e2e.connector.file.local;
 
 import org.apache.seatunnel.e2e.common.TestSuiteBase;
 import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
-import org.apache.seatunnel.e2e.common.container.EngineType;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
 import org.apache.seatunnel.e2e.common.container.TestContainerId;
 import org.apache.seatunnel.e2e.common.container.TestHelper;
@@ -33,8 +32,8 @@ import java.io.IOException;
 
 @DisabledOnContainer(
         value = {TestContainerId.SPARK_2_4},
-        type = {EngineType.FLINK},
-        disabledReason = "Currently FLINK do not support multi table")
+        type = {},
+        disabledReason = "")
 public class LocalFileWithMultipleTableIT extends TestSuiteBase {
 
     /** Copy data files to container */
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
index d174c13f53..5274c1a8c9 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
@@ -295,10 +295,7 @@ public class HbaseIT extends TestSuiteBase implements 
TestResource {
         Assertions.assertEquals(cf2Count, 5);
     }
 
-    @DisabledOnContainer(
-            value = {},
-            type = {EngineType.FLINK},
-            disabledReason = "Currently FLINK does not support multiple table 
write")
+    @TestTemplate
     public void testHbaseMultiTableSink(TestContainer container)
             throws IOException, InterruptedException {
         TableName multiTable1 = TableName.valueOf(MULTI_TABLE_ONE_NAME);
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
index 9e9bed031d..c3d4cf936c 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
@@ -23,9 +23,7 @@ import 
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
 
 import org.apache.seatunnel.e2e.common.TestResource;
 import org.apache.seatunnel.e2e.common.TestSuiteBase;
-import org.apache.seatunnel.e2e.common.container.EngineType;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
-import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
 
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
@@ -235,10 +233,6 @@ public class HttpIT extends TestSuiteBase implements 
TestResource {
         Assertions.assertEquals(0, execResult19.getExitCode());
     }
 
-    @DisabledOnContainer(
-            value = {},
-            type = {EngineType.FLINK},
-            disabledReason = "Currently FLINK do not support multiple table 
read")
     @TestTemplate
     public void testMultiTableHttp(TestContainer container)
             throws IOException, InterruptedException {
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java
index 74310f7868..5edeba5c60 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java
@@ -25,9 +25,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient;
 import 
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig;
 import org.apache.seatunnel.e2e.common.TestResource;
 import org.apache.seatunnel.e2e.common.TestSuiteBase;
-import org.apache.seatunnel.e2e.common.container.EngineType;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
-import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
 
 import org.influxdb.InfluxDB;
 import org.influxdb.dto.BatchPoints;
@@ -247,10 +245,6 @@ public class InfluxdbIT extends TestSuiteBase implements 
TestResource {
     }
 
     @TestTemplate
-    @DisabledOnContainer(
-            value = {},
-            type = {EngineType.FLINK},
-            disabledReason = "Currently FLINK do not support multiple table 
read")
     public void testInfluxdbMultipleWrite(TestContainer container)
             throws IOException, InterruptedException {
         Container.ExecResult execResult =
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlMultipleTablesIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlMultipleTablesIT.java
index 494d273879..272199eceb 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlMultipleTablesIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlMultipleTablesIT.java
@@ -22,9 +22,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.e2e.common.TestResource;
 import org.apache.seatunnel.e2e.common.TestSuiteBase;
 import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
-import org.apache.seatunnel.e2e.common.container.EngineType;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
-import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
 import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
 
 import org.apache.commons.lang3.tuple.Pair;
@@ -169,10 +167,6 @@ public class JdbcMysqlMultipleTablesIT extends 
TestSuiteBase implements TestReso
                 query(String.format("SELECT * FROM %s.%s", SINK_DATABASE, 
"table1")));
     }
 
-    @DisabledOnContainer(
-            value = {},
-            type = {EngineType.FLINK},
-            disabledReason = "Currently SPARK and FLINK do not support 
multiple tables")
     @TestTemplate
     public void testMysqlJdbcMultipleTableE2e(TestContainer container)
             throws IOException, InterruptedException, SQLException {
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleMultipleTablesIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleMultipleTablesIT.java
index 2b0bd05d5a..60b0155d9a 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleMultipleTablesIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleMultipleTablesIT.java
@@ -22,9 +22,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.e2e.common.TestResource;
 import org.apache.seatunnel.e2e.common.TestSuiteBase;
 import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
-import org.apache.seatunnel.e2e.common.container.EngineType;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
-import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
 import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
 
 import org.apache.commons.lang3.tuple.Pair;
@@ -149,12 +147,8 @@ public class JdbcOracleMultipleTablesIT extends 
TestSuiteBase implements TestRes
         initSourceTablesData();
     }
 
-    @DisabledOnContainer(
-            value = {},
-            type = {EngineType.FLINK},
-            disabledReason = "Currently FLINK do not support multiple tables")
     @TestTemplate
-    public void testMysqlJdbcMultipleTableE2e(TestContainer container)
+    public void testOracleJdbcMultipleTableE2e(TestContainer container)
             throws IOException, InterruptedException, SQLException {
         clearSinkTables();
 
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduIT.java
index e8e61d3931..b6f0a701d1 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduIT.java
@@ -385,10 +385,6 @@ public class KuduIT extends TestSuiteBase implements 
TestResource {
         kuduClient.deleteTable("kudu_cdc_sink_table");
     }
 
-    @DisabledOnContainer(
-            value = {},
-            type = {EngineType.FLINK},
-            disabledReason = "Currently FLINK do not support multiple table 
read")
     @TestTemplate
     public void testKuduMultipleRead(TestContainer container)
             throws IOException, InterruptedException {
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
index 66288bbb15..60bffba6f4 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
@@ -27,9 +27,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.e2e.common.TestResource;
 import org.apache.seatunnel.e2e.common.TestSuiteBase;
-import org.apache.seatunnel.e2e.common.container.EngineType;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
-import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
 import org.apache.seatunnel.format.json.JsonSerializationSchema;
 
 import org.junit.jupiter.api.AfterAll;
@@ -348,10 +346,6 @@ public abstract class RedisTestCaseTemplateIT extends 
TestSuiteBase implements T
     }
 
     @TestTemplate
-    @DisabledOnContainer(
-            value = {},
-            type = {EngineType.FLINK},
-            disabledReason = "Currently FLINK do not support multiple table 
read")
     public void testMultipletableRedisSink(TestContainer container)
             throws IOException, InterruptedException {
         Container.ExecResult execResult =
diff --git 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainerId.java
 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainerId.java
index c507128200..db8306bfa2 100644
--- 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainerId.java
+++ 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainerId.java
@@ -30,6 +30,8 @@ public enum TestContainerId {
     FLINK_1_14(FLINK, "1.14.6"),
     FLINK_1_15(FLINK, "1.15.3"),
     FLINK_1_16(FLINK, "1.16.0"),
+    FLINK_1_17(FLINK, "1.17.2"),
+    FLINK_1_18(FLINK, "1.18.0"),
     SPARK_2_4(SPARK, "2.4.6"),
     SPARK_3_3(SPARK, "3.3.0"),
     SEATUNNEL(EngineType.SEATUNNEL, "dev");
diff --git 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/Flink17Container.java
 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/Flink17Container.java
new file mode 100644
index 0000000000..387bed9bf8
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/Flink17Container.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.common.container.flink;
+
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.container.TestContainerId;
+
+import com.google.auto.service.AutoService;
+import lombok.NoArgsConstructor;
+
+import java.io.File;
+
+/**
+ * This class is the base class of FlinkEnvironment test for new seatunnel 
connector API. The before
+ * method will create a Flink cluster, and after method will close the Flink 
cluster. You can use
+ * {@link Flink17Container#executeJob} to submit a seatunnel config and run a 
seatunnel job.
+ */
+@NoArgsConstructor
+@AutoService(TestContainer.class)
+public class Flink17Container extends AbstractTestFlinkContainer {
+
+    @Override
+    public TestContainerId identifier() {
+        return TestContainerId.FLINK_1_17;
+    }
+
+    @Override
+    protected String getDockerImage() {
+        return "tyrantlucifer/flink:1.17.2-scala_2.12_hadoop27";
+    }
+
+    @Override
+    protected String getStartModuleName() {
+        return "seatunnel-flink-starter" + File.separator + 
"seatunnel-flink-15-starter";
+    }
+
+    @Override
+    protected String getStartShellName() {
+        return "start-seatunnel-flink-15-connector-v2.sh";
+    }
+
+    @Override
+    protected String getConnectorType() {
+        return "seatunnel";
+    }
+
+    @Override
+    protected String getConnectorModulePath() {
+        return "seatunnel-connectors-v2";
+    }
+
+    @Override
+    protected String getConnectorNamePrefix() {
+        return "connector-";
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/Flink18Container.java
 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/Flink18Container.java
new file mode 100644
index 0000000000..ab3743cdfc
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/Flink18Container.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.common.container.flink;
+
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.container.TestContainerId;
+
+import com.google.auto.service.AutoService;
+import lombok.NoArgsConstructor;
+
+import java.io.File;
+
+/**
+ * This class is the base class of FlinkEnvironment test for new seatunnel 
connector API. The before
+ * method will create a Flink cluster, and after method will close the Flink 
cluster. You can use
+ * {@link Flink18Container#executeJob} to submit a seatunnel config and run a 
seatunnel job.
+ */
+@NoArgsConstructor
+@AutoService(TestContainer.class)
+public class Flink18Container extends AbstractTestFlinkContainer {
+
+    @Override
+    public TestContainerId identifier() {
+        return TestContainerId.FLINK_1_18;
+    }
+
+    @Override
+    protected String getDockerImage() {
+        return "tyrantlucifer/flink:1.18.0-scala_2.12_hadoop27";
+    }
+
+    @Override
+    protected String getStartModuleName() {
+        return "seatunnel-flink-starter" + File.separator + 
"seatunnel-flink-15-starter";
+    }
+
+    @Override
+    protected String getStartShellName() {
+        return "start-seatunnel-flink-15-connector-v2.sh";
+    }
+
+    @Override
+    protected String getConnectorType() {
+        return "seatunnel";
+    }
+
+    @Override
+    protected String getConnectorModulePath() {
+        return "seatunnel-connectors-v2";
+    }
+
+    @Override
+    protected String getConnectorNamePrefix() {
+        return "connector-";
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java
index 66785d5b64..5ee0a1e7bd 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.common.utils.JsonUtils;
 import org.apache.seatunnel.e2e.common.TestSuiteBase;
 import org.apache.seatunnel.e2e.common.container.EngineType;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.container.TestContainerId;
 import 
org.apache.seatunnel.e2e.common.container.flink.AbstractTestFlinkContainer;
 import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
 
@@ -225,7 +226,7 @@ public class CheckpointEnableIT extends TestSuiteBase {
 
     @TestTemplate
     @DisabledOnContainer(
-            value = {},
+            value = {TestContainerId.FLINK_1_17, TestContainerId.FLINK_1_18},
             type = {EngineType.SEATUNNEL, EngineType.SPARK},
             disabledReason =
                     "depending on the engine, the logic for determining 
whether a checkpoint is enabled is different")
diff --git 
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
 
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
index cab4b6f225..022ef3224c 100644
--- 
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
+++ 
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
@@ -48,16 +48,16 @@ public class FlinkSink<InputT, CommT, WriterStateT, 
GlobalCommT>
 
     private final SeaTunnelSink<SeaTunnelRow, WriterStateT, CommT, 
GlobalCommT> sink;
 
-    private final CatalogTable catalogTable;
+    private final List<CatalogTable> catalogTables;
 
     private final int parallelism;
 
     public FlinkSink(
             SeaTunnelSink<SeaTunnelRow, WriterStateT, CommT, GlobalCommT> sink,
-            CatalogTable catalogTable,
+            List<CatalogTable> catalogTables,
             int parallelism) {
         this.sink = sink;
-        this.catalogTable = catalogTable;
+        this.catalogTables = catalogTables;
         this.parallelism = parallelism;
     }
 
@@ -68,15 +68,13 @@ public class FlinkSink<InputT, CommT, WriterStateT, 
GlobalCommT>
         org.apache.seatunnel.api.sink.SinkWriter.Context stContext =
                 new FlinkSinkWriterContext(context, parallelism);
         if (states == null || states.isEmpty()) {
-            return new FlinkSinkWriter<>(
-                    sink.createWriter(stContext), 1, 
catalogTable.getSeaTunnelRowType(), stContext);
+            return new FlinkSinkWriter<>(sink.createWriter(stContext), 1, 
stContext);
         } else {
             List<WriterStateT> restoredState =
                     
states.stream().map(FlinkWriterState::getState).collect(Collectors.toList());
             return new FlinkSinkWriter<>(
                     sink.restoreWriter(stContext, restoredState),
                     states.get(0).getCheckpointId() + 1,
-                    catalogTable.getSeaTunnelRowType(),
                     stContext);
         }
     }
@@ -94,12 +92,30 @@ public class FlinkSink<InputT, CommT, WriterStateT, 
GlobalCommT>
 
     @Override
     public Optional<SimpleVersionedSerializer<CommitWrapper<CommT>>> 
getCommittableSerializer() {
-        return 
sink.getCommitInfoSerializer().map(CommitWrapperSerializer::new);
+        try {
+            if (sink.createCommitter().isPresent()
+                    || sink.createAggregatedCommitter().isPresent()) {
+                return 
sink.getCommitInfoSerializer().map(CommitWrapperSerializer::new);
+            } else {
+                return Optional.empty();
+            }
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to create Committer or 
AggregatedCommitter", e);
+        }
     }
 
     @Override
     public Optional<SimpleVersionedSerializer<GlobalCommT>> 
getGlobalCommittableSerializer() {
-        return 
sink.getAggregatedCommitInfoSerializer().map(FlinkSimpleVersionedSerializer::new);
+        try {
+            if (sink.createAggregatedCommitter().isPresent()) {
+                return sink.getAggregatedCommitInfoSerializer()
+                        .map(FlinkSimpleVersionedSerializer::new);
+            } else {
+                return Optional.empty();
+            }
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to create AggregatedCommitter", 
e);
+        }
     }
 
     @Override
diff --git 
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
 
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
index 8de831aee1..7a47052c01 100644
--- 
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
+++ 
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
@@ -24,7 +24,6 @@ import org.apache.seatunnel.api.common.metrics.MetricsContext;
 import org.apache.seatunnel.api.sink.MultiTableResourceManager;
 import org.apache.seatunnel.api.sink.SupportResourceShare;
 import org.apache.seatunnel.api.sink.event.WriterCloseEvent;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 
 import org.apache.flink.api.connector.sink.Sink;
@@ -69,7 +68,6 @@ public class FlinkSinkWriter<InputT, CommT, WriterStateT>
     FlinkSinkWriter(
             org.apache.seatunnel.api.sink.SinkWriter<SeaTunnelRow, CommT, 
WriterStateT> sinkWriter,
             long checkpointId,
-            SeaTunnelDataType<?> dataType,
             org.apache.seatunnel.api.sink.SinkWriter.Context context) {
         this.context = context;
         this.sinkWriter = sinkWriter;

Reply via email to