This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ceecd160 [flink][cdc] add option to decide convert mysql tinyint(1) 
to bool or not (#1476)
3ceecd160 is described below

commit 3ceecd16029fa47a25bbed6d927e0de53505fb53
Author: legendtkl <[email protected]>
AuthorDate: Tue Jul 18 10:06:07 2023 +0800

    [flink][cdc] add option to decide convert mysql tinyint(1) to bool or not 
(#1476)
---
 .../shortcodes/generated/mysql_sync_database.html  |   2 +-
 .../shortcodes/generated/mysql_sync_table.html     |   2 +-
 .../paimon/tests/cdc/MySqlCdcE2eTestBase.java      | 186 +++++++++++++++++++++
 .../tests/cdc/MySqlTinyIntConvertE2ETest.java      | 105 ++++++++++++
 .../src/test/resources/mysql/setup.sql             |  60 +++++++
 .../flink/action/cdc/mysql/MySqlActionUtils.java   |  30 +++-
 .../cdc/mysql/MySqlDebeziumJsonEventParser.java    |  23 ++-
 .../paimon/flink/action/cdc/mysql/MySqlSchema.java |  20 ++-
 .../action/cdc/mysql/MySqlSyncDatabaseAction.java  |  15 +-
 .../action/cdc/mysql/MySqlSyncTableAction.java     |  13 +-
 .../action/cdc/mysql/MySqlTableSchemaBuilder.java  |   6 +-
 .../flink/action/cdc/mysql/MySqlTypeUtils.java     |  22 ++-
 .../cdc/mysql/MySqlSyncDatabaseActionITCase.java   | 157 +++++++++++++++++
 .../cdc/mysql/MySqlSyncTableActionITCase.java      | 140 ++++++++++++++++
 .../src/test/resources/mysql/setup.sql             |  44 +++++
 15 files changed, 801 insertions(+), 24 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/mysql_sync_database.html 
b/docs/layouts/shortcodes/generated/mysql_sync_database.html
index cf6ededc0..8336fe6c4 100644
--- a/docs/layouts/shortcodes/generated/mysql_sync_database.html
+++ b/docs/layouts/shortcodes/generated/mysql_sync_database.html
@@ -59,7 +59,7 @@ under the License.
     </tr>
     <tr>
         <td><h5>--mysql-conf</h5></td>
-        <td>The configuration for Flink CDC MySQL table sources. Each 
configuration should be specified in the format "key=value". hostname, 
username, password, database-name and table-name are required configurations, 
others are optional. See its <a 
href="https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#connector-options">document</a>
 for a complete list of configurations.</td>
+        <td>The configuration for Flink CDC MySQL table sources. Each 
configuration should be specified in the format "key=value". hostname, 
username, password, database-name and table-name are required configurations, 
others are optional. See its <a 
href="https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#connector-options">document</a>
 for a complete list of configurations. <br> Furthermore, TINYINT(1) type in 
MySQL would be converted to Boolean b [...]
     </tr>
     <tr>
         <td><h5>--catalog-conf</h5></td>
diff --git a/docs/layouts/shortcodes/generated/mysql_sync_table.html 
b/docs/layouts/shortcodes/generated/mysql_sync_table.html
index 15cb8da72..b9b286f9f 100644
--- a/docs/layouts/shortcodes/generated/mysql_sync_table.html
+++ b/docs/layouts/shortcodes/generated/mysql_sync_table.html
@@ -51,7 +51,7 @@ under the License.
     </tr>
     <tr>
         <td><h5>--mysql-conf</h5></td>
-        <td>The configuration for Flink CDC MySQL table sources. Each 
configuration should be specified in the format "key=value". hostname, 
username, password, database-name and table-name are required configurations, 
others are optional. See its <a 
href="https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#connector-options">document</a>
 for a complete list of configurations.</td>
+        <td>The configuration for Flink CDC MySQL table sources. Each 
configuration should be specified in the format "key=value". hostname, 
username, password, database-name and table-name are required configurations, 
others are optional. See its <a 
href="https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#connector-options">document</a>
 for a complete list of configurations. <br> Furthermore, TINYINT(1) type in 
MySQL would be converted to Boolean b [...]
     </tr>
     <tr>
         <td><h5>--catalog-conf</h5></td>
diff --git 
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlCdcE2eTestBase.java
 
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlCdcE2eTestBase.java
index 3b6cbad41..c464f1fcf 100644
--- 
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlCdcE2eTestBase.java
+++ 
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlCdcE2eTestBase.java
@@ -304,6 +304,192 @@ public abstract class MySqlCdcE2eTestBase extends 
E2eTestBase {
         cancelJob(jobId);
     }
 
+    @Test
+    public void testSyncTableWithTinyConvert() throws Exception {
+        String runActionCommand =
+                String.join(
+                        " ",
+                        "bin/flink",
+                        "run",
+                        "-D",
+                        "execution.checkpointing.interval=1s",
+                        "--detached",
+                        "lib/paimon-flink-action.jar",
+                        "mysql-sync-table",
+                        "--warehouse",
+                        warehousePath,
+                        "--database",
+                        "default",
+                        "--table",
+                        "ts_table",
+                        "--partition-keys",
+                        "pt",
+                        "--primary-keys",
+                        "pt,_id",
+                        "--mysql-conf",
+                        "hostname=mysql-1",
+                        "--mysql-conf",
+                        String.format("port=%d", MySqlContainer.MYSQL_PORT),
+                        "--mysql-conf",
+                        String.format("username='%s'", 
mySqlContainer.getUsername()),
+                        "--mysql-conf",
+                        String.format("password='%s'", 
mySqlContainer.getPassword()),
+                        "--mysql-conf",
+                        "database-name='paimon_sync_table'",
+                        "--mysql-conf",
+                        "table-name='tinyint_schema_evolution_.+'",
+                        "--mysql-conf",
+                        "mysql.converter.tinyint1-to-bool='false'",
+                        "--table-conf",
+                        "bucket=2");
+        Container.ExecResult execResult =
+                jobManager.execInContainer("su", "flink", "-c", 
runActionCommand);
+        LOG.info(execResult.getStdout());
+        LOG.info(execResult.getStderr());
+
+        try (Connection conn = getMySqlConnection();
+                Statement statement = conn.createStatement()) {
+            testSyncTableImplWithTinyConvert(statement);
+        }
+    }
+
+    private void testSyncTableImplWithTinyConvert(Statement statement) throws 
Exception {
+        statement.executeUpdate("USE paimon_sync_table");
+
+        statement.executeUpdate("INSERT INTO tinyint_schema_evolution_1 VALUES 
(1, 1, 11)");
+        statement.executeUpdate(
+                "INSERT INTO tinyint_schema_evolution_2 VALUES (1, 2, 12), (2, 
4, 24)");
+
+        String jobId =
+                runSql(
+                        "INSERT INTO result1 SELECT * FROM ts_table;",
+                        catalogDdl,
+                        useCatalogCmd,
+                        "",
+                        createResultSink("result1", "pt INT, _id INT, 
_tinyint1 TINYINT"));
+        checkResult("1, 1, 11", "1, 2, 12", "2, 4, 24");
+        clearCurrentResults();
+        cancelJob(jobId);
+
+        statement.executeUpdate("ALTER TABLE tinyint_schema_evolution_1 ADD 
COLUMN v1 TINYINT(1)");
+        statement.executeUpdate(
+                "INSERT INTO tinyint_schema_evolution_1 VALUES (2, 3, 23, 30), 
(1, 5, 15, 50)");
+        statement.executeUpdate("ALTER TABLE tinyint_schema_evolution_2 ADD 
COLUMN v1 TINYINT(1)");
+        statement.executeUpdate("INSERT INTO tinyint_schema_evolution_2 VALUES 
(1, 6, 16, 60)");
+
+        jobId =
+                runSql(
+                        "INSERT INTO result2 SELECT * FROM ts_table;",
+                        catalogDdl,
+                        useCatalogCmd,
+                        "",
+                        createResultSink(
+                                "result2", "pt INT, _id INT, _tinyint1 
TINYINT, v1 TINYINT"));
+        checkResult(
+                "1, 1, 11, null",
+                "1, 2, 12, null",
+                "2, 3, 23, 30",
+                "2, 4, 24, null",
+                "1, 5, 15, 50",
+                "1, 6, 16, 60");
+        clearCurrentResults();
+        cancelJob(jobId);
+    }
+
+    @Test
+    public void testSyncDatabaseWithTinyConvert() throws Exception {
+        String runActionCommand =
+                String.join(
+                        " ",
+                        "bin/flink",
+                        "run",
+                        "-D",
+                        "execution.checkpointing.interval=1s",
+                        "--detached",
+                        "lib/paimon-flink-action.jar",
+                        "mysql-sync-database",
+                        "--warehouse",
+                        warehousePath,
+                        "--database",
+                        "default",
+                        "--mysql-conf",
+                        "hostname=mysql-1",
+                        "--mysql-conf",
+                        String.format("port=%d", MySqlContainer.MYSQL_PORT),
+                        "--mysql-conf",
+                        String.format("username='%s'", 
mySqlContainer.getUsername()),
+                        "--mysql-conf",
+                        String.format("password='%s'", 
mySqlContainer.getPassword()),
+                        "--mysql-conf",
+                        "database-name='paimon_sync_database_tinyint'",
+                        "--mysql-conf",
+                        "mysql.converter.tinyint1-to-bool='false'",
+                        "--table-conf",
+                        "bucket=2");
+        jobManager.execInContainer("su", "flink", "-c", runActionCommand);
+
+        try (Connection conn = getMySqlConnection();
+                Statement statement = conn.createStatement()) {
+            testSyncDatabaseImplWithTinyConvert(statement);
+        }
+    }
+
+    private void testSyncDatabaseImplWithTinyConvert(Statement statement) 
throws Exception {
+        statement.executeUpdate("USE paimon_sync_database_tinyint");
+
+        statement.executeUpdate("INSERT INTO t1 VALUES (1, 10)");
+        statement.executeUpdate("INSERT INTO t2 VALUES (2, 'two', 20)");
+
+        String jobId =
+                runSql(
+                        "INSERT INTO result1 SELECT * FROM t1;",
+                        catalogDdl,
+                        useCatalogCmd,
+                        "",
+                        createResultSink("result1", "k INT, v INT"));
+        checkResult("1, 10");
+        clearCurrentResults();
+        cancelJob(jobId);
+
+        jobId =
+                runSql(
+                        "INSERT INTO result2 SELECT * FROM t2;",
+                        catalogDdl,
+                        useCatalogCmd,
+                        "",
+                        createResultSink("result2", "k1 INT, k2 VARCHAR(10), 
v1 INT"));
+        checkResult("2, two, 20");
+        clearCurrentResults();
+        cancelJob(jobId);
+
+        statement.executeUpdate("ALTER TABLE t1 ADD COLUMN v1 TINYINT(1)");
+        statement.executeUpdate("INSERT INTO t1 VALUES (3, 30, 42)");
+        statement.executeUpdate("ALTER TABLE t2 ADD COLUMN v2 TINYINT(1)");
+        statement.executeUpdate("INSERT INTO t2 VALUES (4, 'four', 40, 40)");
+
+        jobId =
+                runSql(
+                        "INSERT INTO result3 SELECT * FROM t1;",
+                        catalogDdl,
+                        useCatalogCmd,
+                        "",
+                        createResultSink("result3", "k INT, v INT, v1 
TINYINT"));
+        checkResult("1, 10, null", "3, 30, 42");
+        clearCurrentResults();
+        cancelJob(jobId);
+
+        jobId =
+                runSql(
+                        "INSERT INTO result4 SELECT * FROM t2;",
+                        catalogDdl,
+                        useCatalogCmd,
+                        "",
+                        createResultSink("result4", "k1 INT, k2 VARCHAR(10), 
v1 INT, v2 TINYINT"));
+        checkResult("2, two, 20, null", "4, four, 40, 40");
+        clearCurrentResults();
+        cancelJob(jobId);
+    }
+
     protected Connection getMySqlConnection() throws Exception {
         return DriverManager.getConnection(
                 String.format(
diff --git 
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlTinyIntConvertE2ETest.java
 
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlTinyIntConvertE2ETest.java
new file mode 100644
index 000000000..3ff120293
--- /dev/null
+++ 
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlTinyIntConvertE2ETest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.tests.cdc;
+
+import org.apache.paimon.flink.action.cdc.mysql.MySqlContainer;
+import org.apache.paimon.flink.action.cdc.mysql.MySqlVersion;
+
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+
+import java.sql.Connection;
+import java.sql.Statement;
+
+/** E2e test for MySql CDC type convert tinyint(1) to tinyint. */
+public class MySqlTinyIntConvertE2ETest extends MySqlCdcE2eTestBase {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(MySqlTinyIntConvertE2ETest.class);
+
+    protected MySqlTinyIntConvertE2ETest() {
+        super(MySqlVersion.V5_7);
+    }
+
+    @Test
+    public void testSyncTable() throws Exception {
+        String runActionCommand =
+                String.join(
+                        " ",
+                        "bin/flink",
+                        "run",
+                        "-D",
+                        "execution.checkpointing.interval=1s",
+                        "--detached",
+                        "lib/paimon-flink-action.jar",
+                        "mysql-sync-table",
+                        "--warehouse",
+                        warehousePath,
+                        "--database",
+                        "default",
+                        "--table",
+                        "tinyint_table",
+                        "--primary-keys",
+                        "pk",
+                        "--mysql-conf",
+                        "hostname=mysql-1",
+                        "--mysql-conf",
+                        String.format("port=%d", MySqlContainer.MYSQL_PORT),
+                        "--mysql-conf",
+                        String.format("username='%s'", 
mySqlContainer.getUsername()),
+                        "--mysql-conf",
+                        String.format("password='%s'", 
mySqlContainer.getPassword()),
+                        "--mysql-conf",
+                        "database-name='test_tinyint_convert'",
+                        "--mysql-conf",
+                        "table-name='T'",
+                        "--mysql-conf",
+                        "mysql.converter.tinyint1-to-bool='false'",
+                        "--table-conf",
+                        "bucket=2");
+        Container.ExecResult execResult =
+                jobManager.execInContainer("su", "flink", "-c", 
runActionCommand);
+        LOG.info(execResult.getStdout());
+        LOG.info(execResult.getStderr());
+
+        try (Connection conn = getMySqlConnection();
+                Statement statement = conn.createStatement()) {
+            statement.executeUpdate("USE test_tinyint_convert");
+
+            statement.executeUpdate("INSERT INTO T VALUES (1, '2023-05-10 
12:30:20', 21)");
+
+            String jobId =
+                    runSql(
+                            "INSERT INTO result1 SELECT * FROM tinyint_table",
+                            catalogDdl,
+                            useCatalogCmd,
+                            createResultSink(
+                                    "result1", "pk INT, _date TIMESTAMP(0), 
_tinyint1 TINYINT"));
+            checkResult("1, 2023-05-10T12:30:20, 21");
+            clearCurrentResults();
+            cancelJob(jobId);
+        }
+    }
+
+    @Disabled("Not supported")
+    @Test
+    public void testSyncDatabase() {}
+}
diff --git a/paimon-e2e-tests/src/test/resources/mysql/setup.sql 
b/paimon-e2e-tests/src/test/resources/mysql/setup.sql
index cd5ae49c3..e7443cdc7 100644
--- a/paimon-e2e-tests/src/test/resources/mysql/setup.sql
+++ b/paimon-e2e-tests/src/test/resources/mysql/setup.sql
@@ -41,6 +41,20 @@ CREATE TABLE schema_evolution_2 (
     PRIMARY KEY (_id)
 );
 
+CREATE TABLE tinyint_schema_evolution_1 (
+    pt INT,
+    _id INT,
+    _tinyint1 TINYINT(1),
+    PRIMARY KEY (_id)
+);
+
+CREATE TABLE tinyint_schema_evolution_2 (
+    pt INT,
+    _id INT,
+    _tinyint1 TINYINT(1),
+    PRIMARY KEY (_id)
+);
+
 -- 
################################################################################
 --  MySqlCdcE2eTestBase#testSyncDatabase
 -- 
################################################################################
@@ -61,6 +75,22 @@ CREATE TABLE t2 (
     PRIMARY KEY (k1, k2)
 );
 
+CREATE DATABASE paimon_sync_database_tinyint;
+USE paimon_sync_database_tinyint;
+
+CREATE TABLE t1 (
+    k INT,
+    v INT,
+    PRIMARY KEY (k)
+);
+
+CREATE TABLE t2 (
+    k1 INT,
+    k2 VARCHAR(10),
+    v1 INT,
+    PRIMARY KEY (k1, k2)
+);
+
 -- to make sure we use JDBC Driver correctly
 CREATE DATABASE paimon_sync_database1;
 USE paimon_sync_database1;
@@ -90,3 +120,33 @@ CREATE TABLE T (
     _datetime DATETIME,
     PRIMARY KEY (pk)
 );
+
+-- 
################################################################################
+--  MySqlTinyIntConvertE2ETest#testSyncTable
+-- 
################################################################################
+
+CREATE DATABASE test_tinyint_convert;
+USE test_tinyint_convert;
+
+CREATE TABLE T (
+    pk INT,
+    _datetime DATETIME,
+    _tinyint1 TINYINT(1),
+    PRIMARY KEY (pk)
+);
+
+CREATE DATABASE paimon_sync_database_tinyint_schema;
+USE paimon_sync_database_tinyint_schema;
+
+CREATE TABLE schema_evolution_4 (
+    _id INT comment  '_id',
+    v1 VARCHAR(10) comment  'v1',
+    PRIMARY KEY (_id)
+);
+
+CREATE TABLE schema_evolution_5 (
+    _id INT comment  '_id',
+    v1 VARCHAR(10) comment  'v1',
+    v2 TINYINT(1) comment 'tinyint(1)',
+    PRIMARY KEY (_id)
+);
\ No newline at end of file
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
index 15b157ab5..756b94279 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
@@ -56,7 +56,8 @@ import java.util.stream.Collectors;
 
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
-class MySqlActionUtils {
+/** Utils for MySQL Action. * */
+public class MySqlActionUtils {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(MySqlActionUtils.class);
     public static final ConfigOption<Boolean> SCAN_NEWLY_ADDED_TABLE_ENABLED =
@@ -66,12 +67,33 @@ class MySqlActionUtils {
                     .withDescription(
                             "Whether capture the scan the newly added tables 
or not, by default is true.");
 
+    public static final ConfigOption<Boolean> MYSQL_CONVERTER_TINYINT1_BOOL =
+            ConfigOptions.key("mysql.converter.tinyint1-to-bool")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Mysql tinyint type will be converted to boolean 
type by default, if you want to convert to tinyint type, "
+                                    + "you can set this option to false.");
+
     static Connection getConnection(Configuration mySqlConfig) throws 
Exception {
-        return DriverManager.getConnection(
+        String url =
                 String.format(
-                        "jdbc:mysql://%s:%d/",
+                        "jdbc:mysql://%s:%d",
                         mySqlConfig.get(MySqlSourceOptions.HOSTNAME),
-                        mySqlConfig.get(MySqlSourceOptions.PORT)),
+                        mySqlConfig.get(MySqlSourceOptions.PORT));
+
+        // we need to add the `tinyInt1isBit` parameter to the connection url 
to make sure the
+        // tinyint(1) in MySQL is converted to bits or not. Refer to
+        // 
https://dev.mysql.com/doc/connector-j/8.0/en/connector-j-connp-props-result-sets.html#cj-conn-prop_tinyInt1isBit
+        if (mySqlConfig.contains(MYSQL_CONVERTER_TINYINT1_BOOL)) {
+            url =
+                    String.format(
+                            "%s?tinyInt1isBit=%s",
+                            url, 
mySqlConfig.get(MYSQL_CONVERTER_TINYINT1_BOOL));
+        }
+
+        return DriverManager.getConnection(
+                url,
                 mySqlConfig.get(MySqlSourceOptions.USERNAME),
                 mySqlConfig.get(MySqlSourceOptions.PASSWORD));
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
index 71de9e336..e66ab96ae 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
@@ -67,34 +67,42 @@ public class MySqlDebeziumJsonEventParser implements 
EventParser<String> {
     private final ObjectMapper objectMapper = new ObjectMapper();
     private final ZoneId serverTimeZone;
     private final boolean caseSensitive;
+
     private final TableNameConverter tableNameConverter;
     private final List<ComputedColumn> computedColumns;
     private final NewTableSchemaBuilder<String> schemaBuilder;
+    private final boolean convertTinyint1ToBool;
 
     private JsonNode root;
     private JsonNode payload;
 
     public MySqlDebeziumJsonEventParser(
-            ZoneId serverTimeZone, boolean caseSensitive, List<ComputedColumn> 
computedColumns) {
+            ZoneId serverTimeZone,
+            boolean caseSensitive,
+            List<ComputedColumn> computedColumns,
+            boolean convertTinyint1ToBool) {
         this(
                 serverTimeZone,
                 caseSensitive,
                 computedColumns,
                 new TableNameConverter(caseSensitive),
-                ddl -> Optional.empty());
+                ddl -> Optional.empty(),
+                convertTinyint1ToBool);
     }
 
     public MySqlDebeziumJsonEventParser(
             ZoneId serverTimeZone,
             boolean caseSensitive,
             TableNameConverter tableNameConverter,
-            NewTableSchemaBuilder<String> schemaBuilder) {
+            NewTableSchemaBuilder<String> schemaBuilder,
+            boolean convertTinyint1ToBool) {
         this(
                 serverTimeZone,
                 caseSensitive,
                 Collections.emptyList(),
                 tableNameConverter,
-                schemaBuilder);
+                schemaBuilder,
+                convertTinyint1ToBool);
     }
 
     public MySqlDebeziumJsonEventParser(
@@ -102,12 +110,14 @@ public class MySqlDebeziumJsonEventParser implements 
EventParser<String> {
             boolean caseSensitive,
             List<ComputedColumn> computedColumns,
             TableNameConverter tableNameConverter,
-            NewTableSchemaBuilder<String> schemaBuilder) {
+            NewTableSchemaBuilder<String> schemaBuilder,
+            boolean convertTinyint1ToBool) {
         this.serverTimeZone = serverTimeZone;
         this.caseSensitive = caseSensitive;
         this.computedColumns = computedColumns;
         this.tableNameConverter = tableNameConverter;
         this.schemaBuilder = schemaBuilder;
+        this.convertTinyint1ToBool = convertTinyint1ToBool;
     }
 
     @Override
@@ -168,7 +178,8 @@ public class MySqlDebeziumJsonEventParser implements 
EventParser<String> {
                     MySqlTypeUtils.toDataType(
                             column.get("typeName").asText(),
                             length == null ? null : length.asInt(),
-                            scale == null ? null : scale.asInt());
+                            scale == null ? null : scale.asInt(),
+                            convertTinyint1ToBool);
             if (column.get("optional").asBoolean()) {
                 type = type.nullable();
             } else {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java
index 55ffeea1b..a60733e10 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java
@@ -40,7 +40,22 @@ public class MySqlSchema {
     private final LinkedHashMap<String, Tuple2<DataType, String>> fields;
     private final List<String> primaryKeys;
 
-    public MySqlSchema(DatabaseMetaData metaData, String databaseName, String 
tableName)
+    public MySqlSchema(
+            String databaseName,
+            String tableName,
+            LinkedHashMap<String, Tuple2<DataType, String>> fields,
+            List<String> primaryKeys) {
+        this.databaseName = databaseName;
+        this.tableName = tableName;
+        this.fields = fields;
+        this.primaryKeys = primaryKeys;
+    }
+
+    public MySqlSchema(
+            DatabaseMetaData metaData,
+            String databaseName,
+            String tableName,
+            boolean convertTinyintToBool)
             throws Exception {
         this.databaseName = databaseName;
         this.tableName = tableName;
@@ -63,7 +78,8 @@ public class MySqlSchema {
                 fields.put(
                         fieldName,
                         Tuple2.of(
-                                MySqlTypeUtils.toDataType(fieldType, 
precision, scale),
+                                MySqlTypeUtils.toDataType(
+                                        fieldType, precision, scale, 
convertTinyintToBool),
                                 fieldComment));
             }
         }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
index 74df7cbac..3901d5c5e 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
@@ -57,6 +57,7 @@ import java.util.stream.Collectors;
 
 import static org.apache.paimon.flink.action.cdc.DatabaseSyncMode.COMBINED;
 import static org.apache.paimon.flink.action.cdc.DatabaseSyncMode.DIVIDED;
+import static 
org.apache.paimon.flink.action.cdc.mysql.MySqlActionUtils.MYSQL_CONVERTER_TINYINT1_BOOL;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /**
@@ -237,10 +238,15 @@ public class MySqlSyncDatabaseAction extends ActionBase {
         ZoneId zoneId = serverTimeZone == null ? ZoneId.systemDefault() : 
ZoneId.of(serverTimeZone);
         MySqlTableSchemaBuilder schemaBuilder =
                 new MySqlTableSchemaBuilder(tableConfig, caseSensitive);
+        Boolean convertTinyint1ToBool = 
mySqlConfig.get(MYSQL_CONVERTER_TINYINT1_BOOL);
         EventParser.Factory<String> parserFactory =
                 () ->
                         new MySqlDebeziumJsonEventParser(
-                                zoneId, caseSensitive, tableNameConverter, 
schemaBuilder);
+                                zoneId,
+                                caseSensitive,
+                                tableNameConverter,
+                                schemaBuilder,
+                                convertTinyint1ToBool);
 
         String database = this.database;
         DatabaseSyncMode mode = this.mode;
@@ -294,7 +300,12 @@ public class MySqlSyncDatabaseAction extends ActionBase {
                         excludedTables.add(tableName);
                         continue;
                     }
-                    MySqlSchema mySqlSchema = new MySqlSchema(metaData, 
databaseName, tableName);
+                    MySqlSchema mySqlSchema =
+                            new MySqlSchema(
+                                    metaData,
+                                    databaseName,
+                                    tableName,
+                                    
mySqlConfig.get(MYSQL_CONVERTER_TINYINT1_BOOL));
                     if (mySqlSchema.primaryKeys().size() > 0) {
                         // only tables with primary keys will be considered
                         mySqlSchemaList.add(mySqlSchema);
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
index b4e154086..fef0bdb72 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
@@ -46,6 +46,7 @@ import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import static 
org.apache.paimon.flink.action.cdc.mysql.MySqlActionUtils.MYSQL_CONVERTER_TINYINT1_BOOL;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /**
@@ -175,8 +176,11 @@ public class MySqlSyncTableAction extends ActionBase {
 
         String serverTimeZone = 
mySqlConfig.get(MySqlSourceOptions.SERVER_TIME_ZONE);
         ZoneId zoneId = serverTimeZone == null ? ZoneId.systemDefault() : 
ZoneId.of(serverTimeZone);
+        Boolean convertTinyint1ToBool = 
mySqlConfig.get(MYSQL_CONVERTER_TINYINT1_BOOL);
         EventParser.Factory<String> parserFactory =
-                () -> new MySqlDebeziumJsonEventParser(zoneId, caseSensitive, 
computedColumns);
+                () ->
+                        new MySqlDebeziumJsonEventParser(
+                                zoneId, caseSensitive, computedColumns, 
convertTinyint1ToBool);
 
         CdcSinkBuilder<String> sinkBuilder =
                 new CdcSinkBuilder<String>()
@@ -239,7 +243,12 @@ public class MySqlSyncTableAction extends ActionBase {
                                 Matcher tableMatcher = 
tablePattern.matcher(tableName);
                                 if (tableMatcher.matches()) {
                                     mySqlSchemaList.add(
-                                            new MySqlSchema(metaData, 
databaseName, tableName));
+                                            new MySqlSchema(
+                                                    metaData,
+                                                    databaseName,
+                                                    tableName,
+                                                    mySqlConfig.get(
+                                                            
MYSQL_CONVERTER_TINYINT1_BOOL)));
                                 }
                             }
                         }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
index fb6a47cbc..ee89a5b3d 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
@@ -43,6 +43,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
+import static 
org.apache.paimon.flink.action.cdc.mysql.MySqlActionUtils.MYSQL_CONVERTER_TINYINT1_BOOL;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Schema builder for MySQL cdc. */
@@ -104,7 +105,10 @@ public class MySqlTableSchemaBuilder implements 
NewTableSchemaBuilder<String> {
                         name.getSimpleName(),
                         Tuple2.of(
                                 MySqlTypeUtils.toDataType(
-                                        column.getDataType().getName(), 
precision, scale),
+                                        column.getDataType().getName(),
+                                        precision,
+                                        scale,
+                                        
MYSQL_CONVERTER_TINYINT1_BOOL.defaultValue()),
                                 comment == null ? null : 
String.valueOf(comment.getValue())));
             }
         }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
index 71fef4cd8..bd288cee5 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
@@ -45,6 +45,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
+import static 
org.apache.paimon.flink.action.cdc.mysql.MySqlActionUtils.MYSQL_CONVERTER_TINYINT1_BOOL;
+
 /** Converts from MySQL type to {@link DataType}. */
 public class MySqlTypeUtils {
 
@@ -139,11 +141,15 @@ public class MySqlTypeUtils {
         return toDataType(
                 MySqlTypeUtils.getShortType(mysqlType),
                 MySqlTypeUtils.getPrecision(mysqlType),
-                MySqlTypeUtils.getScale(mysqlType));
+                MySqlTypeUtils.getScale(mysqlType),
+                MYSQL_CONVERTER_TINYINT1_BOOL.defaultValue());
     }
 
     public static DataType toDataType(
-            String type, @Nullable Integer length, @Nullable Integer scale) {
+            String type,
+            @Nullable Integer length,
+            @Nullable Integer scale,
+            Boolean tinyInt1ToBool) {
         switch (type.toUpperCase()) {
             case BIT:
             case BOOLEAN:
@@ -152,9 +158,15 @@ public class MySqlTypeUtils {
             case TINYINT:
                 // MySQL haven't boolean type, it uses tinyint(1) to 
represents boolean type
                 // user should not use tinyint(1) to store number although 
jdbc url parameter
-                // tinyInt1isBit=false can help change the return value, it's 
not a general way
-                // btw: mybatis and mysql-connector-java map tinyint(1) to 
boolean by default
-                return length != null && length == 1 ? DataTypes.BOOLEAN() : 
DataTypes.TINYINT();
+                // tinyInt1isBit=false can help change the return value, it's 
not a general way.
+                // mybatis and mysql-connector-java map tinyint(1) to boolean 
by default, we behave
+                // the same way by default. To store number (-128~127), we can 
set the parameter
+                // tinyInt1ToBool (option 'mysql.converter.tinyint1-to-bool') 
to false, then
+                // tinyint(1)
+                // will be mapped to TinyInt.
+                return length != null && length == 1 && tinyInt1ToBool
+                        ? DataTypes.BOOLEAN()
+                        : DataTypes.TINYINT();
             case TINYINT_UNSIGNED:
             case TINYINT_UNSIGNED_ZEROFILL:
             case SMALLINT:
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
index 07aaebfcc..635f2053a 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
@@ -69,6 +69,11 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 public class MySqlSyncDatabaseActionITCase extends MySqlActionITCaseBase {
 
     private static final String DATABASE_NAME = "paimon_sync_database";
+
+    private static final String DATABASE_NAME_TINYINT_CONVERT =
+            "paimon_sync_database_tinyint_schema";
+
+    private static final String DATABASE_NAME_TINYINT = 
"paimon_sync_database_tinyint";
     @TempDir java.nio.file.Path tempDir;
 
     @Test
@@ -219,6 +224,103 @@ public class MySqlSyncDatabaseActionITCase extends 
MySqlActionITCaseBase {
         waitForResult(expected, table2, rowType2, primaryKeys2);
     }
 
+    @Test
+    @Timeout(60)
+    public void testSchemaEvolutionWithTinyInt1Convert() throws Exception {
+        Map<String, String> mySqlConfig = getBasicMySqlConfig();
+        mySqlConfig.put("database-name", DATABASE_NAME_TINYINT_CONVERT);
+        mySqlConfig.put("mysql.converter.tinyint1-to-bool", "false");
+
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(2);
+        env.enableCheckpointing(1000);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+
+        Map<String, String> tableConfig = getBasicTableConfig();
+        MySqlSyncDatabaseAction action =
+                new MySqlSyncDatabaseAction(
+                        mySqlConfig,
+                        warehouse,
+                        database,
+                        false,
+                        Collections.emptyMap(),
+                        tableConfig);
+        action.build(env);
+        JobClient client = env.executeAsync();
+        waitJobRunning(client);
+
+        try (Connection conn =
+                DriverManager.getConnection(
+                        MYSQL_CONTAINER.getJdbcUrl(DATABASE_NAME),
+                        MYSQL_CONTAINER.getUsername(),
+                        MYSQL_CONTAINER.getPassword())) {
+            try (Statement statement = conn.createStatement()) {
+                testSchemaEvolutionImplWithTinyInt1Convert(statement);
+            }
+        }
+    }
+
+    private void testSchemaEvolutionImplWithTinyInt1Convert(Statement 
statement) throws Exception {
+        FileStoreTable table1 = getFileStoreTable("schema_evolution_4");
+        FileStoreTable table2 = getFileStoreTable("schema_evolution_5");
+
+        statement.executeUpdate("USE " + DATABASE_NAME_TINYINT_CONVERT);
+
+        statement.executeUpdate("INSERT INTO schema_evolution_4 VALUES (1, 
'one')");
+        statement.executeUpdate("INSERT INTO schema_evolution_5 VALUES (2, 
'two', 21)");
+        statement.executeUpdate("INSERT INTO schema_evolution_4 VALUES (3, 
'three')");
+        statement.executeUpdate("INSERT INTO schema_evolution_5 VALUES (4, 
'four', 24)");
+
+        RowType rowType1 =
+                RowType.of(
+                        new DataType[] {DataTypes.INT().notNull(), 
DataTypes.VARCHAR(10)},
+                        new String[] {"_id", "v1"});
+
+        List<String> primaryKeys1 = Arrays.asList("_id");
+        List<String> expected = Arrays.asList("+I[1, one]", "+I[3, three]");
+
+        waitForResult(expected, table1, rowType1, primaryKeys1);
+
+        RowType rowType2 =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT().notNull(), DataTypes.VARCHAR(10), 
DataTypes.TINYINT()
+                        },
+                        new String[] {"_id", "v1", "v2"});
+        List<String> primaryKeys2 = Arrays.asList("_id");
+        expected = Arrays.asList("+I[2, two, 21]", "+I[4, four, 24]");
+        waitForResult(expected, table2, rowType2, primaryKeys2);
+
+        statement.executeUpdate("ALTER TABLE schema_evolution_4 ADD COLUMN v2 
TINYINT(1)");
+        statement.executeUpdate("INSERT INTO schema_evolution_4 VALUES (5, 
'five', 42)");
+        statement.executeUpdate("ALTER TABLE schema_evolution_5 ADD COLUMN v3 
TINYINT(1)");
+        statement.executeUpdate("INSERT INTO schema_evolution_5 VALUES (6, 
'six', 42, 43)");
+
+        rowType1 =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT().notNull(), DataTypes.VARCHAR(10), 
DataTypes.TINYINT()
+                        },
+                        new String[] {"_id", "v1", "v2"});
+
+        expected = Arrays.asList("+I[1, one, NULL]", "+I[3, three, NULL]", 
"+I[5, five, 42]");
+        waitForResult(expected, table1, rowType1, primaryKeys1);
+
+        rowType2 =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT().notNull(),
+                            DataTypes.VARCHAR(10),
+                            DataTypes.TINYINT(),
+                            DataTypes.TINYINT()
+                        },
+                        new String[] {"_id", "v1", "v2", "v3"});
+        expected =
+                Arrays.asList(
+                        "+I[2, two, 21, NULL]", "+I[4, four, 24, NULL]", 
"+I[6, six, 42, 43]");
+        waitForResult(expected, table2, rowType2, primaryKeys2);
+    }
+
     @Test
     public void testSpecifiedMySqlTable() {
         Map<String, String> mySqlConfig = getBasicMySqlConfig();
@@ -937,6 +1039,61 @@ public class MySqlSyncDatabaseActionITCase extends 
MySqlActionITCaseBase {
         return env.executeAsync();
     }
 
+    @Test
+    @Timeout(60)
+    public void testTinyInt1Convert() throws Exception {
+        Map<String, String> mySqlConfig = getBasicMySqlConfig();
+        mySqlConfig.put("database-name", DATABASE_NAME_TINYINT);
+        mySqlConfig.put("mysql.converter.tinyint1-to-bool", "false");
+
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(2);
+        env.enableCheckpointing(1000);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+
+        Map<String, String> tableConfig = getBasicTableConfig();
+        MySqlSyncDatabaseAction action =
+                new MySqlSyncDatabaseAction(
+                        mySqlConfig,
+                        warehouse,
+                        database,
+                        false,
+                        Collections.emptyMap(),
+                        tableConfig);
+        action.build(env);
+        JobClient client = env.executeAsync();
+        waitJobRunning(client);
+
+        try (Connection conn =
+                DriverManager.getConnection(
+                        MYSQL_CONTAINER.getJdbcUrl(DATABASE_NAME),
+                        MYSQL_CONTAINER.getUsername(),
+                        MYSQL_CONTAINER.getPassword())) {
+            try (Statement statement = conn.createStatement()) {
+                testTinyInt1Convert(statement);
+            }
+        }
+    }
+
+    private void testTinyInt1Convert(Statement statement) throws Exception {
+        FileStoreTable table = getFileStoreTable("t4");
+
+        statement.executeUpdate("USE paimon_sync_database_tinyint");
+
+        statement.executeUpdate("INSERT INTO t4 VALUES (1, '2021-09-15 
15:00:10', 21)");
+        statement.executeUpdate("INSERT INTO t4 VALUES (2, '2023-03-23 
16:00:20', 42)");
+
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT().notNull(), DataTypes.TIMESTAMP(0), 
DataTypes.TINYINT()
+                        },
+                        new String[] {"pk", "_datetime", "_tinyint1"});
+        List<String> expected =
+                Arrays.asList("+I[1, 2021-09-15T15:00:10, 21]", "+I[2, 
2023-03-23T16:00:20, 42]");
+        waitForResult(expected, table, rowType, Arrays.asList("pk"));
+    }
+
     private FileStoreTable getFileStoreTable(String tableName) throws 
Exception {
         Catalog catalog = 
CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse)));
         Identifier identifier = Identifier.create(database, tableName);
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index 8a608e674..033c0043b 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -881,6 +881,146 @@ public class MySqlSyncTableActionITCase extends 
MySqlActionITCaseBase {
         }
     }
 
+    @Test
+    @Timeout(60)
+    public void testTinyInt1Convert() throws Exception {
+        Map<String, String> mySqlConfig = getBasicMySqlConfig();
+        mySqlConfig.put("database-name", DATABASE_NAME);
+        mySqlConfig.put("table-name", "test_tinyint1_convert");
+        mySqlConfig.put("mysql.converter.tinyint1-to-bool", "false");
+
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(2);
+        env.enableCheckpointing(1000);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+
+        MySqlSyncTableAction action =
+                new MySqlSyncTableAction(
+                        mySqlConfig,
+                        warehouse,
+                        database,
+                        tableName,
+                        Collections.emptyList(),
+                        Collections.singletonList("pk"),
+                        Collections.emptyList(),
+                        Collections.emptyMap(),
+                        Collections.emptyMap());
+        action.build(env);
+        JobClient client = env.executeAsync();
+        waitJobRunning(client);
+
+        try (Connection conn =
+                        DriverManager.getConnection(
+                                MYSQL_CONTAINER.getJdbcUrl(DATABASE_NAME),
+                                MYSQL_CONTAINER.getUsername(),
+                                MYSQL_CONTAINER.getPassword());
+                Statement statement = conn.createStatement()) {
+            statement.execute("USE paimon_sync_table");
+            statement.executeUpdate(
+                    "INSERT INTO test_tinyint1_convert VALUES (1, '2021-09-15 
15:00:10', 21)");
+            statement.executeUpdate(
+                    "INSERT INTO test_tinyint1_convert VALUES (2, '2023-03-23 
16:00:20', 42)");
+
+            FileStoreTable table = getFileStoreTable();
+            RowType rowType =
+                    RowType.of(
+                            new DataType[] {
+                                DataTypes.INT().notNull(),
+                                DataTypes.TIMESTAMP(0),
+                                DataTypes.TINYINT()
+                            },
+                            new String[] {"pk", "_datetime", "_tinyint1"});
+            List<String> expected =
+                    Arrays.asList(
+                            "+I[1, 2021-09-15T15:00:10, 21]", "+I[2, 
2023-03-23T16:00:20, 42]");
+            waitForResult(expected, table, rowType, Arrays.asList("pk"));
+        }
+    }
+
+    @Test
+    @Timeout(60)
+    public void testSchemaEvolutionWithTinyint1Convert() throws Exception {
+        Map<String, String> mySqlConfig = getBasicMySqlConfig();
+        mySqlConfig.put("database-name", "paimon_sync_table_tinyint");
+        mySqlConfig.put("table-name", "schema_evolution_3");
+        mySqlConfig.put("mysql.converter.tinyint1-to-bool", "false");
+
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(2);
+        env.enableCheckpointing(1000);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+
+        Map<String, String> tableConfig = getBasicTableConfig();
+        MySqlSyncTableAction action =
+                new MySqlSyncTableAction(
+                        mySqlConfig,
+                        warehouse,
+                        database,
+                        tableName,
+                        Collections.singletonList("pt"),
+                        Arrays.asList("pt", "_id"),
+                        Collections.singletonMap(
+                                CatalogOptions.METASTORE.key(), 
"test-alter-table"),
+                        tableConfig);
+        action.build(env);
+        JobClient client = env.executeAsync();
+        waitJobRunning(client);
+
+        checkTableSchema(
+                "[{\"id\":0,\"name\":\"pt\",\"type\":\"INT NOT 
NULL\",\"description\":\"primary\"},{\"id\":1,\"name\":\"_id\",\"type\":\"INT 
NOT 
NULL\",\"description\":\"_id\"},{\"id\":2,\"name\":\"v1\",\"type\":\"VARCHAR(10)\",\"description\":\"v1\"}]");
+
+        try (Connection conn =
+                DriverManager.getConnection(
+                        MYSQL_CONTAINER.getJdbcUrl(DATABASE_NAME),
+                        MYSQL_CONTAINER.getUsername(),
+                        MYSQL_CONTAINER.getPassword())) {
+            try (Statement statement = conn.createStatement()) {
+                testSchemaEvolutionImplWithTinyIntConvert(statement);
+            }
+        }
+    }
+
+    private void testSchemaEvolutionImplWithTinyIntConvert(Statement 
statement) throws Exception {
+        FileStoreTable table = getFileStoreTable();
+        statement.executeUpdate("USE paimon_sync_table_tinyint");
+
+        statement.executeUpdate("INSERT INTO schema_evolution_3 VALUES (1, 1, 
'one')");
+        statement.executeUpdate(
+                "INSERT INTO schema_evolution_3 VALUES (1, 2, 'two'), (2, 4, 
'four')");
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT().notNull(),
+                            DataTypes.INT().notNull(),
+                            DataTypes.VARCHAR(10)
+                        },
+                        new String[] {"pt", "_id", "v1"});
+        List<String> primaryKeys = Arrays.asList("pt", "_id");
+        List<String> expected = Arrays.asList("+I[1, 1, one]", "+I[1, 2, 
two]", "+I[2, 4, four]");
+        waitForResult(expected, table, rowType, primaryKeys);
+
+        statement.executeUpdate("ALTER TABLE schema_evolution_3 ADD COLUMN v2 
TINYINT(1)");
+        statement.executeUpdate(
+                "INSERT INTO schema_evolution_3 VALUES (2, 3, 'three', 30), 
(1, 5, 'five', 50)");
+        rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT().notNull(),
+                            DataTypes.INT().notNull(),
+                            DataTypes.VARCHAR(10),
+                            DataTypes.TINYINT()
+                        },
+                        new String[] {"pt", "_id", "v1", "v2"});
+        expected =
+                Arrays.asList(
+                        "+I[1, 1, one, NULL]",
+                        "+I[1, 2, two, NULL]",
+                        "+I[2, 3, three, 30]",
+                        "+I[2, 4, four, NULL]",
+                        "+I[1, 5, five, 50]");
+        waitForResult(expected, table, rowType, primaryKeys);
+    }
+
     private FileStoreTable getFileStoreTable() throws Exception {
         Catalog catalog = 
CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse)));
         Identifier identifier = Identifier.create(database, tableName);
diff --git 
a/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql 
b/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
index b4c877c40..5d9459273 100644
--- a/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
+++ b/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
@@ -274,6 +274,13 @@ CREATE TABLE test_computed_column (
     PRIMARY KEY (pk)
 );
 
+CREATE TABLE test_tinyint1_convert (
+    pk INT,
+    _datetime DATETIME,
+    _tinyint1 TINYINT(1),
+    PRIMARY KEY (pk)
+);
+
 -- 
################################################################################
 --  MySqlSyncDatabaseActionITCase
 -- 
################################################################################
@@ -300,6 +307,17 @@ CREATE TABLE t3 (
     v1 INT
 );
 
+-- test tinyint(1) convert
+CREATE DATABASE paimon_sync_database_tinyint;
+USE paimon_sync_database_tinyint;
+
+CREATE TABLE t4 (
+    pk INT,
+    _datetime DATETIME,
+    _tinyint1 TINYINT(1),
+    PRIMARY KEY (pk)
+);
+
 -- to make sure we use JDBC Driver correctly
 CREATE DATABASE paimon_sync_database1;
 USE paimon_sync_database1;
@@ -549,3 +567,29 @@ CREATE TABLE t2 (
     v2 BIGINT,
     PRIMARY KEY (k1, k2)
 );
+
+CREATE DATABASE paimon_sync_table_tinyint;
+USE paimon_sync_table_tinyint;
+
+CREATE TABLE schema_evolution_3 (
+    pt INT comment  'primary',
+    _id INT comment  '_id',
+    v1 VARCHAR(10) comment  'v1',
+    PRIMARY KEY (_id)
+);
+
+CREATE DATABASE paimon_sync_database_tinyint_schema;
+USE paimon_sync_database_tinyint_schema;
+
+CREATE TABLE schema_evolution_4 (
+    _id INT comment  '_id',
+    v1 VARCHAR(10) comment  'v1',
+    PRIMARY KEY (_id)
+);
+
+CREATE TABLE schema_evolution_5 (
+    _id INT comment  '_id',
+    v1 VARCHAR(10) comment  'v1',
+    v2 TINYINT(1) comment 'tinyint(1)',
+    PRIMARY KEY (_id)
+);
\ No newline at end of file

Reply via email to