This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 119ec4294 [flink] Fix kafka CDC Ingestion multi-topic exception (#2030)
119ec4294 is described below

commit 119ec42940cf6cac2dc48fe9e43450a03f4b8a45
Author: monster <[email protected]>
AuthorDate: Mon Sep 18 18:17:09 2023 +0800

    [flink] Fix kafka CDC Ingestion multi-topic exception (#2030)
---
 .../apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java    | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
index 02a3b0640..c249db8ee 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
@@ -89,8 +89,14 @@ class KafkaActionUtils {
     static KafkaSource<String> buildKafkaSource(Configuration kafkaConfig) {
         validateKafkaConfig(kafkaConfig);
         KafkaSourceBuilder<String> kafkaSourceBuilder = KafkaSource.builder();
+
+        List<String> topics =
+                kafkaConfig.get(KafkaConnectorOptions.TOPIC).stream()
+                        .flatMap(topic -> Arrays.stream(topic.split(",")))
+                        .collect(Collectors.toList());
+
         kafkaSourceBuilder
-                .setTopics(kafkaConfig.get(KafkaConnectorOptions.TOPIC))
+                .setTopics(topics)
                 .setValueOnlyDeserializer(new SimpleStringSchema())
                 .setGroupId(kafkaPropertiesGroupId(kafkaConfig));
         Properties properties = new Properties();

Reply via email to