This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 4e8224dc8 [flink] Support properties.group.id in kafka cdc (#1456)
4e8224dc8 is described below
commit 4e8224dc821d9de36a474c9603009e327f4f8098
Author: HZY <[email protected]>
AuthorDate: Mon Jul 3 11:11:02 2023 +0800
[flink] Support properties.group.id in kafka cdc (#1456)
---
.../org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
index 7f0923ea2..43bf5d8c7 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
@@ -26,6 +26,7 @@ import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.StringUtils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
@@ -158,10 +159,11 @@ class KafkaActionUtils {
static KafkaSource<String> buildKafkaSource(Configuration kafkaConfig) {
KafkaSourceBuilder kafkaSourceBuilder = KafkaSource.builder();
+ String groupId = kafkaConfig.get(KafkaConnectorOptions.PROPS_GROUP_ID);
kafkaSourceBuilder
.setTopics(kafkaConfig.get(KafkaConnectorOptions.TOPIC))
.setValueOnlyDeserializer(new SimpleStringSchema())
- .setGroupId(UUID.randomUUID().toString());
+ .setGroupId(StringUtils.isEmpty(groupId) ?
UUID.randomUUID().toString() : groupId);
Properties properties = new Properties();
for (Map.Entry<String, String> entry : kafkaConfig.toMap().entrySet())
{
String key = entry.getKey();