This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new ae88e59b3 [flink] kafka Ingestion support maxwell format (#1963)
ae88e59b3 is described below

commit ae88e59b3cf2c7e430c37be903c903e56581c178
Author: monster <[email protected]>
AuthorDate: Wed Sep 20 21:15:43 2023 +0800

    [flink] kafka Ingestion support maxwell format (#1963)
---
 docs/content/how-to/cdc-ingestion.md               |   4 +-
 .../flink/action/cdc/kafka/formats/DataFormat.java |   4 +-
 .../kafka/formats/maxwell/MaxwellRecordParser.java | 106 +++++
 .../KafkaMaxwellSyncDatabaseActionITCase.java      | 475 ++++++++++++++++++++
 .../kafka/KafkaMaxwellSyncTableActionITCase.java   | 495 +++++++++++++++++++++
 .../database/include/topic0/maxwell-data-1.txt     |  22 +
 .../prefixsuffix/topic0/maxwell-data-1.txt         |  20 +
 .../prefixsuffix/topic0/maxwell-data-2.txt         |  20 +
 .../prefixsuffix/topic1/maxwell-data-1.txt         |  20 +
 .../prefixsuffix/topic1/maxwell-data-2.txt         |  20 +
 .../schemaevolution/topic0/maxwell-data-1.txt      |  20 +
 .../schemaevolution/topic0/maxwell-data-2.txt      |  20 +
 .../schemaevolution/topic1/maxwell-data-1.txt      |  20 +
 .../schemaevolution/topic1/maxwell-data-2.txt      |  20 +
 .../table/computedcolumn/maxwell-data-1.txt        |  19 +
 .../table/schemaevolution/maxwell-data-1.txt       |  20 +
 .../table/schemaevolution/maxwell-data-2.txt       |  20 +
 .../table/schemaevolution/maxwell-data-3.txt       |  22 +
 .../maxwell/table/startupmode/maxwell-data-1.txt   |  20 +
 .../maxwell/table/startupmode/maxwell-data-2.txt   |  20 +
 20 files changed, 1384 insertions(+), 3 deletions(-)

diff --git a/docs/content/how-to/cdc-ingestion.md 
b/docs/content/how-to/cdc-ingestion.md
index d2d9a4b30..df1320855 100644
--- a/docs/content/how-to/cdc-ingestion.md
+++ b/docs/content/how-to/cdc-ingestion.md
@@ -314,7 +314,7 @@ If a message in a Kafka topic is a change event captured 
from another database u
         </tr>
         <tr>
          <td><a 
href="https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/table/formats/maxwell/">Maxwell
 CDC</a></td>
-        <td>False</td>
+        <td>True</td>
         </tr>
         <tr>
          <td><a 
href="https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/table/formats/ogg/">OGG
 CDC</a></td>
@@ -324,7 +324,7 @@ If a message in a Kafka topic is a change event captured 
from another database u
 </table>
 
 {{< hint info >}}
-In Oracle GoldenGate, the data format synchronized to Kafka does not include 
field data type information. As a result, Paimon sets the data type for all 
fields to "String" by default.
+In Oracle GoldenGate and Maxwell, the data format synchronized to Kafka does 
not include field data type information. As a result, Paimon sets the data type 
for all fields to "String" by default.
 {{< /hint >}}
 
 ### Synchronizing Tables
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/DataFormat.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/DataFormat.java
index 155532900..fda6311f2 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/DataFormat.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/DataFormat.java
@@ -22,6 +22,7 @@ import org.apache.paimon.flink.action.cdc.ComputedColumn;
 import org.apache.paimon.flink.action.cdc.TableNameConverter;
 import org.apache.paimon.flink.action.cdc.TypeMapping;
 import 
org.apache.paimon.flink.action.cdc.kafka.formats.canal.CanalRecordParser;
+import 
org.apache.paimon.flink.action.cdc.kafka.formats.maxwell.MaxwellRecordParser;
 import org.apache.paimon.flink.action.cdc.kafka.formats.ogg.OggRecordParser;
 
 import org.apache.flink.configuration.Configuration;
@@ -38,7 +39,8 @@ import java.util.List;
  */
 public enum DataFormat {
     CANAL_JSON(CanalRecordParser::new),
-    OGG_JSON(OggRecordParser::new);
+    OGG_JSON(OggRecordParser::new),
+    MAXWELL_JSON(MaxwellRecordParser::new);
     // Add more data formats here if needed
 
     private final RecordParserFactory parser;
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/maxwell/MaxwellRecordParser.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/maxwell/MaxwellRecordParser.java
new file mode 100644
index 000000000..0a2df9fcd
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/maxwell/MaxwellRecordParser.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.kafka.formats.maxwell;
+
+import org.apache.paimon.flink.action.cdc.ComputedColumn;
+import org.apache.paimon.flink.action.cdc.TableNameConverter;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
+import org.apache.paimon.flink.action.cdc.kafka.formats.RecordParser;
+import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
+import org.apache.paimon.types.RowKind;
+
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
+
+/**
+ * The {@code MaxwellRecordParser} class extends the abstract {@link 
RecordParser} and is designed
+ * to parse records from Maxwell's JSON change data capture (CDC) format. 
Maxwell is a CDC solution
+ * for MySQL databases that captures row-level changes to database tables and 
outputs them in JSON
+ * format. This parser extracts relevant information from the Maxwell-JSON 
format and converts it
+ * into a list of {@link RichCdcMultiplexRecord} objects.
+ *
+ * <p>The class supports various database operations such as INSERT, UPDATE, 
and DELETE, and creates
+ * corresponding {@link RichCdcMultiplexRecord} objects to represent these 
changes.
+ *
+ * <p>Validation is performed to ensure that the JSON records contain all 
necessary fields, and the
+ * class also supports schema extraction for the Kafka topic.
+ */
+public class MaxwellRecordParser extends RecordParser {
+
+    private static final String FIELD_OLD = "old";
+    private static final String FIELD_TYPE = "type";
+    private static final String OP_INSERT = "insert";
+    private static final String OP_UPDATE = "update";
+    private static final String OP_DELETE = "delete";
+
+    public MaxwellRecordParser(
+            boolean caseSensitive,
+            TypeMapping typeMapping,
+            TableNameConverter tableNameConverter,
+            List<ComputedColumn> computedColumns) {
+        super(caseSensitive, typeMapping, tableNameConverter, computedColumns);
+    }
+
+    @Override
+    public List<RichCdcMultiplexRecord> extractRecords() {
+        String operation = extractStringFromRootJson(FIELD_TYPE);
+        JsonNode data = root.get(fieldData);
+        List<RichCdcMultiplexRecord> records = new ArrayList<>();
+        switch (operation) {
+            case OP_INSERT:
+                processRecord(data, RowKind.INSERT, records);
+                break;
+            case OP_UPDATE:
+                processRecord(mergeOldRecord(data, root.get(FIELD_OLD)), 
RowKind.DELETE, records);
+                processRecord(data, RowKind.INSERT, records);
+                break;
+            case OP_DELETE:
+                processRecord(data, RowKind.DELETE, records);
+                break;
+            default:
+                throw new UnsupportedOperationException("Unknown record 
operation: " + operation);
+        }
+        return records;
+    }
+
+    @Override
+    protected void validateFormat() {
+        String errorMessageTemplate =
+                "Didn't find '%s' node in json. Please make sure your topic's 
format is correct.";
+        checkNotNull(root.get(FIELD_TABLE), errorMessageTemplate, FIELD_TABLE);
+        checkNotNull(root.get(FIELD_DATABASE), errorMessageTemplate, 
FIELD_DATABASE);
+        checkNotNull(root.get(FIELD_TYPE), errorMessageTemplate, FIELD_TYPE);
+        checkNotNull(root.get(fieldData), errorMessageTemplate, fieldData);
+        checkNotNull(root.get(fieldPrimaryKeys), errorMessageTemplate, 
fieldPrimaryKeys);
+    }
+
+    @Override
+    protected void setPrimaryField() {
+        fieldPrimaryKeys = "primary_key_columns";
+    }
+
+    @Override
+    protected void setDataField() {
+        fieldData = "data";
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMaxwellSyncDatabaseActionITCase.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMaxwellSyncDatabaseActionITCase.java
new file mode 100644
index 000000000..5a38bd542
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMaxwellSyncDatabaseActionITCase.java
@@ -0,0 +1,475 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.kafka;
+
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.testutils.assertj.AssertionUtils;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** IT cases for {@link KafkaSyncDatabaseAction}. */
+public class KafkaMaxwellSyncDatabaseActionITCase extends 
KafkaActionITCaseBase {
+
+    @Test
+    @Timeout(60)
+    public void testSchemaEvolutionMultiTopic() throws Exception {
+        final String topic1 = "schema_evolution_0";
+        final String topic2 = "schema_evolution_1";
+        boolean writeOne = false;
+        int fileCount = 2;
+        List<String> topics = Arrays.asList(topic1, topic2);
+        topics.forEach(topic -> createTestTopic(topic, 1, 1));
+
+        // ---------- Write the maxwell json into Kafka -------------------
+
+        for (int i = 0; i < fileCount; i++) {
+            try {
+                writeRecordsToKafka(
+                        topics.get(i),
+                        readLines(
+                                "kafka/maxwell/database/schemaevolution/topic"
+                                        + i
+                                        + "/maxwell-data-1.txt"));
+            } catch (Exception e) {
+                throw new Exception("Failed to write maxwell data to Kafka.", 
e);
+            }
+        }
+
+        Map<String, String> kafkaConfig = getBasicKafkaConfig();
+        kafkaConfig.put("value.format", "maxwell-json");
+        kafkaConfig.put("topic", String.join(";", topics));
+        KafkaSyncDatabaseAction action =
+                syncDatabaseActionBuilder(kafkaConfig)
+                        .withTableConfig(getBasicTableConfig())
+                        .build();
+        runActionWithDefaultEnv(action);
+
+        testSchemaEvolutionImpl(topics, writeOne, fileCount);
+    }
+
+    @Test
+    @Timeout(60)
+    public void testSchemaEvolutionOneTopic() throws Exception {
+        final String topic = "schema_evolution";
+        boolean writeOne = true;
+        int fileCount = 2;
+        List<String> topics = Collections.singletonList(topic);
+        topics.forEach(t -> createTestTopic(t, 1, 1));
+
+        // ---------- Write the maxwell json into Kafka -------------------
+
+        for (int i = 0; i < fileCount; i++) {
+            try {
+                writeRecordsToKafka(
+                        topics.get(0),
+                        readLines(
+                                "kafka/maxwell/database/schemaevolution/topic"
+                                        + i
+                                        + "/maxwell-data-1.txt"));
+            } catch (Exception e) {
+                throw new Exception("Failed to write maxwell data to Kafka.", 
e);
+            }
+        }
+
+        Map<String, String> kafkaConfig = getBasicKafkaConfig();
+        kafkaConfig.put("value.format", "maxwell-json");
+        kafkaConfig.put("topic", String.join(";", topics));
+        KafkaSyncDatabaseAction action =
+                syncDatabaseActionBuilder(kafkaConfig)
+                        .withTableConfig(getBasicTableConfig())
+                        .build();
+        runActionWithDefaultEnv(action);
+
+        testSchemaEvolutionImpl(topics, writeOne, fileCount);
+    }
+
+    private void testSchemaEvolutionImpl(List<String> topics, boolean 
writeOne, int fileCount)
+            throws Exception {
+        waitingTables("t1", "t2");
+
+        FileStoreTable table1 = getFileStoreTable("t1");
+        FileStoreTable table2 = getFileStoreTable("t2");
+
+        RowType rowType1 =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.STRING().notNull(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING()
+                        },
+                        new String[] {"id", "name", "description", "weight"});
+        List<String> primaryKeys1 = Collections.singletonList("id");
+        List<String> expected =
+                Arrays.asList(
+                        "+I[101, scooter, Small 2-wheel scooter, 3.14]",
+                        "+I[102, car battery, 12V car battery, 8.1]");
+        waitForResult(expected, table1, rowType1, primaryKeys1);
+
+        RowType rowType2 =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.STRING().notNull(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING()
+                        },
+                        new String[] {"id", "name", "description", "weight"});
+        List<String> primaryKeys2 = Collections.singletonList("id");
+        List<String> expected2 =
+                Arrays.asList(
+                        "+I[103, 12-pack drill bits, 12-pack of drill bits 
with sizes ranging from #40 to #3, 0.8]",
+                        "+I[104, hammer, 12oz carpenter's hammer, 0.75]");
+        waitForResult(expected2, table2, rowType2, primaryKeys2);
+
+        for (int i = 0; i < fileCount; i++) {
+            try {
+                writeRecordsToKafka(
+                        writeOne ? topics.get(0) : topics.get(i),
+                        readLines(
+                                "kafka/maxwell/database/schemaevolution/topic"
+                                        + i
+                                        + "/maxwell-data-2.txt"));
+            } catch (Exception e) {
+                throw new Exception("Failed to write maxwell data to Kafka.", 
e);
+            }
+        }
+
+        rowType1 =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.STRING().notNull(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING()
+                        },
+                        new String[] {"id", "name", "description", "weight", 
"age"});
+        expected =
+                Arrays.asList(
+                        "+I[101, scooter, Small 2-wheel scooter, 3.14, NULL]",
+                        "+I[102, car battery, 12V car battery, 8.1, NULL]",
+                        "+I[103, 12-pack drill bits, 12-pack of drill bits 
with sizes ranging from #40 to #3, 0.8, 19]",
+                        "+I[104, hammer, 12oz carpenter's hammer, 0.75, 25]");
+        waitForResult(expected, table1, rowType1, primaryKeys1);
+
+        rowType2 =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.STRING().notNull(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING()
+                        },
+                        new String[] {"id", "name", "description", "weight", 
"address"});
+        expected =
+                Arrays.asList(
+                        "+I[103, 12-pack drill bits, 12-pack of drill bits 
with sizes ranging from #40 to #3, 0.8, Beijing]",
+                        "+I[104, hammer, 12oz carpenter's hammer, 0.75, 
Shanghai]");
+        waitForResult(expected, table2, rowType2, primaryKeys2);
+    }
+
+    @Test
+    public void testTopicIsEmpty() {
+        Map<String, String> kafkaConfig = getBasicKafkaConfig();
+        kafkaConfig.put("value.format", "maxwell-json");
+
+        KafkaSyncDatabaseAction action = 
syncDatabaseActionBuilder(kafkaConfig).build();
+
+        assertThatThrownBy(action::run)
+                .satisfies(
+                        AssertionUtils.anyCauseMatches(
+                                IllegalArgumentException.class,
+                                "kafka-conf [topic] must be specified."));
+    }
+
+    @Test
+    @Timeout(60)
+    public void testTableAffixMultiTopic() throws Exception {
+        // create table t1
+        createFileStoreTable(
+                "test_prefix_t1_test_suffix",
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING()
+                        },
+                        new String[] {"id", "name", "description", "weight"}),
+                Collections.emptyList(),
+                Collections.singletonList("id"),
+                Collections.emptyMap());
+
+        final String topic1 = "prefix_suffix_0";
+        final String topic2 = "prefix_suffix_1";
+        boolean writeOne = false;
+        int fileCount = 2;
+        List<String> topics = Arrays.asList(topic1, topic2);
+        topics.forEach(topic -> createTestTopic(topic, 1, 1));
+
+        // ---------- Write the maxwell json into Kafka -------------------
+
+        for (int i = 0; i < topics.size(); i++) {
+            try {
+                writeRecordsToKafka(
+                        topics.get(i),
+                        readLines(
+                                "kafka/maxwell/database/prefixsuffix/topic"
+                                        + i
+                                        + "/maxwell-data-1.txt"));
+            } catch (Exception e) {
+                throw new Exception("Failed to write maxwell data to Kafka.", 
e);
+            }
+        }
+
+        // try synchronization
+        Map<String, String> kafkaConfig = getBasicKafkaConfig();
+        kafkaConfig.put("value.format", "maxwell-json");
+        kafkaConfig.put("topic", String.join(";", topics));
+        KafkaSyncDatabaseAction action =
+                syncDatabaseActionBuilder(kafkaConfig)
+                        .withTablePrefix("test_prefix_")
+                        .withTableSuffix("_test_suffix")
+                        .withTableConfig(getBasicTableConfig())
+                        .build();
+        runActionWithDefaultEnv(action);
+
+        testTableAffixImpl(topics, writeOne, fileCount);
+    }
+
+    @Test
+    @Timeout(60)
+    public void testTableAffixOneTopic() throws Exception {
+        // create table t1
+        createFileStoreTable(
+                "test_prefix_t1_test_suffix",
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING()
+                        },
+                        new String[] {"id", "name", "description", "weight"}),
+                Collections.emptyList(),
+                Collections.singletonList("id"),
+                Collections.emptyMap());
+
+        final String topic1 = "prefix_suffix";
+        List<String> topics = Collections.singletonList(topic1);
+        boolean writeOne = true;
+        int fileCount = 2;
+        topics.forEach(topic -> createTestTopic(topic, 1, 1));
+
+        // ---------- Write the maxwell json into Kafka -------------------
+
+        for (int i = 0; i < fileCount; i++) {
+            try {
+                writeRecordsToKafka(
+                        topics.get(0),
+                        readLines(
+                                "kafka/maxwell/database/prefixsuffix/topic"
+                                        + i
+                                        + "/maxwell-data-1.txt"));
+            } catch (Exception e) {
+                throw new Exception("Failed to write maxwell data to Kafka.", 
e);
+            }
+        }
+
+        // try synchronization
+        Map<String, String> kafkaConfig = getBasicKafkaConfig();
+        kafkaConfig.put("value.format", "maxwell-json");
+        kafkaConfig.put("topic", String.join(";", topics));
+        KafkaSyncDatabaseAction action =
+                syncDatabaseActionBuilder(kafkaConfig)
+                        .withTablePrefix("test_prefix_")
+                        .withTableSuffix("_test_suffix")
+                        .withTableConfig(getBasicTableConfig())
+                        .build();
+        runActionWithDefaultEnv(action);
+
+        testTableAffixImpl(topics, writeOne, fileCount);
+    }
+
+    private void testTableAffixImpl(List<String> topics, boolean writeOne, int 
fileCount)
+            throws Exception {
+        waitingTables("test_prefix_t1_test_suffix", 
"test_prefix_t2_test_suffix");
+
+        FileStoreTable table1 = 
getFileStoreTable("test_prefix_t1_test_suffix");
+        FileStoreTable table2 = 
getFileStoreTable("test_prefix_t2_test_suffix");
+
+        RowType rowType1 =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.STRING().notNull(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING()
+                        },
+                        new String[] {"id", "name", "description", "weight"});
+        List<String> primaryKeys1 = Collections.singletonList("id");
+        List<String> expected =
+                Arrays.asList(
+                        "+I[101, scooter, Small 2-wheel scooter, 3.14]",
+                        "+I[102, car battery, 12V car battery, 8.1]");
+        waitForResult(expected, table1, rowType1, primaryKeys1);
+
+        RowType rowType2 =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.STRING().notNull(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING()
+                        },
+                        new String[] {"id", "name", "description", "weight"});
+        List<String> primaryKeys2 = Collections.singletonList("id");
+        expected =
+                Arrays.asList(
+                        "+I[103, 12-pack drill bits, 12-pack of drill bits 
with sizes ranging from #40 to #3, 0.8]",
+                        "+I[104, hammer, 12oz carpenter's hammer, 0.75]");
+        waitForResult(expected, table2, rowType2, primaryKeys2);
+
+        for (int i = 0; i < fileCount; i++) {
+            try {
+                writeRecordsToKafka(
+                        writeOne ? topics.get(0) : topics.get(i),
+                        readLines(
+                                "kafka/maxwell/database/prefixsuffix/topic"
+                                        + i
+                                        + "/maxwell-data-2.txt"));
+            } catch (Exception e) {
+                throw new Exception("Failed to write maxwell data to Kafka.", 
e);
+            }
+        }
+        rowType1 =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.STRING().notNull(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING()
+                        },
+                        new String[] {"id", "name", "description", "weight", 
"address"});
+        expected =
+                Arrays.asList(
+                        "+I[101, scooter, Small 2-wheel scooter, 3.14, 
Beijing]",
+                        "+I[102, car battery, 12V car battery, 8.1, 
Shanghai]");
+        waitForResult(expected, table1, rowType1, primaryKeys1);
+
+        rowType2 =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.STRING().notNull(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING()
+                        },
+                        new String[] {"id", "name", "description", "weight", 
"age"});
+        expected =
+                Arrays.asList(
+                        "+I[103, 12-pack drill bits, 12-pack of drill bits 
with sizes ranging from #40 to #3, 0.8, 19]",
+                        "+I[104, hammer, 12oz carpenter's hammer, 0.75, 25]");
+        waitForResult(expected, table2, rowType2, primaryKeys2);
+    }
+
+    @Test
+    @Timeout(60)
+    public void testIncludingTables() throws Exception {
+        includingAndExcludingTablesImpl(
+                "flink|paimon.+",
+                null,
+                Arrays.asList("flink", "paimon_1", "paimon_2"),
+                Collections.singletonList("ignore"));
+    }
+
+    @Test
+    @Timeout(60)
+    public void testExcludingTables() throws Exception {
+        includingAndExcludingTablesImpl(
+                null,
+                "flink|paimon.+",
+                Collections.singletonList("ignore"),
+                Arrays.asList("flink", "paimon_1", "paimon_2"));
+    }
+
+    @Test
+    @Timeout(60)
+    public void testIncludingAndExcludingTables() throws Exception {
+        includingAndExcludingTablesImpl(
+                "flink|paimon.+",
+                "paimon_1",
+                Arrays.asList("flink", "paimon_2"),
+                Arrays.asList("paimon_1", "ignore"));
+    }
+
+    private void includingAndExcludingTablesImpl(
+            @Nullable String includingTables,
+            @Nullable String excludingTables,
+            List<String> existedTables,
+            List<String> notExistedTables)
+            throws Exception {
+        final String topic1 = "include_exclude" + UUID.randomUUID();
+        List<String> topics = Collections.singletonList(topic1);
+        topics.forEach(topic -> createTestTopic(topic, 1, 1));
+
+        // ---------- Write the maxwell json into Kafka -------------------
+
+        try {
+            writeRecordsToKafka(
+                    topics.get(0),
+                    
readLines("kafka/maxwell/database/include/topic0/maxwell-data-1.txt"));
+        } catch (Exception e) {
+            throw new Exception("Failed to write maxwell data to Kafka.", e);
+        }
+        // try synchronization
+        Map<String, String> kafkaConfig = getBasicKafkaConfig();
+        kafkaConfig.put("value.format", "maxwell-json");
+        kafkaConfig.put("topic", String.join(";", topics));
+        KafkaSyncDatabaseAction action =
+                syncDatabaseActionBuilder(kafkaConfig)
+                        .includingTables(includingTables)
+                        .excludingTables(excludingTables)
+                        .withTableConfig(getBasicTableConfig())
+                        .build();
+        runActionWithDefaultEnv(action);
+
+        // check paimon tables
+        waitingTables(existedTables);
+        assertTableNotExists(notExistedTables);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMaxwellSyncTableActionITCase.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMaxwellSyncTableActionITCase.java
new file mode 100644
index 000000000..8bfae9493
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMaxwellSyncTableActionITCase.java
@@ -0,0 +1,495 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.kafka;
+
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.paimon.testutils.assertj.AssertionUtils.anyCauseMatches;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** IT cases for {@link KafkaSyncTableAction} reading the maxwell-json format. */
+public class KafkaMaxwellSyncTableActionITCase extends KafkaActionITCaseBase {
+
+    /** Runs the single-table schema evolution scenario against the "schemaevolution" fixtures. */
+    @Test
+    @Timeout(60)
+    public void testSchemaEvolution() throws Exception {
+        final String fixtureDir = "schemaevolution";
+        runSingleTableSchemaEvolution(fixtureDir);
+    }
+
+    /**
+     * Seeds the "schema_evolution" topic with the first fixture file, starts a maxwell-json
+     * table sync keyed on {@code id}, then delegates the step-by-step checks to
+     * {@link #testSchemaEvolutionImpl(String, String)}.
+     *
+     * @param sourceDir fixture directory name under {@code kafka/maxwell/table/}
+     * @throws Exception if the fixture cannot be written to Kafka or the action fails
+     */
+    private void runSingleTableSchemaEvolution(String sourceDir) throws Exception {
+        final String topic = "schema_evolution";
+        createTestTopic(topic, 1, 1);
+        // ---------- Write the maxwell json into Kafka -------------------
+        List<String> lines =
+                readLines(String.format("kafka/maxwell/table/%s/maxwell-data-1.txt", sourceDir));
+        try {
+            writeRecordsToKafka(topic, lines);
+        } catch (Exception e) {
+            throw new Exception("Failed to write maxwell data to Kafka.", e);
+        }
+        Map<String, String> kafkaConfig = getBasicKafkaConfig();
+        kafkaConfig.put("value.format", "maxwell-json");
+        kafkaConfig.put("topic", topic);
+        KafkaSyncTableAction action =
+                syncTableActionBuilder(kafkaConfig)
+                        .withPrimaryKeys("id")
+                        .withTableConfig(getBasicTableConfig())
+                        .build();
+        runActionWithDefaultEnv(action);
+
+        testSchemaEvolutionImpl(topic, sourceDir);
+    }
+
+    /**
+     * Verifies the synced table in three steps: the rows from the first fixture, then the rows
+     * and the extra {@code age} column after the second fixture, then the rows and the extra
+     * {@code address} column after the third fixture.
+     *
+     * @param topic Kafka topic the running sync action reads from
+     * @param sourceDir fixture directory name under {@code kafka/maxwell/table/}
+     * @throws Exception if a fixture cannot be written to Kafka or a wait fails
+     */
+    private void testSchemaEvolutionImpl(String topic, String sourceDir) throws Exception {
+        FileStoreTable table = getFileStoreTable(tableName);
+
+        // Step 1: only the first fixture has been written so far.
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.STRING().notNull(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING()
+                        },
+                        new String[] {"id", "name", "description", "weight"});
+        List<String> primaryKeys = Collections.singletonList("id");
+        List<String> expected =
+                Arrays.asList(
+                        "+I[101, scooter, Small 2-wheel scooter, 3.14]",
+                        "+I[102, car battery, 12V car battery, 8.1]");
+        waitForResult(expected, table, rowType, primaryKeys);
+
+        // Step 2: the second fixture is expected to add the "age" column; older rows show NULL.
+        try {
+            writeRecordsToKafka(
+                    topic,
+                    readLines(
+                            String.format("kafka/maxwell/table/%s/maxwell-data-2.txt", sourceDir)));
+        } catch (Exception e) {
+            throw new Exception("Failed to write maxwell data to Kafka.", e);
+        }
+        rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.STRING().notNull(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING()
+                        },
+                        new String[] {"id", "name", "description", "weight", "age"});
+        expected =
+                Arrays.asList(
+                        "+I[101, scooter, Small 2-wheel scooter, 3.14, NULL]",
+                        "+I[102, car battery, 12V car battery, 8.1, NULL]",
+                        "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, 18]",
+                        "+I[104, hammer, 12oz carpenter's hammer, 0.75, 24]");
+        waitForResult(expected, table, rowType, primaryKeys);
+
+        // Step 3: the third fixture is expected to add the "address" column; the expected
+        // result no longer contains id 101 and gains ids 105 and 107.
+        try {
+            writeRecordsToKafka(
+                    topic,
+                    readLines(
+                            String.format("kafka/maxwell/table/%s/maxwell-data-3.txt", sourceDir)));
+        } catch (Exception e) {
+            throw new Exception("Failed to write maxwell data to Kafka.", e);
+        }
+        rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.STRING().notNull(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING()
+                        },
+                        new String[] {"id", "name", "description", "weight", "age", "address"});
+        expected =
+                Arrays.asList(
+                        "+I[102, car battery, 12V car battery, 8.1, NULL, NULL]",
+                        "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, 18, NULL]",
+                        "+I[104, hammer, 12oz carpenter's hammer, 0.75, 24, NULL]",
+                        "+I[105, hammer, 14oz carpenter's hammer, 0.875, NULL, Beijing]",
+                        "+I[107, rocks, box of assorted rocks, 5.3, NULL, NULL]");
+        waitForResult(expected, table, rowType, primaryKeys);
+    }
+
+    /**
+     * Checks that running a table sync with the unknown value format "togg-json" fails with an
+     * {@link UnsupportedOperationException} somewhere in the cause chain.
+     */
+    @Test
+    @Timeout(60)
+    public void testNotSupportFormat() throws Exception {
+        final String topic = "not_support";
+        createTestTopic(topic, 1, 1);
+        // ---------- Write the maxwell json into Kafka -------------------
+        List<String> lines = readLines("kafka/maxwell/table/schemaevolution/maxwell-data-1.txt");
+        try {
+            writeRecordsToKafka(topic, lines);
+        } catch (Exception e) {
+            throw new Exception("Failed to write maxwell data to Kafka.", e);
+        }
+        Map<String, String> kafkaConfig = getBasicKafkaConfig();
+        // Deliberately not a supported format name.
+        kafkaConfig.put("value.format", "togg-json");
+        kafkaConfig.put("topic", topic);
+        KafkaSyncTableAction action =
+                syncTableActionBuilder(kafkaConfig)
+                        .withPrimaryKeys("id")
+                        .withTableConfig(getBasicTableConfig())
+                        .build();
+
+        assertThatThrownBy(action::run)
+                .satisfies(
+                        anyCauseMatches(
+                                UnsupportedOperationException.class,
+                                "This format: togg-json is not supported."));
+    }
+
+    /**
+     * Checks that syncing into a pre-existing Paimon table whose fields ({@code k}, {@code v1})
+     * do not match the source fields fails with an {@link IllegalArgumentException} describing
+     * both schemas.
+     */
+    @Test
+    @Timeout(60)
+    public void testAssertSchemaCompatible() throws Exception {
+        final String topic = "assert_schema_compatible";
+        createTestTopic(topic, 1, 1);
+        // ---------- Write the maxwell json into Kafka -------------------
+        List<String> lines = readLines("kafka/maxwell/table/schemaevolution/maxwell-data-1.txt");
+        try {
+            writeRecordsToKafka(topic, lines);
+        } catch (Exception e) {
+            throw new Exception("Failed to write maxwell data to Kafka.", e);
+        }
+        Map<String, String> kafkaConfig = getBasicKafkaConfig();
+        kafkaConfig.put("value.format", "maxwell-json");
+        kafkaConfig.put("topic", topic);
+
+        // create an incompatible table
+        // NOTE(review): the list arguments look like partition keys (empty) and primary keys
+        // ("k"), followed by table options - confirm against createFileStoreTable.
+        createFileStoreTable(
+                RowType.of(
+                        new DataType[] {DataTypes.STRING(), DataTypes.STRING()},
+                        new String[] {"k", "v1"}),
+                Collections.emptyList(),
+                Collections.singletonList("k"),
+                Collections.emptyMap());
+
+        KafkaSyncTableAction action =
+                syncTableActionBuilder(kafkaConfig)
+                        .withPrimaryKeys("id")
+                        .withTableConfig(getBasicTableConfig())
+                        .build();
+
+        assertThatThrownBy(action::run)
+                .satisfies(
+                        anyCauseMatches(
+                                IllegalArgumentException.class,
+                                "Paimon schema and source table schema are not compatible.\n"
+                                        + "Paimon fields are: [`k` STRING NOT NULL, `v1` STRING].\n"
+                                        + "Source table fields are: [`id` STRING NOT NULL, `name` STRING, `description` STRING, `weight` STRING]"));
+    }
+
+    /**
+     * Starts the sync with {@code scan.startup.mode=specific-offsets} at partition 0, offset 1
+     * and expects only the record with id 102 to be synced.
+     */
+    @Test
+    @Timeout(60)
+    public void testStarUpOptionSpecific() throws Exception {
+        final String topic = "start_up_specific";
+        createTestTopic(topic, 1, 1);
+        // ---------- Write the maxwell json into Kafka -------------------
+        List<String> lines = readLines("kafka/maxwell/table/startupmode/maxwell-data-1.txt");
+        try {
+            writeRecordsToKafka(topic, lines);
+        } catch (Exception e) {
+            throw new Exception("Failed to write maxwell data to Kafka.", e);
+        }
+        Map<String, String> kafkaConfig = getBasicKafkaConfig();
+        kafkaConfig.put("value.format", "maxwell-json");
+        kafkaConfig.put("topic", topic);
+        kafkaConfig.put("scan.startup.mode", "specific-offsets");
+        kafkaConfig.put("scan.startup.specific-offsets", "partition:0,offset:1");
+        KafkaSyncTableAction action =
+                syncTableActionBuilder(kafkaConfig)
+                        .withPrimaryKeys("id")
+                        .withTableConfig(getBasicTableConfig())
+                        .build();
+        runActionWithDefaultEnv(action);
+
+        FileStoreTable table = getFileStoreTable(tableName);
+
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.STRING().notNull(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING()
+                        },
+                        new String[] {"id", "name", "description", "weight"});
+        List<String> primaryKeys = Collections.singletonList("id");
+        // topic has two records; reading starts at offset 1, so we read only the second one
+        List<String> expected =
+                Collections.singletonList("+I[102, car battery, 12V car battery, 8.1]");
+        waitForResult(expected, table, rowType, primaryKeys);
+    }
+
+    /**
+     * Starts the sync with {@code scan.startup.mode=latest-offset} after the first fixture has
+     * been written, then writes the second fixture and expects only its two records.
+     */
+    @Test
+    @Timeout(60)
+    public void testStarUpOptionLatest() throws Exception {
+        final String topic = "start_up_latest";
+        createTestTopic(topic, 1, 1);
+        // ---------- Write the maxwell json into Kafka -------------------
+        List<String> lines = readLines("kafka/maxwell/table/startupmode/maxwell-data-1.txt");
+        try {
+            writeRecordsToKafka(topic, lines);
+        } catch (Exception e) {
+            throw new Exception("Failed to write maxwell data to Kafka.", e);
+        }
+        Map<String, String> kafkaConfig = getBasicKafkaConfig();
+        kafkaConfig.put("value.format", "maxwell-json");
+        kafkaConfig.put("topic", topic);
+        kafkaConfig.put("scan.startup.mode", "latest-offset");
+        KafkaSyncTableAction action =
+                syncTableActionBuilder(kafkaConfig)
+                        .withPrimaryKeys("id")
+                        .withTableConfig(getBasicTableConfig())
+                        .build();
+        runActionWithDefaultEnv(action);
+
+        // NOTE(review): presumably gives the job time to start consuming before the second
+        // batch is produced, so that batch lands after the "latest" position - confirm.
+        Thread.sleep(5000);
+        FileStoreTable table = getFileStoreTable(tableName);
+        try {
+            writeRecordsToKafka(
+                    topic, readLines("kafka/maxwell/table/startupmode/maxwell-data-2.txt"));
+        } catch (Exception e) {
+            throw new Exception("Failed to write maxwell data to Kafka.", e);
+        }
+
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.STRING().notNull(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING()
+                        },
+                        new String[] {"id", "name", "description", "weight"});
+        List<String> primaryKeys = Collections.singletonList("id");
+        // topic has four records we read two
+        List<String> expected =
+                Arrays.asList(
+                        "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]",
+                        "+I[104, hammer, 12oz carpenter's hammer, 0.75]");
+        waitForResult(expected, table, rowType, primaryKeys);
+    }
+
+    /**
+     * Starts the sync with {@code scan.startup.mode=timestamp}, using the wall-clock time taken
+     * after the first fixture was written, and expects only the two records of the second
+     * fixture.
+     */
+    @Test
+    @Timeout(60)
+    public void testStarUpOptionTimestamp() throws Exception {
+        final String topic = "start_up_timestamp";
+        createTestTopic(topic, 1, 1);
+        // ---------- Write the maxwell json into Kafka -------------------
+        List<String> lines = readLines("kafka/maxwell/table/startupmode/maxwell-data-1.txt");
+        try {
+            writeRecordsToKafka(topic, lines);
+        } catch (Exception e) {
+            throw new Exception("Failed to write maxwell data to Kafka.", e);
+        }
+        Map<String, String> kafkaConfig = getBasicKafkaConfig();
+        kafkaConfig.put("value.format", "maxwell-json");
+        kafkaConfig.put("topic", topic);
+        kafkaConfig.put("scan.startup.mode", "timestamp");
+        // The timestamp is captured after the first batch has already been written.
+        kafkaConfig.put(
+                "scan.startup.timestamp-millis", String.valueOf(System.currentTimeMillis()));
+        KafkaSyncTableAction action =
+                syncTableActionBuilder(kafkaConfig)
+                        .withPrimaryKeys("id")
+                        .withTableConfig(getBasicTableConfig())
+                        .build();
+        runActionWithDefaultEnv(action);
+
+        try {
+            writeRecordsToKafka(
+                    topic, readLines("kafka/maxwell/table/startupmode/maxwell-data-2.txt"));
+        } catch (Exception e) {
+            throw new Exception("Failed to write maxwell data to Kafka.", e);
+        }
+        FileStoreTable table = getFileStoreTable(tableName);
+
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.STRING().notNull(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING()
+                        },
+                        new String[] {"id", "name", "description", "weight"});
+        List<String> primaryKeys = Collections.singletonList("id");
+        // topic has four records we read two
+        List<String> expected =
+                Arrays.asList(
+                        "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]",
+                        "+I[104, hammer, 12oz carpenter's hammer, 0.75]");
+        waitForResult(expected, table, rowType, primaryKeys);
+    }
+
+    /**
+     * Starts the sync with {@code scan.startup.mode=earliest-offset} and expects the records of
+     * both fixtures (four rows) to be synced.
+     */
+    @Test
+    @Timeout(60)
+    public void testStarUpOptionEarliest() throws Exception {
+        final String topic = "start_up_earliest";
+        createTestTopic(topic, 1, 1);
+        // ---------- Write the maxwell json into Kafka -------------------
+        List<String> lines = readLines("kafka/maxwell/table/startupmode/maxwell-data-1.txt");
+        try {
+            writeRecordsToKafka(topic, lines);
+        } catch (Exception e) {
+            throw new Exception("Failed to write maxwell data to Kafka.", e);
+        }
+        Map<String, String> kafkaConfig = getBasicKafkaConfig();
+        kafkaConfig.put("value.format", "maxwell-json");
+        kafkaConfig.put("topic", topic);
+        kafkaConfig.put("scan.startup.mode", "earliest-offset");
+        KafkaSyncTableAction action =
+                syncTableActionBuilder(kafkaConfig)
+                        .withPrimaryKeys("id")
+                        .withTableConfig(getBasicTableConfig())
+                        .build();
+        runActionWithDefaultEnv(action);
+
+        try {
+            writeRecordsToKafka(
+                    topic, readLines("kafka/maxwell/table/startupmode/maxwell-data-2.txt"));
+        } catch (Exception e) {
+            throw new Exception("Failed to write maxwell data to Kafka.", e);
+        }
+        FileStoreTable table = getFileStoreTable(tableName);
+
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.STRING().notNull(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING()
+                        },
+                        new String[] {"id", "name", "description", "weight"});
+        List<String> primaryKeys = Collections.singletonList("id");
+        // topic has four records we read all
+        List<String> expected =
+                Arrays.asList(
+                        "+I[101, scooter, Small 2-wheel scooter, 3.14]",
+                        "+I[102, car battery, 12V car battery, 8.1]",
+                        "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]",
+                        "+I[104, hammer, 12oz carpenter's hammer, 0.75]");
+        waitForResult(expected, table, rowType, primaryKeys);
+    }
+
+    /**
+     * Starts the sync with {@code scan.startup.mode=group-offsets} and expects the records of
+     * both fixtures (four rows) to be synced.
+     */
+    @Test
+    @Timeout(60)
+    public void testStarUpOptionGroup() throws Exception {
+        final String topic = "start_up_group";
+        createTestTopic(topic, 1, 1);
+        // ---------- Write the maxwell json into Kafka -------------------
+        List<String> lines = readLines("kafka/maxwell/table/startupmode/maxwell-data-1.txt");
+        try {
+            writeRecordsToKafka(topic, lines);
+        } catch (Exception e) {
+            throw new Exception("Failed to write maxwell data to Kafka.", e);
+        }
+        Map<String, String> kafkaConfig = getBasicKafkaConfig();
+        kafkaConfig.put("value.format", "maxwell-json");
+        kafkaConfig.put("topic", topic);
+        kafkaConfig.put("scan.startup.mode", "group-offsets");
+        KafkaSyncTableAction action =
+                syncTableActionBuilder(kafkaConfig)
+                        .withPrimaryKeys("id")
+                        .withTableConfig(getBasicTableConfig())
+                        .build();
+        runActionWithDefaultEnv(action);
+
+        try {
+            writeRecordsToKafka(
+                    topic, readLines("kafka/maxwell/table/startupmode/maxwell-data-2.txt"));
+        } catch (Exception e) {
+            throw new Exception("Failed to write maxwell data to Kafka.", e);
+        }
+        FileStoreTable table = getFileStoreTable(tableName);
+
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.STRING().notNull(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING()
+                        },
+                        new String[] {"id", "name", "description", "weight"});
+        List<String> primaryKeys = Collections.singletonList("id");
+        // topic has four records we read all
+        List<String> expected =
+                Arrays.asList(
+                        "+I[101, scooter, Small 2-wheel scooter, 3.14]",
+                        "+I[102, car battery, 12V car battery, 8.1]",
+                        "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]",
+                        "+I[104, hammer, 12oz carpenter's hammer, 0.75]");
+        waitForResult(expected, table, rowType, primaryKeys);
+    }
+
+    /**
+     * Syncs a table with the computed column {@code _year=year(_date)}, used both as partition
+     * key and as part of the primary key, and expects the single row
+     * {@code [101, 2023-03-23, 2023]}.
+     */
+    @Test
+    @Timeout(60)
+    public void testComputedColumn() throws Exception {
+        String topic = "computed_column";
+        createTestTopic(topic, 1, 1);
+
+        List<String> lines = readLines("kafka/maxwell/table/computedcolumn/maxwell-data-1.txt");
+        try {
+            writeRecordsToKafka(topic, lines);
+        } catch (Exception e) {
+            // The fixture is maxwell data; the message previously said "canal".
+            throw new Exception("Failed to write maxwell data to Kafka.", e);
+        }
+        Map<String, String> kafkaConfig = getBasicKafkaConfig();
+        kafkaConfig.put("value.format", "maxwell-json");
+        kafkaConfig.put("topic", topic);
+        KafkaSyncTableAction action =
+                syncTableActionBuilder(kafkaConfig)
+                        .withPartitionKeys("_year")
+                        .withPrimaryKeys("_id", "_year")
+                        .withComputedColumnArgs("_year=year(_date)")
+                        .withTableConfig(getBasicTableConfig())
+                        .build();
+        runActionWithDefaultEnv(action);
+
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.STRING().notNull(),
+                            DataTypes.STRING(),
+                            DataTypes.INT().notNull()
+                        },
+                        new String[] {"_id", "_date", "_year"});
+        waitForResult(
+                Collections.singletonList("+I[101, 2023-03-23, 2023]"),
+                getFileStoreTable(tableName),
+                rowType,
+                Arrays.asList("_id", "_year"));
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/database/include/topic0/maxwell-data-1.txt
 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/database/include/topic0/maxwell-data-1.txt
new file mode 100644
index 000000000..b1c5d21e8
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/database/include/topic0/maxwell-data-1.txt
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"database":"paimon_sync_database_affix","table":"paimon_1","type":"insert","ts":1596684883,"xid":7125,"xoffset":0,"data":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":3.14},"primary_key_columns": ["id"]}
+{"database":"paimon_sync_database_affix","table":"paimon_2","type":"insert","ts":1596684883,"xid":7125,"xoffset":1,"data":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1},"primary_key_columns": ["id"]}
+{"database":"paimon_sync_database_affix","table":"ignore","type":"insert","ts":1596684883,"xid":7125,"xoffset":2,"data":{"id":103,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":0.8},"primary_key_columns": ["id"]}
+{"database":"paimon_sync_database_affix","table":"flink","type":"insert","ts":1596684883,"xid":7125,"xoffset":3,"data":{"id":104,"name":"hammer","description":"12oz carpenter's hammer","weight":0.75},"primary_key_columns": ["id"]}
\ No newline at end of file
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/database/prefixsuffix/topic0/maxwell-data-1.txt
 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/database/prefixsuffix/topic0/maxwell-data-1.txt
new file mode 100644
index 000000000..31d51f1e7
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/database/prefixsuffix/topic0/maxwell-data-1.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"database":"paimon_sync_database_affix","table":"t1","type":"insert","ts":1596684883,"xid":7125,"xoffset":0,"data":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":3.14},"primary_key_columns": ["id"]}
+{"database":"paimon_sync_database_affix","table":"t1","type":"insert","ts":1596684883,"xid":7125,"xoffset":1,"data":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1},"primary_key_columns": ["id"]}
\ No newline at end of file
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/database/prefixsuffix/topic0/maxwell-data-2.txt
 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/database/prefixsuffix/topic0/maxwell-data-2.txt
new file mode 100644
index 000000000..3c3a4a791
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/database/prefixsuffix/topic0/maxwell-data-2.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"database":"paimon_sync_database_affix","table":"t1","type":"insert","ts":1596684883,"xid":7125,"xoffset":0,"data":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":3.14,"address":"Beijing"},"primary_key_columns": ["id"]}
+{"database":"paimon_sync_database_affix","table":"t1","type":"insert","ts":1596684883,"xid":7125,"xoffset":1,"data":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1,"address":"Shanghai"},"primary_key_columns": ["id"]}
\ No newline at end of file
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/database/prefixsuffix/topic1/maxwell-data-1.txt
 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/database/prefixsuffix/topic1/maxwell-data-1.txt
new file mode 100644
index 000000000..e9b80f4ab
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/database/prefixsuffix/topic1/maxwell-data-1.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"database":"paimon_sync_database","table":"t2","type":"insert","ts":1596684883,"xid":7125,"xoffset":2,"data":{"id":103,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":0.8},"primary_key_columns": ["id"]}
+{"database":"paimon_sync_database","table":"t2","type":"insert","ts":1596684883,"xid":7125,"xoffset":3,"data":{"id":104,"name":"hammer","description":"12oz carpenter's hammer","weight":0.75},"primary_key_columns": ["id"]}
\ No newline at end of file
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/database/prefixsuffix/topic1/maxwell-data-2.txt
 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/database/prefixsuffix/topic1/maxwell-data-2.txt
new file mode 100644
index 000000000..82e3b4e1f
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/database/prefixsuffix/topic1/maxwell-data-2.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"database":"paimon_sync_database","table":"t2","type":"insert","ts":1596684883,"xid":7125,"xoffset":2,"data":{"id":103,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":0.8,"age":19},"primary_key_columns": ["id"]}
+{"database":"paimon_sync_database","table":"t2","type":"insert","ts":1596684883,"xid":7125,"xoffset":3,"data":{"id":104,"name":"hammer","description":"12oz carpenter's hammer","weight":0.75,"age":25},"primary_key_columns": ["id"]}
\ No newline at end of file
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/database/schemaevolution/topic0/maxwell-data-1.txt
 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/database/schemaevolution/topic0/maxwell-data-1.txt
new file mode 100644
index 000000000..81aab546a
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/database/schemaevolution/topic0/maxwell-data-1.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"database":"paimon_sync_database","table":"t1","type":"insert","ts":1596684883,"xid":7125,"xoffset":0,"data":{"id":101,"name":"scooter","description":"Small
 2-wheel scooter","weight":3.14},"primary_key_columns": ["id"]}
+{"database":"paimon_sync_database","table":"t1","type":"insert","ts":1596684883,"xid":7125,"xoffset":1,"data":{"id":102,"name":"car
 battery","description":"12V car battery","weight":8.1},"primary_key_columns": 
["id"]}
\ No newline at end of file
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/database/schemaevolution/topic0/maxwell-data-2.txt
 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/database/schemaevolution/topic0/maxwell-data-2.txt
new file mode 100644
index 000000000..85a7ea623
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/database/schemaevolution/topic0/maxwell-data-2.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"database":"paimon_sync_database","table":"t1","type":"insert","ts":1596684883,"xid":7125,"xoffset":2,"data":{"id":103,"name":"12-pack
 drill bits","description":"12-pack of drill bits with sizes ranging from #40 
to #3","weight":0.8,"age":19},"primary_key_columns": ["id"]}
+{"database":"paimon_sync_database","table":"t1","type":"insert","ts":1596684883,"xid":7125,"xoffset":3,"data":{"id":104,"name":"hammer","description":"12oz
 carpenter's hammer","weight":0.75,"age":25},"primary_key_columns": ["id"]}
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/database/schemaevolution/topic1/maxwell-data-1.txt
 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/database/schemaevolution/topic1/maxwell-data-1.txt
new file mode 100644
index 000000000..a2a8efcae
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/database/schemaevolution/topic1/maxwell-data-1.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"database":"paimon_sync_database","table":"t2","type":"insert","ts":1596684883,"xid":7125,"xoffset":2,"data":{"id":103,"name":"12-pack
 drill bits","description":"12-pack of drill bits with sizes ranging from #40 
to #3","weight":0.8},"primary_key_columns": ["id"]}
+{"database":"paimon_sync_database","table":"t2","type":"insert","ts":1596684883,"xid":7125,"xoffset":3,"data":{"id":104,"name":"hammer","description":"12oz
 carpenter's hammer","weight":0.75},"primary_key_columns": ["id"]}
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/database/schemaevolution/topic1/maxwell-data-2.txt
 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/database/schemaevolution/topic1/maxwell-data-2.txt
new file mode 100644
index 000000000..a1b96fff3
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/database/schemaevolution/topic1/maxwell-data-2.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"database":"paimon_sync_database","table":"t2","type":"insert","ts":1596684883,"xid":7125,"xoffset":2,"data":{"id":103,"name":"12-pack
 drill bits","description":"12-pack of drill bits with sizes ranging from #40 
to #3","weight":0.8,"address":"Beijing"},"primary_key_columns": ["id"]}
+{"database":"paimon_sync_database","table":"t2","type":"insert","ts":1596684883,"xid":7125,"xoffset":3,"data":{"id":104,"name":"hammer","description":"12oz
 carpenter's hammer","weight":0.75,"address":"Shanghai"},"primary_key_columns": 
["id"]}
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/table/computedcolumn/maxwell-data-1.txt
 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/table/computedcolumn/maxwell-data-1.txt
new file mode 100644
index 000000000..d31d26ffb
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/table/computedcolumn/maxwell-data-1.txt
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":0,"data":{"_id":101,"_date":"2023-03-23"},"primary_key_columns":
 ["_id"]}
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/table/schemaevolution/maxwell-data-1.txt
 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/table/schemaevolution/maxwell-data-1.txt
new file mode 100644
index 000000000..58a04a82a
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/table/schemaevolution/maxwell-data-1.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":0,"data":{"id":101,"name":"scooter","description":"Small
 2-wheel scooter","weight":3.14},"primary_key_columns": ["id"]}
+{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":1,"data":{"id":102,"name":"car
 battery","description":"12V car battery","weight":8.1},"primary_key_columns": 
["id"]}
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/table/schemaevolution/maxwell-data-2.txt
 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/table/schemaevolution/maxwell-data-2.txt
new file mode 100644
index 000000000..056ce73bc
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/table/schemaevolution/maxwell-data-2.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":2,"data":{"id":103,"name":"12-pack
 drill bits","description":"12-pack of drill bits with sizes ranging from #40 
to #3","weight":0.8,"age":18},"primary_key_columns": ["id"]}
+{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":3,"data":{"id":104,"name":"hammer","description":"12oz
 carpenter's hammer","weight":0.75,"age":24},"primary_key_columns": ["id"]}
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/table/schemaevolution/maxwell-data-3.txt
 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/table/schemaevolution/maxwell-data-3.txt
new file mode 100644
index 000000000..c54ecdad2
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/table/schemaevolution/maxwell-data-3.txt
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":4,"data":{"id":105,"name":"hammer","description":"14oz
 carpenter's 
hammer","weight":0.875,"address":"Shanghai"},"primary_key_columns": ["id"]}
+{"database":"test","table":"product","type":"delete","ts":1596684883,"xid":7125,"xoffset":0,"data":{"id":101,"name":"scooter","description":"Small
 2-wheel scooter","weight":3.14},"primary_key_columns": ["id"]}
+{"database":"test","table":"product","type":"update","ts":1596684906,"xid":7216,"commit":true,"data":{"id":105,"name":"hammer","description":"14oz
 carpenter's 
hammer","weight":0.875,"address":"Beijing"},"old":{"address":"Shanghai"},"primary_key_columns":
 ["id"]}
+{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":6,"data":{"id":107,"name":"rocks","description":"box
 of assorted rocks","weight":5.3},"primary_key_columns": ["id"]}
\ No newline at end of file
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/table/startupmode/maxwell-data-1.txt
 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/table/startupmode/maxwell-data-1.txt
new file mode 100644
index 000000000..58a04a82a
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/table/startupmode/maxwell-data-1.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":0,"data":{"id":101,"name":"scooter","description":"Small
 2-wheel scooter","weight":3.14},"primary_key_columns": ["id"]}
+{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":1,"data":{"id":102,"name":"car
 battery","description":"12V car battery","weight":8.1},"primary_key_columns": 
["id"]}
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/table/startupmode/maxwell-data-2.txt
 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/table/startupmode/maxwell-data-2.txt
new file mode 100644
index 000000000..169508abb
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/table/startupmode/maxwell-data-2.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":2,"data":{"id":103,"name":"12-pack
 drill bits","description":"12-pack of drill bits with sizes ranging from #40 
to #3","weight":0.8},"primary_key_columns": ["id"]}
+{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":3,"data":{"id":104,"name":"hammer","description":"12oz
 carpenter's hammer","weight":0.75},"primary_key_columns": ["id"]}

Reply via email to