This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new d7dd184e1 [cdc] Add AWS DMS CDC format support (#4433)
d7dd184e1 is described below

commit d7dd184e1cc5c5cae7135f3cbf084ea9f9b9d02a
Author: Lucian <[email protected]>
AuthorDate: Mon Nov 4 13:17:11 2024 +0800

    [cdc] Add AWS DMS CDC format support (#4433)
---
 .../flink/action/cdc/format/dms/DMSDataFormat.java |  34 +++++
 .../cdc/format/dms/DMSDataFormatFactory.java       |  38 +++++
 .../action/cdc/format/dms/DMSRecordParser.java     | 159 +++++++++++++++++++++
 .../services/org.apache.paimon.factories.Factory   |   1 +
 .../kafka/KafkaAWSDMSSyncDatabaseActionITCase.java |  85 +++++++++++
 .../kafka/KafkaAWSDMSSyncTableActionITCase.java    |  82 +++++++++++
 .../database/include/topic0/aws-dms-data-1.txt     |  22 +++
 .../prefixsuffix/topic0/aws-dms-data-1.txt         |  20 +++
 .../prefixsuffix/topic0/aws-dms-data-2.txt         |  20 +++
 .../prefixsuffix/topic1/aws-dms-data-1.txt         |  20 +++
 .../prefixsuffix/topic1/aws-dms-data-2.txt         |  20 +++
 .../schemaevolution/topic0/aws-dms-data-1.txt      |  20 +++
 .../schemaevolution/topic0/aws-dms-data-2.txt      |  20 +++
 .../schemaevolution/topic1/aws-dms-data-1.txt      |  20 +++
 .../schemaevolution/topic1/aws-dms-data-2.txt      |  20 +++
 .../table/computedcolumn/aws-dms-data-1.txt        |  19 +++
 .../table/schemaevolution/aws-dms-data-1.txt       |  20 +++
 .../table/schemaevolution/aws-dms-data-2.txt       |  20 +++
 .../table/schemaevolution/aws-dms-data-3.txt       |  22 +++
 .../table/schemaevolution/aws-dms-data-4.txt       |  20 +++
 .../aws-dms/table/startupmode/aws-dms-data-1.txt   |  20 +++
 .../aws-dms/table/startupmode/aws-dms-data-2.txt   |  20 +++
 .../aws-dms/table/watermark/aws-dms-data-1.txt     |  20 +++
 23 files changed, 742 insertions(+)

diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/dms/DMSDataFormat.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/dms/DMSDataFormat.java
new file mode 100644
index 000000000..43228fca4
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/dms/DMSDataFormat.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.format.dms;
+
+import org.apache.paimon.flink.action.cdc.format.AbstractJsonDataFormat;
+import org.apache.paimon.flink.action.cdc.format.RecordParserFactory;
+
+/**
+ * {@link AbstractJsonDataFormat} for AWS DMS JSON change records read from a message queue. Every
+ * record is handed to a {@link DMSRecordParser} for parsing.
+ */
+public class DMSDataFormat extends AbstractJsonDataFormat {
+
+    /** Returns a factory that builds a new {@link DMSRecordParser} for the given arguments. */
+    @Override
+    protected RecordParserFactory parser() {
+        return (mapping, columns) -> new DMSRecordParser(mapping, columns);
+    }
+}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/dms/DMSDataFormatFactory.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/dms/DMSDataFormatFactory.java
new file mode 100644
index 000000000..0be1270e8
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/dms/DMSDataFormatFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.format.dms;
+
+import org.apache.paimon.flink.action.cdc.format.DataFormat;
+import org.apache.paimon.flink.action.cdc.format.DataFormatFactory;
+
+/**
+ * {@link DataFormatFactory} that produces {@link DMSDataFormat} instances and is exposed under the
+ * identifier {@value #IDENTIFIER}.
+ */
+public class DMSDataFormatFactory implements DataFormatFactory {
+
+    /** Identifier reported by {@link #identifier()}. */
+    public static final String IDENTIFIER = "aws-dms-json";
+
+    /** Returns {@link #IDENTIFIER}. */
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+    /** Returns a fresh {@link DMSDataFormat} on every call. */
+    @Override
+    public DataFormat create() {
+        return new DMSDataFormat();
+    }
+}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/dms/DMSRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/dms/DMSRecordParser.java
new file mode 100644
index 000000000..8fc4808dd
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/dms/DMSRecordParser.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.format.dms;
+
+import org.apache.paimon.flink.action.cdc.ComputedColumn;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
+import org.apache.paimon.flink.action.cdc.format.AbstractJsonRecordParser;
+import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.utils.Pair;
+
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Parser for AWS DMS JSON change data capture (CDC) records. It extends {@link
+ * AbstractJsonRecordParser} and converts each record into a list of {@link
+ * RichCdcMultiplexRecord} objects.
+ *
+ * <p>A record is expected to carry a {@code data} object with the row columns and a {@code
+ * metadata} object with the fields {@code record-type}, {@code operation}, {@code schema-name}
+ * and {@code table-name}. Records whose {@code record-type} is not {@code data} are treated as
+ * DDL and produce no output.
+ *
+ * <p>Supported operations are {@code load} and {@code insert} (emitted as {@link
+ * RowKind#INSERT}), {@code delete} (emitted as {@link RowKind#DELETE}) and {@code update}. In an
+ * update record, columns prefixed with {@code BI_} hold the before image; the update is emitted
+ * as a {@link RowKind#DELETE} of the before image followed by a {@link RowKind#INSERT} of the new
+ * values.
+ */
+public class DMSRecordParser extends AbstractJsonRecordParser {
+
+    private static final String FIELD_DATA = "data";
+    private static final String FIELD_METADATA = "metadata";
+    private static final String FIELD_TYPE = "record-type";
+    private static final String FIELD_OP = "operation";
+    private static final String FIELD_DATABASE = "schema-name";
+    private static final String FIELD_TABLE = "table-name";
+
+    private static final String RECORD_TYPE_DATA = "data";
+
+    private static final String OP_LOAD = "load";
+    private static final String OP_INSERT = "insert";
+    private static final String OP_UPDATE = "update";
+    private static final String OP_DELETE = "delete";
+
+    private static final String BEFORE_PREFIX = "BI_";
+
+    public DMSRecordParser(TypeMapping typeMapping, List<ComputedColumn> computedColumns) {
+        super(typeMapping, computedColumns);
+    }
+
+    /** Returns {@code metadata.table-name}, or {@code null} if that field is absent or null. */
+    @Override
+    protected @Nullable String getTableName() {
+        return metadataText(FIELD_TABLE);
+    }
+
+    /**
+     * Converts the current record into CDC records.
+     *
+     * @return an empty list for DDL records, one record for load/insert/delete, and a DELETE
+     *     followed by an INSERT for update
+     * @throws IllegalArgumentException if {@code metadata.record-type} or {@code
+     *     metadata.operation} is missing
+     * @throws UnsupportedOperationException if the operation is not one of the supported values
+     */
+    @Override
+    protected List<RichCdcMultiplexRecord> extractRecords() {
+        if (isDDL()) {
+            return Collections.emptyList();
+        }
+
+        JsonNode dataNode = getAndCheck(dataField());
+        String operation = requiredMetadataText(FIELD_OP);
+        List<RichCdcMultiplexRecord> records = new ArrayList<>();
+
+        switch (operation) {
+            case OP_LOAD:
+            case OP_INSERT:
+                processRecord(dataNode, RowKind.INSERT, records);
+                break;
+            case OP_UPDATE:
+                Pair<JsonNode, JsonNode> dataAndBeforeNodes = splitBeforeAndData(dataNode);
+                // Retract the old row first, then emit the new one.
+                processRecord(dataAndBeforeNodes.getRight(), RowKind.DELETE, records);
+                processRecord(dataAndBeforeNodes.getLeft(), RowKind.INSERT, records);
+                break;
+            case OP_DELETE:
+                processRecord(dataNode, RowKind.DELETE, records);
+                break;
+            default:
+                throw new UnsupportedOperationException("Unknown record operation: " + operation);
+        }
+
+        return records;
+    }
+
+    /** Returns {@code metadata.schema-name}, or {@code null} if that field is absent or null. */
+    @Override
+    protected @Nullable String getDatabaseName() {
+        return metadataText(FIELD_DATABASE);
+    }
+
+    /** This parser does not read primary keys from the record, so no field name is reported. */
+    @Override
+    protected String primaryField() {
+        return null;
+    }
+
+    @Override
+    protected String dataField() {
+        return FIELD_DATA;
+    }
+
+    @Override
+    protected String format() {
+        return "aws-dms-json";
+    }
+
+    /**
+     * Returns {@code true} for every record whose {@code metadata.record-type} is not {@code
+     * data}.
+     *
+     * @throws IllegalArgumentException if {@code metadata.record-type} is missing
+     */
+    @Override
+    protected boolean isDDL() {
+        return !RECORD_TYPE_DATA.equals(requiredMetadataText(FIELD_TYPE));
+    }
+
+    /**
+     * Reads a field of the {@code metadata} object as text.
+     *
+     * @return the field text, or {@code null} if the field is absent or a JSON null
+     */
+    private @Nullable String metadataText(String fieldName) {
+        JsonNode field = getAndCheck(FIELD_METADATA).get(fieldName);
+        return field == null || field.isNull() ? null : field.asText();
+    }
+
+    /**
+     * Same as {@link #metadataText(String)} but fails with a descriptive message instead of
+     * returning {@code null}.
+     */
+    private String requiredMetadataText(String fieldName) {
+        String text = metadataText(fieldName);
+        if (text == null) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Invalid %s record: metadata field '%s' is missing or null.",
+                            format(), fieldName));
+        }
+        return text;
+    }
+
+    /**
+     * Splits the {@code data} node of an update record into the new row and the before image.
+     *
+     * <p>The new row is a copy of {@code dataNode} without the {@code BI_}-prefixed columns. The
+     * before image is a copy of {@code dataNode} in which each {@code BI_}-prefixed column is
+     * renamed to its unprefixed name, replacing the new value of that column if there is one.
+     *
+     * @return a pair of (new row, before image)
+     */
+    private Pair<JsonNode, JsonNode> splitBeforeAndData(JsonNode dataNode) {
+        JsonNode newDataNode = dataNode.deepCopy();
+        JsonNode beforeDataNode = dataNode.deepCopy();
+        if (!dataNode.isObject()) {
+            // Nothing to split: only JSON objects have named columns.
+            return Pair.of(newDataNode, beforeDataNode);
+        }
+
+        ObjectNode newData = (ObjectNode) newDataNode;
+        ObjectNode beforeData = (ObjectNode) beforeDataNode;
+
+        // Iterate the untouched source node and mutate only the copies. Adding a key to a node
+        // while iterating that same node's fields would fail with a
+        // ConcurrentModificationException whenever a BI_ column has no unprefixed counterpart.
+        Iterator<Map.Entry<String, JsonNode>> fields = dataNode.fields();
+        while (fields.hasNext()) {
+            String fieldName = fields.next().getKey();
+            if (!fieldName.startsWith(BEFORE_PREFIX)) {
+                continue;
+            }
+            newData.remove(fieldName);
+            JsonNode beforeValue = beforeData.remove(fieldName);
+            beforeData.set(fieldName.substring(BEFORE_PREFIX.length()), beforeValue);
+        }
+
+        return Pair.of(newDataNode, beforeDataNode);
+    }
+}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-cdc/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 9c4c4d0ac..17b8b29a2 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ b/paimon-flink/paimon-flink-cdc/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -33,3 +33,4 @@ org.apache.paimon.flink.action.cdc.format.debezium.DebeziumJsonDataFormatFactory
 org.apache.paimon.flink.action.cdc.format.json.JsonDataFormatFactory
 org.apache.paimon.flink.action.cdc.format.maxwell.MaxwellDataFormatFactory
 org.apache.paimon.flink.action.cdc.format.ogg.OggDataFormatFactory
+org.apache.paimon.flink.action.cdc.format.dms.DMSDataFormatFactory
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaAWSDMSSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaAWSDMSSyncDatabaseActionITCase.java
new file mode 100644
index 000000000..da9f863dc
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaAWSDMSSyncDatabaseActionITCase.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.kafka;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Map;
+
+/** IT cases for {@link KafkaSyncDatabaseAction} consuming records in the AWS DMS format. */
+public class KafkaAWSDMSSyncDatabaseActionITCase extends KafkaSyncDatabaseActionITCase {
+
+    /** Value passed to every inherited scenario; matches the kafka/aws-dms resource directory. */
+    private static final String DMS_FORMAT = "aws-dms";
+
+    /** Builds the action builder with {@code id} configured as the primary key. */
+    @Override
+    protected KafkaSyncDatabaseActionBuilder syncDatabaseActionBuilder(
+            Map<String, String> kafkaConfig) {
+        KafkaSyncDatabaseActionBuilder dmsBuilder =
+                new KafkaSyncDatabaseActionBuilder(kafkaConfig);
+        dmsBuilder.withPrimaryKeys("id");
+        return dmsBuilder;
+    }
+
+    // ---------------------------------------------------------------------------------------
+    // Topic handling
+    // ---------------------------------------------------------------------------------------
+
+    @Test
+    public void testTopicIsEmpty() {
+        testTopicIsEmpty(DMS_FORMAT);
+    }
+
+    // ---------------------------------------------------------------------------------------
+    // Schema evolution
+    // ---------------------------------------------------------------------------------------
+
+    @Test
+    @Timeout(60)
+    public void testSchemaEvolutionOneTopic() throws Exception {
+        testSchemaEvolutionOneTopic(DMS_FORMAT);
+    }
+
+    @Test
+    @Timeout(60)
+    public void testSchemaEvolutionMultiTopic() throws Exception {
+        testSchemaEvolutionMultiTopic(DMS_FORMAT);
+    }
+
+    // ---------------------------------------------------------------------------------------
+    // Table prefix / suffix
+    // ---------------------------------------------------------------------------------------
+
+    @Test
+    @Timeout(60)
+    public void testTableAffixOneTopic() throws Exception {
+        testTableAffixOneTopic(DMS_FORMAT);
+    }
+
+    @Test
+    @Timeout(60)
+    public void testTableAffixMultiTopic() throws Exception {
+        testTableAffixMultiTopic(DMS_FORMAT);
+    }
+
+    // ---------------------------------------------------------------------------------------
+    // Table including / excluding
+    // ---------------------------------------------------------------------------------------
+
+    @Test
+    @Timeout(60)
+    public void testIncludingTables() throws Exception {
+        testIncludingTables(DMS_FORMAT);
+    }
+
+    @Test
+    @Timeout(60)
+    public void testExcludingTables() throws Exception {
+        testExcludingTables(DMS_FORMAT);
+    }
+
+    @Test
+    @Timeout(60)
+    public void testIncludingAndExcludingTables() throws Exception {
+        testIncludingAndExcludingTables(DMS_FORMAT);
+    }
+}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaAWSDMSSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaAWSDMSSyncTableActionITCase.java
new file mode 100644
index 000000000..02ac86cda
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaAWSDMSSyncTableActionITCase.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.kafka;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+/** IT cases for {@link KafkaSyncTableAction} consuming records in the AWS DMS format. */
+public class KafkaAWSDMSSyncTableActionITCase extends KafkaSyncTableActionITCase {
+
+    /** Value passed to every inherited scenario; matches the kafka/aws-dms resource directory. */
+    private static final String DMS_FORMAT = "aws-dms";
+
+    // ---------------------------------------------------------------------------------------
+    // Schema handling
+    // ---------------------------------------------------------------------------------------
+
+    @Test
+    @Timeout(60)
+    public void testSchemaEvolution() throws Exception {
+        runSingleTableSchemaEvolution("schemaevolution", DMS_FORMAT);
+    }
+
+    @Test
+    @Timeout(60)
+    public void testAssertSchemaCompatible() throws Exception {
+        testAssertSchemaCompatible(DMS_FORMAT);
+    }
+
+    // ---------------------------------------------------------------------------------------
+    // Startup options
+    // ---------------------------------------------------------------------------------------
+
+    @Test
+    @Timeout(60)
+    public void testStarUpOptionEarliest() throws Exception {
+        testStarUpOptionEarliest(DMS_FORMAT);
+    }
+
+    @Test
+    @Timeout(60)
+    public void testStarUpOptionLatest() throws Exception {
+        testStarUpOptionLatest(DMS_FORMAT);
+    }
+
+    @Test
+    @Timeout(60)
+    public void testStarUpOptionSpecific() throws Exception {
+        testStarUpOptionSpecific(DMS_FORMAT);
+    }
+
+    @Test
+    @Timeout(60)
+    public void testStarUpOptionTimestamp() throws Exception {
+        testStarUpOptionTimestamp(DMS_FORMAT);
+    }
+
+    @Test
+    @Timeout(60)
+    public void testStarUpOptionGroup() throws Exception {
+        testStarUpOptionGroup(DMS_FORMAT);
+    }
+
+    // ---------------------------------------------------------------------------------------
+    // Column handling
+    // ---------------------------------------------------------------------------------------
+
+    @Test
+    @Timeout(60)
+    public void testComputedColumn() throws Exception {
+        testComputedColumn(DMS_FORMAT);
+    }
+
+    @Test
+    @Timeout(60)
+    public void testFieldValNullSyncTable() throws Exception {
+        testTableFiledValNull(DMS_FORMAT);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/database/include/topic0/aws-dms-data-1.txt
 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/database/include/topic0/aws-dms-data-1.txt
new file mode 100644
index 000000000..d779b9fb4
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/database/include/topic0/aws-dms-data-1.txt
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":{"id":101,"name":"scooter","description":"Small 2-wheel 
scooter","weight":3.14},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"paimon_sync_database_affix","table-name":"paimon_1","transaction-id":670014899490}}
+{"data":{"id":102,"name":"car battery","description":"12V car 
battery","weight":8.1},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"paimon_sync_database_affix","table-name":"paimon_2","transaction-id":670014899490}}
+{"data":{"id":103,"name":"12-pack drill bits","description":"12-pack of drill 
bits with sizes ranging from #40 to 
#3","weight":0.8},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"paimon_sync_database_affix","table-name":"ignore","transaction-id":670014899490}}
+{"data":{"id":104,"name":"hammer","description":"12oz carpenter's 
hammer","weight":0.75},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"paimon_sync_database_affix","table-name":"flink","transaction-id":670014899490}}
\ No newline at end of file
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/database/prefixsuffix/topic0/aws-dms-data-1.txt
 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/database/prefixsuffix/topic0/aws-dms-data-1.txt
new file mode 100644
index 000000000..5ac7c5dbe
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/database/prefixsuffix/topic0/aws-dms-data-1.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":{"id":101,"name":"scooter","description":"Small 2-wheel 
scooter","weight":3.14},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"paimon_sync_database","table-name":"t1","transaction-id":670014899490}}
+{"data":{"id":102,"name":"car battery","description":"12V car 
battery","weight":8.1},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"paimon_sync_database","table-name":"t1","transaction-id":670014899490}}
\ No newline at end of file
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/database/prefixsuffix/topic0/aws-dms-data-2.txt
 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/database/prefixsuffix/topic0/aws-dms-data-2.txt
new file mode 100644
index 000000000..56e1b53c1
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/database/prefixsuffix/topic0/aws-dms-data-2.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":{"id":101,"name":"scooter","description":"Small 2-wheel 
scooter","weight":3.14,"address":"Beijing"},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"paimon_sync_database","table-name":"t1","transaction-id":670014899490}}
+{"data":{"id":102,"name":"car battery","description":"12V car 
battery","weight":8.1,"address":"Shanghai"},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"paimon_sync_database","table-name":"t1","transaction-id":670014899490}}
\ No newline at end of file
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/database/prefixsuffix/topic1/aws-dms-data-1.txt
 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/database/prefixsuffix/topic1/aws-dms-data-1.txt
new file mode 100644
index 000000000..a0351adb7
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/database/prefixsuffix/topic1/aws-dms-data-1.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":{"id":103,"name":"12-pack drill bits","description":"12-pack of drill 
bits with sizes ranging from #40 to 
#3","weight":0.8},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"paimon_sync_database","table-name":"t2","transaction-id":670014899490}}
+{"data":{"id":104,"name":"hammer","description":"12oz carpenter's 
hammer","weight":0.75},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"paimon_sync_database","table-name":"t2","transaction-id":670014899490}}
\ No newline at end of file
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/database/prefixsuffix/topic1/aws-dms-data-2.txt
 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/database/prefixsuffix/topic1/aws-dms-data-2.txt
new file mode 100644
index 000000000..e59ef1c9a
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/database/prefixsuffix/topic1/aws-dms-data-2.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":{"id":103,"name":"12-pack drill bits","description":"12-pack of drill 
bits with sizes ranging from #40 to 
#3","weight":0.8,"age":19},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"paimon_sync_database","table-name":"t2","transaction-id":670014899490}}
+{"data":{"id":104,"name":"hammer","description":"12oz carpenter's 
hammer","weight":0.75,"age":25},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"paimon_sync_database","table-name":"t2","transaction-id":670014899490}}
\ No newline at end of file
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/database/schemaevolution/topic0/aws-dms-data-1.txt
 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/database/schemaevolution/topic0/aws-dms-data-1.txt
new file mode 100644
index 000000000..5ac7c5dbe
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/database/schemaevolution/topic0/aws-dms-data-1.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":{"id":101,"name":"scooter","description":"Small 2-wheel 
scooter","weight":3.14},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"paimon_sync_database","table-name":"t1","transaction-id":670014899490}}
+{"data":{"id":102,"name":"car battery","description":"12V car 
battery","weight":8.1},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"paimon_sync_database","table-name":"t1","transaction-id":670014899490}}
\ No newline at end of file
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/database/schemaevolution/topic0/aws-dms-data-2.txt
 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/database/schemaevolution/topic0/aws-dms-data-2.txt
new file mode 100644
index 000000000..eeb254d71
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/database/schemaevolution/topic0/aws-dms-data-2.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":{"id":103,"name":"12-pack drill bits","description":"12-pack of drill 
bits with sizes ranging from #40 to 
#3","weight":0.8,"age":19},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"paimon_sync_database","table-name":"t1","transaction-id":670014899490}}
+{"data":{"id":104,"name":"hammer","description":"12oz carpenter's 
hammer","weight":0.75,"age":25},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"paimon_sync_database","table-name":"t1","transaction-id":670014899490}}
\ No newline at end of file
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/database/schemaevolution/topic1/aws-dms-data-1.txt
 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/database/schemaevolution/topic1/aws-dms-data-1.txt
new file mode 100644
index 000000000..a0351adb7
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/database/schemaevolution/topic1/aws-dms-data-1.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":{"id":103,"name":"12-pack drill bits","description":"12-pack of drill 
bits with sizes ranging from #40 to 
#3","weight":0.8},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"paimon_sync_database","table-name":"t2","transaction-id":670014899490}}
+{"data":{"id":104,"name":"hammer","description":"12oz carpenter's 
hammer","weight":0.75},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"paimon_sync_database","table-name":"t2","transaction-id":670014899490}}
\ No newline at end of file
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/database/schemaevolution/topic1/aws-dms-data-2.txt
 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/database/schemaevolution/topic1/aws-dms-data-2.txt
new file mode 100644
index 000000000..a189a9d85
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/database/schemaevolution/topic1/aws-dms-data-2.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":{"id":103,"name":"12-pack drill bits","description":"12-pack of drill 
bits with sizes ranging from #40 to 
#3","weight":0.8,"address":"Beijing"},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"paimon_sync_database","table-name":"t2","transaction-id":670014899490}}
+{"data":{"id":104,"name":"hammer","description":"12oz carpenter's 
hammer","weight":0.75,"address":"Shanghai"},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"paimon_sync_database","table-name":"t2","transaction-id":670014899490}}
\ No newline at end of file
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/table/computedcolumn/aws-dms-data-1.txt
 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/table/computedcolumn/aws-dms-data-1.txt
new file mode 100644
index 000000000..cf9112abc
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/table/computedcolumn/aws-dms-data-1.txt
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":{"_id":101,"_date":"2023-03-23"},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"test","table-name":"product","transaction-id":670014899490}}
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/table/schemaevolution/aws-dms-data-1.txt
 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/table/schemaevolution/aws-dms-data-1.txt
new file mode 100644
index 000000000..42a00afe5
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/table/schemaevolution/aws-dms-data-1.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":{"id":101,"name":"scooter","description":"Small 2-wheel 
scooter","weight":3.14},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"test","table-name":"product","transaction-id":670014899490}}
+{"data":{"id":102,"name":"car battery","description":"12V car 
battery","weight":8.1},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"test","table-name":"product","transaction-id":670014899490}}
\ No newline at end of file
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/table/schemaevolution/aws-dms-data-2.txt
 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/table/schemaevolution/aws-dms-data-2.txt
new file mode 100644
index 000000000..3ecfdab8b
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/table/schemaevolution/aws-dms-data-2.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":{"id":103,"name":"12-pack drill bits","description":"12-pack of drill 
bits with sizes ranging from #40 to 
#3","weight":0.8,"age":18},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"test","table-name":"product","transaction-id":670014899490}}
+{"data":{"id":104,"name":"hammer","description":"12oz carpenter's 
hammer","weight":0.75,"age":24},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"test","table-name":"product","transaction-id":670014899490}}
\ No newline at end of file
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/table/schemaevolution/aws-dms-data-3.txt
 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/table/schemaevolution/aws-dms-data-3.txt
new file mode 100644
index 000000000..04e18e1db
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/table/schemaevolution/aws-dms-data-3.txt
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":{"id":105,"name":"hammer","description":"14oz carpenter's 
hammer","weight":0.875,"address":"Shanghai"},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"test","table-name":"product","transaction-id":670014899490}}
+{"data":{"id":101,"name":"scooter","description":"Small 2-wheel 
scooter","weight":3.14},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"delete","partition-key-type":"schema-table","schema-name":"test","table-name":"product","transaction-id":670014899490}}
+{"data":{"id":105,"name":"hammer","description":"14oz carpenter's 
hammer","weight":0.875,"address":"Beijing","BI_id":105,"BI_name":"hammer","BI_description":"14oz
 carpenter's 
hammer","BI_weight":0.875,"BI_address":"Shanghai"},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"update","partition-key-type":"schema-table","schema-name":"test","table-name":"product","transaction-id":670014899490}}
+{"data":{"id":107,"name":"rocks","description":"box of assorted 
rocks","weight":5.3},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"test","table-name":"product","transaction-id":670014899490}}
\ No newline at end of file
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/table/schemaevolution/aws-dms-data-4.txt
 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/table/schemaevolution/aws-dms-data-4.txt
new file mode 100644
index 000000000..e93607aed
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/table/schemaevolution/aws-dms-data-4.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":{"id":103,"name":"12-pack drill bits","description":"12-pack of drill 
bits with sizes ranging from #40 to 
#3","weight":null},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"test","table-name":"product","transaction-id":670014899490}}
+{"data":{"id":104,"name":"hammer","description":"12oz carpenter's 
hammer","weight":0.75},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"test","table-name":"product","transaction-id":670014899490}}
\ No newline at end of file
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/table/startupmode/aws-dms-data-1.txt
 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/table/startupmode/aws-dms-data-1.txt
new file mode 100644
index 000000000..42a00afe5
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/table/startupmode/aws-dms-data-1.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":{"id":101,"name":"scooter","description":"Small 2-wheel 
scooter","weight":3.14},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"test","table-name":"product","transaction-id":670014899490}}
+{"data":{"id":102,"name":"car battery","description":"12V car 
battery","weight":8.1},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"test","table-name":"product","transaction-id":670014899490}}
\ No newline at end of file
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/table/startupmode/aws-dms-data-2.txt
 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/table/startupmode/aws-dms-data-2.txt
new file mode 100644
index 000000000..70c0fb167
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/table/startupmode/aws-dms-data-2.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":{"id":103,"name":"12-pack drill bits","description":"12-pack of drill 
bits with sizes ranging from #40 to 
#3","weight":0.8},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"test","table-name":"product","transaction-id":670014899490}}
+{"data":{"id":104,"name":"hammer","description":"12oz carpenter's 
hammer","weight":0.75},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"test","table-name":"product","transaction-id":670014899490}}
\ No newline at end of file
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/table/watermark/aws-dms-data-1.txt
 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/table/watermark/aws-dms-data-1.txt
new file mode 100644
index 000000000..42a00afe5
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/table/watermark/aws-dms-data-1.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":{"id":101,"name":"scooter","description":"Small 2-wheel 
scooter","weight":3.14},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"test","table-name":"product","transaction-id":670014899490}}
+{"data":{"id":102,"name":"car battery","description":"12V car 
battery","weight":8.1},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"test","table-name":"product","transaction-id":670014899490}}
\ No newline at end of file

Reply via email to