This is an automated email from the ASF dual-hosted git repository.

ic4y pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new b9a62e5c3f Add date type and float type column split support (#6160)
b9a62e5c3f is described below

commit b9a62e5c3f9e47f2718dcd64a459cc7cfe701eb5
Author: Eric <[email protected]>
AuthorDate: Thu Jan 18 11:04:17 2024 +0800

    Add date type and float type column split support (#6160)
---
 docs/en/connector-v2/source/Jdbc.md                | 151 ++++--
 docs/en/connector-v2/source/Mysql.md               | 143 ++++--
 docs/en/connector-v2/source/Oracle.md              | 160 +++++-
 docs/en/connector-v2/source/PostgreSQL.md          | 159 +++++-
 docs/en/connector-v2/source/SqlServer.md           |  99 +++-
 .../jdbc/internal/dialect/JdbcDialect.java         |   6 +
 .../seatunnel/jdbc/internal/dialect/SQLUtils.java  |   5 +
 .../jdbc/internal/dialect/mysql/MysqlDialect.java  |   6 +
 .../internal/dialect/oracle/OracleDialect.java     |   5 +
 .../internal/dialect/psql/PostgresDialect.java     |   4 +
 .../dialect/sqlserver/SqlServerDialect.java        |   5 +
 .../seatunnel/jdbc/source/ChunkSplitter.java       |  19 +-
 .../jdbc/source/DynamicChunkSplitter.java          | 190 +++++--
 .../seatunnel/jdbc/utils/ObjectUtils.java          |  10 +
 .../seatunnel/jdbc/JdbcMysqlSplitIT.java           | 548 +++++++++++++++++++++
 .../engine/server/master/JobMetricsTest.java       |   1 -
 16 files changed, 1329 insertions(+), 182 deletions(-)

diff --git a/docs/en/connector-v2/source/Jdbc.md 
b/docs/en/connector-v2/source/Jdbc.md
index e4ae04d043..38ed1f6512 100644
--- a/docs/en/connector-v2/source/Jdbc.md
+++ b/docs/en/connector-v2/source/Jdbc.md
@@ -82,22 +82,6 @@ The compatible mode of database, required when the database 
supports multiple co
 
 The time in seconds to wait for the database operation used to validate the 
connection to complete.
 
-### partition_column [string]
-
-The column name for parallelism's partition, only support numeric type.
-
-### partition_upper_bound [BigDecimal]
-
-The partition_column max value for scan, if not set SeaTunnel will query 
database get max value.
-
-### partition_lower_bound [BigDecimal]
-
-The partition_column min value for scan, if not set SeaTunnel will query 
database get min value.
-
-### partition_num [int]
-
-The number of partition count, only support positive integer. default value is 
job parallelism
-
 ### fetch_size [int]
 
 For queries that return a large number of objects, you can configure the row 
fetch size used in the query to
@@ -139,34 +123,73 @@ table_list = [
 
 Common row filter conditions for all tables/queries, must start with `where`. 
for example `where id > 100`
 
-### split.size
+### common options
+
+Source plugin common parameters, please refer to [Source Common 
Options](common-options.md) for details.
+
+## Parallel Reader
+
+The JDBC Source connector supports parallel reading of data from tables. SeaTunnel splits the data in the table into multiple splits according to certain rules and hands the splits over to readers. The number of readers is determined by the `parallelism` option.
+
+**Split Key Rules:**
+
+1. If `partition_column` is not null, it will be used to calculate the splits. The column must be of a **supported split data type**.
+2. If `partition_column` is null, SeaTunnel will read the schema from the table and get the Primary Key and Unique Index. If the Primary Key and Unique Index contain more than one column, the first column of a **supported split data type** will be used to split the data. For example, if a table has Primary Key `(guid, name)`, and `guid` is not of a **supported split data type**, the column `name` will be used to split the data.
+
+**Supported split data type:**
+* String
+* Number(int, bigint, decimal, ...)
+* Date
+
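As a minimal sketch of the rules above (the table `testdb.orders` and its `DATE` column `order_date` are hypothetical), a date-typed column can now be configured as the split key, since this change adds date type split support:

```
source {
    Jdbc {
        url = "jdbc:mysql://localhost/test?serverTimezone=GMT%2b8"
        driver = "com.mysql.cj.jdbc.Driver"
        user = "root"
        password = "123456"
        query = "select * from testdb.orders"
        # With this change, a DATE column can serve as the split key
        partition_column = "order_date"
        split.size = 10000
    }
}
```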
+### Options Related To Split
+
+#### split.size
+
+The number of rows in one split. Captured tables are split into multiple splits when the table is read.
 
-The split size (number of rows) of table, captured tables are split into 
multiple splits when read of table.
+#### split.even-distribution.factor.lower-bound
 
-### split.even-distribution.factor.lower-bound
+> Not recommended for use
 
 The lower bound of the chunk key distribution factor. This factor is used to 
determine whether the table data is evenly distributed. If the distribution 
factor is calculated to be greater than or equal to this lower bound (i.e., 
(MAX(id) - MIN(id) + 1) / row count), the table chunks would be optimized for 
even distribution. Otherwise, if the distribution factor is less, the table 
will be considered as unevenly distributed and the sampling-based sharding 
strategy will be used if the estim [...]
 
-### split.even-distribution.factor.upper-bound
+#### split.even-distribution.factor.upper-bound
+
+> Not recommended for use
 
 The upper bound of the chunk key distribution factor. This factor is used to 
determine whether the table data is evenly distributed. If the distribution 
factor is calculated to be less than or equal to this upper bound (i.e., 
(MAX(id) - MIN(id) + 1) / row count), the table chunks would be optimized for 
even distribution. Otherwise, if the distribution factor is greater, the table 
will be considered as unevenly distributed and the sampling-based sharding 
strategy will be used if the estim [...]
 
-### split.sample-sharding.threshold
+#### split.sample-sharding.threshold
 
 This configuration specifies the threshold of estimated shard count to trigger 
the sample sharding strategy. When the distribution factor is outside the 
bounds specified by `chunk-key.even-distribution.factor.upper-bound` and 
`chunk-key.even-distribution.factor.lower-bound`, and the estimated shard count 
(calculated as approximate row count / chunk size) exceeds this threshold, the 
sample sharding strategy will be used. This can help to handle large datasets 
more efficiently. The default [...]
 
-### split.inverse-sampling.rate
+#### split.inverse-sampling.rate
 
 The inverse of the sampling rate used in the sample sharding strategy. For 
example, if this value is set to 1000, it means a 1/1000 sampling rate is 
applied during the sampling process. This option provides flexibility in 
controlling the granularity of the sampling, thus affecting the final number of 
shards. It's especially useful when dealing with very large datasets where a 
lower sampling rate is preferred. The default value is 1000.
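The interplay of the factor bounds and the sampling threshold described above can be sketched as follows. This is a simplified illustration with hypothetical numbers, not the actual `DynamicChunkSplitter` logic; the name of the fallback branch is illustrative only.

```python
def choose_strategy(min_id, max_id, row_count, split_size,
                    factor_lower=0.05, factor_upper=100,
                    sample_sharding_threshold=1000):
    # Distribution factor as described above: (MAX(id) - MIN(id) + 1) / row count
    factor = (max_id - min_id + 1) / row_count
    if factor_lower <= factor <= factor_upper:
        return "even-distribution"
    # Estimated shard count: approximate row count / chunk size
    if row_count / split_size >= sample_sharding_threshold:
        return "sample-sharding"
    return "range-split"

# Dense ids: factor = (10_000 - 1 + 1) / 10_000 = 1.0, within [0.05, 100]
print(choose_strategy(1, 10_000, 10_000, 8096))            # even-distribution
# Sparse ids, many rows: factor = 1000 > 100; 1_000_000 / 100 >= 1000
print(choose_strategy(1, 1_000_000_000, 1_000_000, 100))   # sample-sharding
```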
 
-### common options
+#### partition_column [string]
 
-Source plugin common parameters, please refer to [Source Common 
Options](common-options.md) for details.
+The column name used to split the data.
+
+#### partition_upper_bound [BigDecimal]
+
+The maximum value of partition_column for the scan. If not set, SeaTunnel will query the database to get the maximum value.
+
+#### partition_lower_bound [BigDecimal]
+
+The minimum value of partition_column for the scan. If not set, SeaTunnel will query the database to get the minimum value.
+
+#### partition_num [int]
+
+> Not recommended for use. The correct approach is to control the number of splits through `split.size`.
+
+The number of splits to create. Only positive integers are supported. The default value is the job parallelism.
 
 ## tips
 
-If partition_column is not set, it will run in single concurrency, and if 
partition_column is set, it will be executed
-in parallel according to the concurrency of tasks.
+> If the table cannot be split (for example, the table has no Primary Key or Unique Index, and `partition_column` is not set), it will run in single concurrency.
+>
+> Use `table_path` to replace `query` for single table reading. If you need to 
read multiple tables, use `table_list`.
 
 ## appendix
 
@@ -197,7 +220,7 @@ there are some reference value for params above.
 
 ## Example
 
-simple:
+### simple
 
 ```
 Jdbc {
@@ -210,7 +233,7 @@ Jdbc {
 }
 ```
 
-parallel:
+### parallel by partition_column
 
 ```
 env {
@@ -226,7 +249,7 @@ source {
         password = "123456"
         query = "select * from type_bin"
         partition_column = "id"
-        partition_num = 10
+        split.size = 10000
         # Read start boundary
         #partition_lower_bound = ...
         # Read end boundary
@@ -239,29 +262,61 @@ sink {
 }
 ```
 
-Using `table_path` read:
+### Parallel Boundary
 
-***Configuring `table_path` will turn on auto split, you can configure 
`split.*` to adjust the split strategy***
+> It is more efficient to read the data within the upper and lower bounds you configure than to scan the whole table.
 
-```hocon
-Jdbc {
-    url = "jdbc:mysql://localhost/test?serverTimezone=GMT%2b8"
-    driver = "com.mysql.cj.jdbc.Driver"
-    connection_check_timeout_sec = 100
-    user = "root"
-    password = "123456"
-  
-    # e.g. table_path = "testdb.table1"、table_path = 
"test_schema.table1"、table_path = "testdb.test_schema.table1"
-    table_path = "testdb.table1"
-    #split.size = 8096
-    #split.even-distribution.factor.upper-bound = 100
-    #split.even-distribution.factor.lower-bound = 0.05
-    #split.sample-sharding.threshold = 1000
-    #split.inverse-sampling.rate = 1000
+```
+source {
+    Jdbc {
+        url = 
"jdbc:mysql://localhost:3306/test?serverTimezone=GMT%2b8&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
+        driver = "com.mysql.cj.jdbc.Driver"
+        connection_check_timeout_sec = 100
+        user = "root"
+        password = "123456"
+        # Define query logic as required
+        query = "select * from type_bin"
+        partition_column = "id"
+        # Read start boundary
+        partition_lower_bound = 1
+        # Read end boundary
+        partition_upper_bound = 500
+        partition_num = 10
+        properties {
+         useSSL=false
+        }
+    }
+}
+```
+
+### parallel by Primary Key or Unique Index
+
+> Configuring `table_path` will turn on auto split; you can configure `split.*` to adjust the split strategy.
+
+```
+env {
+  parallelism = 10
+  job.mode = "BATCH"
+}
+source {
+    Jdbc {
+        url = "jdbc:mysql://localhost/test?serverTimezone=GMT%2b8"
+        driver = "com.mysql.cj.jdbc.Driver"
+        connection_check_timeout_sec = 100
+        user = "root"
+        password = "123456"
+        table_path = "testdb.table1"
+        query = "select * from testdb.table1"
+        split.size = 10000
+    }
+}
+
+sink {
+  Console {}
 }
 ```
 
-multiple table read:
+### multiple table read
 
 ***Configuring `table_list` will turn on auto split, you can configure 
`split.*` to adjust the split strategy***
 
@@ -285,7 +340,7 @@ Jdbc {
         }
     ]
     #where_condition= "where id > 100"
-    #split.size = 8096
+    #split.size = 10000
     #split.even-distribution.factor.upper-bound = 100
     #split.even-distribution.factor.lower-bound = 0.05
     #split.sample-sharding.threshold = 1000
diff --git a/docs/en/connector-v2/source/Mysql.md 
b/docs/en/connector-v2/source/Mysql.md
index b1d7938725..e1fe6a3e8e 100644
--- a/docs/en/connector-v2/source/Mysql.md
+++ b/docs/en/connector-v2/source/Mysql.md
@@ -85,9 +85,67 @@ Read external data source data through JDBC.
 | split.inverse-sampling.rate                | Int        | No       | 1000    
        | The inverse of the sampling rate used in the sample sharding 
strategy. For example, if this value is set to 1000, it means a 1/1000 sampling 
rate is applied during the sampling process. This option provides flexibility 
in controlling the granularity of the sampling, thus affecting the final number 
of shards. It's especially useful when dealing with very large datasets where a 
lower sampling rate is p [...]
 | common-options                             |            | No       | -       
        | Source plugin common parameters, please refer to [Source Common 
Options](common-options.md) for details                                         
                                                                                
                                                                                
                                                                                
                    [...]
 
-### Tips
+## Parallel Reader
 
-> If partition_column is not set, it will run in single concurrency, and if 
partition_column is set, it will be executed  in parallel according to the 
concurrency of tasks , When your shard read field is a large number type such 
as bigint(30) and above and the data is not evenly distributed, it is 
recommended to set the parallelism level to 1 to ensure that the data skew 
problem is resolved
+The JDBC Source connector supports parallel reading of data from tables. SeaTunnel splits the data in the table into multiple splits according to certain rules and hands the splits over to readers. The number of readers is determined by the `parallelism` option.
+
+**Split Key Rules:**
+
+1. If `partition_column` is not null, it will be used to calculate the splits. The column must be of a **supported split data type**.
+2. If `partition_column` is null, SeaTunnel will read the schema from the table and get the Primary Key and Unique Index. If the Primary Key and Unique Index contain more than one column, the first column of a **supported split data type** will be used to split the data. For example, if a table has Primary Key `(guid, name)`, and `guid` is not of a **supported split data type**, the column `name` will be used to split the data.
+
+**Supported split data type:**
+* String
+* Number(int, bigint, decimal, ...)
+* Date
+
+### Options Related To Split
+
+#### split.size
+
+The number of rows in one split. Captured tables are split into multiple splits when the table is read.
+
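For a rough sense of how `split.size` maps to the split count (the row count below is hypothetical, and the actual splitter may produce uneven chunks):

```python
import math

# Approximate number of splits for a hypothetical table of 1,000,000 rows
row_count = 1_000_000
split_size = 10_000          # the split.size option
print(math.ceil(row_count / split_size))  # 100
```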
+#### split.even-distribution.factor.lower-bound
+
+> Not recommended for use
+
+The lower bound of the chunk key distribution factor. This factor is used to 
determine whether the table data is evenly distributed. If the distribution 
factor is calculated to be greater than or equal to this lower bound (i.e., 
(MAX(id) - MIN(id) + 1) / row count), the table chunks would be optimized for 
even distribution. Otherwise, if the distribution factor is less, the table 
will be considered as unevenly distributed and the sampling-based sharding 
strategy will be used if the estim [...]
+
+#### split.even-distribution.factor.upper-bound
+
+> Not recommended for use
+
+The upper bound of the chunk key distribution factor. This factor is used to 
determine whether the table data is evenly distributed. If the distribution 
factor is calculated to be less than or equal to this upper bound (i.e., 
(MAX(id) - MIN(id) + 1) / row count), the table chunks would be optimized for 
even distribution. Otherwise, if the distribution factor is greater, the table 
will be considered as unevenly distributed and the sampling-based sharding 
strategy will be used if the estim [...]
+
+#### split.sample-sharding.threshold
+
+This configuration specifies the threshold of estimated shard count to trigger 
the sample sharding strategy. When the distribution factor is outside the 
bounds specified by `chunk-key.even-distribution.factor.upper-bound` and 
`chunk-key.even-distribution.factor.lower-bound`, and the estimated shard count 
(calculated as approximate row count / chunk size) exceeds this threshold, the 
sample sharding strategy will be used. This can help to handle large datasets 
more efficiently. The default [...]
+
+#### split.inverse-sampling.rate
+
+The inverse of the sampling rate used in the sample sharding strategy. For 
example, if this value is set to 1000, it means a 1/1000 sampling rate is 
applied during the sampling process. This option provides flexibility in 
controlling the granularity of the sampling, thus affecting the final number of 
shards. It's especially useful when dealing with very large datasets where a 
lower sampling rate is preferred. The default value is 1000.
+
+#### partition_column [string]
+
+The column name used to split the data.
+
+#### partition_upper_bound [BigDecimal]
+
+The maximum value of partition_column for the scan. If not set, SeaTunnel will query the database to get the maximum value.
+
+#### partition_lower_bound [BigDecimal]
+
+The minimum value of partition_column for the scan. If not set, SeaTunnel will query the database to get the minimum value.
+
+#### partition_num [int]
+
+> Not recommended for use. The correct approach is to control the number of splits through `split.size`.
+
+The number of splits to create. Only positive integers are supported. The default value is the job parallelism.
+
+## tips
+
+> If the table cannot be split (for example, the table has no Primary Key or Unique Index, and `partition_column` is not set), it will run in single concurrency.
 >
 > Use `table_path` to replace `query` for single table reading. If you need to 
 > read multiple tables, use `table_list`.
 
@@ -100,7 +158,7 @@ Read external data source data through JDBC.
 ```
 # Defining the runtime environment
 env {
-  parallelism = 2
+  parallelism = 4
   job.mode = "BATCH"
 }
 source{
@@ -124,33 +182,57 @@ sink {
 }
 ```
 
-### Parallel:
-
-> Read your query table in parallel with the shard field you configured and 
the shard data  You can do this if you want to read the whole table
+### parallel by partition_column
 
 ```
 env {
-  parallelism = 10
+  parallelism = 4
   job.mode = "BATCH"
 }
 source {
     Jdbc {
-        url = 
"jdbc:mysql://localhost:3306/test?serverTimezone=GMT%2b8&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
+        url = "jdbc:mysql://localhost/test?serverTimezone=GMT%2b8"
         driver = "com.mysql.cj.jdbc.Driver"
         connection_check_timeout_sec = 100
         user = "root"
         password = "123456"
-        # Define query logic as required
         query = "select * from type_bin"
-        # Parallel sharding reads fields
         partition_column = "id"
-        # Number of fragments
-        partition_num = 10
-        properties {
-         useSSL=false
-        }
+        split.size = 10000
+        # Read start boundary
+        #partition_lower_bound = ...
+        # Read end boundary
+        #partition_upper_bound = ...
     }
 }
+
+sink {
+  Console {}
+}
+```
+
+### parallel by Primary Key or Unique Index
+
+> Configuring `table_path` will turn on auto split; you can configure `split.*` to adjust the split strategy.
+
+```
+env {
+  parallelism = 4
+  job.mode = "BATCH"
+}
+source {
+    Jdbc {
+        url = "jdbc:mysql://localhost/test?serverTimezone=GMT%2b8"
+        driver = "com.mysql.cj.jdbc.Driver"
+        connection_check_timeout_sec = 100
+        user = "root"
+        password = "123456"
+        table_path = "testdb.table1"
+        query = "select * from testdb.table1"
+        split.size = 10000
+    }
+}
+
 sink {
   Console {}
 }
@@ -183,36 +265,6 @@ source {
 }
 ```
 
-### Using `table_path` read:
-
-***Configuring `table_path` will turn on auto split, you can configure 
`split.*` to adjust the split strategy***
-
-```hocon
-env {
-  job.mode = "BATCH"
-}
-source {
-  Jdbc {
-    url = "jdbc:mysql://localhost/test?serverTimezone=GMT%2b8"
-    driver = "com.mysql.cj.jdbc.Driver"
-    connection_check_timeout_sec = 100
-    user = "root"
-    password = "123456"
-
-    table_path = "testdb.table1"
-    #split.size = 8096
-    #split.even-distribution.factor.upper-bound = 100
-    #split.even-distribution.factor.lower-bound = 0.05
-    #split.sample-sharding.threshold = 1000
-    #split.inverse-sampling.rate = 1000
-  }
-}
-
-sink {
-  Console {}
-}
-```
-
 ### Multiple table read:
 
 ***Configuring `table_list` will turn on auto split, you can configure 
`split.*` to adjust the split strategy***
@@ -220,6 +272,7 @@ sink {
 ```hocon
 env {
   job.mode = "BATCH"
+  parallelism = 4
 }
 source {
   Jdbc {
diff --git a/docs/en/connector-v2/source/Oracle.md 
b/docs/en/connector-v2/source/Oracle.md
index ec1eddc5cd..46d1761967 100644
--- a/docs/en/connector-v2/source/Oracle.md
+++ b/docs/en/connector-v2/source/Oracle.md
@@ -68,11 +68,94 @@ Read external data source data through JDBC.
 | partition_num                | Int        | No       | job parallelism | The 
number of partition count, only support positive integer. default value is job 
parallelism                                                                     
                                                                                
               |
 | fetch_size                   | Int        | No       | 0               | For 
queries that return a large number of objects,you can configure<br/> the row 
fetch size used in the query toimprove performance by<br/> reducing the number 
database hits required to satisfy the selection criteria.<br/> Zero means use 
jdbc default value. |
 | properties                   | Map        | No       | -               | 
Additional connection configuration parameters,when properties and URL have the 
same parameters, the priority is determined by the <br/>specific implementation 
of the driver. For example, in MySQL, properties take precedence over the URL.  
                  |
-| common-options               |            | No       | -               | 
Source plugin common parameters, please refer to [Source Common 
Options](common-options.md) for details                                         
                                                                                
                                  |
 
-### Tips
+|                    Name                    |    Type    | Required |     
Default     |                                                                   
                                                                                
                                                                                
                                                                  Description   
                                                                                
                  [...]
+|--------------------------------------------|------------|----------|-----------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
+| url                                        | String     | Yes      | -       
        | The URL of the JDBC connection. Refer to a case: 
jdbc:mysql://localhost:3306:3306/test                                           
                                                                                
                                                                                
                                                                                
                                   [...]
+| driver                                     | String     | Yes      | -       
        | The jdbc class name used to connect to the remote data source,<br/> 
if you use MySQL the value is `com.mysql.cj.jdbc.Driver`.                       
                                                                                
                                                                                
                                                                                
                [...]
+| user                                       | String     | No       | -       
        | Connection instance user name                                         
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| password                                   | String     | No       | -       
        | Connection instance password                                          
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| query                                      | String     | Yes      | -       
        | Query statement                                                       
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| connection_check_timeout_sec               | Int        | No       | 30      
        | The time in seconds to wait for the database operation used to 
validate the connection to complete                                             
                                                                                
                                                                                
                                                                                
                     [...]
+| partition_column                           | String     | No       | -       
        | The column name for parallelism's partition, only support numeric 
type,Only support numeric type primary key, and only can config one column.     
                                                                                
                                                                                
                                                                                
                  [...]
+| partition_lower_bound                      | BigDecimal | No       | -       
        | The partition_column min value for scan, if not set SeaTunnel will 
query database get min value.                                                   
                                                                                
                                                                                
                                                                                
                 [...]
+| partition_upper_bound                      | BigDecimal | No       | -       
        | The partition_column max value for scan, if not set SeaTunnel will 
query database get max value.                                                   
                                                                                
                                                                                
                                                                                
                 [...]
+| partition_num                              | Int        | No       | job 
parallelism | The number of partition count, only support positive integer. 
default value is job parallelism                                                
                                                                                
                                                                                
                                                                                
                      [...]
+| fetch_size                                 | Int        | No       | 0       
        | For queries that return a large number of objects,you can 
configure<br/> the row fetch size used in the query toimprove performance 
by<br/> reducing the number database hits required to satisfy the selection 
criteria.<br/> Zero means use jdbc default value.                               
                                                                                
                                    [...]
+| properties                                 | Map        | No       | -       
        | Additional connection configuration parameters,when properties and 
URL have the same parameters, the priority is determined by the <br/>specific 
implementation of the driver. For example, in MySQL, properties take precedence 
over the URL.                                                                   
                                                                                
                   [...]
+| table_path                                 | Int        | No       | 0       
        | The path to the full path of table, you can use this configuration 
instead of `query`. <br/>examples: <br/>mysql: "testdb.table1" <br/>oracle: 
"test_schema.table1" <br/>sqlserver: "testdb.test_schema.table1" 
<br/>postgresql: "testdb.test_schema.table1"                                    
                                                                                
                                    [...]
+| table_list                                 | Array      | No       | 0       
        | The list of tables to be read, you can use this configuration instead 
of `table_path` example: ```[{ table_path = "testdb.table1"}, {table_path = 
"testdb.table2", query = "select * id, name from testdb.table2"}]```            
                                                                                
                                                                                
                  [...]
+| where_condition                            | String     | No       | -       
        | Common row filter conditions for all tables/queries, must start with 
`where`. for example `where id > 100`                                           
                                                                                
                                                                                
                                                                                
               [...]
+| split.size                                 | Int        | No       | 8096    
        | The split size (number of rows) of table, captured tables are split 
into multiple splits when read of table.                                        
                                                                                
                                                                                
                                                                                
                [...]
+| split.even-distribution.factor.lower-bound | Double     | No       | 0.05    
        | The lower bound of the chunk key distribution factor. This factor is 
used to determine whether the table data is evenly distributed. If the 
distribution factor is calculated to be greater than or equal to this lower 
bound (i.e., (MAX(id) - MIN(id) + 1) / row count), the table chunks would be 
optimized for even distribution. Otherwise, if the distribution factor is less, 
the table will be considered a [...]
+| split.even-distribution.factor.upper-bound | Double     | No       | 100     
        | The upper bound of the chunk key distribution factor. This factor is 
used to determine whether the table data is evenly distributed. If the 
distribution factor is calculated to be less than or equal to this upper bound 
(i.e., (MAX(id) - MIN(id) + 1) / row count), the table chunks would be 
optimized for even distribution. Otherwise, if the distribution factor is 
greater, the table will be considered a [...]
+| split.sample-sharding.threshold            | Int        | No       | 10000   
        | This configuration specifies the threshold of estimated shard count 
to trigger the sample sharding strategy. When the distribution factor is 
outside the bounds specified by 
`chunk-key.even-distribution.factor.upper-bound` and 
`chunk-key.even-distribution.factor.lower-bound`, and the estimated shard count 
(calculated as approximate row count / chunk size) exceeds this threshold, the 
sample sharding st [...]
+| split.inverse-sampling.rate                | Int        | No       | 1000    
        | The inverse of the sampling rate used in the sample sharding 
strategy. For example, if this value is set to 1000, it means a 1/1000 sampling 
rate is applied during the sampling process. This option provides flexibility 
in controlling the granularity of the sampling, thus affecting the final number 
of shards. It's especially useful when dealing with very large datasets where a 
lower sampling rate is p [...]
+| common-options                             |            | No       | -       
        | Source plugin common parameters, please refer to [Source Common 
Options](common-options.md) for details                                         
                                                                                
                                                                                
                                                                                
                    [...]
 
-> If partition_column is not set, it will run in single concurrency, and if 
partition_column is set, it will be executed  in parallel according to the 
concurrency of tasks.
+## Parallel Reader
+
+The JDBC Source connector supports reading data from tables in parallel. 
SeaTunnel splits the table data according to certain rules and hands the 
resulting splits to the readers. The number of readers is determined by the 
`parallelism` option.
+
+**Split Key Rules:**
+
+1. If `partition_column` is set, it will be used to calculate the splits. The 
column's type must be one of the **supported split data types**.
+2. If `partition_column` is not set, SeaTunnel will read the schema from the 
table and get the Primary Key and Unique Index columns. If there is more than 
one such column, the first one whose type is a **supported split data type** 
will be used to split the data. For example, for a table with Primary Key(nn 
guid, name varchar), because the type of `guid` is not a supported split data 
type, the column `name` will be used to split the data.
+
+**Supported split data types:**
+* String
+* Number(int, bigint, decimal, ...)
+* Date
+
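As a rough illustration, the key-selection rule above can be sketched in a few lines (a hypothetical Python sketch, not the connector's actual code; the column type names are assumptions):

```python
def choose_split_key(partition_column, key_columns):
    """Pick the split column following the rules above (illustrative only).

    key_columns: ordered (name, type) pairs from the Primary Key / Unique Index.
    """
    # Types the text lists as supported split data types (names are assumed).
    supported = {"varchar", "int", "bigint", "decimal", "date"}
    if partition_column is not None:
        return partition_column              # rule 1: explicit column wins
    for name, col_type in key_columns:       # rule 2: first supported key column
        if col_type in supported:
            return name
    return None                              # no usable key: single-concurrency read

# The Primary Key(guid, name varchar) example from the text:
print(choose_split_key(None, [("guid", "raw"), ("name", "varchar")]))  # name
```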
+### Options Related To Split
+
+#### split.size
+
+The number of rows in one split. Captured tables are split into multiple 
splits when the table is read.
+
+#### split.even-distribution.factor.lower-bound
+
+> Not recommended for use
+
+The lower bound of the chunk key distribution factor. This factor is used to 
determine whether the table data is evenly distributed. If the distribution 
factor is calculated to be greater than or equal to this lower bound (i.e., 
(MAX(id) - MIN(id) + 1) / row count), the table chunks would be optimized for 
even distribution. Otherwise, if the distribution factor is less, the table 
will be considered as unevenly distributed and the sampling-based sharding 
strategy will be used if the estim [...]
+
+#### split.even-distribution.factor.upper-bound
+
+> Not recommended for use
+
+The upper bound of the chunk key distribution factor. This factor is used to 
determine whether the table data is evenly distributed. If the distribution 
factor is calculated to be less than or equal to this upper bound (i.e., 
(MAX(id) - MIN(id) + 1) / row count), the table chunks would be optimized for 
even distribution. Otherwise, if the distribution factor is greater, the table 
will be considered as unevenly distributed and the sampling-based sharding 
strategy will be used if the estim [...]
+
+#### split.sample-sharding.threshold
+
+This configuration specifies the threshold of estimated shard count to trigger 
the sample sharding strategy. When the distribution factor is outside the 
bounds specified by `chunk-key.even-distribution.factor.upper-bound` and 
`chunk-key.even-distribution.factor.lower-bound`, and the estimated shard count 
(calculated as approximate row count / chunk size) exceeds this threshold, the 
sample sharding strategy will be used. This can help to handle large datasets 
more efficiently. The default [...]
+
+#### split.inverse-sampling.rate
+
+The inverse of the sampling rate used in the sample sharding strategy. For 
example, if this value is set to 1000, it means a 1/1000 sampling rate is 
applied during the sampling process. This option provides flexibility in 
controlling the granularity of the sampling, thus affecting the final number of 
shards. It's especially useful when dealing with very large datasets where a 
lower sampling rate is preferred. The default value is 1000.
+
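Taken together, the options above drive a simple decision: compute the distribution factor and the estimated shard count, then pick a chunking strategy. A simplified sketch of that arithmetic, under the stated defaults (illustrative only, not the actual `DynamicChunkSplitter` implementation):

```python
def pick_strategy(min_key, max_key, row_count, split_size=8096,
                  factor_lower=0.05, factor_upper=100.0,
                  sample_sharding_threshold=10000):
    """Sketch of the documented decision rule; names and shape are assumptions."""
    factor = (max_key - min_key + 1) / row_count   # chunk key distribution factor
    estimated_shards = row_count / split_size      # approximate row count / chunk size
    if (factor < factor_lower or factor > factor_upper) \
            and estimated_shards > sample_sharding_threshold:
        # e.g. split.inverse-sampling.rate = 1000 -> sample ~1/1000 of the rows
        return "sample-sharding"
    return "even-distribution"

# Dense ids 1..1_000_000 over 1_000_000 rows: factor = 1.0, inside the bounds.
print(pick_strategy(1, 1_000_000, 1_000_000))  # even-distribution
```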
+#### partition_column [string]
+
+The column name for split data.
+
+#### partition_upper_bound [BigDecimal]
+
+The partition_column max value for scan, if not set SeaTunnel will query 
database get max value.
+
+#### partition_lower_bound [BigDecimal]
+
+The partition_column min value for scan, if not set SeaTunnel will query 
database get min value.
+
+#### partition_num [int]
+
+> Not recommended for use. The preferred approach is to control the number of 
splits through `split.size`.
+
+The number of splits to create; only positive integers are supported. The 
default value is the job parallelism.
+
+## Tips
+
+> If the table cannot be split (for example, it has no Primary Key or Unique 
Index and `partition_column` is not set), reading will run with single 
concurrency.
+>
+> Use `table_path` instead of `query` for single-table reading. If you need to 
read multiple tables, use `table_list`.
 
 ## Task Example
 
@@ -83,7 +166,7 @@ Read external data source data through JDBC.
 ```
 # Defining the runtime environment
 env {
-  parallelism = 2
+  parallelism = 4
   job.mode = "BATCH"
 }
 source{
@@ -106,13 +189,13 @@ sink {
 }
 ```
 
-### Parallel:
+### Parallel by partition_column
 
 > Read your query table in parallel using the shard field you configured. You
 > can do this if you want to read the whole table.
 
 ```
 env {
-  parallelism = 10
+  parallelism = 4
   job.mode = "BATCH"
 }
 source {
@@ -138,6 +221,33 @@ sink {
 }
 ```
 
+### Parallel by Primary Key or Unique Index
+
+> Configuring `table_path` enables auto split; you can configure `split.*` to 
adjust the split strategy.
+
+```
+env {
+  parallelism = 4
+  job.mode = "BATCH"
+}
+source {
+    Jdbc {
+        url = "jdbc:oracle:thin:@datasource01:1523:xe"
+        driver = "oracle.jdbc.OracleDriver"
+        connection_check_timeout_sec = 100
+        user = "root"
+        password = "123456"
+        table_path = "DA.SCHEMA1.TABLE1"
+        query = "select * from SCHEMA1.TABLE1"
+        split.size = 10000
+    }
+}
+
+sink {
+  Console {}
+}
+```
+
 ### Parallel Boundary:
 
 > It is more efficient to read your data source within the upper and lower
 > bounds you configure for the query.
@@ -162,3 +272,41 @@ source {
 }
 ```
 
+### Multiple table read:
+
+***Configuring `table_list` enables auto split; you can configure `split.*` 
to adjust the split strategy.***
+
+```hocon
+env {
+  job.mode = "BATCH"
+  parallelism = 4
+}
+source {
+  Jdbc {
+    url = "jdbc:oracle:thin:@datasource01:1523:xe"
+    driver = "oracle.jdbc.OracleDriver"
+    connection_check_timeout_sec = 100
+    user = "root"
+    password = "123456"
+    "table_list"=[
+        {
+            "table_path"="XE.TEST.USER_INFO"
+        },
+        {
+            "table_path"="XE.TEST.YOURTABLENAME"
+        }
+    ]
+    #where_condition= "where id > 100"
+    split.size = 10000
+    #split.even-distribution.factor.upper-bound = 100
+    #split.even-distribution.factor.lower-bound = 0.05
+    #split.sample-sharding.threshold = 1000
+    #split.inverse-sampling.rate = 1000
+  }
+}
+
+sink {
+  Console {}
+}
+```
+
diff --git a/docs/en/connector-v2/source/PostgreSQL.md 
b/docs/en/connector-v2/source/PostgreSQL.md
index a29b9b102c..e991f22c1f 100644
--- a/docs/en/connector-v2/source/PostgreSQL.md
+++ b/docs/en/connector-v2/source/PostgreSQL.md
@@ -77,11 +77,94 @@ Read external data source data through JDBC.
 | partition_num                | Int        | No       | job parallelism | The 
number of partition count, only support positive integer. default value is job 
parallelism                                                                     
                                                                                
               |
 | fetch_size                   | Int        | No       | 0               | For 
queries that return a large number of objects, you can configure<br/> the row 
fetch size used in the query to improve performance by<br/> reducing the number 
of database hits required to satisfy the selection criteria.<br/> Zero means use 
jdbc default value. |
 | properties                   | Map        | No       | -               | 
Additional connection configuration parameters,when properties and URL have the 
same parameters, the priority is determined by the <br/>specific implementation 
of the driver. For example, in MySQL, properties take precedence over the URL.  
                  |
-| common-options               |            | No       | -               | 
Source plugin common parameters, please refer to [Source Common 
Options](common-options.md) for details                                         
                                                                                
                                  |
 
-### Tips
+|                    Name                    |    Type    | Required |     
Default     |                                                                   
                                                                                
                                                                                
                                                                  Description   
                                                                                
                  [...]
+|--------------------------------------------|------------|----------|-----------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
+| url                                        | String     | Yes      | -       
        | The URL of the JDBC connection. Refer to a case: 
jdbc:postgresql://localhost:5432/test                                           
                                                                                
                                                                                
                                                                                
                                   [...]
+| driver                                     | String     | Yes      | -       
        | The jdbc class name used to connect to the remote data source,<br/> 
if you use PostgreSQL the value is `org.postgresql.Driver`.                     
                                                                                
                                                                                
                                                                                
                [...]
+| user                                       | String     | No       | -       
        | Connection instance user name                                         
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| password                                   | String     | No       | -       
        | Connection instance password                                          
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| query                                      | String     | Yes      | -       
        | Query statement                                                       
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| connection_check_timeout_sec               | Int        | No       | 30      
        | The time in seconds to wait for the database operation used to 
validate the connection to complete                                             
                                                                                
                                                                                
                                                                                
                     [...]
+| partition_column                           | String     | No       | -       
        | The column name for parallelism's partition; only one column can be 
configured, and its type must be a supported split data type.                   
                                                                                
                                                                                
                                                                                
                  [...]
+| partition_lower_bound                      | BigDecimal | No       | -       
        | The partition_column min value for scan, if not set SeaTunnel will 
query database get min value.                                                   
                                                                                
                                                                                
                                                                                
                 [...]
+| partition_upper_bound                      | BigDecimal | No       | -       
        | The partition_column max value for scan, if not set SeaTunnel will 
query database get max value.                                                   
                                                                                
                                                                                
                                                                                
                 [...]
+| partition_num                              | Int        | No       | job 
parallelism | The number of partition count, only support positive integer. 
default value is job parallelism                                                
                                                                                
                                                                                
                                                                                
                      [...]
+| fetch_size                                 | Int        | No       | 0       
        | For queries that return a large number of objects, you can 
configure<br/> the row fetch size used in the query to improve performance 
by<br/> reducing the number of database hits required to satisfy the selection 
criteria.<br/> Zero means use jdbc default value.                               
                                                                                
                                    [...]
+| properties                                 | Map        | No       | -       
        | Additional connection configuration parameters,when properties and 
URL have the same parameters, the priority is determined by the <br/>specific 
implementation of the driver. For example, in MySQL, properties take precedence 
over the URL.                                                                   
                                                                                
                   [...]
+| table_path                                 | String     | No       | -       
        | The path to the full path of table, you can use this configuration 
instead of `query`. <br/>examples: <br/>mysql: "testdb.table1" <br/>oracle: 
"test_schema.table1" <br/>sqlserver: "testdb.test_schema.table1" 
<br/>postgresql: "testdb.test_schema.table1"                                    
                                                                                
                                    [...]
+| table_list                                 | Array      | No       | -       
        | The list of tables to be read, you can use this configuration instead 
of `table_path` example: ```[{ table_path = "testdb.table1"}, {table_path = 
"testdb.table2", query = "select * id, name from testdb.table2"}]```            
                                                                                
                                                                                
                  [...]
+| where_condition                            | String     | No       | -       
        | Common row filter conditions for all tables/queries, must start with 
`where`. For example, `where id > 100`                                          
                                                                                
                                                                                
                                                                                
               [...]
+| split.size                                 | Int        | No       | 8096    
        | The split size (number of rows) of table, captured tables are split 
into multiple splits when read of table.                                        
                                                                                
                                                                                
                                                                                
                [...]
+| split.even-distribution.factor.lower-bound | Double     | No       | 0.05    
        | The lower bound of the chunk key distribution factor. This factor is 
used to determine whether the table data is evenly distributed. If the 
distribution factor is calculated to be greater than or equal to this lower 
bound (i.e., (MAX(id) - MIN(id) + 1) / row count), the table chunks would be 
optimized for even distribution. Otherwise, if the distribution factor is less, 
the table will be considered a [...]
+| split.even-distribution.factor.upper-bound | Double     | No       | 100     
        | The upper bound of the chunk key distribution factor. This factor is 
used to determine whether the table data is evenly distributed. If the 
distribution factor is calculated to be less than or equal to this upper bound 
(i.e., (MAX(id) - MIN(id) + 1) / row count), the table chunks would be 
optimized for even distribution. Otherwise, if the distribution factor is 
greater, the table will be considered a [...]
+| split.sample-sharding.threshold            | Int        | No       | 10000   
        | This configuration specifies the threshold of estimated shard count 
to trigger the sample sharding strategy. When the distribution factor is 
outside the bounds specified by 
`chunk-key.even-distribution.factor.upper-bound` and 
`chunk-key.even-distribution.factor.lower-bound`, and the estimated shard count 
(calculated as approximate row count / chunk size) exceeds this threshold, the 
sample sharding st [...]
+| split.inverse-sampling.rate                | Int        | No       | 1000    
        | The inverse of the sampling rate used in the sample sharding 
strategy. For example, if this value is set to 1000, it means a 1/1000 sampling 
rate is applied during the sampling process. This option provides flexibility 
in controlling the granularity of the sampling, thus affecting the final number 
of shards. It's especially useful when dealing with very large datasets where a 
lower sampling rate is p [...]
+| common-options                             |            | No       | -       
        | Source plugin common parameters, please refer to [Source Common 
Options](common-options.md) for details                                         
                                                                                
                                                                                
                                                                                
                    [...]
 
-> If partition_column is not set, it will run in single concurrency, and if 
partition_column is set, it will be executed  in parallel according to the 
concurrency of tasks.
+## Parallel Reader
+
+The JDBC Source connector supports reading data from tables in parallel. 
SeaTunnel splits the table data according to certain rules and hands the 
resulting splits to the readers. The number of readers is determined by the 
`parallelism` option.
+
+**Split Key Rules:**
+
+1. If `partition_column` is set, it will be used to calculate the splits. The 
column's type must be one of the **supported split data types**.
+2. If `partition_column` is not set, SeaTunnel will read the schema from the 
table and get the Primary Key and Unique Index columns. If there is more than 
one such column, the first one whose type is a **supported split data type** 
will be used to split the data. For example, for a table with Primary Key(nn 
guid, name varchar), because the type of `guid` is not a supported split data 
type, the column `name` will be used to split the data.
+
+**Supported split data types:**
+* String
+* Number(int, bigint, decimal, ...)
+* Date
+
+### Options Related To Split
+
+#### split.size
+
+The number of rows in one split. Captured tables are split into multiple 
splits when the table is read.
+
+#### split.even-distribution.factor.lower-bound
+
+> Not recommended for use
+
+The lower bound of the chunk key distribution factor. This factor is used to 
determine whether the table data is evenly distributed. If the distribution 
factor is calculated to be greater than or equal to this lower bound (i.e., 
(MAX(id) - MIN(id) + 1) / row count), the table chunks would be optimized for 
even distribution. Otherwise, if the distribution factor is less, the table 
will be considered as unevenly distributed and the sampling-based sharding 
strategy will be used if the estim [...]
+
+#### split.even-distribution.factor.upper-bound
+
+> Not recommended for use
+
+The upper bound of the chunk key distribution factor. This factor is used to 
determine whether the table data is evenly distributed. If the distribution 
factor is calculated to be less than or equal to this upper bound (i.e., 
(MAX(id) - MIN(id) + 1) / row count), the table chunks would be optimized for 
even distribution. Otherwise, if the distribution factor is greater, the table 
will be considered as unevenly distributed and the sampling-based sharding 
strategy will be used if the estim [...]
+
+#### split.sample-sharding.threshold
+
+This configuration specifies the threshold of estimated shard count to trigger 
the sample sharding strategy. When the distribution factor is outside the 
bounds specified by `chunk-key.even-distribution.factor.upper-bound` and 
`chunk-key.even-distribution.factor.lower-bound`, and the estimated shard count 
(calculated as approximate row count / chunk size) exceeds this threshold, the 
sample sharding strategy will be used. This can help to handle large datasets 
more efficiently. The default [...]
+
+#### split.inverse-sampling.rate
+
+The inverse of the sampling rate used in the sample sharding strategy. For 
example, if this value is set to 1000, it means a 1/1000 sampling rate is 
applied during the sampling process. This option provides flexibility in 
controlling the granularity of the sampling, thus affecting the final number of 
shards. It's especially useful when dealing with very large datasets where a 
lower sampling rate is preferred. The default value is 1000.
+
+#### partition_column [string]
+
+The column name for split data.
+
+#### partition_upper_bound [BigDecimal]
+
+The partition_column max value for scan, if not set SeaTunnel will query 
database get max value.
+
+#### partition_lower_bound [BigDecimal]
+
+The partition_column min value for scan, if not set SeaTunnel will query 
database get min value.
+
+#### partition_num [int]
+
+> Not recommended for use. The preferred approach is to control the number of 
splits through `split.size`.
+
+The number of splits to create; only positive integers are supported. The 
default value is the job parallelism.
+
+## Tips
+
+> If the table cannot be split (for example, it has no Primary Key or Unique 
Index and `partition_column` is not set), reading will run with single 
concurrency.
+>
+> Use `table_path` instead of `query` for single-table reading. If you need to 
read multiple tables, use `table_list`.
 
 ## Task Example
 
@@ -92,7 +175,7 @@ Read external data source data through JDBC.
 ```
 # Defining the runtime environment
 env {
-  parallelism = 2
+  parallelism = 4
   job.mode = "BATCH"
 }
 
@@ -115,13 +198,13 @@ sink {
 }
 ```
 
-### Parallel:
+### Parallel by partition_column
 
 > Read your query table in parallel using the shard field you configured. You
 > can do this if you want to read the whole table.
 
 ```
 env {
-  parallelism = 10
+  parallelism = 4
   job.mode = "BATCH"
 }
 source{
@@ -140,6 +223,33 @@ sink {
 }
 ```
 
+### Parallel by Primary Key or Unique Index
+
+> Configuring `table_path` enables auto split; you can configure `split.*` to 
adjust the split strategy.
+
+```
+env {
+  parallelism = 4
+  job.mode = "BATCH"
+}
+source {
+    Jdbc {
+        url = "jdbc:postgresql://localhost:5432/test"
+        driver = "org.postgresql.Driver"
+        connection_check_timeout_sec = 100
+        user = "root"
+        password = "123456"
+        table_path = "test.public.AllDataType_1"
+        query = "select * from public.AllDataType_1"
+        split.size = 10000
+    }
+}
+
+sink {
+  Console {}
+}
+```
+
 ### Parallel Boundary:
 
 > It is more efficient to read your data source within the upper and lower
 > bounds you configure for the query.
@@ -163,3 +273,40 @@ source{
 }
 ```
 
+### Multiple table read:
+
+***Configuring `table_list` enables auto split; you can configure `split.*` 
to adjust the split strategy.***
+
+```hocon
+env {
+  job.mode = "BATCH"
+  parallelism = 4
+}
+source {
+  Jdbc {
+    url="jdbc:postgresql://datasource01:5432/demo"
+    user="iDm82k6Q0Tq+wUprWnPsLQ=="
+    driver="org.postgresql.Driver"
+    password="iDm82k6Q0Tq+wUprWnPsLQ=="
+    "table_list"=[
+        {
+            "table_path"="demo.public.AllDataType_1"
+        },
+        {
+            "table_path"="demo.public.alldatatype"
+        }
+    ]
+    #where_condition= "where id > 100"
+    split.size = 10000
+    #split.even-distribution.factor.upper-bound = 100
+    #split.even-distribution.factor.lower-bound = 0.05
+    #split.sample-sharding.threshold = 1000
+    #split.inverse-sampling.rate = 1000
+  }
+}
+
+sink {
+  Console {}
+}
+```
+
diff --git a/docs/en/connector-v2/source/SqlServer.md 
b/docs/en/connector-v2/source/SqlServer.md
index 01f49072b5..0cc6a0dacd 100644
--- a/docs/en/connector-v2/source/SqlServer.md
+++ b/docs/en/connector-v2/source/SqlServer.md
@@ -57,24 +57,93 @@ Read external data source data through JDBC.
 
 ## Source Options
 
-|             name             |  type  | required |     default     |         
                                                                                
                                   Description                                  
                                                                                
          |
-|------------------------------|--------|----------|-----------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| url                          | String | Yes      | -               | The URL 
of the JDBC connection. Refer to a case: 
jdbc:sqlserver://127.0.0.1:1434;database=TestDB                                 
                                                                                
                                                 |
-| driver                       | String | Yes      | -               | The 
jdbc class name used to connect to the remote data source,<br/> if you use 
SQLserver the value is `com.microsoft.sqlserver.jdbc.SQLServerDriver`.          
                                                                                
                   |
-| user                         | String | No       | -               | 
Connection instance user name                                                   
                                                                                
                                                                                
                  |
-| password                     | String | No       | -               | 
Connection instance password                                                    
                                                                                
                                                                                
                  |
-| query                        | String | Yes      | -               | Query statement |
-| connection_check_timeout_sec | Int    | No       | 30              | The time in seconds to wait for the database operation used to validate the connection to complete |
-| partition_column             | String | No       | -               | The column name for parallelism's partition, only support numeric type. |
-| partition_lower_bound        | Long   | No       | -               | The partition_column min value for scan, if not set SeaTunnel will query database get min value. |
-| partition_upper_bound        | Long   | No       | -               | The partition_column max value for scan, if not set SeaTunnel will query database get max value. |
-| partition_num                | Int    | No       | job parallelism | The number of partition count, only support positive integer. default value is job parallelism |
-| fetch_size                   | Int    | No       | 0               | For queries that return a large number of objects,you can configure<br/> the row fetch size used in the query toimprove performance by<br/> reducing the number database hits required to satisfy the selection criteria.<br/> Zero means use jdbc default value. |
-| common-options               |        | No       | -               | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details |
+|                    name                    |  type  | required |     default     | Description [...]
+|--------------------------------------------|--------|----------|-----------------|-------------------------------------------------------------------------------------------------------------- [...]
+| url                                        | String | Yes      | -               | The URL of the JDBC connection. Refer to a case: jdbc:sqlserver://127.0.0.1:1434;database=TestDB [...]
+| driver                                     | String | Yes      | -               | The jdbc class name used to connect to the remote data source,<br/> if you use SQL Server the value is `com.microsoft.sqlserver.jdbc.SQLServerDriver`. [...]
+| user                                       | String | No       | -               | Connection instance user name [...]
+| password                                   | String | No       | -               | Connection instance password [...]
+| query                                      | String | Yes      | -               | Query statement [...]
+| connection_check_timeout_sec               | Int    | No       | 30              | The time in seconds to wait for the database operation used to validate the connection to complete [...]
+| partition_column                           | String | No       | -               | The column name for parallelism's partition, only supports numeric type. [...]
+| partition_lower_bound                      | Long   | No       | -               | The partition_column min value for scan; if not set, SeaTunnel will query the database to get the min value. [...]
+| partition_upper_bound                      | Long   | No       | -               | The partition_column max value for scan; if not set, SeaTunnel will query the database to get the max value. [...]
+| partition_num                              | Int    | No       | job parallelism | The number of partitions; only positive integers are supported. The default value is the job parallelism [...]
+| fetch_size                                 | Int    | No       | 0               | For queries that return a large number of objects, you can configure<br/> the row fetch size used in the query to improve performance by<br/> reducing the number of database hits required to satisfy the selection criteria.<br/> Zero means use the jdbc default value. [...]
+| properties                                 | Map    | No       | -               | Additional connection configuration parameters; when properties and the URL have the same parameters, the priority is determined by the <br/>specific implementation of the driver. For example, in MySQL, properties take precedence over the URL. [...]
+| table_path                                 | String | No       | -               | The full path of the table; you can use this configuration instead of `query`. <br/>examples: <br/>mysql: "testdb.table1" <br/>oracle: "test_schema.table1" <br/>sqlserver: "testdb.test_schema.table1" <br/>postgresql: "testdb.test_schema.table1" [...]
+| table_list                                 | Array  | No       | -               | The list of tables to be read; you can use this configuration instead of `table_path`. Example: ```[{ table_path = "testdb.table1"}, {table_path = "testdb.table2", query = "select id, name from testdb.table2"}]``` [...]
+| where_condition                            | String | No       | -               | Common row filter conditions for all tables/queries, must start with `where`. For example `where id > 100` [...]
+| split.size                                 | Int    | No       | 8096            | The split size (number of rows) of the table; captured tables are split into multiple splits when reading the table. [...]
+| split.even-distribution.factor.lower-bound | Double | No       | 0.05            | The lower bound of the chunk key distribution factor. This factor is used to determine whether the table data is evenly distributed. If the distribution factor is calculated to be greater than or equal to this lower bound (i.e., (MAX(id) - MIN(id) + 1) / row count), the table chunks would be optimized for even distribution. Otherwise, if the distribution factor is less, the table will be considered as un [...]
+| split.even-distribution.factor.upper-bound | Double | No       | 100             | The upper bound of the chunk key distribution factor. This factor is used to determine whether the table data is evenly distributed. If the distribution factor is calculated to be less than or equal to this upper bound (i.e., (MAX(id) - MIN(id) + 1) / row count), the table chunks would be optimized for even distribution. Otherwise, if the distribution factor is greater, the table will be considered as un [...]
+| split.sample-sharding.threshold            | Int    | No       | 10000           | This configuration specifies the threshold of estimated shard count to trigger the sample sharding strategy. When the distribution factor is outside the bounds specified by `chunk-key.even-distribution.factor.upper-bound` and `chunk-key.even-distribution.factor.lower-bound`, and the estimated shard count (calculated as approximate row count / chunk size) exceeds this threshold, the sample sharding strate [...]
+| split.inverse-sampling.rate                | Int    | No       | 1000            | The inverse of the sampling rate used in the sample sharding strategy. For example, if this value is set to 1000, it means a 1/1000 sampling rate is applied during the sampling process. This option provides flexibility in controlling the granularity of the sampling, thus affecting the final number of shards. It's especially useful when dealing with very large datasets where a lower sampling rate is prefe [...]
+| common-options                             |        | No       | -               | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details [...]
+
+## Parallel Reader
+
+The JDBC Source connector supports parallel reading of data from tables. SeaTunnel will use certain rules to split the data in the table, which will be handed over to readers for reading. The number of readers is determined by the `parallelism` option.
+
+**Split Key Rules:**
+
+1. If `partition_column` is not null, it will be used to calculate the splits. The column must be in the **supported split data types**.
+2. If `partition_column` is null, SeaTunnel will read the schema from the table and get the Primary Key and Unique Index. If there is more than one column in the Primary Key and Unique Index, the first column which is in the **supported split data types** will be used to split the data. For example, a table has Primary Key(nn guid, name varchar); because `guid` is not in the **supported split data types**, the column `name` will be used to split the data.
+
+**Supported split data type:**
+* String
+* Number(int, bigint, decimal, ...)
+* Date
+
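With date columns now accepted as split keys, a date column can be configured directly as the `partition_column`. The following is a minimal sketch only: the connection settings and the `order_date` column are hypothetical examples, not part of this commit.

```hocon
source {
  Jdbc {
    url = "jdbc:mysql://localhost:3306/testdb"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "root"
    password = "123456"
    query = "select * from testdb.table1"
    # Hypothetical DATE column used as the split key; numeric and
    # string columns are configured the same way.
    partition_column = "order_date"
  }
}
```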
+### Options Related To Split
+
+#### split.size
+
+The number of rows in one split. Captured tables are split into multiple splits when reading the table.
+
+#### split.even-distribution.factor.lower-bound
+
+> Not recommended for use
+
+The lower bound of the chunk key distribution factor. This factor is used to determine whether the table data is evenly distributed. If the distribution factor is calculated to be greater than or equal to this lower bound (i.e., (MAX(id) - MIN(id) + 1) / row count), the table chunks would be optimized for even distribution. Otherwise, if the distribution factor is less, the table will be considered as unevenly distributed and the sampling-based sharding strategy will be used if the estim [...]
+
+#### split.even-distribution.factor.upper-bound
+
+> Not recommended for use
+
+The upper bound of the chunk key distribution factor. This factor is used to determine whether the table data is evenly distributed. If the distribution factor is calculated to be less than or equal to this upper bound (i.e., (MAX(id) - MIN(id) + 1) / row count), the table chunks would be optimized for even distribution. Otherwise, if the distribution factor is greater, the table will be considered as unevenly distributed and the sampling-based sharding strategy will be used if the estim [...]
+
+#### split.sample-sharding.threshold
+
+This configuration specifies the threshold of estimated shard count to trigger the sample sharding strategy. When the distribution factor is outside the bounds specified by `chunk-key.even-distribution.factor.upper-bound` and `chunk-key.even-distribution.factor.lower-bound`, and the estimated shard count (calculated as approximate row count / chunk size) exceeds this threshold, the sample sharding strategy will be used. This can help to handle large datasets more efficiently. The default [...]
+
+#### split.inverse-sampling.rate
+
+The inverse of the sampling rate used in the sample sharding strategy. For example, if this value is set to 1000, it means a 1/1000 sampling rate is applied during the sampling process. This option provides flexibility in controlling the granularity of the sampling, thus affecting the final number of shards. It's especially useful when dealing with very large datasets where a lower sampling rate is preferred. The default value is 1000.
+
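The four split options above interact as a single heuristic, which can be sketched in plain Java. This is an illustrative model under stated assumptions: the class and method names below are invented (the real logic lives in SeaTunnel's splitter classes), and the numbers used are the documented defaults (lower bound 0.05, upper bound 100, threshold 10000).

```java
// Illustrative model of the split-strategy heuristic described above.
// Class and method names are invented; they are not SeaTunnel internals.
public class SplitStrategyDemo {

    // distribution factor = (MAX(id) - MIN(id) + 1) / row count
    public static double distributionFactor(long min, long max, long rowCount) {
        return (double) (max - min + 1) / rowCount;
    }

    // Mirrors the documented decision: evenly-sized chunks when the factor is
    // within [lower, upper]; otherwise sample sharding once the estimated
    // shard count (row count / chunk size) exceeds the threshold.
    public static String chooseStrategy(
            double factor, long rowCount, int chunkSize, double lower, double upper, int threshold) {
        if (factor >= lower && factor <= upper) {
            return "evenly-sized chunks";
        }
        long estimatedShards = rowCount / chunkSize;
        return estimatedShards > threshold ? "sample sharding" : "query-based splitting";
    }

    public static void main(String[] args) {
        // Dense sequential ids: factor = 1.0, within [0.05, 100].
        System.out.println(chooseStrategy(
                distributionFactor(1, 1_000_000, 1_000_000), 1_000_000, 8096, 0.05, 100, 10_000));
        // Sparse ids: factor = 1000 (> 100) and the estimated shard count
        // exceeds 10000, so sample sharding (1/1000 sampling by default) is chosen.
        System.out.println(chooseStrategy(
                distributionFactor(1, 100_000_000_000L, 100_000_000), 100_000_000, 8096, 0.05, 100, 10_000));
    }
}
```

With dense sequential keys the factor lands inside the bounds and evenly sized chunks are used; with sparse keys on a large table, the sampling-based strategy takes over.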
+#### partition_column [string]
+
+The column name for split data.
+
+#### partition_upper_bound [BigDecimal]
+
+The partition_column max value for scan; if not set, SeaTunnel will query the database to get the max value.
+
+#### partition_lower_bound [BigDecimal]
+
+The partition_column min value for scan; if not set, SeaTunnel will query the database to get the min value.
+
+#### partition_num [int]
+
+> Not recommended for use. The correct approach is to control the number of splits through `split.size`.
+
+The number of splits to create; only positive integers are supported. The default value is the job parallelism.
 
 ## tips
 
-> If partition_column is not set, it will run in single concurrency, and if partition_column is set, it will be executed in parallel according to the concurrency of tasks.
+> If the table cannot be split (for example, the table has no Primary Key or Unique Index, and `partition_column` is not set), it will run in single concurrency.
+>
+> Use `table_path` to replace `query` for single table reading. If you need to read multiple tables, use `table_list`.
 
 ## Task Example
 
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
index 099d59d5c0..1562ac3b48 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
@@ -27,6 +27,9 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceTable;
 
 import org.apache.commons.lang3.StringUtils;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.Serializable;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
@@ -50,6 +53,8 @@ import static java.lang.String.format;
  */
 public interface JdbcDialect extends Serializable {
 
+    Logger log = LoggerFactory.getLogger(JdbcDialect.class.getName());
+
     /**
      * Get the name of jdbc dialect.
      *
@@ -316,6 +321,7 @@ public interface JdbcDialect extends Serializable {
 
         try (Statement stmt = connection.createStatement()) {
             stmt.setFetchSize(1024);
+            log.info(String.format("Split Chunk, approximateRowCntStatement: %s", sampleQuery));
             try (ResultSet rs = stmt.executeQuery(sampleQuery)) {
                 int count = 0;
                 List<Object> results = new ArrayList<>();
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/SQLUtils.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/SQLUtils.java
index 2686a4e307..cc841149c9 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/SQLUtils.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/SQLUtils.java
@@ -17,16 +17,20 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect;
 
+import lombok.extern.slf4j.Slf4j;
+
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 
+@Slf4j
 public class SQLUtils {
 
     public static Long countForSubquery(Connection connection, String subQuerySQL)
             throws SQLException {
         String sqlQuery = String.format("SELECT COUNT(*) FROM (%s) T", subQuerySQL);
+        log.info("Split Chunk, countForSubquery: {}", sqlQuery);
         try (Statement stmt = connection.createStatement()) {
             try (ResultSet resultSet = stmt.executeQuery(sqlQuery)) {
                 if (resultSet.next()) {
@@ -40,6 +44,7 @@ public class SQLUtils {
 
     public static Long countForTable(Connection connection, String tablePath) throws SQLException {
         String sqlQuery = String.format("SELECT COUNT(*) FROM %s", tablePath);
+        log.info("Split Chunk, countForTable: {}", sqlQuery);
         try (Statement stmt = connection.createStatement()) {
             try (ResultSet resultSet = stmt.executeQuery(sqlQuery)) {
                 if (resultSet.next()) {
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
index f2ce15e31e..ec53dd0d6c 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
@@ -28,6 +28,8 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceTable;
 
 import org.apache.commons.lang3.StringUtils;
 
+import lombok.extern.slf4j.Slf4j;
+
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -41,6 +43,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
+@Slf4j
 public class MysqlDialect implements JdbcDialect {
     public String fieldIde = FieldIdeEnum.ORIGINAL.getValue();
 
@@ -170,8 +173,11 @@ public class MysqlDialect implements JdbcDialect {
                     String.format("USE %s;", quoteDatabaseIdentifier(tablePath.getDatabaseName()));
             String rowCountQuery =
                     String.format("SHOW TABLE STATUS LIKE '%s';", tablePath.getTableName());
+
             try (Statement stmt = connection.createStatement()) {
+                log.info("Split Chunk, approximateRowCntStatement: {}", useDatabaseStatement);
                 stmt.execute(useDatabaseStatement);
+                log.info("Split Chunk, approximateRowCntStatement: {}", rowCountQuery);
                 try (ResultSet rs = stmt.executeQuery(rowCountQuery)) {
                     if (!rs.next() || rs.getMetaData().getColumnCount() < 5) {
                         throw new SQLException(
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
index a1a658ba06..9167fe07f4 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
@@ -28,6 +28,8 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceTable;
 
 import org.apache.commons.lang3.StringUtils;
 
+import lombok.extern.slf4j.Slf4j;
+
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -38,6 +40,7 @@ import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
+@Slf4j
 public class OracleDialect implements JdbcDialect {
 
     private static final int DEFAULT_ORACLE_FETCH_SIZE = 128;
@@ -188,7 +191,9 @@ public class OracleDialect implements JdbcDialect {
                             tablePath.getSchemaName(), tablePath.getTableName());
 
             try (Statement stmt = connection.createStatement()) {
+                log.info("Split Chunk, approximateRowCntStatement: {}", analyzeTable);
                 stmt.execute(analyzeTable);
+                log.info("Split Chunk, approximateRowCntStatement: {}", rowCountQuery);
                 try (ResultSet rs = stmt.executeQuery(rowCountQuery)) {
                     if (!rs.next()) {
                         throw new SQLException(
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java
index 9a4076fe2a..e73a8f0bb6 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java
@@ -28,6 +28,8 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceTable;
 
 import org.apache.commons.lang3.StringUtils;
 
+import lombok.extern.slf4j.Slf4j;
+
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -37,6 +39,7 @@ import java.util.Arrays;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
+@Slf4j
 public class PostgresDialect implements JdbcDialect {
 
     public static final int DEFAULT_POSTGRES_FETCH_SIZE = 128;
@@ -152,6 +155,7 @@ public class PostgresDialect implements JdbcDialect {
                             "SELECT reltuples FROM pg_class r WHERE relkind = 'r' AND relname = '%s';",
                             table.getTablePath().getTableName());
             try (Statement stmt = connection.createStatement()) {
+                log.info("Split Chunk, approximateRowCntStatement: {}", rowCountQuery);
                 try (ResultSet rs = stmt.executeQuery(rowCountQuery)) {
                     if (!rs.next()) {
                         throw new SQLException(
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
index f8a46a0637..729ba17949 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
@@ -28,6 +28,8 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceTable;
 
 import org.apache.commons.lang3.StringUtils;
 
+import lombok.extern.slf4j.Slf4j;
+
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -38,6 +40,7 @@ import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
+@Slf4j
 public class SqlServerDialect implements JdbcDialect {
 
     public String fieldIde = FieldIdeEnum.ORIGINAL.getValue();
@@ -165,6 +168,7 @@ public class SqlServerDialect implements JdbcDialect {
                             String.format(
                                     "USE %s;",
                                     quoteDatabaseIdentifier(tablePath.getDatabaseName()));
+                    log.info("Split Chunk, approximateRowCntStatement: {}", useDatabaseStatement);
                     stmt.execute(useDatabaseStatement);
                 }
                 String rowCountQuery =
@@ -172,6 +176,7 @@ public class SqlServerDialect implements JdbcDialect {
                                 "SELECT Total_Rows = SUM(st.row_count) FROM sys"
                                         + ".dm_db_partition_stats st WHERE object_name(object_id) = '%s' AND index_id < 2;",
                                 tablePath.getTableName());
+                log.info("Split Chunk, approximateRowCntStatement: {}", rowCountQuery);
                 try (ResultSet rs = stmt.executeQuery(rowCountQuery)) {
                     if (!rs.next()) {
                         throw new SQLException(
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java
index ed7fac4d26..89ec64b817 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java
@@ -221,6 +221,7 @@ public abstract class ChunkSplitter implements AutoCloseable, Serializable {
                             jdbcDialect.tableIdentifier(table.getTablePath()));
         }
         try (Statement stmt = getOrEstablishConnection().createStatement()) {
+            log.info("Split table, query min max: {}", sqlQuery);
             try (ResultSet resultSet = stmt.executeQuery(sqlQuery)) {
                 if (resultSet.next()) {
                     Object min = resultSet.getObject(1);
@@ -251,7 +252,7 @@ public abstract class ChunkSplitter implements AutoCloseable, Serializable {
                                 "Partitioned column(%s) don't exist in the table columns",
                                 partitionColumn));
             }
-            if (!isEvenlySplitColumn(column)) {
+            if (!isSupportSplitColumn(column)) {
                 throw new JdbcConnectorException(
                         CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
                         String.format("%s is not numeric/string type", partitionColumn));
@@ -266,7 +267,7 @@ public abstract class ChunkSplitter implements AutoCloseable, Serializable {
         if (pk != null) {
             for (String pkField : pk.getColumnNames()) {
                 Column column = columnMap.get(pkField);
-                if (isEvenlySplitColumn(column)) {
+                if (isSupportSplitColumn(column)) {
                     return Optional.of(
                             new SeaTunnelRowType(
                                     new String[] {pkField},
@@ -290,7 +291,7 @@ public abstract class ChunkSplitter implements AutoCloseable, Serializable {
                             uniqueKey.getColumnNames()) {
                         String uniqueKeyColumnName = uniqueKeyColumn.getColumnName();
                         Column column = columnMap.get(uniqueKeyColumnName);
-                        if (isEvenlySplitColumn(column)) {
+                        if (isSupportSplitColumn(column)) {
                             return Optional.of(
                                     new SeaTunnelRowType(
                                             new String[] {uniqueKeyColumnName},
@@ -305,19 +306,19 @@ public abstract class ChunkSplitter implements AutoCloseable, Serializable {
         return Optional.empty();
     }
 
-    protected boolean isEvenlySplitColumn(Column splitColumn) {
-        return isEvenlySplitColumn(splitColumn.getDataType());
-    }
-
-    protected boolean isEvenlySplitColumn(SeaTunnelDataType columnType) {
+    protected boolean isSupportSplitColumn(Column splitColumn) {
+        SeaTunnelDataType<?> dataType = splitColumn.getDataType();
         // currently, we only support these types.
-        switch (columnType.getSqlType()) {
+        switch (dataType.getSqlType()) {
             case TINYINT:
             case SMALLINT:
             case INT:
             case BIGINT:
+            case DOUBLE:
+            case FLOAT:
             case DECIMAL:
             case STRING:
+            case DATE:
                 return true;
             default:
                 return false;
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java
index da158dca44..62cc173702 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.source;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonError;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceConfig;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.ObjectUtils;
 
@@ -32,8 +33,11 @@ import lombok.extern.slf4j.Slf4j;
 
 import java.io.Serializable;
 import java.math.BigDecimal;
+import java.sql.Date;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.time.LocalDate;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -97,7 +101,6 @@ public class DynamicChunkSplitter extends ChunkSplitter {
     private List<ChunkRange> splitTableIntoChunks(
             JdbcSourceTable table, String splitColumnName, SeaTunnelDataType splitColumnType)
             throws SQLException {
-        TablePath tablePath = table.getTablePath();
         Pair<Object, Object> minMax = queryMinMax(table, splitColumnName);
         Object min = minMax.getLeft();
         Object max = minMax.getRight();
@@ -107,6 +110,29 @@ public class DynamicChunkSplitter extends ChunkSplitter {
         }
 
         int chunkSize = config.getSplitSize();
+
+        switch (splitColumnType.getSqlType()) {
+            case TINYINT:
+            case SMALLINT:
+            case INT:
+            case BIGINT:
+            case DECIMAL:
+            case DOUBLE:
+            case FLOAT:
+            case STRING:
+                return evenlyColumnSplitChunks(table, splitColumnName, min, max, chunkSize);
+            case DATE:
+                return dateColumnSplitChunks(table, splitColumnName, min, max, chunkSize);
+            default:
+                throw CommonError.unsupportedDataType(
+                        "JDBC", splitColumnType.getSqlType().toString(), splitColumnName);
+        }
+    }
+
+    private List<ChunkRange> evenlyColumnSplitChunks(
+            JdbcSourceTable table, String splitColumnName, Object min, Object max, int chunkSize)
+            throws SQLException {
+        TablePath tablePath = table.getTablePath();
+        double distributionFactorUpper = config.getSplitEvenDistributionFactorUpperBound();
+        double distributionFactorLower = config.getSplitEvenDistributionFactorLowerBound();
         int sampleShardingThreshold = config.getSplitSampleShardingThreshold();
@@ -123,59 +149,54 @@ public class DynamicChunkSplitter extends ChunkSplitter {
                 distributionFactorLower,
                 sampleShardingThreshold);
 
-        if (isEvenlySplitColumn(splitColumnType)) {
-            long approximateRowCnt = queryApproximateRowCnt(table);
-            double distributionFactor =
-                    calculateDistributionFactor(tablePath, min, max, approximateRowCnt);
-
-            boolean dataIsEvenlyDistributed =
-                    ObjectUtils.doubleCompare(distributionFactor, distributionFactorLower) >= 0
-                            && ObjectUtils.doubleCompare(
-                                            distributionFactor, distributionFactorUpper)
-                                    <= 0;
-
-            if (dataIsEvenlyDistributed) {
-                // the minimum dynamic chunk size is at least 1
-                final int dynamicChunkSize = Math.max((int) (distributionFactor * chunkSize), 1);
-                return splitEvenlySizedChunks(
-                        tablePath, min, max, approximateRowCnt, chunkSize, dynamicChunkSize);
-            } else {
-                int shardCount = (int) (approximateRowCnt / chunkSize);
-                int inverseSamplingRate = config.getSplitInverseSamplingRate();
-                if (sampleShardingThreshold < shardCount) {
-                    // It is necessary to ensure that the number of data rows sampled by the
-                    // sampling rate is greater than the number of shards.
-                    // Otherwise, if the sampling rate is too low, it may result in an insufficient
-                    // number of data rows for the shards, leading to an inadequate number of
-                    // shards.
-                    // Therefore, inverseSamplingRate should be less than chunkSize
-                    if (inverseSamplingRate > chunkSize) {
-                        log.warn(
-                                "The inverseSamplingRate is {}, which is greater than chunkSize {}, so we set inverseSamplingRate to chunkSize",
-                                inverseSamplingRate,
-                                chunkSize);
-                        inverseSamplingRate = chunkSize;
-                    }
-                    log.info(
-                            "Use sampling sharding for table {}, the sampling rate is {}",
-                            tablePath,
-                            inverseSamplingRate);
-                    Object[] sample =
-                            jdbcDialect.sampleDataFromColumn(
-                                    getOrEstablishConnection(),
-                                    table,
-                                    splitColumnName,
-                                    inverseSamplingRate);
-                    log.info(
-                            "Sample data from table {} end, the sample size is {}",
-                            tablePath,
-                            sample.length);
-                    return efficientShardingThroughSampling(
-                            tablePath, sample, approximateRowCnt, shardCount);
+        long approximateRowCnt = queryApproximateRowCnt(table);
+        double distributionFactor =
+                calculateDistributionFactor(tablePath, min, max, approximateRowCnt);
+
+        boolean dataIsEvenlyDistributed =
+                ObjectUtils.doubleCompare(distributionFactor, distributionFactorLower) >= 0
+                        && ObjectUtils.doubleCompare(distributionFactor, distributionFactorUpper)
+                                <= 0;
+
+        if (dataIsEvenlyDistributed) {
+            // the minimum dynamic chunk size is at least 1
+            final int dynamicChunkSize = Math.max((int) (distributionFactor * chunkSize), 1);
+            return splitEvenlySizedChunks(
+                    tablePath, min, max, approximateRowCnt, chunkSize, dynamicChunkSize);
+        } else {
+            int shardCount = (int) (approximateRowCnt / chunkSize);
+            int inverseSamplingRate = config.getSplitInverseSamplingRate();
+            if (sampleShardingThreshold < shardCount) {
+                // It is necessary to ensure that the number of data rows sampled by the
+                // sampling rate is greater than the number of shards.
+                // Otherwise, if the sampling rate is too low, it may result in an insufficient
+                // number of data rows for the shards, leading to an inadequate number of
+                // shards.
+                // Therefore, inverseSamplingRate should be less than chunkSize
+                if (inverseSamplingRate > chunkSize) {
+                    log.warn(
+                            "The inverseSamplingRate is {}, which is greater than chunkSize {}, so we set inverseSamplingRate to chunkSize",
+                            inverseSamplingRate,
+                            chunkSize);
+                    inverseSamplingRate = chunkSize;
                }
-                return splitUnevenlySizedChunks(table, splitColumnName, min, max, chunkSize);
+                log.info(
+                        "Use sampling sharding for table {}, the sampling rate is {}",
+                        tablePath,
+                        inverseSamplingRate);
+                Object[] sample =
+                        jdbcDialect.sampleDataFromColumn(
+                                getOrEstablishConnection(),
+                                table,
+                                splitColumnName,
+                                inverseSamplingRate);
+                log.info(
+                        "Sample data from table {} end, the sample size is {}",
+                        tablePath,
+                        sample.length);
+                return efficientShardingThroughSampling(
+                        tablePath, sample, approximateRowCnt, shardCount);
             }
-        } else {
             return splitUnevenlySizedChunks(table, splitColumnName, min, max, chunkSize);
         }
     }
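As a side note on the arithmetic in the evenly-distributed path of this hunk: the distribution factor is assumed here to be roughly `(max - min + 1) / approximateRowCnt` (its producer, `calculateDistributionFactor`, is not shown in this diff), and the dynamic chunk size is clamped to at least 1. A small self-contained sketch under that assumption:

```java
public class DynamicChunkSizeDemo {
    // Assumption: distributionFactor ~= (max - min + 1) / approximateRowCnt.
    // calculateDistributionFactor in DynamicChunkSplitter is not shown in this hunk.
    static double distributionFactor(long min, long max, long approximateRowCnt) {
        return (double) (max - min + 1) / approximateRowCnt;
    }

    // Same guard as the patch: the minimum dynamic chunk size is at least 1.
    static int dynamicChunkSize(double distributionFactor, int chunkSize) {
        return Math.max((int) (distributionFactor * chunkSize), 1);
    }

    public static void main(String[] args) {
        // Dense keys (ids 0..9999 over ~10000 rows): factor 1.0, chunk size unchanged.
        System.out.println(dynamicChunkSize(distributionFactor(0, 9999, 10_000), 8096)); // 8096
        // A factor far below 1 (many duplicate keys) still yields at least 1.
        System.out.println(dynamicChunkSize(0.00001, 8096)); // 1
    }
}
```

When the factor falls outside the configured bounds, the code instead falls back to sampling-based or uneven chunking, as the hunk above shows.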
@@ -309,6 +330,71 @@ public class DynamicChunkSplitter extends ChunkSplitter {
         return splits;
     }
 
+    /**
+     * Split the table into chunks by a date type column.
+     *
+     * @param table the source table to split
+     * @param splitColumnName the name of the date split column
+     * @param min the minimum value of the split column
+     * @param max the maximum value of the split column
+     * @param chunkSize the expected number of rows per chunk
+     * @return the list of chunk ranges
+     * @throws SQLException if querying the approximate row count fails
+     */
+    private List<ChunkRange> dateColumnSplitChunks(
+            JdbcSourceTable table, String splitColumnName, Object min, Object max, int chunkSize)
+            throws SQLException {
+        log.info("Use date chunks for table {}", table.getTablePath());
+        final List<ChunkRange> splits = new ArrayList<>();
+        Date sqlDateMin = null;
+        Date sqlDateMax = null;
+        if (min instanceof Date) {
+            sqlDateMin = (Date) min;
+            sqlDateMax = (Date) max;
+        } else if (min instanceof Timestamp) {
+            sqlDateMin = new Date(((Timestamp) min).getTime());
+            sqlDateMax = new Date(((Timestamp) max).getTime());
+        }
+        List<LocalDate> dateRange =
+                getDateRange(sqlDateMin.toLocalDate(), sqlDateMax.toLocalDate());
+        if (dateRange.size() > 20 * 365) {
+            // TODO: If dateRange is greater than 20 years, we need to get the real dates in the table
+        }
+
+        Long rowCnt = queryApproximateRowCnt(table);
+        int step = 1;
+        if (rowCnt / dateRange.size() < chunkSize) {
+            int splitNum = (int) (rowCnt / chunkSize) + 1;
+            step = dateRange.size() / splitNum;
+        }
+
+        for (int i = 0; i < dateRange.size(); i = i + step) {
+            if (i == 0) {
+                splits.add(ChunkRange.of(null, dateRange.get(i)));
+            } else {
+                splits.add(ChunkRange.of(dateRange.get(i - step), dateRange.get(i)));
+            }
+
+            if ((i + step) >= dateRange.size()) {
+                splits.add(ChunkRange.of(dateRange.get(i), null));
+            }
+        }
+        return splits;
+    }
+
+    // build the list of days from startDate to endDate, inclusive
+    private static List<LocalDate> getDateRange(LocalDate startDate, LocalDate endDate) {
+        List<LocalDate> dateRange = new ArrayList<>();
+
+        LocalDate currentDate = startDate;
+        while (!currentDate.isAfter(endDate)) {
+            dateRange.add(currentDate);
+            currentDate = currentDate.plusDays(1);
+        }
+
+        return dateRange;
+    }
+
     private Object nextChunkEnd(
             Object previousChunkEnd,
             JdbcSourceTable table,
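To make the date-splitting behaviour in this hunk concrete, here is a self-contained sketch of the same enumerate-then-stride logic: expand the `[min, max]` date range day by day, then widen the stride when the per-day row count would produce chunks smaller than `chunkSize`. The `step` helper below just extracts the patch's inline calculation into a testable method:

```java
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;

public class DateChunkDemo {
    // Day-by-day expansion, mirroring DynamicChunkSplitter#getDateRange
    // (both endpoints are included).
    static List<LocalDate> getDateRange(LocalDate start, LocalDate end) {
        List<LocalDate> range = new ArrayList<>();
        for (LocalDate d = start; !d.isAfter(end); d = d.plusDays(1)) {
            range.add(d);
        }
        return range;
    }

    // Same stride calculation as dateColumnSplitChunks: if rows-per-day is
    // below chunkSize, group several days into one chunk.
    static int step(long rowCnt, int days, int chunkSize) {
        int step = 1;
        if (rowCnt / days < chunkSize) {
            int splitNum = (int) (rowCnt / chunkSize) + 1;
            step = days / splitNum;
        }
        return step;
    }

    public static void main(String[] args) {
        List<LocalDate> range = getDateRange(LocalDate.of(2024, 1, 1), LocalDate.of(2024, 1, 10));
        System.out.println(range.size()); // 10: both endpoints included
        // 100 rows over 10 days with chunkSize 30 -> splitNum 4, so stride is 2 days per chunk
        System.out.println(step(100, range.size(), 30)); // 2
    }
}
```

Note that the first chunk is open-ended below (`ChunkRange.of(null, firstDate)`) and the last is open-ended above, so rows outside the sampled min/max are still covered.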
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/ObjectUtils.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/ObjectUtils.java
index 555e79072e..d5f9260879 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/ObjectUtils.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/ObjectUtils.java
@@ -30,6 +30,10 @@ public class ObjectUtils {
             return Math.addExact((Integer) number, augend);
         } else if (number instanceof Long) {
             return Math.addExact((Long) number, augend);
+        } else if (number instanceof Float) {
+            return ((Float) number) + augend;
+        } else if (number instanceof Double) {
+            return ((Double) number) + augend;
         } else if (number instanceof BigInteger) {
             return ((BigInteger) number).add(BigInteger.valueOf(augend));
         } else if (number instanceof BigDecimal) {
@@ -56,6 +60,12 @@ public class ObjectUtils {
         } else if (minuend instanceof Long) {
             return BigDecimal.valueOf((long) minuend)
                     .subtract(BigDecimal.valueOf((long) subtrahend));
+        } else if (minuend instanceof Float) {
+            return BigDecimal.valueOf((float) minuend)
+                    .subtract(BigDecimal.valueOf((float) subtrahend));
+        } else if (minuend instanceof Double) {
+            return BigDecimal.valueOf((double) minuend)
+                    .subtract(BigDecimal.valueOf((double) subtrahend));
         } else if (minuend instanceof BigInteger) {
             return new BigDecimal(
                     ((BigInteger) minuend).subtract((BigInteger) subtrahend).toString());
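The reason the new Float/Double branches of `minus` route through `BigDecimal` can be shown in isolation; this sketch mirrors those two branches (the class and method names are local to the demo, not SeaTunnel API):

```java
import java.math.BigDecimal;

public class FloatMinusDemo {
    // Mirrors the Float/Double branches added to ObjectUtils#minus:
    // BigDecimal.valueOf(double) uses the shortest decimal representation,
    // so chunk-boundary arithmetic avoids binary floating-point residue.
    static BigDecimal minus(Object minuend, Object subtrahend) {
        if (minuend instanceof Float) {
            return BigDecimal.valueOf((float) minuend)
                    .subtract(BigDecimal.valueOf((float) subtrahend));
        } else if (minuend instanceof Double) {
            return BigDecimal.valueOf((double) minuend)
                    .subtract(BigDecimal.valueOf((double) subtrahend));
        }
        throw new IllegalArgumentException("unsupported type: " + minuend.getClass());
    }

    public static void main(String[] args) {
        System.out.println(0.3d - 0.1d);       // 0.19999999999999998 in plain double math
        System.out.println(minus(0.3d, 0.1d)); // 0.2
    }
}
```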
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlSplitIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlSplitIT.java
new file mode 100644
index 0000000000..eab16faa42
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlSplitIT.java
@@ -0,0 +1,548 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+import org.apache.seatunnel.common.utils.ExceptionUtils;
+import org.apache.seatunnel.common.utils.JdbcUrlUtil;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MySqlCatalog;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.source.DynamicChunkSplitter;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceSplit;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceTable;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import com.google.common.collect.Lists;
+
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.given;
+
+public class JdbcMysqlSplitIT extends TestSuiteBase implements TestResource {
+    private static final Logger LOG = LoggerFactory.getLogger(JdbcMysqlSplitIT.class);
+
+    private static final String MYSQL_IMAGE = "mysql:latest";
+    private static final String MYSQL_CONTAINER_HOST = "mysql-e2e";
+    private static final String MYSQL_DATABASE = "auto";
+    private static final String MYSQL_TABLE = "split_test";
+
+    private static final String MYSQL_USERNAME = "root";
+    private static final String MYSQL_PASSWORD = "Abc!@#135_seatunnel";
+    private static final int MYSQL_PORT = 3312;
+
+    private MySQLContainer<?> mysql_container;
+
+    LocalDate currentDateOld = LocalDate.of(2024, 1, 18);
+
+    private static final String CREATE_SQL =
+            "CREATE TABLE IF NOT EXISTS "
+                    + MYSQL_TABLE
+                    + "\n"
+                    + "(\n"
+                    + "    `id`                     int                   NOT NULL,\n"
+                    + "    `c_bit_1`                bit(1)                DEFAULT NULL,\n"
+                    + "    `c_bit_8`                bit(8)                DEFAULT NULL,\n"
+                    + "    `c_bit_16`               bit(16)               DEFAULT NULL,\n"
+                    + "    `c_bit_32`               bit(32)               DEFAULT NULL,\n"
+                    + "    `c_bit_64`               bit(64)               DEFAULT NULL,\n"
+                    + "    `c_boolean`              tinyint(1)            DEFAULT NULL,\n"
+                    + "    `c_tinyint`              tinyint(4)            DEFAULT NULL,\n"
+                    + "    `c_tinyint_unsigned`     tinyint(3) unsigned   DEFAULT NULL,\n"
+                    + "    `c_smallint`             smallint(6)           DEFAULT NULL,\n"
+                    + "    `c_smallint_unsigned`    smallint(5) unsigned  DEFAULT NULL,\n"
+                    + "    `c_mediumint`            mediumint(9)          DEFAULT NULL,\n"
+                    + "    `c_mediumint_unsigned`   mediumint(8) unsigned DEFAULT NULL,\n"
+                    + "    `c_int`                  int(11)               DEFAULT NULL,\n"
+                    + "    `c_integer`              int(11)               DEFAULT NULL,\n"
+                    + "    `c_bigint`               bigint(20)            DEFAULT NULL,\n"
+                    + "    `c_bigint_unsigned`      bigint(20) unsigned   DEFAULT NULL,\n"
+                    + "    `c_decimal`              decimal(20, 0)        DEFAULT NULL,\n"
+                    + "    `c_decimal_unsigned`     decimal(38, 10)       DEFAULT NULL,\n"
+                    + "    `c_float`                float                 DEFAULT NULL,\n"
+                    + "    `c_float_unsigned`       float unsigned        DEFAULT NULL,\n"
+                    + "    `c_double`               double                DEFAULT NULL,\n"
+                    + "    `c_double_unsigned`      double unsigned       DEFAULT NULL,\n"
+                    + "    `c_char`                 char(1)               DEFAULT NULL,\n"
+                    + "    `c_tinytext`             tinytext,\n"
+                    + "    `c_mediumtext`           mediumtext,\n"
+                    + "    `c_text`                 text,\n"
+                    + "    `c_varchar`              varchar(255)          DEFAULT NULL,\n"
+                    + "    `c_json`                 json                  DEFAULT NULL,\n"
+                    + "    `c_longtext`             longtext,\n"
+                    + "    `c_date`                 date                  DEFAULT NULL,\n"
+                    + "    `c_datetime`             datetime              DEFAULT NULL,\n"
+                    + "    `c_timestamp`            timestamp NULL        DEFAULT NULL,\n"
+                    + "    `c_tinyblob`             tinyblob,\n"
+                    + "    `c_mediumblob`           mediumblob,\n"
+                    + "    `c_blob`                 blob,\n"
+                    + "    `c_longblob`             longblob,\n"
+                    + "    `c_varbinary`            varbinary(255)        DEFAULT NULL,\n"
+                    + "    `c_binary`               binary(1)             DEFAULT NULL,\n"
+                    + "    `c_year`                 year(4)               DEFAULT NULL,\n"
+                    + "    `c_int_unsigned`         int(10) unsigned      DEFAULT NULL,\n"
+                    + "    `c_integer_unsigned`     int(10) unsigned      DEFAULT NULL,\n"
+                    + "    `c_bigint_30`            BIGINT(40)  unsigned  DEFAULT NULL,\n"
+                    + "    `c_decimal_unsigned_30`  DECIMAL(30) unsigned  DEFAULT NULL,\n"
+                    + "    `c_decimal_30`           DECIMAL(30)           DEFAULT NULL,\n"
+                    + "    PRIMARY KEY (`id`)\n"
+                    + ");";
+
+    void initContainer() throws ClassNotFoundException {
+        // ============= mysql
+        DockerImageName imageName = DockerImageName.parse(MYSQL_IMAGE);
+        mysql_container =
+                new MySQLContainer<>(imageName)
+                        .withUsername(MYSQL_USERNAME)
+                        .withPassword(MYSQL_PASSWORD)
+                        .withDatabaseName(MYSQL_DATABASE)
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(MYSQL_CONTAINER_HOST)
+                        .withExposedPorts(MYSQL_PORT)
+                        .waitingFor(Wait.forHealthcheck())
+                        .withLogConsumer(
+                                new Slf4jLogConsumer(DockerLoggerFactory.getLogger(MYSQL_IMAGE)));
+        mysql_container.setPortBindings(
+                Lists.newArrayList(String.format("%s:%s", MYSQL_PORT, 3306)));
+
+        Startables.deepStart(Stream.of(mysql_container)).join();
+    }
+
+    @BeforeAll
+    @Override
+    public void startUp() throws Exception {
+        initContainer();
+        given().await()
+                .atLeast(100, TimeUnit.MILLISECONDS)
+                .pollInterval(500, TimeUnit.MILLISECONDS)
+                .atMost(2, TimeUnit.MINUTES)
+                .untilAsserted(this::initializeJdbcTable);
+    }
+
+    private void initializeJdbcTable() {
+        Pair<String[], List<SeaTunnelRow>> testDataSet = initTestData();
+        String[] fieldNames = testDataSet.getKey();
+        String insertSql = insertTable(MYSQL_DATABASE, MYSQL_TABLE, fieldNames);
+        insertTestData(insertSql, testDataSet.getRight());
+    }
+
+    public String insertTable(String schema, String table, String... fields) {
+        String columns =
+                Arrays.stream(fields).map(this::quoteIdentifier).collect(Collectors.joining(", "));
+        String placeholders = Arrays.stream(fields).map(f -> "?").collect(Collectors.joining(", "));
+
+        return "INSERT INTO "
+                + schema
+                + "."
+                + table
+                + " ("
+                + columns
+                + " )"
+                + " VALUES ("
+                + placeholders
+                + ")";
+    }
+
+    protected void insertTestData(String insertSql, List<SeaTunnelRow> rows) {
+        try (Connection connection = getJdbcConnection();
+                PreparedStatement preparedStatement = connection.prepareStatement(insertSql)) {
+
+            preparedStatement.execute(CREATE_SQL);
+            for (SeaTunnelRow row : rows) {
+                for (int index = 0; index < row.getArity(); index++) {
+                    preparedStatement.setObject(index + 1, row.getField(index));
+                }
+                preparedStatement.addBatch();
+            }
+
+            preparedStatement.executeBatch();
+
+            // ANALYZE TABLE
+            preparedStatement.execute("ANALYZE TABLE " + MYSQL_DATABASE + "." + MYSQL_TABLE);
+
+        } catch (Exception exception) {
+            LOG.error(ExceptionUtils.getMessage(exception));
+            throw new SeaTunnelRuntimeException(JdbcITErrorCode.INSERT_DATA_FAILED, exception);
+        }
+    }
+
+    public String quoteIdentifier(String field) {
+        return "`" + field + "`";
+    }
+
+    private Connection getJdbcConnection() throws SQLException {
+        return DriverManager.getConnection(
+                mysql_container.getJdbcUrl(),
+                mysql_container.getUsername(),
+                mysql_container.getPassword());
+    }
+
+    Pair<String[], List<SeaTunnelRow>> initTestData() {
+        String[] fieldNames =
+                new String[] {
+                    "id",
+                    "c_bit_1",
+                    "c_bit_8",
+                    "c_bit_16",
+                    "c_bit_32",
+                    "c_bit_64",
+                    "c_boolean",
+                    "c_tinyint",
+                    "c_tinyint_unsigned",
+                    "c_smallint",
+                    "c_smallint_unsigned",
+                    "c_mediumint",
+                    "c_mediumint_unsigned",
+                    "c_int",
+                    "c_integer",
+                    "c_year",
+                    "c_int_unsigned",
+                    "c_integer_unsigned",
+                    "c_bigint",
+                    "c_bigint_unsigned",
+                    "c_decimal",
+                    "c_decimal_unsigned",
+                    "c_float",
+                    "c_float_unsigned",
+                    "c_double",
+                    "c_double_unsigned",
+                    "c_char",
+                    "c_tinytext",
+                    "c_mediumtext",
+                    "c_text",
+                    "c_varchar",
+                    "c_json",
+                    "c_longtext",
+                    "c_date",
+                    "c_datetime",
+                    "c_timestamp",
+                    "c_tinyblob",
+                    "c_mediumblob",
+                    "c_blob",
+                    "c_longblob",
+                    "c_varbinary",
+                    "c_binary",
+                    "c_bigint_30",
+                    "c_decimal_unsigned_30",
+                    "c_decimal_30",
+                };
+
+        List<SeaTunnelRow> rows = new ArrayList<>();
+        BigDecimal bigintValue = new BigDecimal("2844674407371055000");
+        BigDecimal decimalValue = new BigDecimal("999999999999999999999999999899");
+        LocalDate currentDate = LocalDate.of(2024, 1, 17);
+
+        for (int i = 0; i < 100; i++) {
+            currentDate = currentDate.plusDays(1);
+            byte byteArr = Integer.valueOf(i).byteValue();
+            SeaTunnelRow row =
+                    new SeaTunnelRow(
+                            new Object[] {
+                                i,
+                                i % 2 == 0 ? (byte) 1 : (byte) 0,
+                                new byte[] {byteArr},
+                                new byte[] {byteArr, byteArr},
+                                new byte[] {byteArr, byteArr, byteArr, byteArr},
+                                new byte[] {
+                                    byteArr, byteArr, byteArr, byteArr, byteArr, byteArr, byteArr,
+                                    byteArr
+                                },
+                                i % 2 == 0 ? Boolean.TRUE : Boolean.FALSE,
+                                i,
+                                i,
+                                i,
+                                i,
+                                i,
+                                i,
+                                i,
+                                i,
+                                i,
+                                Long.parseLong(i + ""),
+                                Long.parseLong(i + ""),
+                                Long.parseLong(i + ""),
+                                BigDecimal.valueOf(i, 0),
+                                BigDecimal.valueOf(i, 0),
+                                BigDecimal.valueOf(i * 10000000000L, 10),
+                                Float.parseFloat(i + ".1"),
+                                Float.parseFloat(i + ".1"),
+                                Double.parseDouble(i + ".1"),
+                                Double.parseDouble(i + ".1"),
+                                "f",
+                                String.format("f1_%s", i),
+                                String.format("f1_%s", i),
+                                String.format("f1_%s", i),
+                                String.format("f1_%s", i),
+                                String.format("{\"aa\":\"bb_%s\"}", i),
+                                String.format("f1_%s", i),
+                                Date.valueOf(currentDate),
+                                Timestamp.valueOf(LocalDateTime.now()),
+                                new Timestamp(System.currentTimeMillis()),
+                                "test".getBytes(),
+                                "test".getBytes(),
+                                "test".getBytes(),
+                                "test".getBytes(),
+                                "test".getBytes(),
+                                "f".getBytes(),
+                                bigintValue.add(BigDecimal.valueOf(i)),
+                                decimalValue.add(BigDecimal.valueOf(i)),
+                                decimalValue.add(BigDecimal.valueOf(i)),
+                            });
+            rows.add(row);
+        }
+
+        return Pair.of(fieldNames, rows);
+    }
+
+    static JdbcUrlUtil.UrlInfo mysqlUrlInfo =
+            JdbcUrlUtil.getUrlInfo(
+                    String.format("jdbc:mysql://localhost:%s/auto?useSSL=false", MYSQL_PORT));
+
+    @Test
+    public void testSplit() throws Exception {
+        Map<String, Object> configMap = new HashMap<>();
+        configMap.put("url", mysqlUrlInfo.getUrlWithDatabase().get());
+        configMap.put("driver", "com.mysql.cj.jdbc.Driver");
+        configMap.put("user", MYSQL_USERNAME);
+        configMap.put("password", MYSQL_PASSWORD);
+        configMap.put("table_path", MYSQL_DATABASE + "." + MYSQL_TABLE);
+        configMap.put("split.size", "10");
+        DynamicChunkSplitter splitter = getDynamicChunkSplitter(configMap);
+
+        TablePath tablePathMySql = TablePath.of(MYSQL_DATABASE, MYSQL_TABLE);
+        MySqlCatalog mySqlCatalog =
+                new MySqlCatalog("mysql", MYSQL_USERNAME, MYSQL_PASSWORD, mysqlUrlInfo);
+        mySqlCatalog.open();
+        Assertions.assertTrue(mySqlCatalog.tableExists(tablePathMySql));
+        CatalogTable table = mySqlCatalog.getTable(tablePathMySql);
+
+        JdbcSourceTable jdbcSourceTable =
+                JdbcSourceTable.builder()
+                        .tablePath(TablePath.of(MYSQL_DATABASE, MYSQL_TABLE))
+                        .catalogTable(table)
+                        .build();
+        Collection<JdbcSourceSplit> jdbcSourceSplits = splitter.generateSplits(jdbcSourceTable);
+        Assertions.assertEquals(10, jdbcSourceSplits.size());
+        JdbcSourceSplit[] splitArray = jdbcSourceSplits.toArray(new JdbcSourceSplit[0]);
+        Assertions.assertEquals("id", splitArray[0].getSplitKeyName());
+        assertNumSplit(splitArray, "");
+
+        // use tinyint column to split
+        splitArray = getCheckedSplitArray(configMap, table, "c_tinyint", 10);
+        assertNumSplit(splitArray, "");
+
+        // use tinyint_unsigned column to split
+        splitArray = getCheckedSplitArray(configMap, table, "c_tinyint_unsigned", 10);
+        configMap.put("partition_column", "c_tinyint_unsigned");
+        assertNumSplit(splitArray, "");
+
+        // use smallint column to split
+        splitArray = getCheckedSplitArray(configMap, table, "c_smallint", 10);
+        configMap.put("partition_column", "c_smallint");
+        assertNumSplit(splitArray, "");
+
+        // use smallint_unsigned column to split
+        splitArray = getCheckedSplitArray(configMap, table, "c_smallint_unsigned", 10);
+        configMap.put("partition_column", "c_smallint_unsigned");
+        assertNumSplit(splitArray, "");
+
+        // use int column to split
+        splitArray = getCheckedSplitArray(configMap, table, "c_int", 10);
+        configMap.put("partition_column", "c_int");
+        assertNumSplit(splitArray, "");
+
+        // use integer column to split
+        splitArray = getCheckedSplitArray(configMap, table, "c_integer", 10);
+        configMap.put("partition_column", "c_integer");
+        assertNumSplit(splitArray, "");
+
+        // use int_unsigned column to split
+        splitArray = getCheckedSplitArray(configMap, table, "c_int_unsigned", 10);
+        configMap.put("partition_column", "c_int_unsigned");
+        assertNumSplit(splitArray, "");
+
+        // use integer_unsigned column to split
+        splitArray = getCheckedSplitArray(configMap, table, "c_integer_unsigned", 10);
+        configMap.put("partition_column", "c_integer_unsigned");
+        assertNumSplit(splitArray, "");
+
+        // use mediumint column to split
+        splitArray = getCheckedSplitArray(configMap, table, "c_mediumint", 10);
+        configMap.put("partition_column", "c_mediumint");
+        assertNumSplit(splitArray, "");
+
+        // use mediumint_unsigned column to split
+        splitArray = getCheckedSplitArray(configMap, table, "c_mediumint_unsigned", 10);
+        configMap.put("partition_column", "c_mediumint_unsigned");
+        assertNumSplit(splitArray, "");
+
+        // use bigint column to split
+        splitArray = getCheckedSplitArray(configMap, table, "c_bigint", 10);
+        configMap.put("partition_column", "c_bigint");
+        assertNumSplit(splitArray, "");
+
+        // use bigint_unsigned column to split
+        splitArray = getCheckedSplitArray(configMap, table, "c_bigint_unsigned", 10);
+        configMap.put("partition_column", "c_bigint_unsigned");
+        assertNumSplit(splitArray, "");
+
+        // use decimal column to split
+        splitArray = getCheckedSplitArray(configMap, table, "c_decimal", 10);
+        configMap.put("partition_column", "c_decimal");
+        assertNumSplit(splitArray, "");
+
+        // use decimal_unsigned column to split
+        splitArray = getCheckedSplitArray(configMap, table, "c_decimal_unsigned", 10);
+        configMap.put("partition_column", "c_decimal_unsigned");
+        assertNumSplit(splitArray, ".0000000000");
+
+        // use double column to split
+        splitArray = getCheckedSplitArray(configMap, table, "c_double", 10);
+        configMap.put("partition_column", "c_double");
+        assertNumSplit(splitArray, ".1");
+
+        // use unsigned double column to split
+        splitArray = getCheckedSplitArray(configMap, table, "c_double_unsigned", 10);
+        configMap.put("partition_column", "c_double_unsigned");
+        assertNumSplit(splitArray, ".1");
+
+        // use date column to split
+        splitArray = getCheckedSplitArray(configMap, table, "c_date", 13);
+        configMap.put("partition_column", "c_date");
+        assertDateSplit(splitArray);
+
+        mySqlCatalog.close();
+    }
+
+    private JdbcSourceSplit[] getCheckedSplitArray(
+            Map<String, Object> configMap, CatalogTable table, String splitKey, int splitNum)
+            throws SQLException {
+        configMap.put("partition_column", splitKey);
+        DynamicChunkSplitter splitter = getDynamicChunkSplitter(configMap);
+
+        JdbcSourceTable jdbcSourceTable =
+                JdbcSourceTable.builder()
+                        .tablePath(TablePath.of(MYSQL_DATABASE, MYSQL_TABLE))
+                        .catalogTable(table)
+                        .partitionColumn(splitKey)
+                        .build();
+        Collection<JdbcSourceSplit> jdbcSourceSplits = splitter.generateSplits(jdbcSourceTable);
+        Assertions.assertEquals(splitNum, jdbcSourceSplits.size());
+        JdbcSourceSplit[] splitArray = jdbcSourceSplits.toArray(new JdbcSourceSplit[0]);
+        Assertions.assertEquals(splitKey, splitArray[0].getSplitKeyName());
+        return splitArray;
+    }
+
+    private void assertNumSplit(JdbcSourceSplit[] splitArray, String info) {
+        for (int i = 0; i < splitArray.length; i++) {
+            if (i == 0) {
+                Assertions.assertNull(splitArray[i].getSplitStart());
+                Assertions.assertEquals("10" + info, splitArray[i].getSplitEnd().toString());
+                continue;
+            }
+
+            if (i == splitArray.length - 1) {
+                Assertions.assertEquals(10 * i + info, splitArray[i].getSplitStart().toString());
+                Assertions.assertNull(splitArray[i].getSplitEnd());
+                continue;
+            }
+
+            Assertions.assertEquals(10 * i + info, splitArray[i].getSplitStart().toString());
+            Assertions.assertEquals(10 * (i + 1) + info, splitArray[i].getSplitEnd().toString());
+        }
+    }
+
+    private void assertDateSplit(JdbcSourceSplit[] splitArray) {
+        for (int i = 0; i < splitArray.length; i++) {
+            if (i == 0) {
+                Assertions.assertNull(splitArray[i].getSplitStart());
+                Assertions.assertEquals(
+                        currentDateOld.plusDays(i * 9).toString(),
+                        splitArray[i].getSplitEnd().toString());
+                continue;
+            }
+
+            if (i == splitArray.length - 1) {
+                Assertions.assertEquals(
+                        currentDateOld.plusDays((i - 1) * 9).toString(),
+                        splitArray[i].getSplitStart().toString());
+                Assertions.assertNull(splitArray[i].getSplitEnd());
+                continue;
+            }
+
+            Assertions.assertEquals(
+                    currentDateOld.plusDays((i - 1) * 9).toString(),
+                    splitArray[i].getSplitStart().toString());
+            Assertions.assertEquals(
+                    currentDateOld.plusDays(i * 9).toString(),
+                    splitArray[i].getSplitEnd().toString());
+        }
+    }
+
+    @NotNull private DynamicChunkSplitter getDynamicChunkSplitter(Map<String, Object> configMap) {
+        ReadonlyConfig readonlyConfig = ReadonlyConfig.fromMap(configMap);
+        JdbcSourceConfig sourceConfig = JdbcSourceConfig.of(readonlyConfig);
+        return new DynamicChunkSplitter(sourceConfig);
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        if (mysql_container != null) {
+            mysql_container.close();
+            dockerClient.removeContainerCmd(mysql_container.getContainerId()).exec();
+        }
+    }
+}
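
For context on what the assertions in this test verify: the splitter emits half-open ranges over the partition column, where the first split has a null lower bound and the last split has a null upper bound. The following is a minimal, hypothetical sketch of that boundary layout (`RangeSplitSketch` is an illustrative class, not SeaTunnel's actual `DynamicChunkSplitter`):

```java
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;

public class RangeSplitSketch {

    // Returns [start, end) boundary pairs; a null bound means the split is
    // unbounded on that side, matching the first/last-split assertions above.
    public static List<BigDecimal[]> split(BigDecimal min, BigDecimal max, BigDecimal chunkSize) {
        List<BigDecimal[]> splits = new ArrayList<>();
        BigDecimal start = null;               // first split has no lower bound
        BigDecimal end = min.add(chunkSize);
        while (end.compareTo(max) < 0) {
            splits.add(new BigDecimal[] {start, end});
            start = end;
            end = end.add(chunkSize);
        }
        splits.add(new BigDecimal[] {start, null}); // last split has no upper bound
        return splits;
    }

    public static void main(String[] args) {
        // Range [0, 100) with chunk size 10 yields 10 splits:
        // (null, 10), (10, 20), ..., (80, 90), (90, null)
        List<BigDecimal[]> splits =
                split(BigDecimal.ZERO, BigDecimal.valueOf(100), BigDecimal.TEN);
        System.out.println(splits.size()); // 10
    }
}
```

This mirrors why `testSplit` expects 10 splits for 100 rows with `split.size = 10`, and why the date case steps through day-sized chunks instead of numeric ones.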
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
index 20de2932a1..3b81f2a655 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
@@ -141,7 +141,6 @@ class JobMetricsTest extends AbstractSeaTunnelServerTest {
                 .untilAsserted(
                         () -> {
                             JobMetrics jobMetrics = coordinatorService.getJobMetrics(jobId3);
-                            log.info(jobMetrics.toJsonString());
                             assertTrue(40 < (Long) jobMetrics.get(SINK_WRITE_COUNT).get(0).value());
                             assertTrue(40 < (Long) jobMetrics.get(SINK_WRITE_COUNT).get(1).value());
                             assertTrue(
