This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 2a3607ae2 [cdc][refactor] Refactor records writing in Kafka CDC tests
(#3124)
2a3607ae2 is described below
commit 2a3607ae27f2f96f147cc53da07ce0d3ec122292
Author: yuzelin <[email protected]>
AuthorDate: Tue Apr 2 09:15:36 2024 +0800
[cdc][refactor] Refactor records writing in Kafka CDC tests (#3124)
---
.../action/cdc/kafka/KafkaActionITCaseBase.java | 138 +++++++-----
.../kafka/KafkaCanalSyncDatabaseActionITCase.java | 138 +++---------
.../cdc/kafka/KafkaCanalSyncTableActionITCase.java | 211 +++++-------------
.../kafka/KafkaDebeziumSyncTableActionITCase.java | 19 +-
.../flink/action/cdc/kafka/KafkaSchemaITCase.java | 22 +-
.../cdc/kafka/KafkaSyncDatabaseActionITCase.java | 124 ++++-------
.../cdc/kafka/KafkaSyncTableActionITCase.java | 236 ++++-----------------
7 files changed, 256 insertions(+), 632 deletions(-)
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
index 002280079..7d8c78c2e 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
@@ -21,7 +21,7 @@ package org.apache.paimon.flink.action.cdc.kafka;
import org.apache.paimon.flink.action.cdc.CdcActionITCaseBase;
import org.apache.paimon.utils.StringUtils;
-import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.util.DockerImageVersions;
@@ -33,11 +33,15 @@ import org.apache.kafka.clients.admin.TopicListing;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
@@ -50,10 +54,9 @@ import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.utility.DockerImageName;
-import java.io.File;
-import java.io.IOException;
import java.net.URL;
import java.nio.file.Files;
+import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
@@ -67,8 +70,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
-import static org.assertj.core.api.Assertions.assertThat;
-
/** Base test class for Kafka synchronization. */
public abstract class KafkaActionITCaseBase extends CdcActionITCaseBase {
@@ -78,7 +79,11 @@ public abstract class KafkaActionITCaseBase extends
CdcActionITCaseBase {
private static final String INTER_CONTAINER_KAFKA_ALIAS = "kafka";
private static final Network NETWORK = Network.newNetwork();
- private static final int zkTimeoutMills = 30000;
+ private static final int ZK_TIMEOUT_MILLIS = 30000;
+
+ protected static KafkaProducer<String, String> kafkaProducer;
+ private static KafkaConsumer<String, String> kafkaConsumer;
+ private static AdminClient adminClient;
// Timer for scheduling logging task if the test hangs
private final Timer loggingTimer = new Timer("Debug Logging Timer");
@@ -104,6 +109,42 @@ public abstract class KafkaActionITCaseBase extends
CdcActionITCaseBase {
// test run
.withEnv("KAFKA_LOG_RETENTION_MS", "-1");
+ @BeforeAll
+ public static void beforeAll() {
+ // create KafkaProducer
+ Properties producerProperties = getStandardProps();
+ producerProperties.setProperty("retries", "0");
+ producerProperties.put(
+ ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+ StringSerializer.class.getCanonicalName());
+ producerProperties.put(
+ ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+ StringSerializer.class.getCanonicalName());
+ kafkaProducer = new KafkaProducer<>(producerProperties);
+
+ // create KafkaConsumer
+ Properties consumerProperties = getStandardProps();
+ consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,
"flink-tests-debugging");
+ consumerProperties.setProperty(
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+ StringDeserializer.class.getCanonicalName());
+ consumerProperties.setProperty(
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+ StringDeserializer.class.getCanonicalName());
+ kafkaConsumer = new KafkaConsumer<>(consumerProperties);
+
+ // create AdminClient
+ adminClient = AdminClient.create(getStandardProps());
+ }
+
+ @AfterAll
+ public static void afterAll() {
+ // Close Kafka objects
+ kafkaProducer.close();
+ kafkaConsumer.close();
+ adminClient.close();
+ }
+
@BeforeEach
public void setup() {
// Probe Kafka broker status per 30 seconds
@@ -130,7 +171,6 @@ public abstract class KafkaActionITCaseBase extends
CdcActionITCaseBase {
}
private void deleteTopics() throws ExecutionException,
InterruptedException {
- final AdminClient adminClient = AdminClient.create(getStandardProps());
adminClient.deleteTopics(adminClient.listTopics().names().get()).all().get();
}
@@ -156,13 +196,12 @@ public abstract class KafkaActionITCaseBase extends
CdcActionITCaseBase {
}
private Map<String, TopicDescription> describeExternalTopics() {
- try (final AdminClient adminClient =
AdminClient.create(getStandardProps())) {
+ try {
final List<String> topics =
adminClient.listTopics().listings().get().stream()
.filter(listing -> !listing.isInternal())
.map(TopicListing::name)
.collect(Collectors.toList());
-
return adminClient.describeTopics(topics).all().get();
} catch (Exception e) {
throw new RuntimeException("Failed to list Kafka topics", e);
@@ -170,15 +209,6 @@ public abstract class KafkaActionITCaseBase extends
CdcActionITCaseBase {
}
private void logTopicPartitionStatus(Map<String, TopicDescription>
topicDescriptions) {
- final Properties properties = getStandardProps();
- properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,
"flink-tests-debugging");
- properties.setProperty(
- ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
- StringDeserializer.class.getCanonicalName());
- properties.setProperty(
- ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
- StringDeserializer.class.getCanonicalName());
- final KafkaConsumer<?, ?> consumer = new KafkaConsumer<String,
String>(properties);
List<TopicPartition> partitions = new ArrayList<>();
topicDescriptions.forEach(
(topic, description) ->
@@ -189,8 +219,9 @@ public abstract class KafkaActionITCaseBase extends
CdcActionITCaseBase {
partitions.add(
new TopicPartition(
topic,
tpInfo.partition()))));
- final Map<TopicPartition, Long> beginningOffsets =
consumer.beginningOffsets(partitions);
- final Map<TopicPartition, Long> endOffsets =
consumer.endOffsets(partitions);
+ final Map<TopicPartition, Long> beginningOffsets =
+ kafkaConsumer.beginningOffsets(partitions);
+ final Map<TopicPartition, Long> endOffsets =
kafkaConsumer.endOffsets(partitions);
partitions.forEach(
partition ->
LOG.info(
@@ -207,8 +238,8 @@ public abstract class KafkaActionITCaseBase extends
CdcActionITCaseBase {
standardProps.put("enable.auto.commit", false);
standardProps.put("auto.offset.reset", "earliest");
standardProps.put("max.partition.fetch.bytes", 256);
- standardProps.put("zookeeper.session.timeout.ms", zkTimeoutMills);
- standardProps.put("zookeeper.connection.timeout.ms", zkTimeoutMills);
+ standardProps.put("zookeeper.session.timeout.ms", ZK_TIMEOUT_MILLIS);
+ standardProps.put("zookeeper.connection.timeout.ms",
ZK_TIMEOUT_MILLIS);
standardProps.put("default.api.timeout.ms", "120000");
return standardProps;
}
@@ -253,11 +284,12 @@ public abstract class KafkaActionITCaseBase extends
CdcActionITCaseBase {
}
}
- public void createTestTopic(String topic, int numPartitions, int
replicationFactor) {
+ protected void createTestTopic(String topic, int numPartitions, int
replicationFactor) {
Map<String, Object> properties = new HashMap<>();
properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
getBootstrapServers());
- try (AdminClient admin = AdminClient.create(properties)) {
- admin.createTopics(
+ try {
+ adminClient
+ .createTopics(
Collections.singletonList(
new NewTopic(topic, numPartitions, (short)
replicationFactor)))
.all()
@@ -271,42 +303,40 @@ public abstract class KafkaActionITCaseBase extends
CdcActionITCaseBase {
}
}
- public static List<String> readLines(String resource) throws IOException {
- final URL url =
-
KafkaCanalSyncTableActionITCase.class.getClassLoader().getResource(resource);
- assertThat(url).isNotNull();
- java.nio.file.Path path = new File(url.getFile()).toPath();
- return Files.readAllLines(path);
+ protected void writeRecordsToKafka(String topic, String resourceDirFormat,
Object... args)
+ throws Exception {
+ writeRecordsToKafka(topic, false, resourceDirFormat, args);
}
- void writeRecordsToKafka(String topic, List<String> lines) throws
Exception {
- writeRecordsToKafka(topic, lines, false);
+ protected void writeRecordsToKafka(
+ String topic, boolean wait, String resourceDirFormat, Object...
args) throws Exception {
+ URL url =
+ KafkaCanalSyncTableActionITCase.class
+ .getClassLoader()
+ .getResource(String.format(resourceDirFormat, args));
+ Files.readAllLines(Paths.get(url.toURI())).stream()
+ .filter(this::isRecordLine)
+ .forEach(r -> send(topic, r, wait));
}
- void writeRecordsToKafka(String topic, List<String> lines, boolean wait)
throws Exception {
- Properties producerProperties = getStandardProps();
- producerProperties.setProperty("retries", "0");
- producerProperties.put(
- "key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
- producerProperties.put(
- "value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
- KafkaProducer kafkaProducer = new KafkaProducer(producerProperties);
- for (int i = 0; i < lines.size(); i++) {
+ private boolean isRecordLine(String line) {
+ try {
+ objectMapper.readTree(line);
+ return !StringUtils.isEmpty(line);
+ } catch (JsonProcessingException e) {
+ return false;
+ }
+ }
+
+ private void send(String topic, String record, boolean wait) {
+ Future<RecordMetadata> sendFuture = kafkaProducer.send(new
ProducerRecord<>(topic, record));
+ if (wait) {
try {
- JsonNode jsonNode = objectMapper.readTree(lines.get(i));
- if (!StringUtils.isEmpty(lines.get(i))) {
- Future<RecordMetadata> sendFuture =
- kafkaProducer.send(new ProducerRecord<>(topic,
lines.get(i)));
- if (wait) {
- sendFuture.get();
- }
- }
- } catch (Exception e) {
- // ignore
+ sendFuture.get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
}
}
-
- kafkaProducer.close();
}
/** Kafka container extension for junit5. */
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
index 47b3419c8..fa6bd9301 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
@@ -58,19 +58,11 @@ public class KafkaCanalSyncDatabaseActionITCase extends
KafkaActionITCaseBase {
List<String> topics = Arrays.asList(topic1, topic2, topic3);
topics.forEach(topic -> createTestTopic(topic, 1, 1));
- // ---------- Write the Canal json into Kafka -------------------
-
for (int i = 0; i < fileCount; i++) {
- try {
- writeRecordsToKafka(
- topics.get(i),
- readLines(
- "kafka/canal/database/schemaevolution/topic"
- + i
- + "/canal-data-1.txt"));
- } catch (Exception e) {
- throw new Exception("Failed to write canal data to Kafka.", e);
- }
+ writeRecordsToKafka(
+ topics.get(i),
+
"kafka/canal/database/schemaevolution/topic%s/canal-data-1.txt",
+ i);
}
Map<String, String> kafkaConfig = getBasicKafkaConfig();
@@ -99,19 +91,11 @@ public class KafkaCanalSyncDatabaseActionITCase extends
KafkaActionITCaseBase {
List<String> topics = Collections.singletonList(topic);
topics.forEach(t -> createTestTopic(t, 1, 1));
- // ---------- Write the Canal json into Kafka -------------------
-
for (int i = 0; i < fileCount; i++) {
- try {
- writeRecordsToKafka(
- topics.get(0),
- readLines(
- "kafka/canal/database/schemaevolution/topic"
- + i
- + "/canal-data-1.txt"));
- } catch (Exception e) {
- throw new Exception("Failed to write canal data to Kafka.", e);
- }
+ writeRecordsToKafka(
+ topics.get(0),
+
"kafka/canal/database/schemaevolution/topic%s/canal-data-1.txt",
+ i);
}
Map<String, String> kafkaConfig = getBasicKafkaConfig();
@@ -155,16 +139,10 @@ public class KafkaCanalSyncDatabaseActionITCase extends
KafkaActionITCaseBase {
waitForResult(expected, table2, rowType2, primaryKeys2);
for (int i = 0; i < fileCount; i++) {
- try {
- writeRecordsToKafka(
- writeOne ? topics.get(0) : topics.get(i),
- readLines(
- "kafka/canal/database/schemaevolution/topic"
- + i
- + "/canal-data-2.txt"));
- } catch (Exception e) {
- throw new Exception("Failed to write canal data to Kafka.", e);
- }
+ writeRecordsToKafka(
+ writeOne ? topics.get(0) : topics.get(i),
+
"kafka/canal/database/schemaevolution/topic%s/canal-data-2.txt",
+ i);
}
rowType1 =
@@ -200,16 +178,10 @@ public class KafkaCanalSyncDatabaseActionITCase extends
KafkaActionITCaseBase {
waitForResult(expected, table2, rowType2, primaryKeys2);
for (int i = 0; i < fileCount; i++) {
- try {
- writeRecordsToKafka(
- writeOne ? topics.get(0) : topics.get(i),
- readLines(
- "kafka/canal/database/schemaevolution/topic"
- + i
- + "/canal-data-3.txt"));
- } catch (Exception e) {
- throw new Exception("Failed to write canal data to Kafka.", e);
- }
+ writeRecordsToKafka(
+ writeOne ? topics.get(0) : topics.get(i),
+
"kafka/canal/database/schemaevolution/topic%s/canal-data-3.txt",
+ i);
}
rowType1 =
@@ -281,19 +253,9 @@ public class KafkaCanalSyncDatabaseActionITCase extends
KafkaActionITCaseBase {
List<String> topics = Arrays.asList(topic1, topic2);
topics.forEach(topic -> createTestTopic(topic, 1, 1));
- // ---------- Write the Canal json into Kafka -------------------
-
for (int i = 0; i < topics.size(); i++) {
- try {
- writeRecordsToKafka(
- topics.get(i),
- readLines(
- "kafka/canal/database/prefixsuffix/topic"
- + i
- + "/canal-data-1.txt"));
- } catch (Exception e) {
- throw new Exception("Failed to write canal data to Kafka.", e);
- }
+ writeRecordsToKafka(
+ topics.get(i),
"kafka/canal/database/prefixsuffix/topic%s/canal-data-1.txt", i);
}
// try synchronization
@@ -332,19 +294,9 @@ public class KafkaCanalSyncDatabaseActionITCase extends
KafkaActionITCaseBase {
int fileCount = 2;
topics.forEach(topic -> createTestTopic(topic, 1, 1));
- // ---------- Write the Canal json into Kafka -------------------
-
for (int i = 0; i < fileCount; i++) {
- try {
- writeRecordsToKafka(
- topics.get(0),
- readLines(
- "kafka/canal/database/prefixsuffix/topic"
- + i
- + "/canal-data-1.txt"));
- } catch (Exception e) {
- throw new Exception("Failed to write canal data to Kafka.", e);
- }
+ writeRecordsToKafka(
+ topics.get(0),
"kafka/canal/database/prefixsuffix/topic%s/canal-data-1.txt", i);
}
// try synchronization
@@ -388,16 +340,10 @@ public class KafkaCanalSyncDatabaseActionITCase extends
KafkaActionITCaseBase {
waitForResult(expected, table2, rowType2, primaryKeys2);
for (int i = 0; i < fileCount; i++) {
- try {
- writeRecordsToKafka(
- writeOne ? topics.get(0) : topics.get(i),
- readLines(
- "kafka/canal/database/prefixsuffix/topic"
- + i
- + "/canal-data-2.txt"));
- } catch (Exception e) {
- throw new Exception("Failed to write canal data to Kafka.", e);
- }
+ writeRecordsToKafka(
+ writeOne ? topics.get(0) : topics.get(i),
+
"kafka/canal/database/prefixsuffix/topic%s/canal-data-2.txt",
+ i);
}
rowType1 =
RowType.of(
@@ -428,16 +374,10 @@ public class KafkaCanalSyncDatabaseActionITCase extends
KafkaActionITCaseBase {
waitForResult(expected, table2, rowType2, primaryKeys2);
for (int i = 0; i < fileCount; i++) {
- try {
- writeRecordsToKafka(
- writeOne ? topics.get(0) : topics.get(i),
- readLines(
- "kafka/canal/database/prefixsuffix/topic"
- + i
- + "/canal-data-3.txt"));
- } catch (Exception e) {
- throw new Exception("Failed to write canal data to Kafka.", e);
- }
+ writeRecordsToKafka(
+ writeOne ? topics.get(0) : topics.get(i),
+
"kafka/canal/database/prefixsuffix/topic%s/canal-data-3.txt",
+ i);
}
rowType1 =
@@ -510,16 +450,8 @@ public class KafkaCanalSyncDatabaseActionITCase extends
KafkaActionITCaseBase {
final String topic1 = "include_exclude" + UUID.randomUUID();
List<String> topics = Collections.singletonList(topic1);
topics.forEach(topic -> createTestTopic(topic, 1, 1));
+ writeRecordsToKafka(topics.get(0),
"kafka/canal/database/include/topic0/canal-data-1.txt");
- // ---------- Write the Canal json into Kafka -------------------
-
- try {
- writeRecordsToKafka(
- topics.get(0),
-
readLines("kafka/canal/database/include/topic0/canal-data-1.txt"));
- } catch (Exception e) {
- throw new Exception("Failed to write canal data to Kafka.", e);
- }
// try synchronization
Map<String, String> kafkaConfig = getBasicKafkaConfig();
kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
@@ -542,9 +474,7 @@ public class KafkaCanalSyncDatabaseActionITCase extends
KafkaActionITCaseBase {
public void testTypeMappingToString() throws Exception {
final String topic = "map-to-string";
createTestTopic(topic, 1, 1);
-
- // ---------- Write the Canal json into Kafka -------------------
- writeRecordsToKafka(topic,
readLines("kafka/canal/database/tostring/canal-data-1.txt"));
+ writeRecordsToKafka(topic,
"kafka/canal/database/tostring/canal-data-1.txt");
Map<String, String> kafkaConfig = getBasicKafkaConfig();
kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
@@ -590,10 +520,7 @@ public class KafkaCanalSyncDatabaseActionITCase extends
KafkaActionITCaseBase {
public void testCaseInsensitive() throws Exception {
final String topic = "case-insensitive";
createTestTopic(topic, 1, 1);
-
- // ---------- Write the Canal json into Kafka -------------------
- writeRecordsToKafka(
- topic,
readLines("kafka/canal/database/case-insensitive/canal-data-1.txt"));
+ writeRecordsToKafka(topic,
"kafka/canal/database/case-insensitive/canal-data-1.txt");
Map<String, String> kafkaConfig = getBasicKafkaConfig();
kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
@@ -628,8 +555,7 @@ public class KafkaCanalSyncDatabaseActionITCase extends
KafkaActionITCaseBase {
public void testCannotSynchronizeIncompleteJson() throws Exception {
final String topic = "incomplete";
createTestTopic(topic, 1, 1);
-
- writeRecordsToKafka(topic,
readLines("kafka/canal/database/incomplete/canal-data-1.txt"));
+ writeRecordsToKafka(topic,
"kafka/canal/database/incomplete/canal-data-1.txt");
Map<String, String> kafkaConfig = getBasicKafkaConfig();
kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
index 9ad54f04d..5bde711c0 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
@@ -74,14 +74,8 @@ public class KafkaCanalSyncTableActionITCase extends
KafkaSyncTableActionITCase
private void runSingleTableSchemaEvolution(String sourceDir) throws
Exception {
final String topic = "schema_evolution";
createTestTopic(topic, 1, 1);
- // ---------- Write the Canal json into Kafka -------------------
- List<String> lines =
-
readLines(String.format("kafka/canal/table/%s/canal-data-1.txt", sourceDir));
- try {
- writeRecordsToKafka(topic, lines);
- } catch (Exception e) {
- throw new Exception("Failed to write canal data to Kafka.", e);
- }
+ writeRecordsToKafka(topic, "kafka/canal/table/%s/canal-data-1.txt",
sourceDir);
+
Map<String, String> kafkaConfig = getBasicKafkaConfig();
kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
kafkaConfig.put(TOPIC.key(), topic);
@@ -111,13 +105,8 @@ public class KafkaCanalSyncTableActionITCase extends
KafkaSyncTableActionITCase
List<String> expected = Arrays.asList("+I[1, 1, one]", "+I[1, 2,
two]", "+I[2, 4, four]");
waitForResult(expected, table, rowType, primaryKeys);
- try {
- writeRecordsToKafka(
- topic,
-
readLines(String.format("kafka/canal/table/%s/canal-data-2.txt", sourceDir)));
- } catch (Exception e) {
- throw new Exception("Failed to write canal data to Kafka.", e);
- }
+ writeRecordsToKafka(topic, "kafka/canal/table/%s/canal-data-2.txt",
sourceDir);
+
rowType =
RowType.of(
new DataType[] {
@@ -137,13 +126,8 @@ public class KafkaCanalSyncTableActionITCase extends
KafkaSyncTableActionITCase
"+I[1, 6, six, 60]");
waitForResult(expected, table, rowType, primaryKeys);
- try {
- writeRecordsToKafka(
- topic,
-
readLines(String.format("kafka/canal/table/%s/canal-data-3.txt", sourceDir)));
- } catch (Exception e) {
- throw new Exception("Failed to write canal data to Kafka.", e);
- }
+ writeRecordsToKafka(topic, "kafka/canal/table/%s/canal-data-3.txt",
sourceDir);
+
rowType =
RowType.of(
new DataType[] {
@@ -164,13 +148,8 @@ public class KafkaCanalSyncTableActionITCase extends
KafkaSyncTableActionITCase
"+I[2, 8, eight, 80000000000]");
waitForResult(expected, table, rowType, primaryKeys);
- try {
- writeRecordsToKafka(
- topic,
-
readLines(String.format("kafka/canal/table/%s/canal-data-4.txt", sourceDir)));
- } catch (Exception e) {
- throw new Exception("Failed to write canal data to Kafka.", e);
- }
+ writeRecordsToKafka(topic, "kafka/canal/table/%s/canal-data-4.txt",
sourceDir);
+
rowType =
RowType.of(
new DataType[] {
@@ -195,13 +174,8 @@ public class KafkaCanalSyncTableActionITCase extends
KafkaSyncTableActionITCase
"+I[1, 9, nine, 90000000000, 99999.999, [110, 105,
110, 101, 46, 98, 105, 110], 9.9]");
waitForResult(expected, table, rowType, primaryKeys);
- try {
- writeRecordsToKafka(
- topic,
-
readLines(String.format("kafka/canal/table/%s/canal-data-5.txt", sourceDir)));
- } catch (Exception e) {
- throw new Exception("Failed to write canal data to Kafka.", e);
- }
+ writeRecordsToKafka(topic, "kafka/canal/table/%s/canal-data-5.txt",
sourceDir);
+
rowType =
RowType.of(
new DataType[] {
@@ -232,14 +206,8 @@ public class KafkaCanalSyncTableActionITCase extends
KafkaSyncTableActionITCase
public void testMultipleSchemaEvolutions() throws Exception {
final String topic = "schema_evolution_multiple";
createTestTopic(topic, 1, 1);
- // ---------- Write the Canal json into Kafka -------------------
- List<String> lines =
-
readLines("kafka/canal/table/schemaevolutionmultiple/canal-data-1.txt");
- try {
- writeRecordsToKafka(topic, lines);
- } catch (Exception e) {
- throw new Exception("Failed to write canal data to Kafka.", e);
- }
+ writeRecordsToKafka(topic,
"kafka/canal/table/schemaevolutionmultiple/canal-data-1.txt");
+
Map<String, String> kafkaConfig = getBasicKafkaConfig();
kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
@@ -271,12 +239,8 @@ public class KafkaCanalSyncTableActionITCase extends
KafkaSyncTableActionITCase
List<String> expected = Collections.singletonList("+I[1, one, 10,
string_1]");
waitForResult(expected, table, rowType, primaryKeys);
- try {
- writeRecordsToKafka(
- topic,
readLines("kafka/canal/table/schemaevolutionmultiple/canal-data-2.txt"));
- } catch (Exception e) {
- throw new Exception("Failed to write canal data to Kafka.", e);
- }
+ writeRecordsToKafka(topic,
"kafka/canal/table/schemaevolutionmultiple/canal-data-2.txt");
+
rowType =
RowType.of(
new DataType[] {
@@ -300,7 +264,6 @@ public class KafkaCanalSyncTableActionITCase extends
KafkaSyncTableActionITCase
@Test
@Timeout(60)
public void testAllTypes() throws Exception {
-
// the first round checks for table creation
// the second round checks for running the action on an existing table
for (int i = 0; i < 2; i++) {
@@ -312,14 +275,8 @@ public class KafkaCanalSyncTableActionITCase extends
KafkaSyncTableActionITCase
private void testAllTypesOnce() throws Exception {
final String topic = "all_type" + UUID.randomUUID();
createTestTopic(topic, 1, 1);
+ writeRecordsToKafka(topic, "kafka/canal/table/alltype/canal-data.txt");
- // ---------- Write the Canal json into Kafka -------------------
- List<String> lines =
readLines("kafka/canal/table/alltype/canal-data.txt");
- try {
- writeRecordsToKafka(topic, lines);
- } catch (Exception e) {
- throw new Exception("Failed to write canal data to Kafka.", e);
- }
Map<String, String> kafkaConfig = getBasicKafkaConfig();
kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
@@ -578,13 +535,8 @@ public class KafkaCanalSyncTableActionITCase extends
KafkaSyncTableActionITCase
public void testNotSupportFormat() throws Exception {
final String topic = "not_support";
createTestTopic(topic, 1, 1);
- // ---------- Write the Canal json into Kafka -------------------
- List<String> lines =
readLines("kafka/canal/table/schemaevolution/canal-data-1.txt");
- try {
- writeRecordsToKafka(topic, lines);
- } catch (Exception e) {
- throw new Exception("Failed to write canal data to Kafka.", e);
- }
+ writeRecordsToKafka(topic,
"kafka/canal/table/schemaevolution/canal-data-1.txt");
+
Map<String, String> kafkaConfig = getBasicKafkaConfig();
kafkaConfig.put(VALUE_FORMAT.key(), "togg-json");
kafkaConfig.put(TOPIC.key(), topic);
@@ -606,13 +558,8 @@ public class KafkaCanalSyncTableActionITCase extends
KafkaSyncTableActionITCase
public void testKafkaNoNonDdlData() throws Exception {
final String topic = "no_non_ddl_data";
createTestTopic(topic, 1, 1);
- // ---------- Write the Canal json into Kafka -------------------
- List<String> lines =
readLines("kafka/canal/table/nononddldata/canal-data-1.txt");
- try {
- writeRecordsToKafka(topic, lines);
- } catch (Exception e) {
- throw new Exception("Failed to write canal data to Kafka.", e);
- }
+ writeRecordsToKafka(topic,
"kafka/canal/table/nononddldata/canal-data-1.txt");
+
Map<String, String> kafkaConfig = getBasicKafkaConfig();
kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
kafkaConfig.put(TOPIC.key(), topic);
@@ -634,13 +581,8 @@ public class KafkaCanalSyncTableActionITCase extends
KafkaSyncTableActionITCase
public void testAssertSchemaCompatible() throws Exception {
final String topic = "assert_schema_compatible";
createTestTopic(topic, 1, 1);
- // ---------- Write the Canal json into Kafka -------------------
- List<String> lines =
readLines("kafka/canal/table/schemaevolution/canal-data-1.txt");
- try {
- writeRecordsToKafka(topic, lines);
- } catch (Exception e) {
- throw new Exception("Failed to write canal data to Kafka.", e);
- }
+ writeRecordsToKafka(topic,
"kafka/canal/table/schemaevolution/canal-data-1.txt");
+
Map<String, String> kafkaConfig = getBasicKafkaConfig();
kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
kafkaConfig.put(TOPIC.key(), topic);
@@ -675,13 +617,8 @@ public class KafkaCanalSyncTableActionITCase extends
KafkaSyncTableActionITCase
public void testStarUpOptionSpecific() throws Exception {
final String topic = "start_up_specific";
createTestTopic(topic, 1, 1);
- // ---------- Write the Canal json into Kafka -------------------
- List<String> lines =
readLines("kafka/canal/table/startupmode/canal-data-1.txt");
- try {
- writeRecordsToKafka(topic, lines);
- } catch (Exception e) {
- throw new Exception("Failed to write canal data to Kafka.", e);
- }
+ writeRecordsToKafka(topic,
"kafka/canal/table/startupmode/canal-data-1.txt");
+
Map<String, String> kafkaConfig = getBasicKafkaConfig();
kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
kafkaConfig.put(TOPIC.key(), topic);
@@ -716,13 +653,8 @@ public class KafkaCanalSyncTableActionITCase extends
KafkaSyncTableActionITCase
public void testStarUpOptionLatest() throws Exception {
final String topic = "start_up_latest";
createTestTopic(topic, 1, 1);
- // ---------- Write the Canal json into Kafka -------------------
- List<String> lines =
readLines("kafka/canal/table/startupmode/canal-data-1.txt");
- try {
- writeRecordsToKafka(topic, lines, true);
- } catch (Exception e) {
- throw new Exception("Failed to write canal data to Kafka.", e);
- }
+ writeRecordsToKafka(topic, true,
"kafka/canal/table/startupmode/canal-data-1.txt");
+
Map<String, String> kafkaConfig = getBasicKafkaConfig();
kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
kafkaConfig.put(TOPIC.key(), topic);
@@ -735,11 +667,8 @@ public class KafkaCanalSyncTableActionITCase extends
KafkaSyncTableActionITCase
.build();
runActionWithDefaultEnv(action);
- try {
- writeRecordsToKafka(topic,
readLines("kafka/canal/table/startupmode/canal-data-2.txt"));
- } catch (Exception e) {
- throw new Exception("Failed to write canal data to Kafka.", e);
- }
+ writeRecordsToKafka(topic,
"kafka/canal/table/startupmode/canal-data-2.txt");
+
FileStoreTable table = getFileStoreTable(tableName);
RowType rowType =
@@ -761,13 +690,8 @@ public class KafkaCanalSyncTableActionITCase extends
KafkaSyncTableActionITCase
public void testStarUpOptionTimestamp() throws Exception {
final String topic = "start_up_timestamp";
createTestTopic(topic, 1, 1);
- // ---------- Write the Canal json into Kafka -------------------
- List<String> lines =
readLines("kafka/canal/table/startupmode/canal-data-1.txt");
- try {
- writeRecordsToKafka(topic, lines, true);
- } catch (Exception e) {
- throw new Exception("Failed to write canal data to Kafka.", e);
- }
+ writeRecordsToKafka(topic, true,
"kafka/canal/table/startupmode/canal-data-1.txt");
+
Map<String, String> kafkaConfig = getBasicKafkaConfig();
kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
kafkaConfig.put(TOPIC.key(), topic);
@@ -782,11 +706,8 @@ public class KafkaCanalSyncTableActionITCase extends
KafkaSyncTableActionITCase
.build();
runActionWithDefaultEnv(action);
- try {
- writeRecordsToKafka(topic,
readLines("kafka/canal/table/startupmode/canal-data-2.txt"));
- } catch (Exception e) {
- throw new Exception("Failed to write canal data to Kafka.", e);
- }
+ writeRecordsToKafka(topic,
"kafka/canal/table/startupmode/canal-data-2.txt");
+
FileStoreTable table = getFileStoreTable(tableName);
RowType rowType =
@@ -808,13 +729,8 @@ public class KafkaCanalSyncTableActionITCase extends
KafkaSyncTableActionITCase
public void testStarUpOptionEarliest() throws Exception {
final String topic = "start_up_earliest";
createTestTopic(topic, 1, 1);
- // ---------- Write the Canal json into Kafka -------------------
- List<String> lines =
readLines("kafka/canal/table/startupmode/canal-data-1.txt");
- try {
- writeRecordsToKafka(topic, lines);
- } catch (Exception e) {
- throw new Exception("Failed to write canal data to Kafka.", e);
- }
+ writeRecordsToKafka(topic,
"kafka/canal/table/startupmode/canal-data-1.txt");
+
Map<String, String> kafkaConfig = getBasicKafkaConfig();
kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
kafkaConfig.put(TOPIC.key(), topic);
@@ -827,11 +743,8 @@ public class KafkaCanalSyncTableActionITCase extends
KafkaSyncTableActionITCase
.build();
runActionWithDefaultEnv(action);
- try {
- writeRecordsToKafka(topic,
readLines("kafka/canal/table/startupmode/canal-data-2.txt"));
- } catch (Exception e) {
- throw new Exception("Failed to write canal data to Kafka.", e);
- }
+ writeRecordsToKafka(topic,
"kafka/canal/table/startupmode/canal-data-2.txt");
+
FileStoreTable table = getFileStoreTable(tableName);
RowType rowType =
@@ -855,13 +768,8 @@ public class KafkaCanalSyncTableActionITCase extends
KafkaSyncTableActionITCase
public void testStarUpOptionGroup() throws Exception {
final String topic = "start_up_group";
createTestTopic(topic, 1, 1);
- // ---------- Write the Canal json into Kafka -------------------
- List<String> lines =
readLines("kafka/canal/table/startupmode/canal-data-1.txt");
- try {
- writeRecordsToKafka(topic, lines);
- } catch (Exception e) {
- throw new Exception("Failed to write canal data to Kafka.", e);
- }
+ writeRecordsToKafka(topic,
"kafka/canal/table/startupmode/canal-data-1.txt");
+
Map<String, String> kafkaConfig = getBasicKafkaConfig();
kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
kafkaConfig.put(TOPIC.key(), topic);
@@ -874,11 +782,8 @@ public class KafkaCanalSyncTableActionITCase extends
KafkaSyncTableActionITCase
.build();
runActionWithDefaultEnv(action);
- try {
- writeRecordsToKafka(topic,
readLines("kafka/canal/table/startupmode/canal-data-2.txt"));
- } catch (Exception e) {
- throw new Exception("Failed to write canal data to Kafka.", e);
- }
+ writeRecordsToKafka(topic,
"kafka/canal/table/startupmode/canal-data-2.txt");
+
FileStoreTable table = getFileStoreTable(tableName);
RowType rowType =
@@ -902,13 +807,8 @@ public class KafkaCanalSyncTableActionITCase extends
KafkaSyncTableActionITCase
public void testComputedColumn() throws Exception {
String topic = "computed_column";
createTestTopic(topic, 1, 1);
+ writeRecordsToKafka(topic,
"kafka/canal/table/computedcolumn/canal-data-1.txt");
- List<String> lines =
readLines("kafka/canal/table/computedcolumn/canal-data-1.txt");
- try {
- writeRecordsToKafka(topic, lines);
- } catch (Exception e) {
- throw new Exception("Failed to write canal data to Kafka.", e);
- }
Map<String, String> kafkaConfig = getBasicKafkaConfig();
kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
kafkaConfig.put(TOPIC.key(), topic);
@@ -939,9 +839,7 @@ public class KafkaCanalSyncTableActionITCase extends
KafkaSyncTableActionITCase
public void testTypeMappingToString() throws Exception {
final String topic = "map-to-string";
createTestTopic(topic, 1, 1);
-
- // ---------- Write the Canal json into Kafka -------------------
- writeRecordsToKafka(topic,
readLines("kafka/canal/table/tostring/canal-data-1.txt"));
+ writeRecordsToKafka(topic,
"kafka/canal/table/tostring/canal-data-1.txt");
Map<String, String> kafkaConfig = getBasicKafkaConfig();
kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
@@ -986,9 +884,7 @@ public class KafkaCanalSyncTableActionITCase extends
KafkaSyncTableActionITCase
public void testCDCOperations(boolean ignoreDelete) throws Exception {
final String topic = "event-insert" + UUID.randomUUID();
createTestTopic(topic, 1, 1);
-
- // ---------- Write the Canal json into Kafka -------------------
- writeRecordsToKafka(topic,
readLines("kafka/canal/table/event/event-row.txt"));
+ writeRecordsToKafka(topic, "kafka/canal/table/event/event-row.txt");
Map<String, String> kafkaConfig = getBasicKafkaConfig();
kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
@@ -1022,7 +918,7 @@ public class KafkaCanalSyncTableActionITCase extends
KafkaSyncTableActionITCase
"+I[1, 9, nine, 90000000000, 99999.999, [110, 105,
110, 101, 46, 98, 105, 110], 9.9]");
waitForResult(expectedRow, table, rowType, primaryKeys);
- writeRecordsToKafka(topic,
readLines("kafka/canal/table/event/event-insert.txt"));
+ writeRecordsToKafka(topic, "kafka/canal/table/event/event-insert.txt");
// For the INSERT operation
List<String> expectedInsert =
@@ -1033,7 +929,7 @@ public class KafkaCanalSyncTableActionITCase extends
KafkaSyncTableActionITCase
"+I[2, 4, four, NULL, NULL, NULL, NULL]");
waitForResult(expectedInsert, table, rowType, primaryKeys);
- writeRecordsToKafka(topic,
readLines("kafka/canal/table/event/event-update.txt"));
+ writeRecordsToKafka(topic, "kafka/canal/table/event/event-update.txt");
// For the UPDATE operation
List<String> expectedUpdate =
@@ -1044,7 +940,7 @@ public class KafkaCanalSyncTableActionITCase extends
KafkaSyncTableActionITCase
"+I[2, 4, four, NULL, NULL, NULL, NULL]");
waitForResult(expectedUpdate, table, rowType, primaryKeys);
- writeRecordsToKafka(topic,
readLines("kafka/canal/table/event/event-delete.txt"));
+ writeRecordsToKafka(topic, "kafka/canal/table/event/event-delete.txt");
// For the DELETE operation
List<String> expectedDelete =
@@ -1082,8 +978,7 @@ public class KafkaCanalSyncTableActionITCase extends
KafkaSyncTableActionITCase
.build();
runActionWithDefaultEnv(action);
- List<String> lines =
readLines("kafka/canal/table/initialemptytopic/canal-data-1.txt");
- writeRecordsToKafka(topic, lines);
+ writeRecordsToKafka(topic,
"kafka/canal/table/initialemptytopic/canal-data-1.txt");
RowType rowType =
RowType.of(
@@ -1106,8 +1001,7 @@ public class KafkaCanalSyncTableActionITCase extends
KafkaSyncTableActionITCase
public void testSynchronizeIncompleteJson() throws Exception {
String topic = "incomplete";
createTestTopic(topic, 1, 1);
- List<String> lines =
readLines("kafka/canal/table/incomplete/canal-data-1.txt");
- writeRecordsToKafka(topic, lines);
+ writeRecordsToKafka(topic,
"kafka/canal/table/incomplete/canal-data-1.txt");
Map<String, String> kafkaConfig = getBasicKafkaConfig();
kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
@@ -1138,8 +1032,7 @@ public class KafkaCanalSyncTableActionITCase extends
KafkaSyncTableActionITCase
public void testSynchronizeNonPkTable() throws Exception {
String topic = "non_pk";
createTestTopic(topic, 1, 1);
- List<String> lines =
readLines("kafka/canal/table/nonpk/canal-data-1.txt");
- writeRecordsToKafka(topic, lines);
+ writeRecordsToKafka(topic, "kafka/canal/table/nonpk/canal-data-1.txt");
Map<String, String> kafkaConfig = getBasicKafkaConfig();
kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
@@ -1167,7 +1060,7 @@ public class KafkaCanalSyncTableActionITCase extends
KafkaSyncTableActionITCase
public void testMissingDecimalPrecision() throws Exception {
String topic = "missing-decimal-precision";
createTestTopic(topic, 1, 1);
- writeRecordsToKafka(topic,
readLines("kafka/canal/table/incomplete/canal-data-2.txt"));
+ writeRecordsToKafka(topic,
"kafka/canal/table/incomplete/canal-data-2.txt");
Map<String, String> kafkaConfig = getBasicKafkaConfig();
kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
@@ -1209,8 +1102,7 @@ public class KafkaCanalSyncTableActionITCase extends
KafkaSyncTableActionITCase
Collections.singletonList("_id"),
Collections.emptyMap());
} else {
- List<String> lines =
readLines("kafka/canal/table/computedcolumn/canal-data-2.txt");
- writeRecordsToKafka(topic, lines);
+ writeRecordsToKafka(topic,
"kafka/canal/table/computedcolumn/canal-data-2.txt");
}
Map<String, String> kafkaConfig = getBasicKafkaConfig();
@@ -1227,8 +1119,7 @@ public class KafkaCanalSyncTableActionITCase extends
KafkaSyncTableActionITCase
runActionWithDefaultEnv(action);
if (triggerSchemaRetrievalException) {
- List<String> lines =
readLines("kafka/canal/table/computedcolumn/canal-data-2.txt");
- writeRecordsToKafka(topic, lines);
+ writeRecordsToKafka(topic,
"kafka/canal/table/computedcolumn/canal-data-2.txt");
}
RowType rowType =
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java
index ba9633162..04dfb3769 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java
@@ -22,16 +22,13 @@ import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
-import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import java.util.Arrays;
import java.util.Collections;
-import java.util.List;
import java.util.Map;
-import java.util.Properties;
import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC;
import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FORMAT;
@@ -125,22 +122,10 @@ public class KafkaDebeziumSyncTableActionITCase extends
KafkaSyncTableActionITCa
final String topic = "test_null_value";
createTestTopic(topic, 1, 1);
- List<String> lines =
readLines("kafka/debezium/table/nullvalue/debezium-data-1.txt");
- writeRecordsToKafka(topic, lines);
-
+ writeRecordsToKafka(topic,
"kafka/debezium/table/nullvalue/debezium-data-1.txt");
// write null value
- Properties producerProperties = getStandardProps();
- producerProperties.setProperty("retries", "0");
- producerProperties.put(
- "key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
- producerProperties.put(
- "value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
- KafkaProducer<String, String> kafkaProducer = new
KafkaProducer<>(producerProperties);
kafkaProducer.send(new ProducerRecord<>(topic, null));
- kafkaProducer.close();
-
- lines =
readLines("kafka/debezium/table/nullvalue/debezium-data-2.txt");
- writeRecordsToKafka(topic, lines);
+ writeRecordsToKafka(topic,
"kafka/debezium/table/nullvalue/debezium-data-2.txt");
Map<String, String> kafkaConfig = getBasicKafkaConfig();
kafkaConfig.put(VALUE_FORMAT.key(), "debezium-json");
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaITCase.java
index 9d544c632..cb33f6259 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaITCase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaITCase.java
@@ -48,13 +48,8 @@ public class KafkaSchemaITCase extends KafkaActionITCaseBase
{
public void testKafkaSchema() throws Exception {
final String topic = "test_kafka_schema";
createTestTopic(topic, 1, 1);
- // ---------- Write the Canal json into Kafka -------------------
- List<String> lines =
readLines("kafka/canal/table/schemaevolution/canal-data-1.txt");
- try {
- writeRecordsToKafka(topic, lines);
- } catch (Exception e) {
- throw new Exception("Failed to write canal data to Kafka.", e);
- }
+ writeRecordsToKafka(topic,
"kafka/canal/table/schemaevolution/canal-data-1.txt");
+
Configuration kafkaConfig =
Configuration.fromMap(getBasicKafkaConfig());
kafkaConfig.setString(VALUE_FORMAT.key(), "canal-json");
kafkaConfig.setString(TOPIC.key(), topic);
@@ -76,9 +71,7 @@ public class KafkaSchemaITCase extends KafkaActionITCaseBase {
public void testTableOptionsChange() throws Exception {
final String topic = "test_table_options_change";
createTestTopic(topic, 1, 1);
-
- // ---------- Write the Canal json into Kafka -------------------
- writeRecordsToKafka(topic,
readLines("kafka/canal/table/optionschange/canal-data-1.txt"));
+ writeRecordsToKafka(topic,
"kafka/canal/table/optionschange/canal-data-1.txt");
Map<String, String> kafkaConfig = getBasicKafkaConfig();
kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
@@ -94,7 +87,7 @@ public class KafkaSchemaITCase extends KafkaActionITCaseBase {
waitingTables(tableName);
jobClient.cancel();
- writeRecordsToKafka(topic,
readLines("kafka/canal/table/optionschange/canal-data-2.txt"));
+ writeRecordsToKafka(topic,
"kafka/canal/table/optionschange/canal-data-2.txt");
tableConfig.put("sink.savepoint.auto-tag", "true");
tableConfig.put("tag.num-retained-max", "5");
@@ -121,10 +114,8 @@ public class KafkaSchemaITCase extends
KafkaActionITCaseBase {
public void testNewlyAddedTablesOptionsChange() throws Exception {
final String topic = "test_database_options_change";
createTestTopic(topic, 1, 1);
+ writeRecordsToKafka(topic,
"kafka/canal/database/schemaevolution/topic0/canal-data-1.txt");
- // ---------- Write the Canal json into Kafka -------------------
- writeRecordsToKafka(
- topic,
readLines("kafka/canal/database/schemaevolution/topic0/canal-data-1.txt"));
Map<String, String> kafkaConfig = getBasicKafkaConfig();
kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
kafkaConfig.put(TOPIC.key(), topic);
@@ -148,8 +139,7 @@ public class KafkaSchemaITCase extends
KafkaActionITCaseBase {
tableConfig.put("snapshot.num-retained.max", "10");
tableConfig.put("changelog-producer", "input");
- writeRecordsToKafka(
- topic,
readLines("kafka/canal/database/schemaevolution/topic1/canal-data-1.txt"));
+ writeRecordsToKafka(topic,
"kafka/canal/database/schemaevolution/topic1/canal-data-1.txt");
KafkaSyncDatabaseAction action2 =
syncDatabaseActionBuilder(kafkaConfig).withTableConfig(tableConfig).build();
runActionWithDefaultEnv(action2);
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionITCase.java
index cdad175ea..16aff6984 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionITCase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionITCase.java
@@ -49,19 +49,13 @@ public class KafkaSyncDatabaseActionITCase extends
KafkaActionITCaseBase {
List<String> topics = Arrays.asList(topic1, topic2);
topics.forEach(topic -> createTestTopic(topic, 1, 1));
- // ---------- Write the data into Kafka -------------------
-
for (int i = 0; i < fileCount; i++) {
- try {
- writeRecordsToKafka(
- topics.get(i),
- readLines(
- String.format(
-
"kafka/%s/database/schemaevolution/topic%s/%s-data-1.txt",
- format, i, format)));
- } catch (Exception e) {
- throw new Exception(String.format("Failed to write %s data to
Kafka.", format), e);
- }
+ writeRecordsToKafka(
+ topics.get(i),
+ "kafka/%s/database/schemaevolution/topic%s/%s-data-1.txt",
+ format,
+ i,
+ format);
}
Map<String, String> kafkaConfig = getBasicKafkaConfig();
@@ -83,19 +77,13 @@ public class KafkaSyncDatabaseActionITCase extends
KafkaActionITCaseBase {
List<String> topics = Collections.singletonList(topic);
topics.forEach(t -> createTestTopic(t, 1, 1));
- // ---------- Write the maxwell json into Kafka -------------------
-
for (int i = 0; i < fileCount; i++) {
- try {
- writeRecordsToKafka(
- topics.get(0),
- readLines(
- String.format(
-
"kafka/%s/database/schemaevolution/topic%s/%s-data-1.txt",
- format, i, format)));
- } catch (Exception e) {
- throw new Exception(String.format("Failed to write %s data to
Kafka.", format), e);
- }
+ writeRecordsToKafka(
+ topics.get(0),
+ "kafka/%s/database/schemaevolution/topic%s/%s-data-1.txt",
+ format,
+ i,
+ format);
}
Map<String, String> kafkaConfig = getBasicKafkaConfig();
@@ -150,16 +138,12 @@ public class KafkaSyncDatabaseActionITCase extends
KafkaActionITCaseBase {
waitForResult(expected2, table2, rowType2, getPrimaryKey(format));
for (int i = 0; i < fileCount; i++) {
- try {
- writeRecordsToKafka(
- writeOne ? topics.get(0) : topics.get(i),
- readLines(
- String.format(
-
"kafka/%s/database/schemaevolution/topic%s/%s-data-2.txt",
- format, i, format)));
- } catch (Exception e) {
- throw new Exception(String.format("Failed to write %s data to
Kafka.", format), e);
- }
+ writeRecordsToKafka(
+ writeOne ? topics.get(0) : topics.get(i),
+ "kafka/%s/database/schemaevolution/topic%s/%s-data-2.txt",
+ format,
+ i,
+ format);
}
rowType1 =
@@ -244,19 +228,13 @@ public class KafkaSyncDatabaseActionITCase extends
KafkaActionITCaseBase {
List<String> topics = Arrays.asList(topic1, topic2);
topics.forEach(topic -> createTestTopic(topic, 1, 1));
- // ---------- Write the data into Kafka -------------------
-
for (int i = 0; i < topics.size(); i++) {
- try {
- writeRecordsToKafka(
- topics.get(i),
- readLines(
- String.format(
-
"kafka/%s/database/prefixsuffix/topic%s/%s-data-1.txt",
- format, i, format)));
- } catch (Exception e) {
- throw new Exception(String.format("Failed to write %s data to
Kafka.", format), e);
- }
+ writeRecordsToKafka(
+ topics.get(i),
+ "kafka/%s/database/prefixsuffix/topic%s/%s-data-1.txt",
+ format,
+ i,
+ format);
}
// try synchronization
@@ -298,19 +276,12 @@ public class KafkaSyncDatabaseActionITCase extends
KafkaActionITCaseBase {
int fileCount = 2;
topics.forEach(topic -> createTestTopic(topic, 1, 1));
- // ---------- Write the maxwell json into Kafka -------------------
-
for (int i = 0; i < fileCount; i++) {
- try {
- writeRecordsToKafka(
- topics.get(0),
- readLines(
- String.format(
-
"kafka/%s/database/prefixsuffix/topic%s/%s-data-1.txt",
- format, i, format)));
- } catch (Exception e) {
- throw new Exception(String.format("Failed to write %s data to
Kafka.", format), e);
- }
+ writeRecordsToKafka(
+ topics.get(0),
+ String.format(
+
"kafka/%s/database/prefixsuffix/topic%s/%s-data-1.txt",
+ format, i, format));
}
// try synchronization
@@ -370,16 +341,12 @@ public class KafkaSyncDatabaseActionITCase extends
KafkaActionITCaseBase {
waitForResult(expected, table2, rowType2, getPrimaryKey(format));
for (int i = 0; i < fileCount; i++) {
- try {
- writeRecordsToKafka(
- writeOne ? topics.get(0) : topics.get(i),
- readLines(
- String.format(
-
"kafka/%s/database/prefixsuffix/topic%s/%s-data-2.txt",
- format, i, format)));
- } catch (Exception e) {
- throw new Exception(String.format("Failed to write %s data to
Kafka.", format), e);
- }
+ writeRecordsToKafka(
+ writeOne ? topics.get(0) : topics.get(i),
+ "kafka/%s/database/prefixsuffix/topic%s/%s-data-2.txt",
+ format,
+ i,
+ format);
}
rowType1 =
RowType.of(
@@ -470,18 +437,9 @@ public class KafkaSyncDatabaseActionITCase extends
KafkaActionITCaseBase {
List<String> topics = Collections.singletonList(topic1);
topics.forEach(topic -> createTestTopic(topic, 1, 1));
- // ---------- Write the data into Kafka -------------------
+ writeRecordsToKafka(
+ topics.get(0),
"kafka/%s/database/include/topic0/%s-data-1.txt", format, format);
- try {
- writeRecordsToKafka(
- topics.get(0),
- readLines(
- String.format(
-
"kafka/%s/database/include/topic0/%s-data-1.txt",
- format, format)));
- } catch (Exception e) {
- throw new Exception(String.format("Failed to write %s data to
Kafka.", format), e);
- }
// try synchronization
Map<String, String> kafkaConfig = getBasicKafkaConfig();
kafkaConfig.put(VALUE_FORMAT.key(), format + "-json");
@@ -503,14 +461,8 @@ public class KafkaSyncDatabaseActionITCase extends
KafkaActionITCaseBase {
final String topic = "case-insensitive";
createTestTopic(topic, 1, 1);
- // ---------- Write the data into Kafka -------------------
-
writeRecordsToKafka(
- topic,
- readLines(
- String.format(
-
"kafka/%s/database/case-insensitive/%s-data-1.txt",
- format, format)));
+ topic, "kafka/%s/database/case-insensitive/%s-data-1.txt",
format, format);
Map<String, String> kafkaConfig = getBasicKafkaConfig();
kafkaConfig.put(VALUE_FORMAT.key(), format + "-json");
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java
index e13ab86e9..5785f680a 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java
@@ -58,16 +58,8 @@ public class KafkaSyncTableActionITCase extends
KafkaActionITCaseBase {
protected void runSingleTableSchemaEvolution(String sourceDir, String
format) throws Exception {
final String topic = "schema_evolution";
createTestTopic(topic, 1, 1);
- // ---------- Write the data into Kafka -------------------
- List<String> lines =
- readLines(
- String.format(
- "kafka/%s/table/%s/%s-data-1.txt", format,
sourceDir, format));
- try {
- writeRecordsToKafka(topic, lines);
- } catch (Exception e) {
- throw new Exception(String.format("Failed to write %s data to
Kafka.", format), e);
- }
+ writeRecordsToKafka(topic, "kafka/%s/table/%s/%s-data-1.txt", format,
sourceDir, format);
+
Map<String, String> kafkaConfig = getBasicKafkaConfig();
kafkaConfig.put(VALUE_FORMAT.key(), format + "-json");
kafkaConfig.put(TOPIC.key(), topic);
@@ -101,15 +93,8 @@ public class KafkaSyncTableActionITCase extends
KafkaActionITCaseBase {
"+I[102, car battery, 12V car battery, 8.1]");
waitForResult(expected, table, rowType, primaryKeys);
- try {
- writeRecordsToKafka(
- topic,
- readLines(
- String.format(
- "kafka/%s/table/%s/%s-data-2.txt", format,
sourceDir, format)));
- } catch (Exception e) {
- throw new Exception(String.format("Failed to write %s data to
Kafka.", format), e);
- }
+ writeRecordsToKafka(topic, "kafka/%s/table/%s/%s-data-2.txt", format,
sourceDir, format);
+
rowType =
RowType.of(
new DataType[] {
@@ -128,15 +113,8 @@ public class KafkaSyncTableActionITCase extends
KafkaActionITCaseBase {
"+I[104, hammer, 12oz carpenter's hammer, 0.75, 24]");
waitForResult(expected, table, rowType, primaryKeys);
- try {
- writeRecordsToKafka(
- topic,
- readLines(
- String.format(
- "kafka/%s/table/%s/%s-data-3.txt", format,
sourceDir, format)));
- } catch (Exception e) {
- throw new Exception(String.format("Failed to write %s data to
Kafka.", format), e);
- }
+ writeRecordsToKafka(topic, "kafka/%s/table/%s/%s-data-3.txt", format,
sourceDir, format);
+
rowType =
RowType.of(
new DataType[] {
@@ -161,16 +139,8 @@ public class KafkaSyncTableActionITCase extends
KafkaActionITCaseBase {
public void testNotSupportFormat(String format) throws Exception {
final String topic = "not_support";
createTestTopic(topic, 1, 1);
- // ---------- Write the data into Kafka -------------------
- List<String> lines =
- readLines(
- String.format(
-
"kafka/%s/table/schemaevolution/%s-data-1.txt", format, format));
- try {
- writeRecordsToKafka(topic, lines);
- } catch (Exception e) {
- throw new Exception(String.format("Failed to write %s data to
Kafka.", format), e);
- }
+ writeRecordsToKafka(topic,
"kafka/%s/table/schemaevolution/%s-data-1.txt", format, format);
+
Map<String, String> kafkaConfig = getBasicKafkaConfig();
kafkaConfig.put(VALUE_FORMAT.key(), "togg-json");
kafkaConfig.put(TOPIC.key(), topic);
@@ -190,16 +160,8 @@ public class KafkaSyncTableActionITCase extends
KafkaActionITCaseBase {
protected void testAssertSchemaCompatible(String format) throws Exception {
final String topic = "assert_schema_compatible";
createTestTopic(topic, 1, 1);
- // ---------- Write the data into Kafka -------------------
- List<String> lines =
- readLines(
- String.format(
-
"kafka/%s/table/schemaevolution/%s-data-1.txt", format, format));
- try {
- writeRecordsToKafka(topic, lines);
- } catch (Exception e) {
- throw new Exception(String.format("Failed to write %s data to
Kafka.", format), e);
- }
+ writeRecordsToKafka(topic,
"kafka/%s/table/schemaevolution/%s-data-1.txt", format, format);
+
Map<String, String> kafkaConfig = getBasicKafkaConfig();
kafkaConfig.put(VALUE_FORMAT.key(), format + "-json");
kafkaConfig.put(TOPIC.key(), topic);
@@ -231,15 +193,8 @@ public class KafkaSyncTableActionITCase extends
KafkaActionITCaseBase {
protected void testStarUpOptionSpecific(String format) throws Exception {
final String topic = "start_up_specific";
createTestTopic(topic, 1, 1);
- // ---------- Write the data into Kafka -------------------
- List<String> lines =
- readLines(
-
String.format("kafka/%s/table/startupmode/%s-data-1.txt", format, format));
- try {
- writeRecordsToKafka(topic, lines);
- } catch (Exception e) {
- throw new Exception(String.format("Failed to write %s data to
Kafka.", format), e);
- }
+ writeRecordsToKafka(topic, "kafka/%s/table/startupmode/%s-data-1.txt",
format, format);
+
Map<String, String> kafkaConfig = getBasicKafkaConfig();
kafkaConfig.put(VALUE_FORMAT.key(), format + "-json");
kafkaConfig.put(TOPIC.key(), topic);
@@ -273,15 +228,9 @@ public class KafkaSyncTableActionITCase extends
KafkaActionITCaseBase {
protected void testStarUpOptionLatest(String format) throws Exception {
final String topic = "start_up_latest";
createTestTopic(topic, 1, 1);
- // ---------- Write the data into Kafka -------------------
- List<String> lines =
- readLines(
-
String.format("kafka/%s/table/startupmode/%s-data-1.txt", format, format));
- try {
- writeRecordsToKafka(topic, lines, true);
- } catch (Exception e) {
- throw new Exception(String.format("Failed to write %s data to
Kafka.", format), e);
- }
+ writeRecordsToKafka(
+ topic, true, "kafka/%s/table/startupmode/%s-data-1.txt",
format, format);
+
Map<String, String> kafkaConfig = getBasicKafkaConfig();
kafkaConfig.put(VALUE_FORMAT.key(), format + "-json");
kafkaConfig.put(TOPIC.key(), topic);
@@ -295,15 +244,8 @@ public class KafkaSyncTableActionITCase extends
KafkaActionITCaseBase {
Thread.sleep(5000);
FileStoreTable table = getFileStoreTable(tableName);
- try {
- writeRecordsToKafka(
- topic,
- readLines(
- String.format(
-
"kafka/%s/table/startupmode/%s-data-2.txt", format, format)));
- } catch (Exception e) {
- throw new Exception(String.format("Failed to write %s data to
Kafka.", format), e);
- }
+
+ writeRecordsToKafka(topic, "kafka/%s/table/startupmode/%s-data-2.txt",
format, format);
RowType rowType =
RowType.of(
@@ -326,15 +268,9 @@ public class KafkaSyncTableActionITCase extends
KafkaActionITCaseBase {
public void testStarUpOptionTimestamp(String format) throws Exception {
final String topic = "start_up_timestamp";
createTestTopic(topic, 1, 1);
- // ---------- Write the data into Kafka -------------------
- List<String> lines =
- readLines(
-
String.format("kafka/%s/table/startupmode/%s-data-1.txt", format, format));
- try {
- writeRecordsToKafka(topic, lines);
- } catch (Exception e) {
- throw new Exception(String.format("Failed to write %s data to
Kafka.", format), e);
- }
+ writeRecordsToKafka(
+ topic, true, "kafka/%s/table/startupmode/%s-data-1.txt",
format, format);
+
Map<String, String> kafkaConfig = getBasicKafkaConfig();
kafkaConfig.put(VALUE_FORMAT.key(), format + "-json");
kafkaConfig.put(TOPIC.key(), topic);
@@ -348,15 +284,8 @@ public class KafkaSyncTableActionITCase extends
KafkaActionITCaseBase {
.build();
runActionWithDefaultEnv(action);
- try {
- writeRecordsToKafka(
- topic,
- readLines(
- String.format(
-
"kafka/%s/table/startupmode/%s-data-2.txt", format, format)));
- } catch (Exception e) {
- throw new Exception(String.format("Failed to write %s data to
Kafka.", format), e);
- }
+ writeRecordsToKafka(topic, "kafka/%s/table/startupmode/%s-data-2.txt",
format, format);
+
FileStoreTable table = getFileStoreTable(tableName);
RowType rowType =
@@ -380,15 +309,8 @@ public class KafkaSyncTableActionITCase extends
KafkaActionITCaseBase {
public void testStarUpOptionEarliest(String format) throws Exception {
final String topic = "start_up_earliest";
createTestTopic(topic, 1, 1);
- // ---------- Write the data into Kafka -------------------
- List<String> lines =
- readLines(
-
String.format("kafka/%s/table/startupmode/%s-data-1.txt", format, format));
- try {
- writeRecordsToKafka(topic, lines);
- } catch (Exception e) {
- throw new Exception(String.format("Failed to write %s data to
Kafka.", format), e);
- }
+ writeRecordsToKafka(topic, "kafka/%s/table/startupmode/%s-data-1.txt",
format, format);
+
Map<String, String> kafkaConfig = getBasicKafkaConfig();
kafkaConfig.put(VALUE_FORMAT.key(), format + "-json");
kafkaConfig.put(TOPIC.key(), topic);
@@ -400,15 +322,8 @@ public class KafkaSyncTableActionITCase extends
KafkaActionITCaseBase {
.build();
runActionWithDefaultEnv(action);
- try {
- writeRecordsToKafka(
- topic,
- readLines(
- String.format(
-
"kafka/%s/table/startupmode/%s-data-2.txt", format, format)));
- } catch (Exception e) {
- throw new Exception(String.format("Failed to write %s data to
Kafka.", format), e);
- }
+ writeRecordsToKafka(topic, "kafka/%s/table/startupmode/%s-data-2.txt",
format, format);
+
FileStoreTable table = getFileStoreTable(tableName);
RowType rowType =
@@ -434,15 +349,8 @@ public class KafkaSyncTableActionITCase extends
KafkaActionITCaseBase {
public void testStarUpOptionGroup(String format) throws Exception {
final String topic = "start_up_group";
createTestTopic(topic, 1, 1);
- // ---------- Write the data into Kafka -------------------
- List<String> lines =
- readLines(
-
String.format("kafka/%s/table/startupmode/%s-data-1.txt", format, format));
- try {
- writeRecordsToKafka(topic, lines);
- } catch (Exception e) {
- throw new Exception(String.format("Failed to write %s data to
Kafka.", format), e);
- }
+ writeRecordsToKafka(topic, "kafka/%s/table/startupmode/%s-data-1.txt",
format, format);
+
Map<String, String> kafkaConfig = getBasicKafkaConfig();
kafkaConfig.put(VALUE_FORMAT.key(), format + "-json");
kafkaConfig.put(TOPIC.key(), topic);
@@ -454,15 +362,8 @@ public class KafkaSyncTableActionITCase extends
KafkaActionITCaseBase {
.build();
runActionWithDefaultEnv(action);
- try {
- writeRecordsToKafka(
- topic,
- readLines(
- String.format(
-
"kafka/%s/table/startupmode/%s-data-2.txt", format, format)));
- } catch (Exception e) {
- throw new Exception(String.format("Failed to write %s data to
Kafka.", format), e);
- }
+ writeRecordsToKafka(topic, "kafka/%s/table/startupmode/%s-data-2.txt",
format, format);
+
FileStoreTable table = getFileStoreTable(tableName);
RowType rowType =
@@ -488,16 +389,8 @@ public class KafkaSyncTableActionITCase extends
KafkaActionITCaseBase {
public void testComputedColumn(String format) throws Exception {
String topic = "computed_column";
createTestTopic(topic, 1, 1);
+ writeRecordsToKafka(topic,
"kafka/%s/table/computedcolumn/%s-data-1.txt", format, format);
- List<String> lines =
- readLines(
- String.format(
- "kafka/%s/table/computedcolumn/%s-data-1.txt",
format, format));
- try {
- writeRecordsToKafka(topic, lines);
- } catch (Exception e) {
- throw new Exception(String.format("Failed to write %s data to
Kafka.", format), e);
- }
Map<String, String> kafkaConfig = getBasicKafkaConfig();
kafkaConfig.put(VALUE_FORMAT.key(), format + "-json");
kafkaConfig.put(TOPIC.key(), topic);
@@ -528,14 +421,8 @@ public class KafkaSyncTableActionITCase extends
KafkaActionITCaseBase {
protected void testCDCOperations(String format) throws Exception {
String topic = "event";
createTestTopic(topic, 1, 1);
+ writeRecordsToKafka(topic, "kafka/%s/table/event/event-insert.txt",
format);
- List<String> lines =
-
readLines(String.format("kafka/%s/table/event/event-insert.txt", format));
- try {
- writeRecordsToKafka(topic, lines);
- } catch (Exception e) {
- throw new Exception(String.format("Failed to write %s data to
Kafka.", format), e);
- }
Map<String, String> kafkaConfig = getBasicKafkaConfig();
kafkaConfig.put(VALUE_FORMAT.key(), "ogg-json");
kafkaConfig.put(TOPIC.key(), topic);
@@ -566,13 +453,8 @@ public class KafkaSyncTableActionITCase extends
KafkaActionITCaseBase {
"+I[103, scooter, Big 2-wheel scooter , 5.1]");
waitForResult(expectedInsert, table, rowType, primaryKeys);
- try {
- writeRecordsToKafka(
- topic,
-
readLines(String.format("kafka/%s/table/event/event-update.txt", format)));
- } catch (Exception e) {
- throw new Exception(String.format("Failed to write %s data to
Kafka.", format), e);
- }
+ writeRecordsToKafka(topic, "kafka/%s/table/event/event-update.txt",
format);
+
// For the UPDATE operation
List<String> expectedUpdate =
Arrays.asList(
@@ -581,13 +463,7 @@ public class KafkaSyncTableActionITCase extends
KafkaActionITCaseBase {
"+I[103, scooter, Big 2-wheel scooter , 8.1]");
waitForResult(expectedUpdate, table, rowType, primaryKeys);
- try {
- writeRecordsToKafka(
- topic,
-
readLines(String.format("kafka/%s/table/event/event-delete.txt", format)));
- } catch (Exception e) {
- throw new Exception(String.format("Failed to write %s data to
Kafka.", format), e);
- }
+ writeRecordsToKafka(topic, "kafka/%s/table/event/event-delete.txt",
format);
// For the REPLACE operation
List<String> expectedReplace =
@@ -600,17 +476,9 @@ public class KafkaSyncTableActionITCase extends
KafkaActionITCaseBase {
public void testKafkaBuildSchemaWithDelete(String format) throws Exception
{
final String topic = "test_kafka_schema";
createTestTopic(topic, 1, 1);
- // ---------- Write the Debezium json into Kafka -------------------
- List<String> lines =
- readLines(
- String.format(
-
"kafka/%s/table/schema/schemaevolution/%s-data-4.txt",
- format, format));
- try {
- writeRecordsToKafka(topic, lines);
- } catch (Exception e) {
- throw new Exception(String.format("Failed to write %s data to
Kafka.", format), e);
- }
+ writeRecordsToKafka(
+ topic, "kafka/%s/table/schema/schemaevolution/%s-data-4.txt",
format, format);
+
Configuration kafkaConfig =
Configuration.fromMap(getBasicKafkaConfig());
kafkaConfig.setString(VALUE_FORMAT.key(), format + "-json");
kafkaConfig.setString(TOPIC.key(), topic);
@@ -632,9 +500,7 @@ public class KafkaSyncTableActionITCase extends
KafkaActionITCaseBase {
public void testWaterMarkSyncTable(String format) throws Exception {
String topic = "watermark";
createTestTopic(topic, 1, 1);
- writeRecordsToKafka(
- topic,
-
readLines(String.format("kafka/%s/table/watermark/%s-data-1.txt", format,
format)));
+ writeRecordsToKafka(topic, "kafka/%s/table/watermark/%s-data-1.txt",
format, format);
Map<String, String> kafkaConfig = getBasicKafkaConfig();
kafkaConfig.put(VALUE_FORMAT.key(), format + "-json");
@@ -666,13 +532,7 @@ public class KafkaSyncTableActionITCase extends
KafkaActionITCaseBase {
public void testSchemaIncludeRecord(String format) throws Exception {
String topic = "schema_include";
createTestTopic(topic, 1, 1);
-
- List<String> lines =
readLines("kafka/debezium/table/schema/include/debezium-data-1.txt");
- try {
- writeRecordsToKafka(topic, lines);
- } catch (Exception e) {
- throw new Exception("Failed to write debezium data to Kafka.", e);
- }
+ writeRecordsToKafka(topic,
"kafka/debezium/table/schema/include/debezium-data-1.txt");
Map<String, String> kafkaConfig = getBasicKafkaConfig();
kafkaConfig.put(VALUE_FORMAT.key(), format + "-json");
@@ -706,9 +566,7 @@ public class KafkaSyncTableActionITCase extends
KafkaActionITCaseBase {
public void testAllTypesWithSchemaImpl(String format) throws Exception {
String topic = "schema_include_all_type";
createTestTopic(topic, 1, 1);
-
- List<String> lines =
readLines("kafka/debezium/table/schema/alltype/debezium-data-1.txt");
- writeRecordsToKafka(topic, lines);
+ writeRecordsToKafka(topic,
"kafka/debezium/table/schema/alltype/debezium-data-1.txt");
Map<String, String> kafkaConfig = getBasicKafkaConfig();
kafkaConfig.put(VALUE_FORMAT.key(), format + "-json");
@@ -952,16 +810,8 @@ public class KafkaSyncTableActionITCase extends
KafkaActionITCaseBase {
protected void testTableFiledValNull(String format) throws Exception {
final String topic = "table_filed_val_null";
createTestTopic(topic, 1, 1);
- // ---------- Write the data into Kafka -------------------
- List<String> lines =
- readLines(
- String.format(
-
"kafka/%s/table/schemaevolution/%s-data-4.txt", format, format));
- try {
- writeRecordsToKafka(topic, lines);
- } catch (Exception e) {
- throw new Exception(String.format("Failed to write %s data to
Kafka.", format), e);
- }
+ writeRecordsToKafka(topic,
"kafka/%s/table/schemaevolution/%s-data-4.txt", format, format);
+
Map<String, String> kafkaConfig = getBasicKafkaConfig();
kafkaConfig.put(VALUE_FORMAT.key(), format + "-json");
kafkaConfig.put(TOPIC.key(), topic);