This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 4bf77f183 [flink][mysql-cdc] Support Synchronizing multiple shards into one database (#1674)
4bf77f183 is described below

commit 4bf77f18377d4783868723d897eb4ffce44bb1d1
Author: yuzelin <[email protected]>
AuthorDate: Mon Jul 31 16:57:26 2023 +0800

    [flink][mysql-cdc] Support Synchronizing multiple shards into one database (#1674)
---
 docs/content/how-to/cdc-ingestion.md               |  31 +++
 .../flink/action/cdc/mysql/MySqlActionUtils.java   |  61 ++++--
 .../paimon/flink/action/cdc/mysql/MySqlSchema.java |  20 +-
 .../action/cdc/mysql/MySqlSyncDatabaseAction.java  | 158 +++++++++------
 .../action/cdc/mysql/MySqlSyncTableAction.java     |  58 ++----
 .../action/cdc/mysql/MySqlActionITCaseBase.java    |   6 +
 .../cdc/mysql/MySqlSyncDatabaseActionITCase.java   | 146 +++++++++++++-
 .../mysql/MySqlSyncDatabaseTableListITCase.java    | 215 +++++++++++++++++++++
 .../cdc/mysql/MySqlSyncTableActionITCase.java      |   6 +-
 .../test/resources/mysql/sync_database_setup.sql   |  45 +++++
 .../test/resources/mysql/tablelist_test_setup.sql  |  94 +++++++++
 11 files changed, 703 insertions(+), 137 deletions(-)

diff --git a/docs/content/how-to/cdc-ingestion.md b/docs/content/how-to/cdc-ingestion.md
index 57d3e06a4..b16e0e7b6 100644
--- a/docs/content/how-to/cdc-ingestion.md
+++ b/docs/content/how-to/cdc-ingestion.md
@@ -196,6 +196,37 @@ The command to recover from previous snapshot and add new tables to synchronize
     --including-tables 'product|user|address|order|custom'
 ```
 
+{{< hint info >}}
+You can set `--mode combined` to enable synchronizing newly added tables without restarting the job.
+{{< /hint >}}
+
+Example 3: synchronize and merge multiple shards
+
+Let's say you have multiple database shards `db1`, `db2`, ..., and each database has tables `tbl1`, `tbl2`, .... You can
+synchronize all the `db.+.tbl.+` tables into `test_db.tbl1`, `test_db.tbl2`, ... with the following command:
+
+```bash
+<FLINK_HOME>/bin/flink run \
+    /path/to/paimon-flink-action-{{< version >}}.jar \
+    mysql-sync-database \
+    --warehouse hdfs:///path/to/warehouse \
+    --database test_db \
+    --mysql-conf hostname=127.0.0.1 \
+    --mysql-conf username=root \
+    --mysql-conf password=123456 \
+    --mysql-conf database-name='db.+' \
+    --catalog-conf metastore=hive \
+    --catalog-conf uri=thrift://hive-metastore:9083 \
+    --table-conf bucket=4 \
+    --table-conf changelog-producer=input \
+    --table-conf sink.parallelism=4 \
+    --including-tables 'tbl.+'
+```
+
+By setting `database-name` to a regular expression, the synchronization job will capture all tables under the matched databases
+and merge tables with the same name into one table.
+
+
 ## Kafka
 
 ### Prepare Kafka Bundled Jar
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
index 756b94279..b2a89f03d 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink.action.cdc.mysql;
 
+import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.action.cdc.ComputedColumn;
 import org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction;
 import org.apache.paimon.schema.Schema;
@@ -44,7 +45,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.sql.Connection;
+import java.sql.DatabaseMetaData;
 import java.sql.DriverManager;
+import java.sql.ResultSet;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -52,6 +55,9 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.function.Predicate;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -98,6 +104,44 @@ public class MySqlActionUtils {
                 mySqlConfig.get(MySqlSourceOptions.PASSWORD));
     }
 
+    static List<MySqlSchema> getMySqlSchemaList(
+            Configuration mySqlConfig,
+            Predicate<MySqlSchema> monitorTablePredication,
+            List<Identifier> excludedTables)
+            throws Exception {
+        Pattern databasePattern =
+                
Pattern.compile(mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME));
+        List<MySqlSchema> mySqlSchemaList = new ArrayList<>();
+        try (Connection conn = MySqlActionUtils.getConnection(mySqlConfig)) {
+            DatabaseMetaData metaData = conn.getMetaData();
+            try (ResultSet schemas = metaData.getCatalogs()) {
+                while (schemas.next()) {
+                    String databaseName = schemas.getString("TABLE_CAT");
+                    Matcher databaseMatcher = 
databasePattern.matcher(databaseName);
+                    if (databaseMatcher.matches()) {
+                        try (ResultSet tables = 
metaData.getTables(databaseName, null, "%", null)) {
+                            while (tables.next()) {
+                                String tableName = 
tables.getString("TABLE_NAME");
+                                MySqlSchema mySqlSchema =
+                                        new MySqlSchema(
+                                                metaData,
+                                                databaseName,
+                                                tableName,
+                                                
mySqlConfig.get(MYSQL_CONVERTER_TINYINT1_BOOL));
+                                if (monitorTablePredication.test(mySqlSchema)) 
{
+                                    mySqlSchemaList.add(mySqlSchema);
+                                } else {
+                                    
excludedTables.add(mySqlSchema.identifier());
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        }
+        return mySqlSchemaList;
+    }
+
     static void assertSchemaCompatible(TableSchema paimonSchema, Schema 
mySqlSchema) {
         if (!schemaCompatible(paimonSchema, mySqlSchema)) {
             throw new IllegalArgumentException(
@@ -154,8 +198,8 @@ public class MySqlActionUtils {
                 checkArgument(
                         !mySqlFields.containsKey(fieldName.toLowerCase()),
                         String.format(
-                                "Duplicate key '%s' in table '%s.%s' appears 
when converting fields map keys to case-insensitive form.",
-                                fieldName, mySqlSchema.databaseName(), 
mySqlSchema.tableName()));
+                                "Duplicate key '%s' in table '%s' appears when 
converting fields map keys to case-insensitive form.",
+                                fieldName, mySqlSchema.identifier()));
                 mySqlFields.put(fieldName.toLowerCase(), entry.getValue());
             }
             mySqlPrimaryKeys =
@@ -199,19 +243,17 @@ public class MySqlActionUtils {
         return builder.build();
     }
 
-    static MySqlSource<String> buildMySqlSource(Configuration mySqlConfig) {
+    static MySqlSource<String> buildMySqlSource(Configuration mySqlConfig, 
String tableList) {
         validateMySqlConfig(mySqlConfig);
         MySqlSourceBuilder<String> sourceBuilder = MySqlSource.builder();
 
-        String databaseName = 
mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME);
-        String tableName = mySqlConfig.get(MySqlSourceOptions.TABLE_NAME);
         sourceBuilder
                 .hostname(mySqlConfig.get(MySqlSourceOptions.HOSTNAME))
                 .port(mySqlConfig.get(MySqlSourceOptions.PORT))
                 .username(mySqlConfig.get(MySqlSourceOptions.USERNAME))
                 .password(mySqlConfig.get(MySqlSourceOptions.PASSWORD))
-                .databaseList(databaseName)
-                .tableList(databaseName + "." + tableName);
+                
.databaseList(mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME))
+                .tableList(tableList);
 
         
mySqlConfig.getOptional(MySqlSourceOptions.SERVER_ID).ifPresent(sourceBuilder::serverId);
         mySqlConfig
@@ -317,11 +359,6 @@ public class MySqlActionUtils {
                 String.format(
                         "mysql-conf [%s] must be specified.",
                         MySqlSourceOptions.DATABASE_NAME.key()));
-
-        checkArgument(
-                mySqlConfig.get(MySqlSourceOptions.TABLE_NAME) != null,
-                String.format(
-                        "mysql-conf [%s] must be specified.", 
MySqlSourceOptions.TABLE_NAME.key()));
     }
 
     static List<ComputedColumn> buildComputedColumns(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java
index a60733e10..c88637537 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink.action.cdc.mysql;
 
+import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction;
 import org.apache.paimon.types.DataType;
 
@@ -40,17 +41,6 @@ public class MySqlSchema {
     private final LinkedHashMap<String, Tuple2<DataType, String>> fields;
     private final List<String> primaryKeys;
 
-    public MySqlSchema(
-            String databaseName,
-            String tableName,
-            LinkedHashMap<String, Tuple2<DataType, String>> fields,
-            List<String> primaryKeys) {
-        this.databaseName = databaseName;
-        this.tableName = tableName;
-        this.fields = fields;
-        this.primaryKeys = primaryKeys;
-    }
-
     public MySqlSchema(
             DatabaseMetaData metaData,
             String databaseName,
@@ -93,14 +83,14 @@ public class MySqlSchema {
         }
     }
 
-    public String databaseName() {
-        return databaseName;
-    }
-
     public String tableName() {
         return tableName;
     }
 
+    public Identifier identifier() {
+        return Identifier.create(databaseName, tableName);
+    }
+
     public LinkedHashMap<String, Tuple2<DataType, String>> fields() {
         return fields;
     }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
index c931fca31..048d9deb2 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
@@ -42,15 +42,13 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
-import java.sql.Connection;
-import java.sql.DatabaseMetaData;
-import java.sql.ResultSet;
 import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.LinkedList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -170,13 +168,18 @@ public class MySqlSyncDatabaseAction extends ActionBase {
             validateCaseInsensitive();
         }
 
-        List<String> excludedTables = new LinkedList<>();
-        List<MySqlSchema> mySqlSchemas = getMySqlSchemaList(excludedTables);
-        String mySqlDatabase = 
mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME);
+        List<Identifier> excludedTables = new ArrayList<>();
+        List<MySqlSchema> beforeMerging =
+                MySqlActionUtils.getMySqlSchemaList(
+                        mySqlConfig, monitorTablePredication(), 
excludedTables);
+        List<Identifier> monitoredTables =
+                
beforeMerging.stream().map(MySqlSchema::identifier).collect(Collectors.toList());
+        List<MySqlSchema> mySqlSchemas = mergeShards(beforeMerging);
+
         checkArgument(
                 mySqlSchemas.size() > 0,
                 "No tables found in MySQL database "
-                        + mySqlDatabase
+                        + mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME)
                         + ", or MySQL database does not exist.");
 
         catalog.createDatabase(database, true);
@@ -184,7 +187,6 @@ public class MySqlSyncDatabaseAction extends ActionBase {
                 new TableNameConverter(caseSensitive, tablePrefix, 
tableSuffix);
 
         List<FileStoreTable> fileStoreTables = new ArrayList<>();
-        List<String> monitoredTables = new ArrayList<>();
         for (MySqlSchema mySqlSchema : mySqlSchemas) {
             String paimonTableName = 
tableNameConverter.convert(mySqlSchema.tableName());
             Identifier identifier = new Identifier(database, paimonTableName);
@@ -202,13 +204,13 @@ public class MySqlSyncDatabaseAction extends ActionBase {
                 Supplier<String> errMsg =
                         incompatibleMessage(table.schema(), mySqlSchema, 
identifier);
                 if (shouldMonitorTable(table.schema(), fromMySql, errMsg)) {
-                    monitoredTables.add(mySqlSchema.tableName());
                     fileStoreTables.add(table);
+                } else {
+                    monitoredTables.remove(mySqlSchema.identifier());
                 }
             } catch (Catalog.TableNotExistException e) {
                 catalog.createTable(identifier, fromMySql, false);
                 table = (FileStoreTable) catalog.getTable(identifier);
-                monitoredTables.add(mySqlSchema.tableName());
                 fileStoreTables.add(table);
             }
         }
@@ -217,22 +219,10 @@ public class MySqlSyncDatabaseAction extends ActionBase {
                 !monitoredTables.isEmpty(),
                 "No tables to be synchronized. Possible cause is the schemas 
of all tables in specified "
                         + "MySQL database are not compatible with those of 
existed Paimon tables. Please check the log.");
-        String tableList;
 
-        if (mode == COMBINED) {
-            // First excluding all tables that failed the excludingPattern and 
don't have primary
-            // keys. Then including other tables using regex so that newly 
added table DDLs and DMLs
-            // during job runtime can be captured
-            tableList =
-                    excludedTables.stream()
-                                    .map(t -> String.format("(?!(%s))", t))
-                                    .collect(Collectors.joining(""))
-                            + includingTables;
-        } else {
-            tableList = "(" + String.join("|", monitoredTables) + ")";
-        }
-        mySqlConfig.set(MySqlSourceOptions.TABLE_NAME, tableList);
-        MySqlSource<String> source = 
MySqlActionUtils.buildMySqlSource(mySqlConfig);
+        MySqlSource<String> source =
+                MySqlActionUtils.buildMySqlSource(
+                        mySqlConfig, buildTableList(monitoredTables, 
excludedTables));
 
         String serverTimeZone = 
mySqlConfig.get(MySqlSourceOptions.SERVER_TIME_ZONE);
         ZoneId zoneId = serverTimeZone == null ? ZoneId.systemDefault() : 
ZoneId.of(serverTimeZone);
@@ -291,35 +281,17 @@ public class MySqlSyncDatabaseAction extends ActionBase {
                         tableSuffix));
     }
 
-    private List<MySqlSchema> getMySqlSchemaList(List<String> excludedTables) 
throws Exception {
-        String databaseName = 
mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME);
-        List<MySqlSchema> mySqlSchemaList = new ArrayList<>();
-        try (Connection conn = MySqlActionUtils.getConnection(mySqlConfig)) {
-            DatabaseMetaData metaData = conn.getMetaData();
-            try (ResultSet tables =
-                    metaData.getTables(databaseName, null, "%", new String[] 
{"TABLE"})) {
-                while (tables.next()) {
-                    String tableName = tables.getString("TABLE_NAME");
-                    if (!shouldMonitorTable(tableName)) {
-                        excludedTables.add(tableName);
-                        continue;
-                    }
-                    MySqlSchema mySqlSchema =
-                            new MySqlSchema(
-                                    metaData,
-                                    databaseName,
-                                    tableName,
-                                    
mySqlConfig.get(MYSQL_CONVERTER_TINYINT1_BOOL));
-                    if (mySqlSchema.primaryKeys().size() > 0) {
-                        // only tables with primary keys will be considered
-                        mySqlSchemaList.add(mySqlSchema);
-                    } else {
-                        excludedTables.add(tableName);
-                    }
-                }
+    private Predicate<MySqlSchema> monitorTablePredication() {
+        return schema -> {
+            if (schema.primaryKeys().isEmpty()) {
+                LOG.debug(
+                        "Didn't find primary keys from table '{}'. "
+                                + "This table won't be synchronized.",
+                        schema.identifier());
+                return false;
             }
-        }
-        return mySqlSchemaList;
+            return shouldMonitorTable(schema.tableName());
+        };
     }
 
     private boolean shouldMonitorTable(String mySqlTableName) {
@@ -327,7 +299,9 @@ public class MySqlSyncDatabaseAction extends ActionBase {
         if (excludingPattern != null) {
             shouldMonitor = shouldMonitor && 
!excludingPattern.matcher(mySqlTableName).matches();
         }
-        LOG.debug("Source table {} is monitored? {}", mySqlTableName, 
shouldMonitor);
+        if (!shouldMonitor) {
+            LOG.debug("Source table '{}' is excluded.", mySqlTableName);
+        }
         return shouldMonitor;
     }
 
@@ -351,14 +325,82 @@ public class MySqlSyncDatabaseAction extends ActionBase {
                 String.format(
                         "Incompatible schema found.\n"
                                 + "Paimon table is: %s, fields are: %s.\n"
-                                + "MySQL table is: %s.%s, fields are: %s.\n",
+                                + "MySQL table is: %s, fields are: %s.\n",
                         identifier.getFullName(),
                         paimonSchema.fields(),
-                        mySqlSchema.databaseName(),
-                        mySqlSchema.tableName(),
+                        mySqlSchema.identifier(),
                         mySqlSchema.fields());
     }
 
+    /** Merge schemas for tables that have the same table name. */
+    private List<MySqlSchema> mergeShards(List<MySqlSchema> rawMySqlSchemas) {
+        Map<String, MySqlSchema> schemaMap = new HashMap<>();
+        for (MySqlSchema rawSchema : rawMySqlSchemas) {
+            String tableName = rawSchema.tableName();
+            MySqlSchema schema = schemaMap.get(tableName);
+            if (schema == null) {
+                schemaMap.put(tableName, rawSchema);
+            } else {
+                schemaMap.put(tableName, schema.merge(rawSchema));
+            }
+        }
+        return new ArrayList<>(schemaMap.values());
+    }
+
+    /**
+     * See {@link 
com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils#discoverCapturedTables}
+     * and {@code MySqlSyncDatabaseTableListITCase}.
+     */
+    private String buildTableList(
+            List<Identifier> monitoredTables, List<Identifier> excludedTables) 
{
+        String separatorRex = "\\.";
+        if (mode == DIVIDED) {
+            // In DIVIDED mode, we only concern about existed tables
+            return monitoredTables.stream()
+                    .map(t -> t.getDatabaseName() + separatorRex + 
t.getObjectName())
+                    .collect(Collectors.joining("|"));
+        } else if (mode == COMBINED) {
+            // In COMBINED mode, we should consider both existed tables and 
possible newly added
+            // tables, so we should use regular expression to monitor all 
valid tables and exclude
+            // certain invalid tables
+
+            // The table list is build by template:
+            // 
(?!(^db\\.tbl$)|(^...$))(databasePattern\\.(including_pattern1|...))
+
+            // The excluding pattern ?!(^db\\.tbl$)|(^...$) can exclude tables 
whose qualified name
+            // is exactly equal to 'db.tbl'
+            // The including pattern 
databasePattern\\.(including_pattern1|...) can include tables
+            // whose qualified name matches one of the patterns
+
+            // a table can be monitored only when its name meets the including 
pattern and doesn't
+            // be excluded by excluding pattern at the same time
+            String includingPattern =
+                    String.format(
+                            "%s%s(%s)",
+                            mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME),
+                            separatorRex,
+                            includingTables);
+            if (excludedTables.isEmpty()) {
+                return includingPattern;
+            }
+
+            String excludingPattern =
+                    excludedTables.stream()
+                            .map(
+                                    t ->
+                                            String.format(
+                                                    "(^%s$)",
+                                                    t.getDatabaseName()
+                                                            + separatorRex
+                                                            + 
t.getObjectName()))
+                            .collect(Collectors.joining("|"));
+            excludingPattern = "?!" + excludingPattern;
+            return String.format("(%s)(%s)", excludingPattern, 
includingPattern);
+        }
+
+        throw new UnsupportedOperationException("Unknown DatabaseSyncMode: " + 
mode);
+    }
+
     // ------------------------------------------------------------------------
     //  Flink run methods
     // ------------------------------------------------------------------------
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
index e85d1121a..433105863 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
@@ -35,15 +35,12 @@ import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
-import java.sql.Connection;
-import java.sql.DatabaseMetaData;
-import java.sql.ResultSet;
 import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.regex.Matcher;
+import java.util.function.Predicate;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
@@ -133,7 +130,16 @@ public class MySqlSyncTableAction extends ActionBase {
     }
 
     public void build(StreamExecutionEnvironment env) throws Exception {
-        MySqlSource<String> source = 
MySqlActionUtils.buildMySqlSource(mySqlConfig);
+        checkArgument(
+                mySqlConfig.contains(MySqlSourceOptions.TABLE_NAME),
+                String.format(
+                        "mysql-conf [%s] must be specified.", 
MySqlSourceOptions.TABLE_NAME.key()));
+
+        String tableList =
+                mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME)
+                        + "\\."
+                        + mySqlConfig.get(MySqlSourceOptions.TABLE_NAME);
+        MySqlSource<String> source = 
MySqlActionUtils.buildMySqlSource(mySqlConfig, tableList);
 
         boolean caseSensitive = catalog.caseSensitive();
 
@@ -142,7 +148,9 @@ public class MySqlSyncTableAction extends ActionBase {
         }
 
         MySqlSchema mySqlSchema =
-                getMySqlSchemaList().stream()
+                MySqlActionUtils.getMySqlSchemaList(
+                                mySqlConfig, monitorTablePredication(), new 
ArrayList<>())
+                        .stream()
                         .reduce(MySqlSchema::merge)
                         .orElseThrow(
                                 () ->
@@ -235,38 +243,12 @@ public class MySqlSyncTableAction extends ActionBase {
         }
     }
 
-    private List<MySqlSchema> getMySqlSchemaList() throws Exception {
-        Pattern databasePattern =
-                
Pattern.compile(mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME));
-        Pattern tablePattern = 
Pattern.compile(mySqlConfig.get(MySqlSourceOptions.TABLE_NAME));
-        List<MySqlSchema> mySqlSchemaList = new ArrayList<>();
-        try (Connection conn = MySqlActionUtils.getConnection(mySqlConfig)) {
-            DatabaseMetaData metaData = conn.getMetaData();
-            try (ResultSet schemas = metaData.getCatalogs()) {
-                while (schemas.next()) {
-                    String databaseName = schemas.getString("TABLE_CAT");
-                    Matcher databaseMatcher = 
databasePattern.matcher(databaseName);
-                    if (databaseMatcher.matches()) {
-                        try (ResultSet tables = 
metaData.getTables(databaseName, null, "%", null)) {
-                            while (tables.next()) {
-                                String tableName = 
tables.getString("TABLE_NAME");
-                                Matcher tableMatcher = 
tablePattern.matcher(tableName);
-                                if (tableMatcher.matches()) {
-                                    mySqlSchemaList.add(
-                                            new MySqlSchema(
-                                                    metaData,
-                                                    databaseName,
-                                                    tableName,
-                                                    mySqlConfig.get(
-                                                            
MYSQL_CONVERTER_TINYINT1_BOOL)));
-                                }
-                            }
-                        }
-                    }
-                }
-            }
-        }
-        return mySqlSchemaList;
+    private Predicate<MySqlSchema> monitorTablePredication() {
+        return schema -> {
+            Pattern tableNamePattern =
+                    
Pattern.compile(mySqlConfig.get(MySqlSourceOptions.TABLE_NAME));
+            return tableNamePattern.matcher(schema.tableName()).matches();
+        };
     }
 
     // ------------------------------------------------------------------------
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java
index ad536a6f6..b17e4fa12 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink.action.cdc.mysql;
 
+import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.action.ActionITCaseBase;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.source.ReadBuilder;
@@ -149,4 +150,9 @@ public class MySqlActionITCaseBase extends ActionITCaseBase {
             Thread.sleep(1000);
         }
     }
+
+    protected FileStoreTable getFileStoreTable(String tableName) throws 
Exception {
+        Identifier identifier = Identifier.create(database, tableName);
+        return (FileStoreTable) catalog().getTable(identifier);
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
index b1bcd88bd..6a0878ace 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink.action.cdc.mysql;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.action.cdc.DatabaseSyncMode;
 import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.table.FileStoreTable;
@@ -282,7 +283,7 @@ public class MySqlSyncDatabaseActionITCase extends MySqlActionITCaseBase {
                         new DataType[] {DataTypes.INT().notNull(), 
DataTypes.VARCHAR(10)},
                         new String[] {"_id", "v1"});
 
-        List<String> primaryKeys1 = Arrays.asList("_id");
+        List<String> primaryKeys1 = Collections.singletonList("_id");
         List<String> expected = Arrays.asList("+I[1, one]", "+I[3, three]");
 
         waitForResult(expected, table1, rowType1, primaryKeys1);
@@ -293,7 +294,7 @@ public class MySqlSyncDatabaseActionITCase extends MySqlActionITCaseBase {
                             DataTypes.INT().notNull(), DataTypes.VARCHAR(10), 
DataTypes.TINYINT()
                         },
                         new String[] {"_id", "v1", "v2"});
-        List<String> primaryKeys2 = Arrays.asList("_id");
+        List<String> primaryKeys2 = Collections.singletonList("_id");
         expected = Arrays.asList("+I[2, two, 21]", "+I[4, four, 24]");
         waitForResult(expected, table2, rowType2, primaryKeys2);
 
@@ -936,7 +937,7 @@ public class MySqlSyncDatabaseActionITCase extends MySqlActionITCaseBase {
         createNewTable(statement, newTableName);
         statement.executeUpdate(
                 String.format("INSERT INTO `%s`.`t2` VALUES (8, 'eight', 80, 
800)", databaseName));
-        List<Tuple2<Integer, String>> newTableRecords = 
getNewTableRecords(newTableCount);
+        List<Tuple2<Integer, String>> newTableRecords = getNewTableRecords();
         recordsMap.put(newTableName, newTableRecords);
         List<String> newTableExpected = getNewTableExpected(newTableRecords);
         insertRecordsIntoNewTable(statement, databaseName, newTableName, 
newTableRecords);
@@ -975,7 +976,7 @@ public class MySqlSyncDatabaseActionITCase extends MySqlActionITCaseBase {
             Thread.sleep(5000L);
 
             // insert records
-            newTableRecords = getNewTableRecords(newTableCount);
+            newTableRecords = getNewTableRecords();
             recordsMap.put(newTableName, newTableRecords);
             insertRecordsIntoNewTable(statement, databaseName, newTableName, 
newTableRecords);
             newTable = getFileStoreTable(newTableName);
@@ -1039,7 +1040,7 @@ public class MySqlSyncDatabaseActionITCase extends 
MySqlActionITCaseBase {
                 .collect(Collectors.toList());
     }
 
-    private List<Tuple2<Integer, String>> getNewTableRecords(int 
newTableCount) {
+    private List<Tuple2<Integer, String>> getNewTableRecords() {
         List<Tuple2<Integer, String>> records = new LinkedList<>();
         int count = ThreadLocalRandom.current().nextInt(10) + 1;
         for (int i = 0; i < count; i++) {
@@ -1176,7 +1177,7 @@ public class MySqlSyncDatabaseActionITCase extends 
MySqlActionITCaseBase {
                         new String[] {"pk", "_datetime", "_tinyint1"});
         List<String> expected =
                 Arrays.asList("+I[1, 2021-09-15T15:00:10, 21]", "+I[2, 
2023-03-23T16:00:20, 42]");
-        waitForResult(expected, table, rowType, Arrays.asList("pk"));
+        waitForResult(expected, table, rowType, 
Collections.singletonList("pk"));
     }
 
     @Test
@@ -1257,9 +1258,136 @@ public class MySqlSyncDatabaseActionITCase extends 
MySqlActionITCaseBase {
         }
     }
 
-    private FileStoreTable getFileStoreTable(String tableName) throws 
Exception {
-        Identifier identifier = Identifier.create(database, tableName);
-        return (FileStoreTable) catalog().getTable(identifier);
+    /**
+     * Synchronizes every database matching {@code database_shard_.*} into one Paimon database and
+     * checks that same-named tables from the shards end up in a single Paimon table.
+     *
+     * <p>Covers merged schemas (t1), columns added independently in each shard (t2), a shard whose
+     * table has no primary key and whose rows are therefore not expected (t3), and, in {@code
+     * COMBINED} mode only, a table created after the job has started (t4).
+     */
+    @Test
+    @Timeout(60)
+    public void testSyncMultipleShards() throws Exception {
+        Map<String, String> mySqlConfig = getBasicMySqlConfig();
+        mySqlConfig.put("database-name", "database_shard_.*");
+
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(2);
+        env.enableCheckpointing(1000);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+
+        Map<String, String> tableConfig = getBasicTableConfig();
+        // Pick the sync mode at random so that repeated runs cover both DIVIDED and COMBINED.
+        DatabaseSyncMode mode = ThreadLocalRandom.current().nextBoolean() ? 
DIVIDED : COMBINED;
+        MySqlSyncDatabaseAction action =
+                new MySqlSyncDatabaseAction(
+                        mySqlConfig,
+                        warehouse,
+                        database,
+                        false,
+                        null,
+                        null,
+                        null,
+                        null,
+                        Collections.emptyMap(),
+                        tableConfig,
+                        mode);
+        action.build(env);
+        JobClient client = env.executeAsync();
+        waitJobRunning(client);
+
+        try (Connection conn =
+                        DriverManager.getConnection(
+                                MYSQL_CONTAINER.getJdbcUrl(DATABASE_NAME),
+                                MYSQL_CONTAINER.getUsername(),
+                                MYSQL_CONTAINER.getPassword());
+                Statement statement = conn.createStatement()) {
+            // test insert into t1
+            // database_shard_2.t1 has a wider v1 and an extra v2 column, so the merged Paimon
+            // table is expected to have VARCHAR(20) v1 and BIGINT v2 (NULL for shard_1 rows).
+            statement.executeUpdate("INSERT INTO database_shard_1.t1 VALUES 
(1, 'db1_1')");
+            statement.executeUpdate("INSERT INTO database_shard_1.t1 VALUES 
(2, 'db1_2')");
+
+            statement.executeUpdate("INSERT INTO database_shard_2.t1 VALUES 
(3, 'db2_3', 300)");
+            statement.executeUpdate("INSERT INTO database_shard_2.t1 VALUES 
(4, 'db2_4', 400)");
+
+            FileStoreTable table = getFileStoreTable("t1");
+            RowType rowType =
+                    RowType.of(
+                            new DataType[] {
+                                DataTypes.INT().notNull(), 
DataTypes.VARCHAR(20), DataTypes.BIGINT()
+                            },
+                            new String[] {"k", "v1", "v2"});
+            waitForResult(
+                    Arrays.asList(
+                            "+I[1, db1_1, NULL]",
+                            "+I[2, db1_2, NULL]",
+                            "+I[3, db2_3, 300]",
+                            "+I[4, db2_4, 400]"),
+                    table,
+                    rowType,
+                    Collections.singletonList("k"));
+
+            // test schema evolution of t2
+            // Each shard adds a different column (v2 in shard_1, v3 in shard_2); the merged
+            // table is expected to contain both, with NULL where a shard lacks the column.
+            statement.executeUpdate("ALTER TABLE database_shard_1.t2 ADD 
COLUMN v2 INT");
+            statement.executeUpdate("ALTER TABLE database_shard_2.t2 ADD 
COLUMN v3 VARCHAR(10)");
+            statement.executeUpdate("INSERT INTO database_shard_1.t2 VALUES 
(1, 1.1, 1)");
+            statement.executeUpdate("INSERT INTO database_shard_1.t2 VALUES 
(2, 2.2, 2)");
+            statement.executeUpdate("INSERT INTO database_shard_2.t2 VALUES 
(3, 3.3, 'db2_3')");
+            statement.executeUpdate("INSERT INTO database_shard_2.t2 VALUES 
(4, 4.4, 'db2_4')");
+            table = getFileStoreTable("t2");
+            rowType =
+                    RowType.of(
+                            new DataType[] {
+                                DataTypes.BIGINT().notNull(),
+                                DataTypes.DOUBLE(),
+                                DataTypes.INT(),
+                                DataTypes.VARCHAR(10)
+                            },
+                            new String[] {"k", "v1", "v2", "v3"});
+            waitForResult(
+                    Arrays.asList(
+                            "+I[1, 1.1, 1, NULL]",
+                            "+I[2, 2.2, 2, NULL]",
+                            "+I[3, 3.3, NULL, db2_3]",
+                            "+I[4, 4.4, NULL, db2_4]"),
+                    table,
+                    rowType,
+                    Collections.singletonList("k"));
+
+            // test that database_shard_2.t3 won't be synchronized
+            // (database_shard_2.t3 is created without a primary key, so only the
+            // database_shard_1.t3 rows are expected in the Paimon table)
+            statement.executeUpdate(
+                    "INSERT INTO database_shard_2.t3 VALUES (1, 'db2_1'), (2, 
'db2_2')");
+            statement.executeUpdate(
+                    "INSERT INTO database_shard_1.t3 VALUES (3, 'db1_3'), (4, 
'db1_4')");
+            table = getFileStoreTable("t3");
+            rowType =
+                    RowType.of(
+                            new DataType[] {DataTypes.INT().notNull(), 
DataTypes.VARCHAR(10)},
+                            new String[] {"k", "v1"});
+            waitForResult(
+                    Arrays.asList("+I[3, db1_3]", "+I[4, db1_4]"),
+                    table,
+                    rowType,
+                    Collections.singletonList("k"));
+
+            // test newly added table
+            // Only checked in COMBINED mode, which synchronizes newly added tables without
+            // restarting the job.
+            if (mode == COMBINED) {
+                statement.executeUpdate(
+                        "CREATE TABLE database_shard_1.t4 (k INT, v1 
VARCHAR(10), PRIMARY KEY (k))");
+                statement.executeUpdate("INSERT INTO database_shard_1.t4 
VALUES (1, 'db1_1')");
+
+                statement.executeUpdate(
+                        "CREATE TABLE database_shard_2.t4 (k INT, v1 
VARCHAR(10), PRIMARY KEY (k))");
+                statement.executeUpdate("INSERT INTO database_shard_2.t4 
VALUES (2, 'db2_2')");
+
+                // Poll until the running job has created t4 in the Paimon catalog; the
+                // @Timeout on this test bounds the wait.
+                Catalog catalog = catalog();
+                while (!catalog.tableExists(Identifier.create(database, 
"t4"))) {
+                    Thread.sleep(100);
+                }
+
+                table = getFileStoreTable("t4");
+                rowType =
+                        RowType.of(
+                                new DataType[] {DataTypes.INT().notNull(), 
DataTypes.VARCHAR(10)},
+                                new String[] {"k", "v1"});
+                waitForResult(
+                        Arrays.asList("+I[1, db1_1]", "+I[2, db2_2]"),
+                        table,
+                        rowType,
+                        Collections.singletonList("k"));
+            }
+        }
     }
 
     private void assertTableExists(List<String> tableNames) {
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseTableListITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseTableListITCase.java
new file mode 100644
index 000000000..674c79456
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseTableListITCase.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mysql;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.flink.action.cdc.DatabaseSyncMode;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.paimon.flink.action.cdc.DatabaseSyncMode.COMBINED;
+import static org.apache.paimon.flink.action.cdc.DatabaseSyncMode.DIVIDED;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Test if the table list in {@link MySqlSyncDatabaseAction} is correct.
+ *
+ * <p>Source databases and tables come from {@code mysql/tablelist_test_setup.sql}. The test checks
+ * which Paimon tables are created when source tables are filtered by the {@code database-name}
+ * pattern, by the table patterns passed to the action, and by the presence of a primary key. It
+ * also checks that same-named tables from different databases are merged into one Paimon table.
+ */
+public class MySqlSyncDatabaseTableListITCase extends MySqlActionITCaseBase {
+
+    /**
+     * Registers {@code mysql/tablelist_test_setup.sql} as the MySQL container's setup script and
+     * then calls {@code start()}.
+     */
+    @BeforeAll
+    public static void startContainers() {
+        MYSQL_CONTAINER.withSetupSQL("mysql/tablelist_test_setup.sql");
+        start();
+    }
+
+    // TODO it's more convenient to check table without merging shards
+    /**
+     * Runs the action over every database matching {@code .*shard_.*} and checks the resulting
+     * Paimon table list and table contents.
+     *
+     * <p>When the randomly chosen mode is {@code COMBINED}, the test also creates tables (and
+     * databases) after the job has started and checks which of them are picked up.
+     */
+    @Test
+    @Timeout(60)
+    public void testActionRunResult() throws Exception {
+        Map<String, String> mySqlConfig = getBasicMySqlConfig();
+        mySqlConfig.put("database-name", ".*shard_.*");
+
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(2);
+        env.enableCheckpointing(1000);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+
+        Map<String, String> tableConfig = getBasicTableConfig();
+        // Pick the sync mode at random so that repeated runs cover both DIVIDED and COMBINED.
+        DatabaseSyncMode mode = ThreadLocalRandom.current().nextBoolean() ? 
DIVIDED : COMBINED;
+        MySqlSyncDatabaseAction action =
+                new MySqlSyncDatabaseAction(
+                        mySqlConfig,
+                        warehouse,
+                        database,
+                        false,
+                        null,
+                        null,
+                        // NOTE(review): presumably the including and excluding table patterns.
+                        // The expectations below imply whole-name matching ('taa' is kept,
+                        // 'ta' is dropped). Confirm against the MySqlSyncDatabaseAction
+                        // constructor.
+                        "t.+|s.+",
+                        "ta|sa",
+                        Collections.emptyMap(),
+                        tableConfig,
+                        mode);
+        action.build(env);
+        JobClient client = env.executeAsync();
+        waitJobRunning(client);
+
+        try (Connection conn =
+                        DriverManager.getConnection(
+                                MYSQL_CONTAINER.getJdbcUrl("shard_1"),
+                                MYSQL_CONTAINER.getUsername(),
+                                MYSQL_CONTAINER.getPassword());
+                Statement statement = conn.createStatement()) {
+            Catalog catalog = catalog();
+            // Not expected:
+            //   - ta and sa, which are excluded;
+            //   - m, which matches no including pattern;
+            //   - s1, which has no primary key;
+            //   - tables of database 'ignored', which does not match the database-name pattern.
+            assertThat(catalog.listTables(database))
+                    .containsExactlyInAnyOrder("t1", "t11", "t2", "t22", "t3", 
"taa", "tb", "s2");
+
+            RowType rowType =
+                    RowType.of(
+                            new DataType[] {DataTypes.INT().notNull(), 
DataTypes.VARCHAR(100)},
+                            new String[] {"k", "name"});
+            List<String> pk = Collections.singletonList("k");
+
+            // shard_1.t1 has no primary key, so t1 only receives the rows of shard_2.t1
+            // and x_shard_1.t1.
+            waitForResult(
+                    Arrays.asList("+I[2, shard_2.t1]", "+I[3, x_shard_1.t1]"),
+                    getFileStoreTable("t1"),
+                    rowType,
+                    pk);
+
+            waitForResult(
+                    Collections.singletonList("+I[1, shard_1.t11]"),
+                    getFileStoreTable("t11"),
+                    rowType,
+                    pk);
+
+            // shard_2.t2 has no primary key, so only the shard_1 row is expected.
+            waitForResult(
+                    Collections.singletonList("+I[1, shard_1.t2]"),
+                    getFileStoreTable("t2"),
+                    rowType,
+                    pk);
+
+            waitForResult(
+                    Collections.singletonList("+I[2, shard_2.t22]"),
+                    getFileStoreTable("t22"),
+                    rowType,
+                    pk);
+
+            // t3 exists with a primary key in both shard_1 and shard_2; both rows are merged.
+            waitForResult(
+                    Arrays.asList("+I[1, shard_1.t3]", "+I[2, shard_2.t3]"),
+                    getFileStoreTable("t3"),
+                    rowType,
+                    pk);
+
+            waitForResult(
+                    Collections.singletonList("+I[1, shard_1.taa]"),
+                    getFileStoreTable("taa"),
+                    rowType,
+                    pk);
+
+            waitForResult(
+                    Collections.singletonList("+I[2, shard_2.tb]"),
+                    getFileStoreTable("tb"),
+                    rowType,
+                    pk);
+
+            waitForResult(
+                    Collections.singletonList("+I[1, shard_1.s2]"),
+                    getFileStoreTable("s2"),
+                    rowType,
+                    pk);
+
+            // test newly added tables
+            if (mode == COMBINED) {
+                // case 1: new tables in an existing database
+                statement.executeUpdate("USE shard_2");
+                // ignored: ta
+                statement.executeUpdate(
+                        "CREATE TABLE ta (k INT, name VARCHAR(100), PRIMARY 
KEY (k))");
+                statement.executeUpdate("INSERT INTO ta VALUES (10, 
'shard_2.ta')");
+
+                // captured: s3
+                statement.executeUpdate(
+                        "CREATE TABLE s3 (k INT, name VARCHAR(100), PRIMARY 
KEY (k))");
+                statement.executeUpdate("INSERT INTO s3 VALUES (10, 
'shard_2.s3')");
+
+                // case 2: new tables in a newly created, captured database
+                statement.executeUpdate("CREATE DATABASE shard_3");
+                statement.executeUpdate("USE shard_3");
+                // ignored: ta, m
+                statement.executeUpdate(
+                        "CREATE TABLE ta (k INT, name VARCHAR(100), PRIMARY 
KEY (k))");
+                statement.executeUpdate("INSERT INTO ta VALUES (10, 
'shard_3.ta')");
+
+                statement.executeUpdate(
+                        "CREATE TABLE m (k INT, name VARCHAR(100), PRIMARY KEY 
(k))");
+                statement.executeUpdate("INSERT INTO m VALUES (10, 
'shard_3.m')");
+
+                // captured: tab
+                statement.executeUpdate(
+                        "CREATE TABLE tab (k INT, name VARCHAR(100), PRIMARY 
KEY (k))");
+                statement.executeUpdate("INSERT INTO tab VALUES (10, 
'shard_3.tab')");
+
+                // case 3: new tables in a newly created database that is ignored
+                // ('what' does not match the database-name pattern '.*shard_.*')
+                statement.executeUpdate("CREATE DATABASE what");
+                statement.executeUpdate("USE what");
+                // ignored: ta
+                statement.executeUpdate(
+                        "CREATE TABLE ta (k INT, name VARCHAR(100), PRIMARY 
KEY (k))");
+                statement.executeUpdate("INSERT INTO ta VALUES (10, 
'what.ta')");
+
+                // matches the including pattern but is ignored because its database is not
+                // captured: s4
+                statement.executeUpdate(
+                        "CREATE TABLE s4 (k INT, name VARCHAR(100), PRIMARY 
KEY (k))");
+                statement.executeUpdate("INSERT INTO s4 VALUES (10, 
'what.s4')");
+
+                // Give the job time to process the statements above before listing tables.
+                // NOTE(review): a fixed sleep may be flaky on slow machines. Polling for s3 and
+                // tab would be sturdier, but it would also shorten the window in which a wrongly
+                // captured table (e.g. what.s4) could show up before the assertion below.
+                Thread.sleep(5_000);
+
+                assertThat(catalog.listTables(database))
+                        .containsExactlyInAnyOrder(
+                                "t1", "t11", "t2", "t22", "t3", "taa", "tb", 
"s2", "s3", "tab");
+
+                waitForResult(
+                        Collections.singletonList("+I[10, shard_2.s3]"),
+                        getFileStoreTable("s3"),
+                        rowType,
+                        pk);
+
+                waitForResult(
+                        Collections.singletonList("+I[10, shard_3.tab]"),
+                        getFileStoreTable("tab"),
+                        rowType,
+                        pk);
+            }
+        }
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index f593ad275..2ca19d522 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -18,8 +18,6 @@
 
 package org.apache.paimon.flink.action.cdc.mysql;
 
-import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.SchemaManager;
@@ -1036,8 +1034,6 @@ public class MySqlSyncTableActionITCase extends 
MySqlActionITCaseBase {
     }
 
     private FileStoreTable getFileStoreTable() throws Exception {
-        Catalog catalog = catalog();
-        Identifier identifier = Identifier.create(database, tableName);
-        return (FileStoreTable) catalog.getTable(identifier);
+        // Delegates to getFileStoreTable(String) with this test's tableName.
+        return getFileStoreTable(tableName);
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_database_setup.sql
 
b/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_database_setup.sql
index 52e3a4c16..46ac60d9f 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_database_setup.sql
+++ 
b/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_database_setup.sql
@@ -346,3 +346,48 @@ CREATE TABLE a (
     v VARCHAR(10),
     PRIMARY KEY (k)
 );
+
+-- Databases for testSyncMultipleShards: every table name below exists in both database_shard_1
+-- and database_shard_2, and the test expects each pair to be merged into one Paimon table.
+CREATE DATABASE database_shard_1;
+USE database_shard_1;
+
+CREATE TABLE t1 (
+    k INT,
+    v1 VARCHAR(10),
+    PRIMARY KEY (k)
+);
+
+CREATE TABLE t2 (
+    k BIGINT,
+    v1 DOUBLE,
+    PRIMARY KEY (k)
+);
+
+CREATE TABLE t3 (
+    k INT,
+    v1 VARCHAR(10),
+    PRIMARY KEY (k)
+);
+
+CREATE DATABASE database_shard_2;
+USE database_shard_2;
+
+-- test schema merging: compared with database_shard_1.t1, v1 is wider and v2 is an extra column
+CREATE TABLE t1 (
+    k INT,
+    v1 VARCHAR(20),
+    v2 BIGINT,
+    PRIMARY KEY (k)
+);
+
+-- test schema evolution: the test adds a different column to t2 in each shard
+CREATE TABLE t2 (
+    k BIGINT,
+    v1 DOUBLE,
+    PRIMARY KEY (k)
+);
+
+-- test some shard doesn't have primary key: rows of this t3 are expected not to be synchronized
+CREATE TABLE t3 (
+    k INT,
+    v1 VARCHAR(10)
+);
diff --git 
a/paimon-flink/paimon-flink-common/src/test/resources/mysql/tablelist_test_setup.sql
 
b/paimon-flink/paimon-flink-common/src/test/resources/mysql/tablelist_test_setup.sql
new file mode 100644
index 000000000..1237134f2
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/resources/mysql/tablelist_test_setup.sql
@@ -0,0 +1,94 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+-- In production you would almost certainly limit the replication user to the follower (slave) machine,
+-- to prevent other clients from accessing the log from other machines. For example, 'replicator'@'follower.acme.com'.
+-- However, in this database we'll grant the test user 'paimonuser' all privileges:
+--
+GRANT ALL PRIVILEGES ON *.* TO 'paimonuser'@'%';
+
+-- ################################################################################
+--  MySqlSyncDatabaseTableListITCase
+-- ################################################################################
+
+-- captured databases (they match the database-name pattern '.*shard_.*' used by the test)
+CREATE DATABASE shard_1;
+CREATE DATABASE shard_2;
+CREATE DATABASE x_shard_1;
+
+-- ignored databases (they do not match '.*shard_.*')
+CREATE DATABASE ignored;
+
+-- create tables
+
+USE shard_1;
+
+-- t1 is still synchronized, from shard_2 and x_shard_1
+CREATE TABLE t1 (k INT, name VARCHAR(100)); -- ignored because of pk absence
+INSERT INTO t1 VALUES (1, 'shard_1.t1');
+
+CREATE TABLE t11 (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- captured
+INSERT INTO t11 VALUES (1, 'shard_1.t11');
+
+CREATE TABLE t2 (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- captured
+INSERT INTO t2 VALUES (1, 'shard_1.t2');
+
+CREATE TABLE t3 (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- captured
+INSERT INTO t3 VALUES (1, 'shard_1.t3');
+
+CREATE TABLE ta (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- ignored (excluded by 'ta|sa')
+INSERT INTO ta VALUES (1, 'shard_1.ta');
+
+CREATE TABLE taa (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- captured (not excluded by 'ta|sa')
+INSERT INTO taa VALUES (1, 'shard_1.taa');
+
+CREATE TABLE s1 (k INT, name VARCHAR(100)); -- ignored because of pk absence
+INSERT INTO s1 VALUES (1, 'shard_1.s1');
+
+CREATE TABLE s2 (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- captured
+INSERT INTO s2 VALUES (1, 'shard_1.s2');
+
+CREATE TABLE sa (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- ignored (excluded by 'ta|sa')
+INSERT INTO sa VALUES (1, 'shard_1.sa');
+
+CREATE TABLE m (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- ignored (does not match 't.+|s.+')
+INSERT INTO m VALUES (1, 'shard_1.m');
+
+USE shard_2;
+
+CREATE TABLE t1 (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- captured
+INSERT INTO t1 VALUES (2, 'shard_2.t1');
+
+-- t2 is still synchronized, from shard_1
+CREATE TABLE t2 (k INT, name VARCHAR(100)); -- ignored because of pk absence
+INSERT INTO t2 VALUES (2, 'shard_2.t2');
+
+CREATE TABLE t22 (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- captured
+INSERT INTO t22 VALUES (2, 'shard_2.t22');
+
+CREATE TABLE t3 (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- captured
+INSERT INTO t3 VALUES (2, 'shard_2.t3');
+
+CREATE TABLE tb (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- captured
+INSERT INTO tb VALUES (2, 'shard_2.tb');
+
+USE x_shard_1;
+
+CREATE TABLE t1 (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- captured
+INSERT INTO t1 VALUES (3, 'x_shard_1.t1');
+
+USE ignored;
+
+CREATE TABLE t1 (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- ignored (database not captured)
+INSERT INTO t1 VALUES (4, 'ignored.t1');

Reply via email to