This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new bd03c5b7c [flink] Improve get kafka schema to backoff strategy
sleeping (#1758)
bd03c5b7c is described below
commit bd03c5b7c855b1b5d7d6f4ddfc1e8f19bf2b14fc
Author: monster <[email protected]>
AuthorDate: Wed Aug 9 09:06:38 2023 +0800
[flink] Improve get kafka schema to backoff strategy sleeping (#1758)
---
.../apache/paimon/flink/action/cdc/kafka/KafkaSchema.java | 15 +++++++++------
.../action/cdc/kafka/KafkaCanalSyncTableActionITCase.java | 2 +-
2 files changed, 10 insertions(+), 7 deletions(-)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java
index 741ca0d57..ecdae5a8d 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java
@@ -43,8 +43,8 @@ import static
org.apache.paimon.flink.action.cdc.kafka.KafkaActionUtils.kafkaPro
/** Utility class to load canal kafka schema. */
public class KafkaSchema {
- private static final int MAX_RETRY = 100;
-
+ private static final int MAX_RETRY = 5;
+ private static final int POLL_TIMEOUT_MILLIS = 100;
private final String databaseName;
private final String tableName;
private final Map<String, DataType> fields;
@@ -107,10 +107,11 @@ public class KafkaSchema {
public static KafkaSchema getKafkaSchema(Configuration kafkaConfig, String
topic)
throws Exception {
KafkaConsumer<String, String> consumer =
getKafkaEarliestConsumer(kafkaConfig, topic);
-
int retry = 0;
+ int retryInterval = 1000;
while (true) {
- ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
+ ConsumerRecords<String, String> records =
+ consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MILLIS));
for (ConsumerRecord<String, String> record : records) {
String format =
kafkaConfig.get(KafkaConnectorOptions.VALUE_FORMAT);
if ("canal-json".equals(format)) {
@@ -125,9 +126,11 @@ public class KafkaSchema {
}
}
if (retry == MAX_RETRY) {
- throw new Exception("Could not get metadata from server,topic
:" + topic);
+ throw new Exception(
+ String.format("Could not get metadata from
server,topic:%s", topic));
}
- Thread.sleep(100);
+ Thread.sleep(retryInterval);
+ retryInterval *= 2;
retry++;
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
index 785dd8edc..42771ff2c 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
@@ -686,7 +686,7 @@ public class KafkaCanalSyncTableActionITCase extends
KafkaActionITCaseBase {
assertThatThrownBy(() -> action.build(env))
.isInstanceOf(Exception.class)
- .hasMessage("Could not get metadata from server,topic
:no_non_ddl_data");
+ .hasMessage("Could not get metadata from
server,topic:no_non_ddl_data");
}
@Test