This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a3607ae2 [cdc][refactor] Refactor records writing in Kafka CDC tests (#3124)
2a3607ae2 is described below

commit 2a3607ae27f2f96f147cc53da07ce0d3ec122292
Author: yuzelin <[email protected]>
AuthorDate: Tue Apr 2 09:15:36 2024 +0800

    [cdc][refactor] Refactor records writing in Kafka CDC tests (#3124)
---
 .../action/cdc/kafka/KafkaActionITCaseBase.java    | 138 +++++++-----
 .../kafka/KafkaCanalSyncDatabaseActionITCase.java  | 138 +++---------
 .../cdc/kafka/KafkaCanalSyncTableActionITCase.java | 211 +++++-------------
 .../kafka/KafkaDebeziumSyncTableActionITCase.java  |  19 +-
 .../flink/action/cdc/kafka/KafkaSchemaITCase.java  |  22 +-
 .../cdc/kafka/KafkaSyncDatabaseActionITCase.java   | 124 ++++-------
 .../cdc/kafka/KafkaSyncTableActionITCase.java      | 236 ++++-----------------
 7 files changed, 256 insertions(+), 632 deletions(-)

diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
index 002280079..7d8c78c2e 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
@@ -21,7 +21,7 @@ package org.apache.paimon.flink.action.cdc.kafka;
 import org.apache.paimon.flink.action.cdc.CdcActionITCaseBase;
 import org.apache.paimon.utils.StringUtils;
 
-import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
 import org.apache.flink.util.DockerImageVersions;
@@ -33,11 +33,15 @@ import org.apache.kafka.clients.admin.TopicListing;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.extension.AfterAllCallback;
 import org.junit.jupiter.api.extension.BeforeAllCallback;
@@ -50,10 +54,9 @@ import org.testcontainers.containers.Network;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.utility.DockerImageName;
 
-import java.io.File;
-import java.io.IOException;
 import java.net.URL;
 import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -67,8 +70,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 
-import static org.assertj.core.api.Assertions.assertThat;
-
 /** Base test class for Kafka synchronization. */
 public abstract class KafkaActionITCaseBase extends CdcActionITCaseBase {
 
@@ -78,7 +79,11 @@ public abstract class KafkaActionITCaseBase extends 
CdcActionITCaseBase {
 
     private static final String INTER_CONTAINER_KAFKA_ALIAS = "kafka";
     private static final Network NETWORK = Network.newNetwork();
-    private static final int zkTimeoutMills = 30000;
+    private static final int ZK_TIMEOUT_MILLIS = 30000;
+
+    protected static KafkaProducer<String, String> kafkaProducer;
+    private static KafkaConsumer<String, String> kafkaConsumer;
+    private static AdminClient adminClient;
 
     // Timer for scheduling logging task if the test hangs
     private final Timer loggingTimer = new Timer("Debug Logging Timer");
@@ -104,6 +109,42 @@ public abstract class KafkaActionITCaseBase extends 
CdcActionITCaseBase {
                             // test run
                             .withEnv("KAFKA_LOG_RETENTION_MS", "-1");
 
+    @BeforeAll
+    public static void beforeAll() {
+        // create KafkaProducer
+        Properties producerProperties = getStandardProps();
+        producerProperties.setProperty("retries", "0");
+        producerProperties.put(
+                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+                StringSerializer.class.getCanonicalName());
+        producerProperties.put(
+                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+                StringSerializer.class.getCanonicalName());
+        kafkaProducer = new KafkaProducer<>(producerProperties);
+
+        // create KafkaConsumer
+        Properties consumerProperties = getStandardProps();
+        consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
"flink-tests-debugging");
+        consumerProperties.setProperty(
+                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+                StringDeserializer.class.getCanonicalName());
+        consumerProperties.setProperty(
+                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+                StringDeserializer.class.getCanonicalName());
+        kafkaConsumer = new KafkaConsumer<>(consumerProperties);
+
+        // create AdminClient
+        adminClient = AdminClient.create(getStandardProps());
+    }
+
+    @AfterAll
+    public static void afterAll() {
+        // Close Kafka objects
+        kafkaProducer.close();
+        kafkaConsumer.close();
+        adminClient.close();
+    }
+
     @BeforeEach
     public void setup() {
         // Probe Kafka broker status per 30 seconds
@@ -130,7 +171,6 @@ public abstract class KafkaActionITCaseBase extends 
CdcActionITCaseBase {
     }
 
     private void deleteTopics() throws ExecutionException, 
InterruptedException {
-        final AdminClient adminClient = AdminClient.create(getStandardProps());
         
adminClient.deleteTopics(adminClient.listTopics().names().get()).all().get();
     }
 
@@ -156,13 +196,12 @@ public abstract class KafkaActionITCaseBase extends 
CdcActionITCaseBase {
     }
 
     private Map<String, TopicDescription> describeExternalTopics() {
-        try (final AdminClient adminClient = 
AdminClient.create(getStandardProps())) {
+        try {
             final List<String> topics =
                     adminClient.listTopics().listings().get().stream()
                             .filter(listing -> !listing.isInternal())
                             .map(TopicListing::name)
                             .collect(Collectors.toList());
-
             return adminClient.describeTopics(topics).all().get();
         } catch (Exception e) {
             throw new RuntimeException("Failed to list Kafka topics", e);
@@ -170,15 +209,6 @@ public abstract class KafkaActionITCaseBase extends 
CdcActionITCaseBase {
     }
 
     private void logTopicPartitionStatus(Map<String, TopicDescription> 
topicDescriptions) {
-        final Properties properties = getStandardProps();
-        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
"flink-tests-debugging");
-        properties.setProperty(
-                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
-                StringDeserializer.class.getCanonicalName());
-        properties.setProperty(
-                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
-                StringDeserializer.class.getCanonicalName());
-        final KafkaConsumer<?, ?> consumer = new KafkaConsumer<String, 
String>(properties);
         List<TopicPartition> partitions = new ArrayList<>();
         topicDescriptions.forEach(
                 (topic, description) ->
@@ -189,8 +219,9 @@ public abstract class KafkaActionITCaseBase extends 
CdcActionITCaseBase {
                                                 partitions.add(
                                                         new TopicPartition(
                                                                 topic, 
tpInfo.partition()))));
-        final Map<TopicPartition, Long> beginningOffsets = 
consumer.beginningOffsets(partitions);
-        final Map<TopicPartition, Long> endOffsets = 
consumer.endOffsets(partitions);
+        final Map<TopicPartition, Long> beginningOffsets =
+                kafkaConsumer.beginningOffsets(partitions);
+        final Map<TopicPartition, Long> endOffsets = 
kafkaConsumer.endOffsets(partitions);
         partitions.forEach(
                 partition ->
                         LOG.info(
@@ -207,8 +238,8 @@ public abstract class KafkaActionITCaseBase extends 
CdcActionITCaseBase {
         standardProps.put("enable.auto.commit", false);
         standardProps.put("auto.offset.reset", "earliest");
         standardProps.put("max.partition.fetch.bytes", 256);
-        standardProps.put("zookeeper.session.timeout.ms", zkTimeoutMills);
-        standardProps.put("zookeeper.connection.timeout.ms", zkTimeoutMills);
+        standardProps.put("zookeeper.session.timeout.ms", ZK_TIMEOUT_MILLIS);
+        standardProps.put("zookeeper.connection.timeout.ms", 
ZK_TIMEOUT_MILLIS);
         standardProps.put("default.api.timeout.ms", "120000");
         return standardProps;
     }
@@ -253,11 +284,12 @@ public abstract class KafkaActionITCaseBase extends 
CdcActionITCaseBase {
         }
     }
 
-    public void createTestTopic(String topic, int numPartitions, int 
replicationFactor) {
+    protected void createTestTopic(String topic, int numPartitions, int 
replicationFactor) {
         Map<String, Object> properties = new HashMap<>();
         properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
getBootstrapServers());
-        try (AdminClient admin = AdminClient.create(properties)) {
-            admin.createTopics(
+        try {
+            adminClient
+                    .createTopics(
                             Collections.singletonList(
                                     new NewTopic(topic, numPartitions, (short) 
replicationFactor)))
                     .all()
@@ -271,42 +303,40 @@ public abstract class KafkaActionITCaseBase extends 
CdcActionITCaseBase {
         }
     }
 
-    public static List<String> readLines(String resource) throws IOException {
-        final URL url =
-                
KafkaCanalSyncTableActionITCase.class.getClassLoader().getResource(resource);
-        assertThat(url).isNotNull();
-        java.nio.file.Path path = new File(url.getFile()).toPath();
-        return Files.readAllLines(path);
+    protected void writeRecordsToKafka(String topic, String resourceDirFormat, 
Object... args)
+            throws Exception {
+        writeRecordsToKafka(topic, false, resourceDirFormat, args);
     }
 
-    void writeRecordsToKafka(String topic, List<String> lines) throws 
Exception {
-        writeRecordsToKafka(topic, lines, false);
+    protected void writeRecordsToKafka(
+            String topic, boolean wait, String resourceDirFormat, Object... 
args) throws Exception {
+        URL url =
+                KafkaCanalSyncTableActionITCase.class
+                        .getClassLoader()
+                        .getResource(String.format(resourceDirFormat, args));
+        Files.readAllLines(Paths.get(url.toURI())).stream()
+                .filter(this::isRecordLine)
+                .forEach(r -> send(topic, r, wait));
     }
 
-    void writeRecordsToKafka(String topic, List<String> lines, boolean wait) 
throws Exception {
-        Properties producerProperties = getStandardProps();
-        producerProperties.setProperty("retries", "0");
-        producerProperties.put(
-                "key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
-        producerProperties.put(
-                "value.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
-        KafkaProducer kafkaProducer = new KafkaProducer(producerProperties);
-        for (int i = 0; i < lines.size(); i++) {
+    private boolean isRecordLine(String line) {
+        try {
+            objectMapper.readTree(line);
+            return !StringUtils.isEmpty(line);
+        } catch (JsonProcessingException e) {
+            return false;
+        }
+    }
+
+    private void send(String topic, String record, boolean wait) {
+        Future<RecordMetadata> sendFuture = kafkaProducer.send(new 
ProducerRecord<>(topic, record));
+        if (wait) {
             try {
-                JsonNode jsonNode = objectMapper.readTree(lines.get(i));
-                if (!StringUtils.isEmpty(lines.get(i))) {
-                    Future<RecordMetadata> sendFuture =
-                            kafkaProducer.send(new ProducerRecord<>(topic, 
lines.get(i)));
-                    if (wait) {
-                        sendFuture.get();
-                    }
-                }
-            } catch (Exception e) {
-                // ignore
+                sendFuture.get();
+            } catch (InterruptedException | ExecutionException e) {
+                throw new RuntimeException(e);
             }
         }
-
-        kafkaProducer.close();
     }
 
     /** Kafka container extension for junit5. */
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
index 47b3419c8..fa6bd9301 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
@@ -58,19 +58,11 @@ public class KafkaCanalSyncDatabaseActionITCase extends 
KafkaActionITCaseBase {
         List<String> topics = Arrays.asList(topic1, topic2, topic3);
         topics.forEach(topic -> createTestTopic(topic, 1, 1));
 
-        // ---------- Write the Canal json into Kafka -------------------
-
         for (int i = 0; i < fileCount; i++) {
-            try {
-                writeRecordsToKafka(
-                        topics.get(i),
-                        readLines(
-                                "kafka/canal/database/schemaevolution/topic"
-                                        + i
-                                        + "/canal-data-1.txt"));
-            } catch (Exception e) {
-                throw new Exception("Failed to write canal data to Kafka.", e);
-            }
+            writeRecordsToKafka(
+                    topics.get(i),
+                    
"kafka/canal/database/schemaevolution/topic%s/canal-data-1.txt",
+                    i);
         }
 
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
@@ -99,19 +91,11 @@ public class KafkaCanalSyncDatabaseActionITCase extends 
KafkaActionITCaseBase {
         List<String> topics = Collections.singletonList(topic);
         topics.forEach(t -> createTestTopic(t, 1, 1));
 
-        // ---------- Write the Canal json into Kafka -------------------
-
         for (int i = 0; i < fileCount; i++) {
-            try {
-                writeRecordsToKafka(
-                        topics.get(0),
-                        readLines(
-                                "kafka/canal/database/schemaevolution/topic"
-                                        + i
-                                        + "/canal-data-1.txt"));
-            } catch (Exception e) {
-                throw new Exception("Failed to write canal data to Kafka.", e);
-            }
+            writeRecordsToKafka(
+                    topics.get(0),
+                    
"kafka/canal/database/schemaevolution/topic%s/canal-data-1.txt",
+                    i);
         }
 
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
@@ -155,16 +139,10 @@ public class KafkaCanalSyncDatabaseActionITCase extends 
KafkaActionITCaseBase {
         waitForResult(expected, table2, rowType2, primaryKeys2);
 
         for (int i = 0; i < fileCount; i++) {
-            try {
-                writeRecordsToKafka(
-                        writeOne ? topics.get(0) : topics.get(i),
-                        readLines(
-                                "kafka/canal/database/schemaevolution/topic"
-                                        + i
-                                        + "/canal-data-2.txt"));
-            } catch (Exception e) {
-                throw new Exception("Failed to write canal data to Kafka.", e);
-            }
+            writeRecordsToKafka(
+                    writeOne ? topics.get(0) : topics.get(i),
+                    
"kafka/canal/database/schemaevolution/topic%s/canal-data-2.txt",
+                    i);
         }
 
         rowType1 =
@@ -200,16 +178,10 @@ public class KafkaCanalSyncDatabaseActionITCase extends 
KafkaActionITCaseBase {
         waitForResult(expected, table2, rowType2, primaryKeys2);
 
         for (int i = 0; i < fileCount; i++) {
-            try {
-                writeRecordsToKafka(
-                        writeOne ? topics.get(0) : topics.get(i),
-                        readLines(
-                                "kafka/canal/database/schemaevolution/topic"
-                                        + i
-                                        + "/canal-data-3.txt"));
-            } catch (Exception e) {
-                throw new Exception("Failed to write canal data to Kafka.", e);
-            }
+            writeRecordsToKafka(
+                    writeOne ? topics.get(0) : topics.get(i),
+                    
"kafka/canal/database/schemaevolution/topic%s/canal-data-3.txt",
+                    i);
         }
 
         rowType1 =
@@ -281,19 +253,9 @@ public class KafkaCanalSyncDatabaseActionITCase extends 
KafkaActionITCaseBase {
         List<String> topics = Arrays.asList(topic1, topic2);
         topics.forEach(topic -> createTestTopic(topic, 1, 1));
 
-        // ---------- Write the Canal json into Kafka -------------------
-
         for (int i = 0; i < topics.size(); i++) {
-            try {
-                writeRecordsToKafka(
-                        topics.get(i),
-                        readLines(
-                                "kafka/canal/database/prefixsuffix/topic"
-                                        + i
-                                        + "/canal-data-1.txt"));
-            } catch (Exception e) {
-                throw new Exception("Failed to write canal data to Kafka.", e);
-            }
+            writeRecordsToKafka(
+                    topics.get(i), 
"kafka/canal/database/prefixsuffix/topic%s/canal-data-1.txt", i);
         }
 
         // try synchronization
@@ -332,19 +294,9 @@ public class KafkaCanalSyncDatabaseActionITCase extends 
KafkaActionITCaseBase {
         int fileCount = 2;
         topics.forEach(topic -> createTestTopic(topic, 1, 1));
 
-        // ---------- Write the Canal json into Kafka -------------------
-
         for (int i = 0; i < fileCount; i++) {
-            try {
-                writeRecordsToKafka(
-                        topics.get(0),
-                        readLines(
-                                "kafka/canal/database/prefixsuffix/topic"
-                                        + i
-                                        + "/canal-data-1.txt"));
-            } catch (Exception e) {
-                throw new Exception("Failed to write canal data to Kafka.", e);
-            }
+            writeRecordsToKafka(
+                    topics.get(0), 
"kafka/canal/database/prefixsuffix/topic%s/canal-data-1.txt", i);
         }
 
         // try synchronization
@@ -388,16 +340,10 @@ public class KafkaCanalSyncDatabaseActionITCase extends 
KafkaActionITCaseBase {
         waitForResult(expected, table2, rowType2, primaryKeys2);
 
         for (int i = 0; i < fileCount; i++) {
-            try {
-                writeRecordsToKafka(
-                        writeOne ? topics.get(0) : topics.get(i),
-                        readLines(
-                                "kafka/canal/database/prefixsuffix/topic"
-                                        + i
-                                        + "/canal-data-2.txt"));
-            } catch (Exception e) {
-                throw new Exception("Failed to write canal data to Kafka.", e);
-            }
+            writeRecordsToKafka(
+                    writeOne ? topics.get(0) : topics.get(i),
+                    
"kafka/canal/database/prefixsuffix/topic%s/canal-data-2.txt",
+                    i);
         }
         rowType1 =
                 RowType.of(
@@ -428,16 +374,10 @@ public class KafkaCanalSyncDatabaseActionITCase extends 
KafkaActionITCaseBase {
         waitForResult(expected, table2, rowType2, primaryKeys2);
 
         for (int i = 0; i < fileCount; i++) {
-            try {
-                writeRecordsToKafka(
-                        writeOne ? topics.get(0) : topics.get(i),
-                        readLines(
-                                "kafka/canal/database/prefixsuffix/topic"
-                                        + i
-                                        + "/canal-data-3.txt"));
-            } catch (Exception e) {
-                throw new Exception("Failed to write canal data to Kafka.", e);
-            }
+            writeRecordsToKafka(
+                    writeOne ? topics.get(0) : topics.get(i),
+                    
"kafka/canal/database/prefixsuffix/topic%s/canal-data-3.txt",
+                    i);
         }
 
         rowType1 =
@@ -510,16 +450,8 @@ public class KafkaCanalSyncDatabaseActionITCase extends 
KafkaActionITCaseBase {
         final String topic1 = "include_exclude" + UUID.randomUUID();
         List<String> topics = Collections.singletonList(topic1);
         topics.forEach(topic -> createTestTopic(topic, 1, 1));
+        writeRecordsToKafka(topics.get(0), 
"kafka/canal/database/include/topic0/canal-data-1.txt");
 
-        // ---------- Write the Canal json into Kafka -------------------
-
-        try {
-            writeRecordsToKafka(
-                    topics.get(0),
-                    
readLines("kafka/canal/database/include/topic0/canal-data-1.txt"));
-        } catch (Exception e) {
-            throw new Exception("Failed to write canal data to Kafka.", e);
-        }
         // try synchronization
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
@@ -542,9 +474,7 @@ public class KafkaCanalSyncDatabaseActionITCase extends 
KafkaActionITCaseBase {
     public void testTypeMappingToString() throws Exception {
         final String topic = "map-to-string";
         createTestTopic(topic, 1, 1);
-
-        // ---------- Write the Canal json into Kafka -------------------
-        writeRecordsToKafka(topic, 
readLines("kafka/canal/database/tostring/canal-data-1.txt"));
+        writeRecordsToKafka(topic, 
"kafka/canal/database/tostring/canal-data-1.txt");
 
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
@@ -590,10 +520,7 @@ public class KafkaCanalSyncDatabaseActionITCase extends 
KafkaActionITCaseBase {
     public void testCaseInsensitive() throws Exception {
         final String topic = "case-insensitive";
         createTestTopic(topic, 1, 1);
-
-        // ---------- Write the Canal json into Kafka -------------------
-        writeRecordsToKafka(
-                topic, 
readLines("kafka/canal/database/case-insensitive/canal-data-1.txt"));
+        writeRecordsToKafka(topic, 
"kafka/canal/database/case-insensitive/canal-data-1.txt");
 
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
@@ -628,8 +555,7 @@ public class KafkaCanalSyncDatabaseActionITCase extends 
KafkaActionITCaseBase {
     public void testCannotSynchronizeIncompleteJson() throws Exception {
         final String topic = "incomplete";
         createTestTopic(topic, 1, 1);
-
-        writeRecordsToKafka(topic, 
readLines("kafka/canal/database/incomplete/canal-data-1.txt"));
+        writeRecordsToKafka(topic, 
"kafka/canal/database/incomplete/canal-data-1.txt");
 
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
index 9ad54f04d..5bde711c0 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
@@ -74,14 +74,8 @@ public class KafkaCanalSyncTableActionITCase extends 
KafkaSyncTableActionITCase
     private void runSingleTableSchemaEvolution(String sourceDir) throws 
Exception {
         final String topic = "schema_evolution";
         createTestTopic(topic, 1, 1);
-        // ---------- Write the Canal json into Kafka -------------------
-        List<String> lines =
-                
readLines(String.format("kafka/canal/table/%s/canal-data-1.txt", sourceDir));
-        try {
-            writeRecordsToKafka(topic, lines);
-        } catch (Exception e) {
-            throw new Exception("Failed to write canal data to Kafka.", e);
-        }
+        writeRecordsToKafka(topic, "kafka/canal/table/%s/canal-data-1.txt", 
sourceDir);
+
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
         kafkaConfig.put(TOPIC.key(), topic);
@@ -111,13 +105,8 @@ public class KafkaCanalSyncTableActionITCase extends 
KafkaSyncTableActionITCase
         List<String> expected = Arrays.asList("+I[1, 1, one]", "+I[1, 2, 
two]", "+I[2, 4, four]");
         waitForResult(expected, table, rowType, primaryKeys);
 
-        try {
-            writeRecordsToKafka(
-                    topic,
-                    
readLines(String.format("kafka/canal/table/%s/canal-data-2.txt", sourceDir)));
-        } catch (Exception e) {
-            throw new Exception("Failed to write canal data to Kafka.", e);
-        }
+        writeRecordsToKafka(topic, "kafka/canal/table/%s/canal-data-2.txt", 
sourceDir);
+
         rowType =
                 RowType.of(
                         new DataType[] {
@@ -137,13 +126,8 @@ public class KafkaCanalSyncTableActionITCase extends 
KafkaSyncTableActionITCase
                         "+I[1, 6, six, 60]");
         waitForResult(expected, table, rowType, primaryKeys);
 
-        try {
-            writeRecordsToKafka(
-                    topic,
-                    
readLines(String.format("kafka/canal/table/%s/canal-data-3.txt", sourceDir)));
-        } catch (Exception e) {
-            throw new Exception("Failed to write canal data to Kafka.", e);
-        }
+        writeRecordsToKafka(topic, "kafka/canal/table/%s/canal-data-3.txt", 
sourceDir);
+
         rowType =
                 RowType.of(
                         new DataType[] {
@@ -164,13 +148,8 @@ public class KafkaCanalSyncTableActionITCase extends 
KafkaSyncTableActionITCase
                         "+I[2, 8, eight, 80000000000]");
         waitForResult(expected, table, rowType, primaryKeys);
 
-        try {
-            writeRecordsToKafka(
-                    topic,
-                    
readLines(String.format("kafka/canal/table/%s/canal-data-4.txt", sourceDir)));
-        } catch (Exception e) {
-            throw new Exception("Failed to write canal data to Kafka.", e);
-        }
+        writeRecordsToKafka(topic, "kafka/canal/table/%s/canal-data-4.txt", 
sourceDir);
+
         rowType =
                 RowType.of(
                         new DataType[] {
@@ -195,13 +174,8 @@ public class KafkaCanalSyncTableActionITCase extends 
KafkaSyncTableActionITCase
                         "+I[1, 9, nine, 90000000000, 99999.999, [110, 105, 
110, 101, 46, 98, 105, 110], 9.9]");
         waitForResult(expected, table, rowType, primaryKeys);
 
-        try {
-            writeRecordsToKafka(
-                    topic,
-                    
readLines(String.format("kafka/canal/table/%s/canal-data-5.txt", sourceDir)));
-        } catch (Exception e) {
-            throw new Exception("Failed to write canal data to Kafka.", e);
-        }
+        writeRecordsToKafka(topic, "kafka/canal/table/%s/canal-data-5.txt", 
sourceDir);
+
         rowType =
                 RowType.of(
                         new DataType[] {
@@ -232,14 +206,8 @@ public class KafkaCanalSyncTableActionITCase extends 
KafkaSyncTableActionITCase
     public void testMultipleSchemaEvolutions() throws Exception {
         final String topic = "schema_evolution_multiple";
         createTestTopic(topic, 1, 1);
-        // ---------- Write the Canal json into Kafka -------------------
-        List<String> lines =
-                
readLines("kafka/canal/table/schemaevolutionmultiple/canal-data-1.txt");
-        try {
-            writeRecordsToKafka(topic, lines);
-        } catch (Exception e) {
-            throw new Exception("Failed to write canal data to Kafka.", e);
-        }
+        writeRecordsToKafka(topic, 
"kafka/canal/table/schemaevolutionmultiple/canal-data-1.txt");
+
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
 
@@ -271,12 +239,8 @@ public class KafkaCanalSyncTableActionITCase extends 
KafkaSyncTableActionITCase
         List<String> expected = Collections.singletonList("+I[1, one, 10, 
string_1]");
         waitForResult(expected, table, rowType, primaryKeys);
 
-        try {
-            writeRecordsToKafka(
-                    topic, 
readLines("kafka/canal/table/schemaevolutionmultiple/canal-data-2.txt"));
-        } catch (Exception e) {
-            throw new Exception("Failed to write canal data to Kafka.", e);
-        }
+        writeRecordsToKafka(topic, 
"kafka/canal/table/schemaevolutionmultiple/canal-data-2.txt");
+
         rowType =
                 RowType.of(
                         new DataType[] {
@@ -300,7 +264,6 @@ public class KafkaCanalSyncTableActionITCase extends 
KafkaSyncTableActionITCase
     @Test
     @Timeout(60)
     public void testAllTypes() throws Exception {
-
         // the first round checks for table creation
         // the second round checks for running the action on an existing table
         for (int i = 0; i < 2; i++) {
@@ -312,14 +275,8 @@ public class KafkaCanalSyncTableActionITCase extends 
KafkaSyncTableActionITCase
     private void testAllTypesOnce() throws Exception {
         final String topic = "all_type" + UUID.randomUUID();
         createTestTopic(topic, 1, 1);
+        writeRecordsToKafka(topic, "kafka/canal/table/alltype/canal-data.txt");
 
-        // ---------- Write the Canal json into Kafka -------------------
-        List<String> lines = 
readLines("kafka/canal/table/alltype/canal-data.txt");
-        try {
-            writeRecordsToKafka(topic, lines);
-        } catch (Exception e) {
-            throw new Exception("Failed to write canal data to Kafka.", e);
-        }
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
 
@@ -578,13 +535,8 @@ public class KafkaCanalSyncTableActionITCase extends 
KafkaSyncTableActionITCase
     public void testNotSupportFormat() throws Exception {
         final String topic = "not_support";
         createTestTopic(topic, 1, 1);
-        // ---------- Write the Canal json into Kafka -------------------
-        List<String> lines = 
readLines("kafka/canal/table/schemaevolution/canal-data-1.txt");
-        try {
-            writeRecordsToKafka(topic, lines);
-        } catch (Exception e) {
-            throw new Exception("Failed to write canal data to Kafka.", e);
-        }
+        writeRecordsToKafka(topic, 
"kafka/canal/table/schemaevolution/canal-data-1.txt");
+
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), "togg-json");
         kafkaConfig.put(TOPIC.key(), topic);
@@ -606,13 +558,8 @@ public class KafkaCanalSyncTableActionITCase extends 
KafkaSyncTableActionITCase
     public void testKafkaNoNonDdlData() throws Exception {
         final String topic = "no_non_ddl_data";
         createTestTopic(topic, 1, 1);
-        // ---------- Write the Canal json into Kafka -------------------
-        List<String> lines = 
readLines("kafka/canal/table/nononddldata/canal-data-1.txt");
-        try {
-            writeRecordsToKafka(topic, lines);
-        } catch (Exception e) {
-            throw new Exception("Failed to write canal data to Kafka.", e);
-        }
+        writeRecordsToKafka(topic, 
"kafka/canal/table/nononddldata/canal-data-1.txt");
+
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
         kafkaConfig.put(TOPIC.key(), topic);
@@ -634,13 +581,8 @@ public class KafkaCanalSyncTableActionITCase extends 
KafkaSyncTableActionITCase
     public void testAssertSchemaCompatible() throws Exception {
         final String topic = "assert_schema_compatible";
         createTestTopic(topic, 1, 1);
-        // ---------- Write the Canal json into Kafka -------------------
-        List<String> lines = 
readLines("kafka/canal/table/schemaevolution/canal-data-1.txt");
-        try {
-            writeRecordsToKafka(topic, lines);
-        } catch (Exception e) {
-            throw new Exception("Failed to write canal data to Kafka.", e);
-        }
+        writeRecordsToKafka(topic, 
"kafka/canal/table/schemaevolution/canal-data-1.txt");
+
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
         kafkaConfig.put(TOPIC.key(), topic);
@@ -675,13 +617,8 @@ public class KafkaCanalSyncTableActionITCase extends 
KafkaSyncTableActionITCase
     public void testStarUpOptionSpecific() throws Exception {
         final String topic = "start_up_specific";
         createTestTopic(topic, 1, 1);
-        // ---------- Write the Canal json into Kafka -------------------
-        List<String> lines = 
readLines("kafka/canal/table/startupmode/canal-data-1.txt");
-        try {
-            writeRecordsToKafka(topic, lines);
-        } catch (Exception e) {
-            throw new Exception("Failed to write canal data to Kafka.", e);
-        }
+        writeRecordsToKafka(topic, 
"kafka/canal/table/startupmode/canal-data-1.txt");
+
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
         kafkaConfig.put(TOPIC.key(), topic);
@@ -716,13 +653,8 @@ public class KafkaCanalSyncTableActionITCase extends 
KafkaSyncTableActionITCase
     public void testStarUpOptionLatest() throws Exception {
         final String topic = "start_up_latest";
         createTestTopic(topic, 1, 1);
-        // ---------- Write the Canal json into Kafka -------------------
-        List<String> lines = 
readLines("kafka/canal/table/startupmode/canal-data-1.txt");
-        try {
-            writeRecordsToKafka(topic, lines, true);
-        } catch (Exception e) {
-            throw new Exception("Failed to write canal data to Kafka.", e);
-        }
+        writeRecordsToKafka(topic, true, 
"kafka/canal/table/startupmode/canal-data-1.txt");
+
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
         kafkaConfig.put(TOPIC.key(), topic);
@@ -735,11 +667,8 @@ public class KafkaCanalSyncTableActionITCase extends 
KafkaSyncTableActionITCase
                         .build();
         runActionWithDefaultEnv(action);
 
-        try {
-            writeRecordsToKafka(topic, 
readLines("kafka/canal/table/startupmode/canal-data-2.txt"));
-        } catch (Exception e) {
-            throw new Exception("Failed to write canal data to Kafka.", e);
-        }
+        writeRecordsToKafka(topic, 
"kafka/canal/table/startupmode/canal-data-2.txt");
+
         FileStoreTable table = getFileStoreTable(tableName);
 
         RowType rowType =
@@ -761,13 +690,8 @@ public class KafkaCanalSyncTableActionITCase extends 
KafkaSyncTableActionITCase
     public void testStarUpOptionTimestamp() throws Exception {
         final String topic = "start_up_timestamp";
         createTestTopic(topic, 1, 1);
-        // ---------- Write the Canal json into Kafka -------------------
-        List<String> lines = 
readLines("kafka/canal/table/startupmode/canal-data-1.txt");
-        try {
-            writeRecordsToKafka(topic, lines, true);
-        } catch (Exception e) {
-            throw new Exception("Failed to write canal data to Kafka.", e);
-        }
+        writeRecordsToKafka(topic, true, 
"kafka/canal/table/startupmode/canal-data-1.txt");
+
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
         kafkaConfig.put(TOPIC.key(), topic);
@@ -782,11 +706,8 @@ public class KafkaCanalSyncTableActionITCase extends 
KafkaSyncTableActionITCase
                         .build();
         runActionWithDefaultEnv(action);
 
-        try {
-            writeRecordsToKafka(topic, 
readLines("kafka/canal/table/startupmode/canal-data-2.txt"));
-        } catch (Exception e) {
-            throw new Exception("Failed to write canal data to Kafka.", e);
-        }
+        writeRecordsToKafka(topic, 
"kafka/canal/table/startupmode/canal-data-2.txt");
+
         FileStoreTable table = getFileStoreTable(tableName);
 
         RowType rowType =
@@ -808,13 +729,8 @@ public class KafkaCanalSyncTableActionITCase extends 
KafkaSyncTableActionITCase
     public void testStarUpOptionEarliest() throws Exception {
         final String topic = "start_up_earliest";
         createTestTopic(topic, 1, 1);
-        // ---------- Write the Canal json into Kafka -------------------
-        List<String> lines = 
readLines("kafka/canal/table/startupmode/canal-data-1.txt");
-        try {
-            writeRecordsToKafka(topic, lines);
-        } catch (Exception e) {
-            throw new Exception("Failed to write canal data to Kafka.", e);
-        }
+        writeRecordsToKafka(topic, 
"kafka/canal/table/startupmode/canal-data-1.txt");
+
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
         kafkaConfig.put(TOPIC.key(), topic);
@@ -827,11 +743,8 @@ public class KafkaCanalSyncTableActionITCase extends 
KafkaSyncTableActionITCase
                         .build();
         runActionWithDefaultEnv(action);
 
-        try {
-            writeRecordsToKafka(topic, 
readLines("kafka/canal/table/startupmode/canal-data-2.txt"));
-        } catch (Exception e) {
-            throw new Exception("Failed to write canal data to Kafka.", e);
-        }
+        writeRecordsToKafka(topic, 
"kafka/canal/table/startupmode/canal-data-2.txt");
+
         FileStoreTable table = getFileStoreTable(tableName);
 
         RowType rowType =
@@ -855,13 +768,8 @@ public class KafkaCanalSyncTableActionITCase extends 
KafkaSyncTableActionITCase
     public void testStarUpOptionGroup() throws Exception {
         final String topic = "start_up_group";
         createTestTopic(topic, 1, 1);
-        // ---------- Write the Canal json into Kafka -------------------
-        List<String> lines = 
readLines("kafka/canal/table/startupmode/canal-data-1.txt");
-        try {
-            writeRecordsToKafka(topic, lines);
-        } catch (Exception e) {
-            throw new Exception("Failed to write canal data to Kafka.", e);
-        }
+        writeRecordsToKafka(topic, 
"kafka/canal/table/startupmode/canal-data-1.txt");
+
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
         kafkaConfig.put(TOPIC.key(), topic);
@@ -874,11 +782,8 @@ public class KafkaCanalSyncTableActionITCase extends 
KafkaSyncTableActionITCase
                         .build();
         runActionWithDefaultEnv(action);
 
-        try {
-            writeRecordsToKafka(topic, 
readLines("kafka/canal/table/startupmode/canal-data-2.txt"));
-        } catch (Exception e) {
-            throw new Exception("Failed to write canal data to Kafka.", e);
-        }
+        writeRecordsToKafka(topic, 
"kafka/canal/table/startupmode/canal-data-2.txt");
+
         FileStoreTable table = getFileStoreTable(tableName);
 
         RowType rowType =
@@ -902,13 +807,8 @@ public class KafkaCanalSyncTableActionITCase extends 
KafkaSyncTableActionITCase
     public void testComputedColumn() throws Exception {
         String topic = "computed_column";
         createTestTopic(topic, 1, 1);
+        writeRecordsToKafka(topic, 
"kafka/canal/table/computedcolumn/canal-data-1.txt");
 
-        List<String> lines = 
readLines("kafka/canal/table/computedcolumn/canal-data-1.txt");
-        try {
-            writeRecordsToKafka(topic, lines);
-        } catch (Exception e) {
-            throw new Exception("Failed to write canal data to Kafka.", e);
-        }
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
         kafkaConfig.put(TOPIC.key(), topic);
@@ -939,9 +839,7 @@ public class KafkaCanalSyncTableActionITCase extends 
KafkaSyncTableActionITCase
     public void testTypeMappingToString() throws Exception {
         final String topic = "map-to-string";
         createTestTopic(topic, 1, 1);
-
-        // ---------- Write the Canal json into Kafka -------------------
-        writeRecordsToKafka(topic, 
readLines("kafka/canal/table/tostring/canal-data-1.txt"));
+        writeRecordsToKafka(topic, 
"kafka/canal/table/tostring/canal-data-1.txt");
 
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
@@ -986,9 +884,7 @@ public class KafkaCanalSyncTableActionITCase extends 
KafkaSyncTableActionITCase
     public void testCDCOperations(boolean ignoreDelete) throws Exception {
         final String topic = "event-insert" + UUID.randomUUID();
         createTestTopic(topic, 1, 1);
-
-        // ---------- Write the Canal json into Kafka -------------------
-        writeRecordsToKafka(topic, 
readLines("kafka/canal/table/event/event-row.txt"));
+        writeRecordsToKafka(topic, "kafka/canal/table/event/event-row.txt");
 
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
@@ -1022,7 +918,7 @@ public class KafkaCanalSyncTableActionITCase extends 
KafkaSyncTableActionITCase
                         "+I[1, 9, nine, 90000000000, 99999.999, [110, 105, 
110, 101, 46, 98, 105, 110], 9.9]");
         waitForResult(expectedRow, table, rowType, primaryKeys);
 
-        writeRecordsToKafka(topic, 
readLines("kafka/canal/table/event/event-insert.txt"));
+        writeRecordsToKafka(topic, "kafka/canal/table/event/event-insert.txt");
 
         // For the INSERT operation
         List<String> expectedInsert =
@@ -1033,7 +929,7 @@ public class KafkaCanalSyncTableActionITCase extends 
KafkaSyncTableActionITCase
                         "+I[2, 4, four, NULL, NULL, NULL, NULL]");
         waitForResult(expectedInsert, table, rowType, primaryKeys);
 
-        writeRecordsToKafka(topic, 
readLines("kafka/canal/table/event/event-update.txt"));
+        writeRecordsToKafka(topic, "kafka/canal/table/event/event-update.txt");
 
         // For the UPDATE operation
         List<String> expectedUpdate =
@@ -1044,7 +940,7 @@ public class KafkaCanalSyncTableActionITCase extends 
KafkaSyncTableActionITCase
                         "+I[2, 4, four, NULL, NULL, NULL, NULL]");
         waitForResult(expectedUpdate, table, rowType, primaryKeys);
 
-        writeRecordsToKafka(topic, 
readLines("kafka/canal/table/event/event-delete.txt"));
+        writeRecordsToKafka(topic, "kafka/canal/table/event/event-delete.txt");
 
         // For the DELETE operation
         List<String> expectedDelete =
@@ -1082,8 +978,7 @@ public class KafkaCanalSyncTableActionITCase extends 
KafkaSyncTableActionITCase
                         .build();
         runActionWithDefaultEnv(action);
 
-        List<String> lines = 
readLines("kafka/canal/table/initialemptytopic/canal-data-1.txt");
-        writeRecordsToKafka(topic, lines);
+        writeRecordsToKafka(topic, 
"kafka/canal/table/initialemptytopic/canal-data-1.txt");
 
         RowType rowType =
                 RowType.of(
@@ -1106,8 +1001,7 @@ public class KafkaCanalSyncTableActionITCase extends 
KafkaSyncTableActionITCase
     public void testSynchronizeIncompleteJson() throws Exception {
         String topic = "incomplete";
         createTestTopic(topic, 1, 1);
-        List<String> lines = 
readLines("kafka/canal/table/incomplete/canal-data-1.txt");
-        writeRecordsToKafka(topic, lines);
+        writeRecordsToKafka(topic, 
"kafka/canal/table/incomplete/canal-data-1.txt");
 
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
@@ -1138,8 +1032,7 @@ public class KafkaCanalSyncTableActionITCase extends 
KafkaSyncTableActionITCase
     public void testSynchronizeNonPkTable() throws Exception {
         String topic = "non_pk";
         createTestTopic(topic, 1, 1);
-        List<String> lines = 
readLines("kafka/canal/table/nonpk/canal-data-1.txt");
-        writeRecordsToKafka(topic, lines);
+        writeRecordsToKafka(topic, "kafka/canal/table/nonpk/canal-data-1.txt");
 
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
@@ -1167,7 +1060,7 @@ public class KafkaCanalSyncTableActionITCase extends 
KafkaSyncTableActionITCase
     public void testMissingDecimalPrecision() throws Exception {
         String topic = "missing-decimal-precision";
         createTestTopic(topic, 1, 1);
-        writeRecordsToKafka(topic, 
readLines("kafka/canal/table/incomplete/canal-data-2.txt"));
+        writeRecordsToKafka(topic, 
"kafka/canal/table/incomplete/canal-data-2.txt");
 
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
@@ -1209,8 +1102,7 @@ public class KafkaCanalSyncTableActionITCase extends 
KafkaSyncTableActionITCase
                     Collections.singletonList("_id"),
                     Collections.emptyMap());
         } else {
-            List<String> lines = 
readLines("kafka/canal/table/computedcolumn/canal-data-2.txt");
-            writeRecordsToKafka(topic, lines);
+            writeRecordsToKafka(topic, 
"kafka/canal/table/computedcolumn/canal-data-2.txt");
         }
 
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
@@ -1227,8 +1119,7 @@ public class KafkaCanalSyncTableActionITCase extends 
KafkaSyncTableActionITCase
         runActionWithDefaultEnv(action);
 
         if (triggerSchemaRetrievalException) {
-            List<String> lines = 
readLines("kafka/canal/table/computedcolumn/canal-data-2.txt");
-            writeRecordsToKafka(topic, lines);
+            writeRecordsToKafka(topic, 
"kafka/canal/table/computedcolumn/canal-data-2.txt");
         }
 
         RowType rowType =
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java
index ba9633162..04dfb3769 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java
@@ -22,16 +22,13 @@ import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
 
-import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC;
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FORMAT;
@@ -125,22 +122,10 @@ public class KafkaDebeziumSyncTableActionITCase extends 
KafkaSyncTableActionITCa
         final String topic = "test_null_value";
         createTestTopic(topic, 1, 1);
 
-        List<String> lines = 
readLines("kafka/debezium/table/nullvalue/debezium-data-1.txt");
-        writeRecordsToKafka(topic, lines);
-
+        writeRecordsToKafka(topic, 
"kafka/debezium/table/nullvalue/debezium-data-1.txt");
         // write null value
-        Properties producerProperties = getStandardProps();
-        producerProperties.setProperty("retries", "0");
-        producerProperties.put(
-                "key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
-        producerProperties.put(
-                "value.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
-        KafkaProducer<String, String> kafkaProducer = new 
KafkaProducer<>(producerProperties);
         kafkaProducer.send(new ProducerRecord<>(topic, null));
-        kafkaProducer.close();
-
-        lines = 
readLines("kafka/debezium/table/nullvalue/debezium-data-2.txt");
-        writeRecordsToKafka(topic, lines);
+        writeRecordsToKafka(topic, 
"kafka/debezium/table/nullvalue/debezium-data-2.txt");
 
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), "debezium-json");
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaITCase.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaITCase.java
index 9d544c632..cb33f6259 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaITCase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaITCase.java
@@ -48,13 +48,8 @@ public class KafkaSchemaITCase extends KafkaActionITCaseBase 
{
     public void testKafkaSchema() throws Exception {
         final String topic = "test_kafka_schema";
         createTestTopic(topic, 1, 1);
-        // ---------- Write the Canal json into Kafka -------------------
-        List<String> lines = 
readLines("kafka/canal/table/schemaevolution/canal-data-1.txt");
-        try {
-            writeRecordsToKafka(topic, lines);
-        } catch (Exception e) {
-            throw new Exception("Failed to write canal data to Kafka.", e);
-        }
+        writeRecordsToKafka(topic, 
"kafka/canal/table/schemaevolution/canal-data-1.txt");
+
         Configuration kafkaConfig = 
Configuration.fromMap(getBasicKafkaConfig());
         kafkaConfig.setString(VALUE_FORMAT.key(), "canal-json");
         kafkaConfig.setString(TOPIC.key(), topic);
@@ -76,9 +71,7 @@ public class KafkaSchemaITCase extends KafkaActionITCaseBase {
     public void testTableOptionsChange() throws Exception {
         final String topic = "test_table_options_change";
         createTestTopic(topic, 1, 1);
-
-        // ---------- Write the Canal json into Kafka -------------------
-        writeRecordsToKafka(topic, 
readLines("kafka/canal/table/optionschange/canal-data-1.txt"));
+        writeRecordsToKafka(topic, 
"kafka/canal/table/optionschange/canal-data-1.txt");
 
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
@@ -94,7 +87,7 @@ public class KafkaSchemaITCase extends KafkaActionITCaseBase {
         waitingTables(tableName);
         jobClient.cancel();
 
-        writeRecordsToKafka(topic, 
readLines("kafka/canal/table/optionschange/canal-data-2.txt"));
+        writeRecordsToKafka(topic, 
"kafka/canal/table/optionschange/canal-data-2.txt");
 
         tableConfig.put("sink.savepoint.auto-tag", "true");
         tableConfig.put("tag.num-retained-max", "5");
@@ -121,10 +114,8 @@ public class KafkaSchemaITCase extends 
KafkaActionITCaseBase {
     public void testNewlyAddedTablesOptionsChange() throws Exception {
         final String topic = "test_database_options_change";
         createTestTopic(topic, 1, 1);
+        writeRecordsToKafka(topic, 
"kafka/canal/database/schemaevolution/topic0/canal-data-1.txt");
 
-        // ---------- Write the Canal json into Kafka -------------------
-        writeRecordsToKafka(
-                topic, 
readLines("kafka/canal/database/schemaevolution/topic0/canal-data-1.txt"));
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
         kafkaConfig.put(TOPIC.key(), topic);
@@ -148,8 +139,7 @@ public class KafkaSchemaITCase extends 
KafkaActionITCaseBase {
         tableConfig.put("snapshot.num-retained.max", "10");
         tableConfig.put("changelog-producer", "input");
 
-        writeRecordsToKafka(
-                topic, 
readLines("kafka/canal/database/schemaevolution/topic1/canal-data-1.txt"));
+        writeRecordsToKafka(topic, 
"kafka/canal/database/schemaevolution/topic1/canal-data-1.txt");
         KafkaSyncDatabaseAction action2 =
                 
syncDatabaseActionBuilder(kafkaConfig).withTableConfig(tableConfig).build();
         runActionWithDefaultEnv(action2);
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionITCase.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionITCase.java
index cdad175ea..16aff6984 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionITCase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionITCase.java
@@ -49,19 +49,13 @@ public class KafkaSyncDatabaseActionITCase extends 
KafkaActionITCaseBase {
         List<String> topics = Arrays.asList(topic1, topic2);
         topics.forEach(topic -> createTestTopic(topic, 1, 1));
 
-        // ---------- Write the data into Kafka -------------------
-
         for (int i = 0; i < fileCount; i++) {
-            try {
-                writeRecordsToKafka(
-                        topics.get(i),
-                        readLines(
-                                String.format(
-                                        
"kafka/%s/database/schemaevolution/topic%s/%s-data-1.txt",
-                                        format, i, format)));
-            } catch (Exception e) {
-                throw new Exception(String.format("Failed to write %s data to 
Kafka.", format), e);
-            }
+            writeRecordsToKafka(
+                    topics.get(i),
+                    "kafka/%s/database/schemaevolution/topic%s/%s-data-1.txt",
+                    format,
+                    i,
+                    format);
         }
 
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
@@ -83,19 +77,13 @@ public class KafkaSyncDatabaseActionITCase extends 
KafkaActionITCaseBase {
         List<String> topics = Collections.singletonList(topic);
         topics.forEach(t -> createTestTopic(t, 1, 1));
 
-        // ---------- Write the maxwell json into Kafka -------------------
-
         for (int i = 0; i < fileCount; i++) {
-            try {
-                writeRecordsToKafka(
-                        topics.get(0),
-                        readLines(
-                                String.format(
-                                        
"kafka/%s/database/schemaevolution/topic%s/%s-data-1.txt",
-                                        format, i, format)));
-            } catch (Exception e) {
-                throw new Exception(String.format("Failed to write %s data to 
Kafka.", format), e);
-            }
+            writeRecordsToKafka(
+                    topics.get(0),
+                    "kafka/%s/database/schemaevolution/topic%s/%s-data-1.txt",
+                    format,
+                    i,
+                    format);
         }
 
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
@@ -150,16 +138,12 @@ public class KafkaSyncDatabaseActionITCase extends 
KafkaActionITCaseBase {
         waitForResult(expected2, table2, rowType2, getPrimaryKey(format));
 
         for (int i = 0; i < fileCount; i++) {
-            try {
-                writeRecordsToKafka(
-                        writeOne ? topics.get(0) : topics.get(i),
-                        readLines(
-                                String.format(
-                                        
"kafka/%s/database/schemaevolution/topic%s/%s-data-2.txt",
-                                        format, i, format)));
-            } catch (Exception e) {
-                throw new Exception(String.format("Failed to write %s data to 
Kafka.", format), e);
-            }
+            writeRecordsToKafka(
+                    writeOne ? topics.get(0) : topics.get(i),
+                    "kafka/%s/database/schemaevolution/topic%s/%s-data-2.txt",
+                    format,
+                    i,
+                    format);
         }
 
         rowType1 =
@@ -244,19 +228,13 @@ public class KafkaSyncDatabaseActionITCase extends 
KafkaActionITCaseBase {
         List<String> topics = Arrays.asList(topic1, topic2);
         topics.forEach(topic -> createTestTopic(topic, 1, 1));
 
-        // ---------- Write the data into Kafka -------------------
-
         for (int i = 0; i < topics.size(); i++) {
-            try {
-                writeRecordsToKafka(
-                        topics.get(i),
-                        readLines(
-                                String.format(
-                                        
"kafka/%s/database/prefixsuffix/topic%s/%s-data-1.txt",
-                                        format, i, format)));
-            } catch (Exception e) {
-                throw new Exception(String.format("Failed to write %s data to 
Kafka.", format), e);
-            }
+            writeRecordsToKafka(
+                    topics.get(i),
+                    "kafka/%s/database/prefixsuffix/topic%s/%s-data-1.txt",
+                    format,
+                    i,
+                    format);
         }
 
         // try synchronization
@@ -298,19 +276,12 @@ public class KafkaSyncDatabaseActionITCase extends 
KafkaActionITCaseBase {
         int fileCount = 2;
         topics.forEach(topic -> createTestTopic(topic, 1, 1));
 
-        // ---------- Write the maxwell json into Kafka -------------------
-
         for (int i = 0; i < fileCount; i++) {
-            try {
-                writeRecordsToKafka(
-                        topics.get(0),
-                        readLines(
-                                String.format(
-                                        
"kafka/%s/database/prefixsuffix/topic%s/%s-data-1.txt",
-                                        format, i, format)));
-            } catch (Exception e) {
-                throw new Exception(String.format("Failed to write %s data to 
Kafka.", format), e);
-            }
+            writeRecordsToKafka(
+                    topics.get(0),
+                    String.format(
+                            
"kafka/%s/database/prefixsuffix/topic%s/%s-data-1.txt",
+                            format, i, format));
         }
 
         // try synchronization
@@ -370,16 +341,12 @@ public class KafkaSyncDatabaseActionITCase extends 
KafkaActionITCaseBase {
         waitForResult(expected, table2, rowType2, getPrimaryKey(format));
 
         for (int i = 0; i < fileCount; i++) {
-            try {
-                writeRecordsToKafka(
-                        writeOne ? topics.get(0) : topics.get(i),
-                        readLines(
-                                String.format(
-                                        
"kafka/%s/database/prefixsuffix/topic%s/%s-data-2.txt",
-                                        format, i, format)));
-            } catch (Exception e) {
-                throw new Exception(String.format("Failed to write %s data to 
Kafka.", format), e);
-            }
+            writeRecordsToKafka(
+                    writeOne ? topics.get(0) : topics.get(i),
+                    "kafka/%s/database/prefixsuffix/topic%s/%s-data-2.txt",
+                    format,
+                    i,
+                    format);
         }
         rowType1 =
                 RowType.of(
@@ -470,18 +437,9 @@ public class KafkaSyncDatabaseActionITCase extends 
KafkaActionITCaseBase {
         List<String> topics = Collections.singletonList(topic1);
         topics.forEach(topic -> createTestTopic(topic, 1, 1));
 
-        // ---------- Write the data into Kafka -------------------
+        writeRecordsToKafka(
+                topics.get(0), 
"kafka/%s/database/include/topic0/%s-data-1.txt", format, format);
 
-        try {
-            writeRecordsToKafka(
-                    topics.get(0),
-                    readLines(
-                            String.format(
-                                    
"kafka/%s/database/include/topic0/%s-data-1.txt",
-                                    format, format)));
-        } catch (Exception e) {
-            throw new Exception(String.format("Failed to write %s data to 
Kafka.", format), e);
-        }
         // try synchronization
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), format + "-json");
@@ -503,14 +461,8 @@ public class KafkaSyncDatabaseActionITCase extends 
KafkaActionITCaseBase {
         final String topic = "case-insensitive";
         createTestTopic(topic, 1, 1);
 
-        // ---------- Write the data into Kafka -------------------
-
         writeRecordsToKafka(
-                topic,
-                readLines(
-                        String.format(
-                                
"kafka/%s/database/case-insensitive/%s-data-1.txt",
-                                format, format)));
+                topic, "kafka/%s/database/case-insensitive/%s-data-1.txt", 
format, format);
 
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), format + "-json");
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java
index e13ab86e9..5785f680a 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java
@@ -58,16 +58,8 @@ public class KafkaSyncTableActionITCase extends 
KafkaActionITCaseBase {
     protected void runSingleTableSchemaEvolution(String sourceDir, String 
format) throws Exception {
         final String topic = "schema_evolution";
         createTestTopic(topic, 1, 1);
-        // ---------- Write the data into Kafka -------------------
-        List<String> lines =
-                readLines(
-                        String.format(
-                                "kafka/%s/table/%s/%s-data-1.txt", format, 
sourceDir, format));
-        try {
-            writeRecordsToKafka(topic, lines);
-        } catch (Exception e) {
-            throw new Exception(String.format("Failed to write %s data to 
Kafka.", format), e);
-        }
+        writeRecordsToKafka(topic, "kafka/%s/table/%s/%s-data-1.txt", format, 
sourceDir, format);
+
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), format + "-json");
         kafkaConfig.put(TOPIC.key(), topic);
@@ -101,15 +93,8 @@ public class KafkaSyncTableActionITCase extends 
KafkaActionITCaseBase {
                         "+I[102, car battery, 12V car battery, 8.1]");
         waitForResult(expected, table, rowType, primaryKeys);
 
-        try {
-            writeRecordsToKafka(
-                    topic,
-                    readLines(
-                            String.format(
-                                    "kafka/%s/table/%s/%s-data-2.txt", format, 
sourceDir, format)));
-        } catch (Exception e) {
-            throw new Exception(String.format("Failed to write %s data to 
Kafka.", format), e);
-        }
+        writeRecordsToKafka(topic, "kafka/%s/table/%s/%s-data-2.txt", format, 
sourceDir, format);
+
         rowType =
                 RowType.of(
                         new DataType[] {
@@ -128,15 +113,8 @@ public class KafkaSyncTableActionITCase extends 
KafkaActionITCaseBase {
                         "+I[104, hammer, 12oz carpenter's hammer, 0.75, 24]");
         waitForResult(expected, table, rowType, primaryKeys);
 
-        try {
-            writeRecordsToKafka(
-                    topic,
-                    readLines(
-                            String.format(
-                                    "kafka/%s/table/%s/%s-data-3.txt", format, 
sourceDir, format)));
-        } catch (Exception e) {
-            throw new Exception(String.format("Failed to write %s data to 
Kafka.", format), e);
-        }
+        writeRecordsToKafka(topic, "kafka/%s/table/%s/%s-data-3.txt", format, 
sourceDir, format);
+
         rowType =
                 RowType.of(
                         new DataType[] {
@@ -161,16 +139,8 @@ public class KafkaSyncTableActionITCase extends 
KafkaActionITCaseBase {
     public void testNotSupportFormat(String format) throws Exception {
         final String topic = "not_support";
         createTestTopic(topic, 1, 1);
-        // ---------- Write the data into Kafka -------------------
-        List<String> lines =
-                readLines(
-                        String.format(
-                                
"kafka/%s/table/schemaevolution/%s-data-1.txt", format, format));
-        try {
-            writeRecordsToKafka(topic, lines);
-        } catch (Exception e) {
-            throw new Exception(String.format("Failed to write %s data to 
Kafka.", format), e);
-        }
+        writeRecordsToKafka(topic, 
"kafka/%s/table/schemaevolution/%s-data-1.txt", format, format);
+
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), "togg-json");
         kafkaConfig.put(TOPIC.key(), topic);
@@ -190,16 +160,8 @@ public class KafkaSyncTableActionITCase extends 
KafkaActionITCaseBase {
     protected void testAssertSchemaCompatible(String format) throws Exception {
         final String topic = "assert_schema_compatible";
         createTestTopic(topic, 1, 1);
-        // ---------- Write the data into Kafka -------------------
-        List<String> lines =
-                readLines(
-                        String.format(
-                                
"kafka/%s/table/schemaevolution/%s-data-1.txt", format, format));
-        try {
-            writeRecordsToKafka(topic, lines);
-        } catch (Exception e) {
-            throw new Exception(String.format("Failed to write %s data to 
Kafka.", format), e);
-        }
+        writeRecordsToKafka(topic, 
"kafka/%s/table/schemaevolution/%s-data-1.txt", format, format);
+
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), format + "-json");
         kafkaConfig.put(TOPIC.key(), topic);
@@ -231,15 +193,8 @@ public class KafkaSyncTableActionITCase extends 
KafkaActionITCaseBase {
     protected void testStarUpOptionSpecific(String format) throws Exception {
         final String topic = "start_up_specific";
         createTestTopic(topic, 1, 1);
-        // ---------- Write the data into Kafka -------------------
-        List<String> lines =
-                readLines(
-                        
String.format("kafka/%s/table/startupmode/%s-data-1.txt", format, format));
-        try {
-            writeRecordsToKafka(topic, lines);
-        } catch (Exception e) {
-            throw new Exception(String.format("Failed to write %s data to 
Kafka.", format), e);
-        }
+        writeRecordsToKafka(topic, "kafka/%s/table/startupmode/%s-data-1.txt", 
format, format);
+
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), format + "-json");
         kafkaConfig.put(TOPIC.key(), topic);
@@ -273,15 +228,9 @@ public class KafkaSyncTableActionITCase extends 
KafkaActionITCaseBase {
     protected void testStarUpOptionLatest(String format) throws Exception {
         final String topic = "start_up_latest";
         createTestTopic(topic, 1, 1);
-        // ---------- Write the data into Kafka -------------------
-        List<String> lines =
-                readLines(
-                        
String.format("kafka/%s/table/startupmode/%s-data-1.txt", format, format));
-        try {
-            writeRecordsToKafka(topic, lines, true);
-        } catch (Exception e) {
-            throw new Exception(String.format("Failed to write %s data to 
Kafka.", format), e);
-        }
+        writeRecordsToKafka(
+                topic, true, "kafka/%s/table/startupmode/%s-data-1.txt", 
format, format);
+
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), format + "-json");
         kafkaConfig.put(TOPIC.key(), topic);
@@ -295,15 +244,8 @@ public class KafkaSyncTableActionITCase extends 
KafkaActionITCaseBase {
 
         Thread.sleep(5000);
         FileStoreTable table = getFileStoreTable(tableName);
-        try {
-            writeRecordsToKafka(
-                    topic,
-                    readLines(
-                            String.format(
-                                    
"kafka/%s/table/startupmode/%s-data-2.txt", format, format)));
-        } catch (Exception e) {
-            throw new Exception(String.format("Failed to write %s data to 
Kafka.", format), e);
-        }
+
+        writeRecordsToKafka(topic, "kafka/%s/table/startupmode/%s-data-2.txt", 
format, format);
 
         RowType rowType =
                 RowType.of(
@@ -326,15 +268,9 @@ public class KafkaSyncTableActionITCase extends 
KafkaActionITCaseBase {
     public void testStarUpOptionTimestamp(String format) throws Exception {
         final String topic = "start_up_timestamp";
         createTestTopic(topic, 1, 1);
-        // ---------- Write the data into Kafka -------------------
-        List<String> lines =
-                readLines(
-                        
String.format("kafka/%s/table/startupmode/%s-data-1.txt", format, format));
-        try {
-            writeRecordsToKafka(topic, lines);
-        } catch (Exception e) {
-            throw new Exception(String.format("Failed to write %s data to 
Kafka.", format), e);
-        }
+        writeRecordsToKafka(
+                topic, true, "kafka/%s/table/startupmode/%s-data-1.txt", 
format, format);
+
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), format + "-json");
         kafkaConfig.put(TOPIC.key(), topic);
@@ -348,15 +284,8 @@ public class KafkaSyncTableActionITCase extends 
KafkaActionITCaseBase {
                         .build();
         runActionWithDefaultEnv(action);
 
-        try {
-            writeRecordsToKafka(
-                    topic,
-                    readLines(
-                            String.format(
-                                    
"kafka/%s/table/startupmode/%s-data-2.txt", format, format)));
-        } catch (Exception e) {
-            throw new Exception(String.format("Failed to write %s data to 
Kafka.", format), e);
-        }
+        writeRecordsToKafka(topic, "kafka/%s/table/startupmode/%s-data-2.txt", 
format, format);
+
         FileStoreTable table = getFileStoreTable(tableName);
 
         RowType rowType =
@@ -380,15 +309,8 @@ public class KafkaSyncTableActionITCase extends 
KafkaActionITCaseBase {
     public void testStarUpOptionEarliest(String format) throws Exception {
         final String topic = "start_up_earliest";
         createTestTopic(topic, 1, 1);
-        // ---------- Write the data into Kafka -------------------
-        List<String> lines =
-                readLines(
-                        
String.format("kafka/%s/table/startupmode/%s-data-1.txt", format, format));
-        try {
-            writeRecordsToKafka(topic, lines);
-        } catch (Exception e) {
-            throw new Exception(String.format("Failed to write %s data to 
Kafka.", format), e);
-        }
+        writeRecordsToKafka(topic, "kafka/%s/table/startupmode/%s-data-1.txt", 
format, format);
+
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), format + "-json");
         kafkaConfig.put(TOPIC.key(), topic);
@@ -400,15 +322,8 @@ public class KafkaSyncTableActionITCase extends 
KafkaActionITCaseBase {
                         .build();
         runActionWithDefaultEnv(action);
 
-        try {
-            writeRecordsToKafka(
-                    topic,
-                    readLines(
-                            String.format(
-                                    
"kafka/%s/table/startupmode/%s-data-2.txt", format, format)));
-        } catch (Exception e) {
-            throw new Exception(String.format("Failed to write %s data to 
Kafka.", format), e);
-        }
+        writeRecordsToKafka(topic, "kafka/%s/table/startupmode/%s-data-2.txt", 
format, format);
+
         FileStoreTable table = getFileStoreTable(tableName);
 
         RowType rowType =
@@ -434,15 +349,8 @@ public class KafkaSyncTableActionITCase extends 
KafkaActionITCaseBase {
     public void testStarUpOptionGroup(String format) throws Exception {
         final String topic = "start_up_group";
         createTestTopic(topic, 1, 1);
-        // ---------- Write the data into Kafka -------------------
-        List<String> lines =
-                readLines(
-                        
String.format("kafka/%s/table/startupmode/%s-data-1.txt", format, format));
-        try {
-            writeRecordsToKafka(topic, lines);
-        } catch (Exception e) {
-            throw new Exception(String.format("Failed to write %s data to 
Kafka.", format), e);
-        }
+        writeRecordsToKafka(topic, "kafka/%s/table/startupmode/%s-data-1.txt", 
format, format);
+
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), format + "-json");
         kafkaConfig.put(TOPIC.key(), topic);
@@ -454,15 +362,8 @@ public class KafkaSyncTableActionITCase extends 
KafkaActionITCaseBase {
                         .build();
         runActionWithDefaultEnv(action);
 
-        try {
-            writeRecordsToKafka(
-                    topic,
-                    readLines(
-                            String.format(
-                                    
"kafka/%s/table/startupmode/%s-data-2.txt", format, format)));
-        } catch (Exception e) {
-            throw new Exception(String.format("Failed to write %s data to 
Kafka.", format), e);
-        }
+        writeRecordsToKafka(topic, "kafka/%s/table/startupmode/%s-data-2.txt", 
format, format);
+
         FileStoreTable table = getFileStoreTable(tableName);
 
         RowType rowType =
@@ -488,16 +389,8 @@ public class KafkaSyncTableActionITCase extends 
KafkaActionITCaseBase {
     public void testComputedColumn(String format) throws Exception {
         String topic = "computed_column";
         createTestTopic(topic, 1, 1);
+        writeRecordsToKafka(topic, 
"kafka/%s/table/computedcolumn/%s-data-1.txt", format, format);
 
-        List<String> lines =
-                readLines(
-                        String.format(
-                                "kafka/%s/table/computedcolumn/%s-data-1.txt", 
format, format));
-        try {
-            writeRecordsToKafka(topic, lines);
-        } catch (Exception e) {
-            throw new Exception(String.format("Failed to write %s data to 
Kafka.", format), e);
-        }
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), format + "-json");
         kafkaConfig.put(TOPIC.key(), topic);
@@ -528,14 +421,8 @@ public class KafkaSyncTableActionITCase extends 
KafkaActionITCaseBase {
     protected void testCDCOperations(String format) throws Exception {
         String topic = "event";
         createTestTopic(topic, 1, 1);
+        writeRecordsToKafka(topic, "kafka/%s/table/event/event-insert.txt", 
format);
 
-        List<String> lines =
-                
readLines(String.format("kafka/%s/table/event/event-insert.txt", format));
-        try {
-            writeRecordsToKafka(topic, lines);
-        } catch (Exception e) {
-            throw new Exception(String.format("Failed to write %s data to 
Kafka.", format), e);
-        }
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), "ogg-json");
         kafkaConfig.put(TOPIC.key(), topic);
@@ -566,13 +453,8 @@ public class KafkaSyncTableActionITCase extends 
KafkaActionITCaseBase {
                         "+I[103, scooter, Big 2-wheel scooter , 5.1]");
         waitForResult(expectedInsert, table, rowType, primaryKeys);
 
-        try {
-            writeRecordsToKafka(
-                    topic,
-                    
readLines(String.format("kafka/%s/table/event/event-update.txt", format)));
-        } catch (Exception e) {
-            throw new Exception(String.format("Failed to write %s data to 
Kafka.", format), e);
-        }
+        writeRecordsToKafka(topic, "kafka/%s/table/event/event-update.txt", 
format);
+
         // For the UPDATE operation
         List<String> expectedUpdate =
                 Arrays.asList(
@@ -581,13 +463,7 @@ public class KafkaSyncTableActionITCase extends 
KafkaActionITCaseBase {
                         "+I[103, scooter, Big 2-wheel scooter , 8.1]");
         waitForResult(expectedUpdate, table, rowType, primaryKeys);
 
-        try {
-            writeRecordsToKafka(
-                    topic,
-                    
readLines(String.format("kafka/%s/table/event/event-delete.txt", format)));
-        } catch (Exception e) {
-            throw new Exception(String.format("Failed to write %s data to 
Kafka.", format), e);
-        }
+        writeRecordsToKafka(topic, "kafka/%s/table/event/event-delete.txt", 
format);
 
         // For the REPLACE operation
         List<String> expectedReplace =
@@ -600,17 +476,9 @@ public class KafkaSyncTableActionITCase extends 
KafkaActionITCaseBase {
     public void testKafkaBuildSchemaWithDelete(String format) throws Exception 
{
         final String topic = "test_kafka_schema";
         createTestTopic(topic, 1, 1);
-        // ---------- Write the Debezium json into Kafka -------------------
-        List<String> lines =
-                readLines(
-                        String.format(
-                                
"kafka/%s/table/schema/schemaevolution/%s-data-4.txt",
-                                format, format));
-        try {
-            writeRecordsToKafka(topic, lines);
-        } catch (Exception e) {
-            throw new Exception(String.format("Failed to write %s data to 
Kafka.", format), e);
-        }
+        writeRecordsToKafka(
+                topic, "kafka/%s/table/schema/schemaevolution/%s-data-4.txt", 
format, format);
+
         Configuration kafkaConfig = 
Configuration.fromMap(getBasicKafkaConfig());
         kafkaConfig.setString(VALUE_FORMAT.key(), format + "-json");
         kafkaConfig.setString(TOPIC.key(), topic);
@@ -632,9 +500,7 @@ public class KafkaSyncTableActionITCase extends 
KafkaActionITCaseBase {
     public void testWaterMarkSyncTable(String format) throws Exception {
         String topic = "watermark";
         createTestTopic(topic, 1, 1);
-        writeRecordsToKafka(
-                topic,
-                
readLines(String.format("kafka/%s/table/watermark/%s-data-1.txt", format, 
format)));
+        writeRecordsToKafka(topic, "kafka/%s/table/watermark/%s-data-1.txt", 
format, format);
 
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), format + "-json");
@@ -666,13 +532,7 @@ public class KafkaSyncTableActionITCase extends 
KafkaActionITCaseBase {
     public void testSchemaIncludeRecord(String format) throws Exception {
         String topic = "schema_include";
         createTestTopic(topic, 1, 1);
-
-        List<String> lines = 
readLines("kafka/debezium/table/schema/include/debezium-data-1.txt");
-        try {
-            writeRecordsToKafka(topic, lines);
-        } catch (Exception e) {
-            throw new Exception("Failed to write debezium data to Kafka.", e);
-        }
+        writeRecordsToKafka(topic, 
"kafka/debezium/table/schema/include/debezium-data-1.txt");
 
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), format + "-json");
@@ -706,9 +566,7 @@ public class KafkaSyncTableActionITCase extends 
KafkaActionITCaseBase {
     public void testAllTypesWithSchemaImpl(String format) throws Exception {
         String topic = "schema_include_all_type";
         createTestTopic(topic, 1, 1);
-
-        List<String> lines = 
readLines("kafka/debezium/table/schema/alltype/debezium-data-1.txt");
-        writeRecordsToKafka(topic, lines);
+        writeRecordsToKafka(topic, 
"kafka/debezium/table/schema/alltype/debezium-data-1.txt");
 
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), format + "-json");
@@ -952,16 +810,8 @@ public class KafkaSyncTableActionITCase extends 
KafkaActionITCaseBase {
     protected void testTableFiledValNull(String format) throws Exception {
         final String topic = "table_filed_val_null";
         createTestTopic(topic, 1, 1);
-        // ---------- Write the data into Kafka -------------------
-        List<String> lines =
-                readLines(
-                        String.format(
-                                
"kafka/%s/table/schemaevolution/%s-data-4.txt", format, format));
-        try {
-            writeRecordsToKafka(topic, lines);
-        } catch (Exception e) {
-            throw new Exception(String.format("Failed to write %s data to 
Kafka.", format), e);
-        }
+        writeRecordsToKafka(topic, 
"kafka/%s/table/schemaevolution/%s-data-4.txt", format, format);
+
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), format + "-json");
         kafkaConfig.put(TOPIC.key(), topic);

Reply via email to