This is an automated email from the ASF dual-hosted git repository.
lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 670a52a918 [Feature][Jdbc] Support read multiple tables by regular
expressions (#9380)
670a52a918 is described below
commit 670a52a918ce6fb4baa73006856eb1b085e2ff97
Author: yzeng1618 <[email protected]>
AuthorDate: Fri Jun 27 17:23:43 2025 +0800
[Feature][Jdbc] Support read multiple tables by regular expressions (#9380)
Co-authored-by: zengyi <[email protected]>
---
docs/en/connector-v2/source/Jdbc.md | 97 ++++-
docs/en/connector-v2/source/Mysql.md | 1 +
docs/en/connector-v2/source/Oracle.md | 1 +
docs/en/connector-v2/source/PostgreSQL.md | 1 +
docs/en/connector-v2/source/SqlServer.md | 1 +
docs/zh/connector-v2/source/Mysql.md | 1 +
docs/zh/connector-v2/source/PostgreSQL.md | 1 +
.../seatunnel/api/table/catalog/Catalog.java | 38 +-
.../seatunnel/jdbc/config/JdbcSourceOptions.java | 6 +
.../jdbc/config/JdbcSourceTableConfig.java | 7 +
.../seatunnel/jdbc/utils/JdbcCatalogUtils.java | 131 ++++++-
.../seatunnel/jdbc/utils/JdbcCatalogUtilsTest.java | 409 +++++++++++++++++++++
.../seatunnel/jdbc/JdbcMysqlMultipleTablesIT.java | 30 ++
.../seatunnel/jdbc/JdbcOracleMultipleTablesIT.java | 29 ++
..._mysql_source_and_sink_with_pattern_tables.conf | 57 +++
..._oracle_source_with_pattern_tables_to_sink.conf | 63 ++++
16 files changed, 847 insertions(+), 26 deletions(-)
diff --git a/docs/en/connector-v2/source/Jdbc.md
b/docs/en/connector-v2/source/Jdbc.md
index 0b62ac0fef..0e7bd4a791 100644
--- a/docs/en/connector-v2/source/Jdbc.md
+++ b/docs/en/connector-v2/source/Jdbc.md
@@ -60,6 +60,7 @@ supports query SQL and can achieve projection effect.
| handle_blob_as_string | Boolean | No | false
| If true, BLOB type will be converted to STRING type. **Only supported
for Oracle database**. This is useful for handling large BLOB fields in Oracle
that exceed the default size limit. When transmitting Oracle's BLOB fields to
systems like Doris, setting this to true can make the data transfer more
efficient.
[...]
| use_select_count | Boolean | No | false
| Use `select count` for the table count rather than other methods in the dynamic
chunk split stage. This is currently only available for jdbc-oracle. Enable it
when a direct `select count` is preferable to relying on statistics refreshed by
analyze-table SQL.
[...]
| skip_analyze | Boolean | No | false
| Skip the table count analysis in the dynamic chunk split stage. This is
currently only available for jdbc-oracle. Use it when you periodically schedule
analyze-table SQL to refresh the related table statistics, or when your
table data does not change frequently.
[...]
+| use_regex | Boolean | No | false
| Control regular expression matching for table_path. When set to `true`,
the table_path will be treated as a regular expression pattern. When set to
`false` or not specified, the table_path will be treated as an exact path (no
regex matching). |
| fetch_size | Int | No | 0
| For queries that return a large number of objects, you can configure the
row fetch size used in the query to improve performance by reducing the number
of database hits required to satisfy the selection criteria. Zero means use the
JDBC default value.
[...]
| properties | Map | No | -
| Additional connection configuration parameters; when properties and URL
have the same parameters, the priority is determined by the <br/>specific
implementation of the driver. For example, in MySQL, properties take precedence
over the URL.
[...]
| table_path | String | No | -
| The full path of the table; you can use this configuration
instead of `query`. <br/>examples: <br/>`- mysql: "testdb.table1" `<br/>`-
oracle: "test_schema.table1" `<br/>`- sqlserver: "testdb.test_schema.table1"`
<br/>`- postgresql: "testdb.test_schema.table1"` <br/>`- iris:
"test_schema.table1"`
[...]
@@ -74,6 +75,95 @@ supports query SQL and can achieve projection effect.
| split.string_split_mode | String | No | sample
| Supports different string splitting algorithms. By default, `sample` is
used to determine the split by sampling the string value. You can switch to
`charset_based` to enable charset-based string splitting algorithm. When set to
`charset_based`, the algorithm assumes characters of partition_column are
within ASCII range 32-126, which covers most character-based splitting
scenarios. [...]
| split.string_split_mode_collate | String | No | -
| Specifies the collation to use when string_split_mode is set to
`charset_based` and the table has a special collation. If not specified, the
database's default collation will be used.
[...]
+### Table Matching
+
+The JDBC Source connector supports two ways to specify tables:
+
+1. **Exact Table Path**: Use `table_path` to specify a single table with its
full path.
+ ```hocon
+ table_path = "testdb.table1"
+ ```
+
+2. **Regular Expression**: Use `table_path` with a regex pattern to match
multiple tables.
+ ```hocon
+ table_path = "testdb.table\\d+" # Matches table1, table2, table3, etc.
+ use_regex = true
+ ```
+
+#### Regular Expression Support for Table Names
+
+The JDBC connector supports using regular expressions to match multiple
tables. This feature allows you to process multiple tables with a single source
configuration.
+
+#### Configuration
+
+To use regular expression matching for table paths:
+
+1. Set `use_regex = true` to enable regex matching
+2. If `use_regex` is not set or set to `false`, the connector will treat the
table_path as an exact path (no regex matching)
+
+#### Regular Expression Syntax Notes
+
+- **Path Separator**: The dot (`.`) is treated as a separator between
database, schema, and table names.
+- **Escaped Dots**: If you need to use a dot (`.`) as a wildcard character in
your regular expression to match any character, you must escape it with a
backslash (`\.`).
+- **Path Format**: For paths like `database.table` or `database.schema.table`,
the last unescaped dot separates the table pattern from the database/schema
pattern.
+- **Pattern Examples**:
+ - `test.table\\d+` - Matches tables like `table1`, `table2`, etc. in the
`test` database
+ - `test.*` - Matches all tables in the `test` database (for whole database
synchronization)
+ - `postgres.public.test_db_\.*` - Matches all tables that start with
`test_db_` in the `public` schema of the `postgres` database
+
+#### Example
+
+```hocon
+source {
+ Jdbc {
+ url = "jdbc:mysql://localhost:3306/test"
+ driver = "com.mysql.cj.jdbc.Driver"
+ user = "root"
+ password = "password"
+
+ table_list = [
+ {
+ # Regex matching - match any table in test database
+ table_path = "test.*"
+ use_regex = true
+ },
+ {
+ # Regex matching - match tables with "user" followed by digits
+ table_path = "test.user\\d+"
+ use_regex = true
+ },
+ {
+ # Exact matching - simple table name
+ table_path = "test.config"
+ # use_regex not specified, defaults to false
+ },
+ ]
+ }
+}
+```
+
+#### Multi-table Synchronization
+
+When using regular expressions, the connector will read data from all
matching tables. Each table is processed independently, and the data is
combined in the output.
+
+Example configuration for multi-table synchronization:
+```hocon
+Jdbc {
+ url = "jdbc:mysql://localhost/test"
+ driver = "com.mysql.cj.jdbc.Driver"
+ user = "root"
+ password = "123456"
+
+ # Using regular expression with explicit configuration
+ table_list = [
+ {
+ table_path = "testdb.table\\d+"
+ use_regex = true
+ }
+ ]
+}
+```
+
### decimal_type_narrowing
Decimal type narrowing: if true, decimal types will be narrowed to int
or long when this causes no loss of precision. Currently only supported for Oracle.
@@ -354,8 +444,13 @@ Jdbc {
},
{
table_path = "testdb.table2"
- # Use query filetr rows & columns
+ # Use query filter rows & columns
query = "select id, name from testdb.table2 where id > 100"
+ },
+ {
+ # Using regex to match multiple tables
+ table_path = "testdb.user_table\\d+"
+ use_regex = true
}
]
#where_condition= "where id > 100"
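The "last unescaped dot" rule documented above (a plain `.` separates database, schema, and table segments, while `\.` acts as a regex wildcard) can be sketched outside the connector in plain Java. The class name and placeholder string below are illustrative only, not part of the SeaTunnel API; the actual implementation lives in `JdbcCatalogUtils` further down in this patch:

```java
import java.util.regex.Pattern;

// Illustrative sketch of the "last unescaped dot" rule; names are hypothetical.
public class TablePathRegexSketch {
    private static final String DOT = "__DOT__"; // shields escaped dots during the split

    /** Splits "dbPattern.tablePattern" on the last unescaped dot. */
    static String[] split(String path) {
        String shielded = path.replace("\\.", DOT); // protect \. so it is not a separator
        int last = shielded.lastIndexOf('.');       // last plain dot = db/table separator
        String db = shielded.substring(0, last).replace(DOT, ".");
        String table = shielded.substring(last + 1).replace(DOT, ".");
        return new String[] {db, table};
    }

    public static void main(String[] args) {
        String[] p = split("test.table\\d+");
        System.out.println(p[0]);                              // test
        System.out.println(Pattern.matches(p[1], "table42"));  // true
        System.out.println(Pattern.matches(p[1], "tableabc")); // false

        // \.* becomes the regex ".*" once the placeholder is restored
        String[] q = split("public.test_db_\\.*");
        System.out.println(Pattern.matches(q[1], "test_db_orders")); // true
    }
}
```

Under these assumptions, this mirrors why `test.table\d+` resolves to database `test` and table pattern `table\d+` in the pattern examples above.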
diff --git a/docs/en/connector-v2/source/Mysql.md
b/docs/en/connector-v2/source/Mysql.md
index 85519584d6..9a84df8652 100644
--- a/docs/en/connector-v2/source/Mysql.md
+++ b/docs/en/connector-v2/source/Mysql.md
@@ -83,6 +83,7 @@ Read external data source data through JDBC.
| partition_num | Int | No | job
parallelism | The number of partitions; only positive integers are supported.
Default value is the job parallelism
[...]
| fetch_size | Int | No | 0
| For queries that return a large number of objects, you can
configure<br/> the row fetch size used in the query to improve performance
by<br/> reducing the number of database hits required to satisfy the selection
criteria.<br/> Zero means use the JDBC default value.
[...]
| properties | Map | No | -
| Additional connection configuration parameters; when properties and
URL have the same parameters, the priority is determined by the <br/>specific
implementation of the driver. For example, in MySQL, properties take precedence
over the URL.
[...]
+| use_regex | Boolean | No | false
| Control regular expression matching for table_path. When set to
`true`, the table_path will be treated as a regular expression pattern. When
set to `false` or not specified, the table_path will be treated as an exact
path (no regex matching).
[...]
| table_path | String | No | -
| The full path of the table; you can use this configuration
instead of `query`. <br/>examples: <br/>mysql: "testdb.table1" <br/>oracle:
"test_schema.table1" <br/>sqlserver: "testdb.test_schema.table1"
<br/>postgresql: "testdb.test_schema.table1"
[...]
| table_list | Array | No | -
| The list of tables to be read; you can use this configuration instead
of `table_path`. Example: ```[{ table_path = "testdb.table1"}, {table_path =
"testdb.table2", query = "select id, name from testdb.table2"}]```
[...]
| where_condition | String | No | -
| Common row filter conditions for all tables/queries; must start with
`where`. For example `where id > 100`
[...]
diff --git a/docs/en/connector-v2/source/Oracle.md
b/docs/en/connector-v2/source/Oracle.md
index ed7c3272cc..d5efd28c5f 100644
--- a/docs/en/connector-v2/source/Oracle.md
+++ b/docs/en/connector-v2/source/Oracle.md
@@ -76,6 +76,7 @@ Read external data source data through JDBC.
| partition_num | Int | No | job parallelism | The
number of partitions; only positive integers are supported. Default value is
the job parallelism.
|
| fetch_size | Int | No | 0 | For
queries that return a large number of objects, you can configure<br/> the row
fetch size used in the query to improve performance by<br/> reducing the number
of database hits required to satisfy the selection criteria.<br/> Zero means use
the JDBC default value. |
| properties | Map | No | - |
Additional connection configuration parameters; when properties and URL have the
same parameters, the priority is determined by the <br/>specific implementation
of the driver. For example, in Oracle, properties take precedence over the URL.
|
+| use_regex | Boolean | No | false |
Control regular expression matching for table_path. When set to `true`, the
table_path will be treated as a regular expression pattern. When set to `false`
or not specified, the table_path will be treated as an exact path (no regex
matching). |
| table_path | String | No | -
| The full path of the table; you can use this configuration
instead of `query`. <br/>examples: <br/>mysql: "testdb.table1" <br/>oracle:
"test_schema.table1" <br/>sqlserver: "testdb.test_schema.table1"
<br/>postgresql: "testdb.test_schema.table1"
[...]
| table_list | Array | No | -
| The list of tables to be read; you can use this configuration instead
of `table_path`. Example: ```[{ table_path = "testdb.table1"}, {table_path =
"testdb.table2", query = "select id, name from testdb.table2"}]```
[...]
| where_condition | String | No | -
| Common row filter conditions for all tables/queries; must start with
`where`. For example `where id > 100`
[...]
diff --git a/docs/en/connector-v2/source/PostgreSQL.md
b/docs/en/connector-v2/source/PostgreSQL.md
index bff77f24e4..857540d3d4 100644
--- a/docs/en/connector-v2/source/PostgreSQL.md
+++ b/docs/en/connector-v2/source/PostgreSQL.md
@@ -90,6 +90,7 @@ Read external data source data through JDBC.
| partition_num | Int | No | job
parallelism | The number of partitions; only positive integers are supported.
Default value is the job parallelism
[...]
| fetch_size | Int | No | 0
| For queries that return a large number of objects, you can
configure<br/> the row fetch size used in the query to improve performance
by<br/> reducing the number of database hits required to satisfy the selection
criteria.<br/> Zero means use the JDBC default value.
[...]
| properties | Map | No | -
| Additional connection configuration parameters; when properties and
URL have the same parameters, the priority is determined by the <br/>specific
implementation of the driver. For example, in MySQL, properties take precedence
over the URL.
[...]
+| use_regex | Boolean | No | false
| Control regular expression matching for table_path. When set to
`true`, the table_path will be treated as a regular expression pattern. When
set to `false` or not specified, the table_path will be treated as an exact
path (no regex matching).
[...]
| table_path | String | No | -
| The full path of the table; you can use this configuration
instead of `query`. <br/>examples: <br/>mysql: "testdb.table1" <br/>oracle:
"test_schema.table1" <br/>sqlserver: "testdb.test_schema.table1"
<br/>postgresql: "testdb.test_schema.table1"
[...]
| table_list | Array | No | -
| The list of tables to be read; you can use this configuration instead
of `table_path`. Example: ```[{ table_path = "testdb.table1"}, {table_path =
"testdb.table2", query = "select id, name from testdb.table2"}]```
[...]
| where_condition | String | No | -
| Common row filter conditions for all tables/queries; must start with
`where`. For example `where id > 100`
[...]
diff --git a/docs/en/connector-v2/source/SqlServer.md
b/docs/en/connector-v2/source/SqlServer.md
index ffe68e807a..b24f5c506c 100644
--- a/docs/en/connector-v2/source/SqlServer.md
+++ b/docs/en/connector-v2/source/SqlServer.md
@@ -83,6 +83,7 @@ Read external data source data through JDBC.
| partition_num | Int | No | job
parallelism | The number of partitions; only positive integers are supported.
Default value is the job parallelism
[...]
| fetch_size | Int | No | 0
| For queries that return a large number of objects, you can configure<br/>
the row fetch size used in the query to improve performance by<br/> reducing the
number of database hits required to satisfy the selection criteria.<br/> Zero
means use the JDBC default value.
[...]
| properties | Map | No | -
| Additional connection configuration parameters; when properties and URL
have the same parameters, the priority is determined by the <br/>specific
implementation of the driver. For example, in MySQL, properties take precedence
over the URL.
[...]
+| use_regex | Boolean | No | false
| Control regular expression matching for table_path. When set to `true`,
the table_path will be treated as a regular expression pattern. When set to
`false` or not specified, the table_path will be treated as an exact path (no
regex matching).
[...]
| table_path | String | No | -
| The full path of the table; you can use this configuration
instead of `query`. <br/>examples: <br/>mysql: "testdb.table1" <br/>oracle:
"test_schema.table1" <br/>sqlserver: "testdb.test_schema.table1"
<br/>postgresql: "testdb.test_schema.table1"
[...]
| table_list | Array | No | -
| The list of tables to be read; you can use this configuration instead of
`table_path`. Example: ```[{ table_path = "testdb.table1"}, {table_path =
"testdb.table2", query = "select id, name from testdb.table2"}]```
[...]
| where_condition | String | No | -
| Common row filter conditions for all tables/queries; must start with
`where`. For example `where id > 100`
[...]
diff --git a/docs/zh/connector-v2/source/Mysql.md
b/docs/zh/connector-v2/source/Mysql.md
index 763a41195a..bf5293a44e 100644
--- a/docs/zh/connector-v2/source/Mysql.md
+++ b/docs/zh/connector-v2/source/Mysql.md
@@ -83,6 +83,7 @@ import ChangeLog from '../changelog/connector-jdbc.md';
| partition_num | Int | No | job parallelism |
The number of partitions; only positive integers are supported.<br/>Defaults to the job parallelism.
|
| fetch_size | Int | No | 0 |
For queries that return a large number of objects, you can configure the row fetch size used in the query to improve performance by reducing the number of database accesses required.<br/>Zero means the `JDBC` default value is used.
|
| properties | Map | No | - |
Additional connection configuration parameters; when properties and the URL contain the same parameter, the priority is determined by the driver's implementation.<br/>For example, in MySQL, properties take precedence over the URL.
|
+| use_regex | Boolean | No | false |
Controls regular expression matching for the table path. When set to true, table_path
is treated as a regular expression pattern. When set to false or not specified,
table_path is treated as an exact path (no regex matching).
|
| table_path | String | No | - |
The full path of the table; you can use this configuration instead of `query`.<br/>Examples:<br/>mysql: "testdb.table1"<br/>oracle:
"test_schema.table1"<br/>sqlserver: "testdb.test_schema.table1"<br/>postgresql:
"testdb.test_schema.table1" |
| table_list | Array | No | - |
The list of tables to read; you can use this configuration instead of `table_path`. Example: ```[{ table_path = "testdb.table1"},
{table_path = "testdb.table2", query = "select id, name from
testdb.table2"}]``` |
| where_condition | String | No | - |
Common row filter condition for all tables/queries; must start with `where`. For example `where id > 100`.
|
diff --git a/docs/zh/connector-v2/source/PostgreSQL.md
b/docs/zh/connector-v2/source/PostgreSQL.md
index 591166706b..3e777a80f0 100644
--- a/docs/zh/connector-v2/source/PostgreSQL.md
+++ b/docs/zh/connector-v2/source/PostgreSQL.md
@@ -90,6 +90,7 @@ import ChangeLog from '../changelog/connector-jdbc.md';
| partition_num | Int | No | job parallelism
| The number of partitions; only positive integers are supported. Defaults to the job parallelism
[...]
| fetch_size | Int | No | 0
| For queries that return a large number of objects, you can configure<br/> the row fetch size used in the query to improve performance by reducing the number of database accesses required.<br/> Zero means the
JDBC default value is used.
[...]
| properties | Map | No | -
| Additional connection configuration parameters; when properties and the URL contain the same parameter,<br/> the priority is determined by the driver's implementation. In MySQL, properties take precedence over the URL.
[...]
+| use_regex | Boolean | No | false
| Controls regular expression matching for the table path. When set to true, table_path is treated as a regular expression pattern. When set to false or not specified, table_path
is treated as an exact path (no regex matching).
|
| table_path | String | No | -
| The full path of the table; you can use this configuration instead of `query`.<br/> Examples:<br/> mysql: "testdb.table1" <br/>
oracle: "test_schema.table1" <br/> sqlserver: "testdb.test_schema.table1" <br/>
postgresql: "testdb.test_schema.table1"
[...]
| table_list | Array | No | -
| The list of tables to read; you can use this configuration instead of `table_path`. Example: ```[{ table_path =
"testdb.table1"}, {table_path = "testdb.table2", query = "select id, name
from testdb.table2"}]```
[...]
| where_condition | String | No | -
| Common row filter condition for all tables/queries; must start with `where`. For example `where id > 100`
[...]
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
index 629870c712..4423ce223d 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
@@ -40,6 +40,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.regex.Pattern;
+import java.util.stream.Collectors;
/**
* Interface for reading and writing table metadata from SeaTunnel. Each
connector need to contain
@@ -164,21 +165,44 @@ public interface Catalog extends AutoCloseable {
Pattern databasePattern =
Pattern.compile(config.get(ConnectorCommonOptions.DATABASE_PATTERN));
Pattern tablePattern =
Pattern.compile(config.get(ConnectorCommonOptions.TABLE_PATTERN));
+
List<String> allDatabase = this.listDatabases();
allDatabase.removeIf(s -> !databasePattern.matcher(s).matches());
List<TablePath> tablePaths = new ArrayList<>();
+
for (String databaseName : allDatabase) {
- tableNames = this.listTables(databaseName);
- tableNames.forEach(
- tableName -> {
- if (tablePattern.matcher(databaseName + "." +
tableName).matches()) {
- tablePaths.add(TablePath.of(databaseName,
tableName));
- }
- });
+ List<TablePath> paths = this.listTablePaths(databaseName);
+ tablePaths.addAll(
+ paths.stream()
+ .filter(
+ path ->
+ tablePattern
+ .matcher(
+
path.getDatabaseName()
+ + "."
+ +
path.getSchemaAndTableName())
+ .matches())
+ .collect(Collectors.toList()));
}
return buildCatalogTablesWithErrorCheck(tablePaths.iterator());
}
+ default List<TablePath> listTablePaths(String databaseName)
+ throws CatalogException, DatabaseNotExistException {
+ List<String> tableNames = listTables(databaseName);
+ return tableNames.stream()
+ .map(
+ tableName -> {
+ String[] parts = tableName.split("\\.");
+ if (parts.length > 1) {
+ return TablePath.of(databaseName, parts[0],
parts[1]);
+ } else {
+ return TablePath.of(databaseName, null,
tableName);
+ }
+ })
+ .collect(Collectors.toList());
+ }
+
default List<CatalogTable>
buildCatalogTablesWithErrorCheck(Iterator<TablePath> tablePaths) {
Map<String, Map<String, String>> unsupportedTable = new
LinkedHashMap<>();
List<CatalogTable> catalogTables = new ArrayList<>();
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceOptions.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceOptions.java
index 6647d9c8eb..846f4b3c43 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceOptions.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceOptions.java
@@ -105,4 +105,10 @@ public interface JdbcSourceOptions {
.booleanType()
.defaultValue(false)
.withDescription("Skip the analysis of table count");
+
+ Option<Boolean> USE_REGEX =
+ Options.key("use_regex")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Use regular expression for table path
matching");
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceTableConfig.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceTableConfig.java
index 17c8d1790c..622808d510 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceTableConfig.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceTableConfig.java
@@ -63,6 +63,9 @@ public class JdbcSourceTableConfig implements Serializable {
@JsonProperty("skip_analyze")
private Boolean skipAnalyze;
+ @JsonProperty("use_regex")
+ private Boolean useRegex;
+
@Tolerate
public JdbcSourceTableConfig() {}
@@ -84,6 +87,7 @@ public class JdbcSourceTableConfig implements Serializable {
.partitionNumber(connectorConfig.get(JdbcOptions.PARTITION_NUM))
.partitionStart(connectorConfig.get(JdbcOptions.PARTITION_LOWER_BOUND))
.partitionEnd(connectorConfig.get(JdbcOptions.PARTITION_UPPER_BOUND))
+
.useRegex(connectorConfig.get(JdbcSourceOptions.USE_REGEX))
.build();
tableList = Collections.singletonList(tableProperty);
}
@@ -96,6 +100,9 @@ public class JdbcSourceTableConfig implements Serializable {
tableConfig.setUseSelectCount(
connectorConfig.get(JdbcSourceOptions.USE_SELECT_COUNT));
tableConfig.setSkipAnalyze(connectorConfig.get(JdbcSourceOptions.SKIP_ANALYZE));
+ if (tableConfig.getUseRegex() == null) {
+
tableConfig.setUseRegex(connectorConfig.get(JdbcSourceOptions.USE_REGEX));
+ }
});
if (tableList.size() > 1) {
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
index b1d636cb97..e81166ffd9 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.utils;
import org.apache.seatunnel.shade.com.google.common.base.Strings;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.options.ConnectorCommonOptions;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.Column;
@@ -63,6 +64,7 @@ import java.util.stream.Collectors;
@Slf4j
public class JdbcCatalogUtils {
private static final String DEFAULT_CATALOG_NAME = "jdbc_catalog";
+ private static final String DOT_PLACEHOLDER = "__$DOT$__";
public static Map<TablePath, JdbcSourceTable> getTables(
JdbcConnectionConfig jdbcConnectionConfig,
List<JdbcSourceTableConfig> tablesConfig)
@@ -83,24 +85,33 @@ public class JdbcCatalogUtils {
Map<String, Map<String, String>> unsupportedTable = new
LinkedHashMap<>();
for (JdbcSourceTableConfig tableConfig : tablesConfig) {
try {
- CatalogTable catalogTable =
- getCatalogTable(tableConfig, jdbcCatalog,
jdbcDialect);
- TablePath tablePath =
catalogTable.getTableId().toTablePath();
- JdbcSourceTable jdbcSourceTable =
- JdbcSourceTable.builder()
- .tablePath(tablePath)
- .query(tableConfig.getQuery())
-
.partitionColumn(tableConfig.getPartitionColumn())
-
.partitionNumber(tableConfig.getPartitionNumber())
-
.partitionStart(tableConfig.getPartitionStart())
-
.partitionEnd(tableConfig.getPartitionEnd())
-
.useSelectCount(tableConfig.getUseSelectCount())
-
.skipAnalyze(tableConfig.getSkipAnalyze())
- .catalogTable(catalogTable)
- .build();
- tables.put(tablePath, jdbcSourceTable);
- if (log.isDebugEnabled()) {
- log.debug("Loaded catalog table : {}, {}",
tablePath, jdbcSourceTable);
+ if (StringUtils.isNotEmpty(tableConfig.getTablePath())
+ && StringUtils.isEmpty(tableConfig.getQuery())
+ && tableConfig.getUseRegex()) {
+ processRegexTablePath(jdbcCatalog, jdbcDialect,
tableConfig, tables);
+ } else {
+ CatalogTable catalogTable =
+ getCatalogTable(tableConfig, jdbcCatalog,
jdbcDialect);
+ TablePath tablePath =
catalogTable.getTableId().toTablePath();
+ JdbcSourceTable jdbcSourceTable =
+ JdbcSourceTable.builder()
+ .tablePath(tablePath)
+ .query(tableConfig.getQuery())
+
.partitionColumn(tableConfig.getPartitionColumn())
+
.partitionNumber(tableConfig.getPartitionNumber())
+
.partitionStart(tableConfig.getPartitionStart())
+
.partitionEnd(tableConfig.getPartitionEnd())
+
.useSelectCount(tableConfig.getUseSelectCount())
+
.skipAnalyze(tableConfig.getSkipAnalyze())
+ .catalogTable(catalogTable)
+ .build();
+ tables.put(tablePath, jdbcSourceTable);
+ if (log.isDebugEnabled()) {
+ log.debug(
+ "Loaded catalog table : {}, {}",
+ tablePath,
+ jdbcSourceTable);
+ }
}
} catch (SeaTunnelRuntimeException e) {
if (e.getSeaTunnelErrorCode()
@@ -409,4 +420,88 @@ public class JdbcCatalogUtils {
catalogConfig.put(JdbcOptions.HANDLE_BLOB_AS_STRING.key(),
config.isHandleBlobAsString());
return ReadonlyConfig.fromMap(catalogConfig);
}
+
+ private static void processRegexTablePath(
+ AbstractJdbcCatalog jdbcCatalog,
+ JdbcDialect jdbcDialect,
+ JdbcSourceTableConfig tableConfig,
+ Map<TablePath, JdbcSourceTable> result)
+ throws SQLException {
+
+ String tablePath = tableConfig.getTablePath();
+ log.info("Processing table path with regex: {}", tablePath);
+
+ String processedTablePath = tablePath.replace("\\.", DOT_PLACEHOLDER);
+ log.debug("After replacing escaped dots with placeholder: {}",
processedTablePath);
+
+ TablePath parsedPath = jdbcDialect.parse(processedTablePath);
+
+ String databasePattern = parsedPath.getDatabaseName();
+ String schemaPattern = parsedPath.getSchemaName();
+ String tableNamePattern = parsedPath.getTableName();
+
+ if (StringUtils.isEmpty(databasePattern)) {
+ databasePattern = ".*";
+ }
+
+ String fullTablePattern;
+ if (StringUtils.isNotEmpty(schemaPattern)) {
+ fullTablePattern =
+ String.format(
+ "%s.%s.%s",
+ databasePattern.replace(DOT_PLACEHOLDER, "."),
+ schemaPattern.replace(DOT_PLACEHOLDER, "."),
+ tableNamePattern.replace(DOT_PLACEHOLDER, "."));
+ } else {
+ fullTablePattern =
+ String.format(
+ "%s.%s",
+ databasePattern.replace(DOT_PLACEHOLDER, "."),
+ tableNamePattern.replace(DOT_PLACEHOLDER, "."));
+ }
+
+ log.info(
+ "Parsed patterns - database: {}, full table pattern: {}",
+ databasePattern,
+ fullTablePattern);
+
+ Map<String, Object> configMap = new HashMap<>();
+ configMap.put(ConnectorCommonOptions.DATABASE_PATTERN.key(),
databasePattern);
+ configMap.put(ConnectorCommonOptions.TABLE_PATTERN.key(),
fullTablePattern);
+
+ ReadonlyConfig config = ReadonlyConfig.fromMap(configMap);
+
+ try {
+ List<CatalogTable> catalogTables = jdbcCatalog.getTables(config);
+
+ if (catalogTables.isEmpty()) {
+ log.warn("No tables found matching regex pattern: {}",
tablePath);
+ return;
+ }
+
+ for (CatalogTable catalogTable : catalogTables) {
+ TablePath path = catalogTable.getTableId().toTablePath();
+
+ JdbcSourceTable jdbcSourceTable =
+ JdbcSourceTable.builder()
+ .tablePath(path)
+
.partitionColumn(tableConfig.getPartitionColumn())
+
.partitionNumber(tableConfig.getPartitionNumber())
+
.partitionStart(tableConfig.getPartitionStart())
+ .partitionEnd(tableConfig.getPartitionEnd())
+
.useSelectCount(tableConfig.getUseSelectCount())
+ .skipAnalyze(tableConfig.getSkipAnalyze())
+ .catalogTable(catalogTable)
+ .build();
+
+ result.put(path, jdbcSourceTable);
+ log.info("Found table matching regex pattern: {}", path);
+ }
+
+ log.info("Found {} tables matching regex pattern: {}",
catalogTables.size(), tablePath);
+ } catch (Exception e) {
+ log.warn("Error processing table path with regex: {}", tablePath,
e);
+ throw new SQLException("Failed to process regex table path: " +
tablePath, e);
+ }
+ }
}
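The `Catalog.getTables` change above filters `database.table` strings against compiled database and table patterns. A minimal standalone sketch of that filtering step follows; the class and method names are hypothetical, and it stands in for `TablePath` with plain qualified-name strings:

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Minimal sketch of database/table pattern filtering; not the SeaTunnel Catalog itself.
public class PatternFilterSketch {
    /** Keeps "db.table" entries whose db matches dbRegex and whose full path matches tableRegex. */
    static List<String> filter(List<String> qualifiedNames, String dbRegex, String tableRegex) {
        Pattern db = Pattern.compile(dbRegex);
        Pattern table = Pattern.compile(tableRegex); // matched against the full "db.table" string
        return qualifiedNames.stream()
                .filter(q -> db.matcher(q.substring(0, q.indexOf('.'))).matches()
                        && table.matcher(q).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> all = Arrays.asList("test.user1", "test.user22", "test.config", "prod.user1");
        System.out.println(filter(all, "test", "test\\.user\\d+"));
        // → [test.user1, test.user22]
    }
}
```

As in the patch, the table pattern is matched against the fully qualified name, which is why patterns in the docs are written as `test.user\d+` rather than `user\d+` alone.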
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtilsTest.java
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtilsTest.java
index 872dc26f8f..223a5b345d 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtilsTest.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtilsTest.java
@@ -17,25 +17,44 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.utils;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.options.ConnectorCommonOptions;
+import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.ConstraintKey;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.catalog.PrimaryKey;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import
org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
+import
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
+import
org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
+import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.stream.Collectors;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
public class JdbcCatalogUtilsTest {
private static final CatalogTable DEFAULT_TABLE =
CatalogTable.of(
@@ -370,4 +389,394 @@ public class JdbcCatalogUtilsTest {
tableOfPath.getTableSchema().getColumns().get(0),
mergeTable.getTableSchema().getColumns().get(0));
}
+
+ @Test
+ public void testCatalogGetTablesWithMysqlPattern() throws Exception {
+ TestCatalog testCatalog = spy(new TestCatalog());
+
+ TableSchema tableSchema =
+ TableSchema.builder()
+                        .column(PhysicalColumn.of("id", BasicType.INT_TYPE, 0, true, null, null))
+ .build();
+
+        List<String> allDatabases = new ArrayList<>(Arrays.asList("test", "prod", "dev"));
+
+ Map<String, List<String>> databaseTables = new HashMap<>();
+        databaseTables.put(
+                "test", Arrays.asList("table1", "table2", "table3", "table123", "tableabc"));
+        databaseTables.put("prod", Arrays.asList("prod_table1", "prod_table2", "prod_table3"));
+ databaseTables.put("dev", Arrays.asList("dev_table1", "dev_table2"));
+
+ Map<TablePath, CatalogTable> tableMap = new HashMap<>();
+ for (String database : allDatabases) {
+ for (String tableName : databaseTables.get(database)) {
+ TablePath tablePath = TablePath.of(database, null, tableName);
+ CatalogTable table =
+ CatalogTable.of(
+                                TableIdentifier.of(database, null, null, tableName),
+ tableSchema,
+ Collections.emptyMap(),
+ Collections.emptyList(),
+ "Test " + tableName);
+ tableMap.put(tablePath, table);
+ }
+ }
+
+        doAnswer(invocation -> new ArrayList<>(allDatabases)).when(testCatalog).listDatabases();
+
+ for (String database : allDatabases) {
+ doReturn(true).when(testCatalog).databaseExists(eq(database));
+ }
+
+ for (String database : allDatabases) {
+ doReturn(new ArrayList<>(databaseTables.get(database)))
+ .when(testCatalog)
+ .listTables(eq(database));
+ }
+
+ for (String database : allDatabases) {
+ List<TablePath> paths =
+ databaseTables.get(database).stream()
+                            .map(tableName -> TablePath.of(database, null, tableName))
+ .collect(Collectors.toList());
+ doReturn(paths).when(testCatalog).listTablePaths(eq(database));
+ }
+
+ doReturn(true).when(testCatalog).tableExists(any(TablePath.class));
+
+ doAnswer(
+ invocation -> {
+ TablePath path = invocation.getArgument(0);
+ CatalogTable table = tableMap.get(path);
+ if (table == null) {
+ throw new TableNotExistException("test", path);
+ }
+ return table;
+ })
+ .when(testCatalog)
+ .getTable(any(TablePath.class));
+
+ testMysqlRegexPattern(
+ testCatalog,
+ "test",
+ "test.table\\d+",
+ Arrays.asList("table1", "table2", "table3", "table123"));
+
+ testMysqlRegexPattern(
+ testCatalog,
+ ".*",
+ ".*table1",
+ Arrays.asList("table1", "prod_table1", "dev_table1"));
+
+ testMysqlRegexPattern(
+ testCatalog,
+ "prod",
+ "prod.prod_table[1-2]",
+ Arrays.asList("prod_table1", "prod_table2"));
+
+        testMysqlRegexPattern(testCatalog, ".*", "nonexistent.*", Collections.emptyList());
+ }
+
+ private void testMysqlRegexPattern(
+ Catalog catalog,
+ String databasePattern,
+ String tablePattern,
+ List<String> expectedTablePaths) {
+
+ Map<String, Object> configMap = new HashMap<>();
+        configMap.put(ConnectorCommonOptions.DATABASE_PATTERN.key(), databasePattern);
+        configMap.put(ConnectorCommonOptions.TABLE_PATTERN.key(), tablePattern);
+ ReadonlyConfig config = ReadonlyConfig.fromMap(configMap);
+ List<CatalogTable> tables = catalog.getTables(config);
+
+ List<String> actualTablePaths =
+ tables.stream()
+ .map(t -> t.getTableId().toTablePath().toString())
+ .collect(Collectors.toList());
+
+ Set<String> actualTablePathSet = new HashSet<>(actualTablePaths);
+ Set<String> expectedTablePathSet = new HashSet<>(expectedTablePaths);
+
+ Assertions.assertEquals(
+ expectedTablePathSet.size(),
+ actualTablePathSet.size(),
+ "Expected "
+ + expectedTablePathSet.size()
+ + " tables for pattern: "
+ + databasePattern
+ + "."
+ + tablePattern);
+
+ if (!expectedTablePaths.isEmpty()) {
+ for (String expectedTablePath : expectedTablePaths) {
+ Assertions.assertTrue(
+ actualTablePathSet.contains(expectedTablePath),
+ "Expected table path "
+ + expectedTablePath
+ + " not found for pattern: "
+ + databasePattern
+ + "."
+ + tablePattern);
+ }
+ } else {
+ Assertions.assertTrue(
+ actualTablePathSet.isEmpty(),
+                    "Expected empty result for pattern: " + databasePattern + "." + tablePattern);
+ }
+ }
+
+ @Test
+ public void testCatalogGetTablesWithPostgresPattern() throws Exception {
+ String catalogName = "postgres_catalog";
+ TestCatalog postgresCatalog = spy(new TestCatalog());
+
+ doReturn(catalogName).when(postgresCatalog).name();
+
+ TableSchema tableSchema =
+ TableSchema.builder()
+                        .column(PhysicalColumn.of("id", BasicType.INT_TYPE, 0, true, null, null))
+ .build();
+
+        List<String> allDatabases = new ArrayList<>(Arrays.asList("postgres", "test_db", "dev_db"));
+
+ Map<String, List<String>> databaseSchemas = new HashMap<>();
+        databaseSchemas.put("postgres", Arrays.asList("public", "schema1", "schema2"));
+ databaseSchemas.put("test_db", Arrays.asList("public", "test_schema"));
+ databaseSchemas.put("dev_db", Arrays.asList("public", "dev_schema"));
+
+ Map<String, Map<String, List<String>>> schemasTables = new HashMap<>();
+
+ Map<String, List<String>> postgresSchemas = new HashMap<>();
+        postgresSchemas.put("public", Arrays.asList("users", "orders", "products", "customers"));
+        postgresSchemas.put("schema1", Arrays.asList("table1", "table2", "table3"));
+        postgresSchemas.put("schema2", Arrays.asList("log_2021", "log_2022", "log_2023"));
+ schemasTables.put("postgres", postgresSchemas);
+
+ Map<String, List<String>> testDbSchemas = new HashMap<>();
+        testDbSchemas.put("public", Arrays.asList("test_table1", "test_table2"));
+        testDbSchemas.put("test_schema", Arrays.asList("data_table1", "data_table2"));
+ schemasTables.put("test_db", testDbSchemas);
+
+ Map<String, List<String>> devDbSchemas = new HashMap<>();
+ devDbSchemas.put("public", Arrays.asList("dev_table1", "dev_table2"));
+        devDbSchemas.put("dev_schema", Arrays.asList("temp_table1", "temp_table2"));
+ schemasTables.put("dev_db", devDbSchemas);
+
+ Map<TablePath, CatalogTable> tableMap = new HashMap<>();
+ for (String database : allDatabases) {
+ for (String schema : databaseSchemas.get(database)) {
+                for (String tableName : schemasTables.get(database).get(schema)) {
+                    TablePath tablePath = TablePath.of(database, schema, tableName);
+ CatalogTable table =
+ CatalogTable.of(
+                                    TableIdentifier.of(catalogName, database, schema, tableName),
+ tableSchema,
+ Collections.emptyMap(),
+ Collections.emptyList(),
+ "Test " + tableName);
+ tableMap.put(tablePath, table);
+ }
+ }
+ }
+
+        doAnswer(invocation -> new ArrayList<>(allDatabases)).when(postgresCatalog).listDatabases();
+
+ for (String database : allDatabases) {
+ doReturn(true).when(postgresCatalog).databaseExists(eq(database));
+ }
+
+ for (String database : allDatabases) {
+ for (String schema : databaseSchemas.get(database)) {
+ List<String> tables = schemasTables.get(database).get(schema);
+ doReturn(new ArrayList<>(tables))
+ .when(postgresCatalog)
+ .listTables(eq(database + "." + schema));
+ }
+ }
+
+ for (String database : allDatabases) {
+ List<TablePath> paths = new ArrayList<>();
+ for (String schema : databaseSchemas.get(database)) {
+                for (String tableName : schemasTables.get(database).get(schema)) {
+ paths.add(TablePath.of(database, schema, tableName));
+ }
+ }
+ doReturn(paths).when(postgresCatalog).listTablePaths(eq(database));
+ }
+
+ doReturn(true).when(postgresCatalog).tableExists(any(TablePath.class));
+
+ doAnswer(
+ invocation -> {
+ TablePath path = invocation.getArgument(0);
+ CatalogTable table = tableMap.get(path);
+ if (table == null) {
+ throw new TableNotExistException("test", path);
+ }
+ return table;
+ })
+ .when(postgresCatalog)
+ .getTable(any(TablePath.class));
+
+ testPostgresRegexPattern(
+ postgresCatalog,
+ "postgres",
+ "postgres\\.public\\..*",
+ Arrays.asList(
+ "postgres.public.users",
+ "postgres.public.orders",
+ "postgres.public.products",
+ "postgres.public.customers"));
+
+ testPostgresRegexPattern(
+ postgresCatalog,
+ ".*",
+ ".*\\.public\\..*table.*",
+ Arrays.asList(
+ "test_db.public.test_table1",
+ "test_db.public.test_table2",
+ "dev_db.public.dev_table1",
+ "dev_db.public.dev_table2"));
+
+ testPostgresRegexPattern(
+ postgresCatalog,
+ ".*",
+ ".*\\..*\\.log_\\d{4}",
+ Arrays.asList(
+ "postgres.schema2.log_2021",
+ "postgres.schema2.log_2022",
+ "postgres.schema2.log_2023"));
+
+ testPostgresRegexPattern(
+ postgresCatalog,
+ "test_db",
+ "test_db\\..*\\..*",
+ Arrays.asList(
+ "test_db.public.test_table1",
+ "test_db.public.test_table2",
+ "test_db.test_schema.data_table1",
+ "test_db.test_schema.data_table2"));
+
+ testPostgresRegexPattern(
+                postgresCatalog, ".*", ".*\\..*\\.nonexistent.*", Collections.emptyList());
+ }
+
+ private void testPostgresRegexPattern(
+ Catalog catalog,
+ String databasePattern,
+ String tablePattern,
+ List<String> expectedTablePaths) {
+
+ Map<String, Object> configMap = new HashMap<>();
+        configMap.put(ConnectorCommonOptions.DATABASE_PATTERN.key(), databasePattern);
+        configMap.put(ConnectorCommonOptions.TABLE_PATTERN.key(), tablePattern);
+ ReadonlyConfig config = ReadonlyConfig.fromMap(configMap);
+
+ List<CatalogTable> tables = catalog.getTables(config);
+
+ List<String> actualTablePaths =
+ tables.stream()
+ .map(
+ t -> {
+ TableIdentifier id = t.getTableId();
+ return id.getDatabaseName()
+ + "."
+ + id.getSchemaName()
+ + "."
+ + id.getTableName();
+ })
+ .collect(Collectors.toList());
+
+ Set<String> actualTablePathSet = new HashSet<>(actualTablePaths);
+ Set<String> expectedTablePathSet = new HashSet<>(expectedTablePaths);
+
+ Assertions.assertEquals(
+ expectedTablePathSet.size(),
+ actualTablePathSet.size(),
+ "Expected "
+ + expectedTablePathSet.size()
+ + " tables for pattern: "
+ + databasePattern
+ + "."
+ + tablePattern);
+
+ if (!expectedTablePaths.isEmpty()) {
+ for (String expectedTablePath : expectedTablePaths) {
+ Assertions.assertTrue(
+ actualTablePathSet.contains(expectedTablePath),
+ "Expected table path "
+ + expectedTablePath
+ + " not found for pattern: "
+ + databasePattern
+ + "."
+ + tablePattern);
+ }
+ } else {
+ Assertions.assertTrue(
+ actualTablePathSet.isEmpty(),
+                    "Expected empty result for pattern: " + databasePattern + "." + tablePattern);
+ }
+ }
+
+ private static class TestCatalog implements Catalog {
+
+ @Override
+ public void open() throws CatalogException {}
+
+ @Override
+ public void close() throws CatalogException {}
+
+ @Override
+ public String name() {
+ return "TestCatalog";
+ }
+
+ @Override
+ public String getDefaultDatabase() throws CatalogException {
+ return "test";
+ }
+
+ @Override
+        public boolean databaseExists(String databaseName) throws CatalogException {
+ return false;
+ }
+
+ @Override
+ public List<String> listDatabases() throws CatalogException {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public List<String> listTables(String databaseName)
+ throws CatalogException, DatabaseNotExistException {
+ return Collections.emptyList();
+ }
+
+ @Override
+        public boolean tableExists(TablePath tablePath) throws CatalogException {
+ return false;
+ }
+
+ @Override
+ public CatalogTable getTable(TablePath tablePath)
+ throws CatalogException, TableNotExistException {
+ throw new TableNotExistException("test", tablePath);
+ }
+
+ @Override
+        public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists)
+                throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {}
+
+ @Override
+ public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
+ throws TableNotExistException, CatalogException {}
+
+ @Override
+ public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
+ throws DatabaseAlreadyExistException, CatalogException {}
+
+ @Override
+        public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
+ throws DatabaseNotExistException, CatalogException {}
+ }
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlMultipleTablesIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlMultipleTablesIT.java
index c3349c6ccd..72c37c8b8e 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlMultipleTablesIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlMultipleTablesIT.java
@@ -204,6 +204,36 @@ public class JdbcMysqlMultipleTablesIT extends TestSuiteBase implements TestReso
                 0, sqlConfEexecResult.getExitCode(), sqlConfEexecResult.getStderr());
}
+ @TestTemplate
+ public void testMysqlJdbcRegexPatternE2e(TestContainer container)
+ throws IOException, InterruptedException, SQLException {
+ clearSinkTables();
+
+        Container.ExecResult execResult =
+                container.executeJob("/jdbc_mysql_source_and_sink_with_pattern_tables.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+
+        List<Executable> asserts =
+                TABLES.stream()
+                        .map(
+                                (Function<String, Executable>)
+                                        table ->
+                                                () ->
+                                                        Assertions.assertIterableEquals(
+                                                                query(
+                                                                        String.format(
+                                                                                "SELECT * FROM %s.%s",
+                                                                                SOURCE_DATABASE,
+                                                                                table)),
+                                                                query(
+                                                                        String.format(
+                                                                                "SELECT * FROM %s.%s",
+                                                                                SINK_DATABASE,
+                                                                                table))))
+                        .collect(Collectors.toList());
+        Assertions.assertAll(asserts);
+ }
+
@AfterAll
@Override
public void tearDown() throws Exception {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleMultipleTablesIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleMultipleTablesIT.java
index 8ba6b1e851..35433f8c57 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleMultipleTablesIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleMultipleTablesIT.java
@@ -177,6 +177,35 @@ public class JdbcOracleMultipleTablesIT extends TestSuiteBase implements TestRes
Assertions.assertAll(asserts);
}
+ @TestTemplate
+ public void testOracleJdbcRegexPatternE2e(TestContainer container)
+ throws IOException, InterruptedException, SQLException {
+ clearSinkTables();
+
+        Container.ExecResult execResult =
+                container.executeJob("/jdbc_oracle_source_with_pattern_tables_to_sink.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+
+        List<Executable> asserts =
+                TABLES.stream()
+                        .map(
+                                (Function<String, Executable>)
+                                        table ->
+                                                () ->
+                                                        Assertions.assertIterableEquals(
+                                                                query(
+                                                                        String.format(
+                                                                                "SELECT * FROM %s.%s order by INTEGER_COL asc",
+                                                                                SCHEMA, table)),
+                                                                query(
+                                                                        String.format(
+                                                                                "SELECT * FROM %s.%s order by INTEGER_COL asc",
+                                                                                SCHEMA,
+                                                                                "SINK_" + table))))
+                        .collect(Collectors.toList());
+        Assertions.assertAll(asserts);
+ }
+
@AfterAll
@Override
public void tearDown() throws Exception {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_with_pattern_tables.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_with_pattern_tables.conf
new file mode 100644
index 0000000000..cf227e33a6
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_with_pattern_tables.conf
@@ -0,0 +1,57 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ jdbc {
+ url = "jdbc:mysql://mysql-e2e:3306/seatunnel"
+ driver = "com.mysql.cj.jdbc.Driver"
+ connection_check_timeout_sec = 100
+ user = "root"
+ password = "Abc!@#135_seatunnel"
+
+ table_list = [
+ {
+ table_path = "source.table\\d+"
+ use_regex = true
+ }
+ ]
+ where_condition = "where c_int >= 0"
+ split.size = 8096
+ split.even-distribution.factor.upper-bound = 100
+ split.even-distribution.factor.lower-bound = 0.05
+ split.sample-sharding.threshold = 1000
+ split.inverse-sampling.rate = 1000
+ }
+}
+
+sink {
+ jdbc {
+ url = "jdbc:mysql://mysql-e2e:3306/seatunnel"
+ driver = "com.mysql.cj.jdbc.Driver"
+ user = "root"
+ password = "Abc!@#135_seatunnel"
+
+ database = "sink"
+ table = "${table_name}"
+ generate_sink_sql = true
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_with_pattern_tables_to_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_with_pattern_tables_to_sink.conf
new file mode 100644
index 0000000000..08d42b093e
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_with_pattern_tables_to_sink.conf
@@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+  # This is a example source plugin **only for test and demonstrate the feature source plugin**
+ Jdbc {
+ driver = oracle.jdbc.driver.OracleDriver
+ url = "jdbc:oracle:thin:@e2e_oracleDb:1521/TESTUSER"
+ user = testUser
+ password = testPassword
+ use_select_count = true
+ table_list = [
+ {
+ table_path = "TESTUSER.TABLE\\d+"
+ use_regex = true
+ }
+ ]
+ properties {
+ database.oracle.jdbc.timezoneAsRegion = "false"
+ }
+ }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
+}
+
+sink {
+ Jdbc {
+ driver = oracle.jdbc.driver.OracleDriver
+ url = "jdbc:oracle:thin:@e2e_oracleDb:1521/TESTUSER"
+ user = testUser
+ password = testPassword
+ database = XE
+ table = "TESTUSER.SINK_${table_name}"
+ generate_sink_sql = true
+ properties {
+ database.oracle.jdbc.timezoneAsRegion = "false"
+ }
+ }
+
+}