This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f5695bf67 [flink] Introduce MySqlSyncDatabaseAction (#904)
f5695bf67 is described below

commit f5695bf671499b4334823b4413f106d34e3f2898
Author: tsreaper <[email protected]>
AuthorDate: Fri Apr 14 14:44:26 2023 +0800

    [flink] Introduce MySqlSyncDatabaseAction (#904)
---
 docs/content/how-to/cdc-ingestion.md               |  55 ++++
 .../paimon/tests/cdc/MySqlCdcE2eTestBase.java      | 118 +++++++-
 .../src/test/resources/mysql/setup.sql             |  36 ++-
 .../org/apache/paimon/flink/action/Action.java     |   4 +
 .../flink/action/cdc/mysql/MySqlActionUtils.java   | 204 ++++++++++++++
 .../paimon/flink/action/cdc/mysql/MySqlSchema.java | 112 ++++++++
 .../action/cdc/mysql/MySqlSyncDatabaseAction.java  | 297 +++++++++++++++++++++
 .../action/cdc/mysql/MySqlSyncTableAction.java     | 241 +----------------
 .../action/cdc/mysql/MySqlActionITCaseBase.java    | 128 +++++++++
 .../cdc/mysql/MySqlSyncDatabaseActionITCase.java   | 254 ++++++++++++++++++
 .../cdc/mysql/MySqlSyncTableActionITCase.java      | 104 +-------
 .../src/test/resources/mysql/setup.sql             |  42 ++-
 12 files changed, 1234 insertions(+), 361 deletions(-)

diff --git a/docs/content/how-to/cdc-ingestion.md 
b/docs/content/how-to/cdc-ingestion.md
index fcd96ee46..eab0c3437 100644
--- a/docs/content/how-to/cdc-ingestion.md
+++ b/docs/content/how-to/cdc-ingestion.md
@@ -94,3 +94,58 @@ Example
     --paimon-conf changelog-producer=input \
     --paimon-conf sink.parallelism=4
 ```
+
+### Synchronizing Databases
+
+By using 
[MySqlSyncDatabaseAction](/api/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction)
 in a Flink DataStream job or directly through `flink run`, users can 
synchronize the whole MySQL database into one Paimon database.
+
+To use this feature through `flink run`, run the following shell command.
+
+```bash
+<FLINK_HOME>/bin/flink run \
+    -c org.apache.paimon.flink.action.FlinkActions \
+    /path/to/paimon-flink-**-{{< version >}}.jar \
+    mysql-sync-database
+    --warehouse <warehouse-path> \
+    --database <database-name> \
+    [--mysql-conf <mysql-cdc-source-conf> [--mysql-conf 
<mysql-cdc-source-conf> ...]] \
+    [--paimon-conf <paimon-table-sink-conf> [--paimon-conf 
<paimon-table-sink-conf> ...]]
+```
+
+* `--warehouse` is the path to Paimon warehouse.
+* `--database` is the database name in Paimon catalog.
+* `--mysql-conf` is the configuration for Flink CDC MySQL table sources. Each 
configuration should be specified in the format `key=value`. `hostname`, 
`username`, `password` and `database-name` are required configurations, others 
are optional. Note that `database-name` should be the exact name of the MySQL 
databse you want to synchronize. It can't be a regular expression. See its 
[document](https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#connecto
 [...]
+* `--paimon-conf` is the configuration for Paimon table sink. Each 
configuration should be specified in the format `key=value`. All Paimon sink 
table will be applied the same set of configurations. See [here]({{< ref 
"maintenance/configurations" >}}) for a complete list of configurations.
+
+For each MySQL table to be synchronized, if the corresponding Paimon table 
does not exist, this action will automatically create the table. Its schema 
will be derived from all specified MySQL tables. If the Paimon table already 
exists, its schema will be compared against the schema of all specified MySQL 
tables.
+
+This action supports a limited number of schema changes. Unsupported schema 
changes will be ignored. Currently supported schema changes includes:
+
+* Adding columns.
+
+* Altering column types. More specifically,
+
+  * altering from a string type (char, varchar, text) to another string type 
with longer length,
+  * altering from a binary type (binary, varbinary, blob) to another binary 
type with longer length,
+  * altering from an integer type (tinyint, smallint, int, bigint) to another 
integer type with wider range,
+  * altering from a floating-point type (float, double) to another 
floating-point type with wider range,
+  
+  are supported. Other type changes will cause exceptions.
+
+Example
+
+```bash
+<FLINK_HOME>/bin/flink run \
+    -c org.apache.paimon.flink.action.FlinkActions \
+    /path/to/paimon-flink-**-{{< version >}}.jar \
+    mysql-sync-database \
+    --warehouse hdfs:///path/to/warehouse \
+    --database test_db \
+    --mysql-conf hostname=127.0.0.1 \
+    --mysql-conf username=root \
+    --mysql-conf password=123456 \
+    --mysql-conf database-name=source_db \
+    --paimon-conf bucket=4 \
+    --paimon-conf changelog-producer=input \
+    --paimon-conf sink.parallelism=4
+```
diff --git 
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlCdcE2eTestBase.java
 
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlCdcE2eTestBase.java
index a0bede8a8..b5cae94fd 100644
--- 
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlCdcE2eTestBase.java
+++ 
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlCdcE2eTestBase.java
@@ -37,14 +37,16 @@ import java.sql.Statement;
 import java.util.UUID;
 import java.util.stream.Stream;
 
-/** E2e tests for {@link 
org.apache.paimon.flink.action.cdc.mysql.MySqlSyncTableAction}. */
+/**
+ * E2e tests for {@link 
org.apache.paimon.flink.action.cdc.mysql.MySqlSyncTableAction} and {@link
+ * org.apache.paimon.flink.action.cdc.mysql.MySqlSyncDatabaseAction}.
+ */
 public abstract class MySqlCdcE2eTestBase extends E2eTestBase {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(MySqlCdcE2eTestBase.class);
 
     private static final String USER = "paimonuser";
     private static final String PASSWORD = "paimonpw";
-    private static final String DATABASE_NAME = "paimon_test";
 
     private final MySqlVersion mySqlVersion;
     private MySqlContainer mySqlContainer;
@@ -82,7 +84,6 @@ public abstract class MySqlCdcE2eTestBase extends E2eTestBase 
{
                         .withSetupSQL("mysql/setup.sql")
                         .withUsername(USER)
                         .withPassword(PASSWORD)
-                        .withDatabaseName(DATABASE_NAME)
                         .withLogConsumer(new Slf4jLogConsumer(LOG))
                         // connect with docker-compose.yaml
                         .withNetwork(network)
@@ -119,8 +120,6 @@ public abstract class MySqlCdcE2eTestBase extends 
E2eTestBase {
                         "pt",
                         "--primary-keys",
                         "pt,_id",
-                        "--sink-parallelism",
-                        "2",
                         "--mysql-conf",
                         "hostname=mysql-1",
                         "--mysql-conf",
@@ -130,7 +129,7 @@ public abstract class MySqlCdcE2eTestBase extends 
E2eTestBase {
                         "--mysql-conf",
                         String.format("password='%s'", 
mySqlContainer.getPassword()),
                         "--mysql-conf",
-                        String.format("database-name='%s'", DATABASE_NAME),
+                        "database-name='paimon_sync_table'",
                         "--mysql-conf",
                         "table-name='schema_evolution_.+'",
                         "--paimon-conf",
@@ -154,7 +153,7 @@ public abstract class MySqlCdcE2eTestBase extends 
E2eTestBase {
     }
 
     private void testSyncTableImpl(Statement statement) throws Exception {
-        statement.executeUpdate("USE paimon_test");
+        statement.executeUpdate("USE paimon_sync_table");
 
         statement.executeUpdate("INSERT INTO schema_evolution_1 VALUES (1, 1, 
'one')");
         statement.executeUpdate(
@@ -169,9 +168,7 @@ public abstract class MySqlCdcE2eTestBase extends 
E2eTestBase {
                         createResultSink("result1", "pt INT, _id INT, v1 
VARCHAR(10)"));
         checkResult("1, 1, one", "1, 2, two", "2, 4, four");
         clearCurrentResults();
-        Container.ExecResult execResult = 
jobManager.execInContainer("bin/flink", "cancel", jobId);
-        LOG.info(execResult.getStdout());
-        LOG.info(execResult.getStderr());
+        jobManager.execInContainer("bin/flink", "cancel", jobId);
 
         statement.executeUpdate("ALTER TABLE schema_evolution_1 ADD COLUMN v2 
INT");
         statement.executeUpdate(
@@ -224,6 +221,107 @@ public abstract class MySqlCdcE2eTestBase extends 
E2eTestBase {
         jobManager.execInContainer("bin/flink", "cancel", jobId);
     }
 
+    @Test
+    public void testSyncDatabase() throws Exception {
+        String runActionCommand =
+                String.join(
+                        " ",
+                        "bin/flink",
+                        "run",
+                        "-c",
+                        "org.apache.paimon.flink.action.FlinkActions",
+                        "-D",
+                        "execution.checkpointing.interval=1s",
+                        "--detached",
+                        "lib/paimon-flink.jar",
+                        "mysql-sync-database",
+                        "--warehouse",
+                        warehousePath,
+                        "--database",
+                        "default",
+                        "--mysql-conf",
+                        "hostname=mysql-1",
+                        "--mysql-conf",
+                        String.format("port=%d", MySqlContainer.MYSQL_PORT),
+                        "--mysql-conf",
+                        String.format("username='%s'", 
mySqlContainer.getUsername()),
+                        "--mysql-conf",
+                        String.format("password='%s'", 
mySqlContainer.getPassword()),
+                        "--mysql-conf",
+                        "database-name='paimon_sync_database'",
+                        "--paimon-conf",
+                        "bucket=2");
+        jobManager.execInContainer("su", "flink", "-c", runActionCommand);
+
+        try (Connection conn =
+                DriverManager.getConnection(
+                        String.format(
+                                "jdbc:mysql://%s:%s/",
+                                mySqlContainer.getHost(), 
mySqlContainer.getDatabasePort()),
+                        mySqlContainer.getUsername(),
+                        mySqlContainer.getPassword())) {
+            try (Statement statement = conn.createStatement()) {
+                testSyncDatabaseImpl(statement);
+            }
+        }
+    }
+
+    private void testSyncDatabaseImpl(Statement statement) throws Exception {
+        statement.executeUpdate("USE paimon_sync_database");
+
+        statement.executeUpdate("INSERT INTO t1 VALUES (1, 10)");
+        statement.executeUpdate("INSERT INTO t2 VALUES (2, 'two', 20)");
+
+        String jobId =
+                runSql(
+                        "INSERT INTO result1 SELECT * FROM t1;",
+                        catalogDdl,
+                        useCatalogCmd,
+                        "",
+                        createResultSink("result1", "k INT, v INT"));
+        checkResult("1, 10");
+        clearCurrentResults();
+        jobManager.execInContainer("bin/flink", "cancel", jobId);
+
+        jobId =
+                runSql(
+                        "INSERT INTO result2 SELECT * FROM t2;",
+                        catalogDdl,
+                        useCatalogCmd,
+                        "",
+                        createResultSink("result2", "k1 INT, k2 VARCHAR(10), 
v1 INT"));
+        checkResult("2, two, 20");
+        clearCurrentResults();
+        jobManager.execInContainer("bin/flink", "cancel", jobId);
+
+        statement.executeUpdate("ALTER TABLE t1 MODIFY COLUMN v BIGINT");
+        statement.executeUpdate("INSERT INTO t1 VALUES (3, 3000000000000)");
+        statement.executeUpdate("ALTER TABLE t2 ADD COLUMN v2 DOUBLE");
+        statement.executeUpdate("INSERT INTO t2 VALUES (4, 'four', 40, 40.5)");
+
+        jobId =
+                runSql(
+                        "INSERT INTO result3 SELECT * FROM t1;",
+                        catalogDdl,
+                        useCatalogCmd,
+                        "",
+                        createResultSink("result3", "k INT, v BIGINT"));
+        checkResult("1, 10", "3, 3000000000000");
+        clearCurrentResults();
+        jobManager.execInContainer("bin/flink", "cancel", jobId);
+
+        jobId =
+                runSql(
+                        "INSERT INTO result4 SELECT * FROM t2;",
+                        catalogDdl,
+                        useCatalogCmd,
+                        "",
+                        createResultSink("result4", "k1 INT, k2 VARCHAR(10), 
v1 INT, v2 DOUBLE"));
+        checkResult("2, two, 20, null", "4, four, 40, 40.5");
+        clearCurrentResults();
+        jobManager.execInContainer("bin/flink", "cancel", jobId);
+    }
+
     private String runSql(String sql, String... ddls) throws Exception {
         return runSql(String.join("\n", ddls) + "\n" + sql);
     }
diff --git a/paimon-e2e-tests/src/test/resources/mysql/setup.sql 
b/paimon-e2e-tests/src/test/resources/mysql/setup.sql
index 1477afa49..d0016666f 100644
--- a/paimon-e2e-tests/src/test/resources/mysql/setup.sql
+++ b/paimon-e2e-tests/src/test/resources/mysql/setup.sql
@@ -16,16 +16,16 @@
 
 -- In production you would almost certainly limit the replication user must be 
on the follower (slave) machine,
 -- to prevent other clients accessing the log from other machines. For 
example, 'replicator'@'follower.acme.com'.
--- However, in this database we'll grant 2 users different privileges:
+-- However, in this database we'll grant the test user 'paimonuser' all 
privileges:
 --
--- 1) 'paimonuser' - all privileges required by the snapshot reader AND binlog 
reader (used for testing)
--- 2) 'mysqluser' - all privileges
---
-GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, 
LOCK TABLES  ON *.* TO 'paimonuser'@'%';
-CREATE USER 'mysqluser' IDENTIFIED BY 'mysqlpw';
-GRANT ALL PRIVILEGES ON *.* TO 'mysqluser'@'%';
+GRANT ALL PRIVILEGES ON *.* TO 'paimonuser'@'%';
+
+-- 
################################################################################
+--  MySqlCdcE2eTestBase#testSyncTable
+-- 
################################################################################
 
-USE paimon_test;
+CREATE DATABASE paimon_sync_table;
+USE paimon_sync_table;
 
 CREATE TABLE schema_evolution_1 (
     pt INT,
@@ -40,3 +40,23 @@ CREATE TABLE schema_evolution_2 (
     v1 VARCHAR(10),
     PRIMARY KEY (_id)
 );
+
+-- 
################################################################################
+--  MySqlCdcE2eTestBase#testSyncDatabase
+-- 
################################################################################
+
+CREATE DATABASE paimon_sync_database;
+USE paimon_sync_database;
+
+CREATE TABLE t1 (
+    k INT,
+    v INT,
+    PRIMARY KEY (k)
+);
+
+CREATE TABLE t2 (
+    k1 INT,
+    k2 VARCHAR(10),
+    v1 INT,
+    PRIMARY KEY (k1, k2)
+);
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/Action.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/Action.java
index 8ef2d391e..2474fa64b 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/Action.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/Action.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.flink.action;
 
 import org.apache.paimon.catalog.CatalogUtils;
+import org.apache.paimon.flink.action.cdc.mysql.MySqlSyncDatabaseAction;
 import org.apache.paimon.flink.action.cdc.mysql.MySqlSyncTableAction;
 
 import org.apache.flink.api.java.tuple.Tuple3;
@@ -119,6 +120,7 @@ public interface Action {
         private static final String MERGE_INTO = "merge-into";
         // cdc actions
         private static final String MYSQL_SYNC_TABLE = "mysql-sync-table";
+        private static final String MYSQL_SYNC_DATABASE = 
"mysql-sync-database";
 
         public static Optional<Action> create(String[] args) {
             String action = args[0].toLowerCase();
@@ -135,6 +137,8 @@ public interface Action {
                     return MergeIntoAction.create(actionArgs);
                 case MYSQL_SYNC_TABLE:
                     return MySqlSyncTableAction.create(actionArgs);
+                case MYSQL_SYNC_DATABASE:
+                    return MySqlSyncDatabaseAction.create(actionArgs);
                 default:
                     System.err.println("Unknown action \"" + action + "\"");
                     printHelp();
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
new file mode 100644
index 000000000..11aca6b06
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mysql;
+
+import org.apache.paimon.flink.sink.cdc.SchemaChangeProcessFunction;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.DataType;
+
+import com.ververica.cdc.connectors.mysql.source.MySqlSource;
+import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
+import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions;
+import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
+import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffsetBuilder;
+import com.ververica.cdc.connectors.mysql.table.JdbcUrlUtils;
+import com.ververica.cdc.connectors.mysql.table.StartupOptions;
+import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
+import com.ververica.cdc.debezium.table.DebeziumOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.kafka.connect.json.JsonConverterConfig;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+class MySqlActionUtils {
+
+    static Connection getConnection(Configuration mySqlConfig) throws 
Exception {
+        return DriverManager.getConnection(
+                String.format(
+                        "jdbc:mysql://%s:%d/",
+                        mySqlConfig.get(MySqlSourceOptions.HOSTNAME),
+                        mySqlConfig.get(MySqlSourceOptions.PORT)),
+                mySqlConfig.get(MySqlSourceOptions.USERNAME),
+                mySqlConfig.get(MySqlSourceOptions.PASSWORD));
+    }
+
+    static void assertSchemaCompatible(TableSchema tableSchema, MySqlSchema 
mySqlSchema) {
+        if (!schemaCompatible(tableSchema, mySqlSchema)) {
+            throw new IllegalArgumentException(
+                    "Paimon schema and MySQL schema are not compatible.\n"
+                            + "Paimon fields are: "
+                            + tableSchema.fields()
+                            + ".\nMySQL fields are: "
+                            + mySqlSchema.fields());
+        }
+    }
+
+    static boolean schemaCompatible(TableSchema tableSchema, MySqlSchema 
mySqlSchema) {
+        for (Map.Entry<String, DataType> entry : 
mySqlSchema.fields().entrySet()) {
+            int idx = tableSchema.fieldNames().indexOf(entry.getKey());
+            if (idx < 0) {
+                return false;
+            }
+            DataType type = tableSchema.fields().get(idx).type();
+            if (!SchemaChangeProcessFunction.canConvert(entry.getValue(), 
type)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    static Schema buildPaimonSchema(
+            MySqlSchema mySqlSchema,
+            List<String> specifiedPartitionKeys,
+            List<String> specifiedPrimaryKeys,
+            Map<String, String> paimonConfig) {
+        Schema.Builder builder = Schema.newBuilder();
+        builder.options(paimonConfig);
+
+        for (Map.Entry<String, DataType> entry : 
mySqlSchema.fields().entrySet()) {
+            builder.column(entry.getKey(), entry.getValue());
+        }
+
+        if (specifiedPrimaryKeys.size() > 0) {
+            for (String key : specifiedPrimaryKeys) {
+                if (!mySqlSchema.fields().containsKey(key)) {
+                    throw new IllegalArgumentException(
+                            "Specified primary key " + key + " does not exist 
in MySQL tables");
+                }
+            }
+            builder.primaryKey(specifiedPrimaryKeys);
+        } else if (mySqlSchema.primaryKeys().size() > 0) {
+            builder.primaryKey(mySqlSchema.primaryKeys());
+        } else {
+            throw new IllegalArgumentException(
+                    "Primary keys are not specified. "
+                            + "Also, can't infer primary keys from MySQL table 
schemas because "
+                            + "MySQL tables have no primary keys or have 
different primary keys.");
+        }
+
+        if (specifiedPartitionKeys.size() > 0) {
+            builder.partitionKeys(specifiedPartitionKeys);
+        }
+
+        return builder.build();
+    }
+
+    static MySqlSource<String> buildMySqlSource(Configuration mySqlConfig) {
+        MySqlSourceBuilder<String> sourceBuilder = MySqlSource.builder();
+
+        String databaseName = 
mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME);
+        String tableName = mySqlConfig.get(MySqlSourceOptions.TABLE_NAME);
+        sourceBuilder
+                .hostname(mySqlConfig.get(MySqlSourceOptions.HOSTNAME))
+                .port(mySqlConfig.get(MySqlSourceOptions.PORT))
+                .username(mySqlConfig.get(MySqlSourceOptions.USERNAME))
+                .password(mySqlConfig.get(MySqlSourceOptions.PASSWORD))
+                .databaseList(databaseName)
+                .tableList(databaseName + "." + tableName);
+
+        
mySqlConfig.getOptional(MySqlSourceOptions.SERVER_ID).ifPresent(sourceBuilder::serverId);
+        mySqlConfig
+                .getOptional(MySqlSourceOptions.SERVER_TIME_ZONE)
+                .ifPresent(sourceBuilder::serverTimeZone);
+        mySqlConfig
+                .getOptional(MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE)
+                .ifPresent(sourceBuilder::fetchSize);
+        mySqlConfig
+                .getOptional(MySqlSourceOptions.CONNECT_TIMEOUT)
+                .ifPresent(sourceBuilder::connectTimeout);
+        mySqlConfig
+                .getOptional(MySqlSourceOptions.CONNECT_MAX_RETRIES)
+                .ifPresent(sourceBuilder::connectMaxRetries);
+        mySqlConfig
+                .getOptional(MySqlSourceOptions.CONNECTION_POOL_SIZE)
+                .ifPresent(sourceBuilder::connectionPoolSize);
+        mySqlConfig
+                .getOptional(MySqlSourceOptions.HEARTBEAT_INTERVAL)
+                .ifPresent(sourceBuilder::heartbeatInterval);
+
+        String startupMode = 
mySqlConfig.get(MySqlSourceOptions.SCAN_STARTUP_MODE);
+        // see
+        // 
https://github.com/ververica/flink-cdc-connectors/blob/master/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactory.java#L196
+        if ("initial".equalsIgnoreCase(startupMode)) {
+            sourceBuilder.startupOptions(StartupOptions.initial());
+        } else if ("earliest-offset".equalsIgnoreCase(startupMode)) {
+            sourceBuilder.startupOptions(StartupOptions.earliest());
+        } else if ("latest-offset".equalsIgnoreCase(startupMode)) {
+            sourceBuilder.startupOptions(StartupOptions.latest());
+        } else if ("specific-offset".equalsIgnoreCase(startupMode)) {
+            BinlogOffsetBuilder offsetBuilder = BinlogOffset.builder();
+            String file = 
mySqlConfig.get(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_FILE);
+            Long pos = 
mySqlConfig.get(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_POS);
+            if (file != null && pos != null) {
+                offsetBuilder.setBinlogFilePosition(file, pos);
+            }
+            mySqlConfig
+                    
.getOptional(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET)
+                    .ifPresent(offsetBuilder::setGtidSet);
+            mySqlConfig
+                    
.getOptional(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_EVENTS)
+                    .ifPresent(offsetBuilder::setSkipEvents);
+            mySqlConfig
+                    
.getOptional(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_ROWS)
+                    .ifPresent(offsetBuilder::setSkipRows);
+            
sourceBuilder.startupOptions(StartupOptions.specificOffset(offsetBuilder.build()));
+        } else if ("timestamp".equalsIgnoreCase(startupMode)) {
+            sourceBuilder.startupOptions(
+                    StartupOptions.timestamp(
+                            
mySqlConfig.get(MySqlSourceOptions.SCAN_STARTUP_TIMESTAMP_MILLIS)));
+        }
+
+        Properties jdbcProperties = new Properties();
+        Properties debeziumProperties = new Properties();
+        for (Map.Entry<String, String> entry : mySqlConfig.toMap().entrySet()) 
{
+            String key = entry.getKey();
+            String value = entry.getValue();
+            if (key.startsWith(JdbcUrlUtils.PROPERTIES_PREFIX)) {
+                
jdbcProperties.put(key.substring(JdbcUrlUtils.PROPERTIES_PREFIX.length()), 
value);
+            } else if 
(key.startsWith(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX)) {
+                debeziumProperties.put(
+                        
key.substring(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX.length()), value);
+            }
+        }
+        sourceBuilder.jdbcProperties(jdbcProperties);
+        sourceBuilder.debeziumProperties(debeziumProperties);
+
+        Map<String, Object> customConverterConfigs = new HashMap<>();
+        customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, 
"numeric");
+        JsonDebeziumDeserializationSchema schema =
+                new JsonDebeziumDeserializationSchema(true, 
customConverterConfigs);
+        return 
sourceBuilder.deserializer(schema).includeSchemaChanges(true).build();
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java
new file mode 100644
index 000000000..fc06a1efb
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mysql;
+
+import org.apache.paimon.flink.sink.cdc.SchemaChangeProcessFunction;
+import org.apache.paimon.types.DataType;
+
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Utility class to load MySQL table schema with JDBC. */
+public class MySqlSchema {
+
+    private final String databaseName;
+    private final String tableName;
+
+    private final Map<String, DataType> fields;
+    private final List<String> primaryKeys;
+
+    public MySqlSchema(DatabaseMetaData metaData, String databaseName, String 
tableName)
+            throws Exception {
+        this.databaseName = databaseName;
+        this.tableName = tableName;
+
+        fields = new LinkedHashMap<>();
+        try (ResultSet rs = metaData.getColumns(null, databaseName, tableName, 
null)) {
+            while (rs.next()) {
+                String fieldName = rs.getString("COLUMN_NAME");
+                String fieldType = rs.getString("TYPE_NAME");
+                Integer precision = rs.getInt("COLUMN_SIZE");
+                if (rs.wasNull()) {
+                    precision = null;
+                }
+                Integer scale = rs.getInt("DECIMAL_DIGITS");
+                if (rs.wasNull()) {
+                    scale = null;
+                }
+                fields.put(fieldName, MySqlTypeUtils.toDataType(fieldType, 
precision, scale));
+            }
+        }
+
+        primaryKeys = new ArrayList<>();
+        try (ResultSet rs = metaData.getPrimaryKeys(null, databaseName, 
tableName)) {
+            while (rs.next()) {
+                String fieldName = rs.getString("COLUMN_NAME");
+                primaryKeys.add(fieldName);
+            }
+        }
+    }
+
+    public String tableName() {
+        return tableName;
+    }
+
+    public Map<String, DataType> fields() {
+        return fields;
+    }
+
+    public List<String> primaryKeys() {
+        return primaryKeys;
+    }
+
+    public MySqlSchema merge(MySqlSchema other) {
+        for (Map.Entry<String, DataType> entry : other.fields.entrySet()) {
+            String fieldName = entry.getKey();
+            DataType newType = entry.getValue();
+            if (fields.containsKey(fieldName)) {
+                DataType oldType = fields.get(fieldName);
+                if (SchemaChangeProcessFunction.canConvert(oldType, newType)) {
+                    fields.put(fieldName, newType);
+                } else if (SchemaChangeProcessFunction.canConvert(newType, 
oldType)) {
+                    // nothing to do
+                } else {
+                    throw new IllegalArgumentException(
+                            String.format(
+                                    "Column %s have different types in table 
%s.%s and table %s.%s",
+                                    fieldName,
+                                    databaseName,
+                                    tableName,
+                                    other.databaseName,
+                                    other.tableName));
+                }
+            } else {
+                fields.put(fieldName, newType);
+            }
+        }
+        if (!primaryKeys.equals(other.primaryKeys)) {
+            primaryKeys.clear();
+        }
+        return this;
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
new file mode 100644
index 000000000..214ace94d
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mysql;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.flink.action.Action;
+import org.apache.paimon.flink.sink.cdc.EventParser;
+import org.apache.paimon.flink.sink.cdc.FlinkCdcSyncDatabaseSinkBuilder;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.Preconditions;
+
+import com.ververica.cdc.connectors.mysql.source.MySqlSource;
+import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.java.utils.MultipleParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * An {@link Action} which synchronize the whole MySQL database into one 
Paimon database.
+ *
+ * <p>You should specify MySQL source database in {@code mySqlConfig}. See <a
+ * 
href="https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#connector-options";>document
+ * of flink-cdc-connectors</a> for detailed keys and values.
+ *
+ * <p>For each MySQL table to be synchronized, if the corresponding Paimon 
table does not exist,
+ * this action will automatically create the table. Its schema will be derived 
from all specified
+ * MySQL tables. If the Paimon table already exists, its schema will be 
compared against the schema
+ * of all specified MySQL tables.
+ *
+ * <p>This action supports a limited number of schema changes. Unsupported 
schema changes will be
+ * ignored. Currently supported schema changes includes:
+ *
+ * <ul>
+ *   <li>Adding columns.
+ *   <li>Altering column types. More specifically,
+ *       <ul>
+ *         <li>altering from a string type (char, varchar, text) to another 
string type with longer
+ *             length,
+ *         <li>altering from a binary type (binary, varbinary, blob) to 
another binary type with
+ *             longer length,
+ *         <li>altering from an integer type (tinyint, smallint, int, bigint) 
to another integer
+ *             type with wider range,
+ *         <li>altering from a floating-point type (float, double) to another 
floating-point type
+ *             with wider range,
+ *       </ul>
+ *       are supported. Other type changes will cause exceptions.
+ * </ul>
+ *
+ * <p>This action creates a Paimon table sink for each Paimon table to be 
written, so this action is
+ * not very efficient in resource saving. We may optimize this action by 
merging all sinks into one
+ * instance in the future.
+ */
+public class MySqlSyncDatabaseAction implements Action {
+
+    private final Configuration mySqlConfig;
+    private final String warehouse;
+    private final String database;
+    private final Map<String, String> paimonConfig;
+
+    MySqlSyncDatabaseAction(
+            Map<String, String> mySqlConfig,
+            String warehouse,
+            String database,
+            Map<String, String> paimonConfig) {
+        this.mySqlConfig = Configuration.fromMap(mySqlConfig);
+        this.warehouse = warehouse;
+        this.database = database;
+        this.paimonConfig = paimonConfig;
+    }
+
+    public void build(StreamExecutionEnvironment env) throws Exception {
+        Preconditions.checkArgument(
+                !mySqlConfig.contains(MySqlSourceOptions.TABLE_NAME),
+                MySqlSourceOptions.TABLE_NAME.key()
+                        + " cannot be set for mysql-sync-database. "
+                        + "If you want to sync several MySQL tables into one 
Paimon table, "
+                        + "use mysql-sync-table instead.");
+        List<MySqlSchema> mySqlSchemas = getMySqlSchemaList();
+        Preconditions.checkArgument(
+                mySqlSchemas.size() > 0,
+                "No tables found in MySQL database "
+                        + mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME)
+                        + ", or MySQL database does not exist.");
+
+        mySqlConfig.set(
+                MySqlSourceOptions.TABLE_NAME,
+                "("
+                        + mySqlSchemas.stream()
+                                .map(MySqlSchema::tableName)
+                                .collect(Collectors.joining("|"))
+                        + ")");
+        MySqlSource<String> source = 
MySqlActionUtils.buildMySqlSource(mySqlConfig);
+
+        Catalog catalog =
+                CatalogFactory.createCatalog(
+                        CatalogContext.create(
+                                new Options().set(CatalogOptions.WAREHOUSE, 
warehouse)));
+        catalog.createDatabase(database, true);
+
+        List<FileStoreTable> fileStoreTables = new ArrayList<>();
+        for (MySqlSchema mySqlSchema : mySqlSchemas) {
+            Identifier identifier = new Identifier(database, 
mySqlSchema.tableName());
+            FileStoreTable table;
+            try {
+                table = (FileStoreTable) catalog.getTable(identifier);
+                MySqlActionUtils.assertSchemaCompatible(table.schema(), 
mySqlSchema);
+            } catch (Catalog.TableNotExistException e) {
+                Schema schema =
+                        MySqlActionUtils.buildPaimonSchema(
+                                mySqlSchema,
+                                Collections.emptyList(),
+                                Collections.emptyList(),
+                                paimonConfig);
+                catalog.createTable(identifier, schema, false);
+                table = (FileStoreTable) catalog.getTable(identifier);
+            }
+            fileStoreTables.add(table);
+        }
+
+        EventParser.Factory<String> parserFactory;
+        String serverTimeZone = 
mySqlConfig.get(MySqlSourceOptions.SERVER_TIME_ZONE);
+        if (serverTimeZone != null) {
+            parserFactory = () -> new 
MySqlDebeziumJsonEventParser(ZoneId.of(serverTimeZone));
+        } else {
+            parserFactory = MySqlDebeziumJsonEventParser::new;
+        }
+
+        FlinkCdcSyncDatabaseSinkBuilder<String> sinkBuilder =
+                new FlinkCdcSyncDatabaseSinkBuilder<String>()
+                        .withInput(
+                                env.fromSource(
+                                        source, 
WatermarkStrategy.noWatermarks(), "MySQL Source"))
+                        .withParserFactory(parserFactory)
+                        .withTables(fileStoreTables);
+        String sinkParallelism = 
paimonConfig.get(FlinkConnectorOptions.SINK_PARALLELISM.key());
+        if (sinkParallelism != null) {
+            sinkBuilder.withParallelism(Integer.parseInt(sinkParallelism));
+        }
+        sinkBuilder.build();
+    }
+
+    private List<MySqlSchema> getMySqlSchemaList() throws Exception {
+        String databaseName = 
mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME);
+        List<MySqlSchema> mySqlSchemaList = new ArrayList<>();
+        try (Connection conn = MySqlActionUtils.getConnection(mySqlConfig)) {
+            DatabaseMetaData metaData = conn.getMetaData();
+            try (ResultSet tables =
+                    metaData.getTables(databaseName, null, "%", new String[] 
{"TABLE"})) {
+                while (tables.next()) {
+                    String tableName = tables.getString("TABLE_NAME");
+                    MySqlSchema mySqlSchema = new MySqlSchema(metaData, 
databaseName, tableName);
+                    if (mySqlSchema.primaryKeys().size() > 0) {
+                        // only tables with primary keys will be considered
+                        mySqlSchemaList.add(mySqlSchema);
+                    }
+                }
+            }
+        }
+        return mySqlSchemaList;
+    }
+
+    // ------------------------------------------------------------------------
+    //  Flink run methods
+    // ------------------------------------------------------------------------
+
+    public static Optional<Action> create(String[] args) {
+        MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
+
+        if (params.has("help")) {
+            printHelp();
+            return Optional.empty();
+        }
+
+        String warehouse = params.get("warehouse");
+        String database = params.get("database");
+
+        Map<String, String> mySqlConfig = getConfigMap(params, "mysql-conf");
+        Map<String, String> paimonConfig = getConfigMap(params, "paimon-conf");
+        if (mySqlConfig == null || paimonConfig == null) {
+            return Optional.empty();
+        }
+
+        return Optional.of(
+                new MySqlSyncDatabaseAction(mySqlConfig, warehouse, database, 
paimonConfig));
+    }
+
+    private static Map<String, String> getConfigMap(MultipleParameterTool 
params, String key) {
+        Map<String, String> map = new HashMap<>();
+
+        for (String param : params.getMultiParameter(key)) {
+            String[] kv = param.split("=");
+            if (kv.length == 2) {
+                map.put(kv[0], kv[1]);
+                continue;
+            }
+
+            System.err.println(
+                    "Invalid " + key + " " + param + ".\nRun 
mysql-sync-database --help for help.");
+            return null;
+        }
+        return map;
+    }
+
+    private static void printHelp() {
+        System.out.println(
+                "Action \"mysql-sync-database\" creates a streaming job "
+                        + "with a Flink MySQL CDC source and multiple Paimon 
table sinks "
+                        + "to synchronize a whole MySQL database into one 
Paimon database.\n"
+                        + "Only MySQL tables with primary keys will be 
considered. "
+                        + "Newly created MySQL tables after the job starts 
will not be included.");
+        System.out.println();
+
+        System.out.println("Syntax:");
+        System.out.println(
+                "  mysql-sync-database --warehouse <warehouse-path> --database 
<database-name> "
+                        + "[--mysql-conf <mysql-cdc-source-conf> [--mysql-conf 
<mysql-cdc-source-conf> ...]] "
+                        + "[--paimon-conf <paimon-table-sink-conf> 
[--paimon-conf <paimon-table-sink-conf> ...]]");
+        System.out.println();
+
+        System.out.println("MySQL CDC source conf syntax:");
+        System.out.println("  key=value");
+        System.out.println(
+                "'hostname', 'username', 'password' and 'database-name' "
+                        + "are required configurations, others are optional. "
+                        + "Note that 'database-name' should be the exact name "
+                        + "of the MySQL databse you want to synchronize. "
+                        + "It can't be a regular expression.");
+        System.out.println(
+                "For a complete list of supported configurations, "
+                        + "see 
https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#connector-options";);
+        System.out.println();
+
+        System.out.println("Paimon table sink conf syntax:");
+        System.out.println("  key=value");
+        System.out.println("All Paimon sink table will be applied the same set 
of configurations.");
+        System.out.println(
+                "For a complete list of supported configurations, "
+                        + "see 
https://paimon.apache.org/docs/master/maintenance/configurations/";);
+        System.out.println();
+
+        System.out.println("Examples:");
+        System.out.println(
+                "  mysql-sync-database \\\n"
+                        + "    --warehouse hdfs:///path/to/warehouse \\\n"
+                        + "    --database test_db \\\n"
+                        + "    --mysql-conf hostname=127.0.0.1 \\\n"
+                        + "    --mysql-conf username=root \\\n"
+                        + "    --mysql-conf password=123456 \\\n"
+                        + "    --mysql-conf database-name=source_db \\\n"
+                        + "    --paimon-conf bucket=4 \\\n"
+                        + "    --paimon-conf changelog-producer=input \\\n"
+                        + "    --paimon-conf sink.parallelism=4");
+    }
+
+    @Override
+    public void run() throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        build(env);
+        env.execute(String.format("MySQL-Paimon Database Sync: %s", database));
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
index 79e10fda4..6419a3048 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
@@ -26,44 +26,30 @@ import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.action.Action;
 import org.apache.paimon.flink.sink.cdc.EventParser;
 import org.apache.paimon.flink.sink.cdc.FlinkCdcSyncTableSinkBuilder;
-import org.apache.paimon.flink.sink.cdc.SchemaChangeProcessFunction;
 import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
-import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.types.DataType;
 
 import com.ververica.cdc.connectors.mysql.source.MySqlSource;
-import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
 import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions;
-import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
-import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffsetBuilder;
-import com.ververica.cdc.connectors.mysql.table.JdbcUrlUtils;
-import com.ververica.cdc.connectors.mysql.table.StartupOptions;
-import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
-import com.ververica.cdc.debezium.table.DebeziumOptions;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.utils.MultipleParameterTool;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.kafka.connect.json.JsonConverterConfig;
 
 import java.sql.Connection;
 import java.sql.DatabaseMetaData;
-import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Properties;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -126,7 +112,7 @@ public class MySqlSyncTableAction implements Action {
     }
 
     public void build(StreamExecutionEnvironment env) throws Exception {
-        MySqlSource<String> source = buildSource();
+        MySqlSource<String> source = 
MySqlActionUtils.buildMySqlSource(mySqlConfig);
         MySqlSchema mySqlSchema =
                 getMySqlSchemaList().stream()
                         .reduce(MySqlSchema::merge)
@@ -145,16 +131,11 @@ public class MySqlSyncTableAction implements Action {
         FileStoreTable table;
         try {
             table = (FileStoreTable) catalog.getTable(identifier);
-            if (!schemaCompatible(table.schema(), mySqlSchema)) {
-                throw new IllegalArgumentException(
-                        "Paimon schema and MySQL schema are not compatible.\n"
-                                + "Paimon fields are: "
-                                + table.schema().fields()
-                                + ".\nMySQL fields are: "
-                                + mySqlSchema.fields);
-            }
+            MySqlActionUtils.assertSchemaCompatible(table.schema(), 
mySqlSchema);
         } catch (Catalog.TableNotExistException e) {
-            Schema schema = buildSchema(mySqlSchema);
+            Schema schema =
+                    MySqlActionUtils.buildPaimonSchema(
+                            mySqlSchema, partitionKeys, primaryKeys, 
paimonConfig);
             catalog.createTable(identifier, schema, false);
             table = (FileStoreTable) catalog.getTable(identifier);
         }
@@ -181,106 +162,12 @@ public class MySqlSyncTableAction implements Action {
         sinkBuilder.build();
     }
 
-    private MySqlSource<String> buildSource() {
-        MySqlSourceBuilder<String> sourceBuilder = MySqlSource.builder();
-
-        String databaseName = 
mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME);
-        String tableName = mySqlConfig.get(MySqlSourceOptions.TABLE_NAME);
-        sourceBuilder
-                .hostname(mySqlConfig.get(MySqlSourceOptions.HOSTNAME))
-                .port(mySqlConfig.get(MySqlSourceOptions.PORT))
-                .username(mySqlConfig.get(MySqlSourceOptions.USERNAME))
-                .password(mySqlConfig.get(MySqlSourceOptions.PASSWORD))
-                .databaseList(databaseName)
-                .tableList(databaseName + "." + tableName);
-
-        
mySqlConfig.getOptional(MySqlSourceOptions.SERVER_ID).ifPresent(sourceBuilder::serverId);
-        mySqlConfig
-                .getOptional(MySqlSourceOptions.SERVER_TIME_ZONE)
-                .ifPresent(sourceBuilder::serverTimeZone);
-        mySqlConfig
-                .getOptional(MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE)
-                .ifPresent(sourceBuilder::fetchSize);
-        mySqlConfig
-                .getOptional(MySqlSourceOptions.CONNECT_TIMEOUT)
-                .ifPresent(sourceBuilder::connectTimeout);
-        mySqlConfig
-                .getOptional(MySqlSourceOptions.CONNECT_MAX_RETRIES)
-                .ifPresent(sourceBuilder::connectMaxRetries);
-        mySqlConfig
-                .getOptional(MySqlSourceOptions.CONNECTION_POOL_SIZE)
-                .ifPresent(sourceBuilder::connectionPoolSize);
-        mySqlConfig
-                .getOptional(MySqlSourceOptions.HEARTBEAT_INTERVAL)
-                .ifPresent(sourceBuilder::heartbeatInterval);
-
-        String startupMode = 
mySqlConfig.get(MySqlSourceOptions.SCAN_STARTUP_MODE);
-        // see
-        // 
https://github.com/ververica/flink-cdc-connectors/blob/master/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactory.java#L196
-        if ("initial".equalsIgnoreCase(startupMode)) {
-            sourceBuilder.startupOptions(StartupOptions.initial());
-        } else if ("earliest-offset".equalsIgnoreCase(startupMode)) {
-            sourceBuilder.startupOptions(StartupOptions.earliest());
-        } else if ("latest-offset".equalsIgnoreCase(startupMode)) {
-            sourceBuilder.startupOptions(StartupOptions.latest());
-        } else if ("specific-offset".equalsIgnoreCase(startupMode)) {
-            BinlogOffsetBuilder offsetBuilder = BinlogOffset.builder();
-            String file = 
mySqlConfig.get(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_FILE);
-            Long pos = 
mySqlConfig.get(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_POS);
-            if (file != null && pos != null) {
-                offsetBuilder.setBinlogFilePosition(file, pos);
-            }
-            mySqlConfig
-                    
.getOptional(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET)
-                    .ifPresent(offsetBuilder::setGtidSet);
-            mySqlConfig
-                    
.getOptional(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_EVENTS)
-                    .ifPresent(offsetBuilder::setSkipEvents);
-            mySqlConfig
-                    
.getOptional(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_ROWS)
-                    .ifPresent(offsetBuilder::setSkipRows);
-            
sourceBuilder.startupOptions(StartupOptions.specificOffset(offsetBuilder.build()));
-        } else if ("timestamp".equalsIgnoreCase(startupMode)) {
-            sourceBuilder.startupOptions(
-                    StartupOptions.timestamp(
-                            
mySqlConfig.get(MySqlSourceOptions.SCAN_STARTUP_TIMESTAMP_MILLIS)));
-        }
-
-        Properties jdbcProperties = new Properties();
-        Properties debeziumProperties = new Properties();
-        for (Map.Entry<String, String> entry : mySqlConfig.toMap().entrySet()) 
{
-            String key = entry.getKey();
-            String value = entry.getValue();
-            if (key.startsWith(JdbcUrlUtils.PROPERTIES_PREFIX)) {
-                
jdbcProperties.put(key.substring(JdbcUrlUtils.PROPERTIES_PREFIX.length()), 
value);
-            } else if 
(key.startsWith(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX)) {
-                debeziumProperties.put(
-                        
key.substring(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX.length()), value);
-            }
-        }
-        sourceBuilder.jdbcProperties(jdbcProperties);
-        sourceBuilder.debeziumProperties(debeziumProperties);
-
-        Map<String, Object> customConverterConfigs = new HashMap<>();
-        customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, 
"numeric");
-        JsonDebeziumDeserializationSchema schema =
-                new JsonDebeziumDeserializationSchema(true, 
customConverterConfigs);
-        return 
sourceBuilder.deserializer(schema).includeSchemaChanges(true).build();
-    }
-
     private List<MySqlSchema> getMySqlSchemaList() throws Exception {
         Pattern databasePattern =
                 
Pattern.compile(mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME));
         Pattern tablePattern = 
Pattern.compile(mySqlConfig.get(MySqlSourceOptions.TABLE_NAME));
         List<MySqlSchema> mySqlSchemaList = new ArrayList<>();
-        try (Connection conn =
-                DriverManager.getConnection(
-                        String.format(
-                                "jdbc:mysql://%s:%d/",
-                                mySqlConfig.get(MySqlSourceOptions.HOSTNAME),
-                                mySqlConfig.get(MySqlSourceOptions.PORT)),
-                        mySqlConfig.get(MySqlSourceOptions.USERNAME),
-                        mySqlConfig.get(MySqlSourceOptions.PASSWORD))) {
+        try (Connection conn = MySqlActionUtils.getConnection(mySqlConfig)) {
             DatabaseMetaData metaData = conn.getMetaData();
             try (ResultSet schemas = metaData.getCatalogs()) {
                 while (schemas.next()) {
@@ -304,122 +191,6 @@ public class MySqlSyncTableAction implements Action {
         return mySqlSchemaList;
     }
 
-    private boolean schemaCompatible(TableSchema tableSchema, MySqlSchema 
mySqlSchema) {
-        for (Map.Entry<String, DataType> entry : 
mySqlSchema.fields.entrySet()) {
-            int idx = tableSchema.fieldNames().indexOf(entry.getKey());
-            if (idx < 0) {
-                return false;
-            }
-            DataType type = tableSchema.fields().get(idx).type();
-            if (!SchemaChangeProcessFunction.canConvert(entry.getValue(), 
type)) {
-                return false;
-            }
-        }
-        return true;
-    }
-
-    private Schema buildSchema(MySqlSchema mySqlSchema) {
-        Schema.Builder builder = Schema.newBuilder();
-        builder.options(paimonConfig);
-
-        for (Map.Entry<String, DataType> entry : 
mySqlSchema.fields.entrySet()) {
-            builder.column(entry.getKey(), entry.getValue());
-        }
-
-        if (primaryKeys.size() > 0) {
-            for (String key : primaryKeys) {
-                if (!mySqlSchema.fields.containsKey(key)) {
-                    throw new IllegalArgumentException(
-                            "Specified primary key " + key + " does not exist 
in MySQL tables");
-                }
-            }
-            builder.primaryKey(primaryKeys);
-        } else if (mySqlSchema.primaryKeys.size() > 0) {
-            builder.primaryKey(mySqlSchema.primaryKeys);
-        } else {
-            throw new IllegalArgumentException(
-                    "Primary keys are not specified. "
-                            + "Also, can't infer primary keys from MySQL table 
schemas because "
-                            + "MySQL tables have no primary keys or have 
different primary keys.");
-        }
-
-        if (partitionKeys.size() > 0) {
-            builder.partitionKeys(partitionKeys);
-        }
-
-        return builder.build();
-    }
-
-    private static class MySqlSchema {
-
-        private final String databaseName;
-        private final String tableName;
-
-        private final Map<String, DataType> fields;
-        private final List<String> primaryKeys;
-
-        private MySqlSchema(DatabaseMetaData metaData, String databaseName, 
String tableName)
-                throws Exception {
-            this.databaseName = databaseName;
-            this.tableName = tableName;
-
-            fields = new LinkedHashMap<>();
-            try (ResultSet rs = metaData.getColumns(null, databaseName, 
tableName, null)) {
-                while (rs.next()) {
-                    String fieldName = rs.getString("COLUMN_NAME");
-                    String fieldType = rs.getString("TYPE_NAME");
-                    Integer precision = rs.getInt("COLUMN_SIZE");
-                    if (rs.wasNull()) {
-                        precision = null;
-                    }
-                    Integer scale = rs.getInt("DECIMAL_DIGITS");
-                    if (rs.wasNull()) {
-                        scale = null;
-                    }
-                    fields.put(fieldName, MySqlTypeUtils.toDataType(fieldType, 
precision, scale));
-                }
-            }
-
-            primaryKeys = new ArrayList<>();
-            try (ResultSet rs = metaData.getPrimaryKeys(null, databaseName, 
tableName)) {
-                while (rs.next()) {
-                    String fieldName = rs.getString("COLUMN_NAME");
-                    primaryKeys.add(fieldName);
-                }
-            }
-        }
-
-        private MySqlSchema merge(MySqlSchema other) {
-            for (Map.Entry<String, DataType> entry : other.fields.entrySet()) {
-                String fieldName = entry.getKey();
-                DataType newType = entry.getValue();
-                if (fields.containsKey(fieldName)) {
-                    DataType oldType = fields.get(fieldName);
-                    if (SchemaChangeProcessFunction.canConvert(oldType, 
newType)) {
-                        fields.put(fieldName, newType);
-                    } else if (SchemaChangeProcessFunction.canConvert(newType, 
oldType)) {
-                        // nothing to do
-                    } else {
-                        throw new IllegalArgumentException(
-                                String.format(
-                                        "Column %s have different types in 
table %s.%s and table %s.%s",
-                                        fieldName,
-                                        databaseName,
-                                        tableName,
-                                        other.databaseName,
-                                        other.tableName));
-                    }
-                } else {
-                    fields.put(fieldName, newType);
-                }
-            }
-            if (!primaryKeys.equals(other.primaryKeys)) {
-                primaryKeys.clear();
-            }
-            return this;
-        }
-    }
-
     // ------------------------------------------------------------------------
     //  Flink run methods
     // ------------------------------------------------------------------------
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java
new file mode 100644
index 000000000..cff787528
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mysql;
+
+import org.apache.paimon.flink.action.ActionITCaseBase;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Base test class for {@link org.apache.paimon.flink.action.Action}s related 
to MySQL. */
+public class MySqlActionITCaseBase extends ActionITCaseBase {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(MySqlActionITCaseBase.class);
+
+    protected static final MySqlContainer MYSQL_CONTAINER = 
createMySqlContainer(MySqlVersion.V5_7);
+    private static final String USER = "paimonuser";
+    private static final String PASSWORD = "paimonpw";
+
+    @BeforeAll
+    public static void startContainers() {
+        LOG.info("Starting containers...");
+        Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
+        LOG.info("Containers are started.");
+    }
+
+    @AfterAll
+    public static void stopContainers() {
+        LOG.info("Stopping containers...");
+        MYSQL_CONTAINER.stop();
+        LOG.info("Containers are stopped.");
+    }
+
+    private static MySqlContainer createMySqlContainer(MySqlVersion version) {
+        return (MySqlContainer)
+                new MySqlContainer(version)
+                        .withConfigurationOverride("mysql/my.cnf")
+                        .withSetupSQL("mysql/setup.sql")
+                        .withUsername(USER)
+                        .withPassword(PASSWORD)
+                        .withLogConsumer(new Slf4jLogConsumer(LOG));
+    }
+
+    protected void waitForResult(
+            List<String> expected, FileStoreTable table, RowType rowType, 
List<String> primaryKeys)
+            throws Exception {
+        assertThat(table.schema().primaryKeys()).isEqualTo(primaryKeys);
+
+        // wait for table schema to become our expected schema
+        while (true) {
+            int cnt = 0;
+            for (int i = 0; i < table.schema().fields().size(); i++) {
+                DataField field = table.schema().fields().get(i);
+                boolean sameName = 
field.name().equals(rowType.getFieldNames().get(i));
+                boolean sameType = 
field.type().equals(rowType.getFieldTypes().get(i));
+                if (sameName && sameType) {
+                    cnt++;
+                }
+            }
+            if (cnt == rowType.getFieldCount()) {
+                break;
+            }
+            table = table.copyWithLatestSchema();
+            Thread.sleep(1000);
+        }
+
+        // wait for data to become expected
+        List<String> sortedExpected = new ArrayList<>(expected);
+        Collections.sort(sortedExpected);
+        while (true) {
+            TableScan.Plan plan = table.newScan().plan();
+            List<String> result =
+                    getResult(
+                            table.newRead(),
+                            plan == null ? Collections.emptyList() : 
plan.splits(),
+                            rowType);
+            List<String> sortedActual = new ArrayList<>(result);
+            Collections.sort(sortedActual);
+            if (sortedExpected.equals(sortedActual)) {
+                break;
+            }
+            Thread.sleep(1000);
+        }
+    }
+
+    protected Map<String, String> getBasicMySqlConfig() {
+        Map<String, String> config = new HashMap<>();
+        config.put("hostname", MYSQL_CONTAINER.getHost());
+        config.put("port", String.valueOf(MYSQL_CONTAINER.getDatabasePort()));
+        config.put("username", USER);
+        config.put("password", PASSWORD);
+        config.put("server-time-zone", ZoneId.of("+00:00").toString());
+        return config;
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
new file mode 100644
index 000000000..78dbbfb8e
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mysql;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/** IT cases for {@link MySqlSyncDatabaseAction}. */
+public class MySqlSyncDatabaseActionITCase extends MySqlActionITCaseBase {
+
+    private static final String DATABASE_NAME = "paimon_sync_database";
+
+    @Test
+    @Timeout(60)
+    public void testSchemaEvolution() throws Exception {
+        Map<String, String> mySqlConfig = getBasicMySqlConfig();
+        mySqlConfig.put("database-name", DATABASE_NAME);
+
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(2);
+        env.enableCheckpointing(1000);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        Map<String, String> paimonConfig = new HashMap<>();
+        paimonConfig.put("bucket", String.valueOf(random.nextInt(3) + 1));
+        paimonConfig.put("sink.parallelism", String.valueOf(random.nextInt(3) 
+ 1));
+        MySqlSyncDatabaseAction action =
+                new MySqlSyncDatabaseAction(mySqlConfig, warehouse, database, 
paimonConfig);
+        action.build(env);
+        JobClient client = env.executeAsync();
+
+        while (true) {
+            JobStatus status = client.getJobStatus().get();
+            if (status == JobStatus.RUNNING) {
+                break;
+            }
+            Thread.sleep(1000);
+        }
+
+        try (Connection conn =
+                DriverManager.getConnection(
+                        MYSQL_CONTAINER.getJdbcUrl(DATABASE_NAME),
+                        MYSQL_CONTAINER.getUsername(),
+                        MYSQL_CONTAINER.getPassword())) {
+            try (Statement statement = conn.createStatement()) {
+                testSchemaEvolutionImpl(statement);
+            }
+        }
+    }
+
+    private void testSchemaEvolutionImpl(Statement statement) throws Exception 
{
+        FileStoreTable table1 = getFileStoreTable("t1");
+        FileStoreTable table2 = getFileStoreTable("t2");
+
+        statement.executeUpdate("USE paimon_sync_database");
+
+        statement.executeUpdate("INSERT INTO t1 VALUES (1, 'one')");
+        statement.executeUpdate("INSERT INTO t2 VALUES (2, 'two', 20, 200)");
+        statement.executeUpdate("INSERT INTO t1 VALUES (3, 'three')");
+        statement.executeUpdate("INSERT INTO t2 VALUES (4, 'four', 40, 400)");
+        statement.executeUpdate("INSERT INTO t3 VALUES (-1)");
+
+        RowType rowType1 =
+                RowType.of(
+                        new DataType[] {DataTypes.INT().notNull(), 
DataTypes.VARCHAR(10)},
+                        new String[] {"k", "v1"});
+        List<String> primaryKeys1 = Collections.singletonList("k");
+        List<String> expected = Arrays.asList("+I[1, one]", "+I[3, three]");
+        waitForResult(expected, table1, rowType1, primaryKeys1);
+
+        RowType rowType2 =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT().notNull(),
+                            DataTypes.VARCHAR(10).notNull(),
+                            DataTypes.INT(),
+                            DataTypes.BIGINT()
+                        },
+                        new String[] {"k1", "k2", "v1", "v2"});
+        List<String> primaryKeys2 = Arrays.asList("k1", "k2");
+        expected = Arrays.asList("+I[2, two, 20, 200]", "+I[4, four, 40, 
400]");
+        waitForResult(expected, table2, rowType2, primaryKeys2);
+
+        statement.executeUpdate("ALTER TABLE t1 ADD COLUMN v2 INT");
+        statement.executeUpdate("INSERT INTO t1 VALUES (5, 'five', 50)");
+        statement.executeUpdate("ALTER TABLE t2 ADD COLUMN v3 VARCHAR(10)");
+        statement.executeUpdate("INSERT INTO t2 VALUES (6, 'six', 60, 600, 
'string_6')");
+        statement.executeUpdate("INSERT INTO t1 VALUES (7, 'seven', 70)");
+        statement.executeUpdate("INSERT INTO t2 VALUES (8, 'eight', 80, 800, 
'string_8')");
+
+        rowType1 =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT().notNull(), DataTypes.VARCHAR(10), 
DataTypes.INT()
+                        },
+                        new String[] {"k", "v1", "v2"});
+        expected =
+                Arrays.asList(
+                        "+I[1, one, NULL]",
+                        "+I[3, three, NULL]",
+                        "+I[5, five, 50]",
+                        "+I[7, seven, 70]");
+        waitForResult(expected, table1, rowType1, primaryKeys1);
+
+        rowType2 =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT().notNull(),
+                            DataTypes.VARCHAR(10).notNull(),
+                            DataTypes.INT(),
+                            DataTypes.BIGINT(),
+                            DataTypes.VARCHAR(10)
+                        },
+                        new String[] {"k1", "k2", "v1", "v2", "v3"});
+        expected =
+                Arrays.asList(
+                        "+I[2, two, 20, 200, NULL]",
+                        "+I[4, four, 40, 400, NULL]",
+                        "+I[6, six, 60, 600, string_6]",
+                        "+I[8, eight, 80, 800, string_8]");
+        waitForResult(expected, table2, rowType2, primaryKeys2);
+
+        statement.executeUpdate("ALTER TABLE t1 MODIFY COLUMN v2 BIGINT");
+        statement.executeUpdate("INSERT INTO t1 VALUES (9, 'nine', 
9000000000000)");
+        statement.executeUpdate("ALTER TABLE t2 MODIFY COLUMN v3 VARCHAR(20)");
+        statement.executeUpdate(
+                "INSERT INTO t2 VALUES (10, 'ten', 100, 1000, 
'long_long_string_10')");
+
+        rowType1 =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT().notNull(), DataTypes.VARCHAR(10), 
DataTypes.BIGINT()
+                        },
+                        new String[] {"k", "v1", "v2"});
+        expected =
+                Arrays.asList(
+                        "+I[1, one, NULL]",
+                        "+I[3, three, NULL]",
+                        "+I[5, five, 50]",
+                        "+I[7, seven, 70]",
+                        "+I[9, nine, 9000000000000]");
+        waitForResult(expected, table1, rowType1, primaryKeys1);
+
+        rowType2 =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT().notNull(),
+                            DataTypes.VARCHAR(10).notNull(),
+                            DataTypes.INT(),
+                            DataTypes.BIGINT(),
+                            DataTypes.VARCHAR(20)
+                        },
+                        new String[] {"k1", "k2", "v1", "v2", "v3"});
+        expected =
+                Arrays.asList(
+                        "+I[2, two, 20, 200, NULL]",
+                        "+I[4, four, 40, 400, NULL]",
+                        "+I[6, six, 60, 600, string_6]",
+                        "+I[8, eight, 80, 800, string_8]",
+                        "+I[10, ten, 100, 1000, long_long_string_10]");
+        waitForResult(expected, table2, rowType2, primaryKeys2);
+    }
+
+    @Test
+    public void testSpecifiedMySqlTable() {
+        Map<String, String> mySqlConfig = getBasicMySqlConfig();
+        mySqlConfig.put("database-name", DATABASE_NAME);
+        mySqlConfig.put("table-name", "my_table");
+
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        MySqlSyncDatabaseAction action =
+                new MySqlSyncDatabaseAction(mySqlConfig, warehouse, database, 
new HashMap<>());
+
+        IllegalArgumentException e =
+                assertThrows(
+                        IllegalArgumentException.class,
+                        () -> action.build(env),
+                        "Expecting IllegalArgumentException");
+        assertThat(e)
+                .hasMessage(
+                        "table-name cannot be set for mysql-sync-database. "
+                                + "If you want to sync several MySQL tables 
into one Paimon table, "
+                                + "use mysql-sync-table instead.");
+    }
+
+    @Test
+    public void testInvalidDatabase() {
+        Map<String, String> mySqlConfig = getBasicMySqlConfig();
+        mySqlConfig.put("database-name", "invalid");
+
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        MySqlSyncDatabaseAction action =
+                new MySqlSyncDatabaseAction(mySqlConfig, warehouse, database, 
new HashMap<>());
+
+        IllegalArgumentException e =
+                assertThrows(
+                        IllegalArgumentException.class,
+                        () -> action.build(env),
+                        "Expecting IllegalArgumentException");
+        assertThat(e)
+                .hasMessage(
+                        "No tables found in MySQL database invalid, or MySQL 
database does not exist.");
+    }
+
+    private FileStoreTable getFileStoreTable(String tableName) throws 
Exception {
+        Catalog catalog = 
CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse)));
+        Identifier identifier = Identifier.create(database, tableName);
+        return (FileStoreTable) catalog.getTable(identifier);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index 9769a1672..9b874bab9 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -22,11 +22,8 @@ import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogFactory;
 import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.flink.action.ActionITCaseBase;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.source.TableScan;
-import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
@@ -35,65 +32,26 @@ import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.lifecycle.Startables;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.Statement;
-import java.time.ZoneId;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
-import java.util.stream.Stream;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 /** IT cases for {@link MySqlSyncTableAction}. */
-public class MySqlSyncTableActionITCase extends ActionITCaseBase {
+public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(MySqlSyncTableActionITCase.class);
-
-    private static final MySqlContainer MYSQL_CONTAINER = 
createMySqlContainer(MySqlVersion.V5_7);
-    private static final String USER = "paimonuser";
-    private static final String PASSWORD = "paimonpw";
-    private static final String DATABASE_NAME = "paimon_test";
-
-    @BeforeAll
-    public static void startContainers() {
-        LOG.info("Starting containers...");
-        Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
-        LOG.info("Containers are started.");
-    }
-
-    @AfterAll
-    public static void stopContainers() {
-        LOG.info("Stopping containers...");
-        MYSQL_CONTAINER.stop();
-        LOG.info("Containers are stopped.");
-    }
-
-    private static MySqlContainer createMySqlContainer(MySqlVersion version) {
-        return (MySqlContainer)
-                new MySqlContainer(version)
-                        .withConfigurationOverride("mysql/my.cnf")
-                        .withSetupSQL("mysql/setup.sql")
-                        .withUsername(USER)
-                        .withPassword(PASSWORD)
-                        .withDatabaseName(DATABASE_NAME)
-                        .withLogConsumer(new Slf4jLogConsumer(LOG));
-    }
+    private static final String DATABASE_NAME = "paimon_sync_table";
 
     @Test
     @Timeout(60)
@@ -133,9 +91,7 @@ public class MySqlSyncTableActionITCase extends 
ActionITCaseBase {
 
         try (Connection conn =
                 DriverManager.getConnection(
-                        String.format(
-                                "jdbc:mysql://%s:%s/",
-                                MYSQL_CONTAINER.getHost(), 
MYSQL_CONTAINER.getDatabasePort()),
+                        MYSQL_CONTAINER.getJdbcUrl(DATABASE_NAME),
                         MYSQL_CONTAINER.getUsername(),
                         MYSQL_CONTAINER.getPassword())) {
             try (Statement statement = conn.createStatement()) {
@@ -146,7 +102,7 @@ public class MySqlSyncTableActionITCase extends 
ActionITCaseBase {
 
     private void testSchemaEvolutionImpl(Statement statement) throws Exception 
{
         FileStoreTable table = getFileStoreTable();
-        statement.executeUpdate("USE paimon_test");
+        statement.executeUpdate("USE paimon_sync_table");
 
         statement.executeUpdate("INSERT INTO schema_evolution_1 VALUES (1, 1, 
'one')");
         statement.executeUpdate(
@@ -573,58 +529,6 @@ public class MySqlSyncTableActionITCase extends 
ActionITCaseBase {
                                 + "MySQL tables have no primary keys or have 
different primary keys.");
     }
 
-    private void waitForResult(
-            List<String> expected, FileStoreTable table, RowType rowType, 
List<String> primaryKeys)
-            throws Exception {
-        assertThat(table.schema().primaryKeys()).isEqualTo(primaryKeys);
-
-        // wait for table schema to become our expected schema
-        while (true) {
-            int cnt = 0;
-            for (int i = 0; i < table.schema().fields().size(); i++) {
-                DataField field = table.schema().fields().get(i);
-                boolean sameName = 
field.name().equals(rowType.getFieldNames().get(i));
-                boolean sameType = 
field.type().equals(rowType.getFieldTypes().get(i));
-                if (sameName && sameType) {
-                    cnt++;
-                }
-            }
-            if (cnt == rowType.getFieldCount()) {
-                break;
-            }
-            table = table.copyWithLatestSchema();
-            Thread.sleep(1000);
-        }
-
-        // wait for data to become expected
-        List<String> sortedExpected = new ArrayList<>(expected);
-        Collections.sort(sortedExpected);
-        while (true) {
-            TableScan.Plan plan = table.newScan().plan();
-            List<String> result =
-                    getResult(
-                            table.newRead(),
-                            plan == null ? Collections.emptyList() : 
plan.splits(),
-                            rowType);
-            List<String> sortedActual = new ArrayList<>(result);
-            Collections.sort(sortedActual);
-            if (sortedExpected.equals(sortedActual)) {
-                break;
-            }
-            Thread.sleep(1000);
-        }
-    }
-
-    private Map<String, String> getBasicMySqlConfig() {
-        Map<String, String> config = new HashMap<>();
-        config.put("hostname", MYSQL_CONTAINER.getHost());
-        config.put("port", String.valueOf(MYSQL_CONTAINER.getDatabasePort()));
-        config.put("username", USER);
-        config.put("password", PASSWORD);
-        config.put("server-time-zone", ZoneId.of("+00:00").toString());
-        return config;
-    }
-
     private FileStoreTable getFileStoreTable() throws Exception {
         Catalog catalog = 
CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse)));
         Identifier identifier = Identifier.create(database, tableName);
diff --git 
a/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql 
b/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
index 3cc6c9e51..d8d15f0b3 100644
--- a/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
+++ b/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
@@ -16,16 +16,16 @@
 
 -- In production you would almost certainly limit the replication user must be 
on the follower (slave) machine,
 -- to prevent other clients accessing the log from other machines. For 
example, 'replicator'@'follower.acme.com'.
--- However, in this database we'll grant 2 users different privileges:
+-- However, in this database we'll grant the test user 'paimonuser' all 
privileges:
 --
--- 1) 'paimonuser' - all privileges required by the snapshot reader AND binlog 
reader (used for testing)
--- 2) 'mysqluser' - all privileges
---
-GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, 
LOCK TABLES  ON *.* TO 'paimonuser'@'%';
-CREATE USER 'mysqluser' IDENTIFIED BY 'mysqlpw';
-GRANT ALL PRIVILEGES ON *.* TO 'mysqluser'@'%';
+GRANT ALL PRIVILEGES ON *.* TO 'paimonuser'@'%';
+
+-- 
################################################################################
+--  MySqlSyncTableActionITCase
+-- 
################################################################################
 
-USE paimon_test;
+CREATE DATABASE paimon_sync_table;
+USE paimon_sync_table;
 
 CREATE TABLE schema_evolution_1 (
     pt INT,
@@ -154,3 +154,29 @@ CREATE TABLE incompatible_pk_2 (
     c VARCHAR(20),
     PRIMARY KEY (a)
 );
+
+-- 
################################################################################
+--  MySqlSyncDatabaseActionITCase
+-- 
################################################################################
+
+CREATE DATABASE paimon_sync_database;
+USE paimon_sync_database;
+
+CREATE TABLE t1 (
+    k INT,
+    v1 VARCHAR(10),
+    PRIMARY KEY (k)
+);
+
+CREATE TABLE t2 (
+    k1 INT,
+    k2 VARCHAR(10),
+    v1 INT,
+    v2 BIGINT,
+    PRIMARY KEY (k1, k2)
+);
+
+-- no primary key, should be ignored
+CREATE TABLE t3 (
+    v1 INT
+);


Reply via email to