This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new ae88e59b3 [flink] kafka Ingestion support maxwell format (#1963)
ae88e59b3 is described below
commit ae88e59b3cf2c7e430c37be903c903e56581c178
Author: monster <[email protected]>
AuthorDate: Wed Sep 20 21:15:43 2023 +0800
[flink] kafka Ingestion support maxwell format (#1963)
---
docs/content/how-to/cdc-ingestion.md | 4 +-
.../flink/action/cdc/kafka/formats/DataFormat.java | 4 +-
.../kafka/formats/maxwell/MaxwellRecordParser.java | 106 +++++
.../KafkaMaxwellSyncDatabaseActionITCase.java | 475 ++++++++++++++++++++
.../kafka/KafkaMaxwellSyncTableActionITCase.java | 495 +++++++++++++++++++++
.../database/include/topic0/maxwell-data-1.txt | 22 +
.../prefixsuffix/topic0/maxwell-data-1.txt | 20 +
.../prefixsuffix/topic0/maxwell-data-2.txt | 20 +
.../prefixsuffix/topic1/maxwell-data-1.txt | 20 +
.../prefixsuffix/topic1/maxwell-data-2.txt | 20 +
.../schemaevolution/topic0/maxwell-data-1.txt | 20 +
.../schemaevolution/topic0/maxwell-data-2.txt | 20 +
.../schemaevolution/topic1/maxwell-data-1.txt | 20 +
.../schemaevolution/topic1/maxwell-data-2.txt | 20 +
.../table/computedcolumn/maxwell-data-1.txt | 19 +
.../table/schemaevolution/maxwell-data-1.txt | 20 +
.../table/schemaevolution/maxwell-data-2.txt | 20 +
.../table/schemaevolution/maxwell-data-3.txt | 22 +
.../maxwell/table/startupmode/maxwell-data-1.txt | 20 +
.../maxwell/table/startupmode/maxwell-data-2.txt | 20 +
20 files changed, 1384 insertions(+), 3 deletions(-)
diff --git a/docs/content/how-to/cdc-ingestion.md
b/docs/content/how-to/cdc-ingestion.md
index d2d9a4b30..df1320855 100644
--- a/docs/content/how-to/cdc-ingestion.md
+++ b/docs/content/how-to/cdc-ingestion.md
@@ -314,7 +314,7 @@ If a message in a Kafka topic is a change event captured
from another database u
</tr>
<tr>
<td><a
href="https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/table/formats/maxwell/
>}}">Maxwell CDC</a></td>
- <td>False</td>
+ <td>True</td>
</tr>
<tr>
<td><a
href="https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/table/formats/ogg/">OGG
CDC</a></td>
@@ -324,7 +324,7 @@ If a message in a Kafka topic is a change event captured
from another database u
</table>
{{< hint info >}}
-In Oracle GoldenGate, the data format synchronized to Kafka does not include
field data type information. As a result, Paimon sets the data type for all
fields to "String" by default.
+In Oracle GoldenGate and Maxwell, the data format synchronized to Kafka does
not include field data type information. As a result, Paimon sets the data type
for all fields to "String" by default.
{{< /hint >}}
### Synchronizing Tables
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/DataFormat.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/DataFormat.java
index 155532900..fda6311f2 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/DataFormat.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/DataFormat.java
@@ -22,6 +22,7 @@ import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TableNameConverter;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import
org.apache.paimon.flink.action.cdc.kafka.formats.canal.CanalRecordParser;
+import
org.apache.paimon.flink.action.cdc.kafka.formats.maxwell.MaxwellRecordParser;
import org.apache.paimon.flink.action.cdc.kafka.formats.ogg.OggRecordParser;
import org.apache.flink.configuration.Configuration;
@@ -38,7 +39,8 @@ import java.util.List;
*/
public enum DataFormat {
CANAL_JSON(CanalRecordParser::new),
- OGG_JSON(OggRecordParser::new);
+ OGG_JSON(OggRecordParser::new),
+ MAXWELL_JSON(MaxwellRecordParser::new);
// Add more data formats here if needed
private final RecordParserFactory parser;
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/maxwell/MaxwellRecordParser.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/maxwell/MaxwellRecordParser.java
new file mode 100644
index 000000000..0a2df9fcd
--- /dev/null
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/maxwell/MaxwellRecordParser.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.kafka.formats.maxwell;
+
+import org.apache.paimon.flink.action.cdc.ComputedColumn;
+import org.apache.paimon.flink.action.cdc.TableNameConverter;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
+import org.apache.paimon.flink.action.cdc.kafka.formats.RecordParser;
+import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
+import org.apache.paimon.types.RowKind;
+
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
+
+/**
+ * The {@code MaxwellRecordParser} class extends the abstract {@link
RecordParser} and is designed
+ * to parse records from Maxwell's JSON change data capture (CDC) format.
Maxwell is a CDC solution
+ * for MySQL databases that captures row-level changes to database tables and
outputs them in JSON
+ * format. This parser extracts relevant information from the Maxwell-JSON
format and converts it
+ * into a list of {@link RichCdcMultiplexRecord} objects.
+ *
+ * <p>The class supports various database operations such as INSERT, UPDATE,
and DELETE, and creates
+ * corresponding {@link RichCdcMultiplexRecord} objects to represent these
changes.
+ *
+ * <p>Validation is performed to ensure that the JSON records contain all
necessary fields, and the
+ * class also supports schema extraction for the Kafka topic.
+ */
+public class MaxwellRecordParser extends RecordParser {
+
+ private static final String FIELD_OLD = "old"; // node handed to mergeOldRecord in the update branch
+ private static final String FIELD_TYPE = "type"; // node holding the operation name
+ private static final String OP_INSERT = "insert";
+ private static final String OP_UPDATE = "update";
+ private static final String OP_DELETE = "delete";
+
+ public MaxwellRecordParser(
+ boolean caseSensitive,
+ TypeMapping typeMapping,
+ TableNameConverter tableNameConverter,
+ List<ComputedColumn> computedColumns) {
+ super(caseSensitive, typeMapping, tableNameConverter, computedColumns);
+ }
+
+ @Override
+ public List<RichCdcMultiplexRecord> extractRecords() { // converts the current root message into row-kind records
+ String operation = extractStringFromRootJson(FIELD_TYPE);
+ JsonNode data = root.get(fieldData); // row payload, read from the node named by fieldData
+ List<RichCdcMultiplexRecord> records = new ArrayList<>();
+ switch (operation) {
+ case OP_INSERT:
+ processRecord(data, RowKind.INSERT, records);
+ break;
+ case OP_UPDATE: // emitted as a DELETE of the merged old row followed by an INSERT of the new row
+ processRecord(mergeOldRecord(data, root.get(FIELD_OLD)),
 RowKind.DELETE, records); // NOTE(review): mergeOldRecord lives outside this class; presumably rebuilds the pre-update row - confirm
+ processRecord(data, RowKind.INSERT, records);
+ break;
+ case OP_DELETE:
+ processRecord(data, RowKind.DELETE, records);
+ break;
+ default: // any other type value is rejected
+ throw new UnsupportedOperationException("Unknown record
operation: " + operation);
+ }
+ return records;
+ }
+
+ @Override
+ protected void validateFormat() { // fails via checkNotNull when a required top-level node is absent
+ String errorMessageTemplate =
+ "Didn't find '%s' node in json. Please make sure your topic's
format is correct.";
+ checkNotNull(root.get(FIELD_TABLE), errorMessageTemplate, FIELD_TABLE);
+ checkNotNull(root.get(FIELD_DATABASE), errorMessageTemplate,
FIELD_DATABASE);
+ checkNotNull(root.get(FIELD_TYPE), errorMessageTemplate, FIELD_TYPE);
+ checkNotNull(root.get(fieldData), errorMessageTemplate, fieldData);
+ checkNotNull(root.get(fieldPrimaryKeys), errorMessageTemplate,
fieldPrimaryKeys); // NOTE(review): the old node is not checked here although the update branch of extractRecords reads it
+ }
+
+ @Override
+ protected void setPrimaryField() { // names the JSON node that carries the primary keys
+ fieldPrimaryKeys = "primary_key_columns";
+ }
+
+ @Override
+ protected void setDataField() { // names the JSON node that carries the row data
+ fieldData = "data";
+ }
+}
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMaxwellSyncDatabaseActionITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMaxwellSyncDatabaseActionITCase.java
new file mode 100644
index 000000000..5a38bd542
--- /dev/null
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMaxwellSyncDatabaseActionITCase.java
@@ -0,0 +1,475 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.kafka;
+
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.testutils.assertj.AssertionUtils;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** IT cases for {@link KafkaSyncDatabaseAction}. */
+public class KafkaMaxwellSyncDatabaseActionITCase extends
KafkaActionITCaseBase {
+
+ @Test
+ @Timeout(60)
+ public void testSchemaEvolutionMultiTopic() throws Exception { // fixture directory topic<i> is written to its own topic
+ final String topic1 = "schema_evolution_0";
+ final String topic2 = "schema_evolution_1";
+ boolean writeOne = false; // false: the impl keeps writing file i to topics.get(i)
+ int fileCount = 2;
+ List<String> topics = Arrays.asList(topic1, topic2);
+ topics.forEach(topic -> createTestTopic(topic, 1, 1));
+
+ // ---------- Write the maxwell json into Kafka -------------------
+
+ for (int i = 0; i < fileCount; i++) {
+ try {
+ writeRecordsToKafka(
+ topics.get(i),
+ readLines(
+ "kafka/maxwell/database/schemaevolution/topic"
+ + i
+ + "/maxwell-data-1.txt"));
+ } catch (Exception e) {
+ throw new Exception("Failed to write maxwell data to Kafka.",
e);
+ }
+ }
+
+ Map<String, String> kafkaConfig = getBasicKafkaConfig();
+ kafkaConfig.put("value.format", "maxwell-json"); // format under test
+ kafkaConfig.put("topic", String.join(";", topics)); // topic names are passed as one semicolon-joined string
+ KafkaSyncDatabaseAction action =
+ syncDatabaseActionBuilder(kafkaConfig)
+ .withTableConfig(getBasicTableConfig())
+ .build();
+ runActionWithDefaultEnv(action);
+
+ testSchemaEvolutionImpl(topics, writeOne, fileCount);
+ }
+
+ @Test
+ @Timeout(60)
+ public void testSchemaEvolutionOneTopic() throws Exception { // both fixture directories are written to the same topic
+ final String topic = "schema_evolution";
+ boolean writeOne = true; // true: the impl writes every file to topics.get(0)
+ int fileCount = 2;
+ List<String> topics = Collections.singletonList(topic);
+ topics.forEach(t -> createTestTopic(t, 1, 1));
+
+ // ---------- Write the maxwell json into Kafka -------------------
+
+ for (int i = 0; i < fileCount; i++) {
+ try {
+ writeRecordsToKafka(
+ topics.get(0),
+ readLines(
+ "kafka/maxwell/database/schemaevolution/topic"
+ + i
+ + "/maxwell-data-1.txt"));
+ } catch (Exception e) {
+ throw new Exception("Failed to write maxwell data to Kafka.",
e);
+ }
+ }
+
+ Map<String, String> kafkaConfig = getBasicKafkaConfig();
+ kafkaConfig.put("value.format", "maxwell-json"); // format under test
+ kafkaConfig.put("topic", String.join(";", topics));
+ KafkaSyncDatabaseAction action =
+ syncDatabaseActionBuilder(kafkaConfig)
+ .withTableConfig(getBasicTableConfig())
+ .build();
+ runActionWithDefaultEnv(action);
+
+ testSchemaEvolutionImpl(topics, writeOne, fileCount);
+ }
+
+ private void testSchemaEvolutionImpl(List<String> topics, boolean
writeOne, int fileCount)
+ throws Exception { // checks tables t1 and t2 after the first batch, writes the second batch, checks again
+ waitingTables("t1", "t2");
+
+ FileStoreTable table1 = getFileStoreTable("t1");
+ FileStoreTable table2 = getFileStoreTable("t2");
+
+ RowType rowType1 =
+ RowType.of(
+ new DataType[] {
+ DataTypes.STRING().notNull(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING()
+ },
+ new String[] {"id", "name", "description", "weight"});
+ List<String> primaryKeys1 = Collections.singletonList("id");
+ List<String> expected =
+ Arrays.asList(
+ "+I[101, scooter, Small 2-wheel scooter, 3.14]",
+ "+I[102, car battery, 12V car battery, 8.1]");
+ waitForResult(expected, table1, rowType1, primaryKeys1);
+
+ RowType rowType2 =
+ RowType.of(
+ new DataType[] {
+ DataTypes.STRING().notNull(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING()
+ },
+ new String[] {"id", "name", "description", "weight"});
+ List<String> primaryKeys2 = Collections.singletonList("id");
+ List<String> expected2 =
+ Arrays.asList(
+ "+I[103, 12-pack drill bits, 12-pack of drill bits
with sizes ranging from #40 to #3, 0.8]",
+ "+I[104, hammer, 12oz carpenter's hammer, 0.75]");
+ waitForResult(expected2, table2, rowType2, primaryKeys2);
+
+ for (int i = 0; i < fileCount; i++) { // second batch; the row types expected below gain one column per table
+ try {
+ writeRecordsToKafka(
+ writeOne ? topics.get(0) : topics.get(i), // writeOne routes every file to the single topic
+ readLines(
+ "kafka/maxwell/database/schemaevolution/topic"
+ + i
+ + "/maxwell-data-2.txt"));
+ } catch (Exception e) {
+ throw new Exception("Failed to write maxwell data to Kafka.",
e);
+ }
+ }
+
+ rowType1 =
+ RowType.of(
+ new DataType[] {
+ DataTypes.STRING().notNull(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING()
+ },
+ new String[] {"id", "name", "description", "weight",
"age"});
+ expected =
+ Arrays.asList(
+ "+I[101, scooter, Small 2-wheel scooter, 3.14, NULL]",
+ "+I[102, car battery, 12V car battery, 8.1, NULL]",
+ "+I[103, 12-pack drill bits, 12-pack of drill bits
with sizes ranging from #40 to #3, 0.8, 19]",
+ "+I[104, hammer, 12oz carpenter's hammer, 0.75, 25]");
+ waitForResult(expected, table1, rowType1, primaryKeys1);
+
+ rowType2 =
+ RowType.of(
+ new DataType[] {
+ DataTypes.STRING().notNull(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING()
+ },
+ new String[] {"id", "name", "description", "weight",
"address"});
+ expected =
+ Arrays.asList(
+ "+I[103, 12-pack drill bits, 12-pack of drill bits
with sizes ranging from #40 to #3, 0.8, Beijing]",
+ "+I[104, hammer, 12oz carpenter's hammer, 0.75,
Shanghai]");
+ waitForResult(expected, table2, rowType2, primaryKeys2);
+ }
+
+ @Test
+ public void testTopicIsEmpty() { // no topic entry is put into kafkaConfig, so run() is expected to fail
+ Map<String, String> kafkaConfig = getBasicKafkaConfig();
+ kafkaConfig.put("value.format", "maxwell-json");
+
+ KafkaSyncDatabaseAction action =
syncDatabaseActionBuilder(kafkaConfig).build();
+
+ assertThatThrownBy(action::run)
+ .satisfies(
+ AssertionUtils.anyCauseMatches(
+ IllegalArgumentException.class,
+ "kafka-conf [topic] must be specified."));
+ }
+
+ @Test
+ @Timeout(60)
+ public void testTableAffixMultiTopic() throws Exception { // prefix/suffix sync with one topic per fixture directory
+ // create table t1
+ createFileStoreTable(
+ "test_prefix_t1_test_suffix", // already carries the prefix and suffix configured on the action below
+ RowType.of(
+ new DataType[] {
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING()
+ },
+ new String[] {"id", "name", "description", "weight"}),
+ Collections.emptyList(),
+ Collections.singletonList("id"),
+ Collections.emptyMap());
+
+ final String topic1 = "prefix_suffix_0";
+ final String topic2 = "prefix_suffix_1";
+ boolean writeOne = false;
+ int fileCount = 2;
+ List<String> topics = Arrays.asList(topic1, topic2);
+ topics.forEach(topic -> createTestTopic(topic, 1, 1));
+
+ // ---------- Write the maxwell json into Kafka -------------------
+
+ for (int i = 0; i < topics.size(); i++) { // bound is topics.size() here; it equals fileCount (2)
+ try {
+ writeRecordsToKafka(
+ topics.get(i),
+ readLines(
+ "kafka/maxwell/database/prefixsuffix/topic"
+ + i
+ + "/maxwell-data-1.txt"));
+ } catch (Exception e) {
+ throw new Exception("Failed to write maxwell data to Kafka.",
e);
+ }
+ }
+
+ // try synchronization
+ Map<String, String> kafkaConfig = getBasicKafkaConfig();
+ kafkaConfig.put("value.format", "maxwell-json");
+ kafkaConfig.put("topic", String.join(";", topics));
+ KafkaSyncDatabaseAction action =
+ syncDatabaseActionBuilder(kafkaConfig)
+ .withTablePrefix("test_prefix_")
+ .withTableSuffix("_test_suffix")
+ .withTableConfig(getBasicTableConfig())
+ .build();
+ runActionWithDefaultEnv(action);
+
+ testTableAffixImpl(topics, writeOne, fileCount);
+ }
+
+ @Test
+ @Timeout(60)
+ public void testTableAffixOneTopic() throws Exception { // prefix/suffix sync with both fixture directories in one topic
+ // create table t1
+ createFileStoreTable(
+ "test_prefix_t1_test_suffix", // already carries the prefix and suffix configured on the action below
+ RowType.of(
+ new DataType[] {
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING()
+ },
+ new String[] {"id", "name", "description", "weight"}),
+ Collections.emptyList(),
+ Collections.singletonList("id"),
+ Collections.emptyMap());
+
+ final String topic1 = "prefix_suffix";
+ List<String> topics = Collections.singletonList(topic1);
+ boolean writeOne = true; // true: the impl writes every file to topics.get(0)
+ int fileCount = 2;
+ topics.forEach(topic -> createTestTopic(topic, 1, 1));
+
+ // ---------- Write the maxwell json into Kafka -------------------
+
+ for (int i = 0; i < fileCount; i++) {
+ try {
+ writeRecordsToKafka(
+ topics.get(0),
+ readLines(
+ "kafka/maxwell/database/prefixsuffix/topic"
+ + i
+ + "/maxwell-data-1.txt"));
+ } catch (Exception e) {
+ throw new Exception("Failed to write maxwell data to Kafka.",
e);
+ }
+ }
+
+ // try synchronization
+ Map<String, String> kafkaConfig = getBasicKafkaConfig();
+ kafkaConfig.put("value.format", "maxwell-json");
+ kafkaConfig.put("topic", String.join(";", topics));
+ KafkaSyncDatabaseAction action =
+ syncDatabaseActionBuilder(kafkaConfig)
+ .withTablePrefix("test_prefix_")
+ .withTableSuffix("_test_suffix")
+ .withTableConfig(getBasicTableConfig())
+ .build();
+ runActionWithDefaultEnv(action);
+
+ testTableAffixImpl(topics, writeOne, fileCount);
+ }
+
+ private void testTableAffixImpl(List<String> topics, boolean writeOne, int
fileCount)
+ throws Exception { // checks both affixed tables after the first batch, writes the second batch, checks again
+ waitingTables("test_prefix_t1_test_suffix",
"test_prefix_t2_test_suffix");
+
+ FileStoreTable table1 =
getFileStoreTable("test_prefix_t1_test_suffix");
+ FileStoreTable table2 =
getFileStoreTable("test_prefix_t2_test_suffix");
+
+ RowType rowType1 =
+ RowType.of(
+ new DataType[] {
+ DataTypes.STRING().notNull(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING()
+ },
+ new String[] {"id", "name", "description", "weight"});
+ List<String> primaryKeys1 = Collections.singletonList("id");
+ List<String> expected =
+ Arrays.asList(
+ "+I[101, scooter, Small 2-wheel scooter, 3.14]",
+ "+I[102, car battery, 12V car battery, 8.1]");
+ waitForResult(expected, table1, rowType1, primaryKeys1);
+
+ RowType rowType2 =
+ RowType.of(
+ new DataType[] {
+ DataTypes.STRING().notNull(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING()
+ },
+ new String[] {"id", "name", "description", "weight"});
+ List<String> primaryKeys2 = Collections.singletonList("id");
+ expected =
+ Arrays.asList(
+ "+I[103, 12-pack drill bits, 12-pack of drill bits
with sizes ranging from #40 to #3, 0.8]",
+ "+I[104, hammer, 12oz carpenter's hammer, 0.75]");
+ waitForResult(expected, table2, rowType2, primaryKeys2);
+
+ for (int i = 0; i < fileCount; i++) { // second batch; the row types expected below gain one column per table
+ try {
+ writeRecordsToKafka(
+ writeOne ? topics.get(0) : topics.get(i), // writeOne routes every file to the single topic
+ readLines(
+ "kafka/maxwell/database/prefixsuffix/topic"
+ + i
+ + "/maxwell-data-2.txt"));
+ } catch (Exception e) {
+ throw new Exception("Failed to write maxwell data to Kafka.",
e);
+ }
+ }
+ rowType1 =
+ RowType.of(
+ new DataType[] {
+ DataTypes.STRING().notNull(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING()
+ },
+ new String[] {"id", "name", "description", "weight",
"address"});
+ expected =
+ Arrays.asList(
+ "+I[101, scooter, Small 2-wheel scooter, 3.14,
Beijing]",
+ "+I[102, car battery, 12V car battery, 8.1,
Shanghai]");
+ waitForResult(expected, table1, rowType1, primaryKeys1);
+
+ rowType2 =
+ RowType.of(
+ new DataType[] {
+ DataTypes.STRING().notNull(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING()
+ },
+ new String[] {"id", "name", "description", "weight",
"age"});
+ expected =
+ Arrays.asList(
+ "+I[103, 12-pack drill bits, 12-pack of drill bits
with sizes ranging from #40 to #3, 0.8, 19]",
+ "+I[104, hammer, 12oz carpenter's hammer, 0.75, 25]");
+ waitForResult(expected, table2, rowType2, primaryKeys2);
+ }
+
+ @Test
+ @Timeout(60)
+ public void testIncludingTables() throws Exception { // include pattern only, no exclude pattern
+ includingAndExcludingTablesImpl(
+ "flink|paimon.+",
+ null,
+ Arrays.asList("flink", "paimon_1", "paimon_2"), // tables expected to exist
+ Collections.singletonList("ignore")); // tables expected not to exist
+ }
+
+ @Test
+ @Timeout(60)
+ public void testExcludingTables() throws Exception { // exclude pattern only, no include pattern
+ includingAndExcludingTablesImpl(
+ null,
+ "flink|paimon.+",
+ Collections.singletonList("ignore"), // tables expected to exist
+ Arrays.asList("flink", "paimon_1", "paimon_2")); // tables expected not to exist
+ }
+
+ @Test
+ @Timeout(60)
+ public void testIncludingAndExcludingTables() throws Exception { // paimon_1 matches both patterns and is expected to be absent
+ includingAndExcludingTablesImpl(
+ "flink|paimon.+",
+ "paimon_1",
+ Arrays.asList("flink", "paimon_2"), // tables expected to exist
+ Arrays.asList("paimon_1", "ignore")); // tables expected not to exist
+ }
+
+ private void includingAndExcludingTablesImpl( // shared driver for the three include/exclude tests
+ @Nullable String includingTables,
+ @Nullable String excludingTables,
+ List<String> existedTables,
+ List<String> notExistedTables)
+ throws Exception {
+ final String topic1 = "include_exclude" + UUID.randomUUID(); // random suffix gives each invocation its own topic name
+ List<String> topics = Collections.singletonList(topic1);
+ topics.forEach(topic -> createTestTopic(topic, 1, 1));
+
+ // ---------- Write the maxwell json into Kafka -------------------
+
+ try {
+ writeRecordsToKafka(
+ topics.get(0),
+
readLines("kafka/maxwell/database/include/topic0/maxwell-data-1.txt"));
+ } catch (Exception e) {
+ throw new Exception("Failed to write maxwell data to Kafka.", e);
+ }
+ // try synchronization
+ Map<String, String> kafkaConfig = getBasicKafkaConfig();
+ kafkaConfig.put("value.format", "maxwell-json");
+ kafkaConfig.put("topic", String.join(";", topics));
+ KafkaSyncDatabaseAction action =
+ syncDatabaseActionBuilder(kafkaConfig)
+ .includingTables(includingTables)
+ .excludingTables(excludingTables)
+ .withTableConfig(getBasicTableConfig())
+ .build();
+ runActionWithDefaultEnv(action);
+
+ // check paimon tables
+ waitingTables(existedTables);
+ assertTableNotExists(notExistedTables);
+ }
+}
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMaxwellSyncTableActionITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMaxwellSyncTableActionITCase.java
new file mode 100644
index 000000000..8bfae9493
--- /dev/null
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMaxwellSyncTableActionITCase.java
@@ -0,0 +1,495 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.kafka;
+
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static
org.apache.paimon.testutils.assertj.AssertionUtils.anyCauseMatches;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** IT cases for {@link KafkaSyncTableAction}. */
+public class KafkaMaxwellSyncTableActionITCase extends KafkaActionITCaseBase {
+
+ @Test
+ @Timeout(60)
+ public void testSchemaEvolution() throws Exception {
+ runSingleTableSchemaEvolution("schemaevolution"); // argument is the fixture sub-directory under kafka/maxwell/table
+ }
+
+ private void runSingleTableSchemaEvolution(String sourceDir) throws
Exception { // writes the first fixture file, starts the table sync, then delegates the checks
+ final String topic = "schema_evolution";
+ createTestTopic(topic, 1, 1);
+ // ---------- Write the maxwell json into Kafka -------------------
+ List<String> lines =
+
readLines(String.format("kafka/maxwell/table/%s/maxwell-data-1.txt",
sourceDir));
+ try {
+ writeRecordsToKafka(topic, lines);
+ } catch (Exception e) {
+ throw new Exception("Failed to write maxwell data to Kafka.", e);
+ }
+ Map<String, String> kafkaConfig = getBasicKafkaConfig();
+ kafkaConfig.put("value.format", "maxwell-json"); // format under test
+ kafkaConfig.put("topic", topic);
+ KafkaSyncTableAction action =
+ syncTableActionBuilder(kafkaConfig)
+ .withPrimaryKeys("id")
+ .withTableConfig(getBasicTableConfig())
+ .build();
+ runActionWithDefaultEnv(action);
+
+ testSchemaEvolutionImpl(topic, sourceDir);
+ }
+
+ private void testSchemaEvolutionImpl(String topic, String sourceDir)
throws Exception { // verifies the table after each of the three fixture files
+ FileStoreTable table = getFileStoreTable(tableName);
+
+ RowType rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.STRING().notNull(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING()
+ },
+ new String[] {"id", "name", "description", "weight"});
+ List<String> primaryKeys = Collections.singletonList("id");
+ List<String> expected =
+ Arrays.asList(
+ "+I[101, scooter, Small 2-wheel scooter, 3.14]",
+ "+I[102, car battery, 12V car battery, 8.1]");
+ waitForResult(expected, table, rowType, primaryKeys);
+
+ try { // second file: the expected schema below gains the age column
+ writeRecordsToKafka(
+ topic,
+ readLines(
+
String.format("kafka/maxwell/table/%s/maxwell-data-2.txt", sourceDir)));
+ } catch (Exception e) {
+ throw new Exception("Failed to write maxwell data to Kafka.", e);
+ }
+ rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.STRING().notNull(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING()
+ },
+ new String[] {"id", "name", "description", "weight",
"age"});
+ expected =
+ Arrays.asList(
+ "+I[101, scooter, Small 2-wheel scooter, 3.14, NULL]",
+ "+I[102, car battery, 12V car battery, 8.1, NULL]",
+ "+I[103, 12-pack drill bits, 12-pack of drill bits
with sizes ranging from #40 to #3, 0.8, 18]",
+ "+I[104, hammer, 12oz carpenter's hammer, 0.75, 24]");
+ waitForResult(expected, table, rowType, primaryKeys);
+
+ try { // third file: expected output below drops row 101, adds rows 105 and 107, and gains the address column
+ writeRecordsToKafka(
+ topic,
+ readLines(
+
String.format("kafka/maxwell/table/%s/maxwell-data-3.txt", sourceDir)));
+ } catch (Exception e) {
+ throw new Exception("Failed to write maxwell data to Kafka.", e);
+ }
+ rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.STRING().notNull(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING()
+ },
+ new String[] {"id", "name", "description", "weight",
"age", "address"});
+ expected =
+ Arrays.asList(
+ "+I[102, car battery, 12V car battery, 8.1, NULL,
NULL]",
+ "+I[103, 12-pack drill bits, 12-pack of drill bits
with sizes ranging from #40 to #3, 0.8, 18, NULL]",
+ "+I[104, hammer, 12oz carpenter's hammer, 0.75, 24,
NULL]",
+ "+I[105, hammer, 14oz carpenter's hammer, 0.875, NULL,
Beijing]",
+ "+I[107, rocks, box of assorted rocks, 5.3, NULL,
NULL]");
+ waitForResult(expected, table, rowType, primaryKeys);
+ }
+
+ @Test
+ @Timeout(60)
+ public void testNotSupportFormat() throws Exception { // an unknown value.format must be rejected by run()
+ final String topic = "not_support";
+ createTestTopic(topic, 1, 1);
+ // ---------- Write the maxwell json into Kafka -------------------
+ List<String> lines =
readLines("kafka/maxwell/table/schemaevolution/maxwell-data-1.txt");
+ try {
+ writeRecordsToKafka(topic, lines);
+ } catch (Exception e) {
+ throw new Exception("Failed to write maxwell data to Kafka.", e);
+ }
+ Map<String, String> kafkaConfig = getBasicKafkaConfig();
+ kafkaConfig.put("value.format", "togg-json"); // format name that the assertion below expects to be reported as unsupported
+ kafkaConfig.put("topic", topic);
+ KafkaSyncTableAction action =
+ syncTableActionBuilder(kafkaConfig)
+ .withPrimaryKeys("id")
+ .withTableConfig(getBasicTableConfig())
+ .build();
+
+ assertThatThrownBy(action::run)
+ .satisfies(
+ anyCauseMatches(
+ UnsupportedOperationException.class,
+ "This format: togg-json is not supported."));
+ }
+
+ @Test
+ @Timeout(60)
+ public void testAssertSchemaCompatible() throws Exception { // run() must fail when the existing table schema does not match the source
+ final String topic = "assert_schema_compatible";
+ createTestTopic(topic, 1, 1);
+ // ---------- Write the maxwell json into Kafka -------------------
+ List<String> lines =
readLines("kafka/maxwell/table/schemaevolution/maxwell-data-1.txt");
+ try {
+ writeRecordsToKafka(topic, lines);
+ } catch (Exception e) {
+ throw new Exception("Failed to write maxwell data to Kafka.", e);
+ }
+ Map<String, String> kafkaConfig = getBasicKafkaConfig();
+ kafkaConfig.put("value.format", "maxwell-json");
+ kafkaConfig.put("topic", topic);
+
+ // create an incompatible table
+ createFileStoreTable(
+ RowType.of(
+ new DataType[] {DataTypes.STRING(),
DataTypes.STRING()},
+ new String[] {"k", "v1"}), // columns k and v1 versus the source columns named in the expected message below
+ Collections.emptyList(),
+ Collections.singletonList("k"),
+ Collections.emptyMap());
+
+ KafkaSyncTableAction action =
+ syncTableActionBuilder(kafkaConfig)
+ .withPrimaryKeys("id")
+ .withTableConfig(getBasicTableConfig())
+ .build();
+
+ assertThatThrownBy(action::run)
+ .satisfies(
+ anyCauseMatches(
+ IllegalArgumentException.class,
+ "Paimon schema and source table schema are not
compatible.\n"
+ + "Paimon fields are: [`k` STRING NOT
NULL, `v1` STRING].\n"
+ + "Source table fields are: [`id`
STRING NOT NULL, `name` STRING, `description` STRING, `weight` STRING]"));
+ }
+
+ @Test
+ @Timeout(60)
+ public void testStarUpOptionSpecific() throws Exception { // startup mode specific-offsets, beginning at offset 1 of partition 0
+ final String topic = "start_up_specific";
+ createTestTopic(topic, 1, 1);
+ // ---------- Write the maxwell json into Kafka -------------------
+ List<String> lines =
readLines("kafka/maxwell/table/startupmode/maxwell-data-1.txt");
+ try {
+ writeRecordsToKafka(topic, lines);
+ } catch (Exception e) {
+ throw new Exception("Failed to write maxwell data to Kafka.", e);
+ }
+ Map<String, String> kafkaConfig = getBasicKafkaConfig();
+ kafkaConfig.put("value.format", "maxwell-json");
+ kafkaConfig.put("topic", topic);
+ kafkaConfig.put("scan.startup.mode", "specific-offsets");
+ kafkaConfig.put("scan.startup.specific-offsets",
"partition:0,offset:1");
+ KafkaSyncTableAction action =
+ syncTableActionBuilder(kafkaConfig)
+ .withPrimaryKeys("id")
+ .withTableConfig(getBasicTableConfig())
+ .build();
+ runActionWithDefaultEnv(action);
+
+ FileStoreTable table = getFileStoreTable(tableName);
+
+ RowType rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.STRING().notNull(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING()
+ },
+ new String[] {"id", "name", "description", "weight"});
+ List<String> primaryKeys = Collections.singletonList("id");
+ // topic has two records; reading starts at offset 1, so only the second one is expected
+ List<String> expected =
+ Collections.singletonList("+I[102, car battery, 12V car
battery, 8.1]");
+ waitForResult(expected, table, rowType, primaryKeys);
+ }
+
+ @Test
+ @Timeout(60)
+ public void testStarUpOptionLatest() throws Exception { // startup mode under test: latest-offset
+ final String topic = "start_up_latest";
+ createTestTopic(topic, 1, 1);
+ // ---------- Write the first batch of Maxwell JSON records into Kafka before the sync job starts ----------
+ List<String> lines =
readLines("kafka/maxwell/table/startupmode/maxwell-data-1.txt");
+ try {
+ writeRecordsToKafka(topic, lines);
+ } catch (Exception e) {
+ throw new Exception("Failed to write maxwell data to Kafka.", e);
+ }
+ Map<String, String> kafkaConfig = getBasicKafkaConfig();
+ kafkaConfig.put("value.format", "maxwell-json");
+ kafkaConfig.put("topic", topic);
+ kafkaConfig.put("scan.startup.mode", "latest-offset");
+ KafkaSyncTableAction action =
+ syncTableActionBuilder(kafkaConfig)
+ .withPrimaryKeys("id")
+ .withTableConfig(getBasicTableConfig())
+ .build();
+ runActionWithDefaultEnv(action);
+
+ Thread.sleep(5000); // NOTE(review): presumably lets the job begin consuming before the second batch is written - confirm
+ FileStoreTable table = getFileStoreTable(tableName);
+ try {
+ writeRecordsToKafka(
+ topic,
readLines("kafka/maxwell/table/startupmode/maxwell-data-2.txt"));
+ } catch (Exception e) {
+ throw new Exception("Failed to write maxwell data to Kafka.", e);
+ }
+
+ RowType rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.STRING().notNull(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING()
+ },
+ new String[] {"id", "name", "description", "weight"});
+ List<String> primaryKeys = Collections.singletonList("id");
+ // The topic ends up with four records, but only the two written after the job started (ids 103, 104) are expected.
+ List<String> expected =
+ Arrays.asList(
+ "+I[103, 12-pack drill bits, 12-pack of drill bits
with sizes ranging from #40 to #3, 0.8]",
+ "+I[104, hammer, 12oz carpenter's hammer, 0.75]");
+ waitForResult(expected, table, rowType, primaryKeys);
+ }
+
+ @Test
+ @Timeout(60)
+ public void testStarUpOptionTimestamp() throws Exception { // startup mode under test: timestamp
+ final String topic = "start_up_timestamp";
+ createTestTopic(topic, 1, 1);
+ // ---------- Write the first batch of Maxwell JSON records into Kafka before the sync job starts ----------
+ List<String> lines =
readLines("kafka/maxwell/table/startupmode/maxwell-data-1.txt");
+ try {
+ writeRecordsToKafka(topic, lines);
+ } catch (Exception e) {
+ throw new Exception("Failed to write maxwell data to Kafka.", e);
+ }
+ Map<String, String> kafkaConfig = getBasicKafkaConfig();
+ kafkaConfig.put("value.format", "maxwell-json");
+ kafkaConfig.put("topic", topic);
+ kafkaConfig.put("scan.startup.mode", "timestamp");
+ kafkaConfig.put(
+ "scan.startup.timestamp-millis",
String.valueOf(System.currentTimeMillis()));
+ KafkaSyncTableAction action =
+ syncTableActionBuilder(kafkaConfig)
+ .withPrimaryKeys("id")
+ .withTableConfig(getBasicTableConfig())
+ .build();
+ runActionWithDefaultEnv(action);
+
+ try {
+ writeRecordsToKafka(
+ topic,
readLines("kafka/maxwell/table/startupmode/maxwell-data-2.txt"));
+ } catch (Exception e) {
+ throw new Exception("Failed to write maxwell data to Kafka.", e);
+ }
+ FileStoreTable table = getFileStoreTable(tableName);
+
+ RowType rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.STRING().notNull(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING()
+ },
+ new String[] {"id", "name", "description", "weight"});
+ List<String> primaryKeys = Collections.singletonList("id");
+ // The topic ends up with four records; the start timestamp is taken after the first batch, so only ids 103 and 104 are expected.
+ List<String> expected =
+ Arrays.asList(
+ "+I[103, 12-pack drill bits, 12-pack of drill bits
with sizes ranging from #40 to #3, 0.8]",
+ "+I[104, hammer, 12oz carpenter's hammer, 0.75]");
+ waitForResult(expected, table, rowType, primaryKeys);
+ }
+
+ @Test
+ @Timeout(60)
+ public void testStarUpOptionEarliest() throws Exception { // startup mode under test: earliest-offset
+ final String topic = "start_up_earliest";
+ createTestTopic(topic, 1, 1);
+ // ---------- Write the first batch of Maxwell JSON records into Kafka before the sync job starts ----------
+ List<String> lines =
readLines("kafka/maxwell/table/startupmode/maxwell-data-1.txt");
+ try {
+ writeRecordsToKafka(topic, lines);
+ } catch (Exception e) {
+ throw new Exception("Failed to write maxwell data to Kafka.", e);
+ }
+ Map<String, String> kafkaConfig = getBasicKafkaConfig();
+ kafkaConfig.put("value.format", "maxwell-json");
+ kafkaConfig.put("topic", topic);
+ kafkaConfig.put("scan.startup.mode", "earliest-offset");
+ KafkaSyncTableAction action =
+ syncTableActionBuilder(kafkaConfig)
+ .withPrimaryKeys("id")
+ .withTableConfig(getBasicTableConfig())
+ .build();
+ runActionWithDefaultEnv(action);
+
+ try {
+ writeRecordsToKafka(
+ topic,
readLines("kafka/maxwell/table/startupmode/maxwell-data-2.txt"));
+ } catch (Exception e) {
+ throw new Exception("Failed to write maxwell data to Kafka.", e);
+ }
+ FileStoreTable table = getFileStoreTable(tableName);
+
+ RowType rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.STRING().notNull(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING()
+ },
+ new String[] {"id", "name", "description", "weight"});
+ List<String> primaryKeys = Collections.singletonList("id");
+ // The topic ends up with four records; reading from the earliest offset, all four are expected.
+ List<String> expected =
+ Arrays.asList(
+ "+I[101, scooter, Small 2-wheel scooter, 3.14]",
+ "+I[102, car battery, 12V car battery, 8.1]",
+ "+I[103, 12-pack drill bits, 12-pack of drill bits
with sizes ranging from #40 to #3, 0.8]",
+ "+I[104, hammer, 12oz carpenter's hammer, 0.75]");
+ waitForResult(expected, table, rowType, primaryKeys);
+ }
+
+ @Test
+ @Timeout(60)
+ public void testStarUpOptionGroup() throws Exception { // startup mode under test: group-offsets
+ final String topic = "start_up_group";
+ createTestTopic(topic, 1, 1);
+ // ---------- Write the first batch of Maxwell JSON records into Kafka before the sync job starts ----------
+ List<String> lines =
readLines("kafka/maxwell/table/startupmode/maxwell-data-1.txt");
+ try {
+ writeRecordsToKafka(topic, lines);
+ } catch (Exception e) {
+ throw new Exception("Failed to write maxwell data to Kafka.", e);
+ }
+ Map<String, String> kafkaConfig = getBasicKafkaConfig();
+ kafkaConfig.put("value.format", "maxwell-json");
+ kafkaConfig.put("topic", topic);
+ kafkaConfig.put("scan.startup.mode", "group-offsets");
+ KafkaSyncTableAction action =
+ syncTableActionBuilder(kafkaConfig)
+ .withPrimaryKeys("id")
+ .withTableConfig(getBasicTableConfig())
+ .build();
+ runActionWithDefaultEnv(action);
+
+ try {
+ writeRecordsToKafka(
+ topic,
readLines("kafka/maxwell/table/startupmode/maxwell-data-2.txt"));
+ } catch (Exception e) {
+ throw new Exception("Failed to write maxwell data to Kafka.", e);
+ }
+ FileStoreTable table = getFileStoreTable(tableName);
+
+ RowType rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.STRING().notNull(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING()
+ },
+ new String[] {"id", "name", "description", "weight"});
+ List<String> primaryKeys = Collections.singletonList("id");
+ // The topic ends up with four records and all four are expected. NOTE(review): presumably no offsets are committed for the group yet, so reading starts from the beginning - confirm.
+ List<String> expected =
+ Arrays.asList(
+ "+I[101, scooter, Small 2-wheel scooter, 3.14]",
+ "+I[102, car battery, 12V car battery, 8.1]",
+ "+I[103, 12-pack drill bits, 12-pack of drill bits
with sizes ranging from #40 to #3, 0.8]",
+ "+I[104, hammer, 12oz carpenter's hammer,
0.75]");
+ waitForResult(expected, table, rowType, primaryKeys);
+ }
+
+ @Test
+ @Timeout(60)
+ public void testComputedColumn() throws Exception { // syncs one Maxwell record and checks the computed partition column
+ String topic = "computed_column";
+ createTestTopic(topic, 1, 1);
+
+ List<String> lines =
readLines("kafka/maxwell/table/computedcolumn/maxwell-data-1.txt");
+ try {
+ writeRecordsToKafka(topic, lines);
+ } catch (Exception e) {
+ throw new Exception("Failed to write maxwell data to Kafka.", e);
+ }
+ Map<String, String> kafkaConfig = getBasicKafkaConfig();
+ kafkaConfig.put("value.format", "maxwell-json");
+ kafkaConfig.put("topic", topic);
+ KafkaSyncTableAction action =
+ syncTableActionBuilder(kafkaConfig)
+ .withPartitionKeys("_year")
+ .withPrimaryKeys("_id", "_year")
+ .withComputedColumnArgs("_year=year(_date)") // _year is derived from the source column _date
+ .withTableConfig(getBasicTableConfig())
+ .build();
+ runActionWithDefaultEnv(action);
+
+ RowType rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.STRING().notNull(),
+ DataTypes.STRING(),
+ DataTypes.INT().notNull()
+ },
+ new String[] {"_id", "_date", "_year"});
+ waitForResult(
+ Collections.singletonList("+I[101, 2023-03-23, 2023]"), // the single source record plus its computed _year
+ getFileStoreTable(tableName),
+ rowType,
+ Arrays.asList("_id", "_year"));
+ }
+}
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/database/include/topic0/maxwell-data-1.txt
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/database/include/topic0/maxwell-data-1.txt
new file mode 100644
index 000000000..b1c5d21e8
--- /dev/null
+++
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/database/include/topic0/maxwell-data-1.txt
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"database":"paimon_sync_database_affix","table":"paimon_1","type":"insert","ts":1596684883,"xid":7125,"xoffset":0,"data":{"id":101,"name":"scooter","description":"Small
2-wheel scooter","weight":3.14},"primary_key_columns": ["id"]}
+{"database":"paimon_sync_database_affix","table":"paimon_2","type":"insert","ts":1596684883,"xid":7125,"xoffset":1,"data":{"id":102,"name":"car
battery","description":"12V car battery","weight":8.1},"primary_key_columns":
["id"]}
+{"database":"paimon_sync_database_affix","table":"ignore","type":"insert","ts":1596684883,"xid":7125,"xoffset":2,"data":{"id":103,"name":"12-pack
drill bits","description":"12-pack of drill bits with sizes ranging from #40
to #3","weight":0.8},"primary_key_columns": ["id"]}
+{"database":"paimon_sync_database_affix","table":"flink","type":"insert","ts":1596684883,"xid":7125,"xoffset":3,"data":{"id":104,"name":"hammer","description":"12oz
carpenter's hammer","weight":0.75},"primary_key_columns": ["id"]}
\ No newline at end of file
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/database/prefixsuffix/topic0/maxwell-data-1.txt
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/database/prefixsuffix/topic0/maxwell-data-1.txt
new file mode 100644
index 000000000..31d51f1e7
--- /dev/null
+++
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/database/prefixsuffix/topic0/maxwell-data-1.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"database":"paimon_sync_database_affix","table":"t1","type":"insert","ts":1596684883,"xid":7125,"xoffset":0,"data":{"id":101,"name":"scooter","description":"Small
2-wheel scooter","weight":3.14},"primary_key_columns": ["id"]}
+{"database":"paimon_sync_database_affix","table":"t1","type":"insert","ts":1596684883,"xid":7125,"xoffset":1,"data":{"id":102,"name":"car
battery","description":"12V car battery","weight":8.1},"primary_key_columns":
["id"]}
\ No newline at end of file
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/database/prefixsuffix/topic0/maxwell-data-2.txt
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/database/prefixsuffix/topic0/maxwell-data-2.txt
new file mode 100644
index 000000000..3c3a4a791
--- /dev/null
+++
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/database/prefixsuffix/topic0/maxwell-data-2.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"database":"paimon_sync_database_affix","table":"t1","type":"insert","ts":1596684883,"xid":7125,"xoffset":0,"data":{"id":101,"name":"scooter","description":"Small
2-wheel scooter","weight":3.14,"address":"Beijing"},"primary_key_columns":
["id"]}
+{"database":"paimon_sync_database_affix","table":"t1","type":"insert","ts":1596684883,"xid":7125,"xoffset":1,"data":{"id":102,"name":"car
battery","description":"12V car
battery","weight":8.1,"address":"Shanghai"},"primary_key_columns": ["id"]}
\ No newline at end of file
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/database/prefixsuffix/topic1/maxwell-data-1.txt
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/database/prefixsuffix/topic1/maxwell-data-1.txt
new file mode 100644
index 000000000..e9b80f4ab
--- /dev/null
+++
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/database/prefixsuffix/topic1/maxwell-data-1.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"database":"paimon_sync_database","table":"t2","type":"insert","ts":1596684883,"xid":7125,"xoffset":2,"data":{"id":103,"name":"12-pack
drill bits","description":"12-pack of drill bits with sizes ranging from #40
to #3","weight":0.8},"primary_key_columns": ["id"]}
+{"database":"paimon_sync_database","table":"t2","type":"insert","ts":1596684883,"xid":7125,"xoffset":3,"data":{"id":104,"name":"hammer","description":"12oz
carpenter's hammer","weight":0.75},"primary_key_columns": ["id"]}
\ No newline at end of file
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/database/prefixsuffix/topic1/maxwell-data-2.txt
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/database/prefixsuffix/topic1/maxwell-data-2.txt
new file mode 100644
index 000000000..82e3b4e1f
--- /dev/null
+++
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/database/prefixsuffix/topic1/maxwell-data-2.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"database":"paimon_sync_database","table":"t2","type":"insert","ts":1596684883,"xid":7125,"xoffset":2,"data":{"id":103,"name":"12-pack
drill bits","description":"12-pack of drill bits with sizes ranging from #40
to #3","weight":0.8,"age":19},"primary_key_columns": ["id"]}
+{"database":"paimon_sync_database","table":"t2","type":"insert","ts":1596684883,"xid":7125,"xoffset":3,"data":{"id":104,"name":"hammer","description":"12oz
carpenter's hammer","weight":0.75,"age":25},"primary_key_columns": ["id"]}
\ No newline at end of file
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/database/schemaevolution/topic0/maxwell-data-1.txt
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/database/schemaevolution/topic0/maxwell-data-1.txt
new file mode 100644
index 000000000..81aab546a
--- /dev/null
+++
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/database/schemaevolution/topic0/maxwell-data-1.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"database":"paimon_sync_database","table":"t1","type":"insert","ts":1596684883,"xid":7125,"xoffset":0,"data":{"id":101,"name":"scooter","description":"Small
2-wheel scooter","weight":3.14},"primary_key_columns": ["id"]}
+{"database":"paimon_sync_database","table":"t1","type":"insert","ts":1596684883,"xid":7125,"xoffset":1,"data":{"id":102,"name":"car
battery","description":"12V car battery","weight":8.1},"primary_key_columns":
["id"]}
\ No newline at end of file
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/database/schemaevolution/topic0/maxwell-data-2.txt
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/database/schemaevolution/topic0/maxwell-data-2.txt
new file mode 100644
index 000000000..85a7ea623
--- /dev/null
+++
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/database/schemaevolution/topic0/maxwell-data-2.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"database":"paimon_sync_database","table":"t1","type":"insert","ts":1596684883,"xid":7125,"xoffset":2,"data":{"id":103,"name":"12-pack
drill bits","description":"12-pack of drill bits with sizes ranging from #40
to #3","weight":0.8,"age":19},"primary_key_columns": ["id"]}
+{"database":"paimon_sync_database","table":"t1","type":"insert","ts":1596684883,"xid":7125,"xoffset":3,"data":{"id":104,"name":"hammer","description":"12oz
carpenter's hammer","weight":0.75,"age":25},"primary_key_columns": ["id"]}
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/database/schemaevolution/topic1/maxwell-data-1.txt
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/database/schemaevolution/topic1/maxwell-data-1.txt
new file mode 100644
index 000000000..a2a8efcae
--- /dev/null
+++
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/database/schemaevolution/topic1/maxwell-data-1.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"database":"paimon_sync_database","table":"t2","type":"insert","ts":1596684883,"xid":7125,"xoffset":2,"data":{"id":103,"name":"12-pack
drill bits","description":"12-pack of drill bits with sizes ranging from #40
to #3","weight":0.8},"primary_key_columns": ["id"]}
+{"database":"paimon_sync_database","table":"t2","type":"insert","ts":1596684883,"xid":7125,"xoffset":3,"data":{"id":104,"name":"hammer","description":"12oz
carpenter's hammer","weight":0.75},"primary_key_columns": ["id"]}
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/database/schemaevolution/topic1/maxwell-data-2.txt
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/database/schemaevolution/topic1/maxwell-data-2.txt
new file mode 100644
index 000000000..a1b96fff3
--- /dev/null
+++
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/database/schemaevolution/topic1/maxwell-data-2.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"database":"paimon_sync_database","table":"t2","type":"insert","ts":1596684883,"xid":7125,"xoffset":2,"data":{"id":103,"name":"12-pack
drill bits","description":"12-pack of drill bits with sizes ranging from #40
to #3","weight":0.8,"address":"Beijing"},"primary_key_columns": ["id"]}
+{"database":"paimon_sync_database","table":"t2","type":"insert","ts":1596684883,"xid":7125,"xoffset":3,"data":{"id":104,"name":"hammer","description":"12oz
carpenter's hammer","weight":0.75,"address":"Shanghai"},"primary_key_columns":
["id"]}
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/table/computedcolumn/maxwell-data-1.txt
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/table/computedcolumn/maxwell-data-1.txt
new file mode 100644
index 000000000..d31d26ffb
--- /dev/null
+++
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/table/computedcolumn/maxwell-data-1.txt
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":0,"data":{"_id":101,"_date":"2023-03-23"},"primary_key_columns":
["_id"]}
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/table/schemaevolution/maxwell-data-1.txt
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/table/schemaevolution/maxwell-data-1.txt
new file mode 100644
index 000000000..58a04a82a
--- /dev/null
+++
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/table/schemaevolution/maxwell-data-1.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":0,"data":{"id":101,"name":"scooter","description":"Small
2-wheel scooter","weight":3.14},"primary_key_columns": ["id"]}
+{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":1,"data":{"id":102,"name":"car
battery","description":"12V car battery","weight":8.1},"primary_key_columns":
["id"]}
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/table/schemaevolution/maxwell-data-2.txt
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/table/schemaevolution/maxwell-data-2.txt
new file mode 100644
index 000000000..056ce73bc
--- /dev/null
+++
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/table/schemaevolution/maxwell-data-2.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":2,"data":{"id":103,"name":"12-pack
drill bits","description":"12-pack of drill bits with sizes ranging from #40
to #3","weight":0.8,"age":18},"primary_key_columns": ["id"]}
+{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":3,"data":{"id":104,"name":"hammer","description":"12oz
carpenter's hammer","weight":0.75,"age":24},"primary_key_columns": ["id"]}
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/table/schemaevolution/maxwell-data-3.txt
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/table/schemaevolution/maxwell-data-3.txt
new file mode 100644
index 000000000..c54ecdad2
--- /dev/null
+++
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/table/schemaevolution/maxwell-data-3.txt
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":4,"data":{"id":105,"name":"hammer","description":"14oz
carpenter's
hammer","weight":0.875,"address":"Shanghai"},"primary_key_columns": ["id"]}
+{"database":"test","table":"product","type":"delete","ts":1596684883,"xid":7125,"xoffset":0,"data":{"id":101,"name":"scooter","description":"Small
2-wheel scooter","weight":3.14},"primary_key_columns": ["id"]}
+{"database":"test","table":"product","type":"update","ts":1596684906,"xid":7216,"commit":true,"data":{"id":105,"name":"hammer","description":"14oz
carpenter's
hammer","weight":0.875,"address":"Beijing"},"old":{"address":"Shanghai"},"primary_key_columns":
["id"]}
+{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":6,"data":{"id":107,"name":"rocks","description":"box
of assorted rocks","weight":5.3},"primary_key_columns": ["id"]}
\ No newline at end of file
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/table/startupmode/maxwell-data-1.txt
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/table/startupmode/maxwell-data-1.txt
new file mode 100644
index 000000000..58a04a82a
--- /dev/null
+++
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/table/startupmode/maxwell-data-1.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":0,"data":{"id":101,"name":"scooter","description":"Small
2-wheel scooter","weight":3.14},"primary_key_columns": ["id"]}
+{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":1,"data":{"id":102,"name":"car
battery","description":"12V car battery","weight":8.1},"primary_key_columns":
["id"]}
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/table/startupmode/maxwell-data-2.txt
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/table/startupmode/maxwell-data-2.txt
new file mode 100644
index 000000000..169508abb
--- /dev/null
+++
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/table/startupmode/maxwell-data-2.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":2,"data":{"id":103,"name":"12-pack
drill bits","description":"12-pack of drill bits with sizes ranging from #40
to #3","weight":0.8},"primary_key_columns": ["id"]}
+{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":3,"data":{"id":104,"name":"hammer","description":"12oz
carpenter's hammer","weight":0.75},"primary_key_columns": ["id"]}