This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 3d4f4bb33a [Fix] [Kafka Source] kafka source use topic as table name
instead of fullName (#8401)
3d4f4bb33a is described below
commit 3d4f4bb33a61e6c7f7bc8bb0b9d9d002d86d6ef9
Author: Cheun99 <[email protected]>
AuthorDate: Wed Jan 15 10:44:35 2025 +0800
[Fix] [Kafka Source] kafka source use topic as table name instead of
fullName (#8401)
---
.../api/sink/multitablesink/MultiTableSink.java | 30 +++++----
.../seatunnel/api/table/factory/FactoryUtil.java | 3 +-
.../table/factory/MultiTableFactoryContext.java | 5 +-
.../seatunnel/kafka/source/KafkaSourceConfig.java | 4 +-
.../core/starter/execution/PluginUtil.java | 5 +-
.../flink/execution/SinkExecuteProcessor.java | 13 ++--
.../flink/execution/SinkExecuteProcessor.java | 13 ++--
.../spark/execution/SinkExecuteProcessor.java | 3 +-
.../spark/execution/SinkExecuteProcessor.java | 3 +-
.../seatunnel/e2e/connector/kafka/KafkaIT.java | 22 +++++-
...ource_topic_multiple_point_text_to_console.conf | 78 ++++++++++++++++++++++
.../engine/core/dag/actions/SinkConfig.java | 4 +-
.../core/parse/MultipleTableJobConfigParser.java | 12 ++--
.../server/dag/physical/PhysicalPlanGenerator.java | 2 +-
.../seatunnel/engine/server/master/JobMaster.java | 3 +-
.../engine/server/task/flow/SinkFlowLifeCycle.java | 6 +-
16 files changed, 158 insertions(+), 48 deletions(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java
index 23f4fc455b..c9d810e1fe 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java
@@ -53,7 +53,7 @@ public class MultiTableSink
MultiTableAggregatedCommitInfo>,
SupportSchemaEvolutionSink {
- @Getter private final Map<String, SeaTunnelSink> sinks;
+ @Getter private final Map<TablePath, SeaTunnelSink> sinks;
private final int replicaNum;
public MultiTableSink(MultiTableFactoryContext context) {
@@ -72,9 +72,10 @@ public class MultiTableSink
Map<SinkIdentifier, SinkWriter<SeaTunnelRow, ?, ?>> writers = new
HashMap<>();
Map<SinkIdentifier, SinkWriter.Context> sinkWritersContext = new
HashMap<>();
for (int i = 0; i < replicaNum; i++) {
- for (String tableIdentifier : sinks.keySet()) {
- SeaTunnelSink sink = sinks.get(tableIdentifier);
+ for (TablePath tablePath : sinks.keySet()) {
+ SeaTunnelSink sink = sinks.get(tablePath);
int index = context.getIndexOfSubtask() * replicaNum + i;
+ String tableIdentifier = tablePath.toString();
writers.put(
SinkIdentifier.of(tableIdentifier, index),
sink.createWriter(new SinkContextProxy(index,
replicaNum, context)));
@@ -91,10 +92,10 @@ public class MultiTableSink
Map<SinkIdentifier, SinkWriter.Context> sinkWritersContext = new
HashMap<>();
for (int i = 0; i < replicaNum; i++) {
- for (String tableIdentifier : sinks.keySet()) {
- SeaTunnelSink sink = sinks.get(tableIdentifier);
+ for (TablePath tablePath : sinks.keySet()) {
+ SeaTunnelSink sink = sinks.get(tablePath);
int index = context.getIndexOfSubtask() * replicaNum + i;
- SinkIdentifier sinkIdentifier =
SinkIdentifier.of(tableIdentifier, index);
+ SinkIdentifier sinkIdentifier =
SinkIdentifier.of(tablePath.toString(), index);
List<?> state =
states.stream()
.map(
@@ -113,7 +114,7 @@ public class MultiTableSink
sink.restoreWriter(
new SinkContextProxy(index, replicaNum,
context), state));
}
- sinkWritersContext.put(SinkIdentifier.of(tableIdentifier,
index), context);
+ sinkWritersContext.put(sinkIdentifier, context);
}
}
return new MultiTableSinkWriter(writers, replicaNum,
sinkWritersContext);
@@ -127,12 +128,13 @@ public class MultiTableSink
@Override
public Optional<SinkCommitter<MultiTableCommitInfo>> createCommitter()
throws IOException {
Map<String, SinkCommitter<?>> committers = new HashMap<>();
- for (String tableIdentifier : sinks.keySet()) {
- SeaTunnelSink sink = sinks.get(tableIdentifier);
+ for (TablePath tablePath : sinks.keySet()) {
+ SeaTunnelSink sink = sinks.get(tablePath);
sink.createCommitter()
.ifPresent(
committer ->
- committers.put(tableIdentifier,
(SinkCommitter<?>) committer));
+ committers.put(
+ tablePath.toString(),
(SinkCommitter<?>) committer));
}
if (committers.isEmpty()) {
return Optional.empty();
@@ -149,12 +151,12 @@ public class MultiTableSink
public Optional<SinkAggregatedCommitter<MultiTableCommitInfo,
MultiTableAggregatedCommitInfo>>
createAggregatedCommitter() throws IOException {
Map<String, SinkAggregatedCommitter<?, ?>> aggCommitters = new
HashMap<>();
- for (String tableIdentifier : sinks.keySet()) {
- SeaTunnelSink sink = sinks.get(tableIdentifier);
+ for (TablePath tablePath : sinks.keySet()) {
+ SeaTunnelSink sink = sinks.get(tablePath);
Optional<SinkAggregatedCommitter<?, ?>> sinkOptional =
sink.createAggregatedCommitter();
sinkOptional.ifPresent(
sinkAggregatedCommitter ->
- aggCommitters.put(tableIdentifier,
sinkAggregatedCommitter));
+ aggCommitters.put(tablePath.toString(),
sinkAggregatedCommitter));
}
if (aggCommitters.isEmpty()) {
return Optional.empty();
@@ -171,7 +173,7 @@ public class MultiTableSink
tablePaths.add(
((CatalogTable)
values.get(i).getWriteCatalogTable().get()).getTablePath());
} else {
- tablePaths.add(TablePath.of(sinks.keySet().toArray(new
String[0])[i]));
+ tablePaths.add(sinks.keySet().toArray(new TablePath[0])[i]);
}
}
return tablePaths;
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
index c94b88be7c..e11afd1d19 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
@@ -32,6 +32,7 @@ import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
@@ -178,7 +179,7 @@ public final class FactoryUtil {
public static <IN, StateT, CommitInfoT, AggregatedCommitInfoT>
SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT>
createMultiTableSink(
- Map<String, SeaTunnelSink> sinks,
+ Map<TablePath, SeaTunnelSink> sinks,
ReadonlyConfig options,
ClassLoader classLoader) {
try {
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/MultiTableFactoryContext.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/MultiTableFactoryContext.java
index 809b747eba..6caeeae8ed 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/MultiTableFactoryContext.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/MultiTableFactoryContext.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.api.table.factory;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.table.catalog.TablePath;
import lombok.Getter;
@@ -27,10 +28,10 @@ import java.util.Map;
@Getter
public class MultiTableFactoryContext extends TableSinkFactoryContext {
- private final Map<String, SeaTunnelSink> sinks;
+ private final Map<TablePath, SeaTunnelSink> sinks;
public MultiTableFactoryContext(
- ReadonlyConfig options, ClassLoader classLoader, Map<String,
SeaTunnelSink> sinks) {
+ ReadonlyConfig options, ClassLoader classLoader, Map<TablePath,
SeaTunnelSink> sinks) {
super(null, options, classLoader);
this.sinks = sinks;
}
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
index 44ca54f0f2..db591cfdf0 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
@@ -137,7 +137,7 @@ public class KafkaSourceConfig implements Serializable {
return consumerMetadataList.stream()
.collect(
Collectors.toMap(
- consumerMetadata ->
TablePath.of(consumerMetadata.getTopic()),
+ consumerMetadata -> TablePath.of(null,
consumerMetadata.getTopic()),
consumerMetadata -> consumerMetadata));
}
@@ -208,7 +208,7 @@ public class KafkaSourceConfig implements Serializable {
private CatalogTable createCatalogTable(ReadonlyConfig readonlyConfig) {
Optional<Map<String, Object>> schemaOptions =
readonlyConfig.getOptional(TableSchemaOptions.SCHEMA);
- TablePath tablePath = TablePath.of(readonlyConfig.get(TOPIC));
+ TablePath tablePath = TablePath.of(null, readonlyConfig.get(TOPIC));
TableSchema tableSchema;
if (schemaOptions.isPresent()) {
tableSchema = new ReadonlyConfigParser().parse(readonlyConfig);
diff --git
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java
index 0d5bee0d15..a8a245dd9b 100644
---
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java
+++
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java
@@ -28,6 +28,7 @@ import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.FactoryException;
@@ -184,7 +185,7 @@ public class PluginUtil {
return sink;
} else {
if (catalogTables.size() > 1) {
- Map<String, SeaTunnelSink> sinks = new HashMap<>();
+ Map<TablePath, SeaTunnelSink> sinks = new HashMap<>();
ReadonlyConfig readonlyConfig =
ReadonlyConfig.fromConfig(sinkConfig);
catalogTables.forEach(
catalogTable -> {
@@ -202,7 +203,7 @@ public class PluginUtil {
.createSink(context)
.createSink();
action.setJobContext(jobContext);
- sinks.put(catalogTable.getTablePath().toString(),
action);
+ sinks.put(catalogTable.getTablePath(), action);
});
return FactoryUtil.createMultiTableSink(sinks, readonlyConfig,
classLoader);
}
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index b6de6d2cad..6f24e1c3fe 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -30,6 +30,7 @@ import org.apache.seatunnel.api.sink.SupportMultiTableSink;
import org.apache.seatunnel.api.sink.SupportSaveMode;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.FactoryUtil;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
@@ -106,7 +107,7 @@ public class SinkExecuteProcessor
fromSourceTable(sinkConfig,
upstreamDataStreams).orElse(input);
Optional<? extends Factory> factory = plugins.get(i);
boolean fallBack = !factory.isPresent() ||
isFallback(factory.get());
- Map<String, SeaTunnelSink> sinks = new HashMap<>();
+ Map<TablePath, SeaTunnelSink> sinks = new HashMap<>();
if (fallBack) {
for (CatalogTable catalogTable : stream.getCatalogTables()) {
SeaTunnelSink fallBackSink =
@@ -122,8 +123,7 @@ public class SinkExecuteProcessor
fallBackSink.setTypeInfo(sourceType);
handleSaveMode(fallBackSink);
TableIdentifier tableId = catalogTable.getTableId();
- String tableIdName = tableId.toTablePath().toString();
- sinks.put(tableIdName, fallBackSink);
+ sinks.put(tableId.toTablePath(), fallBackSink);
}
} else {
for (CatalogTable catalogTable : stream.getCatalogTables()) {
@@ -141,8 +141,7 @@ public class SinkExecuteProcessor
seaTunnelSink.setJobContext(jobContext);
handleSaveMode(seaTunnelSink);
TableIdentifier tableId = catalogTable.getTableId();
- String tableIdName = tableId.toTablePath().toString();
- sinks.put(tableIdName, seaTunnelSink);
+ sinks.put(tableId.toTablePath(), seaTunnelSink);
}
}
SeaTunnelSink sink =
@@ -168,7 +167,9 @@ public class SinkExecuteProcessor
// if not support multi table, rollback
public SeaTunnelSink tryGenerateMultiTableSink(
- Map<String, SeaTunnelSink> sinks, ReadonlyConfig sinkConfig,
ClassLoader classLoader) {
+ Map<TablePath, SeaTunnelSink> sinks,
+ ReadonlyConfig sinkConfig,
+ ClassLoader classLoader) {
if (sinks.values().stream().anyMatch(sink -> !(sink instanceof
SupportMultiTableSink))) {
log.info("Unsupported multi table sink api, rollback to sink
template");
// choose the first sink
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index c7d4e1f880..d41bfe34ce 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -30,6 +30,7 @@ import org.apache.seatunnel.api.sink.SupportMultiTableSink;
import org.apache.seatunnel.api.sink.SupportSaveMode;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.FactoryUtil;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
@@ -107,7 +108,7 @@ public class SinkExecuteProcessor
fromSourceTable(sinkConfig,
upstreamDataStreams).orElse(input);
Optional<? extends Factory> factory = plugins.get(i);
boolean fallBack = !factory.isPresent() ||
isFallback(factory.get());
- Map<String, SeaTunnelSink> sinks = new HashMap<>();
+ Map<TablePath, SeaTunnelSink> sinks = new HashMap<>();
if (fallBack) {
for (CatalogTable catalogTable : stream.getCatalogTables()) {
SeaTunnelSink fallBackSink =
@@ -123,8 +124,7 @@ public class SinkExecuteProcessor
fallBackSink.setTypeInfo(sourceType);
handleSaveMode(fallBackSink);
TableIdentifier tableId = catalogTable.getTableId();
- String tableIdName = tableId.toTablePath().toString();
- sinks.put(tableIdName, fallBackSink);
+ sinks.put(tableId.toTablePath(), fallBackSink);
}
} else {
for (CatalogTable catalogTable : stream.getCatalogTables()) {
@@ -142,8 +142,7 @@ public class SinkExecuteProcessor
seaTunnelSink.setJobContext(jobContext);
handleSaveMode(seaTunnelSink);
TableIdentifier tableId = catalogTable.getTableId();
- String tableIdName = tableId.toTablePath().toString();
- sinks.put(tableIdName, seaTunnelSink);
+ sinks.put(tableId.toTablePath(), seaTunnelSink);
}
}
SeaTunnelSink sink =
@@ -174,7 +173,9 @@ public class SinkExecuteProcessor
// if not support multi table, rollback
public SeaTunnelSink tryGenerateMultiTableSink(
- Map<String, SeaTunnelSink> sinks, ReadonlyConfig sinkConfig,
ClassLoader classLoader) {
+ Map<TablePath, SeaTunnelSink> sinks,
+ ReadonlyConfig sinkConfig,
+ ClassLoader classLoader) {
if (sinks.values().stream().anyMatch(sink -> !(sink instanceof
SupportMultiTableSink))) {
log.info("Unsupported multi table sink api, rollback to sink
template");
// choose the first sink
diff --git
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index 6c3aabe691..d4f99d65f5 100644
---
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -27,6 +27,7 @@ import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SupportSaveMode;
import org.apache.seatunnel.api.sink.multitablesink.MultiTableSink;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
@@ -168,7 +169,7 @@ public class SinkExecuteProcessor
}
}
} else if (sink instanceof MultiTableSink) {
- Map<String, SeaTunnelSink> sinks = ((MultiTableSink)
sink).getSinks();
+ Map<TablePath, SeaTunnelSink> sinks = ((MultiTableSink)
sink).getSinks();
for (SeaTunnelSink seaTunnelSink : sinks.values()) {
handleSaveMode(seaTunnelSink);
}
diff --git
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index 4cccedeb79..b66aaf7d86 100644
---
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -27,6 +27,7 @@ import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SupportSaveMode;
import org.apache.seatunnel.api.sink.multitablesink.MultiTableSink;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
@@ -169,7 +170,7 @@ public class SinkExecuteProcessor
}
}
} else if (sink instanceof MultiTableSink) {
- Map<String, SeaTunnelSink> sinks = ((MultiTableSink)
sink).getSinks();
+ Map<TablePath, SeaTunnelSink> sinks = ((MultiTableSink)
sink).getSinks();
for (SeaTunnelSink seaTunnelSink : sinks.values()) {
handleSaveMode(seaTunnelSink);
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
index b199a2848a..595fe3042e 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
@@ -269,6 +269,26 @@ public class KafkaIT extends TestSuiteBase implements
TestResource {
Assertions.assertEquals(0, execResult.getExitCode(),
execResult.getStderr());
}
+ @TestTemplate
+ public void testSourceKafkaTopicWithMultipleDotConsoleAssertCatalogTable(
+ TestContainer container) throws IOException, InterruptedException {
+ TextSerializationSchema serializer =
+ TextSerializationSchema.builder()
+ .seaTunnelRowType(SEATUNNEL_ROW_TYPE)
+ .delimiter(",")
+ .build();
+ generateTestData(
+ row ->
+ new ProducerRecord<>(
+ "test.multiple.point.topic.json", null,
serializer.serialize(row)),
+ 0,
+ 10);
+ Container.ExecResult execResult =
+ container.executeJob(
+
"/textFormatIT/kafka_source_topic_multiple_point_text_to_console.conf");
+ Assertions.assertEquals(0, execResult.getExitCode(),
execResult.getStderr());
+ }
+
@TestTemplate
public void testSourceKafkaJsonToConsole(TestContainer container)
throws IOException, InterruptedException {
@@ -295,7 +315,7 @@ public class KafkaIT extends TestSuiteBase implements
TestResource {
DEFAULT_FORMAT,
DEFAULT_FIELD_DELIMITER,
null);
- generateTestData(serializer::serializeRow, 0, 100);
+ generateTestData(serializer::serializeRow, 0, 10);
Container.ExecResult execResult =
container.executeJob(
"/kafka/kafkasource_format_error_handle_way_skip_to_console.conf");
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_topic_multiple_point_text_to_console.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_topic_multiple_point_text_to_console.conf
new file mode 100644
index 0000000000..f1fde74645
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_topic_multiple_point_text_to_console.conf
@@ -0,0 +1,78 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Kafka {
+ bootstrap.servers = "kafkaCluster:9092"
+ topic = "test.multiple.point.topic.json"
+ plugin_output = "kafka_table"
+ start_mode = "earliest"
+ format_error_handle_way = skip
+ schema = {
+ fields {
+ id = bigint
+ c_map = "map<string, smallint>"
+ c_array = "array<tinyint>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(2, 1)"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
+ }
+ }
+ }
+}
+
+sink {
+ Assert {
+ plugin_input = "kafka_table"
+ rules =
+ {
+ field_rules = [
+ {
+ field_name = id
+ field_type = bigint
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ },
+ {
+ rule_type = MIN
+ rule_value = 0
+ },
+ {
+ rule_type = MAX
+ rule_value = 99
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SinkConfig.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SinkConfig.java
index 17d9254525..283545b280 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SinkConfig.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SinkConfig.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.engine.core.dag.actions;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
@@ -25,5 +27,5 @@ import lombok.NoArgsConstructor;
@NoArgsConstructor
@AllArgsConstructor
public class SinkConfig implements Config {
- private String multipleRowTableId;
+ private TablePath tablePath;
}
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index 22abdfbffd..08df272c43 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -33,6 +33,7 @@ import org.apache.seatunnel.api.sink.SupportSaveMode;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
import
org.apache.seatunnel.api.table.factory.ChangeStreamTableSourceCheckpoint;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.FactoryUtil;
@@ -657,7 +658,7 @@ public class MultipleTableJobConfigParser {
log.info("Unsupported multi table sink api, rollback to sink
template");
return Optional.empty();
}
- Map<String, SeaTunnelSink> sinks = new HashMap<>();
+ Map<TablePath, SeaTunnelSink> sinks = new HashMap<>();
Set<URL> jars =
sinkActions.stream()
.flatMap(a -> a.getJarUrls().stream())
@@ -665,8 +666,8 @@ public class MultipleTableJobConfigParser {
sinkActions.forEach(
action -> {
SeaTunnelSink sink = action.getSink();
- String tableId =
action.getConfig().getMultipleRowTableId();
- sinks.put(tableId, sink);
+ TablePath tablePath = action.getConfig().getTablePath();
+ sinks.put(tablePath, sink);
});
SeaTunnelSink<?, ?, ?, ?> sink =
FactoryUtil.createMultiTableSink(sinks, options, classLoader);
@@ -698,12 +699,11 @@ public class MultipleTableJobConfigParser {
FactoryUtil.createAndPrepareSink(
catalogTable, readonlyConfig, classLoader, factoryId);
sink.setJobContext(jobConfig.getJobContext());
- SinkConfig actionConfig =
- new
SinkConfig(catalogTable.getTableId().toTablePath().toString());
+ SinkConfig actionConfig = new
SinkConfig(catalogTable.getTableId().toTablePath());
long id = idGenerator.getNextId();
String actionName =
JobConfigParser.createSinkActionName(
- configIndex, factoryId,
actionConfig.getMultipleRowTableId());
+ configIndex, factoryId,
actionConfig.getTablePath().toString());
SinkAction<?, ?, ?, ?> sinkAction =
new SinkAction<>(
id,
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
index bcc49daae2..cbc8db9632 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
@@ -330,7 +330,7 @@ public class PhysicalPlanGenerator {
(PhysicalExecutionFlow) nextFlow;
SinkAction sinkAction = (SinkAction)
sinkFlow.getAction();
String sinkTableId =
-
sinkAction.getConfig().getMultipleRowTableId();
+
sinkAction.getConfig().getTablePath().toString();
long taskIDPrefix =
idGenerator.getNextId();
long taskGroupIDPrefix =
idGenerator.getNextId();
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 46883fdfcf..a8fa0de57a 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -27,6 +27,7 @@ import org.apache.seatunnel.api.sink.SaveModeHandler;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SupportSaveMode;
import org.apache.seatunnel.api.sink.multitablesink.MultiTableSink;
+import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.common.utils.RetryUtils;
@@ -527,7 +528,7 @@ public class JobMaster {
}
}
} else if (sink instanceof MultiTableSink) {
- Map<String, SeaTunnelSink> sinks = ((MultiTableSink)
sink).getSinks();
+ Map<TablePath, SeaTunnelSink> sinks = ((MultiTableSink)
sink).getSinks();
for (SeaTunnelSink seaTunnelSink : sinks.values()) {
handleSaveMode(seaTunnelSink);
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
index 3f47ee84c3..95996e3b8b 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
@@ -123,13 +123,13 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends
Serializable, AggregatedCo
boolean isMulti = sinkAction.getSink() instanceof MultiTableSink;
if (isMulti) {
sinkTables = ((MultiTableSink)
sinkAction.getSink()).getSinkTables();
- String[] upstreamTablePaths =
+ TablePath[] upstreamTablePaths =
((MultiTableSink) sinkAction.getSink())
.getSinks()
.keySet()
- .toArray(new String[0]);
+ .toArray(new TablePath[0]);
for (int i = 0; i < ((MultiTableSink)
sinkAction.getSink()).getSinks().size(); i++) {
- tablesMaps.put(TablePath.of(upstreamTablePaths[i]),
sinkTables.get(i));
+ tablesMaps.put(upstreamTablePaths[i], sinkTables.get(i));
}
} else {
Optional<CatalogTable> catalogTable =
sinkAction.getSink().getWriteCatalogTable();