misi1987107 commented on code in PR #10154:
URL: https://github.com/apache/seatunnel/pull/10154#discussion_r2638857016


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaMultiTableSinkIT.java:
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.kafka;
+
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.testcontainers.shaded.org.awaitility.Awaitility.given;
+
+@Slf4j
+public class KafkaMultiTableSinkIT extends TestSuiteBase implements 
TestResource {
+

Review Comment:
   Ok,I can merge it



##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaKerberosIT.java:
##########
@@ -330,7 +330,7 @@ public void testNotKerberosConfig(TestContainer container)
                 .untilAsserted(
                         () -> {
                             String jobStatus = 
container.getJobStatus(String.valueOf(jobId));
-                            Assertions.assertEquals("CANCELED", jobStatus);
+                            Assertions.assertEquals("CANCELING", jobStatus);

Review Comment:
   It cannot be transitioned from CANCELING to CANCELED
   related https://github.com/apache/seatunnel/issues/9995



##########
docs/zh/connector-v2/sink/Kafka.md:
##########
@@ -306,6 +307,75 @@ sink {
 ```
 Note:key/value 需要 byte[]类型.
 
+### 多表写入
+
+Kafka Sink 支持将多个表的数据写入到不同的 Kafka topic。当上游数据源产生多个表的数据时,可以在 `topic` 配置中使用 
`${table_name}` 占位符,根据表名动态路由数据到对应的 topic。
+

Review Comment:
   The ${datasample_name} ${schema_name} variable is already supported, I will 
modify the document



##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaKerberosIT.java:
##########
@@ -330,7 +330,7 @@ public void testNotKerberosConfig(TestContainer container)
                 .untilAsserted(
                         () -> {
                             String jobStatus = 
container.getJobStatus(String.valueOf(jobId));
-                            Assertions.assertEquals("CANCELED", jobStatus);
+                            Assertions.assertEquals("CANCELING", jobStatus);

Review Comment:
   2026-02-09T04:29:04.3693872Z Caused by: org.opentest4j.AssertionFailedError: 
expected: CANCELED but was: CANCELING
   2026-02-09T04:29:04.3694221Z         at 
org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
   2026-02-09T04:29:04.3694564Z         at 
org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
   2026-02-09T04:29:04.3694780Z         at 
org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)
   2026-02-09T04:29:04.3695186Z         at 
org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182)
   2026-02-09T04:29:04.3695447Z         at 
org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:177)
   2026-02-09T04:29:04.3695648Z         at 
org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1141)
   2026-02-09T04:29:04.3696146Z         at 
org.apache.seatunnel.e2e.connector.kafka.KafkaKerberosIT.lambda$testNotKerberosConfig$4(KafkaKerberosIT.java:333)
   2026-02-09T04:29:04.3696400Z         at 
org.awaitility.core.AssertionCondition.lambda$new$0(AssertionCondition.java:53)
   2026-02-09T04:29:04.3696814Z         at 
org.awaitility.core.ConditionAwaiter$ConditionPoller.call(ConditionAwaiter.java:248)
   2026-02-09T04:29:04.3697143Z         at 
org.awaitility.core.ConditionAwaiter$ConditionPoller.call(ConditionAwaiter.java:235)
   2026-02-09T04:29:04.3697305Z         at 
java.util.concurrent.FutureTask.run(FutureTask.java:266)
   2026-02-09T04:29:04.3697659Z         at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   2026-02-09T04:29:04.3697899Z         at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   2026-02-09T04:29:04.3697999Z         at java.lang.Thread.run(Thread.java:750)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to