This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new d0e214234 [refactor] Refactor MySQL CDC actions to make it easier to 
introduce new features (#1850)
d0e214234 is described below

commit d0e21423432c085a7a4a9a5e04726a643a449556
Author: yuzelin <[email protected]>
AuthorDate: Sat Aug 19 10:03:09 2023 +0800

    [refactor] Refactor MySQL CDC actions to make it easier to introduce new 
features (#1850)
---
 .../apache/paimon/flink/action/ActionFactory.java  |   5 +
 .../paimon/flink/action/cdc/DatabaseSyncMode.java  |  19 +-
 .../action/cdc/mysql/MySqlSyncDatabaseAction.java  | 118 ++++----
 .../cdc/mysql/MySqlSyncDatabaseActionFactory.java  |  68 ++---
 .../action/cdc/mysql/MySqlSyncTableAction.java     |  72 ++---
 .../cdc/mysql/MySqlSyncTableActionFactory.java     |  47 +--
 ...MultiTableUpdatedDataFieldsProcessFunction.java |   5 -
 .../sink/cdc/UpdatedDataFieldsProcessFunction.java |   6 +-
 .../cdc/UpdatedDataFieldsProcessFunctionBase.java  |   5 +-
 .../action/cdc/mysql/MySqlActionITCaseBase.java    |  25 ++
 .../cdc/mysql/MySqlSyncDatabaseActionITCase.java   | 319 ++++-----------------
 .../mysql/MySqlSyncDatabaseTableListITCase.java    |  33 +--
 .../cdc/mysql/MySqlSyncTableActionITCase.java      | 186 +++---------
 .../test/resources/mysql/sync_database_setup.sql   |   1 -
 14 files changed, 304 insertions(+), 605 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionFactory.java
index 76089bd96..edd63895b 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionFactory.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionFactory.java
@@ -142,6 +142,11 @@ public interface ActionFactory extends Factory {
                 params.has(key), "Argument '%s' is required. Run '<action> 
--help' for help.", key);
     }
 
+    default String getRequiredValue(MultipleParameterTool params, String key) {
+        checkRequiredArgument(params, key);
+        return params.get(key);
+    }
+
     static Map<String, String> parseCommaSeparatedKeyValues(String keyValues) {
         Map<String, String> kvs = new HashMap<>();
         for (String kvString : keyValues.split(",")) {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/DatabaseSyncMode.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/DatabaseSyncMode.java
index 8e4641ccd..594ab06a4 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/DatabaseSyncMode.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/DatabaseSyncMode.java
@@ -18,6 +18,8 @@
 
 package org.apache.paimon.flink.action.cdc;
 
+import javax.annotation.Nullable;
+
 import java.io.Serializable;
 
 /**
@@ -31,5 +33,20 @@ import java.io.Serializable;
  */
 public enum DatabaseSyncMode implements Serializable {
     DIVIDED,
-    COMBINED
+    COMBINED;
+
+    public static DatabaseSyncMode fromString(@Nullable String mode) {
+        if (mode == null) {
+            return DIVIDED;
+        }
+
+        switch (mode.toLowerCase(java.util.Locale.ROOT)) {
+            case "divided":
+                return DIVIDED;
+            case "combined":
+                return COMBINED;
+            default:
+                throw new UnsupportedOperationException("Unsupported sink 
+mode: " + mode);
+        }
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
index 17bcd9173..2858c78ad 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
@@ -48,6 +48,7 @@ import javax.annotation.Nullable;
 import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Supplier;
@@ -99,68 +100,81 @@ public class MySqlSyncDatabaseAction extends ActionBase {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(MySqlSyncDatabaseAction.class);
 
-    private final Configuration mySqlConfig;
     private final String database;
-    private final boolean ignoreIncompatible;
-    private final boolean mergeShards;
-    private final String tablePrefix;
-    private final String tableSuffix;
-    private final Pattern includingPattern;
-    @Nullable private final Pattern excludingPattern;
-    private final Map<String, String> tableConfig;
-    private final String includingTables;
-    private final DatabaseSyncMode mode;
+    private final Configuration mySqlConfig;
 
+    private Map<String, String> tableConfig = new HashMap<>();
+    private boolean ignoreIncompatible = false;
+    private boolean mergeShards = true;
+    private String tablePrefix = "";
+    private String tableSuffix = "";
+    private String includingTables = ".*";
+    @Nullable String excludingTables;
+    private DatabaseSyncMode mode = DIVIDED;
+
+    // for test purpose
     private final List<Identifier> monitoredTables = new ArrayList<>();
     private final List<Identifier> excludedTables = new ArrayList<>();
 
     public MySqlSyncDatabaseAction(
-            Map<String, String> mySqlConfig,
-            String warehouse,
-            String database,
-            boolean ignoreIncompatible,
-            Map<String, String> catalogConfig,
-            Map<String, String> tableConfig) {
-        this(
-                mySqlConfig,
-                warehouse,
-                database,
-                ignoreIncompatible,
-                true,
-                null,
-                null,
-                null,
-                null,
-                catalogConfig,
-                tableConfig,
-                DIVIDED);
+            String warehouse, String database, Map<String, String> 
mySqlConfig) {
+        this(warehouse, database, Collections.emptyMap(), mySqlConfig);
     }
 
     public MySqlSyncDatabaseAction(
-            Map<String, String> mySqlConfig,
             String warehouse,
             String database,
-            boolean ignoreIncompatible,
-            boolean mergeShards,
-            @Nullable String tablePrefix,
-            @Nullable String tableSuffix,
-            @Nullable String includingTables,
-            @Nullable String excludingTables,
             Map<String, String> catalogConfig,
-            Map<String, String> tableConfig,
-            DatabaseSyncMode mode) {
+            Map<String, String> mySqlConfig) {
         super(warehouse, catalogConfig);
-        this.mySqlConfig = Configuration.fromMap(mySqlConfig);
         this.database = database;
+        this.mySqlConfig = Configuration.fromMap(mySqlConfig);
+    }
+
+    public MySqlSyncDatabaseAction withTableConfig(Map<String, String> 
tableConfig) {
+        this.tableConfig = tableConfig;
+        return this;
+    }
+
+    public MySqlSyncDatabaseAction ignoreIncompatible(boolean 
ignoreIncompatible) {
         this.ignoreIncompatible = ignoreIncompatible;
+        return this;
+    }
+
+    public MySqlSyncDatabaseAction mergeShards(boolean mergeShards) {
         this.mergeShards = mergeShards;
-        this.tablePrefix = tablePrefix == null ? "" : tablePrefix;
-        this.tableSuffix = tableSuffix == null ? "" : tableSuffix;
-        this.includingTables = includingTables == null ? ".*" : 
includingTables;
-        this.includingPattern = Pattern.compile(this.includingTables);
-        this.excludingPattern = excludingTables == null ? null : 
Pattern.compile(excludingTables);
-        this.tableConfig = tableConfig;
+        return this;
+    }
+
+    public MySqlSyncDatabaseAction withTablePrefix(@Nullable String 
tablePrefix) {
+        if (tablePrefix != null) {
+            this.tablePrefix = tablePrefix;
+        }
+        return this;
+    }
+
+    public MySqlSyncDatabaseAction withTableSuffix(@Nullable String 
tableSuffix) {
+        if (tableSuffix != null) {
+            this.tableSuffix = tableSuffix;
+        }
+        return this;
+    }
+
+    public MySqlSyncDatabaseAction includingTables(@Nullable String 
includingTables) {
+        if (includingTables != null) {
+            this.includingTables = includingTables;
+        }
+        return this;
+    }
+
+    public MySqlSyncDatabaseAction excludingTables(@Nullable String 
excludingTables) {
+        this.excludingTables = excludingTables;
+        return this;
+    }
+
+    public MySqlSyncDatabaseAction withMode(DatabaseSyncMode mode) {
         this.mode = mode;
+        return this;
     }
 
     public void build(StreamExecutionEnvironment env) throws Exception {
@@ -176,9 +190,15 @@ public class MySqlSyncDatabaseAction extends ActionBase {
             validateCaseInsensitive();
         }
 
+        Pattern includingPattern = Pattern.compile(includingTables);
+        Pattern excludingPattern =
+                excludingTables == null ? null : 
Pattern.compile(excludingTables);
         MySqlSchemasInfo mySqlSchemasInfo =
                 MySqlActionUtils.getMySqlTableInfos(
-                        mySqlConfig, this::shouldMonitorTable, excludedTables);
+                        mySqlConfig,
+                        tableName ->
+                                shouldMonitorTable(tableName, 
includingPattern, excludingPattern),
+                        excludedTables);
 
         logNonPkTables(mySqlSchemasInfo.nonPkTables());
         List<MySqlTableInfo> mySqlTableInfos = 
mySqlSchemasInfo.toMySqlTableInfos(mergeShards);
@@ -238,8 +258,7 @@ public class MySqlSyncDatabaseAction extends ActionBase {
         Boolean convertTinyint1ToBool = 
mySqlConfig.get(MYSQL_CONVERTER_TINYINT1_BOOL);
         MySqlTableSchemaBuilder schemaBuilder =
                 new MySqlTableSchemaBuilder(tableConfig, caseSensitive, 
convertTinyint1ToBool);
-        Pattern includingPattern = this.includingPattern;
-        Pattern excludingPattern = this.excludingPattern;
+
         EventParser.Factory<String> parserFactory =
                 () ->
                         new MySqlDebeziumJsonEventParser(
@@ -302,7 +321,8 @@ public class MySqlSyncDatabaseAction extends ActionBase {
         }
     }
 
-    private boolean shouldMonitorTable(String mySqlTableName) {
+    private boolean shouldMonitorTable(
+            String mySqlTableName, Pattern includingPattern, @Nullable Pattern 
excludingPattern) {
         boolean shouldMonitor = 
includingPattern.matcher(mySqlTableName).matches();
         if (excludingPattern != null) {
             shouldMonitor = shouldMonitor && 
!excludingPattern.matcher(mySqlTableName).matches();
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java
index dd9938772..c320a3203 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java
@@ -24,12 +24,8 @@ import org.apache.paimon.flink.action.cdc.DatabaseSyncMode;
 
 import org.apache.flink.api.java.utils.MultipleParameterTool;
 
-import java.util.Map;
 import java.util.Optional;
 
-import static org.apache.paimon.flink.action.cdc.DatabaseSyncMode.COMBINED;
-import static org.apache.paimon.flink.action.cdc.DatabaseSyncMode.DIVIDED;
-
 /** Factory to create {@link MySqlSyncDatabaseAction}. */
 public class MySqlSyncDatabaseActionFactory implements ActionFactory {
 
@@ -42,54 +38,28 @@ public class MySqlSyncDatabaseActionFactory implements 
ActionFactory {
 
     @Override
     public Optional<Action> create(MultipleParameterTool params) {
-        checkRequiredArgument(params, "warehouse");
-        checkRequiredArgument(params, "database");
         checkRequiredArgument(params, "mysql-conf");
 
-        String warehouse = params.get("warehouse");
-        String database = params.get("database");
-        boolean ignoreIncompatible = 
Boolean.parseBoolean(params.get("ignore-incompatible"));
-        boolean mergeShards =
-                !params.has("merge-shards") || 
Boolean.parseBoolean(params.get("merge-shards"));
-        String tablePrefix = params.get("table-prefix");
-        String tableSuffix = params.get("table-suffix");
-        String includingTables = params.get("including-tables");
-        String excludingTables = params.get("excluding-tables");
-        String mode = params.get("mode");
-        DatabaseSyncMode syncMode;
-        if (mode == null) {
-            syncMode = DIVIDED;
-        } else {
-            switch (mode.toLowerCase()) {
-                case "divided":
-                    syncMode = DIVIDED;
-                    break;
-                case "combined":
-                    syncMode = COMBINED;
-                    break;
-                default:
-                    throw new UnsupportedOperationException(
-                            "Unsupported mode '" + mode + "' for database 
synchronization mode.");
-            }
-        }
-
-        Map<String, String> mySqlConfig = optionalConfigMap(params, 
"mysql-conf");
-        Map<String, String> catalogConfig = optionalConfigMap(params, 
"catalog-conf");
-        Map<String, String> tableConfig = optionalConfigMap(params, 
"table-conf");
-        return Optional.of(
+        MySqlSyncDatabaseAction mySqlSyncDatabaseAction =
                 new MySqlSyncDatabaseAction(
-                        mySqlConfig,
-                        warehouse,
-                        database,
-                        ignoreIncompatible,
-                        mergeShards,
-                        tablePrefix,
-                        tableSuffix,
-                        includingTables,
-                        excludingTables,
-                        catalogConfig,
-                        tableConfig,
-                        syncMode));
+                        getRequiredValue(params, "warehouse"),
+                        getRequiredValue(params, "database"),
+                        optionalConfigMap(params, "catalog-conf"),
+                        optionalConfigMap(params, "mysql-conf"));
+
+        mySqlSyncDatabaseAction
+                .withTableConfig(optionalConfigMap(params, "table-conf"))
+                
.ignoreIncompatible(Boolean.parseBoolean(params.get("ignore-incompatible")))
+                .mergeShards(
+                        !params.has("merge-shards")
+                                || 
Boolean.parseBoolean(params.get("merge-shards")))
+                .withTablePrefix(params.get("table-prefix"))
+                .withTableSuffix(params.get("table-suffix"))
+                .includingTables(params.get("including-tables"))
+                .excludingTables(params.get("excluding-tables"))
+                .withMode(DatabaseSyncMode.fromString(params.get("mode")));
+
+        return Optional.of(mySqlSyncDatabaseAction);
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
index ca26288d5..4d3b0957c 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
@@ -39,7 +39,9 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
 import java.time.ZoneId;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -84,53 +86,59 @@ import static 
org.apache.paimon.utils.Preconditions.checkArgument;
  */
 public class MySqlSyncTableAction extends ActionBase {
 
-    private final Configuration mySqlConfig;
     private final String database;
     private final String table;
-    private final List<String> partitionKeys;
-    private final List<String> primaryKeys;
-    private final List<String> computedColumnArgs;
-    private final Map<String, String> tableConfig;
+    private final Configuration mySqlConfig;
+
+    private final List<String> partitionKeys = new ArrayList<>();
+    private final List<String> primaryKeys = new ArrayList<>();
+
+    private Map<String, String> tableConfig = new HashMap<>();
+    private List<String> computedColumnArgs = new ArrayList<>();
 
     public MySqlSyncTableAction(
-            Map<String, String> mySqlConfig,
-            String warehouse,
-            String database,
-            String table,
-            List<String> partitionKeys,
-            List<String> primaryKeys,
-            Map<String, String> catalogConfig,
-            Map<String, String> tableConfig) {
-        this(
-                mySqlConfig,
-                warehouse,
-                database,
-                table,
-                partitionKeys,
-                primaryKeys,
-                Collections.emptyList(),
-                catalogConfig,
-                tableConfig);
+            String warehouse, String database, String table, Map<String, 
String> mySqlConfig) {
+        this(warehouse, database, table, Collections.emptyMap(), mySqlConfig);
     }
 
     public MySqlSyncTableAction(
-            Map<String, String> mySqlConfig,
             String warehouse,
             String database,
             String table,
-            List<String> partitionKeys,
-            List<String> primaryKeys,
-            List<String> computedColumnArgs,
             Map<String, String> catalogConfig,
-            Map<String, String> tableConfig) {
+            Map<String, String> mySqlConfig) {
         super(warehouse, catalogConfig);
-        this.mySqlConfig = Configuration.fromMap(mySqlConfig);
         this.database = database;
         this.table = table;
-        this.partitionKeys = partitionKeys;
-        this.primaryKeys = primaryKeys;
-        this.computedColumnArgs = computedColumnArgs;
+        this.mySqlConfig = Configuration.fromMap(mySqlConfig);
+    }
+
+    public MySqlSyncTableAction withPartitionKeys(String... partitionKeys) {
+        return withPartitionKeys(Arrays.asList(partitionKeys));
+    }
+
+    public MySqlSyncTableAction withPartitionKeys(List<String> partitionKeys) {
+        this.partitionKeys.addAll(partitionKeys);
+        return this;
+    }
+
+    public MySqlSyncTableAction withPrimaryKeys(String... primaryKeys) {
+        return withPrimaryKeys(Arrays.asList(primaryKeys));
+    }
+
+    public MySqlSyncTableAction withPrimaryKeys(List<String> primaryKeys) {
+        this.primaryKeys.addAll(primaryKeys);
+        return this;
+    }
+
+    public MySqlSyncTableAction withTableConfig(Map<String, String> 
tableConfig) {
         this.tableConfig = tableConfig;
+        return this;
+    }
+
+    public MySqlSyncTableAction withComputedColumnArgs(List<String> 
computedColumnArgs) {
+        this.computedColumnArgs = computedColumnArgs;
+        return this;
     }
 
     public void build(StreamExecutionEnvironment env) throws Exception {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionFactory.java
index e2e17c153..98a824e87 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionFactory.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionFactory.java
@@ -25,12 +25,7 @@ import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.utils.MultipleParameterTool;
 
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
 import java.util.Optional;
-import java.util.stream.Collectors;
 
 /** Factory to create {@link MySqlSyncTableAction}. */
 public class MySqlSyncTableActionFactory implements ActionFactory {
@@ -45,43 +40,31 @@ public class MySqlSyncTableActionFactory implements 
ActionFactory {
     @Override
     public Optional<Action> create(MultipleParameterTool params) {
         Tuple3<String, String, String> tablePath = getTablePath(params);
+        checkRequiredArgument(params, "mysql-conf");
+
+        MySqlSyncTableAction action =
+                new MySqlSyncTableAction(
+                                tablePath.f0,
+                                tablePath.f1,
+                                tablePath.f2,
+                                optionalConfigMap(params, "catalog-conf"),
+                                optionalConfigMap(params, "mysql-conf"))
+                        .withTableConfig(optionalConfigMap(params, 
"table-conf"));
 
-        List<String> partitionKeys = Collections.emptyList();
         if (params.has("partition-keys")) {
-            partitionKeys =
-                    Arrays.stream(params.get("partition-keys").split(","))
-                            .collect(Collectors.toList());
+            action.withPartitionKeys(params.get("partition-keys").split(","));
         }
 
-        List<String> primaryKeys = Collections.emptyList();
         if (params.has("primary-keys")) {
-            primaryKeys =
-                    Arrays.stream(params.get("primary-keys").split(","))
-                            .collect(Collectors.toList());
+            action.withPrimaryKeys(params.get("primary-keys").split(","));
         }
 
-        List<String> computedColumnArgs = Collections.emptyList();
         if (params.has("computed-column")) {
-            computedColumnArgs = new 
ArrayList<>(params.getMultiParameter("computed-column"));
+            action.withComputedColumnArgs(
+                    new 
ArrayList<>(params.getMultiParameter("computed-column")));
         }
 
-        checkRequiredArgument(params, "mysql-conf");
-
-        Map<String, String> mySqlConfig = optionalConfigMap(params, 
"mysql-conf");
-        Map<String, String> catalogConfig = optionalConfigMap(params, 
"catalog-conf");
-        Map<String, String> tableConfig = optionalConfigMap(params, 
"table-conf");
-
-        return Optional.of(
-                new MySqlSyncTableAction(
-                        mySqlConfig,
-                        tablePath.f0,
-                        tablePath.f1,
-                        tablePath.f2,
-                        partitionKeys,
-                        primaryKeys,
-                        computedColumnArgs,
-                        catalogConfig,
-                        tableConfig));
+        return Optional.of(action);
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
index f736a2066..0ad412e47 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
@@ -84,9 +84,4 @@ public class MultiTableUpdatedDataFieldsProcessFunction
             }
         }
     }
-
-    private List<SchemaChange> extractSchemaChanges(
-            SchemaManager schemaManager, List<DataField> updatedDataFields) {
-        return getSchemaChanges(updatedDataFields, schemaManager);
-    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
index 32ed4d382..4a33eb1b7 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
@@ -54,12 +54,8 @@ public class UpdatedDataFieldsProcessFunction
     public void processElement(
             List<DataField> updatedDataFields, Context context, 
Collector<Void> collector)
             throws Exception {
-        for (SchemaChange schemaChange : 
extractSchemaChanges(updatedDataFields)) {
+        for (SchemaChange schemaChange : extractSchemaChanges(schemaManager, 
updatedDataFields)) {
             applySchemaChange(schemaManager, schemaChange, identifier);
         }
     }
-
-    private List<SchemaChange> extractSchemaChanges(List<DataField> 
updatedDataFields) {
-        return getSchemaChanges(updatedDataFields, schemaManager);
-    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
index b38808cff..7684ddbbc 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
@@ -164,8 +164,9 @@ public abstract class 
UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
         return ConvertAction.EXCEPTION;
     }
 
-    protected List<SchemaChange> getSchemaChanges(
-            List<DataField> updatedDataFields, SchemaManager schemaManager) {
+    protected List<SchemaChange> extractSchemaChanges(
+            SchemaManager schemaManager, List<DataField> updatedDataFields) {
+
         RowType oldRowType = schemaManager.latest().get().logicalRowType();
         Map<String, DataField> oldFields = new HashMap<>();
         for (DataField oldField : oldRowType.getFields()) {
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java
index 00babf813..3f580951c 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java
@@ -28,7 +28,9 @@ import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowType;
 
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.junit.jupiter.api.AfterAll;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -157,6 +159,29 @@ public class MySqlActionITCaseBase extends 
ActionITCaseBase {
         return config;
     }
 
+    protected JobClient runActionWithDefaultEnv(MySqlSyncTableAction action) 
throws Exception {
+        StreamExecutionEnvironment env = getBasicEnv();
+        action.build(env);
+        JobClient client = env.executeAsync();
+        waitJobRunning(client);
+        return client;
+    }
+
+    protected void runActionWithDefaultEnv(MySqlSyncDatabaseAction action) 
throws Exception {
+        StreamExecutionEnvironment env = getBasicEnv();
+        action.build(env);
+        JobClient client = env.executeAsync();
+        waitJobRunning(client);
+    }
+
+    protected StreamExecutionEnvironment getBasicEnv() {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(2);
+        env.enableCheckpointing(1000);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        return env;
+    }
+
     protected void waitJobRunning(JobClient client) throws Exception {
         while (true) {
             JobStatus status = client.getJobStatus().get();
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
index 4e2095344..d3e5ea6d9 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
@@ -30,7 +30,6 @@ import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.JsonSerdeUtil;
 
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.core.execution.SavepointFormatType;
@@ -80,23 +79,10 @@ public class MySqlSyncDatabaseActionITCase extends 
MySqlActionITCaseBase {
         Map<String, String> mySqlConfig = getBasicMySqlConfig();
         mySqlConfig.put("database-name", "paimon_sync_database");
 
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(2);
-        env.enableCheckpointing(1000);
-        env.setRestartStrategy(RestartStrategies.noRestart());
-
-        Map<String, String> tableConfig = getBasicTableConfig();
         MySqlSyncDatabaseAction action =
-                new MySqlSyncDatabaseAction(
-                        mySqlConfig,
-                        warehouse,
-                        database,
-                        false,
-                        Collections.emptyMap(),
-                        tableConfig);
-        action.build(env);
-        JobClient client = env.executeAsync();
-        waitJobRunning(client);
+                new MySqlSyncDatabaseAction(warehouse, database, mySqlConfig)
+                        .withTableConfig(getBasicTableConfig());
+        runActionWithDefaultEnv(action);
 
         try (Statement statement = getStatement()) {
             testSchemaEvolutionImpl(statement);
@@ -223,23 +209,10 @@ public class MySqlSyncDatabaseActionITCase extends 
MySqlActionITCaseBase {
         mySqlConfig.put("database-name", 
"paimon_sync_database_tinyint_schema");
         mySqlConfig.put("mysql.converter.tinyint1-to-bool", "false");
 
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(2);
-        env.enableCheckpointing(1000);
-        env.setRestartStrategy(RestartStrategies.noRestart());
-
-        Map<String, String> tableConfig = getBasicTableConfig();
         MySqlSyncDatabaseAction action =
-                new MySqlSyncDatabaseAction(
-                        mySqlConfig,
-                        warehouse,
-                        database,
-                        false,
-                        Collections.emptyMap(),
-                        tableConfig);
-        action.build(env);
-        JobClient client = env.executeAsync();
-        waitJobRunning(client);
+                new MySqlSyncDatabaseAction(warehouse, database, mySqlConfig)
+                        .withTableConfig(getBasicTableConfig());
+        runActionWithDefaultEnv(action);
 
         try (Statement statement = getStatement()) {
             testSchemaEvolutionImplWithTinyInt1Convert(statement);
@@ -315,13 +288,7 @@ public class MySqlSyncDatabaseActionITCase extends 
MySqlActionITCaseBase {
 
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
         MySqlSyncDatabaseAction action =
-                new MySqlSyncDatabaseAction(
-                        mySqlConfig,
-                        warehouse,
-                        database,
-                        false,
-                        Collections.emptyMap(),
-                        Collections.emptyMap());
+                new MySqlSyncDatabaseAction(warehouse, database, mySqlConfig);
 
         assertThatThrownBy(() -> action.build(env))
                 .isInstanceOf(IllegalArgumentException.class)
@@ -338,13 +305,7 @@ public class MySqlSyncDatabaseActionITCase extends 
MySqlActionITCaseBase {
 
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
         MySqlSyncDatabaseAction action =
-                new MySqlSyncDatabaseAction(
-                        mySqlConfig,
-                        warehouse,
-                        database,
-                        false,
-                        Collections.emptyMap(),
-                        Collections.emptyMap());
+                new MySqlSyncDatabaseAction(warehouse, database, mySqlConfig);
 
         assertThatThrownBy(() -> action.build(env))
                 .isInstanceOf(IllegalArgumentException.class)
@@ -371,23 +332,11 @@ public class MySqlSyncDatabaseActionITCase extends 
MySqlActionITCaseBase {
         Map<String, String> mySqlConfig = getBasicMySqlConfig();
         mySqlConfig.put("database-name", 
"paimon_sync_database_ignore_incompatible");
 
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(2);
-        env.enableCheckpointing(1000);
-        env.setRestartStrategy(RestartStrategies.noRestart());
-
-        Map<String, String> tableConfig = getBasicTableConfig();
         MySqlSyncDatabaseAction action =
-                new MySqlSyncDatabaseAction(
-                        mySqlConfig,
-                        warehouse,
-                        database,
-                        true,
-                        Collections.emptyMap(),
-                        tableConfig);
-        action.build(env);
-        JobClient client = env.executeAsync();
-        waitJobRunning(client);
+                new MySqlSyncDatabaseAction(warehouse, database, mySqlConfig)
+                        .withTableConfig(getBasicTableConfig())
+                        .ignoreIncompatible(true);
+        runActionWithDefaultEnv(action);
 
         // validate `compatible` can be synchronized
         try (Statement statement = getStatement()) {
@@ -431,29 +380,12 @@ public class MySqlSyncDatabaseActionITCase extends 
MySqlActionITCaseBase {
         Map<String, String> mySqlConfig = getBasicMySqlConfig();
         mySqlConfig.put("database-name", "paimon_sync_database_affix");
 
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(2);
-        env.enableCheckpointing(1000);
-        env.setRestartStrategy(RestartStrategies.noRestart());
-
-        Map<String, String> tableConfig = getBasicTableConfig();
         MySqlSyncDatabaseAction action =
-                new MySqlSyncDatabaseAction(
-                        mySqlConfig,
-                        warehouse,
-                        database,
-                        false,
-                        true,
-                        "test_prefix_",
-                        "_test_suffix",
-                        null,
-                        null,
-                        Collections.emptyMap(),
-                        tableConfig,
-                        DIVIDED);
-        action.build(env);
-        JobClient client = env.executeAsync();
-        waitJobRunning(client);
+                new MySqlSyncDatabaseAction(warehouse, database, mySqlConfig)
+                        .withTableConfig(getBasicTableConfig())
+                        .withTablePrefix("test_prefix_")
+                        .withTableSuffix("_test_suffix");
+        runActionWithDefaultEnv(action);
 
         try (Statement statement = getStatement()) {
             testTableAffixImpl(statement);
@@ -602,29 +534,12 @@ public class MySqlSyncDatabaseActionITCase extends 
MySqlActionITCaseBase {
         Map<String, String> mySqlConfig = getBasicMySqlConfig();
         mySqlConfig.put("database-name", databaseName);
 
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(2);
-        env.enableCheckpointing(1000);
-        env.setRestartStrategy(RestartStrategies.noRestart());
-
-        Map<String, String> tableConfig = getBasicTableConfig();
         MySqlSyncDatabaseAction action =
-                new MySqlSyncDatabaseAction(
-                        mySqlConfig,
-                        warehouse,
-                        database,
-                        false,
-                        true,
-                        null,
-                        null,
-                        includingTables,
-                        excludingTables,
-                        Collections.emptyMap(),
-                        tableConfig,
-                        DIVIDED);
-        action.build(env);
-        JobClient client = env.executeAsync();
-        waitJobRunning(client);
+                new MySqlSyncDatabaseAction(warehouse, database, mySqlConfig)
+                        .withTableConfig(getBasicTableConfig())
+                        .includingTables(includingTables)
+                        .excludingTables(excludingTables);
+        runActionWithDefaultEnv(action);
 
         // check paimon tables
         assertTableExists(existedTables);
@@ -637,22 +552,15 @@ public class MySqlSyncDatabaseActionITCase extends 
MySqlActionITCaseBase {
         Map<String, String> mySqlConfig = getBasicMySqlConfig();
         mySqlConfig.put("database-name", "paimon_ignore_CASE");
 
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(2);
-        env.enableCheckpointing(1000);
-        env.setRestartStrategy(RestartStrategies.noRestart());
-
-        Map<String, String> tableConfig = getBasicTableConfig();
-
-        Map<String, String> catalogConfig =
-                Collections.singletonMap(CatalogOptions.METASTORE.key(), 
"test-case-insensitive");
-
         MySqlSyncDatabaseAction action =
                 new MySqlSyncDatabaseAction(
-                        mySqlConfig, warehouse, database, false, 
catalogConfig, tableConfig);
-        action.build(env);
-        JobClient client = env.executeAsync();
-        waitJobRunning(client);
+                                warehouse,
+                                database,
+                                Collections.singletonMap(
+                                        CatalogOptions.METASTORE.key(), 
"test-case-insensitive"),
+                                mySqlConfig)
+                        .withTableConfig(getBasicTableConfig());
+        runActionWithDefaultEnv(action);
 
         // check table schema
         FileStoreTable table = getFileStoreTable("t");
@@ -740,30 +648,14 @@ public class MySqlSyncDatabaseActionITCase extends 
MySqlActionITCaseBase {
 
         Map<String, String> mySqlConfig = getBasicMySqlConfig();
         mySqlConfig.put("database-name", mySqlDatabase);
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(2);
-        env.enableCheckpointing(1000);
-        env.setRestartStrategy(RestartStrategies.noRestart());
-
-        Map<String, String> tableConfig = getBasicTableConfig();
 
         MySqlSyncDatabaseAction action =
-                new MySqlSyncDatabaseAction(
-                        mySqlConfig,
-                        warehouse,
-                        database,
-                        false,
-                        true,
-                        null,
-                        null,
-                        "t.+",
-                        ".*a$",
-                        Collections.emptyMap(),
-                        tableConfig,
-                        COMBINED);
-        action.build(env);
-        JobClient client = env.executeAsync();
-        waitJobRunning(client);
+                new MySqlSyncDatabaseAction(warehouse, database, mySqlConfig)
+                        .withTableConfig(getBasicTableConfig())
+                        .includingTables("t.+")
+                        .excludingTables(".*a$")
+                        .withMode(COMBINED);
+        runActionWithDefaultEnv(action);
 
         try (Statement statement = getStatement()) {
             FileStoreTable table1 = getFileStoreTable("t1");
@@ -1035,12 +927,6 @@ public class MySqlSyncDatabaseActionITCase extends 
MySqlActionITCaseBase {
         Map<String, String> mySqlConfig = getBasicMySqlConfig();
         mySqlConfig.put("database-name", databaseName);
         mySqlConfig.put("scan.incremental.snapshot.chunk.size", "1");
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(2);
-        env.enableCheckpointing(1000);
-        env.setRestartStrategy(RestartStrategies.noRestart());
-
-        Map<String, String> tableConfig = getBasicTableConfig();
 
         Map<String, String> catalogConfig =
                 testSchemaChange
@@ -1049,19 +935,11 @@ public class MySqlSyncDatabaseActionITCase extends 
MySqlActionITCaseBase {
                         : Collections.emptyMap();
 
         MySqlSyncDatabaseAction action =
-                new MySqlSyncDatabaseAction(
-                        mySqlConfig,
-                        warehouse,
-                        database,
-                        false,
-                        true,
-                        null,
-                        null,
-                        "t.+",
-                        null,
-                        catalogConfig,
-                        tableConfig,
-                        COMBINED);
+                new MySqlSyncDatabaseAction(warehouse, database, 
catalogConfig, mySqlConfig)
+                        .withTableConfig(getBasicTableConfig())
+                        .includingTables("t.+")
+                        .withMode(COMBINED);
+        StreamExecutionEnvironment env = getBasicEnv();
         action.build(env);
 
         if (Objects.nonNull(savepointPath)) {
@@ -1081,23 +959,10 @@ public class MySqlSyncDatabaseActionITCase extends 
MySqlActionITCaseBase {
         mySqlConfig.put("database-name", "paimon_sync_database_tinyint");
         mySqlConfig.put("mysql.converter.tinyint1-to-bool", "false");
 
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(2);
-        env.enableCheckpointing(1000);
-        env.setRestartStrategy(RestartStrategies.noRestart());
-
-        Map<String, String> tableConfig = getBasicTableConfig();
         MySqlSyncDatabaseAction action =
-                new MySqlSyncDatabaseAction(
-                        mySqlConfig,
-                        warehouse,
-                        database,
-                        false,
-                        Collections.emptyMap(),
-                        tableConfig);
-        action.build(env);
-        JobClient client = env.executeAsync();
-        waitJobRunning(client);
+                new MySqlSyncDatabaseAction(warehouse, database, mySqlConfig)
+                        .withTableConfig(getBasicTableConfig());
+        runActionWithDefaultEnv(action);
 
         try (Statement statement = getStatement()) {
             testTinyInt1Convert(statement);
@@ -1140,33 +1005,16 @@ public class MySqlSyncDatabaseActionITCase extends 
MySqlActionITCaseBase {
         Map<String, String> mySqlConfig = getBasicMySqlConfig();
         mySqlConfig.put("database-name", databaseName);
         mySqlConfig.put("scan.incremental.snapshot.chunk.size", "1");
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(1);
-        env.enableCheckpointing(1000);
-        env.setRestartStrategy(RestartStrategies.noRestart());
 
         Map<String, String> tableConfig = getBasicTableConfig();
         tableConfig.put("sink.parallelism", "1");
         tableConfig.put(CoreOptions.WRITE_BUFFER_SIZE.key(), "4 mb");
 
         MySqlSyncDatabaseAction action =
-                new MySqlSyncDatabaseAction(
-                        mySqlConfig,
-                        warehouse,
-                        database,
-                        false,
-                        true,
-                        null,
-                        null,
-                        null,
-                        null,
-                        Collections.emptyMap(),
-                        tableConfig,
-                        COMBINED);
-        action.build(env);
-
-        JobClient client = env.executeAsync();
-        waitJobRunning(client);
+                new MySqlSyncDatabaseAction(warehouse, database, mySqlConfig)
+                        .withTableConfig(tableConfig)
+                        .withMode(COMBINED);
+        runActionWithDefaultEnv(action);
 
         try (Statement statement = getStatement()) {
             statement.executeUpdate("USE " + databaseName);
@@ -1207,30 +1055,12 @@ public class MySqlSyncDatabaseActionITCase extends 
MySqlActionITCaseBase {
                         ? "database_shard_.*"
                         : "database_shard_1|database_shard_2");
 
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(2);
-        env.enableCheckpointing(1000);
-        env.setRestartStrategy(RestartStrategies.noRestart());
-
-        Map<String, String> tableConfig = getBasicTableConfig();
         DatabaseSyncMode mode = ThreadLocalRandom.current().nextBoolean() ? 
DIVIDED : COMBINED;
         MySqlSyncDatabaseAction action =
-                new MySqlSyncDatabaseAction(
-                        mySqlConfig,
-                        warehouse,
-                        database,
-                        false,
-                        true,
-                        null,
-                        null,
-                        null,
-                        null,
-                        Collections.emptyMap(),
-                        tableConfig,
-                        mode);
-        action.build(env);
-        JobClient client = env.executeAsync();
-        waitJobRunning(client);
+                new MySqlSyncDatabaseAction(warehouse, database, mySqlConfig)
+                        .withTableConfig(getBasicTableConfig())
+                        .withMode(mode);
+        runActionWithDefaultEnv(action);
 
         try (Statement statement = getStatement()) {
             // test insert into t1
@@ -1332,30 +1162,13 @@ public class MySqlSyncDatabaseActionITCase extends 
MySqlActionITCaseBase {
         Map<String, String> mySqlConfig = getBasicMySqlConfig();
         mySqlConfig.put("database-name", "without_merging_shard_.*");
 
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(2);
-        env.enableCheckpointing(1000);
-        env.setRestartStrategy(RestartStrategies.noRestart());
-
-        Map<String, String> tableConfig = getBasicTableConfig();
         DatabaseSyncMode mode = ThreadLocalRandom.current().nextBoolean() ? 
DIVIDED : COMBINED;
         MySqlSyncDatabaseAction action =
-                new MySqlSyncDatabaseAction(
-                        mySqlConfig,
-                        warehouse,
-                        database,
-                        false,
-                        false,
-                        null,
-                        null,
-                        null,
-                        null,
-                        Collections.emptyMap(),
-                        tableConfig,
-                        mode);
-        action.build(env);
-        JobClient client = env.executeAsync();
-        waitJobRunning(client);
+                new MySqlSyncDatabaseAction(warehouse, database, mySqlConfig)
+                        .withTableConfig(getBasicTableConfig())
+                        .mergeShards(false)
+                        .withMode(mode);
+        runActionWithDefaultEnv(action);
 
         try (Statement statement = getStatement()) {
             Thread.sleep(5_000);
@@ -1468,23 +1281,11 @@ public class MySqlSyncDatabaseActionITCase extends 
MySqlActionITCaseBase {
         Map<String, String> mySqlConfig = getBasicMySqlConfig();
         mySqlConfig.put("database-name", "monitored_and_excluded_shard_.*");
 
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
         MySqlSyncDatabaseAction action =
-                new MySqlSyncDatabaseAction(
-                        mySqlConfig,
-                        warehouse,
-                        database,
-                        true,
-                        true,
-                        null,
-                        null,
-                        null,
-                        null,
-                        Collections.emptyMap(),
-                        Collections.emptyMap(),
-                        COMBINED);
-        action.build(env);
+                new MySqlSyncDatabaseAction(warehouse, database, mySqlConfig)
+                        .ignoreIncompatible(true)
+                        .withMode(COMBINED);
+        action.build(StreamExecutionEnvironment.getExecutionEnvironment());
 
         assertThat(action.monitoredTables())
                 .containsOnly(
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseTableListITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseTableListITCase.java
index 2c73dc330..b791dd84b 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseTableListITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseTableListITCase.java
@@ -23,9 +23,6 @@ import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
 
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.core.execution.JobClient;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
@@ -58,30 +55,16 @@ public class MySqlSyncDatabaseTableListITCase extends MySqlActionITCaseBase {
                         ? ".*shard_.*"
                         : "shard_1|shard_2|shard_3|x_shard_1");
 
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(2);
-        env.enableCheckpointing(1000);
-        env.setRestartStrategy(RestartStrategies.noRestart());
-
-        Map<String, String> tableConfig = getBasicTableConfig();
         DatabaseSyncMode mode = ThreadLocalRandom.current().nextBoolean() ? 
DIVIDED : COMBINED;
         MySqlSyncDatabaseAction action =
-                new MySqlSyncDatabaseAction(
-                        mySqlConfig,
-                        warehouse,
-                        database,
-                        false,
-                        false,
-                        null,
-                        null,
-                        "t.+|s.+",
-                        "ta|sa",
-                        Collections.emptyMap(),
-                        tableConfig,
-                        mode);
-        action.build(env);
-        JobClient client = env.executeAsync();
-        waitJobRunning(client);
+                new MySqlSyncDatabaseAction(warehouse, database, mySqlConfig)
+                        .withTableConfig(getBasicTableConfig())
+                        .mergeShards(false)
+                        .withMode(mode)
+                        .includingTables("t.+|s.+")
+                        .excludingTables("ta|sa");
+
+        runActionWithDefaultEnv(action);
 
         assertThat(catalog().listTables(database))
                 .containsExactlyInAnyOrder(
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index f0b28982e..6100288d8 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -29,7 +29,6 @@ import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.JsonSerdeUtil;
 
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.junit.jupiter.api.BeforeAll;
@@ -67,29 +66,23 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
         mySqlConfig.put("database-name", DATABASE_NAME);
         mySqlConfig.put("table-name", "schema_evolution_\\d+");
 
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(2);
-        env.enableCheckpointing(1000);
-        env.setRestartStrategy(RestartStrategies.noRestart());
-
-        Map<String, String> tableConfig = getBasicTableConfig();
         MySqlSyncTableAction action =
                 new MySqlSyncTableAction(
-                        mySqlConfig,
-                        warehouse,
-                        database,
-                        tableName,
-                        Collections.singletonList("pt"),
-                        Arrays.asList("pt", "_id"),
-                        Collections.singletonMap(
-                                CatalogOptions.METASTORE.key(), 
"test-alter-table"),
-                        tableConfig);
-        action.build(env);
-        JobClient client = env.executeAsync();
-        waitJobRunning(client);
+                                warehouse,
+                                database,
+                                tableName,
+                                Collections.singletonMap(
+                                        CatalogOptions.METASTORE.key(), 
"test-alter-table"),
+                                mySqlConfig)
+                        .withTableConfig(getBasicTableConfig())
+                        .withPartitionKeys("pt")
+                        .withPrimaryKeys("pt", "_id");
+        runActionWithDefaultEnv(action);
 
         checkTableSchema(
-                "[{\"id\":0,\"name\":\"pt\",\"type\":\"INT NOT NULL\",\"description\":\"primary\"},{\"id\":1,\"name\":\"_id\",\"type\":\"INT NOT NULL\",\"description\":\"_id\"},{\"id\":2,\"name\":\"v1\",\"type\":\"VARCHAR(10)\",\"description\":\"v1\"}]");
+                "[{\"id\":0,\"name\":\"pt\",\"type\":\"INT NOT NULL\",\"description\":\"primary\"},"
+                        + "{\"id\":1,\"name\":\"_id\",\"type\":\"INT NOT NULL\",\"description\":\"_id\"},"
+                        + "{\"id\":2,\"name\":\"v1\",\"type\":\"VARCHAR(10)\",\"description\":\"v1\"}]");
 
         try (Statement statement = getStatement()) {
             testSchemaEvolutionImpl(statement);
@@ -254,27 +247,15 @@ public class MySqlSyncTableActionITCase extends 
MySqlActionITCaseBase {
         mySqlConfig.put("database-name", DATABASE_NAME);
         mySqlConfig.put("table-name", "schema_evolution_multiple");
 
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(1);
-        env.enableCheckpointing(1000);
-        env.setRestartStrategy(RestartStrategies.noRestart());
-
         MySqlSyncTableAction action =
-                new MySqlSyncTableAction(
-                        mySqlConfig,
-                        warehouse,
-                        database,
-                        tableName,
-                        Collections.emptyList(),
-                        Collections.singletonList("_id"),
-                        Collections.emptyMap(),
-                        Collections.emptyMap());
-        action.build(env);
-        JobClient client = env.executeAsync();
-        waitJobRunning(client);
+                new MySqlSyncTableAction(warehouse, database, tableName, 
mySqlConfig);
+        runActionWithDefaultEnv(action);
 
         checkTableSchema(
-                "[{\"id\":0,\"name\":\"_id\",\"type\":\"INT NOT NULL\",\"description\":\"primary\"},{\"id\":1,\"name\":\"v1\",\"type\":\"VARCHAR(10)\",\"description\":\"v1\"},{\"id\":2,\"name\":\"v2\",\"type\":\"INT\",\"description\":\"v2\"},{\"id\":3,\"name\":\"v3\",\"type\":\"VARCHAR(10)\",\"description\":\"v3\"}]");
+                "[{\"id\":0,\"name\":\"_id\",\"type\":\"INT NOT NULL\",\"description\":\"primary\"},"
+                        + "{\"id\":1,\"name\":\"v1\",\"type\":\"VARCHAR(10)\",\"description\":\"v1\"},"
+                        + "{\"id\":2,\"name\":\"v2\",\"type\":\"INT\",\"description\":\"v2\"},"
+                        + "{\"id\":3,\"name\":\"v3\",\"type\":\"VARCHAR(10)\",\"description\":\"v3\"}]");
 
         try (Statement statement = getStatement()) {
             testSchemaEvolutionMultipleImpl(statement);
@@ -346,24 +327,11 @@ public class MySqlSyncTableActionITCase extends 
MySqlActionITCaseBase {
         mySqlConfig.put("database-name", DATABASE_NAME);
         mySqlConfig.put("table-name", "all_types_table");
 
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(1);
-        env.enableCheckpointing(1000);
-        env.setRestartStrategy(RestartStrategies.noRestart());
-
         MySqlSyncTableAction action =
-                new MySqlSyncTableAction(
-                        mySqlConfig,
-                        warehouse,
-                        database,
-                        tableName,
-                        Collections.singletonList("pt"),
-                        Arrays.asList("pt", "_id"),
-                        Collections.emptyMap(),
-                        Collections.emptyMap());
-        action.build(env);
-        JobClient client = env.executeAsync();
-        waitJobRunning(client);
+                new MySqlSyncTableAction(warehouse, database, tableName, 
mySqlConfig)
+                        .withPartitionKeys("pt")
+                        .withPrimaryKeys("pt", "_id");
+        JobClient client = runActionWithDefaultEnv(action);
 
         try (Statement statement = getStatement()) {
             testAllTypesImpl(statement);
@@ -645,15 +613,7 @@ public class MySqlSyncTableActionITCase extends 
MySqlActionITCaseBase {
 
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
         MySqlSyncTableAction action =
-                new MySqlSyncTableAction(
-                        mySqlConfig,
-                        warehouse,
-                        database,
-                        tableName,
-                        Collections.emptyList(),
-                        Collections.singletonList("_id"),
-                        Collections.emptyMap(),
-                        Collections.emptyMap());
+                new MySqlSyncTableAction(warehouse, database, tableName, 
mySqlConfig);
 
         assertThatThrownBy(() -> action.build(env))
                 .satisfies(
@@ -680,15 +640,8 @@ public class MySqlSyncTableActionITCase extends 
MySqlActionITCaseBase {
 
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
         MySqlSyncTableAction action =
-                new MySqlSyncTableAction(
-                        mySqlConfig,
-                        warehouse,
-                        database,
-                        tableName,
-                        Collections.emptyList(),
-                        Collections.singletonList("a"),
-                        Collections.emptyMap(),
-                        Collections.emptyMap());
+                new MySqlSyncTableAction(warehouse, database, tableName, 
mySqlConfig)
+                        .withPrimaryKeys("a");
 
         assertThatThrownBy(() -> action.build(env))
                 .isInstanceOf(IllegalArgumentException.class)
@@ -703,15 +656,8 @@ public class MySqlSyncTableActionITCase extends 
MySqlActionITCaseBase {
 
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
         MySqlSyncTableAction action =
-                new MySqlSyncTableAction(
-                        mySqlConfig,
-                        warehouse,
-                        database,
-                        tableName,
-                        Collections.emptyList(),
-                        Collections.singletonList("pk"),
-                        Collections.emptyMap(),
-                        Collections.emptyMap());
+                new MySqlSyncTableAction(warehouse, database, tableName, 
mySqlConfig)
+                        .withPrimaryKeys("pk");
 
         assertThatThrownBy(() -> action.build(env))
                 .isInstanceOf(IllegalArgumentException.class)
@@ -727,15 +673,7 @@ public class MySqlSyncTableActionITCase extends 
MySqlActionITCaseBase {
 
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
         MySqlSyncTableAction action =
-                new MySqlSyncTableAction(
-                        mySqlConfig,
-                        warehouse,
-                        database,
-                        tableName,
-                        Collections.emptyList(),
-                        Collections.emptyList(),
-                        Collections.emptyMap(),
-                        Collections.emptyMap());
+                new MySqlSyncTableAction(warehouse, database, tableName, 
mySqlConfig);
 
         assertThatThrownBy(() -> action.build(env))
                 .isInstanceOf(IllegalArgumentException.class)
@@ -760,11 +698,6 @@ public class MySqlSyncTableActionITCase extends 
MySqlActionITCaseBase {
         mySqlConfig.put("database-name", DATABASE_NAME);
         mySqlConfig.put("table-name", "test_computed_column");
 
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(2);
-        env.enableCheckpointing(1000);
-        env.setRestartStrategy(RestartStrategies.noRestart());
-
         List<String> computedColumnDefs =
                 Arrays.asList(
                         "_year_date=year(_date)",
@@ -787,19 +720,11 @@ public class MySqlSyncTableActionITCase extends 
MySqlActionITCaseBase {
                         "_truncate_date=truncate(pk,2)");
 
         MySqlSyncTableAction action =
-                new MySqlSyncTableAction(
-                        mySqlConfig,
-                        warehouse,
-                        database,
-                        tableName,
-                        Collections.singletonList("_year_date"),
-                        Arrays.asList("pk", "_year_date"),
-                        computedColumnDefs,
-                        Collections.emptyMap(),
-                        Collections.emptyMap());
-        action.build(env);
-        JobClient client = env.executeAsync();
-        waitJobRunning(client);
+                new MySqlSyncTableAction(warehouse, database, tableName, 
mySqlConfig)
+                        .withPartitionKeys("_year_date")
+                        .withPrimaryKeys("pk", "_year_date")
+                        .withComputedColumnArgs(computedColumnDefs);
+        runActionWithDefaultEnv(action);
 
         if (executeMysql) {
             try (Statement statement = getStatement()) {
@@ -877,25 +802,9 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
         mySqlConfig.put("table-name", "test_tinyint1_convert");
         mySqlConfig.put("mysql.converter.tinyint1-to-bool", "false");
 
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(2);
-        env.enableCheckpointing(1000);
-        env.setRestartStrategy(RestartStrategies.noRestart());
-
         MySqlSyncTableAction action =
-                new MySqlSyncTableAction(
-                        mySqlConfig,
-                        warehouse,
-                        database,
-                        tableName,
-                        Collections.emptyList(),
-                        Collections.singletonList("pk"),
-                        Collections.emptyList(),
-                        Collections.emptyMap(),
-                        Collections.emptyMap());
-        action.build(env);
-        JobClient client = env.executeAsync();
-        waitJobRunning(client);
+                new MySqlSyncTableAction(warehouse, database, tableName, mySqlConfig);
+        runActionWithDefaultEnv(action);
 
         checkTableSchema(
                 "[{\"id\":0,\"name\":\"pk\",\"type\":\"INT NOT NULL\",\"description\":\"\"},"
@@ -949,25 +858,12 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
         mySqlConfig.put("database-name", dbPattern);
         mySqlConfig.put("table-name", tblPattern);
 
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(2);
-        env.enableCheckpointing(1000);
-        env.setRestartStrategy(RestartStrategies.noRestart());
-
         MySqlSyncTableAction action =
-                new MySqlSyncTableAction(
-                        mySqlConfig,
-                        warehouse,
-                        database,
-                        tableName,
-                        Collections.singletonList("pt"),
-                        Arrays.asList("pk", "pt"),
-                        Collections.singletonList("pt=substring(_date,5)"),
-                        Collections.emptyMap(),
-                        Collections.emptyMap());
-        action.build(env);
-        JobClient client = env.executeAsync();
-        waitJobRunning(client);
+                new MySqlSyncTableAction(warehouse, database, tableName, mySqlConfig)
+                        .withPartitionKeys("pt")
+                        .withPrimaryKeys("pk", "pt")
+                        .withComputedColumnArgs(Collections.singletonList("pt=substring(_date,5)"));
+        runActionWithDefaultEnv(action);
 
         try (Statement statement = getStatement()) {
             statement.execute("USE shard_1");
diff --git a/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_database_setup.sql b/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_database_setup.sql
index b4343f1f2..7f91e9df6 100644
--- a/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_database_setup.sql
+++ b/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_database_setup.sql
@@ -476,4 +476,3 @@ CREATE TABLE t3 (
     k INT,
     v2 VARCHAR(10)
 );
-

Reply via email to