This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new c300b278e [flink] Support specifying including or excluding source
tables in MySQL CDC action (#1021)
c300b278e is described below
commit c300b278e18015abf19b6f4a10ef2cae570afce3
Author: yuzelin <[email protected]>
AuthorDate: Wed Apr 26 12:25:31 2023 +0800
[flink] Support specifying including or excluding source tables in MySQL
CDC action (#1021)
---
docs/content/how-to/cdc-ingestion.md | 7 ++
.../action/cdc/mysql/MySqlSyncDatabaseAction.java | 40 ++++++++
.../cdc/mysql/MySqlSyncDatabaseActionITCase.java | 102 +++++++++++++++++++++
.../src/test/resources/mysql/setup.sql | 80 ++++++++++++++++
4 files changed, 229 insertions(+)
diff --git a/docs/content/how-to/cdc-ingestion.md
b/docs/content/how-to/cdc-ingestion.md
index 204a7eb8a..636b59da0 100644
--- a/docs/content/how-to/cdc-ingestion.md
+++ b/docs/content/how-to/cdc-ingestion.md
@@ -115,6 +115,8 @@ To use this feature through `flink run`, run the following
shell command.
[--ignore-incompatible <true/false>] \
[--table-prefix <paimon-table-prefix>] \
[--table-suffix <paimon-table-suffix>] \
+ [--including-tables <mysql-table-name|name-regular-expr>] \
+ [--excluding-tables <mysql-table-name|name-regular-expr>] \
[--mysql-conf <mysql-cdc-source-conf> [--mysql-conf
<mysql-cdc-source-conf> ...]] \
[--catalog-conf <paimon-catalog-conf> [--catalog-conf
<paimon-catalog-conf> ...]] \
[--table-conf <paimon-table-sink-conf> [--table-conf
<paimon-table-sink-conf> ...]]
@@ -127,6 +129,11 @@ an exception will be thrown. You can specify it to true
explicitly to ignore the
* `--table-prefix` is the prefix of all Paimon tables to be synchronized. For
example, if you want all synchronized tables
to have "ods_" as prefix, you can specify `--table-prefix ods_`.
* `--table-suffix` is the suffix of all Paimon tables to be synchronized. The
usage is the same as `--table-prefix`.
+* `--including-tables` is used to specify which source tables are to be
synchronized. You must use '|' to separate multiple
+tables. Regular expressions are supported; for example, specifying
`--including-tables test|paimon.*` means to synchronize
+table 'test' and all tables whose names start with 'paimon'.
+* `--excluding-tables` is used to specify which source tables are not to be
synchronized. The usage is the same as `--including-tables`.
+`--excluding-tables` has higher priority than `--including-tables` if you
specify both.
* `--mysql-conf` is the configuration for Flink CDC MySQL table sources. Each
configuration should be specified in the format `key=value`. `hostname`,
`username`, `password` and `database-name` are required configurations, others
are optional. Note that `database-name` should be the exact name of the MySQL
database you want to synchronize. It can't be a regular expression. See its
[document](https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#connecto
[...]
* `--catalog-conf` is the configuration for Paimon catalog. Each configuration
should be specified in the format `key=value`. See [here]({{< ref
"maintenance/configurations" >}}) for a complete list of catalog configurations.
* `--table-conf` is the configuration for Paimon table sink. Each
configuration should be specified in the format `key=value`. The same set of
configurations will be applied to all Paimon sink tables. See [here]({{< ref
"maintenance/configurations" >}}) for a complete list of table configurations.
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
index f442afe5c..dcd2c9d5c 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
@@ -54,6 +54,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.regex.Pattern;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -102,6 +103,8 @@ public class MySqlSyncDatabaseAction implements Action {
private final boolean ignoreIncompatible;
private final String tablePrefix;
private final String tableSuffix;
+ @Nullable private final Pattern includingPattern;
+ @Nullable private final Pattern excludingPattern;
private final Map<String, String> catalogConfig;
private final Map<String, String> tableConfig;
@@ -119,6 +122,8 @@ public class MySqlSyncDatabaseAction implements Action {
ignoreIncompatible,
null,
null,
+ null,
+ null,
catalogConfig,
tableConfig);
}
@@ -130,6 +135,8 @@ public class MySqlSyncDatabaseAction implements Action {
boolean ignoreIncompatible,
@Nullable String tablePrefix,
@Nullable String tableSuffix,
+ @Nullable String includingTables,
+ @Nullable String excludingTables,
Map<String, String> catalogConfig,
Map<String, String> tableConfig) {
this.mySqlConfig = Configuration.fromMap(mySqlConfig);
@@ -138,6 +145,8 @@ public class MySqlSyncDatabaseAction implements Action {
this.ignoreIncompatible = ignoreIncompatible;
this.tablePrefix = tablePrefix == null ? "" : tablePrefix;
this.tableSuffix = tableSuffix == null ? "" : tableSuffix;
+ this.includingPattern = includingTables == null ? null :
Pattern.compile(includingTables);
+ this.excludingPattern = excludingTables == null ? null :
Pattern.compile(excludingTables);
this.catalogConfig = catalogConfig;
this.tableConfig = tableConfig;
}
@@ -251,6 +260,9 @@ public class MySqlSyncDatabaseAction implements Action {
metaData.getTables(databaseName, null, "%", new String[]
{"TABLE"})) {
while (tables.next()) {
String tableName = tables.getString("TABLE_NAME");
+ if (!shouldMonitorTable(tableName)) {
+ continue;
+ }
MySqlSchema mySqlSchema =
new MySqlSchema(metaData, databaseName, tableName,
caseSensitive);
if (mySqlSchema.primaryKeys().size() > 0) {
@@ -263,6 +275,18 @@ public class MySqlSyncDatabaseAction implements Action {
return mySqlSchemaList;
}
+ private boolean shouldMonitorTable(String mySqlTableName) {
+ boolean shouldMonitor = true;
+ if (includingPattern != null) {
+ shouldMonitor = includingPattern.matcher(mySqlTableName).matches();
+ }
+ if (excludingPattern != null) {
+ shouldMonitor = shouldMonitor &&
!excludingPattern.matcher(mySqlTableName).matches();
+ }
+ LOG.debug("Source table {} is monitored? {}", mySqlTableName,
shouldMonitor);
+ return shouldMonitor;
+ }
+
private boolean shouldMonitorTable(
TableSchema tableSchema, MySqlSchema mySqlSchema, Identifier
identifier) {
if (MySqlActionUtils.schemaCompatible(tableSchema, mySqlSchema)) {
@@ -311,6 +335,8 @@ public class MySqlSyncDatabaseAction implements Action {
boolean ignoreIncompatible =
Boolean.parseBoolean(params.get("ignore-incompatible"));
String tablePrefix = params.get("table-prefix");
String tableSuffix = params.get("table-suffix");
+ String includingTables = params.get("including-tables");
+ String excludingTables = params.get("excluding-tables");
Map<String, String> mySqlConfig = getConfigMap(params, "mysql-conf");
Map<String, String> catalogConfig = getConfigMap(params,
"catalog-conf");
@@ -327,6 +353,8 @@ public class MySqlSyncDatabaseAction implements Action {
ignoreIncompatible,
tablePrefix,
tableSuffix,
+ includingTables,
+ excludingTables,
catalogConfig == null ? Collections.emptyMap() :
catalogConfig,
tableConfig == null ? Collections.emptyMap() :
tableConfig));
}
@@ -366,6 +394,8 @@ public class MySqlSyncDatabaseAction implements Action {
+ "[--ignore-incompatible <true/false>] "
+ "[--table-prefix <paimon-table-prefix>] "
+ "[--table-suffix <paimon-table-suffix>] "
+ + "[--including-tables
<mysql-table-name|name-regular-expr>] "
+ + "[--excluding-tables
<mysql-table-name|name-regular-expr>] "
+ "[--mysql-conf <mysql-cdc-source-conf> [--mysql-conf
<mysql-cdc-source-conf> ...]] "
+ "[--catalog-conf <paimon-catalog-conf>
[--catalog-conf <paimon-catalog-conf> ...]] "
+ "[--table-conf <paimon-table-sink-conf>
[--table-conf <paimon-table-sink-conf> ...]]");
@@ -383,6 +413,16 @@ public class MySqlSyncDatabaseAction implements Action {
System.out.println("The usage of --table-suffix is same as
`--table-prefix`");
System.out.println();
+ System.out.println(
+ "--including-tables is used to specify which source tables are
to be synchronized. "
+ + "You must use '|' to separate multiple tables.
Regular expression is supported.");
+ System.out.println(
+ "--excluding-tables is used to specify which source tables are
not to be synchronized. "
+ + "The usage is same as --including-tables.");
+ System.out.println(
+ "--excluding-tables has higher priority than
--including-tables if you specified both.");
+ System.out.println();
+
System.out.println("MySQL CDC source conf syntax:");
System.out.println(" key=value");
System.out.println(
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
index 7b0f5a581..9e2c912ef 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
@@ -36,6 +36,8 @@ import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
+import javax.annotation.Nullable;
+
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
@@ -341,6 +343,7 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
}
@Test
+ @Timeout(60)
public void testTableAffix() throws Exception {
// create table t1
Catalog catalog =
CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse)));
@@ -375,6 +378,8 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
false,
"test_prefix_",
"_test_suffix",
+ null,
+ null,
Collections.emptyMap(),
tableConfig);
action.build(env);
@@ -496,9 +501,106 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
waitForResult(expected, table2, rowType2, primaryKeys2);
}
+ @Test
+ @Timeout(60)
+ public void testIncludingTables() throws Exception {
+ includingAndExcludingTablesImpl(
+ "paimon_sync_database_including",
+ "flink|paimon.+",
+ null,
+ Arrays.asList("flink", "paimon_1", "paimon_2"),
+ Collections.singletonList("ignored"));
+ }
+
+ @Test
+ @Timeout(60)
+ public void testExcludingTables() throws Exception {
+ includingAndExcludingTablesImpl(
+ "paimon_sync_database_excluding",
+ null,
+ "flink|paimon.+",
+ Collections.singletonList("sync"),
+ Arrays.asList("flink", "paimon_1", "paimon_2"));
+ }
+
+ @Test
+ @Timeout(60)
+ public void testIncludingAndExcludingTables() throws Exception {
+ includingAndExcludingTablesImpl(
+ "paimon_sync_database_in_excluding",
+ "flink|paimon.+",
+ "paimon_1",
+ Arrays.asList("flink", "paimon_2"),
+ Arrays.asList("paimon_1", "test"));
+ }
+
+ private void includingAndExcludingTablesImpl(
+ String databaseName,
+ @Nullable String includingTables,
+ @Nullable String excludingTables,
+ List<String> existedTables,
+ List<String> notExistedTables)
+ throws Exception {
+ // try synchronization
+ Map<String, String> mySqlConfig = getBasicMySqlConfig();
+ mySqlConfig.put("database-name", databaseName);
+
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(2);
+ env.enableCheckpointing(1000);
+ env.setRestartStrategy(RestartStrategies.noRestart());
+
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ Map<String, String> tableConfig = new HashMap<>();
+ tableConfig.put("bucket", String.valueOf(random.nextInt(3) + 1));
+ tableConfig.put("sink.parallelism", String.valueOf(random.nextInt(3) +
1));
+ MySqlSyncDatabaseAction action =
+ new MySqlSyncDatabaseAction(
+ mySqlConfig,
+ warehouse,
+ database,
+ false,
+ null,
+ null,
+ includingTables,
+ excludingTables,
+ Collections.emptyMap(),
+ tableConfig);
+ action.build(env);
+ JobClient client = env.executeAsync();
+
+ while (true) {
+ JobStatus status = client.getJobStatus().get();
+ if (status == JobStatus.RUNNING) {
+ break;
+ }
+ Thread.sleep(1000);
+ }
+
+ // check paimon tables
+ assertTableExists(existedTables);
+ assertTableNotExists(notExistedTables);
+ }
+
private FileStoreTable getFileStoreTable(String tableName) throws
Exception {
Catalog catalog =
CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse)));
Identifier identifier = Identifier.create(database, tableName);
return (FileStoreTable) catalog.getTable(identifier);
}
+
+ private void assertTableExists(List<String> tableNames) {
+ Catalog catalog =
CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse)));
+ for (String tableName : tableNames) {
+ Identifier identifier = Identifier.create(database, tableName);
+ assertThat(catalog.tableExists(identifier)).isTrue();
+ }
+ }
+
+ private void assertTableNotExists(List<String> tableNames) {
+ Catalog catalog =
CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse)));
+ for (String tableName : tableNames) {
+ Identifier identifier = Identifier.create(database, tableName);
+ assertThat(catalog.tableExists(identifier)).isFalse();
+ }
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
b/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
index bd3eb309b..d18ccef31 100644
--- a/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
+++ b/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
@@ -328,3 +328,83 @@ CREATE TABLE t2 (
PRIMARY KEY (k2)
);
+--
################################################################################
+-- MySqlSyncDatabaseActionITCase#testIncludingTables
+--
################################################################################
+
+CREATE DATABASE paimon_sync_database_including;
+USE paimon_sync_database_including;
+
+CREATE TABLE paimon_1 (
+ k INT,
+ PRIMARY KEY (k)
+);
+
+CREATE TABLE paimon_2 (
+ k INT,
+ PRIMARY KEY (k)
+);
+
+CREATE TABLE flink (
+ k INT,
+ PRIMARY KEY (k)
+);
+
+CREATE TABLE ignored (
+ k INT,
+ PRIMARY KEY (k)
+);
+
+--
################################################################################
+-- MySqlSyncDatabaseActionITCase#testExcludingTables
+--
################################################################################
+
+CREATE DATABASE paimon_sync_database_excluding;
+USE paimon_sync_database_excluding;
+
+CREATE TABLE paimon_1 (
+ k INT,
+ PRIMARY KEY (k)
+);
+
+CREATE TABLE paimon_2 (
+ k INT,
+ PRIMARY KEY (k)
+);
+
+CREATE TABLE flink (
+ k INT,
+ PRIMARY KEY (k)
+);
+
+CREATE TABLE sync (
+ k INT,
+ PRIMARY KEY (k)
+);
+
+--
################################################################################
+-- MySqlSyncDatabaseActionITCase#testIncludingAndExcludingTables
+--
################################################################################
+
+CREATE DATABASE paimon_sync_database_in_excluding;
+USE paimon_sync_database_in_excluding;
+
+CREATE TABLE paimon_1 (
+ k INT,
+ PRIMARY KEY (k)
+);
+
+CREATE TABLE paimon_2 (
+ k INT,
+ PRIMARY KEY (k)
+);
+
+CREATE TABLE flink (
+ k INT,
+ PRIMARY KEY (k)
+);
+
+CREATE TABLE test (
+ k INT,
+ PRIMARY KEY (k)
+);