This is an automated email from the ASF dual-hosted git repository.

lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 670a52a918 [Feature][Jdbc] Support read multiple tables by regular expressions (#9380)
670a52a918 is described below

commit 670a52a918ce6fb4baa73006856eb1b085e2ff97
Author: yzeng1618 <[email protected]>
AuthorDate: Fri Jun 27 17:23:43 2025 +0800

    [Feature][Jdbc] Support read multiple tables by regular expressions (#9380)
    
    Co-authored-by: zengyi <[email protected]>
---
 docs/en/connector-v2/source/Jdbc.md                |  97 ++++-
 docs/en/connector-v2/source/Mysql.md               |   1 +
 docs/en/connector-v2/source/Oracle.md              |   1 +
 docs/en/connector-v2/source/PostgreSQL.md          |   1 +
 docs/en/connector-v2/source/SqlServer.md           |   1 +
 docs/zh/connector-v2/source/Mysql.md               |   1 +
 docs/zh/connector-v2/source/PostgreSQL.md          |   1 +
 .../seatunnel/api/table/catalog/Catalog.java       |  38 +-
 .../seatunnel/jdbc/config/JdbcSourceOptions.java   |   6 +
 .../jdbc/config/JdbcSourceTableConfig.java         |   7 +
 .../seatunnel/jdbc/utils/JdbcCatalogUtils.java     | 131 ++++++-
 .../seatunnel/jdbc/utils/JdbcCatalogUtilsTest.java | 409 +++++++++++++++++++++
 .../seatunnel/jdbc/JdbcMysqlMultipleTablesIT.java  |  30 ++
 .../seatunnel/jdbc/JdbcOracleMultipleTablesIT.java |  29 ++
 ..._mysql_source_and_sink_with_pattern_tables.conf |  57 +++
 ..._oracle_source_with_pattern_tables_to_sink.conf |  63 ++++
 16 files changed, 847 insertions(+), 26 deletions(-)

diff --git a/docs/en/connector-v2/source/Jdbc.md b/docs/en/connector-v2/source/Jdbc.md
index 0b62ac0fef..0e7bd4a791 100644
--- a/docs/en/connector-v2/source/Jdbc.md
+++ b/docs/en/connector-v2/source/Jdbc.md
@@ -60,6 +60,7 @@ supports query SQL and can achieve projection effect.
 | handle_blob_as_string                      | Boolean | No       | false      
     | If true, BLOB type will be converted to STRING type. **Only supported 
for Oracle database**. This is useful for handling large BLOB fields in Oracle 
that exceed the default size limit. When transmitting Oracle's BLOB fields to 
systems like Doris, setting this to true can make the data transfer more 
efficient.                                                                      
                           [...]
 | use_select_count                           | Boolean | No       | false          | Use select count for the table count rather than other methods in the dynamic chunk split stage. This is currently only available for jdbc-oracle. In this scenario, use select count directly when it is faster than updating statistics via the analyze-table SQL. [...]
 | skip_analyze                               | Boolean | No       | false          | Skip the table count analysis in the dynamic chunk split stage. This is currently only available for jdbc-oracle. Use this when you periodically schedule analyze-table SQL to keep table statistics up to date, or when your table data does not change frequently. [...]
+| use_regex                                  | Boolean | No       | false      
     | Control regular expression matching for table_path. When set to `true`, 
the table_path will be treated as a regular expression pattern. When set to 
`false` or not specified, the table_path will be treated as an exact path (no 
regex matching). |
 | fetch_size                                 | Int     | No       | 0              | For queries that return a large number of objects, you can configure the row fetch size used in the query to improve performance by reducing the number of database hits required to satisfy the selection criteria. Zero means use the JDBC default value. [...]
 | properties                                 | Map     | No       | -              | Additional connection configuration parameters. When properties and the URL have the same parameters, the priority is determined by the <br/>specific implementation of the driver. For example, in MySQL, properties take precedence over the URL. [...]
 | table_path                                 | String  | No       | -              | The full path of the table; you can use this configuration instead of `query`. <br/>examples: <br/>`- mysql: "testdb.table1" `<br/>`- oracle: "test_schema.table1" `<br/>`- sqlserver: "testdb.test_schema.table1"` <br/>`- postgresql: "testdb.test_schema.table1"`  <br/>`- iris: "test_schema.table1"` [...]
@@ -74,6 +75,95 @@ supports query SQL and can achieve projection effect.
 | split.string_split_mode                    | String  | No       | sample     
     | Supports different string splitting algorithms. By default, `sample` is 
used to determine the split by sampling the string value. You can switch to 
`charset_based` to enable charset-based string splitting algorithm. When set to 
`charset_based`, the algorithm assumes characters of partition_column are 
within ASCII range 32-126, which covers most character-based splitting 
scenarios.                        [...]
 | split.string_split_mode_collate            | String  | No       | -          
     | Specifies the collation to use when string_split_mode is set to 
`charset_based` and the table has a special collation. If not specified, the 
database's default collation will be used.                                      
                                                                                
                                                                                
                          [...]
 
+### Table Matching
+
+The JDBC Source connector supports two ways to specify tables:
+
+1. **Exact Table Path**: Use `table_path` to specify a single table with its 
full path.
+   ```hocon
+   table_path = "testdb.table1"
+   ```
+
+2. **Regular Expression**: Use `table_path` with a regex pattern to match 
multiple tables.
+   ```hocon
+   table_path = "testdb.table\\d+"  # Matches table1, table2, table3, etc.
+   use_regex = true
+   ```
+
+#### Regular Expression Support for Table Names
+
+The JDBC connector supports using regular expressions to match multiple 
tables. This feature allows you to process multiple tables with a single source 
configuration.
+
+#### Configuration
+
+To use regular expression matching for table paths:
+
+1. Set `use_regex = true` to enable regex matching.
+2. If `use_regex` is not set, or is set to `false`, the connector treats `table_path` as an exact path (no regex matching).
+
+#### Regular Expression Syntax Notes
+
+- **Path Separator**: The dot (`.`) is treated as a separator between 
database, schema, and table names.
+- **Escaped Dots**: If you need to use a dot (`.`) as a wildcard character in 
your regular expression to match any character, you must escape it with a 
backslash (`\.`).
+- **Path Format**: For paths like `database.table` or `database.schema.table`, 
the last unescaped dot separates the table pattern from the database/schema 
pattern.
+- **Pattern Examples**:
+  - `test.table\\d+` - Matches tables like `table1`, `table2`, etc. in the 
`test` database
+  - `test.*` - Matches all tables in the `test` database (for whole database 
synchronization)
+  - `postgres.public.test_db_\.*` - Matches all tables that start with 
`test_db_` in the `public` schema of the `postgres` database
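+
+The splitting rule above (the last unescaped dot separates the database/schema pattern from the table pattern) can be sketched as follows. This is an illustrative standalone sketch, not the connector's actual implementation; `splitAtLastUnescapedDot` is a hypothetical helper name.
+
+```java
+import java.util.Arrays;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+public class TablePatternDemo {
+    // Hypothetical helper: split a table_path at the last dot that is not
+    // escaped with a backslash, yielding {database pattern, table pattern}.
+    static String[] splitAtLastUnescapedDot(String path) {
+        for (int i = path.length() - 1; i >= 0; i--) {
+            if (path.charAt(i) == '.' && (i == 0 || path.charAt(i - 1) != '\\')) {
+                return new String[] {path.substring(0, i), path.substring(i + 1)};
+            }
+        }
+        return new String[] {"", path};
+    }
+
+    public static void main(String[] args) {
+        // "testdb.table\\d+" in the config arrives as the string testdb.table\d+
+        String[] parts = splitAtLastUnescapedDot("testdb.table\\d+");
+        Pattern tablePattern = Pattern.compile(parts[1]); // table\d+
+
+        List<String> tables = Arrays.asList("table1", "table23", "config");
+        List<String> matched =
+                tables.stream()
+                        .filter(t -> tablePattern.matcher(t).matches())
+                        .collect(Collectors.toList());
+        System.out.println(parts[0] + " -> " + matched); // testdb -> [table1, table23]
+    }
+}
+```
+
+Note how `\d` does not count as an escaped dot, so `testdb.table\d+` still splits at the single real separator.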
+
+#### Example
+
+```hocon
+source {
+  Jdbc {
+    url = "jdbc:mysql://localhost:3306/test"
+    driver = "com.mysql.cj.jdbc.Driver"
+    user = "root"
+    password = "password"
+    
+    table_list = [
+      {
+        # Regex matching - match any table in test database
+        table_path = "test.*"
+        use_regex = true
+      },
+      {
+        # Regex matching - match tables with "user" followed by digits
+        table_path = "test.user\\d+"
+        use_regex = true
+      },
+      {
+        # Exact matching - simple table name
+        table_path = "test.config"
+        # use_regex not specified, defaults to false
+      },
+    ]
+  }
+}
+```
+
+#### Multi-table Synchronization
+
+When using regular expressions, the connector reads data from all matching tables. Each table is processed independently, and the data is combined in the output.
+
+Example configuration for multi-table synchronization:
+```hocon
+Jdbc {
+    url = "jdbc:mysql://localhost/test"
+    driver = "com.mysql.cj.jdbc.Driver"
+    user = "root"
+    password = "123456"
+
+    # Using regular expression with explicit configuration
+    table_list = [
+      {
+        table_path = "testdb.table\\d+"
+        use_regex = true
+      }
+    ]
+}
+```
+
 ### decimal_type_narrowing
 
 Decimal type narrowing: if true, decimal types are narrowed to int or long when there is no loss of precision. Currently only supported for Oracle.
@@ -354,8 +444,13 @@ Jdbc {
         },
         {
           table_path = "testdb.table2"
-          # Use query filetr rows & columns
+          # Use query filter rows & columns
           query = "select id, name from testdb.table2 where id > 100"
+        },
+        {
+          # Using regex to match multiple tables
+          table_path = "testdb.user_table\\d+"
+          use_regex = true
         }
     ]
     #where_condition= "where id > 100"
diff --git a/docs/en/connector-v2/source/Mysql.md b/docs/en/connector-v2/source/Mysql.md
index 85519584d6..9a84df8652 100644
--- a/docs/en/connector-v2/source/Mysql.md
+++ b/docs/en/connector-v2/source/Mysql.md
@@ -83,6 +83,7 @@ Read external data source data through JDBC.
 | partition_num                              | Int        | No       | job 
parallelism | The number of partition count, only support positive integer. 
default value is job parallelism                                                
                                                                                
                                                                                
                                                                                
                      [...]
 | fetch_size                                 | Int        | No       | 0               | For queries that return a large number of objects, you can configure<br/> the row fetch size used in the query to improve performance by<br/> reducing the number of database hits required to satisfy the selection criteria.<br/> Zero means use the JDBC default value. [...]
 | properties                                 | Map        | No       | -       
        | Additional connection configuration parameters,when properties and 
URL have the same parameters, the priority is determined by the <br/>specific 
implementation of the driver. For example, in MySQL, properties take precedence 
over the URL.                                                                   
                                                                                
                   [...]
+| use_regex                                  | Boolean    | No       | false   
        | Control regular expression matching for table_path. When set to 
`true`, the table_path will be treated as a regular expression pattern. When 
set to `false` or not specified, the table_path will be treated as an exact 
path (no regex matching).                                                       
                                                                                
                           [...]
 | table_path                                 | String     | No       | -       
        | The path to the full path of table, you can use this configuration 
instead of `query`. <br/>examples: <br/>mysql: "testdb.table1" <br/>oracle: 
"test_schema.table1" <br/>sqlserver: "testdb.test_schema.table1" 
<br/>postgresql: "testdb.test_schema.table1"                                    
                                                                                
                                    [...]
 | table_list                                 | Array      | No       | -       
        | The list of tables to be read, you can use this configuration instead 
of `table_path` example: ```[{ table_path = "testdb.table1"}, {table_path = "testdb.table2", query = "select id, name from testdb.table2"}]```
                                                                                
                                                                                
                  [...]
 | where_condition                            | String     | No       | -       
        | Common row filter conditions for all tables/queries, must start with 
`where`. for example `where id > 100`                                           
                                                                                
                                                                                
                                                                                
               [...]
diff --git a/docs/en/connector-v2/source/Oracle.md b/docs/en/connector-v2/source/Oracle.md
index ed7c3272cc..d5efd28c5f 100644
--- a/docs/en/connector-v2/source/Oracle.md
+++ b/docs/en/connector-v2/source/Oracle.md
@@ -76,6 +76,7 @@ Read external data source data through JDBC.
 | partition_num                | Int        | No       | job parallelism | The 
number of partition count, only support positive integer. default value is job 
parallelism                                                                     
                                                                                
               |
 | fetch_size                   | Int        | No       | 0               | For queries that return a large number of objects, you can configure<br/> the row fetch size used in the query to improve performance by<br/> reducing the number of database hits required to satisfy the selection criteria.<br/> Zero means use the JDBC default value. |
 | properties                   | Map        | No       | -               | 
Additional connection configuration parameters,when properties and URL have the 
same parameters, the priority is determined by the <br/>specific implementation 
of the driver. For example, in Oracle, properties take precedence over the URL. 
                   |
+| use_regex                    | Boolean    | No       | false           | 
Control regular expression matching for table_path. When set to `true`, the 
table_path will be treated as a regular expression pattern. When set to `false` 
or not specified, the table_path will be treated as an exact path (no regex 
matching).                 |
 | table_path                                 | String     | No       | -       
        | The path to the full path of table, you can use this configuration 
instead of `query`. <br/>examples: <br/>mysql: "testdb.table1" <br/>oracle: 
"test_schema.table1" <br/>sqlserver: "testdb.test_schema.table1" 
<br/>postgresql: "testdb.test_schema.table1"                                    
                                                                                
                                    [...]
 | table_list                                 | Array      | No       | -       
        | The list of tables to be read, you can use this configuration instead 
of `table_path` example: ```[{ table_path = "testdb.table1"}, {table_path = "testdb.table2", query = "select id, name from testdb.table2"}]```
                                                                                
                                                                                
                  [...]
 | where_condition                            | String     | No       | -       
        | Common row filter conditions for all tables/queries, must start with 
`where`. for example `where id > 100`                                           
                                                                                
                                                                                
                                                                                
               [...]
diff --git a/docs/en/connector-v2/source/PostgreSQL.md b/docs/en/connector-v2/source/PostgreSQL.md
index bff77f24e4..857540d3d4 100644
--- a/docs/en/connector-v2/source/PostgreSQL.md
+++ b/docs/en/connector-v2/source/PostgreSQL.md
@@ -90,6 +90,7 @@ Read external data source data through JDBC.
 | partition_num                              | Int        | No       | job 
parallelism | The number of partition count, only support positive integer. 
default value is job parallelism                                                
                                                                                
                                                                                
                                                                                
                      [...]
 | fetch_size                                 | Int        | No       | 0               | For queries that return a large number of objects, you can configure<br/> the row fetch size used in the query to improve performance by<br/> reducing the number of database hits required to satisfy the selection criteria.<br/> Zero means use the JDBC default value. [...]
 | properties                                 | Map        | No       | -       
        | Additional connection configuration parameters,when properties and 
URL have the same parameters, the priority is determined by the <br/>specific 
implementation of the driver. For example, in MySQL, properties take precedence 
over the URL.                                                                   
                                                                                
                   [...]
+| use_regex                                  | Boolean    | No       | false   
        | Control regular expression matching for table_path. When set to 
`true`, the table_path will be treated as a regular expression pattern. When 
set to `false` or not specified, the table_path will be treated as an exact 
path (no regex matching).                                                       
                                                                                
                           [...]
 | table_path                                 | String     | No       | -       
        | The path to the full path of table, you can use this configuration 
instead of `query`. <br/>examples: <br/>mysql: "testdb.table1" <br/>oracle: 
"test_schema.table1" <br/>sqlserver: "testdb.test_schema.table1" 
<br/>postgresql: "testdb.test_schema.table1"                                    
                                                                                
                                    [...]
 | table_list                                 | Array      | No       | -       
        | The list of tables to be read, you can use this configuration instead 
of `table_path` example: ```[{ table_path = "testdb.table1"}, {table_path = "testdb.table2", query = "select id, name from testdb.table2"}]```
                                                                                
                                                                                
                  [...]
 | where_condition                            | String     | No       | -       
        | Common row filter conditions for all tables/queries, must start with 
`where`. for example `where id > 100`                                           
                                                                                
                                                                                
                                                                                
               [...]
diff --git a/docs/en/connector-v2/source/SqlServer.md b/docs/en/connector-v2/source/SqlServer.md
index ffe68e807a..b24f5c506c 100644
--- a/docs/en/connector-v2/source/SqlServer.md
+++ b/docs/en/connector-v2/source/SqlServer.md
@@ -83,6 +83,7 @@ Read external data source data through JDBC.
 | partition_num                              | Int    | No       | job 
parallelism | The number of partition count, only support positive integer. 
default value is job parallelism                                                
                                                                                
                                                                                
                                                                                
                          [...]
 | fetch_size                                 | Int    | No       | 0              | For queries that return a large number of objects, you can configure<br/> the row fetch size used in the query to improve performance by<br/> reducing the number of database hits required to satisfy the selection criteria.<br/> Zero means use the JDBC default value. [...]
 | properties                                 | Map    | No       | -           
    | Additional connection configuration parameters,when properties and URL 
have the same parameters, the priority is determined by the <br/>specific 
implementation of the driver. For example, in MySQL, properties take precedence 
over the URL.                                                                   
                                                                                
                       [...]
+| use_regex                                  | Boolean| No       | false       
    | Control regular expression matching for table_path. When set to `true`, 
the table_path will be treated as a regular expression pattern. When set to 
`false` or not specified, the table_path will be treated as an exact path (no 
regex matching).                                                                
                                                                                
                      [...]
 | table_path                                 | String | No       | -           
    | The path to the full path of table, you can use this configuration 
instead of `query`. <br/>examples: <br/>mysql: "testdb.table1" <br/>oracle: 
"test_schema.table1" <br/>sqlserver: "testdb.test_schema.table1" 
<br/>postgresql: "testdb.test_schema.table1"                                    
                                                                                
                                        [...]
 | table_list                                 | Array  | No       | -           
    | The list of tables to be read, you can use this configuration instead of 
`table_path` example: ```[{ table_path = "testdb.table1"}, {table_path = "testdb.table2", query = "select id, name from testdb.table2"}]```
                                                                                
                                                                                
                      [...]
 | where_condition                            | String | No       | -           
    | Common row filter conditions for all tables/queries, must start with 
`where`. for example `where id > 100`                                           
                                                                                
                                                                                
                                                                                
                   [...]
diff --git a/docs/zh/connector-v2/source/Mysql.md b/docs/zh/connector-v2/source/Mysql.md
index 763a41195a..bf5293a44e 100644
--- a/docs/zh/connector-v2/source/Mysql.md
+++ b/docs/zh/connector-v2/source/Mysql.md
@@ -83,6 +83,7 @@ import ChangeLog from '../changelog/connector-jdbc.md';
 | partition_num                              | Int        | 否    | 作业并行度 | 
分区数量,仅支持正整数。<br/>默认值为作业并行度。                                                     
                                                                                
                                                       |
 | fetch_size                                 | Int        | 否    | 0     | 
对于返回大量对象的查询,可以配置查询的行提取大小,以通过减少满足选择条件所需的数据库访问次数来提高性能。<br/>设置为零表示使用 `JDBC` 的默认值。  
                                                                                
                                                       |
 | properties                                 | Map        | 否    | -     | 
额外的连接配置参数,当属性和 URL 中有相同的参数时,优先级由驱动程序的具体实现决定。<br/>例如,在 MySQL 中,属性优先于 URL。        
                                                                                
                                                       |
+| use_regex                                  | Boolean    | 否    | false | 
控制表路径的正则表达式匹配。当设置为true时,table_path 将被视为正则表达式模式。当设置为false或未指定时,table_path 
将被视为精确路径(不进行正则匹配)。                                                              
                                                              |
 | table_path                                 | String     | 否    | -     | 
表的完整路径,您可以使用此配置代替 `query`。<br/>示例:<br/>mysql: "testdb.table1"<br/>oracle: 
"test_schema.table1"<br/>sqlserver: "testdb.test_schema.table1"<br/>postgresql: 
"testdb.test_schema.table1"                                  |
 | table_list                                 | Array      | 否    | -     | 
要读取的表的列表,您可以使用此配置代替 `table_path`,示例如下: ```[{ table_path = "testdb.table1"}, 
{table_path = "testdb.table2", query = "select id, name from testdb.table2"}]```                                                         |
 | where_condition                            | String     | 否    | -     | 
所有表/查询的通用行过滤条件,必须以 `where` 开头。例如 `where id > 100`。                              
                                                                                
                                                       |
diff --git a/docs/zh/connector-v2/source/PostgreSQL.md b/docs/zh/connector-v2/source/PostgreSQL.md
index 591166706b..3e777a80f0 100644
--- a/docs/zh/connector-v2/source/PostgreSQL.md
+++ b/docs/zh/connector-v2/source/PostgreSQL.md
@@ -90,6 +90,7 @@ import ChangeLog from '../changelog/connector-jdbc.md';
 | partition_num                                | Int         | 否   | 作业并行性     
 | 分区数量,仅支持正整数。默认值为作业并行性                                                        
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 | fetch_size                                   | Int         | 否   | 0         
      | 对于返回大量对象的查询,您可以配置<br/> 用于查询的行抓取大小,以通过减少所需的数据库访问次数来提高性能。<br/> 0 表示使用 
JDBC 默认值。                                                                       
                                                                                
                                                                                
                                                                                
                  [...]
 | properties                                   | Map        | 否   | -          
     | 其他连接配置参数,当属性和 URL 具有相同参数时,<br/> 优先级由驱动程序的具体实现决定。在 MySQL 中,属性优先于 URL。     
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| use_regex                                    | Boolean    | 否  | false       
    | 控制表路径的正则表达式匹配。当设置为true时,table_path 将被视为正则表达式模式。当设置为false或未指定时,table_path 
将被视为精确路径(不进行正则匹配)。                                                              
                                                           |
 | table_path                                   | String        | 否   | -       
        | 表的完整路径,您可以使用此配置替代 `query`。<br/> 示例:<br/> mysql: "testdb.table1" <br/> 
oracle: "test_schema.table1" <br/> sqlserver: "testdb.test_schema.table1" <br/> 
postgresql: "testdb.test_schema.table1"                                         
                                                                                
                                                                                
              [...]
 | table_list                                   | Array         | 否   | -       
        | 要读取的表列表,您可以使用此配置替代 `table_path` 示例:```[{ table_path = 
"testdb.table1"}, {table_path = "testdb.table2", query = "select * id, name 
from testdb.table2"}]```                                                        
                                                                                
                                                                                
                                  [...]
 | where_condition                              | String        | 否   | -       
        | 所有表/查询的通用行过滤条件,必须以 `where` 开头。 例如 `where id > 100`                    
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
index 629870c712..4423ce223d 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
@@ -40,6 +40,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.function.Function;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 /**
  * Interface for reading and writing table metadata from SeaTunnel. Each 
connector need to contain
@@ -164,21 +165,44 @@ public interface Catalog extends AutoCloseable {
         Pattern databasePattern =
                 
Pattern.compile(config.get(ConnectorCommonOptions.DATABASE_PATTERN));
         Pattern tablePattern = 
Pattern.compile(config.get(ConnectorCommonOptions.TABLE_PATTERN));
+
         List<String> allDatabase = this.listDatabases();
         allDatabase.removeIf(s -> !databasePattern.matcher(s).matches());
         List<TablePath> tablePaths = new ArrayList<>();
+
         for (String databaseName : allDatabase) {
-            tableNames = this.listTables(databaseName);
-            tableNames.forEach(
-                    tableName -> {
-                        if (tablePattern.matcher(databaseName + "." + tableName).matches()) {
-                            tablePaths.add(TablePath.of(databaseName, tableName));
-                        }
-                    });
+            List<TablePath> paths = this.listTablePaths(databaseName);
+            tablePaths.addAll(
+                    paths.stream()
+                            .filter(
+                                    path ->
+                                            tablePattern
+                                                    .matcher(
+                                                            path.getDatabaseName()
+                                                                    + "."
+                                                                    + path.getSchemaAndTableName())
+                                                    .matches())
+                            .collect(Collectors.toList()));
         }
         return buildCatalogTablesWithErrorCheck(tablePaths.iterator());
     }
 
+    default List<TablePath> listTablePaths(String databaseName)
+            throws CatalogException, DatabaseNotExistException {
+        List<String> tableNames = listTables(databaseName);
+        return tableNames.stream()
+                .map(
+                        tableName -> {
+                            String[] parts = tableName.split("\\.");
+                            if (parts.length > 1) {
+                                return TablePath.of(databaseName, parts[0], parts[1]);
+                            } else {
+                                return TablePath.of(databaseName, null, tableName);
+                            }
+                        })
+                .collect(Collectors.toList());
+    }
+
     default List<CatalogTable> buildCatalogTablesWithErrorCheck(Iterator<TablePath> tablePaths) {
         Map<String, Map<String, String>> unsupportedTable = new LinkedHashMap<>();
         List<CatalogTable> catalogTables = new ArrayList<>();
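A standalone sketch of the two pieces added to `Catalog` above: splitting `listTables()` results that may already carry a schema (`"schema.table"`), then matching the full `database.schema.table` string against the table pattern. `TableRef` is a hypothetical stand-in for SeaTunnel's `TablePath`; the real interface wires these through `listTablePaths` and `getTables`:

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class CatalogPatternSketch {

    // Hypothetical stand-in for SeaTunnel's TablePath.
    static final class TableRef {
        final String database;
        final String schema; // null when listTables() returned a plain table name
        final String table;

        TableRef(String database, String schema, String table) {
            this.database = database;
            this.schema = schema;
            this.table = table;
        }

        String schemaAndTableName() {
            return schema == null ? table : schema + "." + table;
        }
    }

    // Mirrors the new listTablePaths default: "schema.table" names are split,
    // plain names get a null schema.
    static List<TableRef> listTablePaths(String databaseName, List<String> tableNames) {
        return tableNames.stream()
                .map(name -> {
                    String[] parts = name.split("\\.");
                    return parts.length > 1
                            ? new TableRef(databaseName, parts[0], parts[1])
                            : new TableRef(databaseName, null, name);
                })
                .collect(Collectors.toList());
    }

    // Mirrors the getTables filter: the pattern is matched against the full path.
    static List<TableRef> filter(List<TableRef> paths, String tablePattern) {
        Pattern pattern = Pattern.compile(tablePattern);
        return paths.stream()
                .filter(p -> pattern.matcher(p.database + "." + p.schemaAndTableName()).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<TableRef> paths =
                listTablePaths("testdb", Arrays.asList("public.users", "public.orders", "logs"));
        // A schema-qualified pattern matches only the schema-bearing entries.
        List<TableRef> matched = filter(paths, "testdb\\.public\\..*");
        System.out.println(matched.size()); // 2
    }
}
```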
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceOptions.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceOptions.java
index 6647d9c8eb..846f4b3c43 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceOptions.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceOptions.java
@@ -105,4 +105,10 @@ public interface JdbcSourceOptions {
                     .booleanType()
                     .defaultValue(false)
                     .withDescription("Skip the analysis of table count");
+
+    Option<Boolean> USE_REGEX =
+            Options.key("use_regex")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Use regular expression for table path matching");
 }
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceTableConfig.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceTableConfig.java
index 17c8d1790c..622808d510 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceTableConfig.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceTableConfig.java
@@ -63,6 +63,9 @@ public class JdbcSourceTableConfig implements Serializable {
     @JsonProperty("skip_analyze")
     private Boolean skipAnalyze;
 
+    @JsonProperty("use_regex")
+    private Boolean useRegex;
+
     @Tolerate
     public JdbcSourceTableConfig() {}
 
@@ -84,6 +87,7 @@ public class JdbcSourceTableConfig implements Serializable {
                             .partitionNumber(connectorConfig.get(JdbcOptions.PARTITION_NUM))
                             .partitionStart(connectorConfig.get(JdbcOptions.PARTITION_LOWER_BOUND))
                             .partitionEnd(connectorConfig.get(JdbcOptions.PARTITION_UPPER_BOUND))
+                            .useRegex(connectorConfig.get(JdbcSourceOptions.USE_REGEX))
                             .build();
             tableList = Collections.singletonList(tableProperty);
         }
@@ -96,6 +100,9 @@ public class JdbcSourceTableConfig implements Serializable {
                    tableConfig.setUseSelectCount(
                            connectorConfig.get(JdbcSourceOptions.USE_SELECT_COUNT));
                    tableConfig.setSkipAnalyze(connectorConfig.get(JdbcSourceOptions.SKIP_ANALYZE));
+                    if (tableConfig.getUseRegex() == null) {
+                        tableConfig.setUseRegex(connectorConfig.get(JdbcSourceOptions.USE_REGEX));
+                    }
                 });
 
         if (tableList.size() > 1) {
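The `setUseRegex` fallback in the hunk above implements a simple precedence rule: a table entry that leaves `use_regex` unset inherits the connector-level value, while an explicit per-table setting always wins. A tiny standalone sketch (the method name is illustrative, not the connector's API):

```java
public class UseRegexFallbackSketch {
    // Per-table value wins; a null (unset) per-table value falls back to the
    // connector-level default, matching the null check in JdbcSourceTableConfig.
    static Boolean resolveUseRegex(Boolean tableLevel, Boolean connectorLevel) {
        return tableLevel != null ? tableLevel : connectorLevel;
    }

    public static void main(String[] args) {
        System.out.println(resolveUseRegex(null, true));  // true: inherited from connector
        System.out.println(resolveUseRegex(false, true)); // false: per-table value wins
    }
}
```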
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
index b1d636cb97..e81166ffd9 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.utils;
 import org.apache.seatunnel.shade.com.google.common.base.Strings;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.options.ConnectorCommonOptions;
 import org.apache.seatunnel.api.table.catalog.Catalog;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.Column;
@@ -63,6 +64,7 @@ import java.util.stream.Collectors;
 @Slf4j
 public class JdbcCatalogUtils {
     private static final String DEFAULT_CATALOG_NAME = "jdbc_catalog";
+    private static final String DOT_PLACEHOLDER = "__$DOT$__";
 
     public static Map<TablePath, JdbcSourceTable> getTables(
             JdbcConnectionConfig jdbcConnectionConfig, List<JdbcSourceTableConfig> tablesConfig)
@@ -83,24 +85,33 @@ public class JdbcCatalogUtils {
                 Map<String, Map<String, String>> unsupportedTable = new LinkedHashMap<>();
                 for (JdbcSourceTableConfig tableConfig : tablesConfig) {
                     try {
-                        CatalogTable catalogTable =
-                                getCatalogTable(tableConfig, jdbcCatalog, jdbcDialect);
-                        TablePath tablePath = catalogTable.getTableId().toTablePath();
-                        JdbcSourceTable jdbcSourceTable =
-                                JdbcSourceTable.builder()
-                                        .tablePath(tablePath)
-                                        .query(tableConfig.getQuery())
-                                        .partitionColumn(tableConfig.getPartitionColumn())
-                                        .partitionNumber(tableConfig.getPartitionNumber())
-                                        .partitionStart(tableConfig.getPartitionStart())
-                                        .partitionEnd(tableConfig.getPartitionEnd())
-                                        .useSelectCount(tableConfig.getUseSelectCount())
-                                        .skipAnalyze(tableConfig.getSkipAnalyze())
-                                        .catalogTable(catalogTable)
-                                        .build();
-                        tables.put(tablePath, jdbcSourceTable);
-                        if (log.isDebugEnabled()) {
-                            log.debug("Loaded catalog table : {}, {}", tablePath, jdbcSourceTable);
+                        if (StringUtils.isNotEmpty(tableConfig.getTablePath())
+                                && StringUtils.isEmpty(tableConfig.getQuery())
+                                && tableConfig.getUseRegex()) {
+                            processRegexTablePath(jdbcCatalog, jdbcDialect, tableConfig, tables);
+                        } else {
+                            CatalogTable catalogTable =
+                                    getCatalogTable(tableConfig, jdbcCatalog, jdbcDialect);
+                            TablePath tablePath = catalogTable.getTableId().toTablePath();
+                            JdbcSourceTable jdbcSourceTable =
+                                    JdbcSourceTable.builder()
+                                            .tablePath(tablePath)
+                                            .query(tableConfig.getQuery())
+                                            .partitionColumn(tableConfig.getPartitionColumn())
+                                            .partitionNumber(tableConfig.getPartitionNumber())
+                                            .partitionStart(tableConfig.getPartitionStart())
+                                            .partitionEnd(tableConfig.getPartitionEnd())
+                                            .useSelectCount(tableConfig.getUseSelectCount())
+                                            .skipAnalyze(tableConfig.getSkipAnalyze())
+                                            .catalogTable(catalogTable)
+                                            .build();
+                            tables.put(tablePath, jdbcSourceTable);
+                            if (log.isDebugEnabled()) {
+                                log.debug(
+                                        "Loaded catalog table : {}, {}",
+                                        tablePath,
+                                        jdbcSourceTable);
+                            }
                         }
                     } catch (SeaTunnelRuntimeException e) {
                         if (e.getSeaTunnelErrorCode()
@@ -409,4 +420,88 @@ public class JdbcCatalogUtils {
         catalogConfig.put(JdbcOptions.HANDLE_BLOB_AS_STRING.key(), config.isHandleBlobAsString());
         return ReadonlyConfig.fromMap(catalogConfig);
     }
+
+    private static void processRegexTablePath(
+            AbstractJdbcCatalog jdbcCatalog,
+            JdbcDialect jdbcDialect,
+            JdbcSourceTableConfig tableConfig,
+            Map<TablePath, JdbcSourceTable> result)
+            throws SQLException {
+
+        String tablePath = tableConfig.getTablePath();
+        log.info("Processing table path with regex: {}", tablePath);
+
+        String processedTablePath = tablePath.replace("\\.", DOT_PLACEHOLDER);
+        log.debug("After replacing escaped dots with placeholder: {}", processedTablePath);
+
+        TablePath parsedPath = jdbcDialect.parse(processedTablePath);
+
+        String databasePattern = parsedPath.getDatabaseName();
+        String schemaPattern = parsedPath.getSchemaName();
+        String tableNamePattern = parsedPath.getTableName();
+
+        if (StringUtils.isEmpty(databasePattern)) {
+            databasePattern = ".*";
+        }
+
+        String fullTablePattern;
+        if (StringUtils.isNotEmpty(schemaPattern)) {
+            fullTablePattern =
+                    String.format(
+                            "%s.%s.%s",
+                            databasePattern.replace(DOT_PLACEHOLDER, "."),
+                            schemaPattern.replace(DOT_PLACEHOLDER, "."),
+                            tableNamePattern.replace(DOT_PLACEHOLDER, "."));
+        } else {
+            fullTablePattern =
+                    String.format(
+                            "%s.%s",
+                            databasePattern.replace(DOT_PLACEHOLDER, "."),
+                            tableNamePattern.replace(DOT_PLACEHOLDER, "."));
+        }
+
+        log.info(
+                "Parsed patterns - database: {}, full table pattern: {}",
+                databasePattern,
+                fullTablePattern);
+
+        Map<String, Object> configMap = new HashMap<>();
+        configMap.put(ConnectorCommonOptions.DATABASE_PATTERN.key(), databasePattern);
+        configMap.put(ConnectorCommonOptions.TABLE_PATTERN.key(), fullTablePattern);
+
+        ReadonlyConfig config = ReadonlyConfig.fromMap(configMap);
+
+        try {
+            List<CatalogTable> catalogTables = jdbcCatalog.getTables(config);
+
+            if (catalogTables.isEmpty()) {
+                log.warn("No tables found matching regex pattern: {}", tablePath);
+                return;
+            }
+
+            for (CatalogTable catalogTable : catalogTables) {
+                TablePath path = catalogTable.getTableId().toTablePath();
+
+                JdbcSourceTable jdbcSourceTable =
+                        JdbcSourceTable.builder()
+                                .tablePath(path)
+                                .partitionColumn(tableConfig.getPartitionColumn())
+                                .partitionNumber(tableConfig.getPartitionNumber())
+                                .partitionStart(tableConfig.getPartitionStart())
+                                .partitionEnd(tableConfig.getPartitionEnd())
+                                .useSelectCount(tableConfig.getUseSelectCount())
+                                .skipAnalyze(tableConfig.getSkipAnalyze())
+                                .catalogTable(catalogTable)
+                                .build();
+
+                result.put(path, jdbcSourceTable);
+                log.info("Found table matching regex pattern: {}", path);
+            }
+
+            log.info("Found {} tables matching regex pattern: {}", catalogTables.size(), tablePath);
+        } catch (Exception e) {
+            log.warn("Error processing table path with regex: {}", tablePath, e);
+            throw new SQLException("Failed to process regex table path: " + tablePath, e);
+        }
+    }
 }
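The `DOT_PLACEHOLDER` dance in `processRegexTablePath` exists because the configured `table_path` is split on dots into database/schema/table segments, yet a `\.` in the pattern means a literal dot that must stay inside one segment. A standalone sketch of that idea (placeholder value matches the diff; the method name is illustrative, not the connector's API):

```java
public class RegexPathSketch {
    // Same placeholder value as in JdbcCatalogUtils above.
    static final String DOT_PLACEHOLDER = "__$DOT$__";

    static String[] splitPattern(String tablePath) {
        // Protect literal dots written as "\." so they survive the split ...
        String protectedPath = tablePath.replace("\\.", DOT_PLACEHOLDER);
        // ... split the path into segments on the remaining (separator) dots ...
        String[] segments = protectedPath.split("\\.");
        // ... then restore the literal dots inside each segment's regex.
        for (int i = 0; i < segments.length; i++) {
            segments[i] = segments[i].replace(DOT_PLACEHOLDER, ".");
        }
        return segments;
    }

    public static void main(String[] args) {
        // "testdb" plus a table-name regex that itself contains a literal dot.
        String[] parts = splitPattern("testdb.log\\.\\d+");
        System.out.println(parts.length); // 2
        System.out.println(parts[1]);     // log.\d+
    }
}
```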
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtilsTest.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtilsTest.java
index 872dc26f8f..223a5b345d 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtilsTest.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtilsTest.java
@@ -17,25 +17,44 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.utils;
 
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.options.ConnectorCommonOptions;
+import org.apache.seatunnel.api.table.catalog.Catalog;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.Column;
 import org.apache.seatunnel.api.table.catalog.ConstraintKey;
 import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
 import org.apache.seatunnel.api.table.catalog.PrimaryKey;
 import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
+import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
+import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
+import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
 import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.DecimalType;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
 public class JdbcCatalogUtilsTest {
     private static final CatalogTable DEFAULT_TABLE =
             CatalogTable.of(
@@ -370,4 +389,394 @@ public class JdbcCatalogUtilsTest {
                 tableOfPath.getTableSchema().getColumns().get(0),
                 mergeTable.getTableSchema().getColumns().get(0));
     }
+
+    @Test
+    public void testCatalogGetTablesWithMysqlPattern() throws Exception {
+        TestCatalog testCatalog = spy(new TestCatalog());
+
+        TableSchema tableSchema =
+                TableSchema.builder()
+                        .column(PhysicalColumn.of("id", BasicType.INT_TYPE, 0, true, null, null))
+                        .build();
+
+        List<String> allDatabases = new ArrayList<>(Arrays.asList("test", "prod", "dev"));
+
+        Map<String, List<String>> databaseTables = new HashMap<>();
+        databaseTables.put(
+                "test", Arrays.asList("table1", "table2", "table3", "table123", "tableabc"));
+        databaseTables.put("prod", Arrays.asList("prod_table1", "prod_table2", "prod_table3"));
+        databaseTables.put("dev", Arrays.asList("dev_table1", "dev_table2"));
+
+        Map<TablePath, CatalogTable> tableMap = new HashMap<>();
+        for (String database : allDatabases) {
+            for (String tableName : databaseTables.get(database)) {
+                TablePath tablePath = TablePath.of(database, null, tableName);
+                CatalogTable table =
+                        CatalogTable.of(
+                                TableIdentifier.of(database, null, null, tableName),
+                                tableSchema,
+                                Collections.emptyMap(),
+                                Collections.emptyList(),
+                                "Test " + tableName);
+                tableMap.put(tablePath, table);
+            }
+        }
+
+        doAnswer(invocation -> new ArrayList<>(allDatabases)).when(testCatalog).listDatabases();
+
+        for (String database : allDatabases) {
+            doReturn(true).when(testCatalog).databaseExists(eq(database));
+        }
+
+        for (String database : allDatabases) {
+            doReturn(new ArrayList<>(databaseTables.get(database)))
+                    .when(testCatalog)
+                    .listTables(eq(database));
+        }
+
+        for (String database : allDatabases) {
+            List<TablePath> paths =
+                    databaseTables.get(database).stream()
+                            .map(tableName -> TablePath.of(database, null, tableName))
+                            .collect(Collectors.toList());
+            doReturn(paths).when(testCatalog).listTablePaths(eq(database));
+        }
+
+        doReturn(true).when(testCatalog).tableExists(any(TablePath.class));
+
+        doAnswer(
+                        invocation -> {
+                            TablePath path = invocation.getArgument(0);
+                            CatalogTable table = tableMap.get(path);
+                            if (table == null) {
+                                throw new TableNotExistException("test", path);
+                            }
+                            return table;
+                        })
+                .when(testCatalog)
+                .getTable(any(TablePath.class));
+
+        testMysqlRegexPattern(
+                testCatalog,
+                "test",
+                "test.table\\d+",
+                Arrays.asList("table1", "table2", "table3", "table123"));
+
+        testMysqlRegexPattern(
+                testCatalog,
+                ".*",
+                ".*table1",
+                Arrays.asList("table1", "prod_table1", "dev_table1"));
+
+        testMysqlRegexPattern(
+                testCatalog,
+                "prod",
+                "prod.prod_table[1-2]",
+                Arrays.asList("prod_table1", "prod_table2"));
+
+        testMysqlRegexPattern(testCatalog, ".*", "nonexistent.*", Collections.emptyList());
+    }
+
+    private void testMysqlRegexPattern(
+            Catalog catalog,
+            String databasePattern,
+            String tablePattern,
+            List<String> expectedTablePaths) {
+
+        Map<String, Object> configMap = new HashMap<>();
+        configMap.put(ConnectorCommonOptions.DATABASE_PATTERN.key(), databasePattern);
+        configMap.put(ConnectorCommonOptions.TABLE_PATTERN.key(), tablePattern);
+        ReadonlyConfig config = ReadonlyConfig.fromMap(configMap);
+        List<CatalogTable> tables = catalog.getTables(config);
+
+        List<String> actualTablePaths =
+                tables.stream()
+                        .map(t -> t.getTableId().toTablePath().toString())
+                        .collect(Collectors.toList());
+
+        Set<String> actualTablePathSet = new HashSet<>(actualTablePaths);
+        Set<String> expectedTablePathSet = new HashSet<>(expectedTablePaths);
+
+        Assertions.assertEquals(
+                expectedTablePathSet.size(),
+                actualTablePathSet.size(),
+                "Expected "
+                        + expectedTablePathSet.size()
+                        + " tables for pattern: "
+                        + databasePattern
+                        + "."
+                        + tablePattern);
+
+        if (!expectedTablePaths.isEmpty()) {
+            for (String expectedTablePath : expectedTablePaths) {
+                Assertions.assertTrue(
+                        actualTablePathSet.contains(expectedTablePath),
+                        "Expected table path "
+                                + expectedTablePath
+                                + " not found for pattern: "
+                                + databasePattern
+                                + "."
+                                + tablePattern);
+            }
+        } else {
+            Assertions.assertTrue(
+                    actualTablePathSet.isEmpty(),
+                    "Expected empty result for pattern: " + databasePattern + "." + tablePattern);
+        }
+    }
+
+    @Test
+    public void testCatalogGetTablesWithPostgresPattern() throws Exception {
+        String catalogName = "postgres_catalog";
+        TestCatalog postgresCatalog = spy(new TestCatalog());
+
+        doReturn(catalogName).when(postgresCatalog).name();
+
+        TableSchema tableSchema =
+                TableSchema.builder()
+                        .column(PhysicalColumn.of("id", BasicType.INT_TYPE, 0, true, null, null))
+                        .build();
+
+        List<String> allDatabases = new ArrayList<>(Arrays.asList("postgres", "test_db", "dev_db"));
+
+        Map<String, List<String>> databaseSchemas = new HashMap<>();
+        databaseSchemas.put("postgres", Arrays.asList("public", "schema1", "schema2"));
+        databaseSchemas.put("test_db", Arrays.asList("public", "test_schema"));
+        databaseSchemas.put("dev_db", Arrays.asList("public", "dev_schema"));
+
+        Map<String, Map<String, List<String>>> schemasTables = new HashMap<>();
+
+        Map<String, List<String>> postgresSchemas = new HashMap<>();
+        postgresSchemas.put("public", Arrays.asList("users", "orders", "products", "customers"));
+        postgresSchemas.put("schema1", Arrays.asList("table1", "table2", "table3"));
+        postgresSchemas.put("schema2", Arrays.asList("log_2021", "log_2022", "log_2023"));
+        schemasTables.put("postgres", postgresSchemas);
+
+        Map<String, List<String>> testDbSchemas = new HashMap<>();
+        testDbSchemas.put("public", Arrays.asList("test_table1", "test_table2"));
+        testDbSchemas.put("test_schema", Arrays.asList("data_table1", "data_table2"));
+        schemasTables.put("test_db", testDbSchemas);
+
+        Map<String, List<String>> devDbSchemas = new HashMap<>();
+        devDbSchemas.put("public", Arrays.asList("dev_table1", "dev_table2"));
+        devDbSchemas.put("dev_schema", Arrays.asList("temp_table1", "temp_table2"));
+        schemasTables.put("dev_db", devDbSchemas);
+
+        Map<TablePath, CatalogTable> tableMap = new HashMap<>();
+        for (String database : allDatabases) {
+            for (String schema : databaseSchemas.get(database)) {
+                for (String tableName : schemasTables.get(database).get(schema)) {
+                    TablePath tablePath = TablePath.of(database, schema, tableName);
+                    CatalogTable table =
+                            CatalogTable.of(
+                                    TableIdentifier.of(catalogName, database, schema, tableName),
+                                    tableSchema,
+                                    Collections.emptyMap(),
+                                    Collections.emptyList(),
+                                    "Test " + tableName);
+                    tableMap.put(tablePath, table);
+                }
+            }
+        }
+
+        doAnswer(invocation -> new ArrayList<>(allDatabases)).when(postgresCatalog).listDatabases();
+
+        for (String database : allDatabases) {
+            doReturn(true).when(postgresCatalog).databaseExists(eq(database));
+        }
+
+        for (String database : allDatabases) {
+            for (String schema : databaseSchemas.get(database)) {
+                List<String> tables = schemasTables.get(database).get(schema);
+                doReturn(new ArrayList<>(tables))
+                        .when(postgresCatalog)
+                        .listTables(eq(database + "." + schema));
+            }
+        }
+
+        for (String database : allDatabases) {
+            List<TablePath> paths = new ArrayList<>();
+            for (String schema : databaseSchemas.get(database)) {
+                for (String tableName : schemasTables.get(database).get(schema)) {
+                    paths.add(TablePath.of(database, schema, tableName));
+                }
+            }
+            doReturn(paths).when(postgresCatalog).listTablePaths(eq(database));
+        }
+
+        doReturn(true).when(postgresCatalog).tableExists(any(TablePath.class));
+
+        doAnswer(
+                        invocation -> {
+                            TablePath path = invocation.getArgument(0);
+                            CatalogTable table = tableMap.get(path);
+                            if (table == null) {
+                                throw new TableNotExistException("test", path);
+                            }
+                            return table;
+                        })
+                .when(postgresCatalog)
+                .getTable(any(TablePath.class));
+
+        testPostgresRegexPattern(
+                postgresCatalog,
+                "postgres",
+                "postgres\\.public\\..*",
+                Arrays.asList(
+                        "postgres.public.users",
+                        "postgres.public.orders",
+                        "postgres.public.products",
+                        "postgres.public.customers"));
+
+        testPostgresRegexPattern(
+                postgresCatalog,
+                ".*",
+                ".*\\.public\\..*table.*",
+                Arrays.asList(
+                        "test_db.public.test_table1",
+                        "test_db.public.test_table2",
+                        "dev_db.public.dev_table1",
+                        "dev_db.public.dev_table2"));
+
+        testPostgresRegexPattern(
+                postgresCatalog,
+                ".*",
+                ".*\\..*\\.log_\\d{4}",
+                Arrays.asList(
+                        "postgres.schema2.log_2021",
+                        "postgres.schema2.log_2022",
+                        "postgres.schema2.log_2023"));
+
+        testPostgresRegexPattern(
+                postgresCatalog,
+                "test_db",
+                "test_db\\..*\\..*",
+                Arrays.asList(
+                        "test_db.public.test_table1",
+                        "test_db.public.test_table2",
+                        "test_db.test_schema.data_table1",
+                        "test_db.test_schema.data_table2"));
+
+        testPostgresRegexPattern(
+                postgresCatalog, ".*", ".*\\..*\\.nonexistent.*", Collections.emptyList());
+    }
+
+    private void testPostgresRegexPattern(
+            Catalog catalog,
+            String databasePattern,
+            String tablePattern,
+            List<String> expectedTablePaths) {
+
+        Map<String, Object> configMap = new HashMap<>();
+        configMap.put(ConnectorCommonOptions.DATABASE_PATTERN.key(), 
databasePattern);
+        configMap.put(ConnectorCommonOptions.TABLE_PATTERN.key(), tablePattern);
+        ReadonlyConfig config = ReadonlyConfig.fromMap(configMap);
+
+        List<CatalogTable> tables = catalog.getTables(config);
+
+        List<String> actualTablePaths =
+                tables.stream()
+                        .map(
+                                t -> {
+                                    TableIdentifier id = t.getTableId();
+                                    return id.getDatabaseName()
+                                            + "."
+                                            + id.getSchemaName()
+                                            + "."
+                                            + id.getTableName();
+                                })
+                        .collect(Collectors.toList());
+
+        Set<String> actualTablePathSet = new HashSet<>(actualTablePaths);
+        Set<String> expectedTablePathSet = new HashSet<>(expectedTablePaths);
+
+        Assertions.assertEquals(
+                expectedTablePathSet.size(),
+                actualTablePathSet.size(),
+                "Expected "
+                        + expectedTablePathSet.size()
+                        + " tables for pattern: "
+                        + databasePattern
+                        + "."
+                        + tablePattern);
+
+        if (!expectedTablePaths.isEmpty()) {
+            for (String expectedTablePath : expectedTablePaths) {
+                Assertions.assertTrue(
+                        actualTablePathSet.contains(expectedTablePath),
+                        "Expected table path "
+                                + expectedTablePath
+                                + " not found for pattern: "
+                                + databasePattern
+                                + "."
+                                + tablePattern);
+            }
+        } else {
+            Assertions.assertTrue(
+                    actualTablePathSet.isEmpty(),
+                    "Expected empty result for pattern: " + databasePattern + "." + tablePattern);
+        }
+    }
+
+    private static class TestCatalog implements Catalog {
+
+        @Override
+        public void open() throws CatalogException {}
+
+        @Override
+        public void close() throws CatalogException {}
+
+        @Override
+        public String name() {
+            return "TestCatalog";
+        }
+
+        @Override
+        public String getDefaultDatabase() throws CatalogException {
+            return "test";
+        }
+
+        @Override
+        public boolean databaseExists(String databaseName) throws CatalogException {
+            return false;
+        }
+
+        @Override
+        public List<String> listDatabases() throws CatalogException {
+            return Collections.emptyList();
+        }
+
+        @Override
+        public List<String> listTables(String databaseName)
+                throws CatalogException, DatabaseNotExistException {
+            return Collections.emptyList();
+        }
+
+        @Override
+        public boolean tableExists(TablePath tablePath) throws CatalogException {
+            return false;
+        }
+
+        @Override
+        public CatalogTable getTable(TablePath tablePath)
+                throws CatalogException, TableNotExistException {
+            throw new TableNotExistException("test", tablePath);
+        }
+
+        @Override
+        public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists)
+                throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {}
+
+        @Override
+        public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
+                throws TableNotExistException, CatalogException {}
+
+        @Override
+        public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
+                throws DatabaseAlreadyExistException, CatalogException {}
+
+        @Override
+        public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
+                throws DatabaseNotExistException, CatalogException {}
+    }
 }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlMultipleTablesIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlMultipleTablesIT.java
index c3349c6ccd..72c37c8b8e 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlMultipleTablesIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlMultipleTablesIT.java
@@ -204,6 +204,36 @@ public class JdbcMysqlMultipleTablesIT extends TestSuiteBase implements TestReso
                 0, sqlConfEexecResult.getExitCode(), sqlConfEexecResult.getStderr());
     }
 
+    @TestTemplate
+    public void testMysqlJdbcRegexPatternE2e(TestContainer container)
+            throws IOException, InterruptedException, SQLException {
+        clearSinkTables();
+
+        Container.ExecResult execResult =
+                container.executeJob("/jdbc_mysql_source_and_sink_with_pattern_tables.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+
+        List<Executable> asserts =
+                TABLES.stream()
+                        .map(
+                                (Function<String, Executable>)
+                                        table ->
+                                                () ->
+                                                        Assertions.assertIterableEquals(
+                                                                query(
+                                                                        String.format(
+                                                                                "SELECT * FROM %s.%s",
+                                                                                SOURCE_DATABASE,
+                                                                                table)),
+                                                                query(
+                                                                        String.format(
+                                                                                "SELECT * FROM %s.%s",
+                                                                                SINK_DATABASE,
+                                                                                table))))
+                        .collect(Collectors.toList());
+        Assertions.assertAll(asserts);
+    }
+
     @AfterAll
     @Override
     public void tearDown() throws Exception {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleMultipleTablesIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleMultipleTablesIT.java
index 8ba6b1e851..35433f8c57 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleMultipleTablesIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleMultipleTablesIT.java
@@ -177,6 +177,35 @@ public class JdbcOracleMultipleTablesIT extends TestSuiteBase implements TestRes
         Assertions.assertAll(asserts);
     }
 
+    @TestTemplate
+    public void testOracleJdbcRegexPatternE2e(TestContainer container)
+            throws IOException, InterruptedException, SQLException {
+        clearSinkTables();
+
+        Container.ExecResult execResult =
+                container.executeJob("/jdbc_oracle_source_with_pattern_tables_to_sink.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+
+        List<Executable> asserts =
+                TABLES.stream()
+                        .map(
+                                (Function<String, Executable>)
+                                        table ->
+                                                () ->
+                                                        Assertions.assertIterableEquals(
+                                                                query(
+                                                                        String.format(
+                                                                                "SELECT * FROM %s.%s order by INTEGER_COL asc",
+                                                                                SCHEMA, table)),
+                                                                query(
+                                                                        String.format(
+                                                                                "SELECT * FROM %s.%s order by INTEGER_COL asc",
+                                                                                SCHEMA,
+                                                                                "SINK_" + table))))
+                        .collect(Collectors.toList());
+        Assertions.assertAll(asserts);
+    }
+
     @AfterAll
     @Override
     public void tearDown() throws Exception {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_with_pattern_tables.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_with_pattern_tables.conf
new file mode 100644
index 0000000000..cf227e33a6
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_with_pattern_tables.conf
@@ -0,0 +1,57 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  jdbc {
+    url = "jdbc:mysql://mysql-e2e:3306/seatunnel"
+    driver = "com.mysql.cj.jdbc.Driver"
+    connection_check_timeout_sec = 100
+    user = "root"
+    password = "Abc!@#135_seatunnel"
+
+    table_list = [
+      {
+        table_path = "source.table\\d+"
+        use_regex = true
+      }
+    ]
+    where_condition = "where c_int >= 0"
+    split.size = 8096
+    split.even-distribution.factor.upper-bound = 100
+    split.even-distribution.factor.lower-bound = 0.05
+    split.sample-sharding.threshold = 1000
+    split.inverse-sampling.rate = 1000
+  }
+}
+
+sink {
+  jdbc {
+    url = "jdbc:mysql://mysql-e2e:3306/seatunnel"
+    driver = "com.mysql.cj.jdbc.Driver"
+    user = "root"
+    password = "Abc!@#135_seatunnel"
+
+    database = "sink"
+    table = "${table_name}"
+    generate_sink_sql = true
+  }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_with_pattern_tables_to_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_with_pattern_tables_to_sink.conf
new file mode 100644
index 0000000000..08d42b093e
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_with_pattern_tables_to_sink.conf
@@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  # This is an example source plugin, only for testing and demonstrating the source plugin feature
+  Jdbc {
+    driver = oracle.jdbc.driver.OracleDriver
+    url = "jdbc:oracle:thin:@e2e_oracleDb:1521/TESTUSER"
+    user = testUser
+    password = testPassword
+    use_select_count = true
+    table_list = [
+        {
+          table_path = "TESTUSER.TABLE\\d+"
+          use_regex = true
+        }
+    ]
+    properties {
+       database.oracle.jdbc.timezoneAsRegion = "false"
+    }
+  }
+
+  # If you would like to get more information about how to configure seatunnel and see the full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
+}
+
+sink {
+  Jdbc {
+    driver = oracle.jdbc.driver.OracleDriver
+    url = "jdbc:oracle:thin:@e2e_oracleDb:1521/TESTUSER"
+    user = testUser
+    password = testPassword
+    database = XE
+    table = "TESTUSER.SINK_${table_name}"
+    generate_sink_sql = true
+    properties {
+       database.oracle.jdbc.timezoneAsRegion = "false"
+    }
+  }
+
+}
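
When `use_regex = true`, the `table_path` values in the configs above (e.g. `source.table\\d+` and `TESTUSER.TABLE\\d+`) are treated as Java regular expressions matched against fully qualified table names, rather than as literal paths. A minimal standalone sketch of that matching idea (the `matchTables` helper is hypothetical, not part of the SeaTunnel API):

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class TablePatternDemo {
    // Hypothetical helper: keep only the table paths that fully match the
    // regex, mirroring how table_path = "source.table\\d+" selects tables.
    static List<String> matchTables(String regex, List<String> tablePaths) {
        Pattern pattern = Pattern.compile(regex);
        return tablePaths.stream()
                .filter(path -> pattern.matcher(path).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> candidates = Arrays.asList(
                "source.table1", "source.table22", "source.orders", "sink.table1");
        // Keeps source.table1 and source.table22; source.orders and
        // sink.table1 do not fully match the pattern.
        System.out.println(matchTables("source.table\\d+", candidates));
    }
}
```

Note that in the regex the unescaped `.` matches any character, so it also happens to match the literal dot separating database and table names in the test patterns above.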
