This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 2116843ce8 [Feature][MySQL-CDC] Support database/table wildcards scan
read (#8323)
2116843ce8 is described below
commit 2116843ce8e1fbd8225390e91d9adae371578baa
Author: zhangdonghao <[email protected]>
AuthorDate: Tue Jan 7 18:34:08 2025 +0800
[Feature][MySQL-CDC] Support database/table wildcards scan read (#8323)
---
docs/en/connector-v2/source/MySQL-CDC.md | 30 +++++++
.../cdc/base/config/JdbcSourceConfigFactory.java | 4 +
.../cdc/mysql/config/MySqlSourceConfigFactory.java | 4 +
.../source/MySqlIncrementalSourceFactory.java | 9 ++-
.../connectors/seatunnel/cdc/mysql/MysqlCDCIT.java | 59 ++++++++++++++
.../src/test/resources/ddl/wildcards.sql | 93 ++++++++++++++++++++++
.../src/test/resources/ddl/wildcards_dml.sql | 34 ++++++++
.../resources/mysqlcdc_wildcards_to_mysql.conf | 52 ++++++++++++
8 files changed, 282 insertions(+), 3 deletions(-)
diff --git a/docs/en/connector-v2/source/MySQL-CDC.md
b/docs/en/connector-v2/source/MySQL-CDC.md
index 0114d5c1d5..42d3db09c9 100644
--- a/docs/en/connector-v2/source/MySQL-CDC.md
+++ b/docs/en/connector-v2/source/MySQL-CDC.md
@@ -175,7 +175,9 @@ When an initial consistent snapshot is made for large
databases, your establishe
| username | String | Yes | -
| Name of the database user to use when connecting to the database server.
[...]
| password | String | Yes | -
| Password to use when connecting to the database server.
[...]
| database-names | List | No | -
| Names of the databases to monitor.
[...]
+| database-pattern | String | No | .*
| Regular expression matching the names of the databases to capture, for example:
`database_prefix.*`.
[...]
| table-names | List | Yes | -
| Table name of the database to monitor. The table name needs to include the
database name, for example: `database_name.table_name`
[...]
+| table-pattern | String | Yes | -
| Regular expression matching the names of the tables to capture. The pattern needs to
include the database name, for example: `database.*\\.table_.*`
[...]
| table-names-config | List | No | -
| Table config list. for example: [{"table":
"db1.schema1.table1","primaryKeys": ["key1"],"snapshotSplitColumn": "key2"}]
[...]
| startup.mode | Enum | No |
INITIAL | Optional startup mode for MySQL CDC consumer, valid enumerations are
`initial`, `earliest`, `latest` and `specific`. <br/> `initial`: Synchronize
historical data at startup, and then synchronize incremental data.<br/>
`earliest`: Startup from the earliest offset possible.<br/> `latest`: Startup
from the latest offset.<br/> `specific`: Startup from user-supplied specific
offsets. [...]
| startup.specific-offset.file | String | No | -
| Start from the specified binlog file name. **Note, This option is required
when the `startup.mode` option used `specific`.**
[...]
@@ -303,6 +305,34 @@ sink {
}
```
+### Support table-pattern for multi-table reading
+> `table-pattern` and `table-names` are mutually exclusive
+```hocon
+env {
+ # You can set engine configuration here
+ parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+ read_limit.bytes_per_second=7000000
+ read_limit.rows_per_second=400
+}
+
+source {
+ MySQL-CDC {
+ server-id = 5652
+ username = "st_user_source"
+ password = "mysqlpw"
+ database-pattern = "source.*"
+ table-pattern = "source.*\\..*"
+ base-url = "jdbc:mysql://mysql_cdc_e2e:3306"
+ }
+}
+
+sink {
+ Console {
+ }
+}
+```
## Changelog
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java
index 87dd7d3a8f..4f58172a6e 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java
@@ -42,6 +42,8 @@ public abstract class JdbcSourceConfigFactory implements
SourceConfig.Factory<Jd
protected String originUrl;
protected List<String> databaseList;
protected List<String> tableList;
+ protected String databasePattern;
+ protected String tablePattern;
protected StartupConfig startupConfig;
protected StopConfig stopConfig;
protected double distributionFactorUpper =
@@ -243,6 +245,8 @@ public abstract class JdbcSourceConfigFactory implements
SourceConfig.Factory<Jd
this.password = config.get(JdbcSourceOptions.PASSWORD);
this.databaseList = config.get(JdbcSourceOptions.DATABASE_NAMES);
this.tableList = config.get(CatalogOptions.TABLE_NAMES);
+ this.databasePattern = config.get(CatalogOptions.DATABASE_PATTERN);
+ this.tablePattern = config.get(CatalogOptions.TABLE_PATTERN);
this.distributionFactorUpper =
config.get(JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
this.distributionFactorLower =
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java
index db63e4e4dc..5cc1b51d0f 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java
@@ -95,9 +95,13 @@ public class MySqlSourceConfigFactory extends
JdbcSourceConfigFactory {
}
if (databaseList != null) {
props.setProperty("database.include.list", String.join(",",
databaseList));
+ } else if (databasePattern != null) {
+ props.setProperty("database.include.list", databasePattern);
}
if (tableList != null) {
props.setProperty("table.include.list", String.join(",",
tableList));
+ } else if (tablePattern != null) {
+ props.setProperty("table.include.list", tablePattern);
}
if (serverTimeZone != null) {
props.setProperty("database.serverTimezone", serverTimeZone);
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
index c11f9e72d4..5c93aa6454 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceSplit;
@@ -38,12 +39,14 @@ import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config.MySqlSourceCon
import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;
import com.google.auto.service.AutoService;
+import lombok.extern.slf4j.Slf4j;
import java.io.Serializable;
import java.util.List;
import java.util.Optional;
@AutoService(Factory.class)
+@Slf4j
public class MySqlIncrementalSourceFactory extends
BaseChangeStreamTableSourceFactory {
@Override
public String factoryIdentifier() {
@@ -99,9 +102,9 @@ public class MySqlIncrementalSourceFactory extends
BaseChangeStreamTableSourceFa
TableSource<T, SplitT, StateT> restoreSource(
TableSourceFactoryContext context, List<CatalogTable>
restoreTables) {
return () -> {
+ ReadonlyConfig config = context.getOptions();
List<CatalogTable> catalogTables =
- CatalogTableUtil.getCatalogTables(
- context.getOptions(), context.getClassLoader());
+ CatalogTableUtil.getCatalogTables(config,
context.getClassLoader());
boolean enableSchemaChange =
context.getOptions()
.getOptional(SourceOptions.SCHEMA_CHANGES_ENABLED)
@@ -137,7 +140,7 @@ public class MySqlIncrementalSourceFactory extends
BaseChangeStreamTableSourceFa
text -> TablePath.of(text, false));
}
return (SeaTunnelSource<T, SplitT, StateT>)
- new MySqlIncrementalSource<>(context.getOptions(),
catalogTables);
+ new MySqlIncrementalSource<>(config, catalogTables);
};
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
index 35a86a4e26..f422b53d13 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
@@ -53,6 +53,7 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import static org.awaitility.Awaitility.await;
+import static org.testcontainers.shaded.org.awaitility.Awaitility.given;
@Slf4j
@DisabledOnContainer(
@@ -72,6 +73,7 @@ public class MysqlCDCIT extends TestSuiteBase implements
TestResource {
private final UniqueDatabase inventoryDatabase =
new UniqueDatabase(
MYSQL_CONTAINER, MYSQL_DATABASE, "mysqluser", "mysqlpw",
MYSQL_DATABASE);
+ private final String QUERY_SQL = "select * from %s.%s";
// mysql source table query sql
private static final String SOURCE_SQL_TEMPLATE =
@@ -539,6 +541,59 @@ public class MysqlCDCIT extends TestSuiteBase implements
TestResource {
SOURCE_TABLE_2_CUSTOM_PRIMARY_KEY)))));
}
+ @TestTemplate
+ @DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK},
+ disabledReason = "Currently SPARK do not support cdc")
+ public void testMysqlCdcByWildcardsConfig(TestContainer container)
+ throws IOException, InterruptedException {
+ inventoryDatabase.setTemplateName("wildcards").createAndInitialize();
+ CompletableFuture.runAsync(
+ () -> {
+ try {
+
container.executeJob("/mysqlcdc_wildcards_to_mysql.conf");
+ } catch (Exception e) {
+ log.error("Commit task exception :" + e.getMessage());
+ throw new RuntimeException(e);
+ }
+ });
+ TimeUnit.SECONDS.sleep(5);
+
inventoryDatabase.setTemplateName("wildcards_dml").createAndInitialize();
+ given().pollDelay(20, TimeUnit.SECONDS)
+ .pollInterval(2000, TimeUnit.MILLISECONDS)
+ .await()
+ .atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ Assertions.assertAll(
+ () -> {
+ log.info(
+ query(getQuerySQL("sink",
"source_products"))
+ .toString());
+ Assertions.assertIterableEquals(
+ query(getQuerySQL("source",
"products")),
+ query(getQuerySQL("sink",
"source_products")));
+ },
+ () -> {
+ log.info(
+ query(getQuerySQL("sink",
"source_customers"))
+ .toString());
+ Assertions.assertIterableEquals(
+ query(getQuerySQL("source",
"customers")),
+ query(getQuerySQL("sink",
"source_customers")));
+ },
+ () -> {
+ log.info(
+ query(getQuerySQL("sink",
"source1_orders"))
+ .toString());
+ Assertions.assertIterableEquals(
+ query(getQuerySQL("source1",
"orders")),
+ query(getQuerySQL("sink",
"source1_orders")));
+ });
+ });
+ }
+
private Connection getJdbcConnection() throws SQLException {
return DriverManager.getConnection(
MYSQL_CONTAINER.getJdbcUrl(),
@@ -703,4 +758,8 @@ public class MysqlCDCIT extends TestSuiteBase implements
TestResource {
private String getSinkQuerySQL(String database, String tableName) {
return String.format(SINK_SQL_TEMPLATE, database, tableName);
}
+
+ private String getQuerySQL(String database, String tableName) {
+ return String.format(QUERY_SQL, database, tableName);
+ }
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/wildcards.sql
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/wildcards.sql
new file mode 100644
index 0000000000..2e35c8c690
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/wildcards.sql
@@ -0,0 +1,93 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+--
----------------------------------------------------------------------------------------------------------------
+-- DATABASE: source
+--
----------------------------------------------------------------------------------------------------------------
+CREATE DATABASE IF NOT EXISTS `source`;
+use `source`;
+
+drop table if exists `source`.`products`;
+-- Create and populate our products using a single insert with many rows
+CREATE TABLE products (
+ id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
+ name VARCHAR(255) NOT NULL DEFAULT 'SeaTunnel',
+ description VARCHAR(512),
+ weight FLOAT
+);
+
+ALTER TABLE `source`.`products` AUTO_INCREMENT = 101;
+
+INSERT INTO `source`.`products`
+VALUES (101,"scooter","Small 2-wheel scooter",3.14),
+ (102,"car battery","12V car battery",8.1),
+ (103,"12-pack drill bits","12-pack of drill bits with sizes ranging
from #40 to #3",0.8),
+ (104,"hammer","12oz carpenter's hammer",0.75),
+ (105,"hammer","14oz carpenter's hammer",0.875),
+ (106,"hammer","16oz carpenter's hammer",1.0),
+ (107,"rocks","box of assorted rocks",5.3),
+ (108,"jacket","water resistent black wind breaker",0.1),
+ (109,"spare tire","24 inch spare tire",22.2);
+
+
+DROP TABLE IF EXISTS `source`.`customers`;
+CREATE TABLE `source`.`customers` (
+ id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
+ first_name VARCHAR(255) NOT NULL,
+ last_name VARCHAR(255) NOT NULL,
+ email VARCHAR(255) NOT NULL UNIQUE KEY
+) AUTO_INCREMENT=1001;
+
+
+INSERT INTO `source`.`customers`
+VALUES (1001,"Sally","Thomas","[email protected]"),
+ (1002,"George","Bailey","[email protected]"),
+ (1003,"Edward","Walker","[email protected]"),
+ (1004,"Anne","Kretchmar","[email protected]");
+
+
+--
----------------------------------------------------------------------------------------------------------------
+-- DATABASE: source1
+--
----------------------------------------------------------------------------------------------------------------
+CREATE DATABASE IF NOT EXISTS `source1`;
+use `source1`;
+
+DROP TABLE IF EXISTS `source1`.`orders`;
+CREATE TABLE `source1`.`orders` (
+ order_number INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
+ order_date DATE NOT NULL,
+ purchaser INTEGER NOT NULL,
+ quantity INTEGER NOT NULL,
+ product_id INTEGER NOT NULL
+) AUTO_INCREMENT = 10001;
+
+
+INSERT INTO `source1`.`orders`
+VALUES (10001, '2016-01-16', 1001, 1, 102),
+ (10002, '2016-01-17', 1002, 2, 105),
+ (10003, '2016-02-18', 1004, 3, 109),
+ (10004, '2016-02-19', 1002, 2, 106),
+ (10005, '16-02-21', 1003, 1, 107);
+
+CREATE DATABASE IF NOT EXISTS `sink`;
+
+use `sink`;
+
+DROP TABLE IF EXISTS `source_products`;
+DROP TABLE IF EXISTS `source_customers`;
+DROP TABLE IF EXISTS `source1_orders`;
+
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/wildcards_dml.sql
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/wildcards_dml.sql
new file mode 100644
index 0000000000..ce3deef638
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/wildcards_dml.sql
@@ -0,0 +1,34 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+--
----------------------------------------------------------------------------------------------------------------
+-- DATABASE: source
+--
----------------------------------------------------------------------------------------------------------------
+
+use `source`;
+
+UPDATE `source`.`products` SET name = 'Illustrated new quality productivity'
WHERE id = 102;
+INSERT INTO `source`.`customers` VALUES
(1005,"Zhangdonghao","","[email protected]");
+
+--
----------------------------------------------------------------------------------------------------------------
+-- DATABASE: source1
+--
----------------------------------------------------------------------------------------------------------------
+
+use `source1`;
+DELETE FROM `source1`.`orders` where order_number < 10004;
+
+
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_wildcards_to_mysql.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_wildcards_to_mysql.conf
new file mode 100644
index 0000000000..105063d04a
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_wildcards_to_mysql.conf
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in
seatunnel config
+######
+
+env {
+ # You can set engine configuration here
+ parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+ read_limit.bytes_per_second=7000000
+ read_limit.rows_per_second=400
+}
+
+source {
+ MySQL-CDC {
+ server-id = 5652
+ username = "st_user_source"
+ password = "mysqlpw"
+ table-pattern = "source.*\\..*"
+ base-url = "jdbc:mysql://mysql_cdc_e2e:3306"
+ }
+}
+
+sink {
+ jdbc {
+ url = "jdbc:mysql://mysql_cdc_e2e:3306/sink"
+ driver = "com.mysql.cj.jdbc.Driver"
+ user = "st_user_sink"
+ password = "mysqlpw"
+
+ generate_sink_sql = true
+ # You need to configure both database and table
+ database = sink
+ table = "${database_name}_${table_name}"
+ }
+}
\ No newline at end of file