This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e0c85305 [flink][mysql-cdc] Introduce TypeMapping to specify data 
type mapping rules (#1838)
8e0c85305 is described below

commit 8e0c853051ea884b5f19e9eab7922a37756ca6e3
Author: yuzelin <[email protected]>
AuthorDate: Mon Aug 21 18:20:32 2023 +0800

    [flink][mysql-cdc] Introduce TypeMapping to specify data type mapping rules 
(#1838)
---
 docs/content/how-to/cdc-ingestion.md               |  14 +-
 .../shortcodes/generated/mysql_sync_database.html  |  13 +
 .../shortcodes/generated/mysql_sync_table.html     |  13 +
 .../org/apache/paimon/utils/DateTimeUtils.java     |   2 +-
 .../org/apache/paimon/utils/DateTimeUtilsTest.java |  45 +++
 .../paimon/tests/cdc/MySqlCdcE2eTestBase.java      |  25 +-
 .../tests/cdc/MySqlComputedColumnE2ETest.java      |   1 +
 .../tests/cdc/MySqlTinyIntConvertE2ETest.java      |   9 +-
 .../paimon/flink/action/cdc/DatabaseSyncMode.java  |   2 +-
 .../paimon/flink/action/cdc/TypeMapping.java       |  96 +++++
 .../flink/action/cdc/mysql/MySqlActionUtils.java   |  43 +--
 .../cdc/mysql/MySqlDebeziumJsonEventParser.java    |  25 +-
 .../action/cdc/mysql/MySqlSyncDatabaseAction.java  |  17 +-
 .../cdc/mysql/MySqlSyncDatabaseActionFactory.java  |  18 +-
 .../action/cdc/mysql/MySqlSyncTableAction.java     |  14 +-
 .../cdc/mysql/MySqlSyncTableActionFactory.java     |  11 +
 .../action/cdc/mysql/MySqlTableSchemaBuilder.java  |  32 +-
 .../flink/action/cdc/mysql/MySqlTypeUtils.java     |  34 +-
 .../flink/action/cdc/mysql/schema/MySqlSchema.java |  16 +-
 .../cdc/UpdatedDataFieldsProcessFunctionBase.java  |   1 -
 .../cdc/mysql/MySqlCdcTypeMappingITCase.java       | 430 +++++++++++++++++++++
 .../cdc/mysql/MySqlSyncDatabaseActionITCase.java   | 116 +-----
 .../cdc/mysql/MySqlSyncTableActionITCase.java      |  52 ---
 .../test/resources/mysql/sync_database_setup.sql   |  27 --
 .../src/test/resources/mysql/sync_table_setup.sql  |  10 -
 ...table_setup.sql => type_mapping_test_setup.sql} | 115 ++----
 26 files changed, 784 insertions(+), 397 deletions(-)

diff --git a/docs/content/how-to/cdc-ingestion.md 
b/docs/content/how-to/cdc-ingestion.md
index 84b0203a1..10d2211f0 100644
--- a/docs/content/how-to/cdc-ingestion.md
+++ b/docs/content/how-to/cdc-ingestion.md
@@ -64,6 +64,7 @@ To use this feature through `flink run`, run the following 
shell command.
     --table <table-name> \
     [--partition-keys <partition-keys>] \
     [--primary-keys <primary-keys>] \
+    [--type-mapping <option1,option2...>] \
     [--computed-column <'column-name=expr-name(args[, ...])'> 
[--computed-column ...]] \
     [--mysql-conf <mysql-cdc-source-conf> [--mysql-conf 
<mysql-cdc-source-conf> ...]] \
     [--catalog-conf <paimon-catalog-conf> [--catalog-conf 
<paimon-catalog-conf> ...]] \
@@ -148,6 +149,7 @@ To use this feature through `flink run`, run the following 
shell command.
     [--including-tables <mysql-table-name|name-regular-expr>] \
     [--excluding-tables <mysql-table-name|name-regular-expr>] \
     [--mode <sync-mode>] \
+    [--type-mapping <option1,option2...>] \
     [--mysql-conf <mysql-cdc-source-conf> [--mysql-conf 
<mysql-cdc-source-conf> ...]] \
     [--catalog-conf <paimon-catalog-conf> [--catalog-conf 
<paimon-catalog-conf> ...]] \
     [--table-conf <paimon-table-sink-conf> [--table-conf 
<paimon-table-sink-conf> ...]]
@@ -596,11 +598,13 @@ behaviors of `RENAME TABLE` and `DROP COLUMN` will be 
ignored, `RENAME COLUMN` w
 
 {{< generated/compute_column >}}
 
-## Special Data Type Conversions
-1. MySQL TINYINT(1) type will be converted to Boolean by default. If you want 
to store number (-128~127) in it like MySQL, 
-you can specify that `--mysql-conf mysql.converter.tinyint1-to-bool=false`, 
then the column will be mapped to TINYINT in Paimon table.
-2. MySQL BIT(1) type will be converted to Boolean.
-3. When using Hive catalog, MySQL TIME type will be converted to STRING.
+## Special Data Type Mapping
+1. MySQL TINYINT(1) type will be mapped to Boolean by default. If you want to 
store number (-128~127) in it like MySQL, 
+you can specify type mapping option `tinyint1-not-bool`, then the column will 
be mapped to TINYINT in Paimon table.
+2. You can use type mapping option `to-nullable` to ignore all NOT NULL 
constraints (except primary keys).
+3. You can use type mapping option `to-string` to map all MySQL data type to 
STRING.
+4. MySQL BIT(1) type will be mapped to Boolean.
+5. When using Hive catalog, MySQL TIME type will be mapped to STRING.
 
 ## FAQ
 1. Chinese characters in records ingested from MySQL are garbled.
diff --git a/docs/layouts/shortcodes/generated/mysql_sync_database.html 
b/docs/layouts/shortcodes/generated/mysql_sync_database.html
index 0d07d824b..2ee73c010 100644
--- a/docs/layouts/shortcodes/generated/mysql_sync_database.html
+++ b/docs/layouts/shortcodes/generated/mysql_sync_database.html
@@ -61,6 +61,19 @@ under the License.
         <td><h5>--mode</h5></td>
         <td>It is used to specify synchronization mode.<br />Possible 
values:<ul><li>"divided" (the default mode if you haven't specified one): start 
a sink for each table, the synchronization of the new table requires restarting 
the job.</li><li>"combined": start a single combined sink for all tables, the 
new table will be automatically synchronized.</li></ul></td>
     </tr>
+    <tr>
+        <td><h5>--type-mapping</h5></td>
+        <td>It is used to specify how to map MySQL data type to Paimon 
type.<br />
+            Supported options:
+                <ul>
+                    <li>"tinyint1-not-bool": maps MySQL TINYINT(1) to TINYINT 
instead of BOOLEAN.</li>
+                    <li>"to-nullable": ignores all NOT NULL constraints 
(except for primary keys).
+                        This is used to solve the problem that Flink cannot 
accept the MySQL 'ALTER TABLE ADD COLUMN column type NOT NULL DEFAULT x' 
operation.
+                    </li>
+                    <li>"to-string": maps all MySQL types to STRING.</li>
+                </ul>
+        </td>
+    </tr>
     <tr>
         <td><h5>--mysql-conf</h5></td>
         <td>The configuration for Flink CDC MySQL table sources. Each 
configuration should be specified in the format "key=value". hostname, 
username, password, database-name and table-name are required configurations, 
others are optional. See its <a 
href="https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#connector-options";>document</a>
 for a complete list of configurations.</td>
diff --git a/docs/layouts/shortcodes/generated/mysql_sync_table.html 
b/docs/layouts/shortcodes/generated/mysql_sync_table.html
index 15cb8da72..255e81c68 100644
--- a/docs/layouts/shortcodes/generated/mysql_sync_table.html
+++ b/docs/layouts/shortcodes/generated/mysql_sync_table.html
@@ -45,6 +45,19 @@ under the License.
         <td><h5>--primary-keys</h5></td>
         <td>The primary keys for Paimon table. If there are multiple primary 
keys, connect them with comma, for example "buyer_id,seller_id".</td>
     </tr>
+    <tr>
+        <td><h5>--type-mapping</h5></td>
+        <td>It is used to specify how to map MySQL data type to Paimon 
type.<br />
+            Supported options:
+            <ul>
+                <li>"tinyint1-not-bool": maps MySQL TINYINT(1) to TINYINT 
instead of BOOLEAN.</li>
+                <li>"to-nullable": ignores all NOT NULL constraints (except 
for primary keys).
+                    This is used to solve the problem that Flink cannot accept 
the MySQL 'ALTER TABLE ADD COLUMN column type NOT NULL DEFAULT x' operation.
+                </li>
+                <li>"to-string": maps all MySQL types to STRING.</li>
+            </ul>
+        </td>
+    </tr>
     <tr>
         <td><h5>--computed-column</h5></td>
         <td>The definitions of computed columns. The argument field is from 
MySQL table field name. See <a href="#computed-functions">here</a> for a 
complete list of configurations. </td>
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/utils/DateTimeUtils.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/DateTimeUtils.java
index 093613f95..a088129af 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/DateTimeUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/DateTimeUtils.java
@@ -154,7 +154,7 @@ public class DateTimeUtils {
         String nano = fraction.substring(0, precision);
 
         if (nano.length() > 0) {
-            ymdhms.append(".").append(fraction);
+            ymdhms.append(".").append(nano);
         }
 
         return ymdhms.toString();
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/utils/DateTimeUtilsTest.java 
b/paimon-common/src/test/java/org/apache/paimon/utils/DateTimeUtilsTest.java
new file mode 100644
index 000000000..e5c8facdc
--- /dev/null
+++ b/paimon-common/src/test/java/org/apache/paimon/utils/DateTimeUtilsTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.LocalDateTime;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link DateTimeUtils}. */
+public class DateTimeUtilsTest {
+
+    @Test
+    public void testFormatLocalDateTime() {
+        LocalDateTime time = LocalDateTime.of(2023, 8, 30, 12, 30, 59, 
999_999_999);
+        String[] expectations = new String[10];
+        expectations[0] = "2023-08-30 12:30:59";
+        expectations[1] = "2023-08-30 12:30:59.9";
+        for (int i = 2; i <= 9; i++) {
+            expectations[i] = expectations[i - 1] + "9";
+        }
+
+        for (int precision = 0; precision <= 9; precision++) {
+            assertThat(DateTimeUtils.formatLocalDateTime(time, precision))
+                    .isEqualTo(expectations[precision]);
+        }
+    }
+}
diff --git 
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlCdcE2eTestBase.java
 
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlCdcE2eTestBase.java
index aa4c83c94..d4c7158f0 100644
--- 
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlCdcE2eTestBase.java
+++ 
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlCdcE2eTestBase.java
@@ -34,7 +34,7 @@ import org.testcontainers.containers.Container;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.lifecycle.Startables;
 
-import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -114,6 +114,7 @@ public abstract class MySqlCdcE2eTestBase extends 
E2eTestBase {
                 ACTION_SYNC_TABLE,
                 "pt",
                 "pt,_id",
+                null,
                 ImmutableMap.of(),
                 ImmutableMap.of(
                         "database-name", "paimon_sync_table", "table-name", 
"schema_evolution_.+"),
@@ -196,11 +197,11 @@ public abstract class MySqlCdcE2eTestBase extends 
E2eTestBase {
 
     @Test
     public void testSyncDatabase() throws Exception {
-
         runAction(
                 ACTION_SYNC_DATABASE,
                 null,
                 null,
+                null,
                 ImmutableMap.of(),
                 ImmutableMap.of("database-name", "paimon_sync_database"),
                 ImmutableMap.of("bucket", "2"));
@@ -273,6 +274,7 @@ public abstract class MySqlCdcE2eTestBase extends 
E2eTestBase {
                 ACTION_SYNC_TABLE,
                 "pt",
                 "pt,_id",
+                "tinyint1-not-bool",
                 ImmutableMap.of(),
                 ImmutableMap.of(
                         "database-name",
@@ -338,6 +340,7 @@ public abstract class MySqlCdcE2eTestBase extends 
E2eTestBase {
                 ACTION_SYNC_DATABASE,
                 null,
                 null,
+                "tinyint1-not-bool",
                 ImmutableMap.of(),
                 ImmutableMap.of(
                         "database-name",
@@ -426,18 +429,23 @@ public abstract class MySqlCdcE2eTestBase extends 
E2eTestBase {
     }
 
     protected void runAction(
-            @Nonnull String action,
-            String partitionKeys,
-            String primaryKeys,
-            @Nonnull Map<String, String> computedColumn,
-            @Nonnull Map<String, String> mysqlConf,
-            @Nonnull Map<String, String> tableConf)
+            String action,
+            @Nullable String partitionKeys,
+            @Nullable String primaryKeys,
+            @Nullable String typeMappingOptions,
+            Map<String, String> computedColumn,
+            Map<String, String> mysqlConf,
+            Map<String, String> tableConf)
             throws Exception {
 
         String partitionKeysStr =
                 StringUtils.isBlank(partitionKeys) ? "" : "--partition-keys " 
+ partitionKeys;
         String primaryKeysStr =
                 StringUtils.isBlank(primaryKeys) ? "" : "--primary-keys " + 
primaryKeys;
+        String typeMappingStr =
+                StringUtils.isBlank(typeMappingOptions)
+                        ? ""
+                        : "--type-mapping " + typeMappingOptions;
         String tableStr = action.equals(ACTION_SYNC_TABLE) ? "--table 
ts_table" : "";
 
         List<String> computedColumns =
@@ -475,6 +483,7 @@ public abstract class MySqlCdcE2eTestBase extends 
E2eTestBase {
                         tableStr,
                         partitionKeysStr,
                         primaryKeysStr,
+                        typeMappingStr,
                         "--mysql-conf",
                         "hostname=mysql-1",
                         "--mysql-conf",
diff --git 
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlComputedColumnE2ETest.java
 
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlComputedColumnE2ETest.java
index 1f4dc3f27..054dfe47a 100644
--- 
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlComputedColumnE2ETest.java
+++ 
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlComputedColumnE2ETest.java
@@ -41,6 +41,7 @@ public class MySqlComputedColumnE2ETest extends 
MySqlCdcE2eTestBase {
                 ACTION_SYNC_TABLE,
                 "_year",
                 "pk,_year",
+                null,
                 ImmutableMap.of("_year", "'year(_datetime)'"),
                 ImmutableMap.of("database-name", "'test_computed_column'", 
"table-name", "'T'"),
                 ImmutableMap.of("bucket", "2"));
diff --git 
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlTinyIntConvertE2ETest.java
 
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlTinyIntConvertE2ETest.java
index 489de4c21..2c5f156b5 100644
--- 
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlTinyIntConvertE2ETest.java
+++ 
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlTinyIntConvertE2ETest.java
@@ -41,14 +41,9 @@ public class MySqlTinyIntConvertE2ETest extends 
MySqlCdcE2eTestBase {
                 ACTION_SYNC_TABLE,
                 null,
                 "pk",
+                "tinyint1-not-bool",
                 ImmutableMap.of(),
-                ImmutableMap.of(
-                        "database-name",
-                        "test_tinyint_convert",
-                        "table-name",
-                        "'T'",
-                        "mysql.converter.tinyint1-to-bool",
-                        "false"),
+                ImmutableMap.of("database-name", "test_tinyint_convert", 
"table-name", "'T'"),
                 ImmutableMap.of("bucket", "2"));
 
         try (Connection conn = getMySqlConnection();
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/DatabaseSyncMode.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/DatabaseSyncMode.java
index 594ab06a4..e8b90947f 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/DatabaseSyncMode.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/DatabaseSyncMode.java
@@ -46,7 +46,7 @@ public enum DatabaseSyncMode implements Serializable {
             case "combined":
                 return COMBINED;
             default:
-                throw new UnsupportedOperationException("Unsupported sink 
mode: " + mode);
+                throw new UnsupportedOperationException("Unsupported mode: " + 
mode);
         }
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/TypeMapping.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/TypeMapping.java
new file mode 100644
index 000000000..1066f001c
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/TypeMapping.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static 
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TINYINT1_NOT_BOOL;
+import static 
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_NULLABLE;
+import static 
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_STRING;
+
+/** Utility that holds data type mapping options. */
+public class TypeMapping implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final Set<TypeMappingMode> typeMappingModes;
+
+    public TypeMapping(Set<TypeMappingMode> typeMappingModes) {
+        this.typeMappingModes = typeMappingModes;
+    }
+
+    public boolean containsMode(TypeMappingMode mode) {
+        return typeMappingModes.contains(mode);
+    }
+
+    public static TypeMapping defaultMapping() {
+        return new TypeMapping(Collections.emptySet());
+    }
+
+    public static TypeMapping parse(String[] rawOptions) {
+        List<String> options =
+                Arrays.stream(rawOptions)
+                        .map(String::trim)
+                        .map(String::toLowerCase)
+                        .collect(Collectors.toList());
+
+        Set<TypeMappingMode> typeMappingModes = new HashSet<>();
+
+        for (String option : options) {
+            switch (option.toLowerCase()) {
+                case "tinyint1-not-bool":
+                    typeMappingModes.add(TINYINT1_NOT_BOOL);
+                    break;
+                case "to-nullable":
+                    typeMappingModes.add(TO_NULLABLE);
+                    break;
+                case "to-string":
+                    typeMappingModes.add(TO_STRING);
+                    break;
+                default:
+                    throw new UnsupportedOperationException(
+                            "Unsupported type mapping option: " + option);
+            }
+        }
+
+        return new TypeMapping(typeMappingModes);
+    }
+
+    /**
+     * Describe how to map MySQL data type to Paimon data type. Currently, 
three modes are
+     * supported:
+     *
+     * <ul>
+     *   <li>TINYINT1_NOT_BOOL: maps MySQL TINYINT(1) to TINYINT instead of 
BOOLEAN.
+     *   <li>TO_NULLABLE: ignores all NOT NULL constraints (except for primary 
keys).
+     *   <li>TO_STRING: maps all MySQL types to STRING.
+     * </ul>
+     */
+    public enum TypeMappingMode {
+        TINYINT1_NOT_BOOL,
+        TO_NULLABLE,
+        TO_STRING
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
index 978d2eaa8..73e83a253 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.action.cdc.mysql;
 
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.action.cdc.ComputedColumn;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.flink.action.cdc.mysql.schema.MySqlSchema;
 import org.apache.paimon.flink.action.cdc.mysql.schema.MySqlSchemasInfo;
 import org.apache.paimon.flink.action.cdc.mysql.schema.MySqlTableInfo;
@@ -60,6 +61,7 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
+import static 
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TINYINT1_NOT_BOOL;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Utils for MySQL Action. * */
@@ -73,30 +75,17 @@ public class MySqlActionUtils {
                     .withDescription(
                             "Whether capture the scan the newly added tables 
or not, by default is true.");
 
-    public static final ConfigOption<Boolean> MYSQL_CONVERTER_TINYINT1_BOOL =
-            ConfigOptions.key("mysql.converter.tinyint1-to-bool")
-                    .booleanType()
-                    .defaultValue(true)
-                    .withDescription(
-                            "Mysql tinyint type will be converted to boolean 
type by default, if you want to convert to tinyint type, "
-                                    + "you can set this option to false.");
-
-    static Connection getConnection(Configuration mySqlConfig) throws 
Exception {
+    static Connection getConnection(Configuration mySqlConfig, boolean 
tinyint1NotBool)
+            throws Exception {
         String url =
                 String.format(
-                        "jdbc:mysql://%s:%d",
+                        "jdbc:mysql://%s:%d%s",
                         mySqlConfig.get(MySqlSourceOptions.HOSTNAME),
-                        mySqlConfig.get(MySqlSourceOptions.PORT));
-
-        // we need to add the `tinyInt1isBit` parameter to the connection url 
to make sure the
-        // tinyint(1) in MySQL is converted to bits or not. Refer to
-        // 
https://dev.mysql.com/doc/connector-j/8.0/en/connector-j-connp-props-result-sets.html#cj-conn-prop_tinyInt1isBit
-        if (mySqlConfig.contains(MYSQL_CONVERTER_TINYINT1_BOOL)) {
-            url =
-                    String.format(
-                            "%s?tinyInt1isBit=%s",
-                            url, 
mySqlConfig.get(MYSQL_CONVERTER_TINYINT1_BOOL));
-        }
+                        mySqlConfig.get(MySqlSourceOptions.PORT),
+                        // we need to add the `tinyInt1isBit` parameter to the 
connection url to
+                        // make sure the tinyint(1) in MySQL is converted to 
bits or not. Refer to
+                        // 
https://dev.mysql.com/doc/connector-j/8.0/en/connector-j-connp-props-result-sets.html#cj-conn-prop_tinyInt1isBit
+                        tinyint1NotBool ? "?tinyInt1isBit=false" : "");
 
         return DriverManager.getConnection(
                 url,
@@ -107,12 +96,15 @@ public class MySqlActionUtils {
     static MySqlSchemasInfo getMySqlTableInfos(
             Configuration mySqlConfig,
             Predicate<String> monitorTablePredication,
-            List<Identifier> excludedTables)
+            List<Identifier> excludedTables,
+            TypeMapping typeMapping)
             throws Exception {
         Pattern databasePattern =
                 
Pattern.compile(mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME));
         MySqlSchemasInfo mySqlSchemasInfo = new MySqlSchemasInfo();
-        try (Connection conn = MySqlActionUtils.getConnection(mySqlConfig)) {
+        try (Connection conn =
+                MySqlActionUtils.getConnection(
+                        mySqlConfig, 
typeMapping.containsMode(TINYINT1_NOT_BOOL))) {
             DatabaseMetaData metaData = conn.getMetaData();
             try (ResultSet schemas = metaData.getCatalogs()) {
                 while (schemas.next()) {
@@ -124,10 +116,7 @@ public class MySqlActionUtils {
                                 String tableName = 
tables.getString("TABLE_NAME");
                                 MySqlSchema mySqlSchema =
                                         MySqlSchema.buildSchema(
-                                                metaData,
-                                                databaseName,
-                                                tableName,
-                                                
mySqlConfig.get(MYSQL_CONVERTER_TINYINT1_BOOL));
+                                                metaData, databaseName, 
tableName, typeMapping);
                                 Identifier identifier = 
Identifier.create(databaseName, tableName);
                                 if (monitorTablePredication.test(tableName)) {
                                     mySqlSchemasInfo.addSchema(identifier, 
mySqlSchema);
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
index 35fd888c4..7377050de 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
@@ -26,6 +26,7 @@ package org.apache.paimon.flink.action.cdc.mysql;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.action.cdc.ComputedColumn;
 import org.apache.paimon.flink.action.cdc.TableNameConverter;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.flink.sink.cdc.CdcRecord;
 import org.apache.paimon.flink.sink.cdc.EventParser;
 import org.apache.paimon.flink.sink.cdc.NewTableSchemaBuilder;
@@ -72,6 +73,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.regex.Pattern;
 
+import static 
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_NULLABLE;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** {@link EventParser} for MySQL Debezium JSON. */
@@ -89,7 +91,7 @@ public class MySqlDebeziumJsonEventParser implements 
EventParser<String> {
     @Nullable private final Pattern excludingPattern;
     private final Set<String> includedTables = new HashSet<>();
     private final Set<String> excludedTables = new HashSet<>();
-    private final boolean convertTinyint1ToBool;
+    private final TypeMapping typeMapping;
 
     private JsonNode root;
     private JsonNode payload;
@@ -101,7 +103,7 @@ public class MySqlDebeziumJsonEventParser implements 
EventParser<String> {
             ZoneId serverTimeZone,
             boolean caseSensitive,
             List<ComputedColumn> computedColumns,
-            boolean convertTinyint1ToBool) {
+            TypeMapping typeMapping) {
         this(
                 serverTimeZone,
                 caseSensitive,
@@ -110,7 +112,7 @@ public class MySqlDebeziumJsonEventParser implements 
EventParser<String> {
                 ddl -> Optional.empty(),
                 null,
                 null,
-                convertTinyint1ToBool);
+                typeMapping);
     }
 
     public MySqlDebeziumJsonEventParser(
@@ -120,7 +122,7 @@ public class MySqlDebeziumJsonEventParser implements 
EventParser<String> {
             NewTableSchemaBuilder<JsonNode> schemaBuilder,
             @Nullable Pattern includingPattern,
             @Nullable Pattern excludingPattern,
-            boolean convertTinyint1ToBool) {
+            TypeMapping typeMapping) {
         this(
                 serverTimeZone,
                 caseSensitive,
@@ -129,7 +131,7 @@ public class MySqlDebeziumJsonEventParser implements 
EventParser<String> {
                 schemaBuilder,
                 includingPattern,
                 excludingPattern,
-                convertTinyint1ToBool);
+                typeMapping);
     }
 
     public MySqlDebeziumJsonEventParser(
@@ -140,7 +142,7 @@ public class MySqlDebeziumJsonEventParser implements 
EventParser<String> {
             NewTableSchemaBuilder<JsonNode> schemaBuilder,
             @Nullable Pattern includingPattern,
             @Nullable Pattern excludingPattern,
-            boolean convertTinyint1ToBool) {
+            TypeMapping typeMapping) {
         this.serverTimeZone = serverTimeZone;
         this.caseSensitive = caseSensitive;
         this.computedColumns = computedColumns;
@@ -148,7 +150,7 @@ public class MySqlDebeziumJsonEventParser implements 
EventParser<String> {
         this.schemaBuilder = schemaBuilder;
         this.includingPattern = includingPattern;
         this.excludingPattern = excludingPattern;
-        this.convertTinyint1ToBool = convertTinyint1ToBool;
+        this.typeMapping = typeMapping;
     }
 
     @Override
@@ -212,11 +214,10 @@ public class MySqlDebeziumJsonEventParser implements 
EventParser<String> {
                             column.get("typeName").asText(),
                             length == null ? null : length.asInt(),
                             scale == null ? null : scale.asInt(),
-                            convertTinyint1ToBool);
-            if (column.get("optional").asBoolean()) {
-                type = type.nullable();
-            } else {
-                type = type.notNull();
+                            typeMapping);
+
+            if (!typeMapping.containsMode(TO_NULLABLE)) {
+                type = type.copy(column.get("optional").asBoolean());
             }
 
             String fieldName = column.get("name").asText();
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
index 2858c78ad..3386920c7 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
@@ -26,6 +26,7 @@ import org.apache.paimon.flink.action.Action;
 import org.apache.paimon.flink.action.ActionBase;
 import org.apache.paimon.flink.action.cdc.DatabaseSyncMode;
 import org.apache.paimon.flink.action.cdc.TableNameConverter;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.flink.action.cdc.mysql.schema.MySqlSchemasInfo;
 import org.apache.paimon.flink.action.cdc.mysql.schema.MySqlTableInfo;
 import org.apache.paimon.flink.sink.cdc.EventParser;
@@ -57,7 +58,6 @@ import java.util.stream.Collectors;
 
 import static org.apache.paimon.flink.action.cdc.DatabaseSyncMode.COMBINED;
 import static org.apache.paimon.flink.action.cdc.DatabaseSyncMode.DIVIDED;
-import static 
org.apache.paimon.flink.action.cdc.mysql.MySqlActionUtils.MYSQL_CONVERTER_TINYINT1_BOOL;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /**
@@ -111,6 +111,7 @@ public class MySqlSyncDatabaseAction extends ActionBase {
     private String includingTables = ".*";
     @Nullable String excludingTables;
     private DatabaseSyncMode mode = DIVIDED;
+    private TypeMapping typeMapping = TypeMapping.defaultMapping();
 
     // for test purpose
     private final List<Identifier> monitoredTables = new ArrayList<>();
@@ -177,6 +178,11 @@ public class MySqlSyncDatabaseAction extends ActionBase {
         return this;
     }
 
+    public MySqlSyncDatabaseAction withTypeMapping(TypeMapping typeMapping) {
+        this.typeMapping = typeMapping;
+        return this;
+    }
+
     public void build(StreamExecutionEnvironment env) throws Exception {
         checkArgument(
                 !mySqlConfig.contains(MySqlSourceOptions.TABLE_NAME),
@@ -198,7 +204,8 @@ public class MySqlSyncDatabaseAction extends ActionBase {
                         mySqlConfig,
                         tableName ->
                                 shouldMonitorTable(tableName, 
includingPattern, excludingPattern),
-                        excludedTables);
+                        excludedTables,
+                        typeMapping);
 
         logNonPkTables(mySqlSchemasInfo.nonPkTables());
         List<MySqlTableInfo> mySqlTableInfos = 
mySqlSchemasInfo.toMySqlTableInfos(mergeShards);
@@ -255,9 +262,9 @@ public class MySqlSyncDatabaseAction extends ActionBase {
 
         String serverTimeZone = 
mySqlConfig.get(MySqlSourceOptions.SERVER_TIME_ZONE);
         ZoneId zoneId = serverTimeZone == null ? ZoneId.systemDefault() : 
ZoneId.of(serverTimeZone);
-        Boolean convertTinyint1ToBool = 
mySqlConfig.get(MYSQL_CONVERTER_TINYINT1_BOOL);
+        TypeMapping typeMapping = this.typeMapping;
         MySqlTableSchemaBuilder schemaBuilder =
-                new MySqlTableSchemaBuilder(tableConfig, caseSensitive, 
convertTinyint1ToBool);
+                new MySqlTableSchemaBuilder(tableConfig, caseSensitive, 
typeMapping);
 
         EventParser.Factory<String> parserFactory =
                 () ->
@@ -268,7 +275,7 @@ public class MySqlSyncDatabaseAction extends ActionBase {
                                 schemaBuilder,
                                 includingPattern,
                                 excludingPattern,
-                                convertTinyint1ToBool);
+                                typeMapping);
 
         String database = this.database;
         DatabaseSyncMode mode = this.mode;
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java
index c320a3203..a7a092bbb 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink.action.cdc.mysql;
 import org.apache.paimon.flink.action.Action;
 import org.apache.paimon.flink.action.ActionFactory;
 import org.apache.paimon.flink.action.cdc.DatabaseSyncMode;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
 
 import org.apache.flink.api.java.utils.MultipleParameterTool;
 
@@ -40,15 +41,14 @@ public class MySqlSyncDatabaseActionFactory implements 
ActionFactory {
     public Optional<Action> create(MultipleParameterTool params) {
         checkRequiredArgument(params, "mysql-conf");
 
-        MySqlSyncDatabaseAction mySqlSyncDatabaseAction =
+        MySqlSyncDatabaseAction action =
                 new MySqlSyncDatabaseAction(
                         getRequiredValue(params, "warehouse"),
                         getRequiredValue(params, "database"),
                         optionalConfigMap(params, "catalog-conf"),
                         optionalConfigMap(params, "mysql-conf"));
 
-        mySqlSyncDatabaseAction
-                .withTableConfig(optionalConfigMap(params, "table-conf"))
+        action.withTableConfig(optionalConfigMap(params, "table-conf"))
                 
.ignoreIncompatible(Boolean.parseBoolean(params.get("ignore-incompatible")))
                 .mergeShards(
                         !params.has("merge-shards")
@@ -59,7 +59,12 @@ public class MySqlSyncDatabaseActionFactory implements 
ActionFactory {
                 .excludingTables(params.get("excluding-tables"))
                 .withMode(DatabaseSyncMode.fromString(params.get("mode")));
 
-        return Optional.of(mySqlSyncDatabaseAction);
+        if (params.has("type-mapping")) {
+            String[] options = params.get("type-mapping").split(",");
+            action.withTypeMapping(TypeMapping.parse(options));
+        }
+
+        return Optional.of(action);
     }
 
     @Override
@@ -82,6 +87,7 @@ public class MySqlSyncDatabaseActionFactory implements 
ActionFactory {
                         + "[--including-tables 
<mysql-table-name|name-regular-expr>] "
                         + "[--excluding-tables 
<mysql-table-name|name-regular-expr>] "
                         + "[--mode <sync-mode>] "
+                        + "[--type-mapping <option1,option2...>] "
                         + "[--mysql-conf <mysql-cdc-source-conf> [--mysql-conf 
<mysql-cdc-source-conf> ...]] "
                         + "[--catalog-conf <paimon-catalog-conf> 
[--catalog-conf <paimon-catalog-conf> ...]] "
                         + "[--table-conf <paimon-table-sink-conf> 
[--table-conf <paimon-table-sink-conf> ...]]");
@@ -125,6 +131,10 @@ public class MySqlSyncDatabaseActionFactory implements 
ActionFactory {
                 "  2. 'combined': start a single combined sink for all tables, 
the new table will be automatically synchronized.");
         System.out.println();
 
+        System.out.println(
+                "--type-mapping is used to specify how to map MySQL type to 
Paimon type. Please see the doc for usage.");
+        System.out.println();
+
         System.out.println("MySQL CDC source conf syntax:");
         System.out.println("  key=value");
         System.out.println(
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
index 4d3b0957c..f01e6fa70 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
@@ -24,6 +24,7 @@ import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.action.Action;
 import org.apache.paimon.flink.action.ActionBase;
 import org.apache.paimon.flink.action.cdc.ComputedColumn;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.flink.action.cdc.mysql.schema.MySqlSchemasInfo;
 import org.apache.paimon.flink.action.cdc.mysql.schema.MySqlTableInfo;
 import org.apache.paimon.flink.sink.cdc.CdcSinkBuilder;
@@ -50,7 +51,6 @@ import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 import static 
org.apache.paimon.flink.action.cdc.ComputedColumnUtils.buildComputedColumns;
-import static 
org.apache.paimon.flink.action.cdc.mysql.MySqlActionUtils.MYSQL_CONVERTER_TINYINT1_BOOL;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /**
@@ -95,6 +95,7 @@ public class MySqlSyncTableAction extends ActionBase {
 
     private Map<String, String> tableConfig = new HashMap<>();
     private List<String> computedColumnArgs = new ArrayList<>();
+    private TypeMapping typeMapping = TypeMapping.defaultMapping();
 
     public MySqlSyncTableAction(
             String warehouse, String database, String table, Map<String, 
String> mySqlConfig) {
@@ -141,6 +142,11 @@ public class MySqlSyncTableAction extends ActionBase {
         return this;
     }
 
+    public MySqlSyncTableAction withTypeMapping(TypeMapping typeMapping) {
+        this.typeMapping = typeMapping;
+        return this;
+    }
+
     public void build(StreamExecutionEnvironment env) throws Exception {
         checkArgument(
                 mySqlConfig.contains(MySqlSourceOptions.TABLE_NAME),
@@ -155,7 +161,7 @@ public class MySqlSyncTableAction extends ActionBase {
 
         MySqlSchemasInfo mySqlSchemasInfo =
                 MySqlActionUtils.getMySqlTableInfos(
-                        mySqlConfig, monitorTablePredication(), new 
ArrayList<>());
+                        mySqlConfig, monitorTablePredication(), new 
ArrayList<>(), typeMapping);
         validateMySqlTableInfos(mySqlSchemasInfo);
 
         catalog.createDatabase(database, true);
@@ -201,11 +207,11 @@ public class MySqlSyncTableAction extends ActionBase {
 
         String serverTimeZone = 
mySqlConfig.get(MySqlSourceOptions.SERVER_TIME_ZONE);
         ZoneId zoneId = serverTimeZone == null ? ZoneId.systemDefault() : 
ZoneId.of(serverTimeZone);
-        Boolean convertTinyint1ToBool = 
mySqlConfig.get(MYSQL_CONVERTER_TINYINT1_BOOL);
+        TypeMapping typeMapping = this.typeMapping;
         EventParser.Factory<String> parserFactory =
                 () ->
                         new MySqlDebeziumJsonEventParser(
-                                zoneId, caseSensitive, computedColumns, 
convertTinyint1ToBool);
+                                zoneId, caseSensitive, computedColumns, 
typeMapping);
 
         CdcSinkBuilder<String> sinkBuilder =
                 new CdcSinkBuilder<String>()
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionFactory.java
index 98a824e87..dd5828b69 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionFactory.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionFactory.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.action.cdc.mysql;
 
 import org.apache.paimon.flink.action.Action;
 import org.apache.paimon.flink.action.ActionFactory;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
 
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.utils.MultipleParameterTool;
@@ -64,6 +65,11 @@ public class MySqlSyncTableActionFactory implements 
ActionFactory {
                     new 
ArrayList<>(params.getMultiParameter("computed-column")));
         }
 
+        if (params.has("type-mapping")) {
+            String[] options = params.get("type-mapping").split(",");
+            action.withTypeMapping(TypeMapping.parse(options));
+        }
+
         return Optional.of(action);
     }
 
@@ -80,6 +86,7 @@ public class MySqlSyncTableActionFactory implements 
ActionFactory {
                         + "--table <table-name> "
                         + "[--partition-keys <partition-keys>] "
                         + "[--primary-keys <primary-keys>] "
+                        + "[--type-mapping <option1,option2...>] "
                         + "[--computed-column <'column-name=expr-name(args[, 
...])'> [--computed-column ...]] "
                         + "[--mysql-conf <mysql-cdc-source-conf> [--mysql-conf 
<mysql-cdc-source-conf> ...]] "
                         + "[--catalog-conf <paimon-catalog-conf> 
[--catalog-conf <paimon-catalog-conf> ...]] "
@@ -98,6 +105,10 @@ public class MySqlSyncTableActionFactory implements 
ActionFactory {
         System.out.println("Primary keys will be derived from MySQL tables if 
not specified.");
         System.out.println();
 
+        System.out.println(
+                "--type-mapping is used to specify how to map MySQL type to 
Paimon type. Please see the doc for usage.");
+        System.out.println();
+
         System.out.println("Please see doc for usage of --computed-column.");
         System.out.println();
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
index 57fd97223..3779d1101 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink.action.cdc.mysql;
 
+import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.flink.sink.cdc.NewTableSchemaBuilder;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.types.DataType;
@@ -32,6 +33,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
+import static 
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_NULLABLE;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Schema builder for MySQL cdc. */
@@ -39,13 +41,13 @@ public class MySqlTableSchemaBuilder implements 
NewTableSchemaBuilder<JsonNode>
 
     private final Map<String, String> tableConfig;
     private final boolean caseSensitive;
-    private final boolean convertTinyint1ToBool;
+    private final TypeMapping typeMapping;
 
     public MySqlTableSchemaBuilder(
-            Map<String, String> tableConfig, boolean caseSensitive, boolean 
convertTinyint1ToBool) {
+            Map<String, String> tableConfig, boolean caseSensitive, 
TypeMapping typeMapping) {
         this.tableConfig = tableConfig;
         this.caseSensitive = caseSensitive;
-        this.convertTinyint1ToBool = convertTinyint1ToBool;
+        this.typeMapping = typeMapping;
     }
 
     @Override
@@ -56,17 +58,21 @@ public class MySqlTableSchemaBuilder implements 
NewTableSchemaBuilder<JsonNode>
         LinkedHashMap<String, DataType> fields = new LinkedHashMap<>();
 
         for (JsonNode element : columns) {
-            Integer precision = element.has("length") ? 
element.get("length").asInt() : null;
-            Integer scale = element.has("scale") ? 
element.get("scale").asInt() : null;
-            fields.put(
-                    element.get("name").asText(),
-                    // TODO : add table comment and column comment when we 
upgrade flink cdc to 2.4
+            JsonNode length = element.get("length");
+            JsonNode scale = element.get("scale");
+            DataType dataType =
                     MySqlTypeUtils.toDataType(
-                                    element.get("typeExpression").asText(),
-                                    precision,
-                                    scale,
-                                    convertTinyint1ToBool)
-                            .copy(element.get("optional").asBoolean()));
+                            element.get("typeExpression").asText(),
+                            length == null ? null : length.asInt(),
+                            scale == null ? null : scale.asInt(),
+                            typeMapping);
+
+            if (!typeMapping.containsMode(TO_NULLABLE)) {
+                dataType.copy(element.get("optional").asBoolean());
+            }
+
+            // TODO : add table comment and column comment when we upgrade 
flink cdc to 2.4
+            fields.put(element.get("name").asText(), dataType);
         }
 
         ArrayNode arrayNode = (ArrayNode) 
jsonTable.get("primaryKeyColumnNames");
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
index f7b696a89..80ba982bc 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
@@ -23,6 +23,7 @@
 
 package org.apache.paimon.flink.action.cdc.mysql;
 
+import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.types.BinaryType;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
@@ -45,7 +46,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
-import static 
org.apache.paimon.flink.action.cdc.mysql.MySqlActionUtils.MYSQL_CONVERTER_TINYINT1_BOOL;
+import static 
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TINYINT1_NOT_BOOL;
+import static 
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_STRING;
 
 /** Converts from MySQL type to {@link DataType}. */
 public class MySqlTypeUtils {
@@ -142,14 +144,26 @@ public class MySqlTypeUtils {
                 MySqlTypeUtils.getShortType(mysqlType),
                 MySqlTypeUtils.getPrecision(mysqlType),
                 MySqlTypeUtils.getScale(mysqlType),
-                MYSQL_CONVERTER_TINYINT1_BOOL.defaultValue());
+                false);
     }
 
     public static DataType toDataType(
             String type,
             @Nullable Integer length,
             @Nullable Integer scale,
-            Boolean tinyInt1ToBool) {
+            TypeMapping typeMapping) {
+        if (typeMapping.containsMode(TO_STRING)) {
+            return DataTypes.STRING();
+        } else {
+            return toDataType(type, length, scale, 
typeMapping.containsMode(TINYINT1_NOT_BOOL));
+        }
+    }
+
+    private static DataType toDataType(
+            String type,
+            @Nullable Integer length,
+            @Nullable Integer scale,
+            boolean tinyInt1NotBool) {
         switch (type.toUpperCase()) {
             case BIT:
                 if (length == null || length == 1) {
@@ -161,15 +175,13 @@ public class MySqlTypeUtils {
             case BOOL:
                 return DataTypes.BOOLEAN();
             case TINYINT:
-                // MySQL haven't boolean type, it uses tinyint(1) to 
represents boolean type
-                // user should not use tinyint(1) to store number although 
jdbc url parameter
+                // MySQL haven't boolean type, it uses tinyint(1) to 
represents boolean type.
+                // User should not use tinyint(1) to store number although 
jdbc url parameter
                 // tinyInt1isBit=false can help change the return value, it's 
not a general way.
-                // mybatis and mysql-connector-java map tinyint(1) to boolean 
by default, we behave
-                // the same way by default. To store number (-128~127), we can 
set the parameter
-                // tinyInt1ToByte (option 'mysql.converter.tinyint1-to-bool') 
to false, then
-                // tinyint(1)
-                // will be mapped to TinyInt.
-                return length != null && length == 1 && tinyInt1ToBool
+                // Mybatis and mysql-connector-java map tinyint(1) to boolean 
by default, we behave
+                // the same way by default. To store number (-128~127), user 
can set the type
+                // mapping option 'tinyint1-not-bool' then tinyint(1) will be 
mapped to tinyint.
+                return length != null && length == 1 && !tinyInt1NotBool
                         ? DataTypes.BOOLEAN()
                         : DataTypes.TINYINT();
             case TINYINT_UNSIGNED:
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/MySqlSchema.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/MySqlSchema.java
index 3e4964aff..5ca2d877b 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/MySqlSchema.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/MySqlSchema.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink.action.cdc.mysql.schema;
 
+import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.flink.action.cdc.mysql.MySqlTypeUtils;
 import org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction;
 import org.apache.paimon.types.DataType;
@@ -49,29 +50,28 @@ public class MySqlSchema {
             DatabaseMetaData metaData,
             String databaseName,
             String tableName,
-            boolean convertTinyintToBool)
+            TypeMapping typeMapping)
             throws SQLException {
         LinkedHashMap<String, Pair<DataType, String>> fields = new 
LinkedHashMap<>();
         try (ResultSet rs = metaData.getColumns(databaseName, null, tableName, 
null)) {
             while (rs.next()) {
                 String fieldName = rs.getString("COLUMN_NAME");
                 String fieldType = rs.getString("TYPE_NAME");
-                Integer precision = rs.getInt("COLUMN_SIZE");
                 String fieldComment = rs.getString("REMARKS");
 
+                Integer precision = rs.getInt("COLUMN_SIZE");
                 if (rs.wasNull()) {
                     precision = null;
                 }
+
                 Integer scale = rs.getInt("DECIMAL_DIGITS");
                 if (rs.wasNull()) {
                     scale = null;
                 }
-                fields.put(
-                        fieldName,
-                        Pair.of(
-                                MySqlTypeUtils.toDataType(
-                                        fieldType, precision, scale, 
convertTinyintToBool),
-                                fieldComment));
+                DataType paimonType =
+                        MySqlTypeUtils.toDataType(fieldType, precision, scale, 
typeMapping);
+
+                fields.put(fieldName, Pair.of(paimonType, fieldComment));
             }
         }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
index 7684ddbbc..f34b42f2b 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
@@ -166,7 +166,6 @@ public abstract class 
UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
 
     protected List<SchemaChange> extractSchemaChanges(
             SchemaManager schemaManager, List<DataField> updatedDataFields) {
-
         RowType oldRowType = schemaManager.latest().get().logicalRowType();
         Map<String, DataField> oldFields = new HashMap<>();
         for (DataField oldField : oldRowType.getFields()) {
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlCdcTypeMappingITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlCdcTypeMappingITCase.java
new file mode 100644
index 000000000..26e1638d9
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlCdcTypeMappingITCase.java
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mysql;
+
+import org.apache.paimon.flink.action.cdc.TypeMapping;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.IntStream;
+
+import static org.apache.paimon.flink.action.cdc.DatabaseSyncMode.COMBINED;
+import static 
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TINYINT1_NOT_BOOL;
+import static 
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_NULLABLE;
+import static 
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_STRING;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT test for {@link TypeMapping} in MySQL CDC. */
+public class MySqlCdcTypeMappingITCase extends MySqlActionITCaseBase {
+
+    @BeforeAll
+    public static void startContainers() {
+        MYSQL_CONTAINER.withSetupSQL("mysql/type_mapping_test_setup.sql");
+        start();
+    }
+
+    // ------------------------------------- tinyint1-not-bool 
-------------------------------------
+
+    @Test
+    @Timeout(60)
+    public void testTinyInt1NotBool() throws Exception {
+        Map<String, String> mySqlConfig = getBasicMySqlConfig();
+        mySqlConfig.put("database-name", "tinyint1_not_bool_test");
+
+        MySqlSyncDatabaseAction action =
+                new MySqlSyncDatabaseAction(warehouse, database, mySqlConfig)
+                        .withTableConfig(getBasicTableConfig())
+                        .withMode(COMBINED)
+                        .withTypeMapping(new 
TypeMapping(Collections.singleton(TINYINT1_NOT_BOOL)));
+        runActionWithDefaultEnv(action);
+
+        // read old data
+        RowType rowType1 =
+                RowType.of(
+                        new DataType[] {DataTypes.INT().notNull(), 
DataTypes.TINYINT()},
+                        new String[] {"pk", "_tinyint1"});
+        waitForResult(
+                Collections.singletonList("+I[1, 1]"),
+                getFileStoreTable("t1"),
+                rowType1,
+                Collections.singletonList("pk"));
+
+        try (Statement statement = getStatement()) {
+            statement.executeUpdate("USE tinyint1_not_bool_test");
+
+            // test schema evolution
+            statement.executeUpdate("ALTER TABLE t1 ADD COLUMN _new_tinyint1 
TINYINT(1)");
+            statement.executeUpdate("INSERT INTO t1 VALUES (2, -128, 127)");
+
+            RowType rowType2 =
+                    RowType.of(
+                            new DataType[] {
+                                DataTypes.INT().notNull(), 
DataTypes.TINYINT(), DataTypes.TINYINT()
+                            },
+                            new String[] {"pk", "_tinyint1", "_new_tinyint1"});
+            waitForResult(
+                    Arrays.asList("+I[1, 1, NULL]", "+I[2, -128, 127]"),
+                    getFileStoreTable("t1"),
+                    rowType2,
+                    Collections.singletonList("pk"));
+
+            // test newly created table
+            statement.executeUpdate(
+                    "CREATE TABLE t2 (pk INT, _tinyint1 TINYINT(1), PRIMARY 
KEY (pk))");
+            statement.executeUpdate("INSERT INTO t2 VALUES (1, 1), (2, 127), 
(3, -128)");
+
+            waitingTables("t2");
+            waitForResult(
+                    Arrays.asList("+I[1, 1]", "+I[2, 127]", "+I[3, -128]"),
+                    getFileStoreTable("t2"),
+                    rowType1,
+                    Collections.singletonList("pk"));
+        }
+    }
+
+    // --------------------------------------- all-to-string 
---------------------------------------
+
+    // TODO: test BIT(n) after fix bit type
+    @Test
+    @Timeout(60)
+    public void testReadAllTypes() throws Exception {
+        Map<String, String> mySqlConfig = getBasicMySqlConfig();
+        mySqlConfig.put("database-name", "all_to_string_test");
+        mySqlConfig.put("table-name", "all_types_table");
+
+        MySqlSyncTableAction action =
+                new MySqlSyncTableAction(warehouse, database, tableName, 
mySqlConfig)
+                        .withPartitionKeys("pt")
+                        .withPrimaryKeys("pt", "_id")
+                        .withTypeMapping(new 
TypeMapping(Collections.singleton(TO_STRING)));
+        runActionWithDefaultEnv(action);
+
+        int allTypeNums = 76;
+        DataType[] types =
+                IntStream.range(0, allTypeNums)
+                        .mapToObj(i -> DataTypes.STRING())
+                        .toArray(DataType[]::new);
+        types[0] = types[0].notNull();
+        types[1] = types[1].notNull();
+
+        RowType rowType =
+                RowType.of(
+                        types,
+                        new String[] {
+                            "_id",
+                            "pt",
+                            "_bit1",
+                            "_tinyint1",
+                            "_boolean",
+                            "_bool",
+                            "_tinyint",
+                            "_tinyint_unsigned",
+                            "_tinyint_unsigned_zerofill",
+                            "_smallint",
+                            "_smallint_unsigned",
+                            "_smallint_unsigned_zerofill",
+                            "_mediumint",
+                            "_mediumint_unsigned",
+                            "_mediumint_unsigned_zerofill",
+                            "_int",
+                            "_int_unsigned",
+                            "_int_unsigned_zerofill",
+                            "_bigint",
+                            "_bigint_unsigned",
+                            "_bigint_unsigned_zerofill",
+                            "_serial",
+                            "_float",
+                            "_float_unsigned",
+                            "_float_unsigned_zerofill",
+                            "_real",
+                            "_real_unsigned",
+                            "_real_unsigned_zerofill",
+                            "_double",
+                            "_double_unsigned",
+                            "_double_unsigned_zerofill",
+                            "_double_precision",
+                            "_double_precision_unsigned",
+                            "_double_precision_unsigned_zerofill",
+                            "_numeric",
+                            "_numeric_unsigned",
+                            "_numeric_unsigned_zerofill",
+                            "_fixed",
+                            "_fixed_unsigned",
+                            "_fixed_unsigned_zerofill",
+                            "_decimal",
+                            "_decimal_unsigned",
+                            "_decimal_unsigned_zerofill",
+                            "_date",
+                            "_datetime",
+                            "_datetime3",
+                            "_datetime6",
+                            "_datetime_p",
+                            "_datetime_p2",
+                            "_timestamp",
+                            "_timestamp0",
+                            "_char",
+                            "_varchar",
+                            "_tinytext",
+                            "_text",
+                            "_mediumtext",
+                            "_longtext",
+                            "_bin",
+                            "_varbin",
+                            "_tinyblob",
+                            "_blob",
+                            "_mediumblob",
+                            "_longblob",
+                            "_json",
+                            "_enum",
+                            "_year",
+                            "_time",
+                            "_point",
+                            "_geometry",
+                            "_linestring",
+                            "_polygon",
+                            "_multipoint",
+                            "_multiline",
+                            "_multipolygon",
+                            "_geometrycollection",
+                            "_set",
+                        });
+
+        List<String> expected =
+                Arrays.asList(
+                        "+I["
+                                + "1, 1.1, "
+                                + "true, "
+                                + "1, 1, 0, 1, 2, 3, "
+                                + "1000, 2000, 3000, "
+                                + "100000, 200000, 300000, "
+                                + "1000000, 2000000, 3000000, "
+                                + "10000000000, 20000000000, 30000000000, 
40000000000, "
+                                + "1.5, 2.5, 3.5, "
+                                + "1.000001, 2.000002, 3.000003, "
+                                + "1.000011, 2.000022, 3.000033, "
+                                + "1.000111, 2.000222, 3.000333, "
+                                + "12345.11, 12345.22, 12345.33, "
+                                + "1.2345678987654322E32, 
1.2345678987654322E32, 1.2345678987654322E32, "
+                                + "11111, 22222, 33333, "
+                                + "2023-03-23, "
+                                // display value of datetime is not affected 
by timezone
+                                + "2023-03-23 14:30:05.000, 2023-03-23 
14:30:05.123, 2023-03-23 14:30:05.123456, "
+                                + "2023-03-24 14:30:00.000, 2023-03-24 
14:30:05.120, "
+                                // display value of timestamp is affected by 
timezone
+                                // we store 2023-03-23T15:00:10.123456 in 
UTC-8 system timezone
+                                // and query this timestamp in UTC-5 MySQL 
server timezone
+                                // so the display value should increase by 3 
hour
+                                + "2023-03-23 18:00:10.123456, 2023-03-23 
03:10:00.000000, "
+                                + "Paimon, Apache Paimon, Apache Paimon MySQL 
TINYTEXT Test Data, Apache Paimon MySQL Test Data, Apache Paimon MySQL 
MEDIUMTEXT Test Data, Apache Paimon MySQL Long Test Data, "
+                                + "bytes\u0000\u0000\u0000\u0000\u0000, more 
bytes, TINYBLOB type test data, BLOB type test data, MEDIUMBLOB type test data, 
LONGBLOB  bytes test data, "
+                                + "{\"a\": \"b\"}, "
+                                + "value1, "
+                                + "2023, "
+                                + "10:13:23, "
+                                + 
"{\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}, "
+                                + 
"{\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0},
 "
+                                + 
"{\"coordinates\":[[3,0],[3,3],[3,5]],\"type\":\"LineString\",\"srid\":0}, "
+                                + 
"{\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0},
 "
+                                + 
"{\"coordinates\":[[1,1],[2,2]],\"type\":\"MultiPoint\",\"srid\":0}, "
+                                + 
"{\"coordinates\":[[[1,1],[2,2],[3,3]],[[4,4],[5,5]]],\"type\":\"MultiLineString\",\"srid\":0},
 "
+                                + 
"{\"coordinates\":[[[[0,0],[10,0],[10,10],[0,10],[0,0]]],[[[5,5],[7,5],[7,7],[5,7],[5,5]]]],\"type\":\"MultiPolygon\",\"srid\":0},
 "
+                                + 
"{\"geometries\":[{\"type\":\"Point\",\"coordinates\":[10,10]},{\"type\":\"Point\",\"coordinates\":[30,30]},{\"type\":\"LineString\",\"coordinates\":[[15,15],[20,20]]}],\"type\":\"GeometryCollection\",\"srid\":0},
 "
+                                + "a,b"
+                                + "]",
+                        "+I["
+                                + "2, 2.2, "
+                                + "NULL, "
+                                + "NULL, NULL, NULL, NULL, NULL, NULL, "
+                                + "NULL, NULL, NULL, "
+                                + "NULL, NULL, NULL, "
+                                + "NULL, NULL, NULL, "
+                                + "NULL, NULL, NULL, 50000000000, "
+                                + "NULL, NULL, NULL, "
+                                + "NULL, NULL, NULL, "
+                                + "NULL, NULL, NULL, "
+                                + "NULL, NULL, NULL, "
+                                + "NULL, NULL, NULL, "
+                                + "NULL, NULL, NULL, "
+                                + "NULL, NULL, NULL, "
+                                + "NULL, "
+                                + "NULL, NULL, NULL, "
+                                + "NULL, NULL, "
+                                + "NULL, NULL, "
+                                + "NULL, NULL, NULL, NULL, NULL, NULL, "
+                                + "NULL, NULL, NULL, NULL, NULL, NULL, "
+                                + "NULL, "
+                                + "NULL, "
+                                + "NULL, "
+                                + "NULL, "
+                                + "NULL, "
+                                + "NULL, "
+                                + "NULL, "
+                                + "NULL, "
+                                + "NULL, "
+                                + "NULL, "
+                                + "NULL, "
+                                + "NULL, "
+                                + "NULL"
+                                + "]");
+
+        waitForResult(expected, getFileStoreTable(tableName), rowType, 
Arrays.asList("pt", "_id"));
+    }
+
+    @Test
+    @Timeout(60)
+    public void testSchemaEvolutionAndNewlyCreatedTable() throws Exception {
+        Map<String, String> mySqlConfig = getBasicMySqlConfig();
+        mySqlConfig.put("database-name", "all_to_string_test");
+
+        MySqlSyncDatabaseAction action =
+                new MySqlSyncDatabaseAction(warehouse, database, mySqlConfig)
+                        .excludingTables("all_types_table")
+                        .withMode(COMBINED)
+                        .withTypeMapping(new 
TypeMapping(Collections.singleton(TO_STRING)));
+        runActionWithDefaultEnv(action);
+
+        try (Statement statement = getStatement()) {
+            statement.executeUpdate("USE all_to_string_test");
+
+            // test schema evolution
+            statement.executeUpdate("INSERT INTO schema_evolution_test VALUES 
(1, 1)");
+            FileStoreTable table = getFileStoreTable("schema_evolution_test");
+            RowType rowType =
+                    RowType.of(
+                            new DataType[] {DataTypes.STRING().notNull(), 
DataTypes.STRING()},
+                            new String[] {"pk", "v1"});
+            waitForResult(
+                    Collections.singletonList("+I[1, 1]"),
+                    table,
+                    rowType,
+                    Collections.singletonList("pk"));
+
+            statement.executeUpdate("ALTER TABLE schema_evolution_test MODIFY 
COLUMN v1 BIGINT");
+            statement.executeUpdate("INSERT INTO schema_evolution_test VALUES 
(2, 20000000000)");
+
+            waitForResult(
+                    Arrays.asList("+I[1, 1]", "+I[2, 20000000000]"),
+                    table,
+                    rowType,
+                    Collections.singletonList("pk"));
+
+            statement.executeUpdate(
+                    "ALTER TABLE schema_evolution_test ADD COLUMN v2 
VARBINARY(10)");
+            statement.executeUpdate(
+                    "INSERT INTO schema_evolution_test VALUES (3, 3, 
'0123456789'), (4, 4, 'A')");
+
+            rowType =
+                    RowType.of(
+                            new DataType[] {
+                                DataTypes.STRING().notNull(), 
DataTypes.STRING(), DataTypes.STRING()
+                            },
+                            new String[] {"pk", "v1", "v2"});
+            waitForResult(
+                    Arrays.asList(
+                            "+I[1, 1, NULL]",
+                            "+I[2, 20000000000, NULL]",
+                            "+I[3, 3, 0123456789]",
+                            "+I[4, 4, A]"),
+                    table,
+                    rowType,
+                    Collections.singletonList("pk"));
+
+            // test newly created table
+            statement.executeUpdate(
+                    "CREATE TABLE _new_table (pk INT, v VARBINARY(10), PRIMARY 
KEY (pk))");
+            statement.executeUpdate("INSERT INTO _new_table VALUES (1, 
'Paimon')");
+
+            waitingTables("_new_table");
+            waitForResult(
+                    Collections.singletonList("+I[1, Paimon]"),
+                    getFileStoreTable("_new_table"),
+                    RowType.of(
+                            new DataType[] {DataTypes.STRING().notNull(), 
DataTypes.STRING()},
+                            new String[] {"pk", "v"}),
+                    Collections.singletonList("pk"));
+        }
+    }
+
+    // ------------------------------------- ignore-not-null 
-------------------------------------
+    @Test
+    @Timeout(60)
+    public void testIgnoreNotNull() throws Exception {
+        Map<String, String> mySqlConfig = getBasicMySqlConfig();
+        mySqlConfig.put("database-name", "ignore_not_null_test");
+
+        MySqlSyncDatabaseAction action =
+                new MySqlSyncDatabaseAction(warehouse, database, mySqlConfig)
+                        .withMode(COMBINED)
+                        .withTypeMapping(new 
TypeMapping(Collections.singleton(TO_NULLABLE)));
+        runActionWithDefaultEnv(action);
+
+        FileStoreTable table = getFileStoreTable("t1");
+        assertThat(table.rowType().getTypeAt(1).isNullable()).isTrue();
+
+        try (Statement statement = getStatement()) {
+            statement.executeUpdate("USE ignore_not_null_test");
+
+            RowType rowType =
+                    RowType.of(
+                            new DataType[] {DataTypes.INT().notNull(), 
DataTypes.VARCHAR(10)},
+                            new String[] {"pk", "v1"});
+            waitForResult(
+                    Collections.singletonList("+I[1, A]"),
+                    table,
+                    rowType,
+                    Collections.singletonList("pk"));
+
+            // test schema evolution
+            statement.executeUpdate("ALTER TABLE t1 ADD COLUMN v2 INT NOT NULL 
DEFAULT 100");
+            statement.executeUpdate("INSERT INTO t1 VALUES (2, 'B', 10)");
+
+            while (table.rowType().getFieldCount() != 3) {
+                table = table.copyWithLatestSchema();
+                Thread.sleep(1_000);
+            }
+            assertThat(table.rowType().getTypeAt(2).isNullable()).isTrue();
+
+            // Currently we haven't handle default value
+            waitForResult(
+                    Arrays.asList("+I[1, A, NULL]", "+I[2, B, 10]"),
+                    table,
+                    table.rowType(),
+                    Collections.singletonList("pk"));
+
+            // test newly created table
+            statement.executeUpdate(
+                    "CREATE TABLE _new_table (pk INT, v INT NOT NULL, PRIMARY 
KEY (pk))");
+
+            waitingTables("_new_table");
+            
assertThat(getFileStoreTable("_new_table").rowType().getTypeAt(1).isNullable())
+                    .isTrue();
+        }
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
index d3e5ea6d9..47ded57f8 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
@@ -202,84 +202,6 @@ public class MySqlSyncDatabaseActionITCase extends 
MySqlActionITCaseBase {
         waitForResult(expected, table2, rowType2, primaryKeys2);
     }
 
-    @Test
-    @Timeout(60)
-    public void testSchemaEvolutionWithTinyInt1Convert() throws Exception {
-        Map<String, String> mySqlConfig = getBasicMySqlConfig();
-        mySqlConfig.put("database-name", 
"paimon_sync_database_tinyint_schema");
-        mySqlConfig.put("mysql.converter.tinyint1-to-bool", "false");
-
-        MySqlSyncDatabaseAction action =
-                new MySqlSyncDatabaseAction(warehouse, database, mySqlConfig)
-                        .withTableConfig(getBasicTableConfig());
-        runActionWithDefaultEnv(action);
-
-        try (Statement statement = getStatement()) {
-            testSchemaEvolutionImplWithTinyInt1Convert(statement);
-        }
-    }
-
-    private void testSchemaEvolutionImplWithTinyInt1Convert(Statement 
statement) throws Exception {
-        FileStoreTable table1 = getFileStoreTable("schema_evolution_4");
-        FileStoreTable table2 = getFileStoreTable("schema_evolution_5");
-
-        statement.executeUpdate("USE " + 
"paimon_sync_database_tinyint_schema");
-
-        statement.executeUpdate("INSERT INTO schema_evolution_4 VALUES (1, 
'one')");
-        statement.executeUpdate("INSERT INTO schema_evolution_5 VALUES (2, 
'two', 21)");
-        statement.executeUpdate("INSERT INTO schema_evolution_4 VALUES (3, 
'three')");
-        statement.executeUpdate("INSERT INTO schema_evolution_5 VALUES (4, 
'four', 24)");
-
-        RowType rowType1 =
-                RowType.of(
-                        new DataType[] {DataTypes.INT().notNull(), 
DataTypes.VARCHAR(10)},
-                        new String[] {"_id", "v1"});
-
-        List<String> primaryKeys1 = Collections.singletonList("_id");
-        List<String> expected = Arrays.asList("+I[1, one]", "+I[3, three]");
-
-        waitForResult(expected, table1, rowType1, primaryKeys1);
-
-        RowType rowType2 =
-                RowType.of(
-                        new DataType[] {
-                            DataTypes.INT().notNull(), DataTypes.VARCHAR(10), 
DataTypes.TINYINT()
-                        },
-                        new String[] {"_id", "v1", "v2"});
-        List<String> primaryKeys2 = Collections.singletonList("_id");
-        expected = Arrays.asList("+I[2, two, 21]", "+I[4, four, 24]");
-        waitForResult(expected, table2, rowType2, primaryKeys2);
-
-        statement.executeUpdate("ALTER TABLE schema_evolution_4 ADD COLUMN v2 
TINYINT(1)");
-        statement.executeUpdate("INSERT INTO schema_evolution_4 VALUES (5, 
'five', 42)");
-        statement.executeUpdate("ALTER TABLE schema_evolution_5 ADD COLUMN v3 
TINYINT(1)");
-        statement.executeUpdate("INSERT INTO schema_evolution_5 VALUES (6, 
'six', 42, 43)");
-
-        rowType1 =
-                RowType.of(
-                        new DataType[] {
-                            DataTypes.INT().notNull(), DataTypes.VARCHAR(10), 
DataTypes.TINYINT()
-                        },
-                        new String[] {"_id", "v1", "v2"});
-
-        expected = Arrays.asList("+I[1, one, NULL]", "+I[3, three, NULL]", 
"+I[5, five, 42]");
-        waitForResult(expected, table1, rowType1, primaryKeys1);
-
-        rowType2 =
-                RowType.of(
-                        new DataType[] {
-                            DataTypes.INT().notNull(),
-                            DataTypes.VARCHAR(10),
-                            DataTypes.TINYINT(),
-                            DataTypes.TINYINT()
-                        },
-                        new String[] {"_id", "v1", "v2", "v3"});
-        expected =
-                Arrays.asList(
-                        "+I[2, two, 21, NULL]", "+I[4, four, 24, NULL]", 
"+I[6, six, 42, 43]");
-        waitForResult(expected, table2, rowType2, primaryKeys2);
-    }
-
     @Test
     public void testSpecifiedMySqlTable() {
         Map<String, String> mySqlConfig = getBasicMySqlConfig();
@@ -642,7 +564,7 @@ public class MySqlSyncDatabaseActionITCase extends 
MySqlActionITCaseBase {
     }
 
     @Test
-    @Timeout(60)
+    @Timeout(120)
     public void testAddIgnoredTable() throws Exception {
         String mySqlDatabase = "paimon_sync_database_add_ignored_table";
 
@@ -952,42 +874,6 @@ public class MySqlSyncDatabaseActionITCase extends 
MySqlActionITCaseBase {
         return env.executeAsync();
     }
 
-    @Test
-    @Timeout(60)
-    public void testTinyInt1Convert() throws Exception {
-        Map<String, String> mySqlConfig = getBasicMySqlConfig();
-        mySqlConfig.put("database-name", "paimon_sync_database_tinyint");
-        mySqlConfig.put("mysql.converter.tinyint1-to-bool", "false");
-
-        MySqlSyncDatabaseAction action =
-                new MySqlSyncDatabaseAction(warehouse, database, mySqlConfig)
-                        .withTableConfig(getBasicTableConfig());
-        runActionWithDefaultEnv(action);
-
-        try (Statement statement = getStatement()) {
-            testTinyInt1Convert(statement);
-        }
-    }
-
-    private void testTinyInt1Convert(Statement statement) throws Exception {
-        FileStoreTable table = getFileStoreTable("t4");
-
-        statement.executeUpdate("USE paimon_sync_database_tinyint");
-
-        statement.executeUpdate("INSERT INTO t4 VALUES (1, '2021-09-15 
15:00:10', 21)");
-        statement.executeUpdate("INSERT INTO t4 VALUES (2, '2023-03-23 
16:00:20', 42)");
-
-        RowType rowType =
-                RowType.of(
-                        new DataType[] {
-                            DataTypes.INT().notNull(), DataTypes.TIMESTAMP(0), 
DataTypes.TINYINT()
-                        },
-                        new String[] {"pk", "_datetime", "_tinyint1"});
-        List<String> expected =
-                Arrays.asList("+I[1, 2021-09-15T15:00:10, 21]", "+I[2, 
2023-03-23T16:00:20, 42]");
-        waitForResult(expected, table, rowType, 
Collections.singletonList("pk"));
-    }
-
     @Test
     @Timeout(240)
     public void testSyncManyTableWithLimitedMemory() throws Exception {
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index 6100288d8..7f747b1b8 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -794,58 +794,6 @@ public class MySqlSyncTableActionITCase extends 
MySqlActionITCaseBase {
         waitForResult(expected, table, rowType, Arrays.asList("pk", 
"_year_date"));
     }
 
-    @Test
-    @Timeout(60)
-    public void testTinyInt1Convert() throws Exception {
-        Map<String, String> mySqlConfig = getBasicMySqlConfig();
-        mySqlConfig.put("database-name", DATABASE_NAME);
-        mySqlConfig.put("table-name", "test_tinyint1_convert");
-        mySqlConfig.put("mysql.converter.tinyint1-to-bool", "false");
-
-        MySqlSyncTableAction action =
-                new MySqlSyncTableAction(warehouse, database, tableName, 
mySqlConfig);
-        runActionWithDefaultEnv(action);
-
-        checkTableSchema(
-                "[{\"id\":0,\"name\":\"pk\",\"type\":\"INT NOT 
NULL\",\"description\":\"\"},"
-                        + 
"{\"id\":1,\"name\":\"_tinyint1\",\"type\":\"TINYINT\",\"description\":\"\"}]");
-
-        try (Statement statement = getStatement()) {
-            statement.execute("USE " + DATABASE_NAME);
-            statement.executeUpdate("INSERT INTO test_tinyint1_convert VALUES 
(1, 21), (2, 42)");
-
-            FileStoreTable table = getFileStoreTable();
-            RowType rowType =
-                    RowType.of(
-                            new DataType[] {DataTypes.INT().notNull(), 
DataTypes.TINYINT()},
-                            new String[] {"pk", "_tinyint1"});
-            List<String> expected = Arrays.asList("+I[1, 21]", "+I[2, 42]");
-            waitForResult(expected, table, rowType, 
Collections.singletonList("pk"));
-
-            // test schema evolution
-            statement.executeUpdate(
-                    "ALTER TABLE test_tinyint1_convert ADD COLUMN 
_new_tinyint1 TINYINT(1)");
-            statement.executeUpdate(
-                    "INSERT INTO test_tinyint1_convert VALUES (3, 63, 1), (4, 
127, -128)");
-
-            rowType =
-                    RowType.of(
-                            new DataType[] {
-                                DataTypes.INT().notNull(), 
DataTypes.TINYINT(), DataTypes.TINYINT()
-                            },
-                            new String[] {"pk", "_tinyint1", "_new_tinyint1"});
-            waitForResult(
-                    Arrays.asList(
-                            "+I[1, 21, NULL]",
-                            "+I[2, 42, NULL]",
-                            "+I[3, 63, 1]",
-                            "+I[4, 127, -128]"),
-                    table,
-                    rowType,
-                    Collections.singletonList("pk"));
-        }
-    }
-
     @Test
     @Timeout(60)
     public void testSyncShards() throws Exception {
diff --git 
a/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_database_setup.sql
 
b/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_database_setup.sql
index 7f91e9df6..5232f4771 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_database_setup.sql
+++ 
b/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_database_setup.sql
@@ -46,17 +46,6 @@ CREATE TABLE t3 (
     v1 INT
 );
 
--- test tinyint(1) convert
-CREATE DATABASE paimon_sync_database_tinyint;
-USE paimon_sync_database_tinyint;
-
-CREATE TABLE t4 (
-    pk INT,
-    _datetime DATETIME,
-    _tinyint1 TINYINT(1),
-    PRIMARY KEY (pk)
-);
-
 -- to make sure we use JDBC Driver correctly
 CREATE DATABASE paimon_sync_database1;
 USE paimon_sync_database1;
@@ -322,22 +311,6 @@ CREATE TABLE a (
     PRIMARY KEY (k)
 );
 
-CREATE DATABASE paimon_sync_database_tinyint_schema;
-USE paimon_sync_database_tinyint_schema;
-
-CREATE TABLE schema_evolution_4 (
-    _id INT comment  '_id',
-    v1 VARCHAR(10) comment  'v1',
-    PRIMARY KEY (_id)
-);
-
-CREATE TABLE schema_evolution_5 (
-    _id INT comment  '_id',
-    v1 VARCHAR(10) comment  'v1',
-    v2 TINYINT(1) comment 'tinyint(1)',
-    PRIMARY KEY (_id)
-);
-
 CREATE DATABASE many_table_sync_test;
 USE many_table_sync_test;
 
diff --git 
a/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_table_setup.sql
 
b/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_table_setup.sql
index 4e8415ffc..a0b06595e 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_table_setup.sql
+++ 
b/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_table_setup.sql
@@ -280,16 +280,6 @@ CREATE TABLE test_computed_column (
     PRIMARY KEY (pk)
 );
 
--- 
################################################################################
---  testTinyInt1Convert
--- 
################################################################################
-
-CREATE TABLE test_tinyint1_convert (
-    pk INT,
-    _tinyint1 TINYINT(1),
-    PRIMARY KEY (pk)
-);
-
 -- 
################################################################################
 --  testSyncShard
 -- 
################################################################################
diff --git 
a/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_table_setup.sql
 
b/paimon-flink/paimon-flink-common/src/test/resources/mysql/type_mapping_test_setup.sql
similarity index 83%
copy from 
paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_table_setup.sql
copy to 
paimon-flink/paimon-flink-common/src/test/resources/mysql/type_mapping_test_setup.sql
index 4e8415ffc..baf76b9d5 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_table_setup.sql
+++ 
b/paimon-flink/paimon-flink-common/src/test/resources/mysql/type_mapping_test_setup.sql
@@ -21,40 +21,37 @@
 GRANT ALL PRIVILEGES ON *.* TO 'paimonuser'@'%';
 
 -- 
################################################################################
---  MySqlSyncTableActionITCase
+--  MySqlCdcTypeMappingITCase
 -- 
################################################################################
 
-CREATE DATABASE paimon_sync_table;
-USE paimon_sync_table;
+-- 
################################################################################
+--  testTinyInt1NotBool
+-- 
################################################################################
 
-CREATE TABLE schema_evolution_1 (
-    pt INT comment  'primary',
-    _id INT comment  '_id',
-    v1 VARCHAR(10) comment  'v1',
-    PRIMARY KEY (_id)
-);
+CREATE DATABASE tinyint1_not_bool_test;
+USE tinyint1_not_bool_test;
 
-CREATE TABLE schema_evolution_2 (
-    pt INT comment 'primary',
-    _id INT comment  '_id',
-    v1 VARCHAR(10) comment  'v1',
-    PRIMARY KEY (_id)
+CREATE TABLE t1 (
+    pk INT,
+    _tinyint1 TINYINT(1),
+    PRIMARY KEY (pk)
 );
 
-CREATE TABLE schema_evolution_multiple (
-    _id INT comment 'primary',
-    v1 VARCHAR(10) comment 'v1',
-    v2 INT comment 'v2',
-    v3 VARCHAR(10) comment 'v3',
-    PRIMARY KEY (_id)
-);
+INSERT INTO t1 VALUES (1, 1);
+
+CREATE DATABASE all_to_string_test;
+USE all_to_string_test;
+
+-- 
################################################################################
+--  testReadAllTypes
+-- 
################################################################################
 
 CREATE TABLE all_types_table (
     _id INT,
     pt DECIMAL(2, 1),
     -- BIT
     _bit1 BIT,
-    _bit BIT(64),
+    -- _bit BIT(64), TODO
     -- TINYINT
     _tinyint1 TINYINT(1),
     _boolean BOOLEAN,
@@ -152,11 +149,10 @@ CREATE TABLE all_types_table (
     PRIMARY KEY (_id)
 );
 
-
 INSERT INTO all_types_table VALUES (
     1, 1.1,
     -- BIT
-    1, B'11111000111',
+    1, -- B'11111000111', TODO
     -- TINYINT
     true, true, false, 1, 2, 3,
     -- SMALLINT
@@ -212,7 +208,7 @@ INSERT INTO all_types_table VALUES (
     'a,b'
 ), (
     2, 2.2,
-    NULL, NULL,
+    NULL,
     NULL, NULL, NULL, NULL, NULL, NULL,
     NULL, NULL, NULL,
     NULL, NULL, NULL,
@@ -246,80 +242,27 @@ INSERT INTO all_types_table VALUES (
     NULL
 );
 
-CREATE TABLE incompatible_field_1 (
-    _id INT,
-    v1 DATETIME,
-    PRIMARY KEY (_id)
-);
-
-CREATE TABLE incompatible_field_2 (
-    _id INT,
-    v1 INT,
-    PRIMARY KEY (_id)
-);
-
-CREATE TABLE incompatible_pk_1 (
-    a INT,
-    b BIGINT,
-    c VARCHAR(20),
-    PRIMARY KEY (a, b)
-);
-
-CREATE TABLE incompatible_pk_2 (
-    a INT,
-    b BIGINT,
-    c VARCHAR(20),
-    PRIMARY KEY (a)
-);
-
-CREATE TABLE test_computed_column (
-    pk INT,
-    _date DATE,
-    _datetime DATETIME,
-    _timestamp TIMESTAMP,
-    PRIMARY KEY (pk)
-);
-
 -- 
################################################################################
---  testTinyInt1Convert
+--  testSchemaEvolutionAndNewlyCreatedTable
 -- 
################################################################################
 
-CREATE TABLE test_tinyint1_convert (
+CREATE TABLE schema_evolution_test (
     pk INT,
-    _tinyint1 TINYINT(1),
+    v1 INT,
     PRIMARY KEY (pk)
 );
 
 -- 
################################################################################
---  testSyncShard
+--  testIgnoreNotNull
 -- 
################################################################################
 
-CREATE DATABASE shard_1;
-USE shard_1;
+CREATE DATABASE ignore_not_null_test;
+USE ignore_not_null_test;
 
 CREATE TABLE t1 (
     pk INT,
-    _date VARCHAR(10),
+    v1 VARCHAR(10) NOT NULL,
     PRIMARY KEY (pk)
 );
 
-CREATE TABLE t2 (
-    pk INT,
-    _date VARCHAR(10),
-    PRIMARY KEY (pk)
-);
-
-CREATE DATABASE shard_2;
-USE shard_2;
-
-CREATE TABLE t1 (
-    pk INT,
-    _date VARCHAR(10),
-    PRIMARY KEY (pk)
-);
-
-CREATE TABLE t2 (
-    pk INT,
-    _date VARCHAR(10),
-    PRIMARY KEY (pk)
-);
+INSERT INTO t1 VALUES (1, 'A');


Reply via email to