This is an automated email from the ASF dual-hosted git repository.
kerwin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 2d1a10647 [hotfix] Fix unstable Kafka CDC startup mode tests (#3125)
2d1a10647 is described below
commit 2d1a106475b1423208e7cf1efc14d4034e2a7e0d
Author: yuzelin <[email protected]>
AuthorDate: Mon Apr 1 08:55:04 2024 +0800
[hotfix] Fix unstable Kafka CDC startup mode tests (#3125)
---
.../paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java | 12 +++++++++++-
.../action/cdc/kafka/KafkaCanalSyncTableActionITCase.java | 4 ++--
.../flink/action/cdc/kafka/KafkaSyncTableActionITCase.java | 2 +-
3 files changed, 14 insertions(+), 4 deletions(-)
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
index 9db87a556..002280079 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
@@ -34,6 +34,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.jupiter.api.AfterEach;
@@ -63,6 +64,7 @@ import java.util.Properties;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
@@ -278,6 +280,10 @@ public abstract class KafkaActionITCaseBase extends
CdcActionITCaseBase {
}
void writeRecordsToKafka(String topic, List<String> lines) throws
Exception {
+ writeRecordsToKafka(topic, lines, false);
+ }
+
+ void writeRecordsToKafka(String topic, List<String> lines, boolean wait)
throws Exception {
Properties producerProperties = getStandardProps();
producerProperties.setProperty("retries", "0");
producerProperties.put(
@@ -289,7 +295,11 @@ public abstract class KafkaActionITCaseBase extends
CdcActionITCaseBase {
try {
JsonNode jsonNode = objectMapper.readTree(lines.get(i));
if (!StringUtils.isEmpty(lines.get(i))) {
- kafkaProducer.send(new ProducerRecord<>(topic,
lines.get(i)));
+ Future<RecordMetadata> sendFuture =
+ kafkaProducer.send(new ProducerRecord<>(topic,
lines.get(i)));
+ if (wait) {
+ sendFuture.get();
+ }
}
} catch (Exception e) {
// ignore
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
index 148f647e7..40743fae6 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
@@ -719,7 +719,7 @@ public class KafkaCanalSyncTableActionITCase extends
KafkaSyncTableActionITCase
// ---------- Write the Canal json into Kafka -------------------
List<String> lines =
readLines("kafka/canal/table/startupmode/canal-data-1.txt");
try {
- writeRecordsToKafka(topic, lines);
+ writeRecordsToKafka(topic, lines, true);
} catch (Exception e) {
throw new Exception("Failed to write canal data to Kafka.", e);
}
@@ -764,7 +764,7 @@ public class KafkaCanalSyncTableActionITCase extends
KafkaSyncTableActionITCase
// ---------- Write the Canal json into Kafka -------------------
List<String> lines =
readLines("kafka/canal/table/startupmode/canal-data-1.txt");
try {
- writeRecordsToKafka(topic, lines);
+ writeRecordsToKafka(topic, lines, true);
} catch (Exception e) {
throw new Exception("Failed to write canal data to Kafka.", e);
}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java
index f2f8fc246..e13ab86e9 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java
@@ -278,7 +278,7 @@ public class KafkaSyncTableActionITCase extends
KafkaActionITCaseBase {
readLines(
String.format("kafka/%s/table/startupmode/%s-data-1.txt", format, format));
try {
- writeRecordsToKafka(topic, lines);
+ writeRecordsToKafka(topic, lines, true);
} catch (Exception e) {
throw new Exception(String.format("Failed to write %s data to
Kafka.", format), e);
}