This is an automated email from the ASF dual-hosted git repository.

zongwen pushed a commit to branch cdc-multiple-table
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/cdc-multiple-table by this 
push:
     new 5c3f0c9b0 [improve][cdc] support sharding-tables (#4207)
5c3f0c9b0 is described below

commit 5c3f0c9b002bcece57f1f8f654322e0005c6ca29
Author: Zongwen Li <[email protected]>
AuthorDate: Fri Feb 24 15:33:46 2023 +0800

    [improve][cdc] support sharding-tables (#4207)
---
 docs/en/connector-v2/source/MySQL-CDC.md                   | 11 +++++------
 .../cdc/base/config/JdbcSourceConfigFactory.java           |  8 +++-----
 .../connectors/cdc/base/option/JdbcSourceOptions.java      | 14 +++++---------
 .../seatunnel/cdc/mysql/source/MySqlIncrementalSource.java |  3 ++-
 .../cdc/mysql/source/MySqlIncrementalSourceFactory.java    |  5 +++--
 .../source/source/SqlServerIncrementalSourceFactory.java   |  5 +++--
 6 files changed, 21 insertions(+), 25 deletions(-)

diff --git a/docs/en/connector-v2/source/MySQL-CDC.md 
b/docs/en/connector-v2/source/MySQL-CDC.md
index 11786c68c..3bccb2aec 100644
--- a/docs/en/connector-v2/source/MySQL-CDC.md
+++ b/docs/en/connector-v2/source/MySQL-CDC.md
@@ -24,8 +24,8 @@ describes how to setup the MySQL CDC connector to run SQL 
queries against MySQL
 | port                                           | Integer  | No       | 3306  
        |
 | username                                       | String   | Yes      | -     
        |
 | password                                       | String   | Yes      | -     
        |
-| database-name                                  | String   | Yes      | -     
        |
-| table-name                                     | String   | Yes      | -     
        |
+| database-names                                 | List     | Yes      | -     
        |
+| table-names                                    | List     | Yes      | -     
        |
 | base-url                                       | String   | Yes      | -     
        |
 | startup.mode                                   | Enum     | No       | 
INITIAL       |
 | startup.timestamp                              | Long     | No       | -     
        |
@@ -64,11 +64,11 @@ Name of the database to use when connecting to the database 
server.
 
 Password to use when connecting to the database server.
 
-### database-name [String]
+### database-names [List]
 
 Database name of the database to monitor.
 
-### table-name [String]
+### table-names [List]
 
 Table name of the database to monitor.
 
@@ -196,8 +196,7 @@ source {
     hostname = "127.0.0.1"
     username = "mysqluser"
     password = "mysqlpw"
-    database-name = "inventory_vwyw0n"
-    table-name = "products"
+    table-names = ["inventory_vwyw0n.products"]
     base-url = "jdbc:mysql://localhost:56725"
   }
 }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java
index 5ac33abdb..dd42f72d6 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java
@@ -18,11 +18,11 @@
 package org.apache.seatunnel.connectors.cdc.base.config;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogOptions;
 import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
 import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions;
 
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 
@@ -189,10 +189,8 @@ public abstract class JdbcSourceConfigFactory implements 
SourceConfig.Factory<Jd
         this.hostname = config.get(JdbcSourceOptions.HOSTNAME);
         this.username = config.get(JdbcSourceOptions.USERNAME);
         this.password = config.get(JdbcSourceOptions.PASSWORD);
-        // TODO: support multi-table
-        this.databaseList = 
Collections.singletonList(config.get(JdbcSourceOptions.DATABASE_NAME));
-        this.tableList = 
Collections.singletonList(config.get(JdbcSourceOptions.DATABASE_NAME)
-            + "\\." + config.get(JdbcSourceOptions.TABLE_NAME));
+        this.databaseList = config.get(JdbcSourceOptions.DATABASE_NAMES);
+        this.tableList = config.get(CatalogOptions.TABLE_NAMES);
         this.distributionFactorUpper = 
config.get(JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
         this.distributionFactorLower = 
config.get(JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
         this.splitSize = config.get(SourceOptions.SNAPSHOT_SPLIT_SIZE);
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/JdbcSourceOptions.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/JdbcSourceOptions.java
index 4839ecefe..d11a676e2 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/JdbcSourceOptions.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/JdbcSourceOptions.java
@@ -21,6 +21,8 @@ import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
 import org.apache.seatunnel.connectors.cdc.base.source.IncrementalSource;
 
+import java.util.List;
+
 /** Configurations for {@link IncrementalSource} of JDBC data source. */
 @SuppressWarnings("checkstyle:MagicNumber")
 public class JdbcSourceOptions extends SourceOptions {
@@ -50,18 +52,12 @@ public class JdbcSourceOptions extends SourceOptions {
                     .noDefaultValue()
                     .withDescription("Password to use when connecting to the 
database server.");
 
-    public static final Option<String> DATABASE_NAME =
-            Options.key("database-name")
-                    .stringType()
+    public static final Option<List<String>> DATABASE_NAMES =
+            Options.key("database-names")
+                    .listType()
                     .noDefaultValue()
                     .withDescription("Database name of the database to 
monitor.");
 
-    public static final Option<String> TABLE_NAME =
-            Options.key("table-name")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("Table name of the database to monitor.");
-
     public static final Option<String> SERVER_TIME_ZONE =
             Options.key("server-time-zone")
                     .stringType()
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
index 66401b4ec..43734efbd 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
@@ -20,6 +20,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SupportParallelism;
+import org.apache.seatunnel.api.table.catalog.CatalogOptions;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -76,7 +77,7 @@ public class MySqlIncrementalSource<T> extends 
IncrementalSource<T, JdbcSourceCo
         if (dataType == null) {
             // TODO: support metadata keys
             MySqlCatalog mySqlCatalog = new MySqlCatalog("mysql", 
jdbcSourceConfig.getDatabaseList().get(0), jdbcSourceConfig.getUsername(), 
jdbcSourceConfig.getPassword(), baseUrl);
-            CatalogTable table = 
mySqlCatalog.getTable(TablePath.of(jdbcSourceConfig.getDatabaseList().get(0), 
config.get(JdbcSourceOptions.TABLE_NAME)));
+            CatalogTable table = 
mySqlCatalog.getTable(TablePath.of(config.get(CatalogOptions.TABLE_NAMES).get(0)));
             physicalRowType = table.getTableSchema().toPhysicalRowDataType();
         } else {
             physicalRowType = dataType;
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
index b6ad57e3d..e22148f81 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
@@ -20,6 +20,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.catalog.CatalogOptions;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.connector.TableSource;
 import org.apache.seatunnel.api.table.factory.Factory;
@@ -54,10 +55,10 @@ public class MySqlIncrementalSourceFactory implements 
TableSourceFactory, Suppor
                 JdbcSourceOptions.HOSTNAME,
                 JdbcSourceOptions.USERNAME,
                 JdbcSourceOptions.PASSWORD,
-                JdbcSourceOptions.DATABASE_NAME,
-                JdbcSourceOptions.TABLE_NAME,
+                CatalogOptions.TABLE_NAMES,
                 JdbcCatalogOptions.BASE_URL)
             .optional(
+                JdbcSourceOptions.DATABASE_NAMES,
                 JdbcSourceOptions.PORT,
                 JdbcSourceOptions.SERVER_ID,
                 JdbcSourceOptions.SERVER_TIME_ZONE,
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java
index 18a095a61..f69bdc6bd 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java
@@ -19,6 +19,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.catalog.CatalogOptions;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
 import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
 
@@ -36,9 +37,9 @@ public class SqlServerIncrementalSourceFactory implements 
TableSourceFactory {
                 JdbcSourceOptions.HOSTNAME,
                 JdbcSourceOptions.USERNAME,
                 JdbcSourceOptions.PASSWORD,
-                JdbcSourceOptions.DATABASE_NAME,
-                JdbcSourceOptions.TABLE_NAME)
+                CatalogOptions.TABLE_NAMES)
             .optional(
+                JdbcSourceOptions.DATABASE_NAMES,
                 JdbcSourceOptions.PORT,
                 JdbcSourceOptions.SERVER_TIME_ZONE,
                 JdbcSourceOptions.CONNECT_TIMEOUT_MS,

Reply via email to