This is an automated email from the ASF dual-hosted git repository.

yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 14be00d6c [flink][kafka-cdc] Fix that Kafka consumer mistakenly consumes records when getting KafkaSchema (#1733)
14be00d6c is described below

commit 14be00d6c783bfbd7e14d3055b697f1057ce5a51
Author: yuzelin <[email protected]>
AuthorDate: Fri Aug 4 19:17:15 2023 +0800

    [flink][kafka-cdc] Fix that Kafka consumer mistakenly consumes records when getting KafkaSchema (#1733)
---
 .../paimon/flink/action/cdc/kafka/KafkaSchema.java | 26 +++++++++++++++++-----
 1 file changed, 20 insertions(+), 6 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java
index deda133de..130bf2181 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java
@@ -27,6 +27,8 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.StringDeserializer;
 
 import java.time.Duration;
@@ -35,7 +37,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Properties;
-import java.util.UUID;
 
 /** Utility class to load canal kafka schema. */
 public class KafkaSchema {
@@ -75,25 +76,38 @@ public class KafkaSchema {
     }
 
     private static KafkaConsumer<String, String> getKafkaEarliestConsumer(
-            Configuration kafkaConfig) {
+            Configuration kafkaConfig, String topic) {
         Properties props = new Properties();
         props.put(
                 ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                 
kafkaConfig.get(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS));
-        props.put(ConsumerConfig.GROUP_ID_CONFIG, 
UUID.randomUUID().toString());
+        props.put(
+                ConsumerConfig.GROUP_ID_CONFIG,
+                kafkaConfig.get(KafkaConnectorOptions.PROPS_GROUP_ID));
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
         props.put(
                 ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+
+        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
+
+        List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
+        if (partitionInfos.isEmpty()) {
+            throw new IllegalArgumentException(
+                    "Failed to find partition information for topic " + topic);
+        }
+        int firstPartition =
+                
partitionInfos.stream().map(PartitionInfo::partition).sorted().findFirst().get();
+        consumer.assign(Collections.singletonList(new TopicPartition(topic, 
firstPartition)));
 
-        return new KafkaConsumer<>(props);
+        return consumer;
     }
 
     public static KafkaSchema getKafkaSchema(Configuration kafkaConfig, String 
topic)
             throws Exception {
-        KafkaConsumer<String, String> consumer = 
getKafkaEarliestConsumer(kafkaConfig);
+        KafkaConsumer<String, String> consumer = 
getKafkaEarliestConsumer(kafkaConfig, topic);
 
-        consumer.subscribe(Collections.singletonList(topic));
         int retry = 0;
         while (true) {
             ConsumerRecords<String, String> records = 
consumer.poll(Duration.ofMillis(100));

Reply via email to