This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e3a2d27e [FLINK-31435] Introduce MySqlCtasAction to consume CDC 
events from MySQL
2e3a2d27e is described below

commit 2e3a2d27e549b31f81514b3219e9ecb653683e50
Author: tsreaper <[email protected]>
AuthorDate: Mon Apr 3 16:06:37 2023 +0800

    [FLINK-31435] Introduce MySqlCtasAction to consume CDC events from MySQL
    
    This closes #768.
---
 .../java/org/apache/paimon/utils/TypeUtils.java    |  20 +-
 .../org/apache/paimon/data/DataFormatTestUtil.java |   9 +-
 .../org/apache/paimon/casting/CastExecutors.java   |  32 +-
 paimon-flink/paimon-flink-common/pom.xml           |  15 +
 .../cdc/mysql/MySqlDebeziumJsonEventParser.java    | 244 ++++++++
 .../action/cdc/mysql/MySqlSyncTableAction.java     | 430 ++++++++++++++
 .../flink/action/cdc/mysql/MySqlTypeUtils.java     | 215 +++++++
 .../flink/sink/cdc/CdcBucketStreamPartitioner.java |   1 -
 .../flink/sink/cdc/CdcParsingProcessFunction.java  |   2 -
 .../apache/paimon/flink/sink}/cdc/CdcRecord.java   |   2 +-
 .../apache/paimon/flink/sink}/cdc/EventParser.java |   2 +-
 .../apache/paimon/flink/sink/cdc/FlinkCdcSink.java |   1 -
 .../paimon/flink/sink/cdc/FlinkCdcSinkBuilder.java |   2 -
 .../sink/cdc/SchemaAwareStoreWriteOperator.java    |  32 +-
 .../sink/cdc/SchemaChangeProcessFunction.java      |  15 +-
 .../org/apache/paimon/flink/FileStoreITCase.java   |   2 +-
 .../paimon/flink/action/ActionITCaseBase.java      |   2 +-
 .../flink/action/cdc/mysql/MySqlContainer.java     | 180 ++++++
 .../cdc/mysql/MySqlSyncTableActionITCase.java      | 633 +++++++++++++++++++++
 .../flink/action/cdc/mysql/MySqlVersion.java       |  38 +-
 .../paimon/flink/sink/cdc/FlinkCdcSinkITCase.java  |   1 -
 .../cdc/SchemaAwareStoreWriteOperatorTest.java     |  47 +-
 .../apache/paimon/flink/sink/cdc/TestCdcEvent.java |   1 -
 .../paimon/flink/sink/cdc/TestCdcEventParser.java  |   2 -
 .../flink/sink/cdc/TestCdcSourceFunction.java      |   1 -
 .../apache/paimon/flink/util/AbstractTestBase.java |   4 -
 .../src/test/resources/mysql/my.cnf                |  65 +++
 .../src/test/resources/mysql/setup.sql             | 156 +++++
 28 files changed, 2071 insertions(+), 83 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
index 71fb9378b..b4ed5bded 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.Decimal;
 import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeChecks;
 import org.apache.paimon.types.DataTypeRoot;
 import org.apache.paimon.types.DecimalType;
 import org.apache.paimon.types.LocalZonedTimestampType;
@@ -64,14 +65,27 @@ public class TypeUtils {
         switch (type.getTypeRoot()) {
             case CHAR:
             case VARCHAR:
+                int stringLength = DataTypeChecks.getLength(type);
+                if (s.length() > stringLength) {
+                    throw new IllegalArgumentException(
+                            String.format(
+                                    "Length of type %s is %d, but casting 
result has a length of %d",
+                                    type, stringLength, s.length()));
+                }
                 return str;
             case BOOLEAN:
                 return toBoolean(str);
             case BINARY:
             case VARBINARY:
-                // this implementation does not match the new behavior of 
StringToBinaryCastRule,
-                // change this if needed
-                return s.getBytes();
+                int binaryLength = DataTypeChecks.getLength(type);
+                byte[] bytes = s.getBytes();
+                if (bytes.length > binaryLength) {
+                    throw new IllegalArgumentException(
+                            String.format(
+                                    "Length of type %s is %d, but casting 
result has a length of %d",
+                                    type, binaryLength, bytes.length));
+                }
+                return bytes;
             case DECIMAL:
                 DecimalType decimalType = (DecimalType) type;
                 return Decimal.fromBigDecimal(
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/data/DataFormatTestUtil.java 
b/paimon-common/src/test/java/org/apache/paimon/data/DataFormatTestUtil.java
index 42c4af6dc..11a08583e 100644
--- a/paimon-common/src/test/java/org/apache/paimon/data/DataFormatTestUtil.java
+++ b/paimon-common/src/test/java/org/apache/paimon/data/DataFormatTestUtil.java
@@ -21,6 +21,8 @@ import org.apache.paimon.memory.MemorySegment;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.StringUtils;
 
+import java.util.Arrays;
+
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Utils for testing data formats. */
@@ -38,7 +40,12 @@ public class DataFormatTestUtil {
             } else {
                 InternalRow.FieldGetter fieldGetter =
                         InternalRow.createFieldGetter(type.getTypeAt(i), i);
-                build.append(fieldGetter.getFieldOrNull(row));
+                Object field = fieldGetter.getFieldOrNull(row);
+                if (field instanceof byte[]) {
+                    build.append(Arrays.toString((byte[]) field));
+                } else {
+                    build.append(field);
+                }
             }
         }
         return build.toString();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/casting/CastExecutors.java 
b/paimon-core/src/main/java/org/apache/paimon/casting/CastExecutors.java
index 22915adcc..a0810f1bb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/casting/CastExecutors.java
+++ b/paimon-core/src/main/java/org/apache/paimon/casting/CastExecutors.java
@@ -21,13 +21,10 @@ package org.apache.paimon.casting;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.Decimal;
 import org.apache.paimon.data.Timestamp;
-import org.apache.paimon.types.BinaryType;
-import org.apache.paimon.types.CharType;
 import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeChecks;
 import org.apache.paimon.types.DecimalType;
 import org.apache.paimon.types.TimestampType;
-import org.apache.paimon.types.VarBinaryType;
-import org.apache.paimon.types.VarCharType;
 import org.apache.paimon.utils.DateTimeUtils;
 import org.apache.paimon.utils.DecimalUtils;
 import org.apache.paimon.utils.StringUtils;
@@ -134,7 +131,7 @@ public class CastExecutors {
             case VARCHAR:
                 if (outputType.getTypeRoot() == CHAR || 
outputType.getTypeRoot() == VARCHAR) {
                     final boolean targetCharType = outputType.getTypeRoot() == 
CHAR;
-                    final int targetLength = getStringLength(outputType);
+                    final int targetLength = 
DataTypeChecks.getLength(outputType);
                     return value -> {
                         BinaryString result;
                         String strVal = value.toString();
@@ -158,7 +155,7 @@ public class CastExecutors {
                         return result;
                     };
                 } else if (outputType.getTypeRoot() == VARBINARY) {
-                    final int targetLength = getBinaryLength(outputType);
+                    final int targetLength = 
DataTypeChecks.getLength(outputType);
                     return value -> {
                         byte[] byteArrayTerm = ((BinaryString) 
value).toBytes();
                         if (byteArrayTerm.length <= targetLength) {
@@ -170,9 +167,10 @@ public class CastExecutors {
                 }
                 return null;
             case BINARY:
+            case VARBINARY:
                 if (outputType.getTypeRoot() == BINARY || 
outputType.getTypeRoot() == VARBINARY) {
                     boolean targetBinaryType = outputType.getTypeRoot() == 
BINARY;
-                    final int targetLength = getBinaryLength(outputType);
+                    final int targetLength = 
DataTypeChecks.getLength(outputType);
                     return value -> {
                         byte[] bytes = (byte[]) value;
                         if (((byte[]) value).length == targetLength) {
@@ -238,24 +236,4 @@ public class CastExecutors {
     public static CastExecutor<?, ?> identityCastExecutor() {
         return IDENTITY_CAST_EXECUTOR;
     }
-
-    private static int getStringLength(DataType dataType) {
-        if (dataType instanceof CharType) {
-            return ((CharType) dataType).getLength();
-        } else if (dataType instanceof VarCharType) {
-            return ((VarCharType) dataType).getLength();
-        }
-
-        throw new IllegalArgumentException(String.format("Unsupported type 
%s", dataType));
-    }
-
-    private static int getBinaryLength(DataType dataType) {
-        if (dataType instanceof VarBinaryType) {
-            return ((VarBinaryType) dataType).getLength();
-        } else if (dataType instanceof BinaryType) {
-            return ((BinaryType) dataType).getLength();
-        }
-
-        throw new IllegalArgumentException(String.format("Unsupported type 
%s", dataType));
-    }
 }
diff --git a/paimon-flink/paimon-flink-common/pom.xml 
b/paimon-flink/paimon-flink-common/pom.xml
index 7a174dc63..90a4260d7 100644
--- a/paimon-flink/paimon-flink-common/pom.xml
+++ b/paimon-flink/paimon-flink-common/pom.xml
@@ -35,6 +35,7 @@ under the License.
 
     <properties>
         <flink.version>1.17.0</flink.version>
+        <flink.cdc.version>2.3.0</flink.cdc.version>
         <frocksdbjni.version>6.20.3-ververica-2.0</frocksdbjni.version>
     </properties>
 
@@ -67,6 +68,13 @@ under the License.
             <scope>provided</scope>
         </dependency>
 
+        <dependency>
+            <groupId>com.ververica</groupId>
+            <artifactId>flink-connector-mysql-cdc</artifactId>
+            <version>${flink.cdc.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
         <dependency>
             <groupId>com.ververica</groupId>
             <artifactId>frocksdbjni</artifactId>
@@ -241,6 +249,13 @@ under the License.
             <type>test-jar</type>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>mysql</artifactId>
+            <version>${testcontainers.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
new file mode 100644
index 000000000..daf28fe75
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mysql;
+
+import org.apache.paimon.flink.sink.cdc.CdcRecord;
+import org.apache.paimon.flink.sink.cdc.EventParser;
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.utils.Preconditions;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.connect.json.JsonConverterConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * {@link EventParser} for MySQL Debezium JSON.
+ *
+ * <p>Every raw event must carry both a {@code schema} and a {@code payload} node. A payload with
+ * an {@code op} field is parsed as a data change; any other payload is treated as a schema change
+ * whose DDL is read from its {@code historyRecord}.
+ *
+ * <p>Some implementation is referenced from <a
+ * href="https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java">apache
+ * / doris-flink-connector</a>.
+ */
+public class MySqlDebeziumJsonEventParser implements EventParser<String> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MySqlDebeziumJsonEventParser.class);
+
+    // Capturing groups: 1 = operation, 3 = first token after the optional COLUMN keyword (used as
+    // the column name), 5 = the following token (used as the type name), 7 = the text inside the
+    // first pair of parentheses (used as the type length).
+    private static final String SCHEMA_CHANGE_REGEX =
+            "ALTER\\s+TABLE\\s+[^\\s]+\\s+(ADD|DROP|MODIFY)\\s+(COLUMN\\s+)?([^\\s]+)(\\s+([^\\s\\(]+))?\\s*(\\((.*?)\\))?.*";
+    // Compiled once for all parser instances.
+    private static final Pattern SCHEMA_CHANGE_PATTERN =
+            Pattern.compile(SCHEMA_CHANGE_REGEX, Pattern.CASE_INSENSITIVE);
+
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
+    // Time zone used to render MySQL TIMESTAMP values as local date-times.
+    private final ZoneId serverTimeZone;
+
+    // Payload of the event most recently passed to setRawEvent.
+    private JsonNode payload;
+    // Field name -> value of the "type" attribute in the event schema. Only refreshed by data
+    // change events.
+    private Map<String, String> mySqlFieldTypes;
+    // Field name -> value of the optional "name" attribute in the event schema. Only refreshed by
+    // data change events.
+    private Map<String, String> fieldClassNames;
+
+    /** Creates a parser which uses the system default time zone as the server time zone. */
+    public MySqlDebeziumJsonEventParser() {
+        this(ZoneId.systemDefault());
+    }
+
+    /**
+     * Creates a parser for a MySQL server located in the given time zone.
+     *
+     * @param serverTimeZone time zone used to render MySQL TIMESTAMP values
+     */
+    public MySqlDebeziumJsonEventParser(ZoneId serverTimeZone) {
+        this.serverTimeZone = serverTimeZone;
+    }
+
+    /**
+     * Parses the raw Debezium JSON and makes it the current event. For data change events the
+     * field type information is refreshed from the schema node of the event.
+     *
+     * @param rawEvent Debezium JSON string containing both a schema and a payload node
+     * @throws RuntimeException if the JSON cannot be parsed, or if the schema or payload node is
+     *     missing
+     */
+    @Override
+    public void setRawEvent(String rawEvent) {
+        try {
+            JsonNode root = objectMapper.readValue(rawEvent, JsonNode.class);
+            JsonNode schema =
+                    Preconditions.checkNotNull(
+                            root.get("schema"),
+                            "MySqlDebeziumJsonEventParser only supports debezium JSON with schema. "
+                                    + "Please make sure that `includeSchema` is true "
+                                    + "in the JsonDebeziumDeserializationSchema you created");
+            // Fail with an explicit message instead of an anonymous NullPointerException when the
+            // payload node is absent.
+            payload =
+                    Preconditions.checkNotNull(
+                            root.get("payload"),
+                            "MySqlDebeziumJsonEventParser requires a payload in the debezium JSON");
+
+            if (!isSchemaChange()) {
+                updateFieldTypes(schema);
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Rebuilds the field type maps from the {@code before} and {@code after} entries of the event
+     * schema.
+     */
+    private void updateFieldTypes(JsonNode schema) {
+        mySqlFieldTypes = new HashMap<>();
+        fieldClassNames = new HashMap<>();
+        JsonNode arrayNode = schema.get("fields");
+        for (int i = 0; i < arrayNode.size(); i++) {
+            JsonNode elementNode = arrayNode.get(i);
+            String field = elementNode.get("field").asText();
+            if ("before".equals(field) || "after".equals(field)) {
+                JsonNode innerArrayNode = elementNode.get("fields");
+                for (int j = 0; j < innerArrayNode.size(); j++) {
+                    JsonNode innerElementNode = innerArrayNode.get(j);
+                    String fieldName = innerElementNode.get("field").asText();
+                    String fieldType = innerElementNode.get("type").asText();
+                    mySqlFieldTypes.put(fieldName, fieldType);
+                    // The "name" attribute is optional, so only record it when present.
+                    if (innerElementNode.get("name") != null) {
+                        String className = innerElementNode.get("name").asText();
+                        fieldClassNames.put(fieldName, className);
+                    }
+                }
+            }
+        }
+    }
+
+    /** Returns true if the payload of the current event has no {@code op} field. */
+    @Override
+    public boolean isSchemaChange() {
+        return payload.get("op") == null;
+    }
+
+    /**
+     * Extracts the schema changes described by the DDL in the {@code historyRecord} of the current
+     * event.
+     *
+     * @return a single add-column or update-column-type change, or an empty list if the history
+     *     record is missing, cannot be parsed, or its DDL is not an ADD or MODIFY statement
+     *     matched by the schema change pattern
+     */
+    @Override
+    public List<SchemaChange> getSchemaChanges() {
+        JsonNode historyRecord = payload.get("historyRecord");
+        if (historyRecord == null) {
+            return Collections.emptyList();
+        }
+
+        JsonNode ddlNode;
+        try {
+            ddlNode = objectMapper.readTree(historyRecord.asText()).get("ddl");
+        } catch (Exception e) {
+            // Best effort: an unreadable history record is treated as "no schema change".
+            LOG.debug("Failed to parse history record for schema changes", e);
+            return Collections.emptyList();
+        }
+        if (ddlNode == null) {
+            return Collections.emptyList();
+        }
+        String ddl = ddlNode.asText();
+
+        // NOTE(review): the pattern also matches non-column clauses such as
+        // "ALTER TABLE t ADD INDEX idx (c)", in which case "INDEX" is taken as the column name and
+        // "idx" as the type; confirm how MySqlTypeUtils.toDataType reacts to such input.
+        Matcher matcher = SCHEMA_CHANGE_PATTERN.matcher(ddl);
+        if (matcher.find()) {
+            String op = matcher.group(1);
+            String column = matcher.group(3);
+            String type = matcher.group(5);
+            String len = matcher.group(7);
+            if ("add".equalsIgnoreCase(op)) {
+                return Collections.singletonList(
+                        SchemaChange.addColumn(column, MySqlTypeUtils.toDataType(type, len)));
+            } else if ("modify".equalsIgnoreCase(op)) {
+                return Collections.singletonList(
+                        SchemaChange.updateColumnType(
+                                column, MySqlTypeUtils.toDataType(type, len)));
+            }
+        }
+        return Collections.emptyList();
+    }
+
+    /**
+     * Converts the current data change event into records: a {@link RowKind#DELETE} record for a
+     * non-empty {@code before} row, followed by a {@link RowKind#INSERT} record for a non-empty
+     * {@code after} row.
+     */
+    @Override
+    public List<CdcRecord> getRecords() {
+        List<CdcRecord> records = new ArrayList<>();
+
+        Map<String, String> before = extractRow(payload.get("before"));
+        if (!before.isEmpty()) {
+            records.add(new CdcRecord(RowKind.DELETE, before));
+        }
+
+        Map<String, String> after = extractRow(payload.get("after"));
+        if (!after.isEmpty()) {
+            records.add(new CdcRecord(RowKind.INSERT, after));
+        }
+
+        return records;
+    }
+
+    /**
+     * Converts a {@code before} or {@code after} node into a map from field name to string value,
+     * rewriting Debezium encoded values (binary, date, datetime, timestamp) into plain strings.
+     * Null values are kept as they are.
+     *
+     * @return the converted row, or an empty map if the node converts to null
+     * @throws IllegalArgumentException if a decimal field is not encoded as a numeric string
+     */
+    private Map<String, String> extractRow(JsonNode recordRow) {
+        Map<String, String> recordMap =
+                objectMapper.convertValue(recordRow, new TypeReference<Map<String, String>>() {});
+        if (recordMap == null) {
+            return new HashMap<>();
+        }
+
+        for (Map.Entry<String, String> field : mySqlFieldTypes.entrySet()) {
+            String fieldName = field.getKey();
+            String mySqlType = field.getValue();
+            if (recordMap.containsKey(fieldName)) {
+                String className = fieldClassNames.get(fieldName);
+                String oldValue = recordMap.get(fieldName);
+                String newValue = oldValue;
+
+                if (newValue == null) {
+                    continue;
+                }
+
+                if ("bytes".equals(mySqlType) && className == null) {
+                    // MySQL binary, varbinary, blob
+                    // NOTE(review): the decoded bytes are turned into a String with the platform
+                    // default charset, so arbitrary binary content may not survive the round
+                    // trip; confirm against the code which converts the value back to bytes.
+                    newValue = new String(Base64.getDecoder().decode(oldValue));
+                } else if ("bytes".equals(mySqlType)
+                        && "org.apache.kafka.connect.data.Decimal".equals(className)) {
+                    // MySQL numeric, fixed, decimal
+                    // The value is only validated here; it is passed on unchanged.
+                    try {
+                        new BigDecimal(oldValue);
+                    } catch (NumberFormatException e) {
+                        throw new IllegalArgumentException(
+                                "Invalid big decimal value "
+                                        + oldValue
+                                        + ". Make sure that in the `customConverterConfigs` "
+                                        + "of the JsonDebeziumDeserializationSchema you created, set '"
+                                        + JsonConverterConfig.DECIMAL_FORMAT_CONFIG
+                                        + "' to 'numeric'",
+                                e);
+                    }
+                } else if ("io.debezium.time.Date".equals(className)) {
+                    // MySQL date
+                    newValue = LocalDate.ofEpochDay(Integer.parseInt(oldValue)).toString();
+                } else if ("io.debezium.time.Timestamp".equals(className)) {
+                    // MySQL datetime
+                    // Debezium encodes the wall-clock value as epoch milliseconds without any
+                    // time zone information, so it has to be decoded in UTC. Applying the server
+                    // time zone here would shift the value by the zone offset.
+                    newValue =
+                            Instant.ofEpochMilli(Long.parseLong(oldValue))
+                                    .atZone(java.time.ZoneOffset.UTC)
+                                    .toLocalDateTime()
+                                    .toString()
+                                    .replace('T', ' ');
+                } else if ("io.debezium.time.ZonedTimestamp".equals(className)) {
+                    // MySQL timestamp
+                    // The value is an ISO-8601 instant; render it in the server time zone.
+                    newValue =
+                            Instant.parse(oldValue)
+                                    .atZone(serverTimeZone)
+                                    .toLocalDateTime()
+                                    .toString()
+                                    .replace('T', ' ');
+                }
+
+                recordMap.put(fieldName, newValue);
+            }
+        }
+
+        return recordMap;
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
new file mode 100644
index 000000000..9a7710840
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mysql;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.flink.action.Action;
+import org.apache.paimon.flink.sink.cdc.EventParser;
+import org.apache.paimon.flink.sink.cdc.FlinkCdcSinkBuilder;
+import org.apache.paimon.flink.sink.cdc.SchemaChangeProcessFunction;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.DataType;
+
+import com.ververica.cdc.connectors.mysql.source.MySqlSource;
+import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
+import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions;
+import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
+import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffsetBuilder;
+import com.ververica.cdc.connectors.mysql.table.JdbcUrlUtils;
+import com.ververica.cdc.connectors.mysql.table.StartupOptions;
+import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
+import com.ververica.cdc.debezium.table.DebeziumOptions;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.kafka.connect.json.JsonConverterConfig;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.time.Duration;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * An {@link Action} which synchronizes one or multiple MySQL tables into one Paimon table.
+ *
+ * <p>You should specify MySQL source table in {@code mySqlConfig}. See <a
+ * href="https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#connector-options">document
+ * of flink-cdc-connectors</a> for detailed keys and values.
+ *
+ * <p>If the specified Paimon table does not exist, this action will 
automatically create the table.
+ * Its schema will be derived from all specified MySQL tables. If the Paimon 
table already exists,
+ * its schema will be compared against the schema of all specified MySQL 
tables.
+ *
+ * <p>This action supports a limited number of schema changes. Unsupported schema changes will be
+ * ignored. Currently supported schema changes include:
+ *
+ * <ul>
+ *   <li>Adding columns.
+ *   <li>Altering column types. More specifically,
+ *       <ul>
+ *         <li>altering from a string type (char, varchar, text) to another 
string type with longer
+ *             length,
+ *         <li>altering from a binary type (binary, varbinary, blob) to 
another binary type with
+ *             longer length,
+ *         <li>altering from an integer type (tinyint, smallint, int, bigint) 
to another integer
+ *             type with wider range,
+ *         <li>altering from a floating-point type (float, double) to another 
floating-point type
+ *             with wider range,
+ *       </ul>
+ *       are supported. Other type changes will cause exceptions.
+ * </ul>
+ */
+public class MySqlSyncTableAction implements Action {
+
+    private final Map<String, String> mySqlConfig;
+    private final String warehouse;
+    private final String database;
+    private final String table;
+    private final List<String> partitionKeys;
+    private final List<String> primaryKeys;
+    private final Map<String, String> paimonConfig;
+
+    /**
+     * Creates an action which stores the given settings for later use.
+     *
+     * @param mySqlConfig options of the MySQL source, keyed by option name
+     * @param warehouse warehouse of the Paimon catalog
+     * @param database name of the Paimon database
+     * @param table name of the Paimon table
+     * @param partitionKeys partition keys of the Paimon table
+     * @param primaryKeys primary keys of the Paimon table
+     * @param paimonConfig options on the Paimon side, keyed by option name
+     */
+    MySqlSyncTableAction(
+            Map<String, String> mySqlConfig,
+            String warehouse,
+            String database,
+            String table,
+            List<String> partitionKeys,
+            List<String> primaryKeys,
+            Map<String, String> paimonConfig) {
+        // Source side settings.
+        this.mySqlConfig = mySqlConfig;
+
+        // Location of the target table.
+        this.warehouse = warehouse;
+        this.database = database;
+        this.table = table;
+
+        // Layout and options of the target table.
+        this.primaryKeys = primaryKeys;
+        this.partitionKeys = partitionKeys;
+        this.paimonConfig = paimonConfig;
+    }
+
+    /**
+     * Builds the job on the execution environment obtained from {@link
+     * StreamExecutionEnvironment#getExecutionEnvironment()} and executes it under a name derived
+     * from the target database and table.
+     */
+    @Override
+    public void run() throws Exception {
+        String jobName = String.format("MySQL CTAS: %s.%s", database, table);
+        StreamExecutionEnvironment executionEnv =
+                StreamExecutionEnvironment.getExecutionEnvironment();
+        build(executionEnv);
+        executionEnv.execute(jobName);
+    }
+
+    public void build(StreamExecutionEnvironment env) throws Exception {
+        MySqlSource<String> source = buildSource();
+        MySqlSchema mySqlSchema =
+                getMySqlSchemaList().stream()
+                        .reduce(MySqlSchema::merge)
+                        .orElseThrow(
+                                () ->
+                                        new RuntimeException(
+                                                "No table satisfies the given 
database name and table name"));
+
+        Catalog catalog =
+                CatalogFactory.createCatalog(
+                        CatalogContext.create(
+                                new Options().set(CatalogOptions.WAREHOUSE, 
warehouse)));
+        catalog.createDatabase(database, true);
+
+        Identifier identifier = new Identifier(database, table);
+        FileStoreTable table;
+        try {
+            table = (FileStoreTable) catalog.getTable(identifier);
+            if (!schemaCompatible(table.schema(), mySqlSchema)) {
+                throw new IllegalArgumentException(
+                        "Paimon schema and MySQL schema are not compatible.\n"
+                                + "Paimon fields are: "
+                                + table.schema().fields()
+                                + ".\nMySQL fields are: "
+                                + mySqlSchema.fields);
+            }
+        } catch (Catalog.TableNotExistException e) {
+            Schema schema = buildSchema(mySqlSchema);
+            catalog.createTable(identifier, schema, false);
+            table = (FileStoreTable) catalog.getTable(identifier);
+        }
+
+        EventParser.Factory<String> parserFactory;
+        String serverTimeZone = mySqlConfig.get("server-time-zone");
+        if (serverTimeZone != null) {
+            parserFactory = () -> new 
MySqlDebeziumJsonEventParser(ZoneId.of(serverTimeZone));
+        } else {
+            parserFactory = MySqlDebeziumJsonEventParser::new;
+        }
+
+        FlinkCdcSinkBuilder<String> sinkBuilder =
+                new FlinkCdcSinkBuilder<String>()
+                        .withInput(
+                                env.fromSource(
+                                        source, 
WatermarkStrategy.noWatermarks(), "MySQL Source"))
+                        .withParserFactory(parserFactory)
+                        .withTable(table);
+        String sinkParallelism = 
paimonConfig.get(FlinkConnectorOptions.SINK_PARALLELISM.key());
+        if (sinkParallelism != null) {
+            sinkBuilder.withParallelism(Integer.parseInt(sinkParallelism));
+        }
+        sinkBuilder.build();
+    }
+
+    /**
+     * Creates the MySQL CDC source described by the string options in {@code mySqlConfig}.
+     *
+     * <p>Connection settings, optional tuning options and the startup mode are copied onto a
+     * {@link MySqlSourceBuilder}. Options whose key starts with {@link
+     * JdbcUrlUtils#PROPERTIES_PREFIX} or {@link DebeziumOptions#DEBEZIUM_OPTIONS_PREFIX} are
+     * forwarded, with the prefix stripped, as JDBC and Debezium properties respectively.
+     *
+     * <p>Duration-valued options are parsed with {@link Duration#parse}, which only accepts
+     * ISO-8601 text such as {@code PT30S}.
+     *
+     * @return the configured source, emitting records as JSON strings and including schema
+     *     changes
+     */
+    private MySqlSource<String> buildSource() {
+        MySqlSourceBuilder<String> builder = MySqlSource.builder();
+
+        String sourceDatabase = mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME.key());
+        String sourceTable = mySqlConfig.get(MySqlSourceOptions.TABLE_NAME.key());
+        String hostname = mySqlConfig.get(MySqlSourceOptions.HOSTNAME.key());
+        int port = Integer.parseInt(mySqlConfig.get(MySqlSourceOptions.PORT.key()));
+        String username = mySqlConfig.get(MySqlSourceOptions.USERNAME.key());
+        String password = mySqlConfig.get(MySqlSourceOptions.PASSWORD.key());
+        builder.hostname(hostname)
+                .port(port)
+                .username(username)
+                .password(password)
+                .databaseList(sourceDatabase)
+                .tableList(sourceDatabase + "." + sourceTable);
+
+        // Optional settings: only touch the builder when the option is present.
+        Optional.ofNullable(mySqlConfig.get(MySqlSourceOptions.SERVER_ID.key()))
+                .ifPresent(id -> builder.serverId(id));
+        Optional.ofNullable(mySqlConfig.get(MySqlSourceOptions.SERVER_TIME_ZONE.key()))
+                .ifPresent(zone -> builder.serverTimeZone(zone));
+        Optional.ofNullable(mySqlConfig.get(MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE.key()))
+                .map(Integer::parseInt)
+                .ifPresent(builder::fetchSize);
+        Optional.ofNullable(mySqlConfig.get(MySqlSourceOptions.CONNECT_TIMEOUT.key()))
+                .map(Duration::parse)
+                .ifPresent(builder::connectTimeout);
+        Optional.ofNullable(mySqlConfig.get(MySqlSourceOptions.CONNECT_MAX_RETRIES.key()))
+                .map(Integer::parseInt)
+                .ifPresent(builder::connectMaxRetries);
+        Optional.ofNullable(mySqlConfig.get(MySqlSourceOptions.CONNECTION_POOL_SIZE.key()))
+                .map(Integer::parseInt)
+                .ifPresent(builder::connectionPoolSize);
+        Optional.ofNullable(mySqlConfig.get(MySqlSourceOptions.HEARTBEAT_INTERVAL.key()))
+                .map(Duration::parse)
+                .ifPresent(builder::heartbeatInterval);
+
+        // Startup mode handling, see
+        // https://github.com/ververica/flink-cdc-connectors/blob/master/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactory.java#L196
+        // A missing or unrecognized mode leaves the builder's startup options untouched.
+        String mode = mySqlConfig.get(MySqlSourceOptions.SCAN_STARTUP_MODE.key());
+        if ("initial".equalsIgnoreCase(mode)) {
+            builder.startupOptions(StartupOptions.initial());
+        } else if ("earliest-offset".equalsIgnoreCase(mode)) {
+            builder.startupOptions(StartupOptions.earliest());
+        } else if ("latest-offset".equalsIgnoreCase(mode)) {
+            builder.startupOptions(StartupOptions.latest());
+        } else if ("specific-offset".equalsIgnoreCase(mode)) {
+            BinlogOffsetBuilder offset = BinlogOffset.builder();
+            String binlogFile =
+                    mySqlConfig.get(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_FILE.key());
+            String binlogPos =
+                    mySqlConfig.get(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_POS.key());
+            // The file/position pair is only applied when both halves are configured.
+            if (binlogFile != null && binlogPos != null) {
+                offset.setBinlogFilePosition(binlogFile, Long.parseLong(binlogPos));
+            }
+            Optional.ofNullable(
+                            mySqlConfig.get(
+                                    MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET
+                                            .key()))
+                    .ifPresent(gtidSet -> offset.setGtidSet(gtidSet));
+            Optional.ofNullable(
+                            mySqlConfig.get(
+                                    MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_EVENTS
+                                            .key()))
+                    .map(Long::parseLong)
+                    .ifPresent(offset::setSkipEvents);
+            Optional.ofNullable(
+                            mySqlConfig.get(
+                                    MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_ROWS
+                                            .key()))
+                    .map(Long::parseLong)
+                    .ifPresent(offset::setSkipRows);
+            builder.startupOptions(StartupOptions.specificOffset(offset.build()));
+        } else if ("timestamp".equalsIgnoreCase(mode)) {
+            String startupMillis =
+                    mySqlConfig.get(MySqlSourceOptions.SCAN_STARTUP_TIMESTAMP_MILLIS.key());
+            builder.startupOptions(StartupOptions.timestamp(Long.parseLong(startupMillis)));
+        }
+
+        // Route prefixed options to the JDBC / Debezium property sets, dropping the prefix.
+        Properties jdbcProps = new Properties();
+        Properties debeziumProps = new Properties();
+        for (Map.Entry<String, String> option : mySqlConfig.entrySet()) {
+            String optionKey = option.getKey();
+            String optionValue = option.getValue();
+            if (optionKey.startsWith(JdbcUrlUtils.PROPERTIES_PREFIX)) {
+                String stripped = optionKey.substring(JdbcUrlUtils.PROPERTIES_PREFIX.length());
+                jdbcProps.put(stripped, optionValue);
+            } else if (optionKey.startsWith(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX)) {
+                String stripped =
+                        optionKey.substring(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX.length());
+                debeziumProps.put(stripped, optionValue);
+            }
+        }
+        builder.jdbcProperties(jdbcProps);
+        builder.debeziumProperties(debeziumProps);
+
+        // Ask the JSON converter to write decimals in "numeric" format.
+        Map<String, Object> converterConfigs = new HashMap<>();
+        converterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, "numeric");
+        JsonDebeziumDeserializationSchema deserializer =
+                new JsonDebeziumDeserializationSchema(true, converterConfigs);
+        builder.deserializer(deserializer);
+        builder.includeSchemaChanges(true);
+        return builder.build();
+    }
+
+    /**
+     * Collects the schema of every MySQL table selected by the configured name patterns.
+     *
+     * <p>The values of the database-name and table-name options are compiled as regular
+     * expressions; a database or table is selected only when its whole name matches. The server
+     * is queried through a plain JDBC connection that is closed before this method returns.
+     *
+     * @return one {@link MySqlSchema} per matching table, possibly empty
+     * @throws Exception if connecting to MySQL or reading its metadata fails
+     */
+    private List<MySqlSchema> getMySqlSchemaList() throws Exception {
+        Pattern databasePattern =
+                Pattern.compile(mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME.key()));
+        Pattern tablePattern =
+                Pattern.compile(mySqlConfig.get(MySqlSourceOptions.TABLE_NAME.key()));
+
+        String jdbcUrl =
+                String.format(
+                        "jdbc:mysql://%s:%s/",
+                        mySqlConfig.get(MySqlSourceOptions.HOSTNAME.key()),
+                        mySqlConfig.get(MySqlSourceOptions.PORT.key()));
+        String user = mySqlConfig.get(MySqlSourceOptions.USERNAME.key());
+        String password = mySqlConfig.get(MySqlSourceOptions.PASSWORD.key());
+
+        List<MySqlSchema> result = new ArrayList<>();
+        try (Connection connection = DriverManager.getConnection(jdbcUrl, user, password)) {
+            DatabaseMetaData metaData = connection.getMetaData();
+            try (ResultSet catalogs = metaData.getCatalogs()) {
+                while (catalogs.next()) {
+                    String databaseName = catalogs.getString("TABLE_CAT");
+                    Matcher databaseMatcher = databasePattern.matcher(databaseName);
+                    if (!databaseMatcher.matches()) {
+                        continue;
+                    }
+
+                    // "%" lists every table of the matching database; filtering is done below.
+                    try (ResultSet tables = metaData.getTables(databaseName, null, "%", null)) {
+                        while (tables.next()) {
+                            String tableName = tables.getString("TABLE_NAME");
+                            Matcher tableMatcher = tablePattern.matcher(tableName);
+                            if (!tableMatcher.matches()) {
+                                continue;
+                            }
+                            result.add(new MySqlSchema(metaData, databaseName, tableName));
+                        }
+                    }
+                }
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Checks whether an existing table schema can receive data shaped like the MySQL schema.
+     *
+     * @param tableSchema schema of the existing table
+     * @param mySqlSchema schema derived from MySQL
+     * @return {@code true} only if every MySQL column exists in {@code tableSchema} under the
+     *     same name and {@link SchemaChangeProcessFunction#canConvert} accepts converting the
+     *     MySQL column type to the table's column type
+     */
+    private boolean schemaCompatible(TableSchema tableSchema, MySqlSchema mySqlSchema) {
+        for (Map.Entry<String, DataType> column : mySqlSchema.fields.entrySet()) {
+            int position = tableSchema.fieldNames().indexOf(column.getKey());
+            // Short-circuit keeps the type lookup from running for a missing column.
+            if (position < 0
+                    || !SchemaChangeProcessFunction.canConvert(
+                            column.getValue(), tableSchema.fields().get(position).type())) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Builds the schema for a new table from the MySQL schema and the action's settings.
+     *
+     * <p>All MySQL columns are added in their iteration order and {@code paimonConfig} is used as
+     * table options. Explicitly specified primary keys take precedence over the ones recorded in
+     * {@code mySqlSchema}; partition keys are set only when some were specified.
+     *
+     * @param mySqlSchema schema derived from MySQL
+     * @return the schema to create the table with
+     * @throws IllegalArgumentException if a specified primary key is not a MySQL column, or if no
+     *     primary key is specified and none is available from {@code mySqlSchema}
+     */
+    private Schema buildSchema(MySqlSchema mySqlSchema) {
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        schemaBuilder.options(paimonConfig);
+        mySqlSchema.fields.forEach((name, type) -> schemaBuilder.column(name, type));
+
+        if (!primaryKeys.isEmpty()) {
+            // User-specified keys must all refer to existing MySQL columns.
+            for (String specifiedKey : primaryKeys) {
+                if (!mySqlSchema.fields.containsKey(specifiedKey)) {
+                    throw new IllegalArgumentException(
+                            "Specified primary key "
+                                    + specifiedKey
+                                    + " does not exist in MySQL tables");
+                }
+            }
+            schemaBuilder.primaryKey(primaryKeys);
+        } else if (!mySqlSchema.primaryKeys.isEmpty()) {
+            schemaBuilder.primaryKey(mySqlSchema.primaryKeys);
+        } else {
+            throw new IllegalArgumentException(
+                    "Primary keys are not specified. "
+                            + "Also, can't infer primary keys from MySQL table schemas because "
+                            + "MySQL tables have no primary keys or have different primary keys.");
+        }
+
+        if (!partitionKeys.isEmpty()) {
+            schemaBuilder.partitionKeys(partitionKeys);
+        }
+
+        return schemaBuilder.build();
+    }
+
+    private static class MySqlSchema {
+
+        private final String databaseName;
+        private final String tableName;
+
+        private final Map<String, DataType> fields;
+        private final List<String> primaryKeys;
+
+        private MySqlSchema(DatabaseMetaData metaData, String databaseName, 
String tableName)
+                throws Exception {
+            this.databaseName = databaseName;
+            this.tableName = tableName;
+
+            fields = new LinkedHashMap<>();
+            try (ResultSet rs = metaData.getColumns(null, databaseName, 
tableName, null)) {
+                while (rs.next()) {
+                    String fieldName = rs.getString("COLUMN_NAME");
+                    String fieldType = rs.getString("TYPE_NAME");
+                    Integer precision = rs.getInt("COLUMN_SIZE");
+                    if (rs.wasNull()) {
+                        precision = null;
+                    }
+                    Integer scale = rs.getInt("DECIMAL_DIGITS");
+                    if (rs.wasNull()) {
+                        scale = null;
+                    }
+                    fields.put(fieldName, MySqlTypeUtils.toDataType(fieldType, 
precision, scale));
+                }
+            }
+
+            primaryKeys = new ArrayList<>();
+            try (ResultSet rs = metaData.getPrimaryKeys(null, databaseName, 
tableName)) {
+                while (rs.next()) {
+                    String fieldName = rs.getString("COLUMN_NAME");
+                    primaryKeys.add(fieldName);
+                }
+            }
+        }
+
+        private MySqlSchema merge(MySqlSchema other) {
+            for (Map.Entry<String, DataType> entry : other.fields.entrySet()) {
+                String fieldName = entry.getKey();
+                DataType newType = entry.getValue();
+                if (fields.containsKey(fieldName)) {
+                    DataType oldType = fields.get(fieldName);
+                    if (SchemaChangeProcessFunction.canConvert(oldType, 
newType)) {
+                        fields.put(fieldName, newType);
+                    } else if (SchemaChangeProcessFunction.canConvert(newType, 
oldType)) {
+                        // nothing to do
+                    } else {
+                        throw new IllegalArgumentException(
+                                String.format(
+                                        "Column %s have different types in 
table %s.%s and table %s.%s",
+                                        fieldName,
+                                        databaseName,
+                                        tableName,
+                                        other.databaseName,
+                                        other.tableName));
+                    }
+                } else {
+                    fields.put(fieldName, newType);
+                }
+            }
+            if (!primaryKeys.equals(other.primaryKeys)) {
+                primaryKeys.clear();
+            }
+            return this;
+        }
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
new file mode 100644
index 000000000..d508852af
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mysql;
+
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.utils.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Converts from MySQL type to {@link DataType}.
+ *
+ * <p>Mostly referenced from <a
+ * 
href="https://github.com/ververica/flink-cdc-connectors/blob/master/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/schema/MySqlTypeUtils.java";>ververica
+ * / flink-cdc-connectors</a>.
+ */
+public class MySqlTypeUtils {
+
+    // ------ MySQL Type ------
+    // https://dev.mysql.com/doc/refman/8.0/en/data-types.html
+    private static final String BIT = "BIT";
+    private static final String TINYINT = "TINYINT";
+    private static final String TINYINT_UNSIGNED = "TINYINT UNSIGNED";
+    private static final String TINYINT_UNSIGNED_ZEROFILL = "TINYINT UNSIGNED 
ZEROFILL";
+    private static final String SMALLINT = "SMALLINT";
+    private static final String SMALLINT_UNSIGNED = "SMALLINT UNSIGNED";
+    private static final String SMALLINT_UNSIGNED_ZEROFILL = "SMALLINT 
UNSIGNED ZEROFILL";
+    private static final String MEDIUMINT = "MEDIUMINT";
+    private static final String MEDIUMINT_UNSIGNED = "MEDIUMINT UNSIGNED";
+    private static final String MEDIUMINT_UNSIGNED_ZEROFILL = "MEDIUMINT 
UNSIGNED ZEROFILL";
+    private static final String INT = "INT";
+    private static final String INT_UNSIGNED = "INT UNSIGNED";
+    private static final String INT_UNSIGNED_ZEROFILL = "INT UNSIGNED 
ZEROFILL";
+    private static final String BIGINT = "BIGINT";
+    private static final String SERIAL = "SERIAL";
+    private static final String BIGINT_UNSIGNED = "BIGINT UNSIGNED";
+    private static final String BIGINT_UNSIGNED_ZEROFILL = "BIGINT UNSIGNED 
ZEROFILL";
+    private static final String REAL = "REAL";
+    private static final String REAL_UNSIGNED = "REAL UNSIGNED";
+    private static final String REAL_UNSIGNED_ZEROFILL = "REAL UNSIGNED 
ZEROFILL";
+    private static final String FLOAT = "FLOAT";
+    private static final String FLOAT_UNSIGNED = "FLOAT UNSIGNED";
+    private static final String FLOAT_UNSIGNED_ZEROFILL = "FLOAT UNSIGNED 
ZEROFILL";
+    private static final String DOUBLE = "DOUBLE";
+    private static final String DOUBLE_UNSIGNED = "DOUBLE UNSIGNED";
+    private static final String DOUBLE_UNSIGNED_ZEROFILL = "DOUBLE UNSIGNED 
ZEROFILL";
+    private static final String DOUBLE_PRECISION = "DOUBLE PRECISION";
+    private static final String DOUBLE_PRECISION_UNSIGNED = "DOUBLE PRECISION 
UNSIGNED";
+    private static final String DOUBLE_PRECISION_UNSIGNED_ZEROFILL =
+            "DOUBLE PRECISION UNSIGNED ZEROFILL";
+    private static final String NUMERIC = "NUMERIC";
+    private static final String NUMERIC_UNSIGNED = "NUMERIC UNSIGNED";
+    private static final String NUMERIC_UNSIGNED_ZEROFILL = "NUMERIC UNSIGNED 
ZEROFILL";
+    private static final String FIXED = "FIXED";
+    private static final String FIXED_UNSIGNED = "FIXED UNSIGNED";
+    private static final String FIXED_UNSIGNED_ZEROFILL = "FIXED UNSIGNED 
ZEROFILL";
+    private static final String DECIMAL = "DECIMAL";
+    private static final String DECIMAL_UNSIGNED = "DECIMAL UNSIGNED";
+    private static final String DECIMAL_UNSIGNED_ZEROFILL = "DECIMAL UNSIGNED 
ZEROFILL";
+    private static final String CHAR = "CHAR";
+    private static final String VARCHAR = "VARCHAR";
+    private static final String TINYTEXT = "TINYTEXT";
+    private static final String MEDIUMTEXT = "MEDIUMTEXT";
+    private static final String TEXT = "TEXT";
+    private static final String LONGTEXT = "LONGTEXT";
+    private static final String DATE = "DATE";
+    private static final String TIME = "TIME";
+    private static final String DATETIME = "DATETIME";
+    private static final String TIMESTAMP = "TIMESTAMP";
+    private static final String YEAR = "YEAR";
+    private static final String BINARY = "BINARY";
+    private static final String VARBINARY = "VARBINARY";
+    private static final String TINYBLOB = "TINYBLOB";
+    private static final String MEDIUMBLOB = "MEDIUMBLOB";
+    private static final String BLOB = "BLOB";
+    private static final String LONGBLOB = "LONGBLOB";
+    private static final String JSON = "JSON";
+    private static final String SET = "SET";
+    private static final String ENUM = "ENUM";
+    private static final String GEOMETRY = "GEOMETRY";
+    private static final String UNKNOWN = "UNKNOWN";
+
+    // This length is from JDBC.
+    // It returns the number of characters when converting this timestamp to 
string.
+    // The base length of a timestamp is 19, for example "2023-03-23 17:20:00".
+    private static final int JDBC_TIMESTAMP_BASE_LENGTH = 19;
+
+    public static DataType toDataType(String type, String params) {
+        List<Integer> paramList = new ArrayList<>();
+        if (params != null) {
+            for (String s : params.split(",")) {
+                paramList.add(Integer.parseInt(s.trim()));
+            }
+        }
+        return toDataType(
+                type,
+                paramList.size() > 0 ? paramList.get(0) : null,
+                paramList.size() > 1 ? paramList.get(1) : null);
+    }
+
+    public static DataType toDataType(
+            String type, @Nullable Integer length, @Nullable Integer scale) {
+        switch (type.toUpperCase()) {
+            case BIT:
+                return DataTypes.BOOLEAN();
+            case TINYINT:
+                // MySQL haven't boolean type, it uses tinyint(1) to 
represents boolean type
+                // user should not use tinyint(1) to store number although 
jdbc url parameter
+                // tinyInt1isBit=false can help change the return value, it's 
not a general way
+                // btw: mybatis and mysql-connector-java map tinyint(1) to 
boolean by default
+                return length != null && length == 1 ? DataTypes.BOOLEAN() : 
DataTypes.TINYINT();
+            case TINYINT_UNSIGNED:
+            case TINYINT_UNSIGNED_ZEROFILL:
+            case SMALLINT:
+                return DataTypes.SMALLINT();
+            case SMALLINT_UNSIGNED:
+            case SMALLINT_UNSIGNED_ZEROFILL:
+            case INT:
+            case MEDIUMINT:
+                return DataTypes.INT();
+            case INT_UNSIGNED:
+            case INT_UNSIGNED_ZEROFILL:
+            case MEDIUMINT_UNSIGNED:
+            case MEDIUMINT_UNSIGNED_ZEROFILL:
+            case BIGINT:
+                return DataTypes.BIGINT();
+            case BIGINT_UNSIGNED:
+            case BIGINT_UNSIGNED_ZEROFILL:
+            case SERIAL:
+                return DataTypes.DECIMAL(20, 0);
+            case FLOAT:
+            case FLOAT_UNSIGNED:
+            case FLOAT_UNSIGNED_ZEROFILL:
+                return DataTypes.FLOAT();
+            case REAL:
+            case REAL_UNSIGNED:
+            case REAL_UNSIGNED_ZEROFILL:
+            case DOUBLE:
+            case DOUBLE_UNSIGNED:
+            case DOUBLE_UNSIGNED_ZEROFILL:
+            case DOUBLE_PRECISION:
+            case DOUBLE_PRECISION_UNSIGNED:
+            case DOUBLE_PRECISION_UNSIGNED_ZEROFILL:
+                return DataTypes.DOUBLE();
+            case NUMERIC:
+            case NUMERIC_UNSIGNED:
+            case NUMERIC_UNSIGNED_ZEROFILL:
+            case FIXED:
+            case FIXED_UNSIGNED:
+            case FIXED_UNSIGNED_ZEROFILL:
+            case DECIMAL:
+            case DECIMAL_UNSIGNED:
+            case DECIMAL_UNSIGNED_ZEROFILL:
+                return length != null && length <= 38
+                        ? DataTypes.DECIMAL(length, scale != null && scale >= 
0 ? scale : 0)
+                        : DataTypes.STRING();
+            case DATE:
+                return DataTypes.DATE();
+            case DATETIME:
+            case TIMESTAMP:
+                if (length == null) {
+                    return DataTypes.TIMESTAMP();
+                } else if (length >= JDBC_TIMESTAMP_BASE_LENGTH) {
+                    if (length > JDBC_TIMESTAMP_BASE_LENGTH + 1) {
+                        // Timestamp with a fraction of seconds.
+                        // For example "2023-03-23 17:20:00.01".
+                        // The decimal point will occupy 1 character.
+                        return DataTypes.TIMESTAMP(length - 
JDBC_TIMESTAMP_BASE_LENGTH - 1);
+                    } else {
+                        return DataTypes.TIMESTAMP(0);
+                    }
+                } else if (length >= 0 && length <= 
TimestampType.MAX_PRECISION) {
+                    return DataTypes.TIMESTAMP(length);
+                } else {
+                    return DataTypes.TIMESTAMP();
+                }
+            case CHAR:
+                return DataTypes.CHAR(Preconditions.checkNotNull(length));
+            case VARCHAR:
+                return DataTypes.VARCHAR(Preconditions.checkNotNull(length));
+            case TEXT:
+                return DataTypes.STRING();
+            case BINARY:
+                return DataTypes.BINARY(Preconditions.checkNotNull(length));
+            case VARBINARY:
+                return DataTypes.VARBINARY(Preconditions.checkNotNull(length));
+            case BLOB:
+                return DataTypes.BYTES();
+            default:
+                throw new UnsupportedOperationException(
+                        String.format("Don't support MySQL type '%s' yet.", 
type));
+        }
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcBucketStreamPartitioner.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcBucketStreamPartitioner.java
index 3b4699c84..69b64740f 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcBucketStreamPartitioner.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcBucketStreamPartitioner.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.flink.sink.cdc;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.cdc.CdcRecord;
 import org.apache.paimon.codegen.CodeGenUtils;
 import org.apache.paimon.codegen.Projection;
 import org.apache.paimon.data.BinaryRow;
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcParsingProcessFunction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcParsingProcessFunction.java
index 223b709ec..357c9996d 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcParsingProcessFunction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcParsingProcessFunction.java
@@ -18,8 +18,6 @@
 
 package org.apache.paimon.flink.sink.cdc;
 
-import org.apache.paimon.cdc.CdcRecord;
-import org.apache.paimon.cdc.EventParser;
 import org.apache.paimon.schema.SchemaChange;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
diff --git a/paimon-core/src/main/java/org/apache/paimon/cdc/CdcRecord.java 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
similarity index 97%
rename from paimon-core/src/main/java/org/apache/paimon/cdc/CdcRecord.java
rename to 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
index 4e80002af..e6692c072 100644
--- a/paimon-core/src/main/java/org/apache/paimon/cdc/CdcRecord.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.cdc;
+package org.apache.paimon.flink.sink.cdc;
 
 import org.apache.paimon.types.RowKind;
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/cdc/EventParser.java 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/EventParser.java
similarity index 97%
copy from paimon-core/src/main/java/org/apache/paimon/cdc/EventParser.java
copy to 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/EventParser.java
index 46c744699..9e575a338 100644
--- a/paimon-core/src/main/java/org/apache/paimon/cdc/EventParser.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/EventParser.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.cdc;
+package org.apache.paimon.flink.sink.cdc;
 
 import org.apache.paimon.schema.SchemaChange;
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSink.java
index 7981fec6e..24ddd9e56 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSink.java
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.flink.sink.cdc;
 
-import org.apache.paimon.cdc.CdcRecord;
 import org.apache.paimon.flink.VersionedSerializerWrapper;
 import org.apache.paimon.flink.sink.Committable;
 import org.apache.paimon.flink.sink.CommittableStateManager;
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkBuilder.java
index 43e940b3b..acee7b911 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkBuilder.java
@@ -18,8 +18,6 @@
 
 package org.apache.paimon.flink.sink.cdc;
 
-import org.apache.paimon.cdc.CdcRecord;
-import org.apache.paimon.cdc.EventParser;
 import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.sink.LogSinkFunction;
 import org.apache.paimon.operation.Lock;
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperator.java
index 6988f2782..c637d2f73 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperator.java
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.flink.sink.cdc;
 
-import org.apache.paimon.cdc.CdcRecord;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.flink.sink.AbstractStoreWriteOperator;
 import org.apache.paimon.flink.sink.LogSinkFunction;
@@ -104,23 +103,38 @@ public class SchemaAwareStoreWriteOperator extends 
AbstractStoreWriteOperator<Cd
     private Map<String, Object> tryConvert(Map<String, String> fields) {
         Map<String, Object> converted = new HashMap<>();
         TableSchema schema = table.schema();
+
         for (Map.Entry<String, String> field : fields.entrySet()) {
             String key = field.getKey();
             String value = field.getValue();
+
             int idx = schema.fieldNames().indexOf(key);
             if (idx < 0) {
+                LOG.info("Field " + key + " not found. Waiting for schema 
update.");
                 return null;
             }
-            DataType type = schema.fields().get(idx).type();
-            // TODO TypeUtils.castFromString cannot deal with complex types 
like arrays and maps.
-            //  Change type of CdcRecord#field if needed.
-            try {
-                converted.put(key, TypeUtils.castFromString(value, type));
-            } catch (Exception e) {
-                LOG.debug("Failed to convert value " + value + " to type " + 
type, e);
-                return null;
+
+            if (value == null) {
+                converted.put(key, null);
+            } else {
+                DataType type = schema.fields().get(idx).type();
+                // TODO TypeUtils.castFromString cannot deal with complex 
types like arrays and
+                //  maps. Change type of CdcRecord#field if needed.
+                try {
+                    converted.put(key, TypeUtils.castFromString(value, type));
+                } catch (Exception e) {
+                    LOG.info(
+                            "Failed to convert value "
+                                    + value
+                                    + " to type "
+                                    + type
+                                    + ". Waiting for schema update.",
+                            e);
+                    return null;
+                }
             }
         }
+
         return converted;
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaChangeProcessFunction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaChangeProcessFunction.java
index 85a9f080e..d8ad51d38 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaChangeProcessFunction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaChangeProcessFunction.java
@@ -22,6 +22,7 @@ import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeChecks;
 import org.apache.paimon.types.DataTypeRoot;
 import org.apache.paimon.utils.Preconditions;
 
@@ -86,7 +87,7 @@ public class SchemaChangeProcessFunction extends 
ProcessFunction<SchemaChange, V
                             + " does not exist in table. This is unexpected.");
             DataType oldType = schema.fields().get(idx).type();
             DataType newType = updateColumnType.newDataType();
-            if (checkTypeConversion(oldType, newType)) {
+            if (canConvert(oldType, newType)) {
                 schemaManager.commitChanges(schemaChange);
             } else {
                 throw new UnsupportedOperationException(
@@ -109,6 +110,8 @@ public class SchemaChangeProcessFunction extends 
ProcessFunction<SchemaChange, V
 
     private static final List<DataTypeRoot> STRING_TYPES =
             Arrays.asList(DataTypeRoot.CHAR, DataTypeRoot.VARCHAR);
+    private static final List<DataTypeRoot> BINARY_TYPES =
+            Arrays.asList(DataTypeRoot.BINARY, DataTypeRoot.VARBINARY);
     private static final List<DataTypeRoot> INTEGER_TYPES =
             Arrays.asList(
                     DataTypeRoot.TINYINT,
@@ -118,11 +121,17 @@ public class SchemaChangeProcessFunction extends 
ProcessFunction<SchemaChange, V
     private static final List<DataTypeRoot> FLOATING_POINT_TYPES =
             Arrays.asList(DataTypeRoot.FLOAT, DataTypeRoot.DOUBLE);
 
-    private boolean checkTypeConversion(DataType oldType, DataType newType) {
+    public static boolean canConvert(DataType oldType, DataType newType) {
         int oldIdx = STRING_TYPES.indexOf(oldType.getTypeRoot());
         int newIdx = STRING_TYPES.indexOf(newType.getTypeRoot());
         if (oldIdx >= 0 && newIdx >= 0) {
-            return true;
+            return DataTypeChecks.getLength(oldType) <= 
DataTypeChecks.getLength(newType);
+        }
+
+        oldIdx = BINARY_TYPES.indexOf(oldType.getTypeRoot());
+        newIdx = BINARY_TYPES.indexOf(newType.getTypeRoot());
+        if (oldIdx >= 0 && newIdx >= 0) {
+            return DataTypeChecks.getLength(oldType) <= 
DataTypeChecks.getLength(newType);
         }
 
         oldIdx = INTEGER_TYPES.indexOf(oldType.getTypeRoot());
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java
index 8b5fba73a..1e17967e0 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java
@@ -92,7 +92,7 @@ public class FileStoreITCase extends AbstractTestBase {
             new RowType(
                     Arrays.asList(
                             new RowType.RowField("v", new IntType()),
-                            new RowType.RowField("p", new VarCharType()),
+                            new RowType.RowField("p", new VarCharType(10)),
                             // rename key
                             new RowType.RowField("_k", new IntType())));
 
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
index ee1d010a3..74b8bc374 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
@@ -47,7 +47,7 @@ import java.util.Map;
 import java.util.UUID;
 
 /** {@link Action} test base. */
-public class ActionITCaseBase extends AbstractTestBase {
+public abstract class ActionITCaseBase extends AbstractTestBase {
 
     protected String warehouse;
     protected String database;
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlContainer.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlContainer.java
new file mode 100644
index 000000000..023f1d4e7
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlContainer.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mysql;
+
+import org.testcontainers.containers.ContainerLaunchException;
+import org.testcontainers.containers.JdbcDatabaseContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Docker container for MySQL. The difference between this class and {@link
+ * org.testcontainers.containers.MySQLContainer} is that TC MySQLContainer has problems when
+ * overriding mysql conf file, i.e. my.cnf.
+ *
+ * <p>Copied from <a
+ * href="https://github.com/ververica/flink-cdc-connectors/blob/master/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/testutils/MySqlContainer.java">ververica
+ * / flink-cdc-connectors</a>.
+ */
+@SuppressWarnings("rawtypes")
+public class MySqlContainer extends JdbcDatabaseContainer {
+
+    /** Docker image name; the tag is taken from the {@link MySqlVersion} given at construction. */
+    public static final String IMAGE = "mysql";
+
+    /** Port MySQL listens on inside the container. */
+    public static final Integer MYSQL_PORT = 3306;
+
+    // Keys under which the optional my.cnf override and setup script are stored in the
+    // inherited "parameters" map.
+    private static final String MY_CNF_CONFIG_OVERRIDE_PARAM_NAME = "MY_CNF";
+    private static final String SETUP_SQL_PARAM_NAME = "SETUP_SQL";
+    private static final String MYSQL_ROOT_USER = "root";
+
+    // Defaults; overridable through withDatabaseName / withUsername / withPassword.
+    private String databaseName = "test";
+    private String username = "test";
+    private String password = "test";
+
+    /**
+     * Creates a container for the image {@code mysql:<version>} and exposes {@link #MYSQL_PORT}.
+     *
+     * @param version MySQL version whose {@code getVersion()} value is used as the image tag
+     */
+    public MySqlContainer(MySqlVersion version) {
+        super(DockerImageName.parse(IMAGE + ":" + version.getVersion()));
+        addExposedPort(MYSQL_PORT);
+    }
+
+    /**
+     * Returns the host-side port mapped to {@link #MYSQL_PORT}.
+     *
+     * @return a set containing exactly the mapped MySQL port
+     */
+    @Override
+    protected Set<Integer> getLivenessCheckPorts() {
+        // Do not pass the port to the HashSet constructor: the Integer would be unboxed and
+        // select HashSet(int initialCapacity), yielding an empty set.
+        Set<Integer> ports = new HashSet<>();
+        ports.add(getMappedPort(MYSQL_PORT));
+        return ports;
+    }
+
+    /**
+     * Registers the optional my.cnf override and setup SQL resources and sets the {@code MYSQL_*}
+     * environment variables from the configured database name, user and password.
+     *
+     * @throws ContainerLaunchException if the password is null or empty and the user is not root
+     */
+    @Override
+    protected void configure() {
+        optionallyMapResourceParameterAsVolume(
+                MY_CNF_CONFIG_OVERRIDE_PARAM_NAME, "/etc/mysql/", "mysql-default-conf");
+
+        // The setup script has no usable default resource ("N/A"), so only map it when the
+        // caller actually supplied one.
+        if (parameters.containsKey(SETUP_SQL_PARAM_NAME)) {
+            optionallyMapResourceParameterAsVolume(
+                    SETUP_SQL_PARAM_NAME, "/docker-entrypoint-initdb.d/", "N/A");
+        }
+
+        addEnv("MYSQL_DATABASE", databaseName);
+        addEnv("MYSQL_USER", username);
+        if (password != null && !password.isEmpty()) {
+            // The same password is used for the regular user and for root.
+            addEnv("MYSQL_PASSWORD", password);
+            addEnv("MYSQL_ROOT_PASSWORD", password);
+        } else if (MYSQL_ROOT_USER.equalsIgnoreCase(username)) {
+            addEnv("MYSQL_ALLOW_EMPTY_PASSWORD", "yes");
+        } else {
+            throw new ContainerLaunchException(
+                    "Empty password can be used only with the root user");
+        }
+        setStartupAttempts(3);
+    }
+
+    /**
+     * Returns {@code com.mysql.cj.jdbc.Driver} when that class can be loaded, otherwise falls back
+     * to {@code com.mysql.jdbc.Driver}.
+     */
+    @Override
+    public String getDriverClassName() {
+        try {
+            Class.forName("com.mysql.cj.jdbc.Driver");
+            return "com.mysql.cj.jdbc.Driver";
+        } catch (ClassNotFoundException e) {
+            return "com.mysql.jdbc.Driver";
+        }
+    }
+
+    /**
+     * Builds a JDBC URL pointing at the given database on this container.
+     *
+     * @param databaseName database to put in the URL path
+     * @return {@code jdbc:mysql://<host>:<mapped port>/<databaseName>} followed by whatever {@code
+     *     constructUrlParameters("?", "&")} returns
+     */
+    public String getJdbcUrl(String databaseName) {
+        String additionalUrlParams = constructUrlParameters("?", "&");
+        return "jdbc:mysql://"
+                + getHost()
+                + ":"
+                + getDatabasePort()
+                + "/"
+                + databaseName
+                + additionalUrlParams;
+    }
+
+    /** Returns the JDBC URL for the configured database name. */
+    @Override
+    public String getJdbcUrl() {
+        return getJdbcUrl(databaseName);
+    }
+
+    /** Returns the host-side port mapped to {@link #MYSQL_PORT}. */
+    public int getDatabasePort() {
+        return getMappedPort(MYSQL_PORT);
+    }
+
+    /**
+     * Extends the URL built by the superclass with {@code useSSL=false} and {@code
+     * allowPublicKeyRetrieval=true} unless the respective parameter is already present.
+     */
+    @Override
+    protected String constructUrlForConnection(String queryString) {
+        String url = super.constructUrlForConnection(queryString);
+
+        if (!url.contains("useSSL=")) {
+            String separator = url.contains("?") ? "&" : "?";
+            url = url + separator + "useSSL=false";
+        }
+
+        if (!url.contains("allowPublicKeyRetrieval=")) {
+            url = url + "&allowPublicKeyRetrieval=true";
+        }
+
+        return url;
+    }
+
+    @Override
+    public String getDatabaseName() {
+        return databaseName;
+    }
+
+    @Override
+    public String getUsername() {
+        return username;
+    }
+
+    @Override
+    public String getPassword() {
+        return password;
+    }
+
+    @Override
+    protected String getTestQueryString() {
+        return "SELECT 1";
+    }
+
+    /**
+     * Stores the given value as the my.cnf override parameter consumed by {@link #configure()}.
+     *
+     * @param s value for the my.cnf override parameter
+     * @return this container, for chaining
+     */
+    @SuppressWarnings("unchecked")
+    public MySqlContainer withConfigurationOverride(String s) {
+        parameters.put(MY_CNF_CONFIG_OVERRIDE_PARAM_NAME, s);
+        return this;
+    }
+
+    /**
+     * Stores the given value as the setup SQL parameter consumed by {@link #configure()}.
+     *
+     * @param sqlPath value for the setup SQL parameter
+     * @return this container, for chaining
+     */
+    @SuppressWarnings("unchecked")
+    public MySqlContainer withSetupSQL(String sqlPath) {
+        parameters.put(SETUP_SQL_PARAM_NAME, sqlPath);
+        return this;
+    }
+
+    @Override
+    public MySqlContainer withDatabaseName(final String databaseName) {
+        this.databaseName = databaseName;
+        return this;
+    }
+
+    @Override
+    public MySqlContainer withUsername(final String username) {
+        this.username = username;
+        return this;
+    }
+
+    @Override
+    public MySqlContainer withPassword(final String password) {
+        this.password = password;
+        return this;
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
new file mode 100644
index 000000000..9769a1672
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -0,0 +1,633 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mysql;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.action.ActionITCaseBase;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/** IT cases for {@link MySqlSyncTableAction}. */
+public class MySqlSyncTableActionITCase extends ActionITCaseBase {
+
+    // LOG is declared before MYSQL_CONTAINER on purpose: createMySqlContainer passes LOG to a
+    // Slf4jLogConsumer while MYSQL_CONTAINER is being initialized.
+    private static final Logger LOG = 
LoggerFactory.getLogger(MySqlSyncTableActionITCase.class);
+
+    // Container shared by all tests in this class. USER, PASSWORD and DATABASE_NAME are
+    // declared below but are compile-time String constants, so createMySqlContainer can read
+    // them safely during this initializer.
+    private static final MySqlContainer MYSQL_CONTAINER = 
createMySqlContainer(MySqlVersion.V5_7);
+    private static final String USER = "paimonuser";
+    private static final String PASSWORD = "paimonpw";
+    private static final String DATABASE_NAME = "paimon_test";
+
+    /** Starts the shared MySQL container once, before any test of this class runs. */
+    @BeforeAll
+    public static void startContainers() {
+        LOG.info("Starting containers...");
+        Stream<MySqlContainer> toStart = Stream.of(MYSQL_CONTAINER);
+        // deepStart is asynchronous; join() blocks until the start-up future completes.
+        Startables.deepStart(toStart).join();
+        LOG.info("Containers are started.");
+    }
+
+    /** Stops the shared MySQL container after all tests of this class have run. */
+    @AfterAll
+    public static void stopContainers() {
+        LOG.info("Stopping containers...");
+        MYSQL_CONTAINER.stop();
+        LOG.info("Containers are stopped.");
+    }
+
+    /**
+     * Builds (but does not start) a MySQL container of the given version, configured with the
+     * {@code mysql/my.cnf} override, the {@code mysql/setup.sql} script, the test credentials and
+     * database name, and a log consumer that forwards container output to {@link #LOG}.
+     *
+     * @param version MySQL version to run
+     * @return the configured container
+     */
+    private static MySqlContainer createMySqlContainer(MySqlVersion version) {
+        MySqlContainer container = new MySqlContainer(version);
+        container
+                .withConfigurationOverride("mysql/my.cnf")
+                .withSetupSQL("mysql/setup.sql")
+                .withUsername(USER)
+                .withPassword(PASSWORD)
+                .withDatabaseName(DATABASE_NAME);
+        // Every with* call configures and returns the same instance, so the typed local can be
+        // returned directly instead of casting the raw-typed result of the chain.
+        container.withLogConsumer(new Slf4jLogConsumer(LOG));
+        return container;
+    }
+
+    /**
+     * Starts a {@link MySqlSyncTableAction} job over every MySQL table whose name matches {@code
+     * schema_evolution_\d+}, waits until the job is RUNNING, then drives the schema-evolution
+     * scenario through a JDBC connection to the container.
+     */
+    @Test
+    @Timeout(60)
+    public void testSchemaEvolution() throws Exception {
+        Map<String, String> mySqlConfig = getBasicMySqlConfig();
+        mySqlConfig.put("database-name", DATABASE_NAME);
+        mySqlConfig.put("table-name", "schema_evolution_\\d+");
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(2);
+        env.enableCheckpointing(1000);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+
+        // Bucket count and sink parallelism are each drawn from [1, 3] on every run.
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        int bucketNum = random.nextInt(3) + 1;
+        int sinkParallelism = random.nextInt(3) + 1;
+        Map<String, String> paimonConfig = new HashMap<>();
+        paimonConfig.put("bucket", String.valueOf(bucketNum));
+        paimonConfig.put("sink.parallelism", String.valueOf(sinkParallelism));
+
+        List<String> partitionKeys = Collections.singletonList("pt");
+        List<String> primaryKeys = Arrays.asList("pt", "_id");
+        MySqlSyncTableAction action =
+                new MySqlSyncTableAction(
+                        mySqlConfig,
+                        warehouse,
+                        database,
+                        tableName,
+                        partitionKeys,
+                        primaryKeys,
+                        paimonConfig);
+        action.build(env);
+        JobClient client = env.executeAsync();
+
+        // Poll once per second until the job reports RUNNING; @Timeout bounds the wait.
+        while (client.getJobStatus().get() != JobStatus.RUNNING) {
+            Thread.sleep(1000);
+        }
+
+        String jdbcUrl =
+                String.format(
+                        "jdbc:mysql://%s:%s/",
+                        MYSQL_CONTAINER.getHost(), MYSQL_CONTAINER.getDatabasePort());
+        try (Connection conn =
+                        DriverManager.getConnection(
+                                jdbcUrl,
+                                MYSQL_CONTAINER.getUsername(),
+                                MYSQL_CONTAINER.getPassword());
+                Statement statement = conn.createStatement()) {
+            testSchemaEvolutionImpl(statement);
+        }
+    }
+
+    /**
+     * Runs the schema-evolution scenario: alternates DML/DDL on the two MySQL source tables
+     * {@code schema_evolution_1} and {@code schema_evolution_2} with checks that the single
+     * Paimon table eventually shows the expected schema and rows.
+     *
+     * <p>The statements are order-dependent; each {@code waitForResult} call verifies the state
+     * produced by everything executed before it.
+     *
+     * @param statement open JDBC statement on the MySQL container
+     */
+    private void testSchemaEvolutionImpl(Statement statement) throws Exception 
{
+        FileStoreTable table = getFileStoreTable();
+        statement.executeUpdate("USE paimon_test");
+
+        // Phase 1: initial inserts into both source tables; rows of both are expected in the
+        // same Paimon table.
+        statement.executeUpdate("INSERT INTO schema_evolution_1 VALUES (1, 1, 
'one')");
+        statement.executeUpdate(
+                "INSERT INTO schema_evolution_2 VALUES (1, 2, 'two'), (2, 4, 
'four')");
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT().notNull(),
+                            DataTypes.INT().notNull(),
+                            DataTypes.VARCHAR(10)
+                        },
+                        new String[] {"pt", "_id", "v1"});
+        List<String> primaryKeys = Arrays.asList("pt", "_id");
+        List<String> expected = Arrays.asList("+I[1, 1, one]", "+I[1, 2, 
two]", "+I[2, 4, four]");
+        waitForResult(expected, table, rowType, primaryKeys);
+
+        // Phase 2: add nullable column v2 (INT) to both tables; rows written before the change
+        // are expected to show NULL for v2.
+        statement.executeUpdate("ALTER TABLE schema_evolution_1 ADD COLUMN v2 
INT");
+        statement.executeUpdate(
+                "INSERT INTO schema_evolution_1 VALUES (2, 3, 'three', 30), 
(1, 5, 'five', 50)");
+        statement.executeUpdate("ALTER TABLE schema_evolution_2 ADD COLUMN v2 
INT");
+        statement.executeUpdate("INSERT INTO schema_evolution_2 VALUES (1, 6, 
'six', 60)");
+        statement.executeUpdate("UPDATE schema_evolution_2 SET v1 = 'second' 
WHERE _id = 2");
+        rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT().notNull(),
+                            DataTypes.INT().notNull(),
+                            DataTypes.VARCHAR(10),
+                            DataTypes.INT()
+                        },
+                        new String[] {"pt", "_id", "v1", "v2"});
+        expected =
+                Arrays.asList(
+                        "+I[1, 1, one, NULL]",
+                        "+I[1, 2, second, NULL]",
+                        "+I[2, 3, three, 30]",
+                        "+I[2, 4, four, NULL]",
+                        "+I[1, 5, five, 50]",
+                        "+I[1, 6, six, 60]");
+        waitForResult(expected, table, rowType, primaryKeys);
+
+        // Phase 3: widen v2 from INT to BIGINT and write values that do not fit into an INT.
+        statement.executeUpdate("ALTER TABLE schema_evolution_1 MODIFY COLUMN 
v2 BIGINT");
+        statement.executeUpdate(
+                "INSERT INTO schema_evolution_1 VALUES (2, 7, 'seven', 
70000000000)");
+        statement.executeUpdate("UPDATE schema_evolution_1 SET v2 = 
30000000000 WHERE _id = 3");
+        statement.executeUpdate("ALTER TABLE schema_evolution_2 MODIFY COLUMN 
v2 BIGINT");
+        statement.executeUpdate(
+                "INSERT INTO schema_evolution_2 VALUES (2, 8, 'eight', 
80000000000)");
+        rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT().notNull(),
+                            DataTypes.INT().notNull(),
+                            DataTypes.VARCHAR(10),
+                            DataTypes.BIGINT()
+                        },
+                        new String[] {"pt", "_id", "v1", "v2"});
+        expected =
+                Arrays.asList(
+                        "+I[1, 1, one, NULL]",
+                        "+I[1, 2, second, NULL]",
+                        "+I[2, 3, three, 30000000000]",
+                        "+I[2, 4, four, NULL]",
+                        "+I[1, 5, five, 50]",
+                        "+I[1, 6, six, 60]",
+                        "+I[2, 7, seven, 70000000000]",
+                        "+I[2, 8, eight, 80000000000]");
+        waitForResult(expected, table, rowType, primaryKeys);
+
+        // Phase 4: add v3 (NUMERIC), v4 (VARBINARY) and v5 (FLOAT) and widen v1 from
+        // VARCHAR(10) to VARCHAR(20), then write a v1 value longer than 10 characters.
+        statement.executeUpdate("ALTER TABLE schema_evolution_1 ADD COLUMN v3 
NUMERIC(8, 3)");
+        statement.executeUpdate("ALTER TABLE schema_evolution_1 ADD COLUMN v4 
VARBINARY(10)");
+        statement.executeUpdate("ALTER TABLE schema_evolution_1 ADD COLUMN v5 
FLOAT");
+        statement.executeUpdate("ALTER TABLE schema_evolution_1 MODIFY COLUMN 
v1 VARCHAR(20)");
+        statement.executeUpdate(
+                "INSERT INTO schema_evolution_1 VALUES (1, 9, 'nine', 
90000000000, 99999.999, 'nine.bin', 9.9)");
+        statement.executeUpdate("ALTER TABLE schema_evolution_2 ADD COLUMN v3 
NUMERIC(8, 3)");
+        statement.executeUpdate("ALTER TABLE schema_evolution_2 ADD COLUMN v4 
VARBINARY(10)");
+        statement.executeUpdate("ALTER TABLE schema_evolution_2 ADD COLUMN v5 
FLOAT");
+        statement.executeUpdate("ALTER TABLE schema_evolution_2 MODIFY COLUMN 
v1 VARCHAR(20)");
+        statement.executeUpdate(
+                "UPDATE schema_evolution_2 SET v1 = 'very long string' WHERE 
_id = 8");
+        rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT().notNull(),
+                            DataTypes.INT().notNull(),
+                            DataTypes.VARCHAR(20),
+                            DataTypes.BIGINT(),
+                            DataTypes.DECIMAL(8, 3),
+                            DataTypes.VARBINARY(10),
+                            DataTypes.FLOAT()
+                        },
+                        new String[] {"pt", "_id", "v1", "v2", "v3", "v4", 
"v5"});
+        expected =
+                Arrays.asList(
+                        "+I[1, 1, one, NULL, NULL, NULL, NULL]",
+                        "+I[1, 2, second, NULL, NULL, NULL, NULL]",
+                        "+I[2, 3, three, 30000000000, NULL, NULL, NULL]",
+                        "+I[2, 4, four, NULL, NULL, NULL, NULL]",
+                        "+I[1, 5, five, 50, NULL, NULL, NULL]",
+                        "+I[1, 6, six, 60, NULL, NULL, NULL]",
+                        "+I[2, 7, seven, 70000000000, NULL, NULL, NULL]",
+                        "+I[2, 8, very long string, 80000000000, NULL, NULL, 
NULL]",
+                        "+I[1, 9, nine, 90000000000, 99999.999, [110, 105, 
110, 101, 46, 98, 105, 110], 9.9]");
+        waitForResult(expected, table, rowType, primaryKeys);
+
+        // Phase 5: widen v4 from VARBINARY(10) to VARBINARY(20) and v5 from FLOAT to DOUBLE,
+        // then write a 13-byte v4 value and a v5 value with 11 decimal places.
+        statement.executeUpdate("ALTER TABLE schema_evolution_1 MODIFY COLUMN 
v4 VARBINARY(20)");
+        statement.executeUpdate("ALTER TABLE schema_evolution_1 MODIFY COLUMN 
v5 DOUBLE");
+        statement.executeUpdate(
+                "UPDATE schema_evolution_1 SET v4 = 'nine.bin.long', v5 = 
9.00000000009 WHERE _id = 9");
+        statement.executeUpdate("ALTER TABLE schema_evolution_2 MODIFY COLUMN 
v4 VARBINARY(20)");
+        statement.executeUpdate("ALTER TABLE schema_evolution_2 MODIFY COLUMN 
v5 DOUBLE");
+        statement.executeUpdate(
+                "UPDATE schema_evolution_2 SET v4 = 'four.bin.long', v5 = 
4.00000000004 WHERE _id = 4");
+        rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT().notNull(),
+                            DataTypes.INT().notNull(),
+                            DataTypes.VARCHAR(20),
+                            DataTypes.BIGINT(),
+                            DataTypes.DECIMAL(8, 3),
+                            DataTypes.VARBINARY(20),
+                            DataTypes.DOUBLE()
+                        },
+                        new String[] {"pt", "_id", "v1", "v2", "v3", "v4", 
"v5"});
+        expected =
+                Arrays.asList(
+                        "+I[1, 1, one, NULL, NULL, NULL, NULL]",
+                        "+I[1, 2, second, NULL, NULL, NULL, NULL]",
+                        "+I[2, 3, three, 30000000000, NULL, NULL, NULL]",
+                        "+I[2, 4, four, NULL, NULL, [102, 111, 117, 114, 46, 
98, 105, 110, 46, 108, 111, 110, 103], 4.00000000004]",
+                        "+I[1, 5, five, 50, NULL, NULL, NULL]",
+                        "+I[1, 6, six, 60, NULL, NULL, NULL]",
+                        "+I[2, 7, seven, 70000000000, NULL, NULL, NULL]",
+                        "+I[2, 8, very long string, 80000000000, NULL, NULL, 
NULL]",
+                        "+I[1, 9, nine, 90000000000, 99999.999, [110, 105, 
110, 101, 46, 98, 105, 110, 46, 108, 111, 110, 103], 9.00000000009]");
+        waitForResult(expected, table, rowType, primaryKeys);
+    }
+
+    /**
+     * Starts a {@link MySqlSyncTableAction} job for the MySQL table {@code all_types_table} and
+     * checks that the resulting Paimon table exposes the expected type for every column and
+     * contains the two expected rows (one fully populated, one almost entirely NULL).
+     *
+     * <p>No rows are written by this test itself; NOTE(review): the table contents presumably
+     * come from the container's setup script - confirm against mysql/setup.sql.
+     */
+    @Test
+    @Timeout(30)
+    public void testAllTypes() throws Exception {
+        Map<String, String> mySqlConfig = getBasicMySqlConfig();
+        mySqlConfig.put("database-name", DATABASE_NAME);
+        mySqlConfig.put("table-name", "all_types_table");
+
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(1000);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+
+        // Both key lists are empty here, yet "_id" is passed to waitForResult as primary key
+        // below. NOTE(review): presumably the action derives the key from the MySQL table -
+        // confirm in MySqlSyncTableAction.
+        MySqlSyncTableAction action =
+                new MySqlSyncTableAction(
+                        mySqlConfig,
+                        warehouse,
+                        database,
+                        tableName,
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        new HashMap<>());
+        action.build(env);
+        // The JobClient is discarded: unlike testSchemaEvolution, this test does not wait for
+        // the job to reach RUNNING before checking results.
+        env.executeAsync();
+
+        // Expected Paimon types, in column order. This array and the name array that follows
+        // are parallel: entry i here describes column i there (see trailing comments).
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT().notNull(), // _id
+                            DataTypes.BOOLEAN(), // _boolean
+                            DataTypes.TINYINT(), // _tinyint
+                            DataTypes.SMALLINT(), // _tinyint_unsigned
+                            DataTypes.SMALLINT(), // _tinyint_unsigned_zerofill
+                            DataTypes.SMALLINT(), // _smallint
+                            DataTypes.INT(), // _smallint_unsigned
+                            DataTypes.INT(), // _smallint_unsigned_zerofill
+                            DataTypes.INT(), // _mediumint
+                            DataTypes.BIGINT(), // _mediumint_unsigned
+                            DataTypes.BIGINT(), // _mediumint_unsigned_zerofill
+                            DataTypes.INT(), // _int
+                            DataTypes.BIGINT(), // _int_unsigned
+                            DataTypes.BIGINT(), // _int_unsigned_zerofill
+                            DataTypes.BIGINT(), // _bigint
+                            DataTypes.DECIMAL(20, 0), // _bigint_unsigned
+                            DataTypes.DECIMAL(20, 0), // 
_bigint_unsigned_zerofill
+                            DataTypes.DECIMAL(20, 0), // _serial
+                            DataTypes.FLOAT(), // _float
+                            DataTypes.FLOAT(), // _float_unsigned
+                            DataTypes.FLOAT(), // _float_unsigned_zerofill
+                            DataTypes.DOUBLE(), // _real
+                            DataTypes.DOUBLE(), // _real_unsigned
+                            DataTypes.DOUBLE(), // _real_unsigned_zerofill
+                            DataTypes.DOUBLE(), // _double
+                            DataTypes.DOUBLE(), // _double_unsigned
+                            DataTypes.DOUBLE(), // _double_unsigned_zerofill
+                            DataTypes.DOUBLE(), // _double_precision
+                            DataTypes.DOUBLE(), // _double_precision_unsigned
+                            DataTypes.DOUBLE(), // 
_double_precision_unsigned_zerofill
+                            DataTypes.DECIMAL(8, 3), // _numeric
+                            DataTypes.DECIMAL(8, 3), // _numeric_unsigned
+                            DataTypes.DECIMAL(8, 3), // 
_numeric_unsigned_zerofill
+                            DataTypes.STRING(), // _fixed
+                            DataTypes.STRING(), // _fixed_unsigned
+                            DataTypes.STRING(), // _fixed_unsigned_zerofill
+                            DataTypes.DECIMAL(8, 0), // _decimal
+                            DataTypes.DECIMAL(8, 0), // _decimal_unsigned
+                            DataTypes.DECIMAL(8, 0), // 
_decimal_unsigned_zerofill
+                            DataTypes.DATE(), // _date
+                            DataTypes.TIMESTAMP(0), // _datetime
+                            DataTypes.TIMESTAMP(6), // _timestamp
+                            DataTypes.CHAR(10), // _char
+                            DataTypes.VARCHAR(20), // _varchar
+                            DataTypes.STRING(), // _text
+                            DataTypes.BINARY(10), // _bin
+                            DataTypes.VARBINARY(20), // _varbin
+                            DataTypes.BYTES() // _blob
+                        },
+                        new String[] {
+                            "_id",
+                            "_boolean",
+                            "_tinyint",
+                            "_tinyint_unsigned",
+                            "_tinyint_unsigned_zerofill",
+                            "_smallint",
+                            "_smallint_unsigned",
+                            "_smallint_unsigned_zerofill",
+                            "_mediumint",
+                            "_mediumint_unsigned",
+                            "_mediumint_unsigned_zerofill",
+                            "_int",
+                            "_int_unsigned",
+                            "_int_unsigned_zerofill",
+                            "_bigint",
+                            "_bigint_unsigned",
+                            "_bigint_unsigned_zerofill",
+                            "_serial",
+                            "_float",
+                            "_float_unsigned",
+                            "_float_unsigned_zerofill",
+                            "_real",
+                            "_real_unsigned",
+                            "_real_unsigned_zerofill",
+                            "_double",
+                            "_double_unsigned",
+                            "_double_unsigned_zerofill",
+                            "_double_precision",
+                            "_double_precision_unsigned",
+                            "_double_precision_unsigned_zerofill",
+                            "_numeric",
+                            "_numeric_unsigned",
+                            "_numeric_unsigned_zerofill",
+                            "_fixed",
+                            "_fixed_unsigned",
+                            "_fixed_unsigned_zerofill",
+                            "_decimal",
+                            "_decimal_unsigned",
+                            "_decimal_unsigned_zerofill",
+                            "_date",
+                            "_datetime",
+                            "_timestamp",
+                            "_char",
+                            "_varchar",
+                            "_text",
+                            "_bin",
+                            "_varbin",
+                            "_blob"
+                        });
+        FileStoreTable table = getFileStoreTable();
+        // Expected rows, one string fragment per group of related columns, in the same column
+        // order as the arrays above.
+        List<String> expected =
+                Arrays.asList(
+                        "+I["
+                                + "1, true, 1, 2, 3, "
+                                + "1000, 2000, 3000, "
+                                + "100000, 200000, 300000, "
+                                + "1000000, 2000000, 3000000, "
+                                + "10000000000, 20000000000, 30000000000, 
40000000000, "
+                                + "1.5, 2.5, 3.5, "
+                                + "1.000001, 2.000002, 3.000003, "
+                                + "1.000011, 2.000022, 3.000033, "
+                                + "1.000111, 2.000222, 3.000333, "
+                                + "12345.110, 12345.220, 12345.330, "
+                                + "1.2345678987654322E32, 
1.2345678987654322E32, 1.2345678987654322E32, "
+                                + "11111, 22222, 33333, "
+                                + "19439, 2023-03-23T14:30:05, 
2023-03-23T15:00:10.123456, "
+                                + "Paimon, Apache Paimon, Apache Paimon MySQL 
Test Data, "
+                                + "[98, 121, 116, 101, 115, 0, 0, 0, 0, 0], "
+                                + "[109, 111, 114, 101, 32, 98, 121, 116, 101, 
115], "
+                                + "[118, 101, 114, 121, 32, 108, 111, 110, 
103, 32, 98, 121, 116, 101, 115, 32, 116, 101, 115, 116, 32, 100, 97, 116, 97]"
+                                + "]",
+                        "+I["
+                                + "2, NULL, NULL, NULL, NULL, "
+                                + "NULL, NULL, NULL, "
+                                + "NULL, NULL, NULL, "
+                                + "NULL, NULL, NULL, "
+                                + "NULL, NULL, NULL, 50000000000, "
+                                + "NULL, NULL, NULL, "
+                                + "NULL, NULL, NULL, "
+                                + "NULL, NULL, NULL, "
+                                + "NULL, NULL, NULL, "
+                                + "NULL, NULL, NULL, "
+                                + "NULL, NULL, NULL, "
+                                + "NULL, NULL, NULL, "
+                                + "NULL, NULL, NULL, "
+                                + "NULL, NULL, NULL, "
+                                + "NULL, NULL, NULL"
+                                + "]");
+        waitForResult(expected, table, rowType, 
Collections.singletonList("_id"));
+    }
+
+    @Test
+    public void testIncompatibleMySqlTable() {
+        Map<String, String> mySqlConfig = getBasicMySqlConfig();
+        mySqlConfig.put("database-name", DATABASE_NAME);
+        mySqlConfig.put("table-name", "incompatible_field_\\d+");
+
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        MySqlSyncTableAction action =
+                new MySqlSyncTableAction(
+                        mySqlConfig,
+                        warehouse,
+                        database,
+                        tableName,
+                        Collections.emptyList(),
+                        Collections.singletonList("_id"),
+                        new HashMap<>());
+
+        IllegalArgumentException e =
+                assertThrows(
+                        IllegalArgumentException.class,
+                        () -> action.build(env),
+                        "Expecting IllegalArgumentException");
+        assertThat(e)
+                .hasMessage(
+                        "Column v1 have different types in table "
+                                + DATABASE_NAME
+                                + ".incompatible_field_1 and table "
+                                + DATABASE_NAME
+                                + ".incompatible_field_2");
+    }
+
+    @Test
+    public void testIncompatiblePaimonTable() throws Exception {
+        Map<String, String> mySqlConfig = getBasicMySqlConfig();
+        mySqlConfig.put("database-name", DATABASE_NAME);
+        mySqlConfig.put("table-name", "incompatible_pk_\\d+");
+
+        createFileStoreTable(
+                RowType.of(
+                        new DataType[] {DataTypes.INT(), DataTypes.BIGINT(), 
DataTypes.DOUBLE()},
+                        new String[] {"a", "b", "c"}),
+                Collections.emptyList(),
+                Collections.singletonList("a"),
+                new HashMap<>());
+
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        MySqlSyncTableAction action =
+                new MySqlSyncTableAction(
+                        mySqlConfig,
+                        warehouse,
+                        database,
+                        tableName,
+                        Collections.emptyList(),
+                        Collections.singletonList("a"),
+                        new HashMap<>());
+
+        IllegalArgumentException e =
+                assertThrows(
+                        IllegalArgumentException.class,
+                        () -> action.build(env),
+                        "Expecting IllegalArgumentException");
+        assertThat(e).hasMessageContaining("Paimon schema and MySQL schema are 
not compatible.");
+    }
+
+    @Test
+    public void testInvalidPrimaryKey() {
+        Map<String, String> mySqlConfig = getBasicMySqlConfig();
+        mySqlConfig.put("database-name", DATABASE_NAME);
+        mySqlConfig.put("table-name", "schema_evolution_\\d+");
+
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        MySqlSyncTableAction action =
+                new MySqlSyncTableAction(
+                        mySqlConfig,
+                        warehouse,
+                        database,
+                        tableName,
+                        Collections.emptyList(),
+                        Collections.singletonList("pk"),
+                        new HashMap<>());
+
+        IllegalArgumentException e =
+                assertThrows(
+                        IllegalArgumentException.class,
+                        () -> action.build(env),
+                        "Expecting IllegalArgumentException");
+        assertThat(e).hasMessage("Specified primary key pk does not exist in 
MySQL tables");
+    }
+
+    @Test
+    public void testNoPrimaryKey() {
+        Map<String, String> mySqlConfig = getBasicMySqlConfig();
+        mySqlConfig.put("database-name", DATABASE_NAME);
+        mySqlConfig.put("table-name", "incompatible_pk_\\d+");
+
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        MySqlSyncTableAction action =
+                new MySqlSyncTableAction(
+                        mySqlConfig,
+                        warehouse,
+                        database,
+                        tableName,
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        new HashMap<>());
+
+        IllegalArgumentException e =
+                assertThrows(
+                        IllegalArgumentException.class,
+                        () -> action.build(env),
+                        "Expecting IllegalArgumentException");
+        assertThat(e)
+                .hasMessage(
+                        "Primary keys are not specified. "
+                                + "Also, can't infer primary keys from MySQL 
table schemas because "
+                                + "MySQL tables have no primary keys or have 
different primary keys.");
+    }
+
+    private void waitForResult(
+            List<String> expected, FileStoreTable table, RowType rowType, 
List<String> primaryKeys)
+            throws Exception {
+        assertThat(table.schema().primaryKeys()).isEqualTo(primaryKeys);
+
+        // wait until the table's fields match rowType in name and type, refreshing it every second
+        while (true) { // NOTE(review): no timeout here; this never exits if the schema never matches
+            int cnt = 0;
+            for (int i = 0; i < table.schema().fields().size(); i++) {
+                DataField field = table.schema().fields().get(i);
+                boolean sameName = 
field.name().equals(rowType.getFieldNames().get(i));
+                boolean sameType = 
field.type().equals(rowType.getFieldTypes().get(i));
+                if (sameName && sameType) {
+                    cnt++;
+                }
+            }
+            if (cnt == rowType.getFieldCount()) {
+                break;
+            }
+            table = table.copyWithLatestSchema();
+            Thread.sleep(1000);
+        }
+
+        // wait until the table's rows, sorted, equal the sorted expected rows (polls every second)
+        List<String> sortedExpected = new ArrayList<>(expected);
+        Collections.sort(sortedExpected);
+        while (true) {
+            TableScan.Plan plan = table.newScan().plan();
+            List<String> result =
+                    getResult(
+                            table.newRead(),
+                            plan == null ? Collections.emptyList() : 
plan.splits(),
+                            rowType);
+            List<String> sortedActual = new ArrayList<>(result);
+            Collections.sort(sortedActual);
+            if (sortedExpected.equals(sortedActual)) {
+                break;
+            }
+            Thread.sleep(1000);
+        }
+    }
+
+    private Map<String, String> getBasicMySqlConfig() {
+        Map<String, String> config = new HashMap<>();
+        config.put("hostname", MYSQL_CONTAINER.getHost());
+        config.put("port", String.valueOf(MYSQL_CONTAINER.getDatabasePort()));
+        config.put("username", USER);
+        config.put("password", PASSWORD);
+        config.put("server-time-zone", ZoneId.of("+00:00").toString());
+        return config;
+    }
+
+    private FileStoreTable getFileStoreTable() throws Exception {
+        Catalog catalog = 
CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse)));
+        Identifier identifier = Identifier.create(database, tableName);
+        return (FileStoreTable) catalog.getTable(identifier);
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/cdc/EventParser.java 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlVersion.java
similarity index 54%
rename from paimon-core/src/main/java/org/apache/paimon/cdc/EventParser.java
rename to 
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlVersion.java
index 46c744699..087f48677 100644
--- a/paimon-core/src/main/java/org/apache/paimon/cdc/EventParser.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlVersion.java
@@ -16,31 +16,33 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.cdc;
-
-import org.apache.paimon.schema.SchemaChange;
-
-import java.io.Serializable;
-import java.util.List;
+package org.apache.paimon.flink.action.cdc.mysql;
 
 /**
- * Parse a CDC change event to a list of {@link SchemaChange} or {@link 
CdcRecord}.
+ * MySql version enum.
  *
- * @param <T> CDC change event type
+ * <p>Copied from <a
+ * 
href="https://github.com/ververica/flink-cdc-connectors/blob/master/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/testutils/MySqlVersion.java">ververica
+ * / flink-cdc-connectors</a>.
  */
-public interface EventParser<T> {
-
-    void setRawEvent(T rawEvent);
+public enum MySqlVersion {
+    V5_5("5.5"),
+    V5_6("5.6"),
+    V5_7("5.7"),
+    V8_0("8.0");
 
-    boolean isSchemaChange();
+    private String version;
 
-    List<SchemaChange> getSchemaChanges();
-
-    List<CdcRecord> getRecords();
+    MySqlVersion(String version) {
+        this.version = version;
+    }
 
-    /** Factory to create an {@link EventParser}. */
-    interface Factory<T> extends Serializable {
+    public String getVersion() {
+        return version;
+    }
 
-        EventParser<T> create();
+    @Override
+    public String toString() {
+        return "MySqlVersion{" + "version='" + version + '\'' + '}';
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkITCase.java
index b5645404f..2abeb87e4 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkITCase.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.flink.sink.cdc;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.cdc.CdcRecord;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.util.AbstractTestBase;
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperatorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperatorTest.java
index 50cc9a95c..38ec56338 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperatorTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperatorTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.flink.sink.cdc;
 
-import org.apache.paimon.cdc.CdcRecord;
 import org.apache.paimon.flink.sink.Committable;
 import org.apache.paimon.flink.sink.CommittableTypeInfo;
 import org.apache.paimon.flink.sink.StoreSinkWriteImpl;
@@ -150,8 +149,14 @@ public class SchemaAwareStoreWriteOperatorTest {
     public void testUpdateColumnType() throws Exception {
         RowType rowType =
                 RowType.of(
-                        new DataType[] {DataTypes.INT(), DataTypes.INT(), 
DataTypes.FLOAT()},
-                        new String[] {"k", "v1", "v2"});
+                        new DataType[] {
+                            DataTypes.INT(),
+                            DataTypes.INT(),
+                            DataTypes.FLOAT(),
+                            DataTypes.VARCHAR(5),
+                            DataTypes.VARBINARY(5)
+                        },
+                        new String[] {"k", "v1", "v2", "v3", "v4"});
 
         FileStoreTable table =
                 createFileStoreTable(
@@ -170,6 +175,8 @@ public class SchemaAwareStoreWriteOperatorTest {
         fields.put("k", "1");
         fields.put("v1", "10");
         fields.put("v2", "0.625");
+        fields.put("v3", "one");
+        fields.put("v4", "b_one");
         CdcRecord expected = new CdcRecord(RowKind.INSERT, fields);
         runner.offer(expected);
         CdcRecord actual = runner.take();
@@ -177,6 +184,8 @@ public class SchemaAwareStoreWriteOperatorTest {
 
         // check that records with new fields should be processed after schema 
is updated
 
+        // int -> bigint
+
         fields = new HashMap<>();
         fields.put("k", "2");
         fields.put("v1", "12345678987654321");
@@ -191,6 +200,8 @@ public class SchemaAwareStoreWriteOperatorTest {
         actual = runner.take();
         assertThat(actual).isEqualTo(expected);
 
+        // float -> double
+
         fields = new HashMap<>();
         fields.put("k", "3");
         fields.put("v1", "100");
@@ -204,6 +215,36 @@ public class SchemaAwareStoreWriteOperatorTest {
         actual = runner.take();
         assertThat(actual).isEqualTo(expected);
 
+        // varchar(5) -> varchar(10)
+
+        fields = new HashMap<>();
+        fields.put("k", "4");
+        fields.put("v1", "40");
+        fields.put("v3", "long four");
+        expected = new CdcRecord(RowKind.INSERT, fields);
+        runner.offer(expected);
+        actual = runner.poll(1);
+        assertThat(actual).isNull();
+
+        schemaManager.commitChanges(SchemaChange.updateColumnType("v3", 
DataTypes.VARCHAR(10)));
+        actual = runner.take();
+        assertThat(actual).isEqualTo(expected);
+
+        // varbinary(5) -> varbinary(10)
+
+        fields = new HashMap<>();
+        fields.put("k", "5");
+        fields.put("v1", "50");
+        fields.put("v4", "long five~");
+        expected = new CdcRecord(RowKind.INSERT, fields);
+        runner.offer(expected);
+        actual = runner.poll(1);
+        assertThat(actual).isNull();
+
+        schemaManager.commitChanges(SchemaChange.updateColumnType("v4", 
DataTypes.VARBINARY(10)));
+        actual = runner.take();
+        assertThat(actual).isEqualTo(expected);
+
         runner.stop();
         t.join();
         harness.close();
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEvent.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEvent.java
index f65e3002a..aedcc0362 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEvent.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEvent.java
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.flink.sink.cdc;
 
-import org.apache.paimon.cdc.CdcRecord;
 import org.apache.paimon.schema.SchemaChange;
 
 import java.io.Serializable;
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java
index 75d11afa1..bb544d24c 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java
@@ -18,8 +18,6 @@
 
 package org.apache.paimon.flink.sink.cdc;
 
-import org.apache.paimon.cdc.CdcRecord;
-import org.apache.paimon.cdc.EventParser;
 import org.apache.paimon.schema.SchemaChange;
 
 import java.util.Collections;
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcSourceFunction.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcSourceFunction.java
index fc952af49..889c8216d 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcSourceFunction.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcSourceFunction.java
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.flink.sink.cdc;
 
-import org.apache.paimon.cdc.CdcRecord;
 import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.SerializableFunction;
 
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/AbstractTestBase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/AbstractTestBase.java
index 410b1a7fe..8e63c8b7e 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/AbstractTestBase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/AbstractTestBase.java
@@ -28,8 +28,6 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.extension.RegisterExtension;
 import org.junit.jupiter.api.io.TempDir;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
@@ -40,8 +38,6 @@ import java.util.UUID;
 @ExtendWith({TestLoggerExtension.class})
 public class AbstractTestBase {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(AbstractTestBase.class);
-
     private static final int DEFAULT_PARALLELISM = 8;
 
     @RegisterExtension
diff --git a/paimon-flink/paimon-flink-common/src/test/resources/mysql/my.cnf 
b/paimon-flink/paimon-flink-common/src/test/resources/mysql/my.cnf
new file mode 100644
index 000000000..9c9a8747b
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/resources/mysql/my.cnf
@@ -0,0 +1,65 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# For advice on how to change settings please see
+# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html
+
+[mysqld]
+#
+# Remove leading # and set to the amount of RAM for the most important data
+# cache in MySQL. Start at 70% of total RAM for dedicated server, else 10%.
+# innodb_buffer_pool_size = 128M
+#
+# Remove leading # to turn on a very important data integrity option: logging
+# changes to the binary log between backups.
+# log_bin
+#
+# Remove leading # to set options mainly useful for reporting servers.
+# The server defaults are faster for transactions and fast SELECTs.
+# Adjust sizes as needed, experiment to find the optimal values.
+# join_buffer_size = 128M
+# sort_buffer_size = 2M
+# read_rnd_buffer_size = 2M
+skip-host-cache
+skip-name-resolve
+#datadir=/var/lib/mysql
+#socket=/var/lib/mysql/mysql.sock
+secure-file-priv=/var/lib/mysql
+user=mysql
+
+# Disabling symbolic-links is recommended to prevent assorted security risks
+symbolic-links=0
+
+#log-error=/var/log/mysqld.log
+#pid-file=/var/run/mysqld/mysqld.pid
+
+# ----------------------------------------------
+# Enable the binlog for replication & CDC
+# ----------------------------------------------
+
+# Enable binary replication log and set the prefix, expiration, and log format.
+# The prefix is arbitrary, expiration can be short for integration tests but 
would
+# be longer on a production system. Row-level info is required for ingest to 
work.
+# Server ID is required, but this will vary on production systems
+server-id         = 223344
+log_bin           = mysql-bin
+expire_logs_days  = 1
+binlog_format     = row
+
+# If this option is off,
+# MySQL will set the current timestamp on any timestamp field given a NULL value,
+# which is bad for our tests
+explicit_defaults_for_timestamp = ON
diff --git 
a/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql 
b/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
new file mode 100644
index 000000000..3cc6c9e51
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
@@ -0,0 +1,156 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+-- In production you would almost certainly restrict the replication user to 
the follower (slave) machine,
+-- to prevent other clients from accessing the log from other machines. For 
example, 'replicator'@'follower.acme.com'.
+-- However, in this database we'll grant 2 users different privileges:
+--
+-- 1) 'paimonuser' - all privileges required by the snapshot reader AND binlog 
reader (used for testing)
+-- 2) 'mysqluser' - all privileges
+--
+GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, 
LOCK TABLES  ON *.* TO 'paimonuser'@'%';
+CREATE USER 'mysqluser' IDENTIFIED BY 'mysqlpw';
+GRANT ALL PRIVILEGES ON *.* TO 'mysqluser'@'%';
+
+USE paimon_test;
+
+CREATE TABLE schema_evolution_1 (
+    pt INT,
+    _id INT,
+    v1 VARCHAR(10),
+    PRIMARY KEY (_id)
+);
+
+CREATE TABLE schema_evolution_2 (
+    pt INT,
+    _id INT,
+    v1 VARCHAR(10),
+    PRIMARY KEY (_id)
+);
+
+CREATE TABLE all_types_table (
+    _id INT,
+    _boolean TINYINT(1),
+    _tinyint TINYINT,
+    _tinyint_unsigned TINYINT(2) UNSIGNED,
+    _tinyint_unsigned_zerofill TINYINT(2) UNSIGNED ZEROFILL,
+    _smallint SMALLINT,
+    _smallint_unsigned SMALLINT UNSIGNED,
+    _smallint_unsigned_zerofill SMALLINT(4) UNSIGNED ZEROFILL,
+    _mediumint MEDIUMINT,
+    _mediumint_unsigned MEDIUMINT UNSIGNED,
+    _mediumint_unsigned_zerofill MEDIUMINT(8) UNSIGNED ZEROFILL,
+    _int INT,
+    _int_unsigned INT UNSIGNED,
+    _int_unsigned_zerofill INT(8) UNSIGNED ZEROFILL,
+    _bigint BIGINT,
+    _bigint_unsigned BIGINT UNSIGNED,
+    _bigint_unsigned_zerofill BIGINT(16) UNSIGNED ZEROFILL,
+    _serial SERIAL,
+    _float FLOAT,
+    _float_unsigned FLOAT UNSIGNED,
+    _float_unsigned_zerofill FLOAT(4) UNSIGNED ZEROFILL,
+    _real REAL,
+    _real_unsigned REAL UNSIGNED,
+    _real_unsigned_zerofill REAL(10, 7) UNSIGNED ZEROFILL,
+    _double DOUBLE,
+    _double_unsigned DOUBLE UNSIGNED,
+    _double_unsigned_zerofill DOUBLE(10, 7) UNSIGNED ZEROFILL,
+    _double_precision DOUBLE PRECISION,
+    _double_precision_unsigned DOUBLE PRECISION UNSIGNED,
+    _double_precision_unsigned_zerofill DOUBLE PRECISION(10, 7) UNSIGNED 
ZEROFILL,
+    _numeric NUMERIC(8, 3),
+    _numeric_unsigned NUMERIC(8, 3) UNSIGNED,
+    _numeric_unsigned_zerofill NUMERIC(8, 3) UNSIGNED ZEROFILL,
+    _fixed FIXED(40, 3),
+    _fixed_unsigned FIXED(40, 3) UNSIGNED,
+    _fixed_unsigned_zerofill FIXED(40, 3) UNSIGNED ZEROFILL,
+    _decimal DECIMAL(8),
+    _decimal_unsigned DECIMAL(8) UNSIGNED,
+    _decimal_unsigned_zerofill DECIMAL(8) UNSIGNED ZEROFILL,
+    _date DATE,
+    _datetime DATETIME,
+    _timestamp TIMESTAMP(6) DEFAULT NULL,
+    _char CHAR(10),
+    _varchar VARCHAR(20),
+    _text TEXT,
+    _bin BINARY(10),
+    _varbin VARBINARY(20),
+    _blob BLOB,
+    PRIMARY KEY (_id)
+);
+
+INSERT INTO all_types_table VALUES (
+    1,
+    true, 1, 2, 3,
+    1000, 2000, 3000,
+    100000, 200000, 300000,
+    1000000, 2000000, 3000000,
+    10000000000, 20000000000, 30000000000, 40000000000,
+    1.5, 2.5, 3.5,
+    1.000001, 2.000002, 3.000003,
+    1.000011, 2.000022, 3.000033,
+    1.000111, 2.000222, 3.000333,
+    12345.11, 12345.22, 12345.33,
+    123456789876543212345678987654321.11, 
123456789876543212345678987654321.22, 123456789876543212345678987654321.33,
+    11111, 22222, 33333,
+    '2023-03-23', '2023-03-23 14:30:05', '2023-03-23 15:00:10.123456',
+    'Paimon', 'Apache Paimon', 'Apache Paimon MySQL Test Data',
+    'bytes', 'more bytes', 'very long bytes test data'
+), (
+    2,
+    NULL, NULL, NULL, NULL,
+    NULL, NULL, NULL,
+    NULL, NULL, NULL,
+    NULL, NULL, NULL,
+    NULL, NULL, NULL, 50000000000, -- SERIAL is never NULL
+    NULL, NULL, NULL,
+    NULL, NULL, NULL,
+    NULL, NULL, NULL,
+    NULL, NULL, NULL,
+    NULL, NULL, NULL,
+    NULL, NULL, NULL,
+    NULL, NULL, NULL,
+    NULL, NULL, NULL,
+    NULL, NULL, NULL,
+    NULL, NULL, NULL
+);
+
+CREATE TABLE incompatible_field_1 (
+    _id INT,
+    v1 DATETIME,
+    PRIMARY KEY (_id)
+);
+
+CREATE TABLE incompatible_field_2 (
+    _id INT,
+    v1 INT,
+    PRIMARY KEY (_id)
+);
+
+CREATE TABLE incompatible_pk_1 (
+    a INT,
+    b BIGINT,
+    c VARCHAR(20),
+    PRIMARY KEY (a, b)
+);
+
+CREATE TABLE incompatible_pk_2 (
+    a INT,
+    b BIGINT,
+    c VARCHAR(20),
+    PRIMARY KEY (a)
+);

Reply via email to