This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new b6c6dc0438 [Feature][Connector-V2]Jdbc chunk split add  snapshotSplitColumn config #7794 (#7840)
b6c6dc0438 is described below

commit b6c6dc0438326edf7a329b12702e58fa520a9c3d
Author: GumKey <[email protected]>
AuthorDate: Tue Nov 19 20:27:20 2024 +0800

    [Feature][Connector-V2]Jdbc chunk split add  snapshotSplitColumn config #7794 (#7840)
    
    Co-authored-by: XenosK <[email protected]>
    Co-authored-by: Jia Fan <[email protected]>
---
 docs/en/connector-v2/source/MySQL-CDC.md           |  4 +-
 docs/en/connector-v2/source/Oracle-CDC.md          |  4 +-
 docs/en/connector-v2/source/PostgreSQL-CDC.md      |  2 +-
 docs/en/connector-v2/source/SqlServer-CDC.md       |  4 +-
 .../cdc/base/config/BaseSourceConfig.java          |  4 ++
 .../cdc/base/config/JdbcSourceConfig.java          |  3 ++
 .../cdc/base/config/JdbcSourceConfigFactory.java   | 19 ++++++++
 .../cdc/base/config/JdbcSourceTableConfig.java     |  1 +
 .../cdc/base/option/JdbcSourceOptions.java         |  3 +-
 .../connectors/cdc/base/option/SourceOptions.java  |  1 -
 .../splitter/AbstractJdbcSourceChunkSplitter.java  | 51 ++++++++++++++++++++--
 .../cdc/mysql/config/MySqlSourceConfig.java        |  3 ++
 .../cdc/mysql/config/MySqlSourceConfigFactory.java |  1 +
 .../cdc/oracle/config/OracleSourceConfig.java      |  3 ++
 .../oracle/config/OracleSourceConfigFactory.java   |  1 +
 .../cdc/postgres/config/PostgresSourceConfig.java  |  3 ++
 .../config/PostgresSourceConfigFactory.java        |  1 +
 .../sqlserver/config/SqlServerSourceConfig.java    |  3 ++
 .../config/SqlServerSourceConfigFactory.java       |  1 +
 .../src/test/resources/ddl/mysql_cdc.sql           |  6 ++-
 ...c_to_mysql_with_multi_table_mode_one_table.conf |  7 +++
 ...c_to_mysql_with_multi_table_mode_two_table.conf | 13 +++++-
 22 files changed, 123 insertions(+), 15 deletions(-)

diff --git a/docs/en/connector-v2/source/MySQL-CDC.md b/docs/en/connector-v2/source/MySQL-CDC.md
index fc2ea4d8ff..cc58ec4459 100644
--- a/docs/en/connector-v2/source/MySQL-CDC.md
+++ b/docs/en/connector-v2/source/MySQL-CDC.md
@@ -169,14 +169,14 @@ When an initial consistent snapshot is made for large databases, your establishe
 
 ## Source Options
 
-|                      Name                      |   Type   | Required | 
Default | Description                                                           
                                                                                
                                                                                
                                                                                
                                                                                
                    [...]
+| Name                                           | Type     | Required | 
Default | Description                                                           
                                                                                
                                                                                
                                                                                
                                                                                
                    [...]
 
|------------------------------------------------|----------|----------|---------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
 | base-url                                       | String   | Yes      | -     
  | The URL of the JDBC connection. Refer to a case: 
`jdbc:mysql://localhost:3306:3306/test`.                                        
                                                                                
                                                                                
                                                                                
                                         [...]
 | username                                       | String   | Yes      | -     
  | Name of the database to use when connecting to the database server.         
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 | password                                       | String   | Yes      | -     
  | Password to use when connecting to the database server.                     
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 | database-names                                 | List     | No       | -     
  | Database name of the database to monitor.                                   
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 | table-names                                    | List     | Yes      | -     
  | Table name of the database to monitor. The table name needs to include the 
database name, for example: `database_name.table_name`                          
                                                                                
                                                                                
                                                                                
               [...]
-| table-names-config                             | List     | No       | -     
  | Table config list. for example: [{"table": 
"db1.schema1.table1","primaryKeys":["key1"]}]                                   
                                                                                
                                                                                
                                                                                
                                               [...]
+| table-names-config                             | List     | No       | -     
  | Table config list. for example: [{"table": 
"db1.schema1.table1","primaryKeys": ["key1"],"snapshotSplitColumn": "key2"}]    
                                                                                
                                                                                
                                                                                
                                               [...]
 | startup.mode                                   | Enum     | No       | 
INITIAL | Optional startup mode for MySQL CDC consumer, valid enumerations are 
`initial`, `earliest`, `latest` and `specific`. <br/> `initial`: Synchronize 
historical data at startup, and then synchronize incremental data.<br/> 
`earliest`: Startup from the earliest offset possible.<br/> `latest`: Startup 
from the latest offset.<br/> `specific`: Startup from user-supplied specific 
offsets.                             [...]
 | startup.specific-offset.file                   | String   | No       | -     
  | Start from the specified binlog file name. **Note, This option is required 
when the `startup.mode` option used `specific`.**                               
                                                                                
                                                                                
                                                                                
               [...]
 | startup.specific-offset.pos                    | Long     | No       | -     
  | Start from the specified binlog file position. **Note, This option is 
required when the `startup.mode` option used `specific`.**                      
                                                                                
                                                                                
                                                                                
                    [...]
diff --git a/docs/en/connector-v2/source/Oracle-CDC.md b/docs/en/connector-v2/source/Oracle-CDC.md
index feef58a0d2..5cb9e33756 100644
--- a/docs/en/connector-v2/source/Oracle-CDC.md
+++ b/docs/en/connector-v2/source/Oracle-CDC.md
@@ -220,7 +220,7 @@ exit;
 
 ## Source Options
 
-|                      Name                      |   Type   | Required | 
Default |                                                                       
                                                                                
                                                                                
                                                              Description       
                                                                                
                    [...]
+|                      Name                      |   Type   | Required | 
Default | Description                                                           
                                                                                
                                                                                
                                                                                
                                                                                
                    [...]
 
|------------------------------------------------|----------|----------|---------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
 | base-url                                       | String   | Yes      | -     
  | The URL of the JDBC connection. Refer to a case: 
`idbc:oracle:thin:datasource01:1523:xe`.                                        
                                                                                
                                                                                
                                                                                
                                         [...]
 | username                                       | String   | Yes      | -     
  | Name of the database to use when connecting to the database server.         
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
@@ -228,7 +228,7 @@ exit;
 | database-names                                 | List     | No       | -     
  | Database name of the database to monitor.                                   
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 | schema-names                                   | List     | No       | -     
  | Schema name of the database to monitor.                                     
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 | table-names                                    | List     | Yes      | -     
  | Table name of the database to monitor. The table name needs to include the 
database name, for example: `database_name.table_name`                          
                                                                                
                                                                                
                                                                                
               [...]
-| table-names-config                             | List     | No       | -     
  | Table config list. for example: [{"table": 
"db1.schema1.table1","primaryKeys":["key1"]}]                                   
                                                                                
                                                                                
                                                                                
                                               [...]
+| table-names-config                             | List     | No       | -     
  | Table config list. for example: [{"table": 
"db1.schema1.table1","primaryKeys": ["key1"],"snapshotSplitColumn": "key2"}]    
                                                                                
                                                                                
                                                                                
                                               [...]
 | startup.mode                                   | Enum     | No       | 
INITIAL | Optional startup mode for Oracle CDC consumer, valid enumerations are 
`initial`, `earliest`, `latest` and `specific`. <br/> `initial`: Synchronize 
historical data at startup, and then synchronize incremental data.<br/> 
`earliest`: Startup from the earliest offset possible.<br/> `latest`: Startup 
from the latest offset.<br/> `specific`: Startup from user-supplied specific 
offsets.                            [...]
 | startup.specific-offset.file                   | String   | No       | -     
  | Start from the specified binlog file name. **Note, This option is required 
when the `startup.mode` option used `specific`.**                               
                                                                                
                                                                                
                                                                                
               [...]
 | startup.specific-offset.pos                    | Long     | No       | -     
  | Start from the specified binlog file position. **Note, This option is 
required when the `startup.mode` option used `specific`.**                      
                                                                                
                                                                                
                                                                                
                    [...]
diff --git a/docs/en/connector-v2/source/PostgreSQL-CDC.md b/docs/en/connector-v2/source/PostgreSQL-CDC.md
index 8197a72b99..d64db14ac5 100644
--- a/docs/en/connector-v2/source/PostgreSQL-CDC.md
+++ b/docs/en/connector-v2/source/PostgreSQL-CDC.md
@@ -93,7 +93,7 @@ ALTER TABLE your_table_name REPLICA IDENTITY FULL;
 | password                                       | String   | Yes      | -     
   | Password to use when connecting to the database server.                    
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 | database-names                                 | List     | No       | -     
   | Database name of the database to monitor.                                  
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 | table-names                                    | List     | Yes      | -     
   | Table name of the database to monitor. The table name needs to include the 
database name, for example: `database_name.table_name`                          
                                                                                
                                                                                
                                                                                
              [...]
-| table-names-config                             | List     | No       | -     
   | Table config list. for example: [{"table": 
"db1.schema1.table1","primaryKeys":["key1"]}]                                   
                                                                                
                                                                                
                                                                                
                                              [...]
+| table-names-config                             | List     | No       | -     
   | Table config list. for example: [{"table": 
"db1.schema1.table1","primaryKeys": ["key1"],"snapshotSplitColumn": "key2"}]    
                                                                                
                                                                                
                                                                                
                                              [...]
 | startup.mode                                   | Enum     | No       | 
INITIAL  | Optional startup mode for PostgreSQL CDC consumer, valid 
enumerations are `initial`, `earliest` and `latest`. <br/> `initial`: 
Synchronize historical data at startup, and then synchronize incremental 
data.<br/> `earliest`: Startup from the earliest offset possible.<br/> 
`latest`: Startup from the latest offset.                                       
                                                          [...]
 | snapshot.split.size                            | Integer  | No       | 8096  
   | The split size (number of rows) of table snapshot, captured tables are 
split into multiple splits when read the snapshot of table.                     
                                                                                
                                                                                
                                                                                
                  [...]
 | snapshot.fetch.size                            | Integer  | No       | 1024  
   | The maximum fetch size for per poll when read table snapshot.              
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
diff --git a/docs/en/connector-v2/source/SqlServer-CDC.md b/docs/en/connector-v2/source/SqlServer-CDC.md
index a64b3abfa8..11d686b927 100644
--- a/docs/en/connector-v2/source/SqlServer-CDC.md
+++ b/docs/en/connector-v2/source/SqlServer-CDC.md
@@ -63,13 +63,13 @@ describes how to setup the Sql Server CDC connector to run SQL queries against S
 
 ## Source Options
 
-|                      Name                      |   Type   | Required | 
Default |                                                                       
                                                                                
                                                                                
                                                              Description       
                                                                                
                    [...]
+|                      Name                      |   Type   | Required | 
Default | Description                                                           
                                                                                
                                                                                
                                                                                
                                                                                
                    [...]
 
|------------------------------------------------|----------|----------|---------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
 | username                                       | String   | Yes      | -     
  | Name of the database to use when connecting to the database server.         
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 | password                                       | String   | Yes      | -     
  | Password to use when connecting to the database server.                     
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 | database-names                                 | List     | Yes      | -     
  | Database name of the database to monitor.                                   
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 | table-names                                    | List     | Yes      | -     
  | Table name is a combination of schema name and table name 
(databaseName.schemaName.tableName).                                            
                                                                                
                                                                                
                                                                                
                                [...]
-| table-names-config                             | List     | No       | -     
  | Table config list. for example: [{"table": 
"db1.schema1.table1","primaryKeys":["key1"]}]                                   
                                                                                
                                                                                
                                                                                
                                               [...]
+| table-names-config                             | List     | No       | -     
  | Table config list. for example: [{"table": 
"db1.schema1.table1","primaryKeys": ["key1"],"snapshotSplitColumn": "key2"}]    
                                                                                
                                                                                
                                                                                
                                               [...]
 | base-url                                       | String   | Yes      | -     
  | URL has to be with database, like 
"jdbc:sqlserver://localhost:1433;databaseName=test".                            
                                                                                
                                                                                
                                                                                
                                                        [...]
 | startup.mode                                   | Enum     | No       | 
INITIAL | Optional startup mode for SqlServer CDC consumer, valid enumerations 
are "initial", "earliest", "latest" and "specific".                             
                                                                                
                                                                                
                                                                                
                     [...]
 | startup.timestamp                              | Long     | No       | -     
  | Start from the specified epoch timestamp (in milliseconds).<br/> **Note, 
This option is required when** the **"startup.mode" option used 
`'timestamp'`.**                                                                
                                                                                
                                                                                
                                 [...]
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/BaseSourceConfig.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/BaseSourceConfig.java
index f4a82d2de1..2b441483a2 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/BaseSourceConfig.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/BaseSourceConfig.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.connectors.cdc.base.source.IncrementalSource;
 import io.debezium.config.Configuration;
 import lombok.Getter;
 
+import java.util.Map;
 import java.util.Properties;
 
 /** A basic Source configuration which is used by {@link IncrementalSource}. */
@@ -34,6 +35,7 @@ public abstract class BaseSourceConfig implements SourceConfig {
     @Getter protected final StopConfig stopConfig;
 
     @Getter protected final int splitSize;
+    @Getter protected final Map<String, String> splitColumn;
 
     @Getter protected final double distributionFactorUpper;
     @Getter protected final double distributionFactorLower;
@@ -50,6 +52,7 @@ public abstract class BaseSourceConfig implements SourceConfig {
             StartupConfig startupConfig,
             StopConfig stopConfig,
             int splitSize,
+            Map<String, String> splitColumn,
             double distributionFactorUpper,
             double distributionFactorLower,
             int sampleShardingThreshold,
@@ -59,6 +62,7 @@ public abstract class BaseSourceConfig implements SourceConfig {
         this.startupConfig = startupConfig;
         this.stopConfig = stopConfig;
         this.splitSize = splitSize;
+        this.splitColumn = splitColumn;
         this.distributionFactorUpper = distributionFactorUpper;
         this.distributionFactorLower = distributionFactorLower;
         this.sampleShardingThreshold = sampleShardingThreshold;
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfig.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfig.java
index 9d46ab3393..ddd47a2b83 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfig.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfig.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.connectors.cdc.base.source.IncrementalSource;
 import io.debezium.relational.RelationalDatabaseConnectorConfig;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 /**
@@ -49,6 +50,7 @@ public abstract class JdbcSourceConfig extends BaseSourceConfig {
             List<String> databaseList,
             List<String> tableList,
             int splitSize,
+            Map<String, String> splitColumn,
             double distributionFactorUpper,
             double distributionFactorLower,
             int sampleShardingThreshold,
@@ -70,6 +72,7 @@ public abstract class JdbcSourceConfig extends BaseSourceConfig {
                 startupConfig,
                 stopConfig,
                 splitSize,
+                splitColumn,
                 distributionFactorUpper,
                 distributionFactorLower,
                 sampleShardingThreshold,
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java
index d5d920c257..99ddb3bd17 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java
@@ -25,7 +25,9 @@ import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions;
 import lombok.Setter;
 
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 /** A {@link SourceConfig.Factory} to provide {@link SourceConfig} of JDBC data source. */
@@ -51,6 +53,7 @@ public abstract class JdbcSourceConfigFactory implements SourceConfig.Factory<Jd
             JdbcSourceOptions.SAMPLE_SHARDING_THRESHOLD.defaultValue();
     protected int inverseSamplingRate = JdbcSourceOptions.INVERSE_SAMPLING_RATE.defaultValue();
     protected int splitSize = SourceOptions.SNAPSHOT_SPLIT_SIZE.defaultValue();
+    protected Map<String, String> splitColumn;
     protected int fetchSize = SourceOptions.SNAPSHOT_FETCH_SIZE.defaultValue();
     protected String serverTimeZone = JdbcSourceOptions.SERVER_TIME_ZONE.defaultValue();
     protected long connectTimeoutMillis = JdbcSourceOptions.CONNECT_TIMEOUT_MS.defaultValue();
@@ -65,6 +68,11 @@ public abstract class JdbcSourceConfigFactory implements SourceConfig.Factory<Jd
         return this;
     }
 
+    public JdbcSourceConfigFactory splitColumn(Map<String, String> 
splitColumn) {
+        this.splitColumn = splitColumn;
+        return this;
+    }
+
     /** Integer port number of the database server. */
     public JdbcSourceConfigFactory port(int port) {
         this.port = port;
@@ -239,6 +247,17 @@ public abstract class JdbcSourceConfigFactory implements 
SourceConfig.Factory<Jd
         this.sampleShardingThreshold = 
config.get(JdbcSourceOptions.SAMPLE_SHARDING_THRESHOLD);
         this.inverseSamplingRate = 
config.get(JdbcSourceOptions.INVERSE_SAMPLING_RATE);
         this.splitSize = config.get(SourceOptions.SNAPSHOT_SPLIT_SIZE);
+        this.splitColumn = new HashMap<>();
+        config.getOptional(JdbcSourceOptions.TABLE_NAMES_CONFIG)
+                .ifPresent(
+                        jtcs -> {
+                            jtcs.forEach(
+                                    jtc -> {
+                                        this.splitColumn.put(
+                                                jtc.getTable(), 
jtc.getSnapshotSplitColumn());
+                                    });
+                        });
+
         this.fetchSize = config.get(SourceOptions.SNAPSHOT_FETCH_SIZE);
         this.serverTimeZone = config.get(JdbcSourceOptions.SERVER_TIME_ZONE);
         this.connectTimeoutMillis = 
config.get(JdbcSourceOptions.CONNECT_TIMEOUT_MS);
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceTableConfig.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceTableConfig.java
index 5cafa363e8..2c55270bd4 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceTableConfig.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceTableConfig.java
@@ -26,4 +26,5 @@ import java.util.List;
 public class JdbcSourceTableConfig implements Serializable {
     private String table;
     private List<String> primaryKeys;
+    private String snapshotSplitColumn;
 }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/JdbcSourceOptions.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/JdbcSourceOptions.java
index 6cd7ba0631..909e5c2980 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/JdbcSourceOptions.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/JdbcSourceOptions.java
@@ -152,7 +152,8 @@ public class JdbcSourceOptions extends SourceOptions {
                                     + "["
                                     + "   {"
                                     + "       \"table\": 
\"db1.schema1.table1\","
-                                    + "       \"primaryKeys\": 
[\"key1\",\"key2\"]"
+                                    + "       \"primaryKeys\": 
[\"key1\",\"key2\"],"
+                                    + "       \"snapshotSplitColumn\": 
\"key2\""
                                     + "   }"
                                     + "]");
 }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/SourceOptions.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/SourceOptions.java
index 87483d9cff..6c83088ef2 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/SourceOptions.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/SourceOptions.java
@@ -36,7 +36,6 @@ public class SourceOptions {
                     .defaultValue(8096)
                     .withDescription(
                             "The split size (number of rows) of table 
snapshot, captured tables are split into multiple splits when read the snapshot 
of table.");
-
     public static final Option<Integer> SNAPSHOT_FETCH_SIZE =
             Options.key("snapshot.fetch.size")
                     .intType()
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java
index 60a208de86..05bf5e5d21 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java
@@ -25,6 +25,8 @@ import 
org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
 import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
 import org.apache.seatunnel.connectors.cdc.base.utils.ObjectUtils;
 
+import org.apache.commons.lang3.StringUtils;
+
 import io.debezium.jdbc.JdbcConnection;
 import io.debezium.relational.Column;
 import io.debezium.relational.Table;
@@ -36,9 +38,12 @@ import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static java.math.BigDecimal.ROUND_CEILING;
 import static 
org.apache.seatunnel.connectors.cdc.base.utils.ObjectUtils.doubleCompare;
@@ -379,12 +384,53 @@ public abstract class AbstractJdbcSourceChunkSplitter 
implements JdbcSourceChunk
     protected Column getSplitColumn(
             JdbcConnection jdbc, JdbcDataSourceDialect dialect, TableId 
tableId)
             throws SQLException {
-        Optional<PrimaryKey> primaryKey = dialect.getPrimaryKey(jdbc, tableId);
         Column splitColumn = null;
+        Table table = dialect.queryTableSchema(jdbc, tableId).getTable();
+
+        // First, check whether the user-defined split column is in the primary key or 
a unique key
+        Map<String, String> splitColumnsConfig = new HashMap<>();
+        try {
+            splitColumnsConfig = sourceConfig.getSplitColumn();
+        } catch (Exception e) {
+            log.error("Config snapshotSplitColumn get exception in {}:{}", 
tableId, e);
+        }
+        String tableSc =
+                splitColumnsConfig.getOrDefault(tableId.catalog() + "." + 
tableId.table(), null);
+
+        if (StringUtils.isNotEmpty(tableSc)) {
+            // Check whether tableSc (the table's split column) is a column of a unique key
+            AtomicBoolean isUniqueKey = new AtomicBoolean(false);
+            dialect.getUniqueKeys(jdbc, tableId)
+                    .forEach(
+                            ck ->
+                                    ck.getColumnNames()
+                                            .forEach(
+                                                    ckc -> {
+                                                        if 
(tableSc.equals(ckc.getColumnName())) {
+                                                            
isUniqueKey.set(true);
+                                                        }
+                                                    }));
+
+            if (isUniqueKey.get()) {
+                Column column = table.columnWithName(tableSc);
+                if (isEvenlySplitColumn(column)) {
+                    return column;
+                } else {
+                    log.warn(
+                            "Config snapshotSplitColumn type in {} is not 
TINYINT、SMALLINT、INT、BIGINT、DECIMAL、STRING",
+                            tableId);
+                }
+            } else {
+                log.warn("Config snapshotSplitColumn not unique key for table 
{}", tableId);
+            }
+        } else {
+            log.info("Config snapshotSplitColumn not exists for table {}", 
tableId);
+        }
+
+        Optional<PrimaryKey> primaryKey = dialect.getPrimaryKey(jdbc, tableId);
         if (primaryKey.isPresent()) {
             List<String> pkColumns = primaryKey.get().getColumnNames();
 
-            Table table = dialect.queryTableSchema(jdbc, tableId).getTable();
             for (String pkColumn : pkColumns) {
                 Column column = table.columnWithName(pkColumn);
                 if (isEvenlySplitColumn(column)) {
@@ -400,7 +446,6 @@ public abstract class AbstractJdbcSourceChunkSplitter 
implements JdbcSourceChunk
 
         List<ConstraintKey> uniqueKeys = dialect.getUniqueKeys(jdbc, tableId);
         if (!uniqueKeys.isEmpty()) {
-            Table table = dialect.queryTableSchema(jdbc, tableId).getTable();
             for (ConstraintKey uniqueKey : uniqueKeys) {
                 List<ConstraintKey.ConstraintKeyColumn> uniqueKeyColumns =
                         uniqueKey.getColumnNames();
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfig.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfig.java
index 19d1124847..86aa894351 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfig.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfig.java
@@ -25,6 +25,7 @@ import io.debezium.connector.mysql.MySqlConnectorConfig;
 import io.debezium.relational.RelationalTableFilters;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 /**
@@ -41,6 +42,7 @@ public class MySqlSourceConfig extends JdbcSourceConfig {
             List<String> databaseList,
             List<String> tableList,
             int splitSize,
+            Map<String, String> splitColumn,
             double distributionFactorUpper,
             double distributionFactorLower,
             int sampleShardingThreshold,
@@ -64,6 +66,7 @@ public class MySqlSourceConfig extends JdbcSourceConfig {
                 databaseList,
                 tableList,
                 splitSize,
+                splitColumn,
                 distributionFactorUpper,
                 distributionFactorLower,
                 sampleShardingThreshold,
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java
index ba62d72823..ce0b8c802f 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java
@@ -118,6 +118,7 @@ public class MySqlSourceConfigFactory extends 
JdbcSourceConfigFactory {
                 databaseList,
                 tableList,
                 splitSize,
+                splitColumn,
                 distributionFactorUpper,
                 distributionFactorLower,
                 sampleShardingThreshold,
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/config/OracleSourceConfig.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/config/OracleSourceConfig.java
index 32bcb41f78..263bb38d88 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/config/OracleSourceConfig.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/config/OracleSourceConfig.java
@@ -27,6 +27,7 @@ import io.debezium.relational.RelationalTableFilters;
 import lombok.Getter;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 /**
@@ -49,6 +50,7 @@ public class OracleSourceConfig extends JdbcSourceConfig {
             List<String> databaseList,
             List<String> tableList,
             int splitSize,
+            Map<String, String> splitColumn,
             double distributionFactorUpper,
             double distributionFactorLower,
             int sampleShardingThreshold,
@@ -72,6 +74,7 @@ public class OracleSourceConfig extends JdbcSourceConfig {
                 databaseList,
                 tableList,
                 splitSize,
+                splitColumn,
                 distributionFactorUpper,
                 distributionFactorLower,
                 sampleShardingThreshold,
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/config/OracleSourceConfigFactory.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/config/OracleSourceConfigFactory.java
index e2aece6892..f8d6e8e6f7 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/config/OracleSourceConfigFactory.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/config/OracleSourceConfigFactory.java
@@ -150,6 +150,7 @@ public class OracleSourceConfigFactory extends 
JdbcSourceConfigFactory {
                 databaseList,
                 tableList,
                 splitSize,
+                splitColumn,
                 distributionFactorUpper,
                 distributionFactorLower,
                 sampleShardingThreshold,
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/config/PostgresSourceConfig.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/config/PostgresSourceConfig.java
index 92ef734566..4f63a2d4fe 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/config/PostgresSourceConfig.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/config/PostgresSourceConfig.java
@@ -25,6 +25,7 @@ import 
io.debezium.connector.postgresql.PostgresConnectorConfig;
 import io.debezium.relational.RelationalTableFilters;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 public class PostgresSourceConfig extends JdbcSourceConfig {
@@ -36,6 +37,7 @@ public class PostgresSourceConfig extends JdbcSourceConfig {
             List<String> databaseList,
             List<String> tableList,
             int splitSize,
+            Map<String, String> splitColumn,
             double distributionFactorUpper,
             double distributionFactorLower,
             int sampleShardingThreshold,
@@ -59,6 +61,7 @@ public class PostgresSourceConfig extends JdbcSourceConfig {
                 databaseList,
                 tableList,
                 splitSize,
+                splitColumn,
                 distributionFactorUpper,
                 distributionFactorLower,
                 sampleShardingThreshold,
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/config/PostgresSourceConfigFactory.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/config/PostgresSourceConfigFactory.java
index ebe1cd0a15..66354a751c 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/config/PostgresSourceConfigFactory.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/config/PostgresSourceConfigFactory.java
@@ -115,6 +115,7 @@ public class PostgresSourceConfigFactory extends 
JdbcSourceConfigFactory {
                 databaseList,
                 tableList,
                 splitSize,
+                splitColumn,
                 distributionFactorUpper,
                 distributionFactorLower,
                 sampleShardingThreshold,
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/config/SqlServerSourceConfig.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/config/SqlServerSourceConfig.java
index 7d4062134a..47eaa3d5a0 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/config/SqlServerSourceConfig.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/config/SqlServerSourceConfig.java
@@ -25,6 +25,7 @@ import 
io.debezium.connector.sqlserver.SqlServerConnectorConfig;
 import io.debezium.relational.RelationalTableFilters;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 /**
@@ -41,6 +42,7 @@ public class SqlServerSourceConfig extends JdbcSourceConfig {
             List<String> databaseList,
             List<String> tableList,
             int splitSize,
+            Map<String, String> splitColumn,
             double distributionFactorUpper,
             double distributionFactorLower,
             int sampleShardingThreshold,
@@ -64,6 +66,7 @@ public class SqlServerSourceConfig extends JdbcSourceConfig {
                 databaseList,
                 tableList,
                 splitSize,
+                splitColumn,
                 distributionFactorUpper,
                 distributionFactorLower,
                 sampleShardingThreshold,
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/config/SqlServerSourceConfigFactory.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/config/SqlServerSourceConfigFactory.java
index b9224653f3..3b3301da49 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/config/SqlServerSourceConfigFactory.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/config/SqlServerSourceConfigFactory.java
@@ -83,6 +83,7 @@ public class SqlServerSourceConfigFactory extends 
JdbcSourceConfigFactory {
                 databaseList,
                 tableList,
                 splitSize,
+                splitColumn,
                 distributionFactorUpper,
                 distributionFactorLower,
                 sampleShardingThreshold,
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/mysql_cdc.sql
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/mysql_cdc.sql
index 1103634162..25d7abca0d 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/mysql_cdc.sql
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/mysql_cdc.sql
@@ -66,7 +66,8 @@ CREATE TABLE mysql_cdc_e2e_source_table
     `f_tinyint_unsigned`   tinyint unsigned               DEFAULT NULL,
     `f_json`               json                           DEFAULT NULL,
     `f_year`               year                           DEFAULT NULL,
-    PRIMARY KEY (`id`)
+    PRIMARY KEY (`id`),
+    UNIQUE KEY uniq_key_f (`id`, `f_int`, `f_bigint`) USING BTREE
 ) ENGINE = InnoDB
   AUTO_INCREMENT = 2
   DEFAULT CHARSET = utf8mb4
@@ -116,7 +117,8 @@ CREATE TABLE mysql_cdc_e2e_source_table2
     `f_tinyint_unsigned`   tinyint unsigned               DEFAULT NULL,
     `f_json`               json                           DEFAULT NULL,
     `f_year`               year                           DEFAULT NULL,
-    PRIMARY KEY (`id`)
+    PRIMARY KEY (`id`),
+    UNIQUE KEY uniq_key_f (`id`, `f_int`, `f_bigint`) USING BTREE
 ) ENGINE = InnoDB
   AUTO_INCREMENT = 2
   DEFAULT CHARSET = utf8mb4
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_multi_table_mode_one_table.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_multi_table_mode_one_table.conf
index f2b513e5ba..dccb028812 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_multi_table_mode_one_table.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_multi_table_mode_one_table.conf
@@ -35,6 +35,13 @@ source {
     base-url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
 
     snapshot.split.size = 1
+    table-name-config = [
+        {
+            table = "mysql_cdc.mysql_cdc_e2e_source_table"
+            primaryKeys = []
+            snapshotSplitColumn = "f_int"
+        }
+    ]
     snapshot.fetch.size = 1
   }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_multi_table_mode_two_table.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_multi_table_mode_two_table.conf
index 6c93ceda10..9c8ca1e796 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_multi_table_mode_two_table.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_multi_table_mode_two_table.conf
@@ -33,8 +33,19 @@ source {
     password = "mysqlpw"
     table-names = ["mysql_cdc.mysql_cdc_e2e_source_table", 
"mysql_cdc.mysql_cdc_e2e_source_table2"]
     base-url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
-
     snapshot.split.size = 1
+    table-name-config = [
+        {
+            table = "mysql_cdc.mysql_cdc_e2e_source_table"
+            primaryKeys = []
+            snapshotSplitColumn = "f_bigint"
+        },
+        {
+            table = "mysql_cdc.mysql_cdc_e2e_source_table2"
+            primaryKeys = []
+            snapshotSplitColumn = "f_bigint"
+        }
+    ]
     snapshot.fetch.size = 1
   }
 }

Reply via email to