This is an automated email from the ASF dual-hosted git repository.
zongwen pushed a commit to branch cdc-multiple-table
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/cdc-multiple-table by this
push:
new 5c3f0c9b0 [improve][cdc] support sharding-tables (#4207)
5c3f0c9b0 is described below
commit 5c3f0c9b002bcece57f1f8f654322e0005c6ca29
Author: Zongwen Li <[email protected]>
AuthorDate: Fri Feb 24 15:33:46 2023 +0800
[improve][cdc] support sharding-tables (#4207)
---
docs/en/connector-v2/source/MySQL-CDC.md | 11 +++++------
.../cdc/base/config/JdbcSourceConfigFactory.java | 8 +++-----
.../connectors/cdc/base/option/JdbcSourceOptions.java | 14 +++++---------
.../seatunnel/cdc/mysql/source/MySqlIncrementalSource.java | 3 ++-
.../cdc/mysql/source/MySqlIncrementalSourceFactory.java | 5 +++--
.../source/source/SqlServerIncrementalSourceFactory.java | 5 +++--
6 files changed, 21 insertions(+), 25 deletions(-)
diff --git a/docs/en/connector-v2/source/MySQL-CDC.md
b/docs/en/connector-v2/source/MySQL-CDC.md
index 11786c68c..3bccb2aec 100644
--- a/docs/en/connector-v2/source/MySQL-CDC.md
+++ b/docs/en/connector-v2/source/MySQL-CDC.md
@@ -24,8 +24,8 @@ describes how to setup the MySQL CDC connector to run SQL
queries against MySQL
| port | Integer | No | 3306
|
| username | String | Yes | -
|
| password | String | Yes | -
|
-| database-name | String | Yes | -
|
-| table-name | String | Yes | -
|
+| database-names | List | Yes | -
|
+| table-names | List | Yes | -
|
| base-url | String | Yes | -
|
| startup.mode | Enum | No |
INITIAL |
| startup.timestamp | Long | No | -
|
@@ -64,11 +64,11 @@ Name of the database to use when connecting to the database
server.
Password to use when connecting to the database server.
-### database-name [String]
+### database-names [List]
Database name of the database to monitor.
-### table-name [String]
+### table-names [List]
Table name of the database to monitor.
@@ -196,8 +196,7 @@ source {
hostname = "127.0.0.1"
username = "mysqluser"
password = "mysqlpw"
- database-name = "inventory_vwyw0n"
- table-name = "products"
+ table-names = ["inventory_vwyw0n.products"]
base-url = "jdbc:mysql://localhost:56725"
}
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java
index 5ac33abdb..dd42f72d6 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java
@@ -18,11 +18,11 @@
package org.apache.seatunnel.connectors.cdc.base.config;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogOptions;
import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions;
import java.util.Arrays;
-import java.util.Collections;
import java.util.List;
import java.util.Properties;
@@ -189,10 +189,8 @@ public abstract class JdbcSourceConfigFactory implements
SourceConfig.Factory<Jd
this.hostname = config.get(JdbcSourceOptions.HOSTNAME);
this.username = config.get(JdbcSourceOptions.USERNAME);
this.password = config.get(JdbcSourceOptions.PASSWORD);
- // TODO: support multi-table
- this.databaseList =
Collections.singletonList(config.get(JdbcSourceOptions.DATABASE_NAME));
- this.tableList =
Collections.singletonList(config.get(JdbcSourceOptions.DATABASE_NAME)
- + "\\." + config.get(JdbcSourceOptions.TABLE_NAME));
+ this.databaseList = config.get(JdbcSourceOptions.DATABASE_NAMES);
+ this.tableList = config.get(CatalogOptions.TABLE_NAMES);
this.distributionFactorUpper =
config.get(JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
this.distributionFactorLower =
config.get(JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
this.splitSize = config.get(SourceOptions.SNAPSHOT_SPLIT_SIZE);
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/JdbcSourceOptions.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/JdbcSourceOptions.java
index 4839ecefe..d11a676e2 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/JdbcSourceOptions.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/JdbcSourceOptions.java
@@ -21,6 +21,8 @@ import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.connectors.cdc.base.source.IncrementalSource;
+import java.util.List;
+
/** Configurations for {@link IncrementalSource} of JDBC data source. */
@SuppressWarnings("checkstyle:MagicNumber")
public class JdbcSourceOptions extends SourceOptions {
@@ -50,18 +52,12 @@ public class JdbcSourceOptions extends SourceOptions {
.noDefaultValue()
.withDescription("Password to use when connecting to the
database server.");
- public static final Option<String> DATABASE_NAME =
- Options.key("database-name")
- .stringType()
+ public static final Option<List<String>> DATABASE_NAMES =
+ Options.key("database-names")
+ .listType()
.noDefaultValue()
.withDescription("Database name of the database to
monitor.");
- public static final Option<String> TABLE_NAME =
- Options.key("table-name")
- .stringType()
- .noDefaultValue()
- .withDescription("Table name of the database to monitor.");
-
public static final Option<String> SERVER_TIME_ZONE =
Options.key("server-time-zone")
.stringType()
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
index 66401b4ec..43734efbd 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
@@ -20,6 +20,7 @@ package
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SupportParallelism;
+import org.apache.seatunnel.api.table.catalog.CatalogOptions;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -76,7 +77,7 @@ public class MySqlIncrementalSource<T> extends
IncrementalSource<T, JdbcSourceCo
if (dataType == null) {
// TODO: support metadata keys
MySqlCatalog mySqlCatalog = new MySqlCatalog("mysql",
jdbcSourceConfig.getDatabaseList().get(0), jdbcSourceConfig.getUsername(),
jdbcSourceConfig.getPassword(), baseUrl);
- CatalogTable table =
mySqlCatalog.getTable(TablePath.of(jdbcSourceConfig.getDatabaseList().get(0),
config.get(JdbcSourceOptions.TABLE_NAME)));
+ CatalogTable table =
mySqlCatalog.getTable(TablePath.of(config.get(CatalogOptions.TABLE_NAMES).get(0)));
physicalRowType = table.getTableSchema().toPhysicalRowDataType();
} else {
physicalRowType = dataType;
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
index b6ad57e3d..e22148f81 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
@@ -20,6 +20,7 @@ package
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.catalog.CatalogOptions;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.Factory;
@@ -54,10 +55,10 @@ public class MySqlIncrementalSourceFactory implements
TableSourceFactory, Suppor
JdbcSourceOptions.HOSTNAME,
JdbcSourceOptions.USERNAME,
JdbcSourceOptions.PASSWORD,
- JdbcSourceOptions.DATABASE_NAME,
- JdbcSourceOptions.TABLE_NAME,
+ CatalogOptions.TABLE_NAMES,
JdbcCatalogOptions.BASE_URL)
.optional(
+ JdbcSourceOptions.DATABASE_NAMES,
JdbcSourceOptions.PORT,
JdbcSourceOptions.SERVER_ID,
JdbcSourceOptions.SERVER_TIME_ZONE,
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java
index 18a095a61..f69bdc6bd 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java
@@ -19,6 +19,7 @@ package
org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.catalog.CatalogOptions;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
@@ -36,9 +37,9 @@ public class SqlServerIncrementalSourceFactory implements
TableSourceFactory {
JdbcSourceOptions.HOSTNAME,
JdbcSourceOptions.USERNAME,
JdbcSourceOptions.PASSWORD,
- JdbcSourceOptions.DATABASE_NAME,
- JdbcSourceOptions.TABLE_NAME)
+ CatalogOptions.TABLE_NAMES)
.optional(
+ JdbcSourceOptions.DATABASE_NAMES,
JdbcSourceOptions.PORT,
JdbcSourceOptions.SERVER_TIME_ZONE,
JdbcSourceOptions.CONNECT_TIMEOUT_MS,