This is an automated email from the ASF dual-hosted git repository.

yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b340e50b2 [flink] Fetching schema in Kafka table synchronization 
should first seek to beginningOffsets (#1761)
b340e50b2 is described below

commit b340e50b2c664eefb659797e421c8745ddb96da5
Author: ChenShuai1981 <[email protected]>
AuthorDate: Fri Aug 11 10:01:43 2023 +0800

    [flink] Fetching schema in Kafka table synchronization should first seek to 
beginningOffsets (#1761)
---
 .../org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java     | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java
index ecdae5a8d..f5a1e5ef0 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java
@@ -32,6 +32,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.StringDeserializer;
 
 import java.time.Duration;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -44,7 +45,7 @@ import static 
org.apache.paimon.flink.action.cdc.kafka.KafkaActionUtils.kafkaPro
 public class KafkaSchema {
 
     private static final int MAX_RETRY = 5;
-    private static final int POLL_TIMEOUT_MILLIS = 100;
+    private static final int POLL_TIMEOUT_MILLIS = 1000;
     private final String databaseName;
     private final String tableName;
     private final Map<String, DataType> fields;
@@ -99,7 +100,10 @@ public class KafkaSchema {
         }
         int firstPartition =
                 
partitionInfos.stream().map(PartitionInfo::partition).sorted().findFirst().get();
-        consumer.assign(Collections.singletonList(new TopicPartition(topic, 
firstPartition)));
+        Collection<TopicPartition> topicPartitions =
+                Collections.singletonList(new TopicPartition(topic, 
firstPartition));
+        consumer.assign(topicPartitions);
+        consumer.seekToBeginning(topicPartitions);
 
         return consumer;
     }

Reply via email to