This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new a988dee59 [flink][mysql-cdc] Refactor MySqlSchema related cods (#1783)
a988dee59 is described below

commit a988dee598b332d292b4083a0e8913b1e94e20aa
Author: yuzelin <[email protected]>
AuthorDate: Fri Aug 11 18:37:56 2023 +0800

    [flink][mysql-cdc] Refactor MySqlSchema related cods (#1783)
---
 .../flink/action/cdc/mysql/MySqlActionUtils.java   |  36 +++---
 .../action/cdc/mysql/MySqlSyncDatabaseAction.java  | 121 ++++++++-------------
 .../action/cdc/mysql/MySqlSyncTableAction.java     |  53 +++++----
 .../cdc/mysql/schema/AllMergedMySqlTableInfo.java  |  79 ++++++++++++++
 .../action/cdc/mysql/{ => schema}/MySqlSchema.java |  87 +++++++--------
 .../action/cdc/mysql/schema/MySqlSchemasInfo.java  | 105 ++++++++++++++++++
 .../action/cdc/mysql/schema/MySqlTableInfo.java    |  40 +++++++
 .../mysql/schema/ShardsMergedMySqlTableInfo.java   |  90 +++++++++++++++
 .../cdc/mysql/schema/UnmergedMySqlTableInfo.java   |  63 +++++++++++
 .../action/cdc/mysql/MySqlActionITCaseBase.java    |  24 +++-
 .../cdc/mysql/MySqlSyncDatabaseActionITCase.java   |  43 ++++----
 .../mysql/MySqlSyncDatabaseTableListITCase.java    |  87 ++++++---------
 .../cdc/mysql/MySqlSyncTableActionITCase.java      |  71 ++++--------
 .../test/resources/mysql/sync_database_setup.sql   |  21 +++-
 .../src/test/resources/mysql/sync_table_setup.sql  |  30 ++---
 15 files changed, 641 insertions(+), 309 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
index fff0fdaf2..978d2eaa8 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
@@ -20,11 +20,15 @@ package org.apache.paimon.flink.action.cdc.mysql;
 
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.action.cdc.ComputedColumn;
+import org.apache.paimon.flink.action.cdc.mysql.schema.MySqlSchema;
+import org.apache.paimon.flink.action.cdc.mysql.schema.MySqlSchemasInfo;
+import org.apache.paimon.flink.action.cdc.mysql.schema.MySqlTableInfo;
 import org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
+import org.apache.paimon.utils.Pair;
 
 import com.ververica.cdc.connectors.mysql.source.MySqlSource;
 import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
@@ -35,7 +39,6 @@ import com.ververica.cdc.connectors.mysql.table.JdbcUrlUtils;
 import com.ververica.cdc.connectors.mysql.table.StartupOptions;
 import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
 import com.ververica.cdc.debezium.table.DebeziumOptions;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
@@ -47,7 +50,6 @@ import java.sql.Connection;
 import java.sql.DatabaseMetaData;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -102,14 +104,14 @@ public class MySqlActionUtils {
                 mySqlConfig.get(MySqlSourceOptions.PASSWORD));
     }
 
-    static List<MySqlSchema> getMySqlSchemaList(
+    static MySqlSchemasInfo getMySqlTableInfos(
             Configuration mySqlConfig,
-            Predicate<MySqlSchema> monitorTablePredication,
+            Predicate<String> monitorTablePredication,
             List<Identifier> excludedTables)
             throws Exception {
         Pattern databasePattern =
                 
Pattern.compile(mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME));
-        List<MySqlSchema> mySqlSchemaList = new ArrayList<>();
+        MySqlSchemasInfo mySqlSchemasInfo = new MySqlSchemasInfo();
         try (Connection conn = MySqlActionUtils.getConnection(mySqlConfig)) {
             DatabaseMetaData metaData = conn.getMetaData();
             try (ResultSet schemas = metaData.getCatalogs()) {
@@ -121,15 +123,16 @@ public class MySqlActionUtils {
                             while (tables.next()) {
                                 String tableName = 
tables.getString("TABLE_NAME");
                                 MySqlSchema mySqlSchema =
-                                        new MySqlSchema(
+                                        MySqlSchema.buildSchema(
                                                 metaData,
                                                 databaseName,
                                                 tableName,
                                                 
mySqlConfig.get(MYSQL_CONVERTER_TINYINT1_BOOL));
-                                if (monitorTablePredication.test(mySqlSchema)) 
{
-                                    mySqlSchemaList.add(mySqlSchema);
+                                Identifier identifier = 
Identifier.create(databaseName, tableName);
+                                if (monitorTablePredication.test(tableName)) {
+                                    mySqlSchemasInfo.addSchema(identifier, 
mySqlSchema);
                                 } else {
-                                    
excludedTables.add(mySqlSchema.identifier());
+                                    excludedTables.add(identifier);
                                 }
                             }
                         }
@@ -137,7 +140,7 @@ public class MySqlActionUtils {
                 }
             }
         }
-        return mySqlSchemaList;
+        return mySqlSchemasInfo;
     }
 
     static void assertSchemaCompatible(TableSchema paimonSchema, Schema 
mySqlSchema) {
@@ -173,7 +176,7 @@ public class MySqlActionUtils {
     }
 
     static Schema buildPaimonSchema(
-            MySqlSchema mySqlSchema,
+            MySqlTableInfo mySqlTableInfo,
             List<String> specifiedPartitionKeys,
             List<String> specifiedPrimaryKeys,
             List<ComputedColumn> computedColumns,
@@ -181,23 +184,24 @@ public class MySqlActionUtils {
             boolean caseSensitive) {
         Schema.Builder builder = Schema.newBuilder();
         builder.options(paimonConfig);
+        MySqlSchema mySqlSchema = mySqlTableInfo.schema();
 
         // build columns and primary keys from mySqlSchema
-        LinkedHashMap<String, Tuple2<DataType, String>> mySqlFields;
+        LinkedHashMap<String, Pair<DataType, String>> mySqlFields;
         List<String> mySqlPrimaryKeys;
         if (caseSensitive) {
             mySqlFields = mySqlSchema.fields();
             mySqlPrimaryKeys = mySqlSchema.primaryKeys();
         } else {
             mySqlFields = new LinkedHashMap<>();
-            for (Map.Entry<String, Tuple2<DataType, String>> entry :
+            for (Map.Entry<String, Pair<DataType, String>> entry :
                     mySqlSchema.fields().entrySet()) {
                 String fieldName = entry.getKey();
                 checkArgument(
                         !mySqlFields.containsKey(fieldName.toLowerCase()),
                         String.format(
                                 "Duplicate key '%s' in table '%s' appears when 
converting fields map keys to case-insensitive form.",
-                                fieldName, mySqlSchema.tableName()));
+                                fieldName, mySqlTableInfo.location()));
                 mySqlFields.put(fieldName.toLowerCase(), entry.getValue());
             }
             mySqlPrimaryKeys =
@@ -206,8 +210,8 @@ public class MySqlActionUtils {
                             .collect(Collectors.toList());
         }
 
-        for (Map.Entry<String, Tuple2<DataType, String>> entry : 
mySqlFields.entrySet()) {
-            builder.column(entry.getKey(), entry.getValue().f0, 
entry.getValue().f1);
+        for (Map.Entry<String, Pair<DataType, String>> entry : 
mySqlFields.entrySet()) {
+            builder.column(entry.getKey(), entry.getValue().getLeft(), 
entry.getValue().getRight());
         }
 
         for (ComputedColumn computedColumn : computedColumns) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
index a6746904e..3c62b0aa9 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
@@ -26,6 +26,8 @@ import org.apache.paimon.flink.action.Action;
 import org.apache.paimon.flink.action.ActionBase;
 import org.apache.paimon.flink.action.cdc.DatabaseSyncMode;
 import org.apache.paimon.flink.action.cdc.TableNameConverter;
+import org.apache.paimon.flink.action.cdc.mysql.schema.MySqlSchemasInfo;
+import org.apache.paimon.flink.action.cdc.mysql.schema.MySqlTableInfo;
 import org.apache.paimon.flink.sink.cdc.EventParser;
 import org.apache.paimon.flink.sink.cdc.FlinkCdcSyncDatabaseSinkBuilder;
 import org.apache.paimon.schema.Schema;
@@ -46,10 +48,8 @@ import javax.annotation.Nullable;
 import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -111,7 +111,8 @@ public class MySqlSyncDatabaseAction extends ActionBase {
     private final String includingTables;
     private final DatabaseSyncMode mode;
 
-    private List<Identifier> monitoredTables;
+    private final List<Identifier> monitoredTables = new ArrayList<>();
+    private final List<Identifier> excludedTables = new ArrayList<>();
 
     public MySqlSyncDatabaseAction(
             Map<String, String> mySqlConfig,
@@ -175,16 +176,15 @@ public class MySqlSyncDatabaseAction extends ActionBase {
             validateCaseInsensitive();
         }
 
-        List<Identifier> excludedTables = new ArrayList<>();
-        List<MySqlSchema> beforeMerging =
-                MySqlActionUtils.getMySqlSchemaList(
-                        mySqlConfig, monitorTablePredication(), 
excludedTables);
-        monitoredTables =
-                
beforeMerging.stream().map(MySqlSchema::identifier).collect(Collectors.toList());
-        List<MySqlSchema> mySqlSchemas = mergeShards ? 
mergeShards(beforeMerging) : beforeMerging;
+        MySqlSchemasInfo mySqlSchemasInfo =
+                MySqlActionUtils.getMySqlTableInfos(
+                        mySqlConfig, this::shouldMonitorTable, excludedTables);
+
+        logNonPkTables(mySqlSchemasInfo.nonPkTables());
+        List<MySqlTableInfo> mySqlTableInfos = 
mySqlSchemasInfo.toMySqlTableInfos(mergeShards);
 
         checkArgument(
-                mySqlSchemas.size() > 0,
+                mySqlTableInfos.size() > 0,
                 "No tables found in MySQL database "
                         + mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME)
                         + ", or MySQL database does not exist.");
@@ -194,12 +194,14 @@ public class MySqlSyncDatabaseAction extends ActionBase {
                 new TableNameConverter(caseSensitive, mergeShards, 
tablePrefix, tableSuffix);
 
         List<FileStoreTable> fileStoreTables = new ArrayList<>();
-        for (MySqlSchema mySqlSchema : mySqlSchemas) {
-            Identifier identifier = buildPaimonIdentifier(tableNameConverter, 
mySqlSchema);
+        for (MySqlTableInfo tableInfo : mySqlTableInfos) {
+            Identifier identifier =
+                    Identifier.create(
+                            database, 
tableNameConverter.convert(tableInfo.toPaimonTableName()));
             FileStoreTable table;
             Schema fromMySql =
                     MySqlActionUtils.buildPaimonSchema(
-                            mySqlSchema,
+                            tableInfo,
                             Collections.emptyList(),
                             Collections.emptyList(),
                             Collections.emptyList(),
@@ -208,16 +210,18 @@ public class MySqlSyncDatabaseAction extends ActionBase {
             try {
                 table = (FileStoreTable) catalog.getTable(identifier);
                 Supplier<String> errMsg =
-                        incompatibleMessage(table.schema(), mySqlSchema, 
identifier);
+                        incompatibleMessage(table.schema(), tableInfo, 
identifier);
                 if (shouldMonitorTable(table.schema(), fromMySql, errMsg)) {
                     fileStoreTables.add(table);
+                    monitoredTables.addAll(tableInfo.identifiers());
                 } else {
-                    unmonitor(mySqlSchema);
+                    excludedTables.addAll(tableInfo.identifiers());
                 }
             } catch (Catalog.TableNotExistException e) {
                 catalog.createTable(identifier, fromMySql, false);
                 table = (FileStoreTable) catalog.getTable(identifier);
                 fileStoreTables.add(table);
+                monitoredTables.addAll(tableInfo.identifiers());
             }
         }
 
@@ -227,7 +231,7 @@ public class MySqlSyncDatabaseAction extends ActionBase {
                         + "MySQL database are not compatible with those of 
existed Paimon tables. Please check the log.");
 
         MySqlSource<String> source =
-                MySqlActionUtils.buildMySqlSource(mySqlConfig, 
buildTableList(excludedTables));
+                MySqlActionUtils.buildMySqlSource(mySqlConfig, 
buildTableList());
 
         String serverTimeZone = 
mySqlConfig.get(MySqlSourceOptions.SERVER_TIME_ZONE);
         ZoneId zoneId = serverTimeZone == null ? ZoneId.systemDefault() : 
ZoneId.of(serverTimeZone);
@@ -286,17 +290,16 @@ public class MySqlSyncDatabaseAction extends ActionBase {
                         tableSuffix));
     }
 
-    private Predicate<MySqlSchema> monitorTablePredication() {
-        return schema -> {
-            if (schema.primaryKeys().isEmpty()) {
-                LOG.debug(
-                        "Didn't find primary keys from table '{}'. "
-                                + "This table won't be synchronized.",
-                        schema.identifier());
-                return false;
-            }
-            return shouldMonitorTable(schema.tableName());
-        };
+    private void logNonPkTables(List<Identifier> nonPkTables) {
+        if (!nonPkTables.isEmpty()) {
+            LOG.debug(
+                    "Didn't find primary keys for tables '{}'. "
+                            + "These tables won't be synchronized.",
+                    nonPkTables.stream()
+                            .map(Identifier::getFullName)
+                            .collect(Collectors.joining(",")));
+            excludedTables.addAll(nonPkTables);
+        }
     }
 
     private boolean shouldMonitorTable(String mySqlTableName) {
@@ -325,7 +328,7 @@ public class MySqlSyncDatabaseAction extends ActionBase {
     }
 
     private Supplier<String> incompatibleMessage(
-            TableSchema paimonSchema, MySqlSchema mySqlSchema, Identifier 
identifier) {
+            TableSchema paimonSchema, MySqlTableInfo mySqlTableInfo, 
Identifier identifier) {
         return () ->
                 String.format(
                         "Incompatible schema found.\n"
@@ -333,49 +336,8 @@ public class MySqlSyncDatabaseAction extends ActionBase {
                                 + "MySQL table is: %s, fields are: %s.\n",
                         identifier.getFullName(),
                         paimonSchema.fields(),
-                        mySqlSchema.tableName(),
-                        mySqlSchema.fields());
-    }
-
-    /** Merge schemas for tables that have the same table name. */
-    private List<MySqlSchema> mergeShards(List<MySqlSchema> rawMySqlSchemas) {
-        Map<String, MySqlSchema> schemaMap = new HashMap<>();
-        for (MySqlSchema rawSchema : rawMySqlSchemas) {
-            String tableName = rawSchema.tableName();
-            MySqlSchema schema = schemaMap.get(tableName);
-            if (schema == null) {
-                schemaMap.put(tableName, rawSchema);
-            } else {
-                schemaMap.put(tableName, schema.merge(rawSchema));
-            }
-        }
-        return new ArrayList<>(schemaMap.values());
-    }
-
-    private Identifier buildPaimonIdentifier(
-            TableNameConverter tableNameConverter, MySqlSchema mySqlSchema) {
-        String tableName;
-        if (mergeShards) {
-            tableName = tableNameConverter.convert(mySqlSchema.tableName());
-        } else {
-            // the Paimon table name should be compound of origin database 
name and table name
-            // together to avoid name conflict
-            tableName = tableNameConverter.convert(mySqlSchema.identifier());
-        }
-
-        return Identifier.create(database, tableName);
-    }
-
-    private void unmonitor(MySqlSchema mySqlSchema) {
-        if (mergeShards) {
-            // if schema has been merged, all shards with the same table name 
should be removed
-            monitoredTables =
-                    monitoredTables.stream()
-                            .filter(id -> 
!id.getObjectName().equals(mySqlSchema.tableName()))
-                            .collect(Collectors.toList());
-        } else {
-            monitoredTables.remove(mySqlSchema.identifier());
-        }
+                        mySqlTableInfo.location(),
+                        mySqlTableInfo.schema().fields());
     }
 
     @VisibleForTesting
@@ -383,11 +345,16 @@ public class MySqlSyncDatabaseAction extends ActionBase {
         return monitoredTables;
     }
 
+    @VisibleForTesting
+    public List<Identifier> excludedTables() {
+        return excludedTables;
+    }
+
     /**
      * See {@link 
com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils#discoverCapturedTables}
      * and {@code MySqlSyncDatabaseTableListITCase}.
      */
-    private String buildTableList(List<Identifier> excludedTables) {
+    private String buildTableList() {
         String separatorRex = "\\.";
         if (mode == DIVIDED) {
             // In DIVIDED mode, we only concern about existed tables
@@ -395,23 +362,23 @@ public class MySqlSyncDatabaseAction extends ActionBase {
                     .map(t -> t.getDatabaseName() + separatorRex + 
t.getObjectName())
                     .collect(Collectors.joining("|"));
         } else if (mode == COMBINED) {
-            // In COMBINED mode, we should consider both existed tables and 
possible newly added
+            // In COMBINED mode, we should consider both existed tables and 
possible newly created
             // tables, so we should use regular expression to monitor all 
valid tables and exclude
             // certain invalid tables
 
             // The table list is built by template:
-            // 
(?!(^db\\.tbl$)|(^...$))(databasePattern\\.(including_pattern1|...))
+            // 
(?!(^db\\.tbl$)|(^...$))((databasePattern)\\.(including_pattern1|...))
 
             // The excluding pattern ?!(^db\\.tbl$)|(^...$) can exclude tables 
whose qualified name
             // is exactly equal to 'db.tbl'
-            // The including pattern 
databasePattern\\.(including_pattern1|...) can include tables
+            // The including pattern 
(databasePattern)\\.(including_pattern1|...) can include tables
             // whose qualified name matches one of the patterns
 
             // a table can be monitored only when its name meets the including 
pattern and doesn't
             // be excluded by excluding pattern at the same time
             String includingPattern =
                     String.format(
-                            "%s%s(%s)",
+                            "(%s)%s(%s)",
                             mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME),
                             separatorRex,
                             includingTables);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
index c60f4fa2e..ca26288d5 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
@@ -24,6 +24,8 @@ import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.action.Action;
 import org.apache.paimon.flink.action.ActionBase;
 import org.apache.paimon.flink.action.cdc.ComputedColumn;
+import org.apache.paimon.flink.action.cdc.mysql.schema.MySqlSchemasInfo;
+import org.apache.paimon.flink.action.cdc.mysql.schema.MySqlTableInfo;
 import org.apache.paimon.flink.sink.cdc.CdcSinkBuilder;
 import org.apache.paimon.flink.sink.cdc.EventParser;
 import org.apache.paimon.schema.Schema;
@@ -143,34 +145,21 @@ public class MySqlSyncTableAction extends ActionBase {
             validateCaseInsensitive();
         }
 
-        List<MySqlSchema> mySqlSchemaList =
-                MySqlActionUtils.getMySqlSchemaList(
+        MySqlSchemasInfo mySqlSchemasInfo =
+                MySqlActionUtils.getMySqlTableInfos(
                         mySqlConfig, monitorTablePredication(), new 
ArrayList<>());
-
-        String tableList =
-                mySqlSchemaList.stream()
-                        .map(m -> m.identifier().getDatabaseName() + "." + 
m.tableName())
-                        .collect(Collectors.joining("|"));
-
-        MySqlSource<String> source = 
MySqlActionUtils.buildMySqlSource(mySqlConfig, tableList);
-
-        MySqlSchema mySqlSchema =
-                mySqlSchemaList.stream()
-                        .reduce(MySqlSchema::merge)
-                        .orElseThrow(
-                                () ->
-                                        new RuntimeException(
-                                                "No table satisfies the given 
database name and table name"));
+        validateMySqlTableInfos(mySqlSchemasInfo);
 
         catalog.createDatabase(database, true);
 
+        MySqlTableInfo tableInfo = mySqlSchemasInfo.mergeAll();
         Identifier identifier = new Identifier(database, table);
         FileStoreTable table;
         List<ComputedColumn> computedColumns =
-                buildComputedColumns(computedColumnArgs, 
mySqlSchema.typeMapping());
+                buildComputedColumns(computedColumnArgs, 
tableInfo.schema().typeMapping());
         Schema fromMySql =
                 MySqlActionUtils.buildPaimonSchema(
-                        mySqlSchema,
+                        tableInfo,
                         partitionKeys,
                         primaryKeys,
                         computedColumns,
@@ -196,6 +185,12 @@ public class MySqlSyncTableAction extends ActionBase {
             table = (FileStoreTable) catalog.getTable(identifier);
         }
 
+        String tableList =
+                mySqlSchemasInfo.pkTables().stream()
+                        .map(i -> i.getDatabaseName() + "\\." + 
i.getObjectName())
+                        .collect(Collectors.joining("|"));
+        MySqlSource<String> source = 
MySqlActionUtils.buildMySqlSource(mySqlConfig, tableList);
+
         String serverTimeZone = 
mySqlConfig.get(MySqlSourceOptions.SERVER_TIME_ZONE);
         ZoneId zoneId = serverTimeZone == null ? ZoneId.systemDefault() : 
ZoneId.of(serverTimeZone);
         Boolean convertTinyint1ToBool = 
mySqlConfig.get(MYSQL_CONVERTER_TINYINT1_BOOL);
@@ -247,11 +242,25 @@ public class MySqlSyncTableAction extends ActionBase {
         }
     }
 
-    private Predicate<MySqlSchema> monitorTablePredication() {
-        return schema -> {
+    private void validateMySqlTableInfos(MySqlSchemasInfo mySqlSchemasInfo) {
+        List<Identifier> nonPkTables = mySqlSchemasInfo.nonPkTables();
+        checkArgument(
+                nonPkTables.isEmpty(),
+                "Source tables of MySQL table synchronization job cannot 
contain table "
+                        + "which doesn't have primary keys.\n"
+                        + "They are: %s",
+                
nonPkTables.stream().map(Identifier::getFullName).collect(Collectors.joining(",")));
+
+        checkArgument(
+                !mySqlSchemasInfo.pkTables().isEmpty(),
+                "No table satisfies the given database name and table name.");
+    }
+
+    private Predicate<String> monitorTablePredication() {
+        return tableName -> {
             Pattern tableNamePattern =
                     
Pattern.compile(mySqlConfig.get(MySqlSourceOptions.TABLE_NAME));
-            return tableNamePattern.matcher(schema.tableName()).matches();
+            return tableNamePattern.matcher(tableName).matches();
         };
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/AllMergedMySqlTableInfo.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/AllMergedMySqlTableInfo.java
new file mode 100644
index 000000000..86ec73499
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/AllMergedMySqlTableInfo.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mysql.schema;
+
+import org.apache.paimon.catalog.Identifier;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
+
+/** Describe a table whose schema is merged from all source tables. */
+public class AllMergedMySqlTableInfo implements MySqlTableInfo {
+
+    private final List<Identifier> fromTables;
+    private MySqlSchema schema;
+
+    public AllMergedMySqlTableInfo() {
+        this.fromTables = new ArrayList<>();
+    }
+
+    public void init(Identifier identifier, MySqlSchema schema) {
+        this.fromTables.add(identifier);
+        this.schema = schema;
+    }
+
+    public AllMergedMySqlTableInfo merge(Identifier otherTableId, MySqlSchema 
other) {
+        schema = schema.merge(location(), otherTableId.getFullName(), other);
+        fromTables.add(otherTableId);
+        return this;
+    }
+
+    @Override
+    public String location() {
+        return String.format(
+                "{%s}",
+                
fromTables.stream().map(Identifier::getFullName).collect(Collectors.joining(",")));
+    }
+
+    @Override
+    public List<Identifier> identifiers() {
+        throw new UnsupportedOperationException(
+                "AllMergedRichMySqlSchema doesn't support converting to 
identifiers.");
+    }
+
+    @Override
+    public String tableName() {
+        throw new UnsupportedOperationException(
+                "AllMergedRichMySqlSchema doesn't support getting table 
name.");
+    }
+
+    @Override
+    public String toPaimonTableName() {
+        throw new UnsupportedOperationException(
+                "AllMergedRichMySqlSchema doesn't support converting to Paimon 
table name.");
+    }
+
+    @Override
+    public MySqlSchema schema() {
+        return checkNotNull(schema, "MySqlSchema hasn't been set.");
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/MySqlSchema.java
similarity index 62%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java
rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/MySqlSchema.java
index 6626fd762..3e4964aff 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/MySqlSchema.java
@@ -16,44 +16,42 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.action.cdc.mysql;
+package org.apache.paimon.flink.action.cdc.mysql.schema;
 
-import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.action.cdc.mysql.MySqlTypeUtils;
 import org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction;
 import org.apache.paimon.types.DataType;
-
-import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.paimon.utils.Pair;
 
 import java.sql.DatabaseMetaData;
 import java.sql.ResultSet;
+import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-
-import static org.apache.paimon.utils.Preconditions.checkState;
+import java.util.stream.Collectors;
 
 /** Utility class to load MySQL table schema with JDBC. */
 public class MySqlSchema {
 
-    private final String databaseName;
-    private final String tableName;
-    private final LinkedHashMap<String, Tuple2<DataType, String>> fields;
+    /** Column name to (Paimon type, column comment), in the order the columns were read. */
+    private final LinkedHashMap<String, Pair<DataType, String>> fields;
+    /** Primary key column names; emptied by {@link #merge} when the keys differ. */
     private final List<String> primaryKeys;
 
-    private boolean hasMerged = false;
+    /** Wraps loaded column metadata and primary key names; use {@link #buildSchema}. */
+    private MySqlSchema(
+            LinkedHashMap<String, Pair<DataType, String>> fields, List<String> 
primaryKeys) {
+        this.fields = fields;
+        this.primaryKeys = primaryKeys;
+    }
 
-    public MySqlSchema(
+    /**
+     * Reads the columns and primary keys of a table from JDBC metadata.
+     *
+     * @param metaData JDBC metadata used to look up the table
+     * @param databaseName passed as the catalog argument of the metadata lookups
+     * @param tableName table whose columns and primary keys are read
+     * @param convertTinyintToBool forwarded to {@link MySqlTypeUtils#toDataType}
+     * @return a schema whose fields keep the order in which the columns were returned
+     * @throws SQLException if reading the metadata fails
+     */
+    public static MySqlSchema buildSchema(
             DatabaseMetaData metaData,
             String databaseName,
             String tableName,
             boolean convertTinyintToBool)
-            throws Exception {
-        this.databaseName = databaseName;
-        this.tableName = tableName;
-
-        fields = new LinkedHashMap<>();
+            throws SQLException {
+        LinkedHashMap<String, Pair<DataType, String>> fields = new 
LinkedHashMap<>();
         try (ResultSet rs = metaData.getColumns(databaseName, null, tableName, 
null)) {
             while (rs.next()) {
                 String fieldName = rs.getString("COLUMN_NAME");
@@ -70,77 +68,76 @@ public class MySqlSchema {
                 }
                 fields.put(
                         fieldName,
-                        Tuple2.of(
+                        Pair.of(
                                 MySqlTypeUtils.toDataType(
                                         fieldType, precision, scale, 
convertTinyintToBool),
                                 fieldComment));
             }
         }
 
-        primaryKeys = new ArrayList<>();
+        List<String> primaryKeys = new ArrayList<>();
         try (ResultSet rs = metaData.getPrimaryKeys(databaseName, null, 
tableName)) {
             while (rs.next()) {
                 String fieldName = rs.getString("COLUMN_NAME");
                 primaryKeys.add(fieldName);
             }
         }
-    }
 
-    public String tableName() {
-        return tableName;
+        return new MySqlSchema(fields, primaryKeys);
     }
 
-    public Identifier identifier() {
-        checkState(!hasMerged, "Cannot get table identifier from merged 
schema.");
-        return Identifier.create(databaseName, tableName);
+    /** Returns the internal column map (not a copy), in column order. */
+    public LinkedHashMap<String, Pair<DataType, String>> fields() {
+        return fields;
     }
 
-    public LinkedHashMap<String, Tuple2<DataType, String>> fields() {
-        return fields;
+    /** Returns the internal list of primary key column names (not a copy). */
+    public List<String> primaryKeys() {
+        return primaryKeys;
     }
 
+    /** Returns a new map from column name to the column's Paimon data type. */
     public Map<String, DataType> typeMapping() {
         Map<String, DataType> typeMapping = new HashMap<>();
-        fields.forEach((name, tuple) -> typeMapping.put(name, tuple.f0));
+        fields.forEach((name, pair) -> typeMapping.put(name, pair.getLeft()));
         return typeMapping;
     }
 
-    public List<String> primaryKeys() {
-        return primaryKeys;
-    }
-
-    public MySqlSchema merge(MySqlSchema other) {
-        hasMerged = true;
-        for (Map.Entry<String, Tuple2<DataType, String>> entry : 
other.fields.entrySet()) {
+    /**
+     * Merges {@code other} into this schema in place and returns this schema.
+     *
+     * <p>Columns that only exist in {@code other} are appended.
+     *
+     * <p>Shared columns are resolved with {@link UpdatedDataFieldsProcessFunction#canConvert}.
+     *
+     * <p>{@code CONVERT} takes the type and comment from {@code other}; {@code EXCEPTION} throws.
+     *
+     * <p>Any other result keeps the current column unchanged.
+     *
+     * <p>If the primary keys of the two schemas differ, the primary keys are cleared.
+     *
+     * @param currentTable description of this schema's source, used in the error message
+     * @param otherTable description of the source of {@code other}, used in the error message
+     * @param other schema to merge into this one
+     * @return this schema
+     * @throws IllegalArgumentException if {@code canConvert} reports {@code EXCEPTION}
+     */
+    public MySqlSchema merge(String currentTable, String otherTable, 
MySqlSchema other) {
+        for (Map.Entry<String, Pair<DataType, String>> entry : 
other.fields.entrySet()) {
             String fieldName = entry.getKey();
-            DataType newType = entry.getValue().f0;
+            DataType newType = entry.getValue().getLeft();
             if (fields.containsKey(fieldName)) {
-                DataType oldType = fields.get(fieldName).f0;
+                DataType oldType = fields.get(fieldName).getLeft();
                 switch (UpdatedDataFieldsProcessFunction.canConvert(oldType, 
newType)) {
                     case CONVERT:
-                        fields.put(fieldName, Tuple2.of(newType, 
entry.getValue().f1));
+                        fields.put(fieldName, Pair.of(newType, 
entry.getValue().getRight()));
                         break;
                     case EXCEPTION:
                         throw new IllegalArgumentException(
                                 String.format(
-                                        "Column %s have different types in 
table %s.%s and table %s.%s",
+                                        "Column %s have different types when 
merging schemas.\n"
+                                                + "Current table '%s' fields: 
%s\n"
+                                                + "To be merged table '%s' 
fields: %s",
                                         fieldName,
-                                        databaseName,
-                                        tableName,
-                                        other.databaseName,
-                                        other.tableName));
+                                        currentTable,
+                                        fieldsToString(),
+                                        otherTable,
+                                        other.fieldsToString()));
                 }
             } else {
-                fields.put(fieldName, Tuple2.of(newType, entry.getValue().f1));
+                fields.put(fieldName, Pair.of(newType, 
entry.getValue().getRight()));
             }
         }
+
+        // differing primary keys: the merged schema gets no primary key
         if (!primaryKeys.equals(other.primaryKeys)) {
             primaryKeys.clear();
         }
         return this;
     }
 
-    public List<String> getPrimaryKeys() {
-        return primaryKeys;
+    /** Formats the fields as "[name type,name type]" for error messages. */
+    private String fieldsToString() {
+        return "["
+                + fields.entrySet().stream()
+                        .map(e -> String.format("%s %s", e.getKey(), 
e.getValue().getLeft()))
+                        .collect(Collectors.joining(","))
+                + "]";
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/MySqlSchemasInfo.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/MySqlSchemasInfo.java
new file mode 100644
index 000000000..5ae768804
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/MySqlSchemasInfo.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mysql.schema;
+
+import org.apache.paimon.catalog.Identifier;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** Tracks MySQL table schemas, split into tables with and without primary keys. */
+public class MySqlSchemasInfo {
+
+    private final Map<Identifier, MySqlSchema> pkTableSchemas = new HashMap<>();
+    private final Map<Identifier, MySqlSchema> nonPkTableSchemas = new HashMap<>();
+
+    /** Stores {@code mysqlSchema} in the map matching whether it has primary keys. */
+    public void addSchema(Identifier identifier, MySqlSchema mysqlSchema) {
+        Map<Identifier, MySqlSchema> target =
+                mysqlSchema.primaryKeys().isEmpty() ? nonPkTableSchemas : pkTableSchemas;
+        target.put(identifier, mysqlSchema);
+    }
+
+    /** Returns a new list of the identifiers of tables that have primary keys. */
+    public List<Identifier> pkTables() {
+        return new ArrayList<>(pkTableSchemas.keySet());
+    }
+
+    /** Returns a new list of the identifiers of tables without primary keys. */
+    public List<Identifier> nonPkTables() {
+        return new ArrayList<>(nonPkTableSchemas.keySet());
+    }
+
+    /**
+     * Merges the schemas of all tables that have primary keys into a single table info.
+     *
+     * <p>Tables without primary keys are not merged yet.
+     */
+    public MySqlTableInfo mergeAll() {
+        AllMergedMySqlTableInfo merged = new AllMergedMySqlTableInfo();
+        boolean first = true;
+        for (Map.Entry<Identifier, MySqlSchema> entry : pkTableSchemas.entrySet()) {
+            if (first) {
+                // the first schema seeds the merged info; later ones are merged into it
+                merged.init(entry.getKey(), entry.getValue());
+                first = false;
+            } else {
+                merged.merge(entry.getKey(), entry.getValue());
+            }
+        }
+        return merged;
+    }
+
+    /**
+     * Converts the tables that have primary keys into table infos.
+     *
+     * <p>Tables without primary keys are not handled yet.
+     *
+     * @param mergeShards whether same-named tables from different databases become one table info
+     */
+    public List<MySqlTableInfo> toMySqlTableInfos(boolean mergeShards) {
+        if (mergeShards) {
+            return mergeShards();
+        }
+        return pkTableSchemas.entrySet().stream()
+                .map(entry -> new UnmergedMySqlTableInfo(entry.getKey(), entry.getValue()))
+                .collect(Collectors.toList());
+    }
+
+    /** Merges the schemas of tables that have primary keys and share the same table name. */
+    private List<MySqlTableInfo> mergeShards() {
+        Map<String, ShardsMergedMySqlTableInfo> shardsByName = new HashMap<>();
+        for (Map.Entry<Identifier, MySqlSchema> entry : pkTableSchemas.entrySet()) {
+            Identifier shardId = entry.getKey();
+            String name = shardId.getObjectName();
+            ShardsMergedMySqlTableInfo shards = shardsByName.get(name);
+            if (shards == null) {
+                shards = new ShardsMergedMySqlTableInfo();
+                shards.init(shardId, entry.getValue());
+            } else {
+                shards = shards.merge(shardId, entry.getValue());
+            }
+            shardsByName.put(name, shards);
+        }
+        return new ArrayList<>(shardsByName.values());
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/MySqlTableInfo.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/MySqlTableInfo.java
new file mode 100644
index 000000000..df9a5541f
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/MySqlTableInfo.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mysql.schema;
+
+import org.apache.paimon.catalog.Identifier;
+
+import java.util.List;
+
+/** Describes a MySQL table, or a group of merged MySQL tables, and its schema. */
+public interface MySqlTableInfo {
+
+    /** Returns a description of where the table comes from, e.g. for error messages. */
+    String location();
+
+    /** Returns the identifiers of all MySQL tables that make up {@link #schema()}. */
+    List<Identifier> identifiers();
+
+    /** Returns the MySQL table name, without the database name. */
+    String tableName();
+
+    /** Returns the name of the corresponding Paimon table. */
+    String toPaimonTableName();
+
+    /** Returns the (possibly merged) MySQL schema of this table. */
+    MySqlSchema schema();
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/ShardsMergedMySqlTableInfo.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/ShardsMergedMySqlTableInfo.java
new file mode 100644
index 000000000..ea43fbf2f
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/ShardsMergedMySqlTableInfo.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mysql.schema;
+
+import org.apache.paimon.catalog.Identifier;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
+
+/**
+ * Describes a table whose schema is merged from shards, i.e. tables that share a name but live in
+ * different databases.
+ */
+public class ShardsMergedMySqlTableInfo implements MySqlTableInfo {
+
+    /** Databases of the shards merged so far, in merge order. */
+    private final List<String> fromDatabases = new ArrayList<>();
+
+    /** MySQL table name shared by all merged shards. */
+    private String tableName;
+
+    /** Schema merged from all shards so far. */
+    private MySqlSchema schema;
+
+    /** Seeds this info with its first shard. */
+    public void init(Identifier identifier, MySqlSchema schema) {
+        fromDatabases.add(identifier.getDatabaseName());
+        tableName = identifier.getObjectName();
+        this.schema = schema;
+    }
+
+    /**
+     * Merges another shard into this info and returns this info.
+     *
+     * @param otherTableId identifier of the shard; its table name must equal {@link #tableName()}
+     * @param other schema of the shard
+     * @return this info
+     * @throws IllegalArgumentException if the table names differ or the schemas are incompatible
+     */
+    public ShardsMergedMySqlTableInfo merge(Identifier otherTableId, MySqlSchema other) {
+        String otherName = otherTableId.getObjectName();
+        checkArgument(
+                otherName.equals(tableName),
+                "Table to be merged '%s' should equals to current table name '%s'.",
+                otherName,
+                tableName);
+
+        // location() is evaluated before this shard's database is recorded, so the merge error
+        // message lists only the shards merged so far
+        schema = schema.merge(location(), otherTableId.getFullName(), other);
+        fromDatabases.add(otherTableId.getDatabaseName());
+        return this;
+    }
+
+    @Override
+    public String location() {
+        return "[" + String.join(",", fromDatabases) + "]." + tableName;
+    }
+
+    @Override
+    public List<Identifier> identifiers() {
+        return fromDatabases.stream()
+                .map(database -> Identifier.create(database, tableName))
+                .collect(Collectors.toList());
+    }
+
+    @Override
+    public String tableName() {
+        return tableName;
+    }
+
+    /** All shards map to one Paimon table that keeps the MySQL table name. */
+    @Override
+    public String toPaimonTableName() {
+        return tableName;
+    }
+
+    @Override
+    public MySqlSchema schema() {
+        return checkNotNull(schema, "MySqlSchema hasn't been set.");
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/UnmergedMySqlTableInfo.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/UnmergedMySqlTableInfo.java
new file mode 100644
index 000000000..ed44c708e
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/UnmergedMySqlTableInfo.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mysql.schema;
+
+import org.apache.paimon.catalog.Identifier;
+
+import java.util.Collections;
+import java.util.List;
+
+/** Describes a single MySQL table whose schema is used as-is, without merging. */
+public class UnmergedMySqlTableInfo implements MySqlTableInfo {
+
+    private final Identifier identifier;
+    private final MySqlSchema schema;
+
+    public UnmergedMySqlTableInfo(Identifier identifier, MySqlSchema schema) {
+        this.identifier = identifier;
+        this.schema = schema;
+    }
+
+    /** Returns the identifier's full name. */
+    @Override
+    public String location() {
+        return identifier.getFullName();
+    }
+
+    @Override
+    public List<Identifier> identifiers() {
+        return Collections.singletonList(identifier);
+    }
+
+    @Override
+    public String tableName() {
+        return identifier.getObjectName();
+    }
+
+    /** Prefixes the table name with its database name to avoid Paimon table name conflicts. */
+    @Override
+    public String toPaimonTableName() {
+        return String.join("_", identifier.getDatabaseName(), identifier.getObjectName());
+    }
+
+    @Override
+    public MySqlSchema schema() {
+        return schema;
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java
index 9d23ebc6e..00babf813 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink.action.cdc.mysql;
 
+import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.action.ActionITCaseBase;
 import org.apache.paimon.table.FileStoreTable;
@@ -40,6 +41,7 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.time.ZoneId;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -50,6 +52,7 @@ import java.util.stream.Stream;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Base test class for {@link org.apache.paimon.flink.action.Action}s related 
to MySQL. */
+@SuppressWarnings("BusyWait")
 public class MySqlActionITCaseBase extends ActionITCaseBase {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(MySqlActionITCaseBase.class);
@@ -166,6 +169,27 @@ public class MySqlActionITCaseBase extends 
ActionITCaseBase {
 
     protected FileStoreTable getFileStoreTable(String tableName) throws 
Exception {
         Identifier identifier = Identifier.create(database, tableName);
-        return (FileStoreTable) catalog().getTable(identifier);
+        try (Catalog tableCatalog = catalog()) {
+            return (FileStoreTable) tableCatalog.getTable(identifier);
+        }
+    }
+
+    /** Varargs convenience overload of {@link #waitingTables(List)}. */
+    protected void waitingTables(String... tables) throws Exception {
+        waitingTables(Arrays.asList(tables));
+    }
+
+    /**
+     * Polls the catalog every 100 ms until all of {@code tables} are listed in {@code database}.
+     *
+     * <p>There is no deadline here; callers should bound the wait with a test timeout.
+     */
+    protected void waitingTables(List<String> tables) throws Exception {
+        LOG.info("Waiting for tables '{}'", tables);
+        try (Catalog tableCatalog = catalog()) {
+            while (!tableCatalog.listTables(database).containsAll(tables)) {
+                Thread.sleep(100);
+            }
+        }
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
index eeb76d8be..4e2095344 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
@@ -1181,10 +1181,7 @@ public class MySqlSyncDatabaseActionITCase extends 
MySqlActionITCaseBase {
                 thread.start();
             }
 
-            Catalog catalog = catalog();
-            while (!catalog.listTables(database).containsAll(tables)) {
-                Thread.sleep(100);
-            }
+            waitingTables(tables);
 
             RowType newTableRowType =
                     RowType.of(
@@ -1202,7 +1199,13 @@ public class MySqlSyncDatabaseActionITCase extends 
MySqlActionITCaseBase {
     @Timeout(60)
     public void testSyncMultipleShards() throws Exception {
         Map<String, String> mySqlConfig = getBasicMySqlConfig();
-        mySqlConfig.put("database-name", "database_shard_.*");
+
+        // test table list
+        mySqlConfig.put(
+                "database-name",
+                ThreadLocalRandom.current().nextBoolean()
+                        ? "database_shard_.*"
+                        : "database_shard_1|database_shard_2");
 
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(2);
@@ -1307,10 +1310,7 @@ public class MySqlSyncDatabaseActionITCase extends 
MySqlActionITCaseBase {
                         "CREATE TABLE database_shard_2.t4 (k INT, v1 
VARCHAR(10), PRIMARY KEY (k))");
                 statement.executeUpdate("INSERT INTO database_shard_2.t4 
VALUES (2, 'db2_2')");
 
-                Catalog catalog = catalog();
-                while (!catalog.tableExists(Identifier.create(database, 
"t4"))) {
-                    Thread.sleep(100);
-                }
+                waitingTables("t4");
 
                 table = getFileStoreTable("t4");
                 rowType =
@@ -1428,13 +1428,7 @@ public class MySqlSyncDatabaseActionITCase extends 
MySqlActionITCaseBase {
                 statement.executeUpdate(
                         "INSERT INTO without_merging_shard_2.t3 VALUES (2, 
'test')");
 
-                while (!catalog.listTables(database)
-                        .containsAll(
-                                Arrays.asList(
-                                        "without_merging_shard_1_t3",
-                                        "without_merging_shard_2_t3"))) {
-                    Thread.sleep(100);
-                }
+                waitingTables("without_merging_shard_1_t3", 
"without_merging_shard_2_t3");
 
                 table = getFileStoreTable("without_merging_shard_1_t3");
                 rowType =
@@ -1458,7 +1452,7 @@ public class MySqlSyncDatabaseActionITCase extends 
MySqlActionITCaseBase {
     }
 
     @Test
-    public void testUnminitorTablesWithMergingShards() throws Exception {
+    public void testMonitoredAndExcludedTablesWithMerging() throws Exception {
         // create an incompatible table named t2
         Catalog catalog = catalog();
         catalog.createDatabase(database, true);
@@ -1472,7 +1466,7 @@ public class MySqlSyncDatabaseActionITCase extends 
MySqlActionITCaseBase {
         catalog.createTable(identifier, schema, false);
 
         Map<String, String> mySqlConfig = getBasicMySqlConfig();
-        mySqlConfig.put("database-name", "test_unmonitor_table_shard_.*");
+        mySqlConfig.put("database-name", "monitored_and_excluded_shard_.*");
 
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 
@@ -1494,8 +1488,17 @@ public class MySqlSyncDatabaseActionITCase extends 
MySqlActionITCaseBase {
 
         assertThat(action.monitoredTables())
                 .containsOnly(
-                        Identifier.create("test_unmonitor_table_shard_1", 
"t1"),
-                        Identifier.create("test_unmonitor_table_shard_2", 
"t1"));
+                        Identifier.create("monitored_and_excluded_shard_1", 
"t1"),
+                        Identifier.create("monitored_and_excluded_shard_1", 
"t3"),
+                        Identifier.create("monitored_and_excluded_shard_2", 
"t1"));
+
+        assertThat(action.excludedTables())
+                .containsOnly(
+                        // t2 is merged, so all shards will be excluded
+                        Identifier.create("monitored_and_excluded_shard_1", 
"t2"),
+                        Identifier.create("monitored_and_excluded_shard_2", 
"t2"),
+                        // non pk table
+                        Identifier.create("monitored_and_excluded_shard_2", 
"t3"));
     }
 
     private void assertTableExists(List<String> tableNames) {
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseTableListITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseTableListITCase.java
index faf9f2977..2c73dc330 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseTableListITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseTableListITCase.java
@@ -18,8 +18,10 @@
 
 package org.apache.paimon.flink.action.cdc.mysql;
 
-import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.flink.action.cdc.DatabaseSyncMode;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
 
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.core.execution.JobClient;
@@ -30,7 +32,6 @@ import org.junit.jupiter.api.Timeout;
 
 import java.sql.Statement;
 import java.util.Collections;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
 
@@ -51,7 +52,11 @@ public class MySqlSyncDatabaseTableListITCase extends 
MySqlActionITCaseBase {
     @Timeout(120)
     public void testActionRunResult() throws Exception {
         Map<String, String> mySqlConfig = getBasicMySqlConfig();
-        mySqlConfig.put("database-name", ".*shard_.*");
+        mySqlConfig.put(
+                "database-name",
+                ThreadLocalRandom.current().nextBoolean()
+                        ? ".*shard_.*"
+                        : "shard_1|shard_2|shard_3|x_shard_1");
 
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(2);
@@ -78,24 +83,33 @@ public class MySqlSyncDatabaseTableListITCase extends 
MySqlActionITCaseBase {
         JobClient client = env.executeAsync();
         waitJobRunning(client);
 
-        try (Statement statement = getStatement()) {
-            Catalog catalog = catalog();
-            List<String> tables = waitingAllTables(catalog, 10, 10);
-            assertThat(tables)
-                    .containsExactlyInAnyOrder(
-                            "shard_1_t11",
-                            "shard_1_t2",
-                            "shard_1_t3",
-                            "shard_1_taa",
-                            "shard_1_s2",
-                            "shard_2_t1",
-                            "shard_2_t22",
-                            "shard_2_t3",
-                            "shard_2_tb",
-                            "x_shard_1_t1");
-
-            // test newly added tables
-            if (mode == COMBINED) {
+        assertThat(catalog().listTables(database))
+                .containsExactlyInAnyOrder(
+                        "shard_1_t11",
+                        "shard_1_t2",
+                        "shard_1_t3",
+                        "shard_1_taa",
+                        "shard_1_s2",
+                        "shard_2_t1",
+                        "shard_2_t22",
+                        "shard_2_t3",
+                        "shard_2_tb",
+                        "x_shard_1_t1");
+
+        // test newly created tables
+        if (mode == COMBINED) {
+            try (Statement statement = getStatement()) {
+                // ensure the job steps into incremental phase
+                statement.executeUpdate("USE shard_1");
+                statement.executeUpdate("INSERT INTO t2 VALUES (1, 'A')");
+                waitForResult(
+                        Collections.singletonList("+I[1, A]"),
+                        getFileStoreTable("shard_1_t2"),
+                        RowType.of(
+                                new DataType[] {DataTypes.INT().notNull(), 
DataTypes.VARCHAR(100)},
+                                new String[] {"k", "name"}),
+                        Collections.singletonList("k"));
+
                 // case 1: new tables in existed database
                 statement.executeUpdate("USE shard_2");
                 // ignored: ta
@@ -127,37 +141,8 @@ public class MySqlSyncDatabaseTableListITCase extends 
MySqlActionITCaseBase {
                 statement.executeUpdate(
                         "CREATE TABLE s4 (k INT, name VARCHAR(100), PRIMARY 
KEY (k))");
 
-                tables = waitingAllTables(catalog, 12, 10);
-
-                assertThat(tables)
-                        .containsExactlyInAnyOrder(
-                                // old
-                                "shard_1_t11",
-                                "shard_1_t2",
-                                "shard_1_t3",
-                                "shard_1_taa",
-                                "shard_1_s2",
-                                "shard_2_t1",
-                                "shard_2_t22",
-                                "shard_2_t3",
-                                "shard_2_tb",
-                                "x_shard_1_t1",
-                                // new
-                                "shard_2_s3",
-                                "shard_3_tab");
+                waitingTables("shard_2_s3", "shard_3_tab");
             }
         }
     }
-
-    private List<String> waitingAllTables(Catalog catalog, int numberOfTables, 
int maxAttempt)
-            throws Exception {
-        List<String> tables;
-        int attempt = 0;
-        do {
-            Thread.sleep(5_000);
-            tables = catalog.listTables(database);
-        } while (tables.size() < numberOfTables && ++attempt < maxAttempt);
-
-        return tables;
-    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index 9f22898a5..1195f3dac 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -22,6 +22,7 @@ import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.testutils.assertj.AssertionUtils;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
@@ -42,6 +43,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -648,13 +650,12 @@ public class MySqlSyncTableActionITCase extends 
MySqlActionITCaseBase {
                         Collections.emptyMap());
 
         assertThatThrownBy(() -> action.build(env))
-                .isInstanceOf(IllegalArgumentException.class)
-                .hasMessage(
-                        "Column v1 have different types in table "
-                                + DATABASE_NAME
-                                + ".incompatible_field_1 and table "
-                                + DATABASE_NAME
-                                + ".incompatible_field_2");
+                .satisfies(
+                        AssertionUtils.anyCauseMatches(
+                                IllegalArgumentException.class,
+                                "Column v1 have different types when merging 
schemas.\n"
+                                        + "Current table 
'{paimon_sync_table.incompatible_field_2}' fields: [_id INT,v1 INT]\n"
+                                        + "To be merged table 
'paimon_sync_table.incompatible_field_1' fields: [_id INT,v1 TIMESTAMP(0)]"));
     }
 
     @Test
@@ -983,10 +984,16 @@ public class MySqlSyncTableActionITCase extends 
MySqlActionITCaseBase {
     }
 
     @Test
+    @Timeout(60)
     public void testSyncShards() throws Exception {
         Map<String, String> mySqlConfig = getBasicMySqlConfig();
-        mySqlConfig.put("database-name", "shard_.+");
-        mySqlConfig.put("table-name", "t.+");
+
+        // test table list
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        String dbPattern = random.nextBoolean() ? "shard_.+" : 
"shard_1|shard_2";
+        String tblPattern = random.nextBoolean() ? "t.+" : "t1|t2";
+        mySqlConfig.put("database-name", dbPattern);
+        mySqlConfig.put("table-name", tblPattern);
 
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(2);
@@ -1010,9 +1017,11 @@ public class MySqlSyncTableActionITCase extends 
MySqlActionITCaseBase {
 
         try (Statement statement = getStatement()) {
             statement.execute("USE shard_1");
-            statement.executeUpdate("INSERT INTO t1 VALUES (1, '2023-07-30'), 
(2, '2023-07-30')");
+            statement.executeUpdate("INSERT INTO t1 VALUES (1, '2023-07-30')");
+            statement.executeUpdate("INSERT INTO t2 VALUES (2, '2023-07-30')");
             statement.execute("USE shard_2");
-            statement.executeUpdate("INSERT INTO t1 VALUES (3, '2023-07-31'), 
(4, '2023-07-31')");
+            statement.executeUpdate("INSERT INTO t1 VALUES (3, '2023-07-31')");
+            statement.executeUpdate("INSERT INTO t1 VALUES (4, '2023-07-31')");
         }
 
         FileStoreTable table = getFileStoreTable();
@@ -1035,46 +1044,6 @@ public class MySqlSyncTableActionITCase extends 
MySqlActionITCaseBase {
                 Arrays.asList("pk", "pt"));
     }
 
-    @Test
-    public void testSyncMultipleTable() throws Exception {
-        Map<String, String> mySqlConfig = getBasicMySqlConfig();
-        mySqlConfig.put("database-name", "paimon_multiple_table");
-        mySqlConfig.put("table-name", "t1|t2");
-
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(2);
-        env.enableCheckpointing(1000);
-        env.setRestartStrategy(RestartStrategies.noRestart());
-
-        MySqlSyncTableAction action =
-                new MySqlSyncTableAction(
-                        mySqlConfig,
-                        warehouse,
-                        database,
-                        tableName,
-                        Collections.emptyList(),
-                        Collections.singletonList("id"),
-                        Collections.emptyList(),
-                        Collections.emptyMap(),
-                        Collections.emptyMap());
-        action.build(env);
-        JobClient client = env.executeAsync();
-        waitJobRunning(client);
-
-        FileStoreTable table = getFileStoreTable();
-        RowType rowType =
-                RowType.of(
-                        new DataType[] {
-                            DataTypes.INT().notNull(), DataTypes.VARCHAR(10),
-                        },
-                        new String[] {"id", "name"});
-        waitForResult(
-                Arrays.asList("+I[1, flink]", "+I[2, paimon]"),
-                table,
-                rowType,
-                Collections.singletonList("id"));
-    }
-
     private FileStoreTable getFileStoreTable() throws Exception {
         return getFileStoreTable(tableName);
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_database_setup.sql
 
b/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_database_setup.sql
index 59c720c06..b4343f1f2 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_database_setup.sql
+++ 
b/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_database_setup.sql
@@ -432,11 +432,11 @@ CREATE TABLE t2 (
 );
 
 -- 
################################################################################
---  testUnmonitorTablesWithMergingShards
+--  testMonitoredAndExcludedTablesWithMering
 -- 
################################################################################
 
-CREATE DATABASE test_unmonitor_table_shard_1;
-USE test_unmonitor_table_shard_1;
+CREATE DATABASE monitored_and_excluded_shard_1;
+USE monitored_and_excluded_shard_1;
 
 CREATE TABLE t1 (
     k INT,
@@ -450,9 +450,15 @@ CREATE TABLE t2 (
     PRIMARY KEY (k)
 );
 
+CREATE TABLE t3 (
+    k INT,
+    v1 VARCHAR(10),
+    PRIMARY KEY (k)
+);
 
-CREATE DATABASE test_unmonitor_table_shard_2;
-USE test_unmonitor_table_shard_2;
+
+CREATE DATABASE monitored_and_excluded_shard_2;
+USE monitored_and_excluded_shard_2;
 
 CREATE TABLE t1 (
     k INT,
@@ -466,3 +472,8 @@ CREATE TABLE t2 (
     PRIMARY KEY (k)
 );
 
+CREATE TABLE t3 (
+    k INT,
+    v2 VARCHAR(10)
+);
+
diff --git 
a/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_table_setup.sql
 
b/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_table_setup.sql
index 8971afc1d..e10df0afa 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_table_setup.sql
+++ 
b/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_table_setup.sql
@@ -308,35 +308,23 @@ CREATE TABLE t1 (
     PRIMARY KEY (pk)
 );
 
-CREATE DATABASE shard_2;
-USE shard_2;
-
-CREATE TABLE t1 (
+CREATE TABLE t2 (
     pk INT,
     _date VARCHAR(10),
     PRIMARY KEY (pk)
 );
 
-
--- 
################################################################################
---  testSyncMultipleTable
--- 
################################################################################
-
-CREATE DATABASE paimon_multiple_table;
-USE paimon_multiple_table;
+CREATE DATABASE shard_2;
+USE shard_2;
 
 CREATE TABLE t1 (
-    id INT,
-    name VARCHAR(10),
-    PRIMARY KEY (id)
+    pk INT,
+    _date VARCHAR(10),
+    PRIMARY KEY (pk)
 );
 
-INSERT INTO t1 VALUES (1, 'flink');
-
 CREATE TABLE t2 (
-    id INT,
-    name VARCHAR(10),
-    PRIMARY KEY (id)
+    pk INT,
+    _date VARCHAR(10),
+    PRIMARY KEY (pk)
 );
-
-INSERT INTO t2 VALUES (2, 'paimon');
\ No newline at end of file

Reply via email to