This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new d0e214234 [refactor] Refactor MySQL CDC actions to make it easier to
introduce new features (#1850)
d0e214234 is described below
commit d0e21423432c085a7a4a9a5e04726a643a449556
Author: yuzelin <[email protected]>
AuthorDate: Sat Aug 19 10:03:09 2023 +0800
[refactor] Refactor MySQL CDC actions to make it easier to introduce new
features (#1850)
---
.../apache/paimon/flink/action/ActionFactory.java | 5 +
.../paimon/flink/action/cdc/DatabaseSyncMode.java | 19 +-
.../action/cdc/mysql/MySqlSyncDatabaseAction.java | 118 ++++----
.../cdc/mysql/MySqlSyncDatabaseActionFactory.java | 68 ++---
.../action/cdc/mysql/MySqlSyncTableAction.java | 72 ++---
.../cdc/mysql/MySqlSyncTableActionFactory.java | 47 +--
...MultiTableUpdatedDataFieldsProcessFunction.java | 5 -
.../sink/cdc/UpdatedDataFieldsProcessFunction.java | 6 +-
.../cdc/UpdatedDataFieldsProcessFunctionBase.java | 5 +-
.../action/cdc/mysql/MySqlActionITCaseBase.java | 25 ++
.../cdc/mysql/MySqlSyncDatabaseActionITCase.java | 319 ++++-----------------
.../mysql/MySqlSyncDatabaseTableListITCase.java | 33 +--
.../cdc/mysql/MySqlSyncTableActionITCase.java | 186 +++---------
.../test/resources/mysql/sync_database_setup.sql | 1 -
14 files changed, 304 insertions(+), 605 deletions(-)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionFactory.java
index 76089bd96..edd63895b 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionFactory.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionFactory.java
@@ -142,6 +142,11 @@ public interface ActionFactory extends Factory {
params.has(key), "Argument '%s' is required. Run '<action>
--help' for help.", key);
}
+ default String getRequiredValue(MultipleParameterTool params, String key) {
+ checkRequiredArgument(params, key);
+ return params.get(key);
+ }
+
static Map<String, String> parseCommaSeparatedKeyValues(String keyValues) {
Map<String, String> kvs = new HashMap<>();
for (String kvString : keyValues.split(",")) {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/DatabaseSyncMode.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/DatabaseSyncMode.java
index 8e4641ccd..594ab06a4 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/DatabaseSyncMode.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/DatabaseSyncMode.java
@@ -18,6 +18,8 @@
package org.apache.paimon.flink.action.cdc;
+import javax.annotation.Nullable;
+
import java.io.Serializable;
/**
@@ -31,5 +33,20 @@ import java.io.Serializable;
*/
public enum DatabaseSyncMode implements Serializable {
DIVIDED,
- COMBINED
+ COMBINED;
+
+ public static DatabaseSyncMode fromString(@Nullable String mode) {
+ if (mode == null) {
+ return DIVIDED; // no mode given: fall back to DIVIDED
+ }
+ // Case-insensitive match; Locale.ROOT keeps 'I' -> 'i' under tr/az default locales.
+ switch (mode.toLowerCase(java.util.Locale.ROOT)) {
+ case "divided":
+ return DIVIDED;
+ case "combined":
+ return COMBINED;
+ default:
+ throw new UnsupportedOperationException("Unsupported sink
mode: " + mode);
+ }
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
index 17bcd9173..2858c78ad 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
@@ -48,6 +48,7 @@ import javax.annotation.Nullable;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
@@ -99,68 +100,81 @@ public class MySqlSyncDatabaseAction extends ActionBase {
private static final Logger LOG =
LoggerFactory.getLogger(MySqlSyncDatabaseAction.class);
- private final Configuration mySqlConfig;
private final String database;
- private final boolean ignoreIncompatible;
- private final boolean mergeShards;
- private final String tablePrefix;
- private final String tableSuffix;
- private final Pattern includingPattern;
- @Nullable private final Pattern excludingPattern;
- private final Map<String, String> tableConfig;
- private final String includingTables;
- private final DatabaseSyncMode mode;
+ private final Configuration mySqlConfig;
+ private Map<String, String> tableConfig = new HashMap<>();
+ private boolean ignoreIncompatible = false;
+ private boolean mergeShards = true;
+ private String tablePrefix = "";
+ private String tableSuffix = "";
+ private String includingTables = ".*";
+ @Nullable String excludingTables;
+ private DatabaseSyncMode mode = DIVIDED;
+
+ // for test purpose
private final List<Identifier> monitoredTables = new ArrayList<>();
private final List<Identifier> excludedTables = new ArrayList<>();
public MySqlSyncDatabaseAction(
- Map<String, String> mySqlConfig,
- String warehouse,
- String database,
- boolean ignoreIncompatible,
- Map<String, String> catalogConfig,
- Map<String, String> tableConfig) {
- this(
- mySqlConfig,
- warehouse,
- database,
- ignoreIncompatible,
- true,
- null,
- null,
- null,
- null,
- catalogConfig,
- tableConfig,
- DIVIDED);
+ String warehouse, String database, Map<String, String>
mySqlConfig) {
+ this(warehouse, database, Collections.emptyMap(), mySqlConfig);
}
public MySqlSyncDatabaseAction(
- Map<String, String> mySqlConfig,
String warehouse,
String database,
- boolean ignoreIncompatible,
- boolean mergeShards,
- @Nullable String tablePrefix,
- @Nullable String tableSuffix,
- @Nullable String includingTables,
- @Nullable String excludingTables,
Map<String, String> catalogConfig,
- Map<String, String> tableConfig,
- DatabaseSyncMode mode) {
+ Map<String, String> mySqlConfig) {
super(warehouse, catalogConfig);
- this.mySqlConfig = Configuration.fromMap(mySqlConfig);
this.database = database;
+ this.mySqlConfig = Configuration.fromMap(mySqlConfig);
+ }
+
+ public MySqlSyncDatabaseAction withTableConfig(Map<String, String>
tableConfig) {
+ this.tableConfig = tableConfig;
+ return this;
+ }
+
+ public MySqlSyncDatabaseAction ignoreIncompatible(boolean
ignoreIncompatible) {
this.ignoreIncompatible = ignoreIncompatible;
+ return this;
+ }
+
+ public MySqlSyncDatabaseAction mergeShards(boolean mergeShards) {
this.mergeShards = mergeShards;
- this.tablePrefix = tablePrefix == null ? "" : tablePrefix;
- this.tableSuffix = tableSuffix == null ? "" : tableSuffix;
- this.includingTables = includingTables == null ? ".*" :
includingTables;
- this.includingPattern = Pattern.compile(this.includingTables);
- this.excludingPattern = excludingTables == null ? null :
Pattern.compile(excludingTables);
- this.tableConfig = tableConfig;
+ return this;
+ }
+
+ public MySqlSyncDatabaseAction withTablePrefix(@Nullable String
tablePrefix) {
+ if (tablePrefix != null) {
+ this.tablePrefix = tablePrefix;
+ }
+ return this;
+ }
+
+ public MySqlSyncDatabaseAction withTableSuffix(@Nullable String
tableSuffix) {
+ if (tableSuffix != null) {
+ this.tableSuffix = tableSuffix;
+ }
+ return this;
+ }
+
+ public MySqlSyncDatabaseAction includingTables(@Nullable String
includingTables) {
+ if (includingTables != null) {
+ this.includingTables = includingTables;
+ }
+ return this;
+ }
+
+ public MySqlSyncDatabaseAction excludingTables(@Nullable String
excludingTables) {
+ this.excludingTables = excludingTables;
+ return this;
+ }
+
+ public MySqlSyncDatabaseAction withMode(DatabaseSyncMode mode) {
this.mode = mode;
+ return this;
}
public void build(StreamExecutionEnvironment env) throws Exception {
@@ -176,9 +190,15 @@ public class MySqlSyncDatabaseAction extends ActionBase {
validateCaseInsensitive();
}
+ Pattern includingPattern = Pattern.compile(includingTables);
+ Pattern excludingPattern =
+ excludingTables == null ? null :
Pattern.compile(excludingTables);
MySqlSchemasInfo mySqlSchemasInfo =
MySqlActionUtils.getMySqlTableInfos(
- mySqlConfig, this::shouldMonitorTable, excludedTables);
+ mySqlConfig,
+ tableName ->
+ shouldMonitorTable(tableName,
includingPattern, excludingPattern),
+ excludedTables);
logNonPkTables(mySqlSchemasInfo.nonPkTables());
List<MySqlTableInfo> mySqlTableInfos =
mySqlSchemasInfo.toMySqlTableInfos(mergeShards);
@@ -238,8 +258,7 @@ public class MySqlSyncDatabaseAction extends ActionBase {
Boolean convertTinyint1ToBool =
mySqlConfig.get(MYSQL_CONVERTER_TINYINT1_BOOL);
MySqlTableSchemaBuilder schemaBuilder =
new MySqlTableSchemaBuilder(tableConfig, caseSensitive,
convertTinyint1ToBool);
- Pattern includingPattern = this.includingPattern;
- Pattern excludingPattern = this.excludingPattern;
+
EventParser.Factory<String> parserFactory =
() ->
new MySqlDebeziumJsonEventParser(
@@ -302,7 +321,8 @@ public class MySqlSyncDatabaseAction extends ActionBase {
}
}
- private boolean shouldMonitorTable(String mySqlTableName) {
+ private boolean shouldMonitorTable(
+ String mySqlTableName, Pattern includingPattern, @Nullable Pattern
excludingPattern) {
boolean shouldMonitor =
includingPattern.matcher(mySqlTableName).matches();
if (excludingPattern != null) {
shouldMonitor = shouldMonitor &&
!excludingPattern.matcher(mySqlTableName).matches();
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java
index dd9938772..c320a3203 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java
@@ -24,12 +24,8 @@ import org.apache.paimon.flink.action.cdc.DatabaseSyncMode;
import org.apache.flink.api.java.utils.MultipleParameterTool;
-import java.util.Map;
import java.util.Optional;
-import static org.apache.paimon.flink.action.cdc.DatabaseSyncMode.COMBINED;
-import static org.apache.paimon.flink.action.cdc.DatabaseSyncMode.DIVIDED;
-
/** Factory to create {@link MySqlSyncDatabaseAction}. */
public class MySqlSyncDatabaseActionFactory implements ActionFactory {
@@ -42,54 +38,28 @@ public class MySqlSyncDatabaseActionFactory implements
ActionFactory {
@Override
public Optional<Action> create(MultipleParameterTool params) {
- checkRequiredArgument(params, "warehouse");
- checkRequiredArgument(params, "database");
checkRequiredArgument(params, "mysql-conf");
- String warehouse = params.get("warehouse");
- String database = params.get("database");
- boolean ignoreIncompatible =
Boolean.parseBoolean(params.get("ignore-incompatible"));
- boolean mergeShards =
- !params.has("merge-shards") ||
Boolean.parseBoolean(params.get("merge-shards"));
- String tablePrefix = params.get("table-prefix");
- String tableSuffix = params.get("table-suffix");
- String includingTables = params.get("including-tables");
- String excludingTables = params.get("excluding-tables");
- String mode = params.get("mode");
- DatabaseSyncMode syncMode;
- if (mode == null) {
- syncMode = DIVIDED;
- } else {
- switch (mode.toLowerCase()) {
- case "divided":
- syncMode = DIVIDED;
- break;
- case "combined":
- syncMode = COMBINED;
- break;
- default:
- throw new UnsupportedOperationException(
- "Unsupported mode '" + mode + "' for database
synchronization mode.");
- }
- }
-
- Map<String, String> mySqlConfig = optionalConfigMap(params,
"mysql-conf");
- Map<String, String> catalogConfig = optionalConfigMap(params,
"catalog-conf");
- Map<String, String> tableConfig = optionalConfigMap(params,
"table-conf");
- return Optional.of(
+ MySqlSyncDatabaseAction mySqlSyncDatabaseAction =
new MySqlSyncDatabaseAction(
- mySqlConfig,
- warehouse,
- database,
- ignoreIncompatible,
- mergeShards,
- tablePrefix,
- tableSuffix,
- includingTables,
- excludingTables,
- catalogConfig,
- tableConfig,
- syncMode));
+ getRequiredValue(params, "warehouse"),
+ getRequiredValue(params, "database"),
+ optionalConfigMap(params, "catalog-conf"),
+ optionalConfigMap(params, "mysql-conf"));
+
+ mySqlSyncDatabaseAction
+ .withTableConfig(optionalConfigMap(params, "table-conf"))
+
.ignoreIncompatible(Boolean.parseBoolean(params.get("ignore-incompatible")))
+ .mergeShards(
+ !params.has("merge-shards")
+ ||
Boolean.parseBoolean(params.get("merge-shards")))
+ .withTablePrefix(params.get("table-prefix")) // prefix has its own argument, distinct from the suffix
+ .withTableSuffix(params.get("table-suffix"))
+ .includingTables(params.get("including-tables"))
+ .excludingTables(params.get("excluding-tables"))
+ .withMode(DatabaseSyncMode.fromString(params.get("mode")));
+
+ return Optional.of(mySqlSyncDatabaseAction);
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
index ca26288d5..4d3b0957c 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
@@ -39,7 +39,9 @@ import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.time.ZoneId;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -84,53 +86,59 @@ import static
org.apache.paimon.utils.Preconditions.checkArgument;
*/
public class MySqlSyncTableAction extends ActionBase {
- private final Configuration mySqlConfig;
private final String database;
private final String table;
- private final List<String> partitionKeys;
- private final List<String> primaryKeys;
- private final List<String> computedColumnArgs;
- private final Map<String, String> tableConfig;
+ private final Configuration mySqlConfig;
+
+ private final List<String> partitionKeys = new ArrayList<>();
+ private final List<String> primaryKeys = new ArrayList<>();
+
+ private Map<String, String> tableConfig = new HashMap<>();
+ private List<String> computedColumnArgs = new ArrayList<>();
public MySqlSyncTableAction(
- Map<String, String> mySqlConfig,
- String warehouse,
- String database,
- String table,
- List<String> partitionKeys,
- List<String> primaryKeys,
- Map<String, String> catalogConfig,
- Map<String, String> tableConfig) {
- this(
- mySqlConfig,
- warehouse,
- database,
- table,
- partitionKeys,
- primaryKeys,
- Collections.emptyList(),
- catalogConfig,
- tableConfig);
+ String warehouse, String database, String table, Map<String,
String> mySqlConfig) {
+ this(warehouse, database, table, Collections.emptyMap(), mySqlConfig);
}
public MySqlSyncTableAction(
- Map<String, String> mySqlConfig,
String warehouse,
String database,
String table,
- List<String> partitionKeys,
- List<String> primaryKeys,
- List<String> computedColumnArgs,
Map<String, String> catalogConfig,
- Map<String, String> tableConfig) {
+ Map<String, String> mySqlConfig) {
super(warehouse, catalogConfig);
- this.mySqlConfig = Configuration.fromMap(mySqlConfig);
this.database = database;
this.table = table;
- this.partitionKeys = partitionKeys;
- this.primaryKeys = primaryKeys;
- this.computedColumnArgs = computedColumnArgs;
+ this.mySqlConfig = Configuration.fromMap(mySqlConfig);
+ }
+
+ public MySqlSyncTableAction withPartitionKeys(String... partitionKeys) {
+ return withPartitionKeys(Arrays.asList(partitionKeys));
+ }
+
+ public MySqlSyncTableAction withPartitionKeys(List<String> partitionKeys) {
+ this.partitionKeys.addAll(partitionKeys);
+ return this;
+ }
+
+ public MySqlSyncTableAction withPrimaryKeys(String... primaryKeys) {
+ return withPrimaryKeys(Arrays.asList(primaryKeys));
+ }
+
+ public MySqlSyncTableAction withPrimaryKeys(List<String> primaryKeys) {
+ this.primaryKeys.addAll(primaryKeys);
+ return this;
+ }
+
+ public MySqlSyncTableAction withTableConfig(Map<String, String>
tableConfig) {
this.tableConfig = tableConfig;
+ return this;
+ }
+
+ public MySqlSyncTableAction withComputedColumnArgs(List<String>
computedColumnArgs) {
+ this.computedColumnArgs = computedColumnArgs;
+ return this;
}
public void build(StreamExecutionEnvironment env) throws Exception {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionFactory.java
index e2e17c153..98a824e87 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionFactory.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionFactory.java
@@ -25,12 +25,7 @@ import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
import java.util.Optional;
-import java.util.stream.Collectors;
/** Factory to create {@link MySqlSyncTableAction}. */
public class MySqlSyncTableActionFactory implements ActionFactory {
@@ -45,43 +40,31 @@ public class MySqlSyncTableActionFactory implements
ActionFactory {
@Override
public Optional<Action> create(MultipleParameterTool params) {
Tuple3<String, String, String> tablePath = getTablePath(params);
+ checkRequiredArgument(params, "mysql-conf");
+
+ MySqlSyncTableAction action =
+ new MySqlSyncTableAction(
+ tablePath.f0,
+ tablePath.f1,
+ tablePath.f2,
+ optionalConfigMap(params, "catalog-conf"),
+ optionalConfigMap(params, "mysql-conf"))
+ .withTableConfig(optionalConfigMap(params,
"table-conf"));
- List<String> partitionKeys = Collections.emptyList();
if (params.has("partition-keys")) {
- partitionKeys =
- Arrays.stream(params.get("partition-keys").split(","))
- .collect(Collectors.toList());
+ action.withPartitionKeys(params.get("partition-keys").split(","));
}
- List<String> primaryKeys = Collections.emptyList();
if (params.has("primary-keys")) {
- primaryKeys =
- Arrays.stream(params.get("primary-keys").split(","))
- .collect(Collectors.toList());
+ action.withPrimaryKeys(params.get("primary-keys").split(","));
}
- List<String> computedColumnArgs = Collections.emptyList();
if (params.has("computed-column")) {
- computedColumnArgs = new
ArrayList<>(params.getMultiParameter("computed-column"));
+ action.withComputedColumnArgs(
+ new
ArrayList<>(params.getMultiParameter("computed-column")));
}
- checkRequiredArgument(params, "mysql-conf");
-
- Map<String, String> mySqlConfig = optionalConfigMap(params,
"mysql-conf");
- Map<String, String> catalogConfig = optionalConfigMap(params,
"catalog-conf");
- Map<String, String> tableConfig = optionalConfigMap(params,
"table-conf");
-
- return Optional.of(
- new MySqlSyncTableAction(
- mySqlConfig,
- tablePath.f0,
- tablePath.f1,
- tablePath.f2,
- partitionKeys,
- primaryKeys,
- computedColumnArgs,
- catalogConfig,
- tableConfig));
+ return Optional.of(action);
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
index f736a2066..0ad412e47 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
@@ -84,9 +84,4 @@ public class MultiTableUpdatedDataFieldsProcessFunction
}
}
}
-
- private List<SchemaChange> extractSchemaChanges(
- SchemaManager schemaManager, List<DataField> updatedDataFields) {
- return getSchemaChanges(updatedDataFields, schemaManager);
- }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
index 32ed4d382..4a33eb1b7 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
@@ -54,12 +54,8 @@ public class UpdatedDataFieldsProcessFunction
public void processElement(
List<DataField> updatedDataFields, Context context,
Collector<Void> collector)
throws Exception {
- for (SchemaChange schemaChange :
extractSchemaChanges(updatedDataFields)) {
+ for (SchemaChange schemaChange : extractSchemaChanges(schemaManager,
updatedDataFields)) {
applySchemaChange(schemaManager, schemaChange, identifier);
}
}
-
- private List<SchemaChange> extractSchemaChanges(List<DataField>
updatedDataFields) {
- return getSchemaChanges(updatedDataFields, schemaManager);
- }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
index b38808cff..7684ddbbc 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
@@ -164,8 +164,9 @@ public abstract class
UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
return ConvertAction.EXCEPTION;
}
- protected List<SchemaChange> getSchemaChanges(
- List<DataField> updatedDataFields, SchemaManager schemaManager) {
+ protected List<SchemaChange> extractSchemaChanges(
+ SchemaManager schemaManager, List<DataField> updatedDataFields) {
+
RowType oldRowType = schemaManager.latest().get().logicalRowType();
Map<String, DataField> oldFields = new HashMap<>();
for (DataField oldField : oldRowType.getFields()) {
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java
index 00babf813..3f580951c 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java
@@ -28,7 +28,9 @@ import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.jupiter.api.AfterAll;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -157,6 +159,29 @@ public class MySqlActionITCaseBase extends
ActionITCaseBase {
return config;
}
+ protected JobClient runActionWithDefaultEnv(MySqlSyncTableAction action)
throws Exception {
+ StreamExecutionEnvironment env = getBasicEnv();
+ action.build(env);
+ JobClient client = env.executeAsync();
+ waitJobRunning(client);
+ return client;
+ }
+
+ protected void runActionWithDefaultEnv(MySqlSyncDatabaseAction action)
throws Exception {
+ StreamExecutionEnvironment env = getBasicEnv();
+ action.build(env);
+ JobClient client = env.executeAsync();
+ waitJobRunning(client);
+ }
+
+ protected StreamExecutionEnvironment getBasicEnv() {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(2);
+ env.enableCheckpointing(1000);
+ env.setRestartStrategy(RestartStrategies.noRestart());
+ return env;
+ }
+
protected void waitJobRunning(JobClient client) throws Exception {
while (true) {
JobStatus status = client.getJobStatus().get();
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
index 4e2095344..d3e5ea6d9 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
@@ -30,7 +30,6 @@ import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.JsonSerdeUtil;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.SavepointFormatType;
@@ -80,23 +79,10 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
Map<String, String> mySqlConfig = getBasicMySqlConfig();
mySqlConfig.put("database-name", "paimon_sync_database");
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(2);
- env.enableCheckpointing(1000);
- env.setRestartStrategy(RestartStrategies.noRestart());
-
- Map<String, String> tableConfig = getBasicTableConfig();
MySqlSyncDatabaseAction action =
- new MySqlSyncDatabaseAction(
- mySqlConfig,
- warehouse,
- database,
- false,
- Collections.emptyMap(),
- tableConfig);
- action.build(env);
- JobClient client = env.executeAsync();
- waitJobRunning(client);
+ new MySqlSyncDatabaseAction(warehouse, database, mySqlConfig)
+ .withTableConfig(getBasicTableConfig());
+ runActionWithDefaultEnv(action);
try (Statement statement = getStatement()) {
testSchemaEvolutionImpl(statement);
@@ -223,23 +209,10 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
mySqlConfig.put("database-name",
"paimon_sync_database_tinyint_schema");
mySqlConfig.put("mysql.converter.tinyint1-to-bool", "false");
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(2);
- env.enableCheckpointing(1000);
- env.setRestartStrategy(RestartStrategies.noRestart());
-
- Map<String, String> tableConfig = getBasicTableConfig();
MySqlSyncDatabaseAction action =
- new MySqlSyncDatabaseAction(
- mySqlConfig,
- warehouse,
- database,
- false,
- Collections.emptyMap(),
- tableConfig);
- action.build(env);
- JobClient client = env.executeAsync();
- waitJobRunning(client);
+ new MySqlSyncDatabaseAction(warehouse, database, mySqlConfig)
+ .withTableConfig(getBasicTableConfig());
+ runActionWithDefaultEnv(action);
try (Statement statement = getStatement()) {
testSchemaEvolutionImplWithTinyInt1Convert(statement);
@@ -315,13 +288,7 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
MySqlSyncDatabaseAction action =
- new MySqlSyncDatabaseAction(
- mySqlConfig,
- warehouse,
- database,
- false,
- Collections.emptyMap(),
- Collections.emptyMap());
+ new MySqlSyncDatabaseAction(warehouse, database, mySqlConfig);
assertThatThrownBy(() -> action.build(env))
.isInstanceOf(IllegalArgumentException.class)
@@ -338,13 +305,7 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
MySqlSyncDatabaseAction action =
- new MySqlSyncDatabaseAction(
- mySqlConfig,
- warehouse,
- database,
- false,
- Collections.emptyMap(),
- Collections.emptyMap());
+ new MySqlSyncDatabaseAction(warehouse, database, mySqlConfig);
assertThatThrownBy(() -> action.build(env))
.isInstanceOf(IllegalArgumentException.class)
@@ -371,23 +332,11 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
Map<String, String> mySqlConfig = getBasicMySqlConfig();
mySqlConfig.put("database-name",
"paimon_sync_database_ignore_incompatible");
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(2);
- env.enableCheckpointing(1000);
- env.setRestartStrategy(RestartStrategies.noRestart());
-
- Map<String, String> tableConfig = getBasicTableConfig();
MySqlSyncDatabaseAction action =
- new MySqlSyncDatabaseAction(
- mySqlConfig,
- warehouse,
- database,
- true,
- Collections.emptyMap(),
- tableConfig);
- action.build(env);
- JobClient client = env.executeAsync();
- waitJobRunning(client);
+ new MySqlSyncDatabaseAction(warehouse, database, mySqlConfig)
+ .withTableConfig(getBasicTableConfig())
+ .ignoreIncompatible(true);
+ runActionWithDefaultEnv(action);
// validate `compatible` can be synchronized
try (Statement statement = getStatement()) {
@@ -431,29 +380,12 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
Map<String, String> mySqlConfig = getBasicMySqlConfig();
mySqlConfig.put("database-name", "paimon_sync_database_affix");
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(2);
- env.enableCheckpointing(1000);
- env.setRestartStrategy(RestartStrategies.noRestart());
-
- Map<String, String> tableConfig = getBasicTableConfig();
MySqlSyncDatabaseAction action =
- new MySqlSyncDatabaseAction(
- mySqlConfig,
- warehouse,
- database,
- false,
- true,
- "test_prefix_",
- "_test_suffix",
- null,
- null,
- Collections.emptyMap(),
- tableConfig,
- DIVIDED);
- action.build(env);
- JobClient client = env.executeAsync();
- waitJobRunning(client);
+ new MySqlSyncDatabaseAction(warehouse, database, mySqlConfig)
+ .withTableConfig(getBasicTableConfig())
+ .withTablePrefix("test_prefix_")
+ .withTableSuffix("_test_suffix");
+ runActionWithDefaultEnv(action);
try (Statement statement = getStatement()) {
testTableAffixImpl(statement);
@@ -602,29 +534,12 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
Map<String, String> mySqlConfig = getBasicMySqlConfig();
mySqlConfig.put("database-name", databaseName);
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(2);
- env.enableCheckpointing(1000);
- env.setRestartStrategy(RestartStrategies.noRestart());
-
- Map<String, String> tableConfig = getBasicTableConfig();
MySqlSyncDatabaseAction action =
- new MySqlSyncDatabaseAction(
- mySqlConfig,
- warehouse,
- database,
- false,
- true,
- null,
- null,
- includingTables,
- excludingTables,
- Collections.emptyMap(),
- tableConfig,
- DIVIDED);
- action.build(env);
- JobClient client = env.executeAsync();
- waitJobRunning(client);
+ new MySqlSyncDatabaseAction(warehouse, database, mySqlConfig)
+ .withTableConfig(getBasicTableConfig())
+ .includingTables(includingTables)
+ .excludingTables(excludingTables);
+ runActionWithDefaultEnv(action);
// check paimon tables
assertTableExists(existedTables);
@@ -637,22 +552,15 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
Map<String, String> mySqlConfig = getBasicMySqlConfig();
mySqlConfig.put("database-name", "paimon_ignore_CASE");
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(2);
- env.enableCheckpointing(1000);
- env.setRestartStrategy(RestartStrategies.noRestart());
-
- Map<String, String> tableConfig = getBasicTableConfig();
-
- Map<String, String> catalogConfig =
- Collections.singletonMap(CatalogOptions.METASTORE.key(),
"test-case-insensitive");
-
MySqlSyncDatabaseAction action =
new MySqlSyncDatabaseAction(
- mySqlConfig, warehouse, database, false,
catalogConfig, tableConfig);
- action.build(env);
- JobClient client = env.executeAsync();
- waitJobRunning(client);
+ warehouse,
+ database,
+ Collections.singletonMap(
+ CatalogOptions.METASTORE.key(),
"test-case-insensitive"),
+ mySqlConfig)
+ .withTableConfig(getBasicTableConfig());
+ runActionWithDefaultEnv(action);
// check table schema
FileStoreTable table = getFileStoreTable("t");
@@ -740,30 +648,14 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
Map<String, String> mySqlConfig = getBasicMySqlConfig();
mySqlConfig.put("database-name", mySqlDatabase);
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(2);
- env.enableCheckpointing(1000);
- env.setRestartStrategy(RestartStrategies.noRestart());
-
- Map<String, String> tableConfig = getBasicTableConfig();
MySqlSyncDatabaseAction action =
- new MySqlSyncDatabaseAction(
- mySqlConfig,
- warehouse,
- database,
- false,
- true,
- null,
- null,
- "t.+",
- ".*a$",
- Collections.emptyMap(),
- tableConfig,
- COMBINED);
- action.build(env);
- JobClient client = env.executeAsync();
- waitJobRunning(client);
+ new MySqlSyncDatabaseAction(warehouse, database, mySqlConfig)
+ .withTableConfig(getBasicTableConfig())
+ .includingTables("t.+")
+ .excludingTables(".*a$")
+ .withMode(COMBINED);
+ runActionWithDefaultEnv(action);
try (Statement statement = getStatement()) {
FileStoreTable table1 = getFileStoreTable("t1");
@@ -1035,12 +927,6 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
Map<String, String> mySqlConfig = getBasicMySqlConfig();
mySqlConfig.put("database-name", databaseName);
mySqlConfig.put("scan.incremental.snapshot.chunk.size", "1");
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(2);
- env.enableCheckpointing(1000);
- env.setRestartStrategy(RestartStrategies.noRestart());
-
- Map<String, String> tableConfig = getBasicTableConfig();
Map<String, String> catalogConfig =
testSchemaChange
@@ -1049,19 +935,11 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
: Collections.emptyMap();
MySqlSyncDatabaseAction action =
- new MySqlSyncDatabaseAction(
- mySqlConfig,
- warehouse,
- database,
- false,
- true,
- null,
- null,
- "t.+",
- null,
- catalogConfig,
- tableConfig,
- COMBINED);
+ new MySqlSyncDatabaseAction(warehouse, database,
catalogConfig, mySqlConfig)
+ .withTableConfig(getBasicTableConfig())
+ .includingTables("t.+")
+ .withMode(COMBINED);
+ StreamExecutionEnvironment env = getBasicEnv();
action.build(env);
if (Objects.nonNull(savepointPath)) {
@@ -1081,23 +959,10 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
mySqlConfig.put("database-name", "paimon_sync_database_tinyint");
mySqlConfig.put("mysql.converter.tinyint1-to-bool", "false");
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(2);
- env.enableCheckpointing(1000);
- env.setRestartStrategy(RestartStrategies.noRestart());
-
- Map<String, String> tableConfig = getBasicTableConfig();
MySqlSyncDatabaseAction action =
- new MySqlSyncDatabaseAction(
- mySqlConfig,
- warehouse,
- database,
- false,
- Collections.emptyMap(),
- tableConfig);
- action.build(env);
- JobClient client = env.executeAsync();
- waitJobRunning(client);
+ new MySqlSyncDatabaseAction(warehouse, database, mySqlConfig)
+ .withTableConfig(getBasicTableConfig());
+ runActionWithDefaultEnv(action);
try (Statement statement = getStatement()) {
testTinyInt1Convert(statement);
@@ -1140,33 +1005,16 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
Map<String, String> mySqlConfig = getBasicMySqlConfig();
mySqlConfig.put("database-name", databaseName);
mySqlConfig.put("scan.incremental.snapshot.chunk.size", "1");
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- env.enableCheckpointing(1000);
- env.setRestartStrategy(RestartStrategies.noRestart());
Map<String, String> tableConfig = getBasicTableConfig();
tableConfig.put("sink.parallelism", "1");
tableConfig.put(CoreOptions.WRITE_BUFFER_SIZE.key(), "4 mb");
MySqlSyncDatabaseAction action =
- new MySqlSyncDatabaseAction(
- mySqlConfig,
- warehouse,
- database,
- false,
- true,
- null,
- null,
- null,
- null,
- Collections.emptyMap(),
- tableConfig,
- COMBINED);
- action.build(env);
-
- JobClient client = env.executeAsync();
- waitJobRunning(client);
+ new MySqlSyncDatabaseAction(warehouse, database, mySqlConfig)
+ .withTableConfig(tableConfig)
+ .withMode(COMBINED);
+ runActionWithDefaultEnv(action);
try (Statement statement = getStatement()) {
statement.executeUpdate("USE " + databaseName);
@@ -1207,30 +1055,12 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
? "database_shard_.*"
: "database_shard_1|database_shard_2");
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(2);
- env.enableCheckpointing(1000);
- env.setRestartStrategy(RestartStrategies.noRestart());
-
- Map<String, String> tableConfig = getBasicTableConfig();
DatabaseSyncMode mode = ThreadLocalRandom.current().nextBoolean() ?
DIVIDED : COMBINED;
MySqlSyncDatabaseAction action =
- new MySqlSyncDatabaseAction(
- mySqlConfig,
- warehouse,
- database,
- false,
- true,
- null,
- null,
- null,
- null,
- Collections.emptyMap(),
- tableConfig,
- mode);
- action.build(env);
- JobClient client = env.executeAsync();
- waitJobRunning(client);
+ new MySqlSyncDatabaseAction(warehouse, database, mySqlConfig)
+ .withTableConfig(getBasicTableConfig())
+ .withMode(mode);
+ runActionWithDefaultEnv(action);
try (Statement statement = getStatement()) {
// test insert into t1
@@ -1332,30 +1162,13 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
Map<String, String> mySqlConfig = getBasicMySqlConfig();
mySqlConfig.put("database-name", "without_merging_shard_.*");
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(2);
- env.enableCheckpointing(1000);
- env.setRestartStrategy(RestartStrategies.noRestart());
-
- Map<String, String> tableConfig = getBasicTableConfig();
DatabaseSyncMode mode = ThreadLocalRandom.current().nextBoolean() ?
DIVIDED : COMBINED;
MySqlSyncDatabaseAction action =
- new MySqlSyncDatabaseAction(
- mySqlConfig,
- warehouse,
- database,
- false,
- false,
- null,
- null,
- null,
- null,
- Collections.emptyMap(),
- tableConfig,
- mode);
- action.build(env);
- JobClient client = env.executeAsync();
- waitJobRunning(client);
+ new MySqlSyncDatabaseAction(warehouse, database, mySqlConfig)
+ .withTableConfig(getBasicTableConfig())
+ .mergeShards(false)
+ .withMode(mode);
+ runActionWithDefaultEnv(action);
try (Statement statement = getStatement()) {
Thread.sleep(5_000);
@@ -1468,23 +1281,11 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
Map<String, String> mySqlConfig = getBasicMySqlConfig();
mySqlConfig.put("database-name", "monitored_and_excluded_shard_.*");
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
-
MySqlSyncDatabaseAction action =
- new MySqlSyncDatabaseAction(
- mySqlConfig,
- warehouse,
- database,
- true,
- true,
- null,
- null,
- null,
- null,
- Collections.emptyMap(),
- Collections.emptyMap(),
- COMBINED);
- action.build(env);
+ new MySqlSyncDatabaseAction(warehouse, database, mySqlConfig)
+ .ignoreIncompatible(true)
+ .withMode(COMBINED);
+ action.build(StreamExecutionEnvironment.getExecutionEnvironment());
assertThat(action.monitoredTables())
.containsOnly(
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseTableListITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseTableListITCase.java
index 2c73dc330..b791dd84b 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseTableListITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseTableListITCase.java
@@ -23,9 +23,6 @@ import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.core.execution.JobClient;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@@ -58,30 +55,16 @@ public class MySqlSyncDatabaseTableListITCase extends
MySqlActionITCaseBase {
? ".*shard_.*"
: "shard_1|shard_2|shard_3|x_shard_1");
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(2);
- env.enableCheckpointing(1000);
- env.setRestartStrategy(RestartStrategies.noRestart());
-
- Map<String, String> tableConfig = getBasicTableConfig();
DatabaseSyncMode mode = ThreadLocalRandom.current().nextBoolean() ?
DIVIDED : COMBINED;
MySqlSyncDatabaseAction action =
- new MySqlSyncDatabaseAction(
- mySqlConfig,
- warehouse,
- database,
- false,
- false,
- null,
- null,
- "t.+|s.+",
- "ta|sa",
- Collections.emptyMap(),
- tableConfig,
- mode);
- action.build(env);
- JobClient client = env.executeAsync();
- waitJobRunning(client);
+ new MySqlSyncDatabaseAction(warehouse, database, mySqlConfig)
+ .withTableConfig(getBasicTableConfig())
+ .mergeShards(false)
+ .withMode(mode)
+ .includingTables("t.+|s.+")
+ .excludingTables("ta|sa");
+
+ runActionWithDefaultEnv(action);
assertThat(catalog().listTables(database))
.containsExactlyInAnyOrder(
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index f0b28982e..6100288d8 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -29,7 +29,6 @@ import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.JsonSerdeUtil;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.jupiter.api.BeforeAll;
@@ -67,29 +66,23 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
mySqlConfig.put("database-name", DATABASE_NAME);
mySqlConfig.put("table-name", "schema_evolution_\\d+");
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(2);
- env.enableCheckpointing(1000);
- env.setRestartStrategy(RestartStrategies.noRestart());
-
- Map<String, String> tableConfig = getBasicTableConfig();
MySqlSyncTableAction action =
new MySqlSyncTableAction(
- mySqlConfig,
- warehouse,
- database,
- tableName,
- Collections.singletonList("pt"),
- Arrays.asList("pt", "_id"),
- Collections.singletonMap(
- CatalogOptions.METASTORE.key(),
"test-alter-table"),
- tableConfig);
- action.build(env);
- JobClient client = env.executeAsync();
- waitJobRunning(client);
+ warehouse,
+ database,
+ tableName,
+ Collections.singletonMap(
+ CatalogOptions.METASTORE.key(),
"test-alter-table"),
+ mySqlConfig)
+ .withTableConfig(getBasicTableConfig())
+ .withPartitionKeys("pt")
+ .withPrimaryKeys("pt", "_id");
+ runActionWithDefaultEnv(action);
checkTableSchema(
- "[{\"id\":0,\"name\":\"pt\",\"type\":\"INT NOT
NULL\",\"description\":\"primary\"},{\"id\":1,\"name\":\"_id\",\"type\":\"INT
NOT
NULL\",\"description\":\"_id\"},{\"id\":2,\"name\":\"v1\",\"type\":\"VARCHAR(10)\",\"description\":\"v1\"}]");
+ "[{\"id\":0,\"name\":\"pt\",\"type\":\"INT NOT
NULL\",\"description\":\"primary\"},"
+ + "{\"id\":1,\"name\":\"_id\",\"type\":\"INT NOT
NULL\",\"description\":\"_id\"},"
+ +
"{\"id\":2,\"name\":\"v1\",\"type\":\"VARCHAR(10)\",\"description\":\"v1\"}]");
try (Statement statement = getStatement()) {
testSchemaEvolutionImpl(statement);
@@ -254,27 +247,15 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
mySqlConfig.put("database-name", DATABASE_NAME);
mySqlConfig.put("table-name", "schema_evolution_multiple");
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- env.enableCheckpointing(1000);
- env.setRestartStrategy(RestartStrategies.noRestart());
-
MySqlSyncTableAction action =
- new MySqlSyncTableAction(
- mySqlConfig,
- warehouse,
- database,
- tableName,
- Collections.emptyList(),
- Collections.singletonList("_id"),
- Collections.emptyMap(),
- Collections.emptyMap());
- action.build(env);
- JobClient client = env.executeAsync();
- waitJobRunning(client);
+ new MySqlSyncTableAction(warehouse, database, tableName,
mySqlConfig);
+ runActionWithDefaultEnv(action);
checkTableSchema(
- "[{\"id\":0,\"name\":\"_id\",\"type\":\"INT NOT
NULL\",\"description\":\"primary\"},{\"id\":1,\"name\":\"v1\",\"type\":\"VARCHAR(10)\",\"description\":\"v1\"},{\"id\":2,\"name\":\"v2\",\"type\":\"INT\",\"description\":\"v2\"},{\"id\":3,\"name\":\"v3\",\"type\":\"VARCHAR(10)\",\"description\":\"v3\"}]");
+ "[{\"id\":0,\"name\":\"_id\",\"type\":\"INT NOT
NULL\",\"description\":\"primary\"},"
+ +
"{\"id\":1,\"name\":\"v1\",\"type\":\"VARCHAR(10)\",\"description\":\"v1\"},"
+ +
"{\"id\":2,\"name\":\"v2\",\"type\":\"INT\",\"description\":\"v2\"},"
+ +
"{\"id\":3,\"name\":\"v3\",\"type\":\"VARCHAR(10)\",\"description\":\"v3\"}]");
try (Statement statement = getStatement()) {
testSchemaEvolutionMultipleImpl(statement);
@@ -346,24 +327,11 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
mySqlConfig.put("database-name", DATABASE_NAME);
mySqlConfig.put("table-name", "all_types_table");
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- env.enableCheckpointing(1000);
- env.setRestartStrategy(RestartStrategies.noRestart());
-
MySqlSyncTableAction action =
- new MySqlSyncTableAction(
- mySqlConfig,
- warehouse,
- database,
- tableName,
- Collections.singletonList("pt"),
- Arrays.asList("pt", "_id"),
- Collections.emptyMap(),
- Collections.emptyMap());
- action.build(env);
- JobClient client = env.executeAsync();
- waitJobRunning(client);
+ new MySqlSyncTableAction(warehouse, database, tableName,
mySqlConfig)
+ .withPartitionKeys("pt")
+ .withPrimaryKeys("pt", "_id");
+ JobClient client = runActionWithDefaultEnv(action);
try (Statement statement = getStatement()) {
testAllTypesImpl(statement);
@@ -645,15 +613,7 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
MySqlSyncTableAction action =
- new MySqlSyncTableAction(
- mySqlConfig,
- warehouse,
- database,
- tableName,
- Collections.emptyList(),
- Collections.singletonList("_id"),
- Collections.emptyMap(),
- Collections.emptyMap());
+ new MySqlSyncTableAction(warehouse, database, tableName,
mySqlConfig);
assertThatThrownBy(() -> action.build(env))
.satisfies(
@@ -680,15 +640,8 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
MySqlSyncTableAction action =
- new MySqlSyncTableAction(
- mySqlConfig,
- warehouse,
- database,
- tableName,
- Collections.emptyList(),
- Collections.singletonList("a"),
- Collections.emptyMap(),
- Collections.emptyMap());
+ new MySqlSyncTableAction(warehouse, database, tableName,
mySqlConfig)
+ .withPrimaryKeys("a");
assertThatThrownBy(() -> action.build(env))
.isInstanceOf(IllegalArgumentException.class)
@@ -703,15 +656,8 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
MySqlSyncTableAction action =
- new MySqlSyncTableAction(
- mySqlConfig,
- warehouse,
- database,
- tableName,
- Collections.emptyList(),
- Collections.singletonList("pk"),
- Collections.emptyMap(),
- Collections.emptyMap());
+ new MySqlSyncTableAction(warehouse, database, tableName,
mySqlConfig)
+ .withPrimaryKeys("pk");
assertThatThrownBy(() -> action.build(env))
.isInstanceOf(IllegalArgumentException.class)
@@ -727,15 +673,7 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
MySqlSyncTableAction action =
- new MySqlSyncTableAction(
- mySqlConfig,
- warehouse,
- database,
- tableName,
- Collections.emptyList(),
- Collections.emptyList(),
- Collections.emptyMap(),
- Collections.emptyMap());
+ new MySqlSyncTableAction(warehouse, database, tableName,
mySqlConfig);
assertThatThrownBy(() -> action.build(env))
.isInstanceOf(IllegalArgumentException.class)
@@ -760,11 +698,6 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
mySqlConfig.put("database-name", DATABASE_NAME);
mySqlConfig.put("table-name", "test_computed_column");
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(2);
- env.enableCheckpointing(1000);
- env.setRestartStrategy(RestartStrategies.noRestart());
-
List<String> computedColumnDefs =
Arrays.asList(
"_year_date=year(_date)",
@@ -787,19 +720,11 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
"_truncate_date=truncate(pk,2)");
MySqlSyncTableAction action =
- new MySqlSyncTableAction(
- mySqlConfig,
- warehouse,
- database,
- tableName,
- Collections.singletonList("_year_date"),
- Arrays.asList("pk", "_year_date"),
- computedColumnDefs,
- Collections.emptyMap(),
- Collections.emptyMap());
- action.build(env);
- JobClient client = env.executeAsync();
- waitJobRunning(client);
+ new MySqlSyncTableAction(warehouse, database, tableName,
mySqlConfig)
+ .withPartitionKeys("_year_date")
+ .withPrimaryKeys("pk", "_year_date")
+ .withComputedColumnArgs(computedColumnDefs);
+ runActionWithDefaultEnv(action);
if (executeMysql) {
try (Statement statement = getStatement()) {
@@ -877,25 +802,9 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
mySqlConfig.put("table-name", "test_tinyint1_convert");
mySqlConfig.put("mysql.converter.tinyint1-to-bool", "false");
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(2);
- env.enableCheckpointing(1000);
- env.setRestartStrategy(RestartStrategies.noRestart());
-
MySqlSyncTableAction action =
- new MySqlSyncTableAction(
- mySqlConfig,
- warehouse,
- database,
- tableName,
- Collections.emptyList(),
- Collections.singletonList("pk"),
- Collections.emptyList(),
- Collections.emptyMap(),
- Collections.emptyMap());
- action.build(env);
- JobClient client = env.executeAsync();
- waitJobRunning(client);
+ new MySqlSyncTableAction(warehouse, database, tableName,
mySqlConfig);
+ runActionWithDefaultEnv(action);
checkTableSchema(
"[{\"id\":0,\"name\":\"pk\",\"type\":\"INT NOT
NULL\",\"description\":\"\"},"
@@ -949,25 +858,12 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
mySqlConfig.put("database-name", dbPattern);
mySqlConfig.put("table-name", tblPattern);
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(2);
- env.enableCheckpointing(1000);
- env.setRestartStrategy(RestartStrategies.noRestart());
-
MySqlSyncTableAction action =
- new MySqlSyncTableAction(
- mySqlConfig,
- warehouse,
- database,
- tableName,
- Collections.singletonList("pt"),
- Arrays.asList("pk", "pt"),
- Collections.singletonList("pt=substring(_date,5)"),
- Collections.emptyMap(),
- Collections.emptyMap());
- action.build(env);
- JobClient client = env.executeAsync();
- waitJobRunning(client);
+ new MySqlSyncTableAction(warehouse, database, tableName,
mySqlConfig)
+ .withPartitionKeys("pt")
+ .withPrimaryKeys("pk", "pt")
+
.withComputedColumnArgs(Collections.singletonList("pt=substring(_date,5)"));
+ runActionWithDefaultEnv(action);
try (Statement statement = getStatement()) {
statement.execute("USE shard_1");
diff --git
a/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_database_setup.sql
b/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_database_setup.sql
index b4343f1f2..7f91e9df6 100644
---
a/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_database_setup.sql
+++
b/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_database_setup.sql
@@ -476,4 +476,3 @@ CREATE TABLE t3 (
k INT,
v2 VARCHAR(10)
);
-