This is an automated email from the ASF dual-hosted git repository.

kerwin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d1a10647 [hotfix] Fix unstable Kafka CDC starup mode tests (#3125)
2d1a10647 is described below

commit 2d1a106475b1423208e7cf1efc14d4034e2a7e0d
Author: yuzelin <[email protected]>
AuthorDate: Mon Apr 1 08:55:04 2024 +0800

    [hotfix] Fix unstable Kafka CDC starup mode tests (#3125)
---
 .../paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java | 12 +++++++++++-
 .../action/cdc/kafka/KafkaCanalSyncTableActionITCase.java    |  4 ++--
 .../flink/action/cdc/kafka/KafkaSyncTableActionITCase.java   |  2 +-
 3 files changed, 14 insertions(+), 4 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
index 9db87a556..002280079 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
@@ -34,6 +34,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.junit.jupiter.api.AfterEach;
@@ -63,6 +64,7 @@ import java.util.Properties;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -278,6 +280,10 @@ public abstract class KafkaActionITCaseBase extends 
CdcActionITCaseBase {
     }
 
     void writeRecordsToKafka(String topic, List<String> lines) throws 
Exception {
+        writeRecordsToKafka(topic, lines, false);
+    }
+
+    void writeRecordsToKafka(String topic, List<String> lines, boolean wait) 
throws Exception {
         Properties producerProperties = getStandardProps();
         producerProperties.setProperty("retries", "0");
         producerProperties.put(
@@ -289,7 +295,11 @@ public abstract class KafkaActionITCaseBase extends 
CdcActionITCaseBase {
             try {
                 JsonNode jsonNode = objectMapper.readTree(lines.get(i));
                 if (!StringUtils.isEmpty(lines.get(i))) {
-                    kafkaProducer.send(new ProducerRecord<>(topic, 
lines.get(i)));
+                    Future<RecordMetadata> sendFuture =
+                            kafkaProducer.send(new ProducerRecord<>(topic, 
lines.get(i)));
+                    if (wait) {
+                        sendFuture.get();
+                    }
                 }
             } catch (Exception e) {
                 // ignore
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
index 148f647e7..40743fae6 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
@@ -719,7 +719,7 @@ public class KafkaCanalSyncTableActionITCase extends 
KafkaSyncTableActionITCase
         // ---------- Write the Canal json into Kafka -------------------
         List<String> lines = 
readLines("kafka/canal/table/startupmode/canal-data-1.txt");
         try {
-            writeRecordsToKafka(topic, lines);
+            writeRecordsToKafka(topic, lines, true);
         } catch (Exception e) {
             throw new Exception("Failed to write canal data to Kafka.", e);
         }
@@ -764,7 +764,7 @@ public class KafkaCanalSyncTableActionITCase extends 
KafkaSyncTableActionITCase
         // ---------- Write the Canal json into Kafka -------------------
         List<String> lines = 
readLines("kafka/canal/table/startupmode/canal-data-1.txt");
         try {
-            writeRecordsToKafka(topic, lines);
+            writeRecordsToKafka(topic, lines, true);
         } catch (Exception e) {
             throw new Exception("Failed to write canal data to Kafka.", e);
         }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java
index f2f8fc246..e13ab86e9 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java
@@ -278,7 +278,7 @@ public class KafkaSyncTableActionITCase extends 
KafkaActionITCaseBase {
                 readLines(
                         
String.format("kafka/%s/table/startupmode/%s-data-1.txt", format, format));
         try {
-            writeRecordsToKafka(topic, lines);
+            writeRecordsToKafka(topic, lines, true);
         } catch (Exception e) {
             throw new Exception(String.format("Failed to write %s data to 
Kafka.", format), e);
         }

Reply via email to