This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-0.4 in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
commit 3e35a112baad2b8c7217ccf4d0d3af2f01202ec8 Author: yuzelin <[email protected]> AuthorDate: Wed Apr 26 12:25:31 2023 +0800 [flink] Support specifying including or excluding source tables in MySQL CDC action (#1021) --- docs/content/how-to/cdc-ingestion.md | 7 ++ .../action/cdc/mysql/MySqlSyncDatabaseAction.java | 40 ++++++++ .../cdc/mysql/MySqlSyncDatabaseActionITCase.java | 102 +++++++++++++++++++++ .../src/test/resources/mysql/setup.sql | 80 ++++++++++++++++ 4 files changed, 229 insertions(+) diff --git a/docs/content/how-to/cdc-ingestion.md b/docs/content/how-to/cdc-ingestion.md index 204a7eb8a..636b59da0 100644 --- a/docs/content/how-to/cdc-ingestion.md +++ b/docs/content/how-to/cdc-ingestion.md @@ -115,6 +115,8 @@ To use this feature through `flink run`, run the following shell command. [--ignore-incompatible <true/false>] \ [--table-prefix <paimon-table-prefix>] \ [--table-suffix <paimon-table-suffix>] \ + [--including-tables <mysql-table-name|name-regular-expr>] \ + [--excluding-tables <mysql-table-name|name-regular-expr>] \ [--mysql-conf <mysql-cdc-source-conf> [--mysql-conf <mysql-cdc-source-conf> ...]] \ [--catalog-conf <paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]] \ [--table-conf <paimon-table-sink-conf> [--table-conf <paimon-table-sink-conf> ...]] @@ -127,6 +129,11 @@ an exception will be thrown. You can specify it to true explicitly to ignore the * `--table-prefix` is the prefix of all Paimon tables to be synchronized. For example, if you want all synchronized tables to have "ods_" as prefix, you can specify `--table-prefix ods_`. * `--table-suffix` is the suffix of all Paimon tables to be synchronized. The usage is same as `--table-prefix`. +* `--including-tables` is used to specify which source tables are to be synchronized. You must use '|' to separate multiple +tables. 
Regular expression is supported, for example, specifying `--including-tables test|paimon.*` means to synchronize +table 'test' and all tables starting with 'paimon'. +* `--excluding-tables` is used to specify which source tables are not to be synchronized. The usage is same as `--including-tables`. +`--excluding-tables` has higher priority than `--including-tables` if you specified both. * `--mysql-conf` is the configuration for Flink CDC MySQL table sources. Each configuration should be specified in the format `key=value`. `hostname`, `username`, `password` and `database-name` are required configurations, others are optional. Note that `database-name` should be the exact name of the MySQL database you want to synchronize. It can't be a regular expression. See its [document](https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#connecto [...] * `--catalog-conf` is the configuration for Paimon catalog. Each configuration should be specified in the format `key=value`. See [here]({{< ref "maintenance/configurations" >}}) for a complete list of catalog configurations. * `--table-conf` is the configuration for Paimon table sink. Each configuration should be specified in the format `key=value`. All Paimon sink table will be applied the same set of configurations. See [here]({{< ref "maintenance/configurations" >}}) for a complete list of table configurations. 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java index f442afe5c..dcd2c9d5c 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java @@ -54,6 +54,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.regex.Pattern; import static org.apache.paimon.utils.Preconditions.checkArgument; @@ -102,6 +103,8 @@ public class MySqlSyncDatabaseAction implements Action { private final boolean ignoreIncompatible; private final String tablePrefix; private final String tableSuffix; + @Nullable private final Pattern includingPattern; + @Nullable private final Pattern excludingPattern; private final Map<String, String> catalogConfig; private final Map<String, String> tableConfig; @@ -119,6 +122,8 @@ public class MySqlSyncDatabaseAction implements Action { ignoreIncompatible, null, null, + null, + null, catalogConfig, tableConfig); } @@ -130,6 +135,8 @@ public class MySqlSyncDatabaseAction implements Action { boolean ignoreIncompatible, @Nullable String tablePrefix, @Nullable String tableSuffix, + @Nullable String includingTables, + @Nullable String excludingTables, Map<String, String> catalogConfig, Map<String, String> tableConfig) { this.mySqlConfig = Configuration.fromMap(mySqlConfig); @@ -138,6 +145,8 @@ public class MySqlSyncDatabaseAction implements Action { this.ignoreIncompatible = ignoreIncompatible; this.tablePrefix = tablePrefix == null ? "" : tablePrefix; this.tableSuffix = tableSuffix == null ? "" : tableSuffix; + this.includingPattern = includingTables == null ? 
null : Pattern.compile(includingTables); + this.excludingPattern = excludingTables == null ? null : Pattern.compile(excludingTables); this.catalogConfig = catalogConfig; this.tableConfig = tableConfig; } @@ -251,6 +260,9 @@ public class MySqlSyncDatabaseAction implements Action { metaData.getTables(databaseName, null, "%", new String[] {"TABLE"})) { while (tables.next()) { String tableName = tables.getString("TABLE_NAME"); + if (!shouldMonitorTable(tableName)) { + continue; + } MySqlSchema mySqlSchema = new MySqlSchema(metaData, databaseName, tableName, caseSensitive); if (mySqlSchema.primaryKeys().size() > 0) { @@ -263,6 +275,18 @@ public class MySqlSyncDatabaseAction implements Action { return mySqlSchemaList; } + private boolean shouldMonitorTable(String mySqlTableName) { + boolean shouldMonitor = true; + if (includingPattern != null) { + shouldMonitor = includingPattern.matcher(mySqlTableName).matches(); + } + if (excludingPattern != null) { + shouldMonitor = shouldMonitor && !excludingPattern.matcher(mySqlTableName).matches(); + } + LOG.debug("Source table {} is monitored? 
{}", mySqlTableName, shouldMonitor); + return shouldMonitor; + } + private boolean shouldMonitorTable( TableSchema tableSchema, MySqlSchema mySqlSchema, Identifier identifier) { if (MySqlActionUtils.schemaCompatible(tableSchema, mySqlSchema)) { @@ -311,6 +335,8 @@ public class MySqlSyncDatabaseAction implements Action { boolean ignoreIncompatible = Boolean.parseBoolean(params.get("ignore-incompatible")); String tablePrefix = params.get("table-prefix"); String tableSuffix = params.get("table-suffix"); + String includingTables = params.get("including-tables"); + String excludingTables = params.get("excluding-tables"); Map<String, String> mySqlConfig = getConfigMap(params, "mysql-conf"); Map<String, String> catalogConfig = getConfigMap(params, "catalog-conf"); @@ -327,6 +353,8 @@ public class MySqlSyncDatabaseAction implements Action { ignoreIncompatible, tablePrefix, tableSuffix, + includingTables, + excludingTables, catalogConfig == null ? Collections.emptyMap() : catalogConfig, tableConfig == null ? Collections.emptyMap() : tableConfig)); } @@ -366,6 +394,8 @@ public class MySqlSyncDatabaseAction implements Action { + "[--ignore-incompatible <true/false>] " + "[--table-prefix <paimon-table-prefix>] " + "[--table-suffix <paimon-table-suffix>] " + + "[--including-tables <mysql-table-name|name-regular-expr>] " + + "[--excluding-tables <mysql-table-name|name-regular-expr>] " + "[--mysql-conf <mysql-cdc-source-conf> [--mysql-conf <mysql-cdc-source-conf> ...]] " + "[--catalog-conf <paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]] " + "[--table-conf <paimon-table-sink-conf> [--table-conf <paimon-table-sink-conf> ...]]"); @@ -383,6 +413,16 @@ public class MySqlSyncDatabaseAction implements Action { System.out.println("The usage of --table-suffix is same as `--table-prefix`"); System.out.println(); + System.out.println( + "--including-tables is used to specify which source tables are to be synchronized. " + + "You must use '|' to separate multiple tables. 
Regular expression is supported."); + System.out.println( + "--excluding-tables is used to specify which source tables are not to be synchronized. " + + "The usage is same as --including-tables."); + System.out.println( + "--excluding-tables has higher priority than --including-tables if you specified both."); + System.out.println(); + System.out.println("MySQL CDC source conf syntax:"); System.out.println(" key=value"); System.out.println( diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java index 7b0f5a581..9e2c912ef 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java @@ -36,6 +36,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import javax.annotation.Nullable; + import java.sql.Connection; import java.sql.DriverManager; import java.sql.Statement; @@ -341,6 +343,7 @@ public class MySqlSyncDatabaseActionITCase extends MySqlActionITCaseBase { } @Test + @Timeout(60) public void testTableAffix() throws Exception { // create table t1 Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse))); @@ -375,6 +378,8 @@ public class MySqlSyncDatabaseActionITCase extends MySqlActionITCaseBase { false, "test_prefix_", "_test_suffix", + null, + null, Collections.emptyMap(), tableConfig); action.build(env); @@ -496,9 +501,106 @@ public class MySqlSyncDatabaseActionITCase extends MySqlActionITCaseBase { waitForResult(expected, table2, rowType2, primaryKeys2); } + @Test + @Timeout(60) + public void testIncludingTables() throws 
Exception { + includingAndExcludingTablesImpl( + "paimon_sync_database_including", + "flink|paimon.+", + null, + Arrays.asList("flink", "paimon_1", "paimon_2"), + Collections.singletonList("ignored")); + } + + @Test + @Timeout(60) + public void testExcludingTables() throws Exception { + includingAndExcludingTablesImpl( + "paimon_sync_database_excluding", + null, + "flink|paimon.+", + Collections.singletonList("sync"), + Arrays.asList("flink", "paimon_1", "paimon_2")); + } + + @Test + @Timeout(60) + public void testIncludingAndExcludingTables() throws Exception { + includingAndExcludingTablesImpl( + "paimon_sync_database_in_excluding", + "flink|paimon.+", + "paimon_1", + Arrays.asList("flink", "paimon_2"), + Arrays.asList("paimon_1", "test")); + } + + private void includingAndExcludingTablesImpl( + String databaseName, + @Nullable String includingTables, + @Nullable String excludingTables, + List<String> existedTables, + List<String> notExistedTables) + throws Exception { + // try synchronization + Map<String, String> mySqlConfig = getBasicMySqlConfig(); + mySqlConfig.put("database-name", databaseName); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(2); + env.enableCheckpointing(1000); + env.setRestartStrategy(RestartStrategies.noRestart()); + + ThreadLocalRandom random = ThreadLocalRandom.current(); + Map<String, String> tableConfig = new HashMap<>(); + tableConfig.put("bucket", String.valueOf(random.nextInt(3) + 1)); + tableConfig.put("sink.parallelism", String.valueOf(random.nextInt(3) + 1)); + MySqlSyncDatabaseAction action = + new MySqlSyncDatabaseAction( + mySqlConfig, + warehouse, + database, + false, + null, + null, + includingTables, + excludingTables, + Collections.emptyMap(), + tableConfig); + action.build(env); + JobClient client = env.executeAsync(); + + while (true) { + JobStatus status = client.getJobStatus().get(); + if (status == JobStatus.RUNNING) { + break; + } + Thread.sleep(1000); 
+ } + + // check paimon tables + assertTableExists(existedTables); + assertTableNotExists(notExistedTables); + } + private FileStoreTable getFileStoreTable(String tableName) throws Exception { Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse))); Identifier identifier = Identifier.create(database, tableName); return (FileStoreTable) catalog.getTable(identifier); } + + private void assertTableExists(List<String> tableNames) { + Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse))); + for (String tableName : tableNames) { + Identifier identifier = Identifier.create(database, tableName); + assertThat(catalog.tableExists(identifier)).isTrue(); + } + } + + private void assertTableNotExists(List<String> tableNames) { + Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse))); + for (String tableName : tableNames) { + Identifier identifier = Identifier.create(database, tableName); + assertThat(catalog.tableExists(identifier)).isFalse(); + } + } } diff --git a/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql b/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql index bd3eb309b..d18ccef31 100644 --- a/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql +++ b/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql @@ -328,3 +328,83 @@ CREATE TABLE t2 ( PRIMARY KEY (k2) ); +-- ################################################################################ +-- MySqlSyncDatabaseActionITCase#testIncludingTables +-- ################################################################################ + +CREATE DATABASE paimon_sync_database_including; +USE paimon_sync_database_including; + +CREATE TABLE paimon_1 ( + k INT, + PRIMARY KEY (k) +); + +CREATE TABLE paimon_2 ( + k INT, + PRIMARY KEY (k) +); + +CREATE TABLE flink ( + k INT, + PRIMARY KEY (k) +); + +CREATE TABLE ignored ( + k INT, + PRIMARY KEY (k) +); + 
+-- ################################################################################ +-- MySqlSyncDatabaseActionITCase#testExcludingTables +-- ################################################################################ + +CREATE DATABASE paimon_sync_database_excluding; +USE paimon_sync_database_excluding; + +CREATE TABLE paimon_1 ( + k INT, + PRIMARY KEY (k) +); + +CREATE TABLE paimon_2 ( + k INT, + PRIMARY KEY (k) +); + +CREATE TABLE flink ( + k INT, + PRIMARY KEY (k) +); + +CREATE TABLE sync ( + k INT, + PRIMARY KEY (k) +); + +-- ################################################################################ +-- MySqlSyncDatabaseActionITCase#testIncludingAndExcludingTables +-- ################################################################################ + +CREATE DATABASE paimon_sync_database_in_excluding; +USE paimon_sync_database_in_excluding; + +CREATE TABLE paimon_1 ( + k INT, + PRIMARY KEY (k) +); + +CREATE TABLE paimon_2 ( + k INT, + PRIMARY KEY (k) +); + +CREATE TABLE flink ( + k INT, + PRIMARY KEY (k) +); + +CREATE TABLE test ( + k INT, + PRIMARY KEY (k) +);
