This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.4
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git

commit 3e35a112baad2b8c7217ccf4d0d3af2f01202ec8
Author: yuzelin <[email protected]>
AuthorDate: Wed Apr 26 12:25:31 2023 +0800

    [flink] Support specifying including or excluding source tables in MySQL CDC action (#1021)
---
 docs/content/how-to/cdc-ingestion.md               |   7 ++
 .../action/cdc/mysql/MySqlSyncDatabaseAction.java  |  40 ++++++++
 .../cdc/mysql/MySqlSyncDatabaseActionITCase.java   | 102 +++++++++++++++++++++
 .../src/test/resources/mysql/setup.sql             |  80 ++++++++++++++++
 4 files changed, 229 insertions(+)

diff --git a/docs/content/how-to/cdc-ingestion.md 
b/docs/content/how-to/cdc-ingestion.md
index 204a7eb8a..636b59da0 100644
--- a/docs/content/how-to/cdc-ingestion.md
+++ b/docs/content/how-to/cdc-ingestion.md
@@ -115,6 +115,8 @@ To use this feature through `flink run`, run the following 
shell command.
     [--ignore-incompatible <true/false>] \
     [--table-prefix <paimon-table-prefix>] \
     [--table-suffix <paimon-table-suffix>] \
+    [--including-tables <mysql-table-name|name-regular-expr>] \
+    [--excluding-tables <mysql-table-name|name-regular-expr>] \
     [--mysql-conf <mysql-cdc-source-conf> [--mysql-conf 
<mysql-cdc-source-conf> ...]] \
     [--catalog-conf <paimon-catalog-conf> [--catalog-conf 
<paimon-catalog-conf> ...]] \
     [--table-conf <paimon-table-sink-conf> [--table-conf 
<paimon-table-sink-conf> ...]]
@@ -127,6 +129,11 @@ an exception will be thrown. You can specify it to true 
explicitly to ignore the
 * `--table-prefix` is the prefix of all Paimon tables to be synchronized. For 
example, if you want all synchronized tables 
 to have "ods_" as prefix, you can specify `--table-prefix ods_`.
 * `--table-suffix` is the suffix of all Paimon tables to be synchronized. The 
usage is same as `--table-prefix`.
+* `--including-tables` is used to specify which source tables are to be synchronized. You must use '|' to separate multiple
+tables. Regular expressions are supported; for example, specifying `--including-tables test|paimon.*` means to synchronize
+table 'test' and all tables whose names start with 'paimon'. The expression must match the entire table name.
+* `--excluding-tables` is used to specify which source tables are not to be synchronized. The usage is the same as `--including-tables`.
+`--excluding-tables` takes precedence over `--including-tables` if both are specified: a table matching both is not synchronized.
 * `--mysql-conf` is the configuration for Flink CDC MySQL table sources. Each 
configuration should be specified in the format `key=value`. `hostname`, 
`username`, `password` and `database-name` are required configurations, others 
are optional. Note that `database-name` should be the exact name of the MySQL 
databse you want to synchronize. It can't be a regular expression. See its 
[document](https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#connecto
 [...]
 * `--catalog-conf` is the configuration for Paimon catalog. Each configuration 
should be specified in the format `key=value`. See [here]({{< ref 
"maintenance/configurations" >}}) for a complete list of catalog configurations.
 * `--table-conf` is the configuration for Paimon table sink. Each 
configuration should be specified in the format `key=value`. All Paimon sink 
table will be applied the same set of configurations. See [here]({{< ref 
"maintenance/configurations" >}}) for a complete list of table configurations.
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
index f442afe5c..dcd2c9d5c 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
@@ -54,6 +54,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.regex.Pattern;
 
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
@@ -102,6 +103,8 @@ public class MySqlSyncDatabaseAction implements Action {
     private final boolean ignoreIncompatible;
     private final String tablePrefix;
     private final String tableSuffix;
+    @Nullable private final Pattern includingPattern;
+    @Nullable private final Pattern excludingPattern;
     private final Map<String, String> catalogConfig;
     private final Map<String, String> tableConfig;
 
@@ -119,6 +122,8 @@ public class MySqlSyncDatabaseAction implements Action {
                 ignoreIncompatible,
                 null,
                 null,
+                null,
+                null,
                 catalogConfig,
                 tableConfig);
     }
@@ -130,6 +135,8 @@ public class MySqlSyncDatabaseAction implements Action {
             boolean ignoreIncompatible,
             @Nullable String tablePrefix,
             @Nullable String tableSuffix,
+            @Nullable String includingTables,
+            @Nullable String excludingTables,
             Map<String, String> catalogConfig,
             Map<String, String> tableConfig) {
         this.mySqlConfig = Configuration.fromMap(mySqlConfig);
@@ -138,6 +145,8 @@ public class MySqlSyncDatabaseAction implements Action {
         this.ignoreIncompatible = ignoreIncompatible;
         this.tablePrefix = tablePrefix == null ? "" : tablePrefix;
         this.tableSuffix = tableSuffix == null ? "" : tableSuffix;
+        this.includingPattern = includingTables == null ? null : 
Pattern.compile(includingTables);
+        this.excludingPattern = excludingTables == null ? null : 
Pattern.compile(excludingTables);
         this.catalogConfig = catalogConfig;
         this.tableConfig = tableConfig;
     }
@@ -251,6 +260,9 @@ public class MySqlSyncDatabaseAction implements Action {
                     metaData.getTables(databaseName, null, "%", new String[] 
{"TABLE"})) {
                 while (tables.next()) {
                     String tableName = tables.getString("TABLE_NAME");
+                    if (!shouldMonitorTable(tableName)) {
+                        continue;
+                    }
                     MySqlSchema mySqlSchema =
                             new MySqlSchema(metaData, databaseName, tableName, 
caseSensitive);
                     if (mySqlSchema.primaryKeys().size() > 0) {
@@ -263,6 +275,18 @@ public class MySqlSyncDatabaseAction implements Action {
         return mySqlSchemaList;
     }
 
+    private boolean shouldMonitorTable(String mySqlTableName) {
+        boolean shouldMonitor = true;
+        if (includingPattern != null) {
+            shouldMonitor = includingPattern.matcher(mySqlTableName).matches();
+        }
+        if (excludingPattern != null) {
+            shouldMonitor = shouldMonitor && 
!excludingPattern.matcher(mySqlTableName).matches();
+        }
+        LOG.debug("Source table {} is monitored? {}", mySqlTableName, 
shouldMonitor);
+        return shouldMonitor;
+    }
+
     private boolean shouldMonitorTable(
             TableSchema tableSchema, MySqlSchema mySqlSchema, Identifier 
identifier) {
         if (MySqlActionUtils.schemaCompatible(tableSchema, mySqlSchema)) {
@@ -311,6 +335,8 @@ public class MySqlSyncDatabaseAction implements Action {
         boolean ignoreIncompatible = 
Boolean.parseBoolean(params.get("ignore-incompatible"));
         String tablePrefix = params.get("table-prefix");
         String tableSuffix = params.get("table-suffix");
+        String includingTables = params.get("including-tables");
+        String excludingTables = params.get("excluding-tables");
 
         Map<String, String> mySqlConfig = getConfigMap(params, "mysql-conf");
         Map<String, String> catalogConfig = getConfigMap(params, 
"catalog-conf");
@@ -327,6 +353,8 @@ public class MySqlSyncDatabaseAction implements Action {
                         ignoreIncompatible,
                         tablePrefix,
                         tableSuffix,
+                        includingTables,
+                        excludingTables,
                         catalogConfig == null ? Collections.emptyMap() : 
catalogConfig,
                         tableConfig == null ? Collections.emptyMap() : 
tableConfig));
     }
@@ -366,6 +394,8 @@ public class MySqlSyncDatabaseAction implements Action {
                         + "[--ignore-incompatible <true/false>] "
                         + "[--table-prefix <paimon-table-prefix>] "
                         + "[--table-suffix <paimon-table-suffix>] "
+                        + "[--including-tables 
<mysql-table-name|name-regular-expr>] "
+                        + "[--excluding-tables 
<mysql-table-name|name-regular-expr>] "
                         + "[--mysql-conf <mysql-cdc-source-conf> [--mysql-conf 
<mysql-cdc-source-conf> ...]] "
                         + "[--catalog-conf <paimon-catalog-conf> 
[--catalog-conf <paimon-catalog-conf> ...]] "
                         + "[--table-conf <paimon-table-sink-conf> 
[--table-conf <paimon-table-sink-conf> ...]]");
@@ -383,6 +413,16 @@ public class MySqlSyncDatabaseAction implements Action {
         System.out.println("The usage of --table-suffix is same as 
`--table-prefix`");
         System.out.println();
 
+        System.out.println(
+                "--including-tables is used to specify which source tables are 
to be synchronized. "
+                        + "You must use '|' to separate multiple tables. 
Regular expression is supported.");
+        System.out.println(
+                "--excluding-tables is used to specify which source tables are 
not to be synchronized. "
+                        + "The usage is same as --including-tables.");
+        System.out.println(
+                "--excluding-tables has higher priority than 
--including-tables if you specified both.");
+        System.out.println();
+
         System.out.println("MySQL CDC source conf syntax:");
         System.out.println("  key=value");
         System.out.println(
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
index 7b0f5a581..9e2c912ef 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
@@ -36,6 +36,8 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
+import javax.annotation.Nullable;
+
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.Statement;
@@ -341,6 +343,7 @@ public class MySqlSyncDatabaseActionITCase extends 
MySqlActionITCaseBase {
     }
 
     @Test
+    @Timeout(60)
     public void testTableAffix() throws Exception {
         // create table t1
         Catalog catalog = 
CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse)));
@@ -375,6 +378,8 @@ public class MySqlSyncDatabaseActionITCase extends 
MySqlActionITCaseBase {
                         false,
                         "test_prefix_",
                         "_test_suffix",
+                        null,
+                        null,
                         Collections.emptyMap(),
                         tableConfig);
         action.build(env);
@@ -496,9 +501,106 @@ public class MySqlSyncDatabaseActionITCase extends 
MySqlActionITCaseBase {
         waitForResult(expected, table2, rowType2, primaryKeys2);
     }
 
+    @Test
+    @Timeout(60)
+    public void testIncludingTables() throws Exception {
+        includingAndExcludingTablesImpl(
+                "paimon_sync_database_including",
+                "flink|paimon.+",
+                null,
+                Arrays.asList("flink", "paimon_1", "paimon_2"),
+                Collections.singletonList("ignored"));
+    }
+
+    @Test
+    @Timeout(60)
+    public void testExcludingTables() throws Exception {
+        includingAndExcludingTablesImpl(
+                "paimon_sync_database_excluding",
+                null,
+                "flink|paimon.+",
+                Collections.singletonList("sync"),
+                Arrays.asList("flink", "paimon_1", "paimon_2"));
+    }
+
+    @Test
+    @Timeout(60)
+    public void testIncludingAndExcludingTables() throws Exception {
+        includingAndExcludingTablesImpl(
+                "paimon_sync_database_in_excluding",
+                "flink|paimon.+",
+                "paimon_1",
+                Arrays.asList("flink", "paimon_2"),
+                Arrays.asList("paimon_1", "test"));
+    }
+
+    private void includingAndExcludingTablesImpl(
+            String databaseName,
+            @Nullable String includingTables,
+            @Nullable String excludingTables,
+            List<String> existedTables,
+            List<String> notExistedTables)
+            throws Exception {
+        // try synchronization
+        Map<String, String> mySqlConfig = getBasicMySqlConfig();
+        mySqlConfig.put("database-name", databaseName);
+
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(2);
+        env.enableCheckpointing(1000);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        Map<String, String> tableConfig = new HashMap<>();
+        tableConfig.put("bucket", String.valueOf(random.nextInt(3) + 1));
+        tableConfig.put("sink.parallelism", String.valueOf(random.nextInt(3) + 
1));
+        MySqlSyncDatabaseAction action =
+                new MySqlSyncDatabaseAction(
+                        mySqlConfig,
+                        warehouse,
+                        database,
+                        false,
+                        null,
+                        null,
+                        includingTables,
+                        excludingTables,
+                        Collections.emptyMap(),
+                        tableConfig);
+        action.build(env);
+        JobClient client = env.executeAsync();
+
+        while (true) {
+            JobStatus status = client.getJobStatus().get();
+            if (status == JobStatus.RUNNING) {
+                break;
+            }
+            Thread.sleep(1000);
+        }
+
+        // check paimon tables
+        assertTableExists(existedTables);
+        assertTableNotExists(notExistedTables);
+    }
+
     private FileStoreTable getFileStoreTable(String tableName) throws 
Exception {
         Catalog catalog = 
CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse)));
         Identifier identifier = Identifier.create(database, tableName);
         return (FileStoreTable) catalog.getTable(identifier);
     }
+
+    private void assertTableExists(List<String> tableNames) {
+        Catalog catalog = 
CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse)));
+        for (String tableName : tableNames) {
+            Identifier identifier = Identifier.create(database, tableName);
+            assertThat(catalog.tableExists(identifier)).isTrue();
+        }
+    }
+
+    private void assertTableNotExists(List<String> tableNames) {
+        Catalog catalog = 
CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse)));
+        for (String tableName : tableNames) {
+            Identifier identifier = Identifier.create(database, tableName);
+            assertThat(catalog.tableExists(identifier)).isFalse();
+        }
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql 
b/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
index bd3eb309b..d18ccef31 100644
--- a/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
+++ b/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
@@ -328,3 +328,83 @@ CREATE TABLE t2 (
     PRIMARY KEY (k2)
 );
 
+-- 
################################################################################
+--  MySqlSyncDatabaseActionITCase#testIncludingTables
+-- 
################################################################################
+
+CREATE DATABASE paimon_sync_database_including;
+USE paimon_sync_database_including;
+
+CREATE TABLE paimon_1 (
+    k INT,
+    PRIMARY KEY (k)
+);
+
+CREATE TABLE paimon_2 (
+    k INT,
+    PRIMARY KEY (k)
+);
+
+CREATE TABLE flink (
+    k INT,
+    PRIMARY KEY (k)
+);
+
+CREATE TABLE ignored (
+    k INT,
+    PRIMARY KEY (k)
+);
+
+-- 
################################################################################
+--  MySqlSyncDatabaseActionITCase#testExcludingTables
+-- 
################################################################################
+
+CREATE DATABASE paimon_sync_database_excluding;
+USE paimon_sync_database_excluding;
+
+CREATE TABLE paimon_1 (
+    k INT,
+    PRIMARY KEY (k)
+);
+
+CREATE TABLE paimon_2 (
+    k INT,
+    PRIMARY KEY (k)
+);
+
+CREATE TABLE flink (
+    k INT,
+    PRIMARY KEY (k)
+);
+
+CREATE TABLE sync (
+    k INT,
+    PRIMARY KEY (k)
+);
+
+-- 
################################################################################
+--  MySqlSyncDatabaseActionITCase#testIncludingAndExcludingTables
+-- 
################################################################################
+
+CREATE DATABASE paimon_sync_database_in_excluding;
+USE paimon_sync_database_in_excluding;
+
+CREATE TABLE paimon_1 (
+    k INT,
+    PRIMARY KEY (k)
+);
+
+CREATE TABLE paimon_2 (
+    k INT,
+    PRIMARY KEY (k)
+);
+
+CREATE TABLE flink (
+    k INT,
+    PRIMARY KEY (k)
+);
+
+CREATE TABLE test (
+    k INT,
+    PRIMARY KEY (k)
+);

Reply via email to