This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3d4f4bb33a [Fix] [Kafka Source] kafka source use topic as table name 
instead of fullName (#8401)
3d4f4bb33a is described below

commit 3d4f4bb33a61e6c7f7bc8bb0b9d9d002d86d6ef9
Author: Cheun99 <[email protected]>
AuthorDate: Wed Jan 15 10:44:35 2025 +0800

    [Fix] [Kafka Source] kafka source use topic as table name instead of 
fullName (#8401)
---
 .../api/sink/multitablesink/MultiTableSink.java    | 30 +++++----
 .../seatunnel/api/table/factory/FactoryUtil.java   |  3 +-
 .../table/factory/MultiTableFactoryContext.java    |  5 +-
 .../seatunnel/kafka/source/KafkaSourceConfig.java  |  4 +-
 .../core/starter/execution/PluginUtil.java         |  5 +-
 .../flink/execution/SinkExecuteProcessor.java      | 13 ++--
 .../flink/execution/SinkExecuteProcessor.java      | 13 ++--
 .../spark/execution/SinkExecuteProcessor.java      |  3 +-
 .../spark/execution/SinkExecuteProcessor.java      |  3 +-
 .../seatunnel/e2e/connector/kafka/KafkaIT.java     | 22 +++++-
 ...ource_topic_multiple_point_text_to_console.conf | 78 ++++++++++++++++++++++
 .../engine/core/dag/actions/SinkConfig.java        |  4 +-
 .../core/parse/MultipleTableJobConfigParser.java   | 12 ++--
 .../server/dag/physical/PhysicalPlanGenerator.java |  2 +-
 .../seatunnel/engine/server/master/JobMaster.java  |  3 +-
 .../engine/server/task/flow/SinkFlowLifeCycle.java |  6 +-
 16 files changed, 158 insertions(+), 48 deletions(-)

diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java
index 23f4fc455b..c9d810e1fe 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java
@@ -53,7 +53,7 @@ public class MultiTableSink
                         MultiTableAggregatedCommitInfo>,
                 SupportSchemaEvolutionSink {
 
-    @Getter private final Map<String, SeaTunnelSink> sinks;
+    @Getter private final Map<TablePath, SeaTunnelSink> sinks;
     private final int replicaNum;
 
     public MultiTableSink(MultiTableFactoryContext context) {
@@ -72,9 +72,10 @@ public class MultiTableSink
         Map<SinkIdentifier, SinkWriter<SeaTunnelRow, ?, ?>> writers = new 
HashMap<>();
         Map<SinkIdentifier, SinkWriter.Context> sinkWritersContext = new 
HashMap<>();
         for (int i = 0; i < replicaNum; i++) {
-            for (String tableIdentifier : sinks.keySet()) {
-                SeaTunnelSink sink = sinks.get(tableIdentifier);
+            for (TablePath tablePath : sinks.keySet()) {
+                SeaTunnelSink sink = sinks.get(tablePath);
                 int index = context.getIndexOfSubtask() * replicaNum + i;
+                String tableIdentifier = tablePath.toString();
                 writers.put(
                         SinkIdentifier.of(tableIdentifier, index),
                         sink.createWriter(new SinkContextProxy(index, 
replicaNum, context)));
@@ -91,10 +92,10 @@ public class MultiTableSink
         Map<SinkIdentifier, SinkWriter.Context> sinkWritersContext = new 
HashMap<>();
 
         for (int i = 0; i < replicaNum; i++) {
-            for (String tableIdentifier : sinks.keySet()) {
-                SeaTunnelSink sink = sinks.get(tableIdentifier);
+            for (TablePath tablePath : sinks.keySet()) {
+                SeaTunnelSink sink = sinks.get(tablePath);
                 int index = context.getIndexOfSubtask() * replicaNum + i;
-                SinkIdentifier sinkIdentifier = 
SinkIdentifier.of(tableIdentifier, index);
+                SinkIdentifier sinkIdentifier = 
SinkIdentifier.of(tablePath.toString(), index);
                 List<?> state =
                         states.stream()
                                 .map(
@@ -113,7 +114,7 @@ public class MultiTableSink
                             sink.restoreWriter(
                                     new SinkContextProxy(index, replicaNum, 
context), state));
                 }
-                sinkWritersContext.put(SinkIdentifier.of(tableIdentifier, 
index), context);
+                sinkWritersContext.put(sinkIdentifier, context);
             }
         }
         return new MultiTableSinkWriter(writers, replicaNum, 
sinkWritersContext);
@@ -127,12 +128,13 @@ public class MultiTableSink
     @Override
     public Optional<SinkCommitter<MultiTableCommitInfo>> createCommitter() 
throws IOException {
         Map<String, SinkCommitter<?>> committers = new HashMap<>();
-        for (String tableIdentifier : sinks.keySet()) {
-            SeaTunnelSink sink = sinks.get(tableIdentifier);
+        for (TablePath tablePath : sinks.keySet()) {
+            SeaTunnelSink sink = sinks.get(tablePath);
             sink.createCommitter()
                     .ifPresent(
                             committer ->
-                                    committers.put(tableIdentifier, 
(SinkCommitter<?>) committer));
+                                    committers.put(
+                                            tablePath.toString(), 
(SinkCommitter<?>) committer));
         }
         if (committers.isEmpty()) {
             return Optional.empty();
@@ -149,12 +151,12 @@ public class MultiTableSink
     public Optional<SinkAggregatedCommitter<MultiTableCommitInfo, 
MultiTableAggregatedCommitInfo>>
             createAggregatedCommitter() throws IOException {
         Map<String, SinkAggregatedCommitter<?, ?>> aggCommitters = new 
HashMap<>();
-        for (String tableIdentifier : sinks.keySet()) {
-            SeaTunnelSink sink = sinks.get(tableIdentifier);
+        for (TablePath tablePath : sinks.keySet()) {
+            SeaTunnelSink sink = sinks.get(tablePath);
             Optional<SinkAggregatedCommitter<?, ?>> sinkOptional = 
sink.createAggregatedCommitter();
             sinkOptional.ifPresent(
                     sinkAggregatedCommitter ->
-                            aggCommitters.put(tableIdentifier, 
sinkAggregatedCommitter));
+                            aggCommitters.put(tablePath.toString(), 
sinkAggregatedCommitter));
         }
         if (aggCommitters.isEmpty()) {
             return Optional.empty();
@@ -171,7 +173,7 @@ public class MultiTableSink
                 tablePaths.add(
                         ((CatalogTable) 
values.get(i).getWriteCatalogTable().get()).getTablePath());
             } else {
-                tablePaths.add(TablePath.of(sinks.keySet().toArray(new 
String[0])[i]));
+                tablePaths.add(sinks.keySet().toArray(new TablePath[0])[i]);
             }
         }
         return tablePaths;
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
index c94b88be7c..e11afd1d19 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
@@ -32,6 +32,7 @@ import org.apache.seatunnel.api.table.catalog.Catalog;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.connector.TableSource;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.transform.SeaTunnelTransform;
@@ -178,7 +179,7 @@ public final class FactoryUtil {
 
     public static <IN, StateT, CommitInfoT, AggregatedCommitInfoT>
             SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> 
createMultiTableSink(
-                    Map<String, SeaTunnelSink> sinks,
+                    Map<TablePath, SeaTunnelSink> sinks,
                     ReadonlyConfig options,
                     ClassLoader classLoader) {
         try {
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/MultiTableFactoryContext.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/MultiTableFactoryContext.java
index 809b747eba..6caeeae8ed 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/MultiTableFactoryContext.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/MultiTableFactoryContext.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.api.table.factory;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.table.catalog.TablePath;
 
 import lombok.Getter;
 
@@ -27,10 +28,10 @@ import java.util.Map;
 @Getter
 public class MultiTableFactoryContext extends TableSinkFactoryContext {
 
-    private final Map<String, SeaTunnelSink> sinks;
+    private final Map<TablePath, SeaTunnelSink> sinks;
 
     public MultiTableFactoryContext(
-            ReadonlyConfig options, ClassLoader classLoader, Map<String, 
SeaTunnelSink> sinks) {
+            ReadonlyConfig options, ClassLoader classLoader, Map<TablePath, 
SeaTunnelSink> sinks) {
         super(null, options, classLoader);
         this.sinks = sinks;
     }
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
index 44ca54f0f2..db591cfdf0 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
@@ -137,7 +137,7 @@ public class KafkaSourceConfig implements Serializable {
         return consumerMetadataList.stream()
                 .collect(
                         Collectors.toMap(
-                                consumerMetadata -> 
TablePath.of(consumerMetadata.getTopic()),
+                                consumerMetadata -> TablePath.of(null, 
consumerMetadata.getTopic()),
                                 consumerMetadata -> consumerMetadata));
     }
 
@@ -208,7 +208,7 @@ public class KafkaSourceConfig implements Serializable {
     private CatalogTable createCatalogTable(ReadonlyConfig readonlyConfig) {
         Optional<Map<String, Object>> schemaOptions =
                 readonlyConfig.getOptional(TableSchemaOptions.SCHEMA);
-        TablePath tablePath = TablePath.of(readonlyConfig.get(TOPIC));
+        TablePath tablePath = TablePath.of(null, readonlyConfig.get(TOPIC));
         TableSchema tableSchema;
         if (schemaOptions.isPresent()) {
             tableSchema = new ReadonlyConfigParser().parse(readonlyConfig);
diff --git 
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java
 
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java
index 0d5bee0d15..a8a245dd9b 100644
--- 
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java
+++ 
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java
@@ -28,6 +28,7 @@ import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.connector.TableSource;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.FactoryException;
@@ -184,7 +185,7 @@ public class PluginUtil {
             return sink;
         } else {
             if (catalogTables.size() > 1) {
-                Map<String, SeaTunnelSink> sinks = new HashMap<>();
+                Map<TablePath, SeaTunnelSink> sinks = new HashMap<>();
                 ReadonlyConfig readonlyConfig = 
ReadonlyConfig.fromConfig(sinkConfig);
                 catalogTables.forEach(
                         catalogTable -> {
@@ -202,7 +203,7 @@ public class PluginUtil {
                                             .createSink(context)
                                             .createSink();
                             action.setJobContext(jobContext);
-                            sinks.put(catalogTable.getTablePath().toString(), 
action);
+                            sinks.put(catalogTable.getTablePath(), action);
                         });
                 return FactoryUtil.createMultiTableSink(sinks, readonlyConfig, 
classLoader);
             }
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index b6de6d2cad..6f24e1c3fe 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -30,6 +30,7 @@ import org.apache.seatunnel.api.sink.SupportMultiTableSink;
 import org.apache.seatunnel.api.sink.SupportSaveMode;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.FactoryUtil;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
@@ -106,7 +107,7 @@ public class SinkExecuteProcessor
                     fromSourceTable(sinkConfig, 
upstreamDataStreams).orElse(input);
             Optional<? extends Factory> factory = plugins.get(i);
             boolean fallBack = !factory.isPresent() || 
isFallback(factory.get());
-            Map<String, SeaTunnelSink> sinks = new HashMap<>();
+            Map<TablePath, SeaTunnelSink> sinks = new HashMap<>();
             if (fallBack) {
                 for (CatalogTable catalogTable : stream.getCatalogTables()) {
                     SeaTunnelSink fallBackSink =
@@ -122,8 +123,7 @@ public class SinkExecuteProcessor
                     fallBackSink.setTypeInfo(sourceType);
                     handleSaveMode(fallBackSink);
                     TableIdentifier tableId = catalogTable.getTableId();
-                    String tableIdName = tableId.toTablePath().toString();
-                    sinks.put(tableIdName, fallBackSink);
+                    sinks.put(tableId.toTablePath(), fallBackSink);
                 }
             } else {
                 for (CatalogTable catalogTable : stream.getCatalogTables()) {
@@ -141,8 +141,7 @@ public class SinkExecuteProcessor
                     seaTunnelSink.setJobContext(jobContext);
                     handleSaveMode(seaTunnelSink);
                     TableIdentifier tableId = catalogTable.getTableId();
-                    String tableIdName = tableId.toTablePath().toString();
-                    sinks.put(tableIdName, seaTunnelSink);
+                    sinks.put(tableId.toTablePath(), seaTunnelSink);
                 }
             }
             SeaTunnelSink sink =
@@ -168,7 +167,9 @@ public class SinkExecuteProcessor
 
     // if not support multi table, rollback
     public SeaTunnelSink tryGenerateMultiTableSink(
-            Map<String, SeaTunnelSink> sinks, ReadonlyConfig sinkConfig, 
ClassLoader classLoader) {
+            Map<TablePath, SeaTunnelSink> sinks,
+            ReadonlyConfig sinkConfig,
+            ClassLoader classLoader) {
         if (sinks.values().stream().anyMatch(sink -> !(sink instanceof 
SupportMultiTableSink))) {
             log.info("Unsupported multi table sink api, rollback to sink 
template");
             // choose the first sink
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index c7d4e1f880..d41bfe34ce 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -30,6 +30,7 @@ import org.apache.seatunnel.api.sink.SupportMultiTableSink;
 import org.apache.seatunnel.api.sink.SupportSaveMode;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.FactoryUtil;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
@@ -107,7 +108,7 @@ public class SinkExecuteProcessor
                     fromSourceTable(sinkConfig, 
upstreamDataStreams).orElse(input);
             Optional<? extends Factory> factory = plugins.get(i);
             boolean fallBack = !factory.isPresent() || 
isFallback(factory.get());
-            Map<String, SeaTunnelSink> sinks = new HashMap<>();
+            Map<TablePath, SeaTunnelSink> sinks = new HashMap<>();
             if (fallBack) {
                 for (CatalogTable catalogTable : stream.getCatalogTables()) {
                     SeaTunnelSink fallBackSink =
@@ -123,8 +124,7 @@ public class SinkExecuteProcessor
                     fallBackSink.setTypeInfo(sourceType);
                     handleSaveMode(fallBackSink);
                     TableIdentifier tableId = catalogTable.getTableId();
-                    String tableIdName = tableId.toTablePath().toString();
-                    sinks.put(tableIdName, fallBackSink);
+                    sinks.put(tableId.toTablePath(), fallBackSink);
                 }
             } else {
                 for (CatalogTable catalogTable : stream.getCatalogTables()) {
@@ -142,8 +142,7 @@ public class SinkExecuteProcessor
                     seaTunnelSink.setJobContext(jobContext);
                     handleSaveMode(seaTunnelSink);
                     TableIdentifier tableId = catalogTable.getTableId();
-                    String tableIdName = tableId.toTablePath().toString();
-                    sinks.put(tableIdName, seaTunnelSink);
+                    sinks.put(tableId.toTablePath(), seaTunnelSink);
                 }
             }
             SeaTunnelSink sink =
@@ -174,7 +173,9 @@ public class SinkExecuteProcessor
 
     // if not support multi table, rollback
     public SeaTunnelSink tryGenerateMultiTableSink(
-            Map<String, SeaTunnelSink> sinks, ReadonlyConfig sinkConfig, 
ClassLoader classLoader) {
+            Map<TablePath, SeaTunnelSink> sinks,
+            ReadonlyConfig sinkConfig,
+            ClassLoader classLoader) {
         if (sinks.values().stream().anyMatch(sink -> !(sink instanceof 
SupportMultiTableSink))) {
             log.info("Unsupported multi table sink api, rollback to sink 
template");
             // choose the first sink
diff --git 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index 6c3aabe691..d4f99d65f5 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -27,6 +27,7 @@ import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SupportSaveMode;
 import org.apache.seatunnel.api.sink.multitablesink.MultiTableSink;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
 import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
@@ -168,7 +169,7 @@ public class SinkExecuteProcessor
                 }
             }
         } else if (sink instanceof MultiTableSink) {
-            Map<String, SeaTunnelSink> sinks = ((MultiTableSink) 
sink).getSinks();
+            Map<TablePath, SeaTunnelSink> sinks = ((MultiTableSink) 
sink).getSinks();
             for (SeaTunnelSink seaTunnelSink : sinks.values()) {
                 handleSaveMode(seaTunnelSink);
             }
diff --git 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index 4cccedeb79..b66aaf7d86 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -27,6 +27,7 @@ import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SupportSaveMode;
 import org.apache.seatunnel.api.sink.multitablesink.MultiTableSink;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
 import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
@@ -169,7 +170,7 @@ public class SinkExecuteProcessor
                 }
             }
         } else if (sink instanceof MultiTableSink) {
-            Map<String, SeaTunnelSink> sinks = ((MultiTableSink) 
sink).getSinks();
+            Map<TablePath, SeaTunnelSink> sinks = ((MultiTableSink) 
sink).getSinks();
             for (SeaTunnelSink seaTunnelSink : sinks.values()) {
                 handleSaveMode(seaTunnelSink);
             }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
index b199a2848a..595fe3042e 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
@@ -269,6 +269,26 @@ public class KafkaIT extends TestSuiteBase implements 
TestResource {
         Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
     }
 
+    @TestTemplate
+    public void testSourceKafkaTopicWithMultipleDotConsoleAssertCatalogTable(
+            TestContainer container) throws IOException, InterruptedException {
+        TextSerializationSchema serializer =
+                TextSerializationSchema.builder()
+                        .seaTunnelRowType(SEATUNNEL_ROW_TYPE)
+                        .delimiter(",")
+                        .build();
+        generateTestData(
+                row ->
+                        new ProducerRecord<>(
+                                "test.multiple.point.topic.json", null, 
serializer.serialize(row)),
+                0,
+                10);
+        Container.ExecResult execResult =
+                container.executeJob(
+                        
"/textFormatIT/kafka_source_topic_multiple_point_text_to_console.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
+    }
+
     @TestTemplate
     public void testSourceKafkaJsonToConsole(TestContainer container)
             throws IOException, InterruptedException {
@@ -295,7 +315,7 @@ public class KafkaIT extends TestSuiteBase implements 
TestResource {
                         DEFAULT_FORMAT,
                         DEFAULT_FIELD_DELIMITER,
                         null);
-        generateTestData(serializer::serializeRow, 0, 100);
+        generateTestData(serializer::serializeRow, 0, 10);
         Container.ExecResult execResult =
                 container.executeJob(
                         
"/kafka/kafkasource_format_error_handle_way_skip_to_console.conf");
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_topic_multiple_point_text_to_console.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_topic_multiple_point_text_to_console.conf
new file mode 100644
index 0000000000..f1fde74645
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_topic_multiple_point_text_to_console.conf
@@ -0,0 +1,78 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Kafka {
+    bootstrap.servers = "kafkaCluster:9092"
+    topic = "test.multiple.point.topic.json"
+    plugin_output = "kafka_table"
+    start_mode = "earliest"
+    format_error_handle_way = skip
+    schema = {
+      fields {
+        id = bigint
+        c_map = "map<string, smallint>"
+        c_array = "array<tinyint>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(2, 1)"
+        c_bytes = bytes
+        c_date = date
+        c_timestamp = timestamp
+      }
+    }
+  }
+}
+
+sink {
+  Assert {
+    plugin_input = "kafka_table"
+    rules =
+      {
+        field_rules = [
+          {
+            field_name = id
+            field_type = bigint
+            field_value = [
+              {
+                rule_type = NOT_NULL
+              },
+              {
+                rule_type = MIN
+                rule_value = 0
+              },
+              {
+                rule_type = MAX
+                rule_value = 99
+              }
+            ]
+          }
+        ]
+      }
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SinkConfig.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SinkConfig.java
index 17d9254525..283545b280 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SinkConfig.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SinkConfig.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.engine.core.dag.actions;
 
+import org.apache.seatunnel.api.table.catalog.TablePath;
+
 import lombok.AllArgsConstructor;
 import lombok.Getter;
 import lombok.NoArgsConstructor;
@@ -25,5 +27,5 @@ import lombok.NoArgsConstructor;
 @NoArgsConstructor
 @AllArgsConstructor
 public class SinkConfig implements Config {
-    private String multipleRowTableId;
+    private TablePath tablePath;
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index 22abdfbffd..08df272c43 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -33,6 +33,7 @@ import org.apache.seatunnel.api.sink.SupportSaveMode;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SourceSplit;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
 import 
org.apache.seatunnel.api.table.factory.ChangeStreamTableSourceCheckpoint;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.FactoryUtil;
@@ -657,7 +658,7 @@ public class MultipleTableJobConfigParser {
             log.info("Unsupported multi table sink api, rollback to sink 
template");
             return Optional.empty();
         }
-        Map<String, SeaTunnelSink> sinks = new HashMap<>();
+        Map<TablePath, SeaTunnelSink> sinks = new HashMap<>();
         Set<URL> jars =
                 sinkActions.stream()
                         .flatMap(a -> a.getJarUrls().stream())
@@ -665,8 +666,8 @@ public class MultipleTableJobConfigParser {
         sinkActions.forEach(
                 action -> {
                     SeaTunnelSink sink = action.getSink();
-                    String tableId = 
action.getConfig().getMultipleRowTableId();
-                    sinks.put(tableId, sink);
+                    TablePath tablePath = action.getConfig().getTablePath();
+                    sinks.put(tablePath, sink);
                 });
         SeaTunnelSink<?, ?, ?, ?> sink =
                 FactoryUtil.createMultiTableSink(sinks, options, classLoader);
@@ -698,12 +699,11 @@ public class MultipleTableJobConfigParser {
                 FactoryUtil.createAndPrepareSink(
                         catalogTable, readonlyConfig, classLoader, factoryId);
         sink.setJobContext(jobConfig.getJobContext());
-        SinkConfig actionConfig =
-                new 
SinkConfig(catalogTable.getTableId().toTablePath().toString());
+        SinkConfig actionConfig = new 
SinkConfig(catalogTable.getTableId().toTablePath());
         long id = idGenerator.getNextId();
         String actionName =
                 JobConfigParser.createSinkActionName(
-                        configIndex, factoryId, 
actionConfig.getMultipleRowTableId());
+                        configIndex, factoryId, 
actionConfig.getTablePath().toString());
         SinkAction<?, ?, ?, ?> sinkAction =
                 new SinkAction<>(
                         id,
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
index bcc49daae2..cbc8db9632 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
@@ -330,7 +330,7 @@ public class PhysicalPlanGenerator {
                                             (PhysicalExecutionFlow) nextFlow;
                                     SinkAction sinkAction = (SinkAction) 
sinkFlow.getAction();
                                     String sinkTableId =
-                                            
sinkAction.getConfig().getMultipleRowTableId();
+                                            
sinkAction.getConfig().getTablePath().toString();
 
                                     long taskIDPrefix = 
idGenerator.getNextId();
                                     long taskGroupIDPrefix = 
idGenerator.getNextId();
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 46883fdfcf..a8fa0de57a 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -27,6 +27,7 @@ import org.apache.seatunnel.api.sink.SaveModeHandler;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SupportSaveMode;
 import org.apache.seatunnel.api.sink.multitablesink.MultiTableSink;
+import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
 import org.apache.seatunnel.common.utils.ExceptionUtils;
 import org.apache.seatunnel.common.utils.RetryUtils;
@@ -527,7 +528,7 @@ public class JobMaster {
                 }
             }
         } else if (sink instanceof MultiTableSink) {
-            Map<String, SeaTunnelSink> sinks = ((MultiTableSink) 
sink).getSinks();
+            Map<TablePath, SeaTunnelSink> sinks = ((MultiTableSink) 
sink).getSinks();
             for (SeaTunnelSink seaTunnelSink : sinks.values()) {
                 handleSaveMode(seaTunnelSink);
             }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
index 3f47ee84c3..95996e3b8b 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
@@ -123,13 +123,13 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends 
Serializable, AggregatedCo
         boolean isMulti = sinkAction.getSink() instanceof MultiTableSink;
         if (isMulti) {
             sinkTables = ((MultiTableSink) 
sinkAction.getSink()).getSinkTables();
-            String[] upstreamTablePaths =
+            TablePath[] upstreamTablePaths =
                     ((MultiTableSink) sinkAction.getSink())
                             .getSinks()
                             .keySet()
-                            .toArray(new String[0]);
+                            .toArray(new TablePath[0]);
             for (int i = 0; i < ((MultiTableSink) 
sinkAction.getSink()).getSinks().size(); i++) {
-                tablesMaps.put(TablePath.of(upstreamTablePaths[i]), 
sinkTables.get(i));
+                tablesMaps.put(upstreamTablePaths[i], sinkTables.get(i));
             }
         } else {
             Optional<CatalogTable> catalogTable = 
sinkAction.getSink().getWriteCatalogTable();


Reply via email to