This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 119ec4294 [flink] Fix kafka CDC Ingestion multi-topic exception (#2030)
119ec4294 is described below
commit 119ec42940cf6cac2dc48fe9e43450a03f4b8a45
Author: monster <[email protected]>
AuthorDate: Mon Sep 18 18:17:09 2023 +0800
[flink] Fix kafka CDC Ingestion multi-topic exception (#2030)
---
.../apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java | 8 +++++++-
1 file changed, 7 insertions(+), 1 deletion(-)
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
index 02a3b0640..c249db8ee 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
@@ -89,8 +89,14 @@ class KafkaActionUtils {
static KafkaSource<String> buildKafkaSource(Configuration kafkaConfig) {
validateKafkaConfig(kafkaConfig);
KafkaSourceBuilder<String> kafkaSourceBuilder = KafkaSource.builder();
+
+ List<String> topics =
+ kafkaConfig.get(KafkaConnectorOptions.TOPIC).stream()
+ .flatMap(topic -> Arrays.stream(topic.split(",")))
+ .collect(Collectors.toList());
+
kafkaSourceBuilder
- .setTopics(kafkaConfig.get(KafkaConnectorOptions.TOPIC))
+ .setTopics(topics)
.setValueOnlyDeserializer(new SimpleStringSchema())
.setGroupId(kafkaPropertiesGroupId(kafkaConfig));
Properties properties = new Properties();