This is an automated email from the ASF dual-hosted git repository.
yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 14be00d6c [flink][kafka-cdc] Fix that Kafka consumer mistakenly consumes records when getting KafkaSchema (#1733)
14be00d6c is described below
commit 14be00d6c783bfbd7e14d3055b697f1057ce5a51
Author: yuzelin <[email protected]>
AuthorDate: Fri Aug 4 19:17:15 2023 +0800
[flink][kafka-cdc] Fix that Kafka consumer mistakenly consumes records when getting KafkaSchema (#1733)
---
.../paimon/flink/action/cdc/kafka/KafkaSchema.java | 26 +++++++++++++++++-----
1 file changed, 20 insertions(+), 6 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java
index deda133de..130bf2181 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java
@@ -27,6 +27,8 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
@@ -35,7 +37,6 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
-import java.util.UUID;
/** Utility class to load canal kafka schema. */
public class KafkaSchema {
@@ -75,25 +76,38 @@ public class KafkaSchema {
}
private static KafkaConsumer<String, String> getKafkaEarliestConsumer(
- Configuration kafkaConfig) {
+ Configuration kafkaConfig, String topic) {
Properties props = new Properties();
props.put(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaConfig.get(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS));
- props.put(ConsumerConfig.GROUP_ID_CONFIG,
UUID.randomUUID().toString());
+ props.put(
+ ConsumerConfig.GROUP_ID_CONFIG,
+ kafkaConfig.get(KafkaConnectorOptions.PROPS_GROUP_ID));
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+
+ KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
+
+ List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
+ if (partitionInfos.isEmpty()) {
+ throw new IllegalArgumentException(
+ "Failed to find partition information for topic " + topic);
+ }
+ int firstPartition =
+
partitionInfos.stream().map(PartitionInfo::partition).sorted().findFirst().get();
+ consumer.assign(Collections.singletonList(new TopicPartition(topic,
firstPartition)));
- return new KafkaConsumer<>(props);
+ return consumer;
}
public static KafkaSchema getKafkaSchema(Configuration kafkaConfig, String
topic)
throws Exception {
- KafkaConsumer<String, String> consumer =
getKafkaEarliestConsumer(kafkaConfig);
+ KafkaConsumer<String, String> consumer =
getKafkaEarliestConsumer(kafkaConfig, topic);
- consumer.subscribe(Collections.singletonList(topic));
int retry = 0;
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));