This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new b6c6dc0438 [Feature][Connector-V2]Jdbc chunk split add snapshotSplitColumn config #7794 (#7840)
b6c6dc0438 is described below
commit b6c6dc0438326edf7a329b12702e58fa520a9c3d
Author: GumKey <[email protected]>
AuthorDate: Tue Nov 19 20:27:20 2024 +0800
[Feature][Connector-V2]Jdbc chunk split add snapshotSplitColumn config #7794 (#7840)
Co-authored-by: XenosK <[email protected]>
Co-authored-by: Jia Fan <[email protected]>
---
docs/en/connector-v2/source/MySQL-CDC.md | 4 +-
docs/en/connector-v2/source/Oracle-CDC.md | 4 +-
docs/en/connector-v2/source/PostgreSQL-CDC.md | 2 +-
docs/en/connector-v2/source/SqlServer-CDC.md | 4 +-
.../cdc/base/config/BaseSourceConfig.java | 4 ++
.../cdc/base/config/JdbcSourceConfig.java | 3 ++
.../cdc/base/config/JdbcSourceConfigFactory.java | 19 ++++++++
.../cdc/base/config/JdbcSourceTableConfig.java | 1 +
.../cdc/base/option/JdbcSourceOptions.java | 3 +-
.../connectors/cdc/base/option/SourceOptions.java | 1 -
.../splitter/AbstractJdbcSourceChunkSplitter.java | 51 ++++++++++++++++++++--
.../cdc/mysql/config/MySqlSourceConfig.java | 3 ++
.../cdc/mysql/config/MySqlSourceConfigFactory.java | 1 +
.../cdc/oracle/config/OracleSourceConfig.java | 3 ++
.../oracle/config/OracleSourceConfigFactory.java | 1 +
.../cdc/postgres/config/PostgresSourceConfig.java | 3 ++
.../config/PostgresSourceConfigFactory.java | 1 +
.../sqlserver/config/SqlServerSourceConfig.java | 3 ++
.../config/SqlServerSourceConfigFactory.java | 1 +
.../src/test/resources/ddl/mysql_cdc.sql | 6 ++-
...c_to_mysql_with_multi_table_mode_one_table.conf | 7 +++
...c_to_mysql_with_multi_table_mode_two_table.conf | 13 +++++-
22 files changed, 123 insertions(+), 15 deletions(-)
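
Note on the selection order: the splitter change in this commit (AbstractJdbcSourceChunkSplitter.getSplitColumn) first tries the user-configured snapshotSplitColumn, accepts it only if it is part of a unique key and has an evenly splittable type, and otherwise falls back to the primary key and then the unique keys. The following is a minimal standalone sketch of that order, not the SeaTunnel implementation. The class name, method name, parameters, and the plain-string type names are all hypothetical, and returning an empty Optional when nothing matches is an assumption, since the diff does not show that path.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical sketch of the split-column selection order; not SeaTunnel code.
class SplitColumnSketch {

    // Type names taken from the warning message in the diff; modelled here as
    // plain strings for illustration only.
    private static final Set<String> EVENLY_SPLITTABLE =
            new HashSet<>(
                    Arrays.asList("TINYINT", "SMALLINT", "INT", "BIGINT", "DECIMAL", "STRING"));

    static Optional<String> chooseSplitColumn(
            String tableKey, // e.g. "db1.t1" (the diff builds catalog + "." + table)
            Map<String, String> configuredSplitColumns, // table -> snapshotSplitColumn
            Collection<String> uniqueKeyColumns, // columns that appear in any unique key
            Map<String, String> columnTypes, // column name -> type name
            List<String> primaryKeyColumns) {

        // 1. Prefer the user-configured column, if it is in a unique key and splittable.
        String configured = configuredSplitColumns.get(tableKey);
        if (configured != null
                && !configured.isEmpty()
                && uniqueKeyColumns.contains(configured)
                && EVENLY_SPLITTABLE.contains(columnTypes.get(configured))) {
            return Optional.of(configured);
        }

        // 2. Fall back to the first splittable primary-key column.
        for (String pk : primaryKeyColumns) {
            if (EVENLY_SPLITTABLE.contains(columnTypes.get(pk))) {
                return Optional.of(pk);
            }
        }

        // 3. Then the first splittable unique-key column.
        for (String uk : uniqueKeyColumns) {
            if (EVENLY_SPLITTABLE.contains(columnTypes.get(uk))) {
                return Optional.of(uk);
            }
        }

        // Assumption: what happens when nothing matches is not shown in the diff.
        return Optional.empty();
    }
}
```

With the e2e table from this commit (primary key `id`, unique key on `id`, `f_int`, `f_bigint`), configuring `f_int` for the table would select `f_int` in this sketch, while configuring a column outside the unique key would fall back to `id`.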
diff --git a/docs/en/connector-v2/source/MySQL-CDC.md b/docs/en/connector-v2/source/MySQL-CDC.md
index fc2ea4d8ff..cc58ec4459 100644
--- a/docs/en/connector-v2/source/MySQL-CDC.md
+++ b/docs/en/connector-v2/source/MySQL-CDC.md
@@ -169,14 +169,14 @@ When an initial consistent snapshot is made for large databases, your establishe
## Source Options
-| Name | Type | Required |
Default | Description
[...]
+| Name | Type | Required |
Default | Description
[...]
|------------------------------------------------|----------|----------|---------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
| base-url | String | Yes | -
| The URL of the JDBC connection. Refer to a case:
`jdbc:mysql://localhost:3306:3306/test`.
[...]
| username | String | Yes | -
| Name of the database to use when connecting to the database server.
[...]
| password | String | Yes | -
| Password to use when connecting to the database server.
[...]
| database-names | List | No | -
| Database name of the database to monitor.
[...]
| table-names | List | Yes | -
| Table name of the database to monitor. The table name needs to include the
database name, for example: `database_name.table_name`
[...]
-| table-names-config | List | No | -
| Table config list. for example: [{"table":
"db1.schema1.table1","primaryKeys":["key1"]}]
[...]
+| table-names-config | List | No | -
| Table config list. for example: [{"table":
"db1.schema1.table1","primaryKeys": ["key1"],"snapshotSplitColumn": "key2"}]
[...]
| startup.mode | Enum | No |
INITIAL | Optional startup mode for MySQL CDC consumer, valid enumerations are
`initial`, `earliest`, `latest` and `specific`. <br/> `initial`: Synchronize
historical data at startup, and then synchronize incremental data.<br/>
`earliest`: Startup from the earliest offset possible.<br/> `latest`: Startup
from the latest offset.<br/> `specific`: Startup from user-supplied specific
offsets. [...]
| startup.specific-offset.file | String | No | -
| Start from the specified binlog file name. **Note, This option is required
when the `startup.mode` option used `specific`.**
[...]
| startup.specific-offset.pos | Long | No | -
| Start from the specified binlog file position. **Note, This option is
required when the `startup.mode` option used `specific`.**
[...]
diff --git a/docs/en/connector-v2/source/Oracle-CDC.md b/docs/en/connector-v2/source/Oracle-CDC.md
index feef58a0d2..5cb9e33756 100644
--- a/docs/en/connector-v2/source/Oracle-CDC.md
+++ b/docs/en/connector-v2/source/Oracle-CDC.md
@@ -220,7 +220,7 @@ exit;
## Source Options
-| Name | Type | Required |
Default |
Description
[...]
+| Name | Type | Required |
Default | Description
[...]
|------------------------------------------------|----------|----------|---------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
| base-url | String | Yes | -
| The URL of the JDBC connection. Refer to a case:
`idbc:oracle:thin:datasource01:1523:xe`.
[...]
| username | String | Yes | -
| Name of the database to use when connecting to the database server.
[...]
@@ -228,7 +228,7 @@ exit;
| database-names | List | No | -
| Database name of the database to monitor.
[...]
| schema-names | List | No | -
| Schema name of the database to monitor.
[...]
| table-names | List | Yes | -
| Table name of the database to monitor. The table name needs to include the
database name, for example: `database_name.table_name`
[...]
-| table-names-config | List | No | -
| Table config list. for example: [{"table":
"db1.schema1.table1","primaryKeys":["key1"]}]
[...]
+| table-names-config | List | No | -
| Table config list. for example: [{"table":
"db1.schema1.table1","primaryKeys": ["key1"],"snapshotSplitColumn": "key2"}]
[...]
| startup.mode | Enum | No |
INITIAL | Optional startup mode for Oracle CDC consumer, valid enumerations are
`initial`, `earliest`, `latest` and `specific`. <br/> `initial`: Synchronize
historical data at startup, and then synchronize incremental data.<br/>
`earliest`: Startup from the earliest offset possible.<br/> `latest`: Startup
from the latest offset.<br/> `specific`: Startup from user-supplied specific
offsets. [...]
| startup.specific-offset.file | String | No | -
| Start from the specified binlog file name. **Note, This option is required
when the `startup.mode` option used `specific`.**
[...]
| startup.specific-offset.pos | Long | No | -
| Start from the specified binlog file position. **Note, This option is
required when the `startup.mode` option used `specific`.**
[...]
diff --git a/docs/en/connector-v2/source/PostgreSQL-CDC.md b/docs/en/connector-v2/source/PostgreSQL-CDC.md
index 8197a72b99..d64db14ac5 100644
--- a/docs/en/connector-v2/source/PostgreSQL-CDC.md
+++ b/docs/en/connector-v2/source/PostgreSQL-CDC.md
@@ -93,7 +93,7 @@ ALTER TABLE your_table_name REPLICA IDENTITY FULL;
| password | String | Yes | -
| Password to use when connecting to the database server.
[...]
| database-names | List | No | -
| Database name of the database to monitor.
[...]
| table-names | List | Yes | -
| Table name of the database to monitor. The table name needs to include the
database name, for example: `database_name.table_name`
[...]
-| table-names-config | List | No | -
| Table config list. for example: [{"table":
"db1.schema1.table1","primaryKeys":["key1"]}]
[...]
+| table-names-config | List | No | -
| Table config list. for example: [{"table":
"db1.schema1.table1","primaryKeys": ["key1"],"snapshotSplitColumn": "key2"}]
[...]
| startup.mode | Enum | No |
INITIAL | Optional startup mode for PostgreSQL CDC consumer, valid
enumerations are `initial`, `earliest` and `latest`. <br/> `initial`:
Synchronize historical data at startup, and then synchronize incremental
data.<br/> `earliest`: Startup from the earliest offset possible.<br/>
`latest`: Startup from the latest offset.
[...]
| snapshot.split.size | Integer | No | 8096
| The split size (number of rows) of table snapshot, captured tables are
split into multiple splits when read the snapshot of table.
[...]
| snapshot.fetch.size | Integer | No | 1024
| The maximum fetch size for per poll when read table snapshot.
[...]
diff --git a/docs/en/connector-v2/source/SqlServer-CDC.md b/docs/en/connector-v2/source/SqlServer-CDC.md
index a64b3abfa8..11d686b927 100644
--- a/docs/en/connector-v2/source/SqlServer-CDC.md
+++ b/docs/en/connector-v2/source/SqlServer-CDC.md
@@ -63,13 +63,13 @@ describes how to setup the Sql Server CDC connector to run SQL queries against S
## Source Options
-| Name | Type | Required |
Default |
Description
[...]
+| Name | Type | Required |
Default | Description
[...]
|------------------------------------------------|----------|----------|---------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
| username | String | Yes | -
| Name of the database to use when connecting to the database server.
[...]
| password | String | Yes | -
| Password to use when connecting to the database server.
[...]
| database-names | List | Yes | -
| Database name of the database to monitor.
[...]
| table-names | List | Yes | -
| Table name is a combination of schema name and table name
(databaseName.schemaName.tableName).
[...]
-| table-names-config | List | No | -
| Table config list. for example: [{"table":
"db1.schema1.table1","primaryKeys":["key1"]}]
[...]
+| table-names-config | List | No | -
| Table config list. for example: [{"table":
"db1.schema1.table1","primaryKeys": ["key1"],"snapshotSplitColumn": "key2"}]
[...]
| base-url | String | Yes | -
| URL has to be with database, like
"jdbc:sqlserver://localhost:1433;databaseName=test".
[...]
| startup.mode | Enum | No |
INITIAL | Optional startup mode for SqlServer CDC consumer, valid enumerations
are "initial", "earliest", "latest" and "specific".
[...]
| startup.timestamp | Long | No | -
| Start from the specified epoch timestamp (in milliseconds).<br/> **Note,
This option is required when** the **"startup.mode" option used
`'timestamp'`.**
[...]
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/BaseSourceConfig.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/BaseSourceConfig.java
index f4a82d2de1..2b441483a2 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/BaseSourceConfig.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/BaseSourceConfig.java
@@ -22,6 +22,7 @@ import
org.apache.seatunnel.connectors.cdc.base.source.IncrementalSource;
import io.debezium.config.Configuration;
import lombok.Getter;
+import java.util.Map;
import java.util.Properties;
/** A basic Source configuration which is used by {@link IncrementalSource}. */
@@ -34,6 +35,7 @@ public abstract class BaseSourceConfig implements
SourceConfig {
@Getter protected final StopConfig stopConfig;
@Getter protected final int splitSize;
+ @Getter protected final Map<String, String> splitColumn;
@Getter protected final double distributionFactorUpper;
@Getter protected final double distributionFactorLower;
@@ -50,6 +52,7 @@ public abstract class BaseSourceConfig implements
SourceConfig {
StartupConfig startupConfig,
StopConfig stopConfig,
int splitSize,
+ Map<String, String> splitColumn,
double distributionFactorUpper,
double distributionFactorLower,
int sampleShardingThreshold,
@@ -59,6 +62,7 @@ public abstract class BaseSourceConfig implements
SourceConfig {
this.startupConfig = startupConfig;
this.stopConfig = stopConfig;
this.splitSize = splitSize;
+ this.splitColumn = splitColumn;
this.distributionFactorUpper = distributionFactorUpper;
this.distributionFactorLower = distributionFactorLower;
this.sampleShardingThreshold = sampleShardingThreshold;
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfig.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfig.java
index 9d46ab3393..ddd47a2b83 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfig.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfig.java
@@ -22,6 +22,7 @@ import
org.apache.seatunnel.connectors.cdc.base.source.IncrementalSource;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
/**
@@ -49,6 +50,7 @@ public abstract class JdbcSourceConfig extends
BaseSourceConfig {
List<String> databaseList,
List<String> tableList,
int splitSize,
+ Map<String, String> splitColumn,
double distributionFactorUpper,
double distributionFactorLower,
int sampleShardingThreshold,
@@ -70,6 +72,7 @@ public abstract class JdbcSourceConfig extends
BaseSourceConfig {
startupConfig,
stopConfig,
splitSize,
+ splitColumn,
distributionFactorUpper,
distributionFactorLower,
sampleShardingThreshold,
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java
index d5d920c257..99ddb3bd17 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java
@@ -25,7 +25,9 @@ import
org.apache.seatunnel.connectors.cdc.base.option.SourceOptions;
import lombok.Setter;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
/** A {@link SourceConfig.Factory} to provide {@link SourceConfig} of JDBC
data source. */
@@ -51,6 +53,7 @@ public abstract class JdbcSourceConfigFactory implements
SourceConfig.Factory<Jd
JdbcSourceOptions.SAMPLE_SHARDING_THRESHOLD.defaultValue();
protected int inverseSamplingRate =
JdbcSourceOptions.INVERSE_SAMPLING_RATE.defaultValue();
protected int splitSize = SourceOptions.SNAPSHOT_SPLIT_SIZE.defaultValue();
+ protected Map<String, String> splitColumn;
protected int fetchSize = SourceOptions.SNAPSHOT_FETCH_SIZE.defaultValue();
protected String serverTimeZone =
JdbcSourceOptions.SERVER_TIME_ZONE.defaultValue();
protected long connectTimeoutMillis =
JdbcSourceOptions.CONNECT_TIMEOUT_MS.defaultValue();
@@ -65,6 +68,11 @@ public abstract class JdbcSourceConfigFactory implements
SourceConfig.Factory<Jd
return this;
}
+ public JdbcSourceConfigFactory splitColumn(Map<String, String> splitColumn) {
+ this.splitColumn = splitColumn;
+ return this;
+ }
+
/** Integer port number of the database server. */
public JdbcSourceConfigFactory port(int port) {
this.port = port;
@@ -239,6 +247,17 @@ public abstract class JdbcSourceConfigFactory implements
SourceConfig.Factory<Jd
this.sampleShardingThreshold =
config.get(JdbcSourceOptions.SAMPLE_SHARDING_THRESHOLD);
this.inverseSamplingRate =
config.get(JdbcSourceOptions.INVERSE_SAMPLING_RATE);
this.splitSize = config.get(SourceOptions.SNAPSHOT_SPLIT_SIZE);
+ this.splitColumn = new HashMap<>();
+ config.getOptional(JdbcSourceOptions.TABLE_NAMES_CONFIG)
+ .ifPresent(
+ jtcs -> {
+ jtcs.forEach(
+ jtc -> {
+ this.splitColumn.put(
+ jtc.getTable(), jtc.getSnapshotSplitColumn());
+ });
+ });
+
this.fetchSize = config.get(SourceOptions.SNAPSHOT_FETCH_SIZE);
this.serverTimeZone = config.get(JdbcSourceOptions.SERVER_TIME_ZONE);
this.connectTimeoutMillis =
config.get(JdbcSourceOptions.CONNECT_TIMEOUT_MS);
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceTableConfig.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceTableConfig.java
index 5cafa363e8..2c55270bd4 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceTableConfig.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceTableConfig.java
@@ -26,4 +26,5 @@ import java.util.List;
public class JdbcSourceTableConfig implements Serializable {
private String table;
private List<String> primaryKeys;
+ private String snapshotSplitColumn;
}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/JdbcSourceOptions.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/JdbcSourceOptions.java
index 6cd7ba0631..909e5c2980 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/JdbcSourceOptions.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/JdbcSourceOptions.java
@@ -152,7 +152,8 @@ public class JdbcSourceOptions extends SourceOptions {
+ "["
+ " {"
+ " \"table\": \"db1.schema1.table1\","
- + " \"primaryKeys\": [\"key1\",\"key2\"]"
+ + " \"primaryKeys\": [\"key1\",\"key2\"],"
+ + " \"snapshotSplitColumn\": \"key2\""
+ " }"
+ "]");
}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/SourceOptions.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/SourceOptions.java
index 87483d9cff..6c83088ef2 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/SourceOptions.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/SourceOptions.java
@@ -36,7 +36,6 @@ public class SourceOptions {
.defaultValue(8096)
.withDescription(
"The split size (number of rows) of table snapshot, captured tables are split into multiple splits when read the snapshot of table.");
-
public static final Option<Integer> SNAPSHOT_FETCH_SIZE =
Options.key("snapshot.fetch.size")
.intType()
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java
index 60a208de86..05bf5e5d21 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java
@@ -25,6 +25,8 @@ import
org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
import org.apache.seatunnel.connectors.cdc.base.utils.ObjectUtils;
+import org.apache.commons.lang3.StringUtils;
+
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
@@ -36,9 +38,12 @@ import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
import static java.math.BigDecimal.ROUND_CEILING;
import static
org.apache.seatunnel.connectors.cdc.base.utils.ObjectUtils.doubleCompare;
@@ -379,12 +384,53 @@ public abstract class AbstractJdbcSourceChunkSplitter implements JdbcSourceChunk
protected Column getSplitColumn(
JdbcConnection jdbc, JdbcDataSourceDialect dialect, TableId tableId)
throws SQLException {
- Optional<PrimaryKey> primaryKey = dialect.getPrimaryKey(jdbc, tableId);
Column splitColumn = null;
+ Table table = dialect.queryTableSchema(jdbc, tableId).getTable();
+
+ // first , compare user defined split column is in the primary key or unique key
+ Map<String, String> splitColumnsConfig = new HashMap<>();
+ try {
+ splitColumnsConfig = sourceConfig.getSplitColumn();
+ } catch (Exception e) {
+ log.error("Config snapshotSplitColumn get exception in {}:{}", tableId, e);
+ }
+ String tableSc =
+ splitColumnsConfig.getOrDefault(tableId.catalog() + "." + tableId.table(), null);
+
+ if (StringUtils.isNotEmpty(tableSc)) {
+ // Is tableSc(table split column) the unique key
+ AtomicBoolean isUniqueKey = new AtomicBoolean(false);
+ dialect.getUniqueKeys(jdbc, tableId)
+ .forEach(
+ ck ->
+ ck.getColumnNames()
+ .forEach(
+ ckc -> {
+ if (tableSc.equals(ckc.getColumnName())) {
+ isUniqueKey.set(true);
+ }
+ }));
+
+ if (isUniqueKey.get()) {
+ Column column = table.columnWithName(tableSc);
+ if (isEvenlySplitColumn(column)) {
+ return column;
+ } else {
+ log.warn(
+ "Config snapshotSplitColumn type in {} is not TINYINT、SMALLINT、INT、BIGINT、DECIMAL、STRING",
+ tableId);
+ }
+ } else {
+ log.warn("Config snapshotSplitColumn not unique key for table {}", tableId);
+ }
+ } else {
+ log.info("Config snapshotSplitColumn not exists for table {}", tableId);
+ }
+
+ Optional<PrimaryKey> primaryKey = dialect.getPrimaryKey(jdbc, tableId);
if (primaryKey.isPresent()) {
List<String> pkColumns = primaryKey.get().getColumnNames();
- Table table = dialect.queryTableSchema(jdbc, tableId).getTable();
for (String pkColumn : pkColumns) {
Column column = table.columnWithName(pkColumn);
if (isEvenlySplitColumn(column)) {
@@ -400,7 +446,6 @@ public abstract class AbstractJdbcSourceChunkSplitter
implements JdbcSourceChunk
List<ConstraintKey> uniqueKeys = dialect.getUniqueKeys(jdbc, tableId);
if (!uniqueKeys.isEmpty()) {
- Table table = dialect.queryTableSchema(jdbc, tableId).getTable();
for (ConstraintKey uniqueKey : uniqueKeys) {
List<ConstraintKey.ConstraintKeyColumn> uniqueKeyColumns =
uniqueKey.getColumnNames();
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfig.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfig.java
index 19d1124847..86aa894351 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfig.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfig.java
@@ -25,6 +25,7 @@ import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.relational.RelationalTableFilters;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
/**
@@ -41,6 +42,7 @@ public class MySqlSourceConfig extends JdbcSourceConfig {
List<String> databaseList,
List<String> tableList,
int splitSize,
+ Map<String, String> splitColumn,
double distributionFactorUpper,
double distributionFactorLower,
int sampleShardingThreshold,
@@ -64,6 +66,7 @@ public class MySqlSourceConfig extends JdbcSourceConfig {
databaseList,
tableList,
splitSize,
+ splitColumn,
distributionFactorUpper,
distributionFactorLower,
sampleShardingThreshold,
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java
index ba62d72823..ce0b8c802f 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java
@@ -118,6 +118,7 @@ public class MySqlSourceConfigFactory extends
JdbcSourceConfigFactory {
databaseList,
tableList,
splitSize,
+ splitColumn,
distributionFactorUpper,
distributionFactorLower,
sampleShardingThreshold,
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/config/OracleSourceConfig.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/config/OracleSourceConfig.java
index 32bcb41f78..263bb38d88 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/config/OracleSourceConfig.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/config/OracleSourceConfig.java
@@ -27,6 +27,7 @@ import io.debezium.relational.RelationalTableFilters;
import lombok.Getter;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
/**
@@ -49,6 +50,7 @@ public class OracleSourceConfig extends JdbcSourceConfig {
List<String> databaseList,
List<String> tableList,
int splitSize,
+ Map<String, String> splitColumn,
double distributionFactorUpper,
double distributionFactorLower,
int sampleShardingThreshold,
@@ -72,6 +74,7 @@ public class OracleSourceConfig extends JdbcSourceConfig {
databaseList,
tableList,
splitSize,
+ splitColumn,
distributionFactorUpper,
distributionFactorLower,
sampleShardingThreshold,
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/config/OracleSourceConfigFactory.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/config/OracleSourceConfigFactory.java
index e2aece6892..f8d6e8e6f7 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/config/OracleSourceConfigFactory.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/config/OracleSourceConfigFactory.java
@@ -150,6 +150,7 @@ public class OracleSourceConfigFactory extends
JdbcSourceConfigFactory {
databaseList,
tableList,
splitSize,
+ splitColumn,
distributionFactorUpper,
distributionFactorLower,
sampleShardingThreshold,
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/config/PostgresSourceConfig.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/config/PostgresSourceConfig.java
index 92ef734566..4f63a2d4fe 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/config/PostgresSourceConfig.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/config/PostgresSourceConfig.java
@@ -25,6 +25,7 @@ import
io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.relational.RelationalTableFilters;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
public class PostgresSourceConfig extends JdbcSourceConfig {
@@ -36,6 +37,7 @@ public class PostgresSourceConfig extends JdbcSourceConfig {
List<String> databaseList,
List<String> tableList,
int splitSize,
+ Map<String, String> splitColumn,
double distributionFactorUpper,
double distributionFactorLower,
int sampleShardingThreshold,
@@ -59,6 +61,7 @@ public class PostgresSourceConfig extends JdbcSourceConfig {
databaseList,
tableList,
splitSize,
+ splitColumn,
distributionFactorUpper,
distributionFactorLower,
sampleShardingThreshold,
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/config/PostgresSourceConfigFactory.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/config/PostgresSourceConfigFactory.java
index ebe1cd0a15..66354a751c 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/config/PostgresSourceConfigFactory.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/config/PostgresSourceConfigFactory.java
@@ -115,6 +115,7 @@ public class PostgresSourceConfigFactory extends
JdbcSourceConfigFactory {
databaseList,
tableList,
splitSize,
+ splitColumn,
distributionFactorUpper,
distributionFactorLower,
sampleShardingThreshold,
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/config/SqlServerSourceConfig.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/config/SqlServerSourceConfig.java
index 7d4062134a..47eaa3d5a0 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/config/SqlServerSourceConfig.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/config/SqlServerSourceConfig.java
@@ -25,6 +25,7 @@ import
io.debezium.connector.sqlserver.SqlServerConnectorConfig;
import io.debezium.relational.RelationalTableFilters;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
/**
@@ -41,6 +42,7 @@ public class SqlServerSourceConfig extends JdbcSourceConfig {
List<String> databaseList,
List<String> tableList,
int splitSize,
+ Map<String, String> splitColumn,
double distributionFactorUpper,
double distributionFactorLower,
int sampleShardingThreshold,
@@ -64,6 +66,7 @@ public class SqlServerSourceConfig extends JdbcSourceConfig {
databaseList,
tableList,
splitSize,
+ splitColumn,
distributionFactorUpper,
distributionFactorLower,
sampleShardingThreshold,
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/config/SqlServerSourceConfigFactory.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/config/SqlServerSourceConfigFactory.java
index b9224653f3..3b3301da49 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/config/SqlServerSourceConfigFactory.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/config/SqlServerSourceConfigFactory.java
@@ -83,6 +83,7 @@ public class SqlServerSourceConfigFactory extends JdbcSourceConfigFactory {
databaseList,
tableList,
splitSize,
+ splitColumn,
distributionFactorUpper,
distributionFactorLower,
sampleShardingThreshold,
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/mysql_cdc.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/mysql_cdc.sql
index 1103634162..25d7abca0d 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/mysql_cdc.sql
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/mysql_cdc.sql
@@ -66,7 +66,8 @@ CREATE TABLE mysql_cdc_e2e_source_table
`f_tinyint_unsigned` tinyint unsigned DEFAULT NULL,
`f_json` json DEFAULT NULL,
`f_year` year DEFAULT NULL,
- PRIMARY KEY (`id`)
+ PRIMARY KEY (`id`),
+ UNIQUE KEY uniq_key_f (`id`, `f_int`, `f_bigint`) USING BTREE
) ENGINE = InnoDB
AUTO_INCREMENT = 2
DEFAULT CHARSET = utf8mb4
@@ -116,7 +117,8 @@ CREATE TABLE mysql_cdc_e2e_source_table2
`f_tinyint_unsigned` tinyint unsigned DEFAULT NULL,
`f_json` json DEFAULT NULL,
`f_year` year DEFAULT NULL,
- PRIMARY KEY (`id`)
+ PRIMARY KEY (`id`),
+ UNIQUE KEY uniq_key_f (`id`, `f_int`, `f_bigint`) USING BTREE
) ENGINE = InnoDB
AUTO_INCREMENT = 2
DEFAULT CHARSET = utf8mb4
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_multi_table_mode_one_table.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_multi_table_mode_one_table.conf
index f2b513e5ba..dccb028812 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_multi_table_mode_one_table.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_multi_table_mode_one_table.conf
@@ -35,6 +35,13 @@ source {
base-url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
snapshot.split.size = 1
+ table-name-config = [
+ {
+ table = "mysql_cdc.mysql_cdc_e2e_source_table"
+ primaryKeys = []
+ snapshotSplitColumn = "f_int"
+ }
+ ]
snapshot.fetch.size = 1
}
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_multi_table_mode_two_table.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_multi_table_mode_two_table.conf
index 6c93ceda10..9c8ca1e796 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_multi_table_mode_two_table.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_multi_table_mode_two_table.conf
@@ -33,8 +33,19 @@ source {
password = "mysqlpw"
table-names = ["mysql_cdc.mysql_cdc_e2e_source_table", "mysql_cdc.mysql_cdc_e2e_source_table2"]
base-url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
-
snapshot.split.size = 1
+ table-name-config = [
+ {
+ table = "mysql_cdc.mysql_cdc_e2e_source_table"
+ primaryKeys = []
+ snapshotSplitColumn = "f_bigint"
+ },
+ {
+ table = "mysql_cdc.mysql_cdc_e2e_source_table2"
+ primaryKeys = []
+ snapshotSplitColumn = "f_bigint"
+ }
+ ]
snapshot.fetch.size = 1
}
}