This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e8224dc8 [flink] Support properties.group.id in kafka cdc (#1456)
4e8224dc8 is described below

commit 4e8224dc821d9de36a474c9603009e327f4f8098
Author: HZY <[email protected]>
AuthorDate: Mon Jul 3 11:11:02 2023 +0800

    [flink] Support properties.group.id in kafka cdc (#1456)
---
 .../org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java    | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
index 7f0923ea2..43bf5d8c7 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
@@ -26,6 +26,7 @@ import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.StringUtils;
 
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.configuration.Configuration;
@@ -158,10 +159,11 @@ class KafkaActionUtils {
 
     static KafkaSource<String> buildKafkaSource(Configuration kafkaConfig) {
         KafkaSourceBuilder kafkaSourceBuilder = KafkaSource.builder();
+        String groupId = kafkaConfig.get(KafkaConnectorOptions.PROPS_GROUP_ID);
         kafkaSourceBuilder
                 .setTopics(kafkaConfig.get(KafkaConnectorOptions.TOPIC))
                 .setValueOnlyDeserializer(new SimpleStringSchema())
-                .setGroupId(UUID.randomUUID().toString());
+                .setGroupId(StringUtils.isEmpty(groupId) ? 
UUID.randomUUID().toString() : groupId);
         Properties properties = new Properties();
         for (Map.Entry<String, String> entry : kafkaConfig.toMap().entrySet()) 
{
             String key = entry.getKey();

Reply via email to