This is an automated email from the ASF dual-hosted git repository.
yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b340e50b2 [flink] Fetching schema in Kafka table synchronization should first seek to beginningOffsets (#1761)
b340e50b2 is described below
commit b340e50b2c664eefb659797e421c8745ddb96da5
Author: ChenShuai1981 <[email protected]>
AuthorDate: Fri Aug 11 10:01:43 2023 +0800
[flink] Fetching schema in Kafka table synchronization should first seek to beginningOffsets (#1761)
---
.../org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java | 8 ++++++--
1 file changed, 6 insertions(+), 2 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java
index ecdae5a8d..f5a1e5ef0 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java
@@ -32,6 +32,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -44,7 +45,7 @@ import static org.apache.paimon.flink.action.cdc.kafka.KafkaActionUtils.kafkaPro
public class KafkaSchema {
private static final int MAX_RETRY = 5;
- private static final int POLL_TIMEOUT_MILLIS = 100;
+ private static final int POLL_TIMEOUT_MILLIS = 1000;
private final String databaseName;
private final String tableName;
private final Map<String, DataType> fields;
@@ -99,7 +100,10 @@ public class KafkaSchema {
}
int firstPartition =
partitionInfos.stream().map(PartitionInfo::partition).sorted().findFirst().get();
- consumer.assign(Collections.singletonList(new TopicPartition(topic, firstPartition)));
+ Collection<TopicPartition> topicPartitions =
+ Collections.singletonList(new TopicPartition(topic, firstPartition));
+ consumer.assign(topicPartitions);
+ consumer.seekToBeginning(topicPartitions);
return consumer;
}