This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new bd03c5b7c [flink] Improve get kafka schema to backoff strategy 
sleeping (#1758)
bd03c5b7c is described below

commit bd03c5b7c855b1b5d7d6f4ddfc1e8f19bf2b14fc
Author: monster <[email protected]>
AuthorDate: Wed Aug 9 09:06:38 2023 +0800

    [flink] Improve get kafka schema to backoff strategy sleeping (#1758)
---
 .../apache/paimon/flink/action/cdc/kafka/KafkaSchema.java | 15 +++++++++------
 .../action/cdc/kafka/KafkaCanalSyncTableActionITCase.java |  2 +-
 2 files changed, 10 insertions(+), 7 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java
index 741ca0d57..ecdae5a8d 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java
@@ -43,8 +43,8 @@ import static 
org.apache.paimon.flink.action.cdc.kafka.KafkaActionUtils.kafkaPro
 /** Utility class to load canal kafka schema. */
 public class KafkaSchema {
 
-    private static final int MAX_RETRY = 100;
-
+    private static final int MAX_RETRY = 5;
+    private static final int POLL_TIMEOUT_MILLIS = 100;
     private final String databaseName;
     private final String tableName;
     private final Map<String, DataType> fields;
@@ -107,10 +107,11 @@ public class KafkaSchema {
     public static KafkaSchema getKafkaSchema(Configuration kafkaConfig, String 
topic)
             throws Exception {
         KafkaConsumer<String, String> consumer = 
getKafkaEarliestConsumer(kafkaConfig, topic);
-
         int retry = 0;
+        int retryInterval = 1000;
         while (true) {
-            ConsumerRecords<String, String> records = 
consumer.poll(Duration.ofMillis(100));
+            ConsumerRecords<String, String> records =
+                    consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MILLIS));
             for (ConsumerRecord<String, String> record : records) {
                 String format = 
kafkaConfig.get(KafkaConnectorOptions.VALUE_FORMAT);
                 if ("canal-json".equals(format)) {
@@ -125,9 +126,11 @@ public class KafkaSchema {
                 }
             }
             if (retry == MAX_RETRY) {
-                throw new Exception("Could not get metadata from server,topic 
:" + topic);
+                throw new Exception(
+                        String.format("Could not get metadata from 
server,topic:%s", topic));
             }
-            Thread.sleep(100);
+            Thread.sleep(retryInterval);
+            retryInterval *= 2;
             retry++;
         }
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
index 785dd8edc..42771ff2c 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
@@ -686,7 +686,7 @@ public class KafkaCanalSyncTableActionITCase extends 
KafkaActionITCaseBase {
 
         assertThatThrownBy(() -> action.build(env))
                 .isInstanceOf(Exception.class)
-                .hasMessage("Could not get metadata from server,topic 
:no_non_ddl_data");
+                .hasMessage("Could not get metadata from 
server,topic:no_non_ddl_data");
     }
 
     @Test

Reply via email to