This is an automated email from the ASF dual-hosted git repository.
ic4y pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new b9a62e5c3f Add date type and float type column split support (#6160)
b9a62e5c3f is described below
commit b9a62e5c3f9e47f2718dcd64a459cc7cfe701eb5
Author: Eric <[email protected]>
AuthorDate: Thu Jan 18 11:04:17 2024 +0800
Add date type and float type column split support (#6160)
---
docs/en/connector-v2/source/Jdbc.md | 151 ++++--
docs/en/connector-v2/source/Mysql.md | 143 ++++--
docs/en/connector-v2/source/Oracle.md | 160 +++++-
docs/en/connector-v2/source/PostgreSQL.md | 159 +++++-
docs/en/connector-v2/source/SqlServer.md | 99 +++-
.../jdbc/internal/dialect/JdbcDialect.java | 6 +
.../seatunnel/jdbc/internal/dialect/SQLUtils.java | 5 +
.../jdbc/internal/dialect/mysql/MysqlDialect.java | 6 +
.../internal/dialect/oracle/OracleDialect.java | 5 +
.../internal/dialect/psql/PostgresDialect.java | 4 +
.../dialect/sqlserver/SqlServerDialect.java | 5 +
.../seatunnel/jdbc/source/ChunkSplitter.java | 19 +-
.../jdbc/source/DynamicChunkSplitter.java | 190 +++++--
.../seatunnel/jdbc/utils/ObjectUtils.java | 10 +
.../seatunnel/jdbc/JdbcMysqlSplitIT.java | 548 +++++++++++++++++++++
.../engine/server/master/JobMetricsTest.java | 1 -
16 files changed, 1329 insertions(+), 182 deletions(-)
diff --git a/docs/en/connector-v2/source/Jdbc.md
b/docs/en/connector-v2/source/Jdbc.md
index e4ae04d043..38ed1f6512 100644
--- a/docs/en/connector-v2/source/Jdbc.md
+++ b/docs/en/connector-v2/source/Jdbc.md
@@ -82,22 +82,6 @@ The compatible mode of database, required when the database
supports multiple co
The time in seconds to wait for the database operation used to validate the
connection to complete.
-### partition_column [string]
-
-The column name for parallelism's partition, only support numeric type.
-
-### partition_upper_bound [BigDecimal]
-
-The partition_column max value for scan, if not set SeaTunnel will query
database get max value.
-
-### partition_lower_bound [BigDecimal]
-
-The partition_column min value for scan, if not set SeaTunnel will query
database get min value.
-
-### partition_num [int]
-
-The number of partition count, only support positive integer. default value is
job parallelism
-
### fetch_size [int]
For queries that return a large number of objects, you can configure the row
fetch size used in the query to
@@ -139,34 +123,73 @@ table_list = [
Common row filter conditions for all tables/queries, must start with `where`.
for example `where id > 100`
-### split.size
+### common options
+
+Source plugin common parameters, please refer to [Source Common
Options](common-options.md) for details.
+
+## Parallel Reader
+
+The JDBC Source connector supports parallel reading of data from tables.
SeaTunnel will use certain rules to split the data in the table, which will be
handed over to readers for reading. The number of readers is determined by the
`parallelism` option.
+
+**Split Key Rules:**
+
+1. If `partition_column` is not null, it will be used to calculate the splits. The column must be one of the **supported split data types**.
+2. If `partition_column` is null, SeaTunnel will read the schema from the table and get the Primary Key and Unique Index. If there is more than one column in the Primary Key and Unique Index, the first column whose type is in the **supported split data types** will be used to split the data. For example, for a table with Primary Key `(nn guid, name varchar)`, because `guid` is not a supported split data type, the column `name` will be used to split the data.
+
+**Supported split data types:**
+* String
+* Number(int, bigint, decimal, ...)
+* Date
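The split-key selection in rule 2 above can be sketched as follows. This is a hedged illustration only: `chooseSplitKey`, the `(name, type)` pairs, and the type-name strings are hypothetical, not the connector's actual API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hedged sketch of the documented split-key rule: pick the first
// Primary Key / Unique Index column whose type is a supported split type.
public class SplitKeySketch {
    // Illustrative stand-ins for the "supported split data types" list
    static final List<String> SUPPORTED =
            Arrays.asList("string", "int", "bigint", "decimal", "date");

    // columns: (name, type) pairs from the Primary Key / Unique Index, in order
    static Optional<String> chooseSplitKey(List<String[]> columns) {
        return columns.stream()
                .filter(c -> SUPPORTED.contains(c[1])) // keep supported types only
                .map(c -> c[0])
                .findFirst();                          // first supported column wins
    }

    public static void main(String[] args) {
        // Primary Key (nn guid, name varchar): guid is unsupported, so "name" is chosen
        List<String[]> pk = Arrays.asList(
                new String[]{"nn", "guid"},
                new String[]{"name", "string"});
        System.out.println(chooseSplitKey(pk).orElse("<table not splittable>"));
    }
}
```

If no column qualifies, the table cannot be auto-split and reading falls back to a single concurrency, as described in the tips below.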
+
+### Options Related To Split
+
+#### split.size
+
+The number of rows in one split; captured tables are split into multiple splits when the table is read.
-The split size (number of rows) of table, captured tables are split into
multiple splits when read of table.
+#### split.even-distribution.factor.lower-bound
-### split.even-distribution.factor.lower-bound
+> Not recommended for use
The lower bound of the chunk key distribution factor. This factor is used to
determine whether the table data is evenly distributed. If the distribution
factor is calculated to be greater than or equal to this lower bound (i.e.,
(MAX(id) - MIN(id) + 1) / row count), the table chunks would be optimized for
even distribution. Otherwise, if the distribution factor is less, the table
will be considered as unevenly distributed and the sampling-based sharding
strategy will be used if the estim [...]
-### split.even-distribution.factor.upper-bound
+#### split.even-distribution.factor.upper-bound
+
+> Not recommended for use
The upper bound of the chunk key distribution factor. This factor is used to
determine whether the table data is evenly distributed. If the distribution
factor is calculated to be less than or equal to this upper bound (i.e.,
(MAX(id) - MIN(id) + 1) / row count), the table chunks would be optimized for
even distribution. Otherwise, if the distribution factor is greater, the table
will be considered as unevenly distributed and the sampling-based sharding
strategy will be used if the estim [...]
-### split.sample-sharding.threshold
+#### split.sample-sharding.threshold
This configuration specifies the threshold of estimated shard count to trigger
the sample sharding strategy. When the distribution factor is outside the
bounds specified by `chunk-key.even-distribution.factor.upper-bound` and
`chunk-key.even-distribution.factor.lower-bound`, and the estimated shard count
(calculated as approximate row count / chunk size) exceeds this threshold, the
sample sharding strategy will be used. This can help to handle large datasets
more efficiently. The default [...]
-### split.inverse-sampling.rate
+#### split.inverse-sampling.rate
The inverse of the sampling rate used in the sample sharding strategy. For
example, if this value is set to 1000, it means a 1/1000 sampling rate is
applied during the sampling process. This option provides flexibility in
controlling the granularity of the sampling, thus affecting the final number of
shards. It's especially useful when dealing with very large datasets where a
lower sampling rate is preferred. The default value is 1000.
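How the factor bounds and the sample-sharding threshold interact can be sketched as follows. This is a hedged illustration: `chooseStrategy` and its parameter names are hypothetical, the defaults mirror the documented option values, and it is not the connector's actual code.

```java
// Hedged sketch of the split-strategy decision described above,
// NOT the connector's implementation.
public class SplitStrategySketch {
    static String chooseStrategy(double min, double max, long rowCount,
                                 long splitSize,
                                 double factorLower,     // split.even-distribution.factor.lower-bound (default 0.05)
                                 double factorUpper,     // split.even-distribution.factor.upper-bound (default 100)
                                 long sampleThreshold) { // split.sample-sharding.threshold (default 10000)
        // Distribution factor: (MAX(id) - MIN(id) + 1) / row count
        double distributionFactor = (max - min + 1) / (double) rowCount;
        if (distributionFactor >= factorLower && distributionFactor <= factorUpper) {
            return "even-distribution";
        }
        // Outside the bounds: compare the estimated shard count to the threshold
        long estimatedShards = rowCount / splitSize;
        return estimatedShards > sampleThreshold ? "sample-sharding" : "range-splitting";
    }

    public static void main(String[] args) {
        // Dense ids 1..1_000_000 -> factor 1.0, within [0.05, 100]
        System.out.println(chooseStrategy(1, 1_000_000, 1_000_000, 8096, 0.05, 100, 10000));
    }
}
```

With a skewed key (factor outside the bounds) and a large estimated shard count, the sketch falls through to the sample-sharding branch, whose granularity is then controlled by `split.inverse-sampling.rate`.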
-### common options
+#### partition_column [string]
-Source plugin common parameters, please refer to [Source Common
Options](common-options.md) for details.
+The column name used to split the data.
+
+#### partition_upper_bound [BigDecimal]
+
+The maximum value of `partition_column` for the scan. If not set, SeaTunnel will query the database to get the max value.
+
+#### partition_lower_bound [BigDecimal]
+
+The minimum value of `partition_column` for the scan. If not set, SeaTunnel will query the database to get the min value.
+
+#### partition_num [int]
+
+> Not recommended for use. The correct approach is to control the number of splits through `split.size`.
+
+How many splits to create; only positive integers are supported. The default value is the job parallelism.
## tips
-If partition_column is not set, it will run in single concurrency, and if
partition_column is set, it will be executed
-in parallel according to the concurrency of tasks.
+> If the table cannot be split (for example, the table has no Primary Key or Unique Index, and `partition_column` is not set), it will run in a single concurrency.
+>
+> Use `table_path` to replace `query` for single table reading. If you need to
read multiple tables, use `table_list`.
## appendix
@@ -197,7 +220,7 @@ there are some reference value for params above.
## Example
-simple:
+### simple
```
Jdbc {
@@ -210,7 +233,7 @@ Jdbc {
}
```
-parallel:
+### parallel by partition_column
```
env {
@@ -226,7 +249,7 @@ source {
password = "123456"
query = "select * from type_bin"
partition_column = "id"
- partition_num = 10
+ split.size = 10000
# Read start boundary
#partition_lower_bound = ...
# Read end boundary
@@ -239,29 +262,61 @@ sink {
}
```
-Using `table_path` read:
+### Parallel Boundary:
-***Configuring `table_path` will turn on auto split, you can configure
`split.*` to adjust the split strategy***
+> It is more efficient to specify the data within the upper and lower bounds of the query; SeaTunnel will read your data source according to the boundaries you configure.
-```hocon
-Jdbc {
- url = "jdbc:mysql://localhost/test?serverTimezone=GMT%2b8"
- driver = "com.mysql.cj.jdbc.Driver"
- connection_check_timeout_sec = 100
- user = "root"
- password = "123456"
-
- # e.g. table_path = "testdb.table1"、table_path =
"test_schema.table1"、table_path = "testdb.test_schema.table1"
- table_path = "testdb.table1"
- #split.size = 8096
- #split.even-distribution.factor.upper-bound = 100
- #split.even-distribution.factor.lower-bound = 0.05
- #split.sample-sharding.threshold = 1000
- #split.inverse-sampling.rate = 1000
+```
+source {
+ Jdbc {
+ url =
"jdbc:mysql://localhost:3306/test?serverTimezone=GMT%2b8&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
+ driver = "com.mysql.cj.jdbc.Driver"
+ connection_check_timeout_sec = 100
+ user = "root"
+ password = "123456"
+ # Define query logic as required
+ query = "select * from type_bin"
+ partition_column = "id"
+ # Read start boundary
+ partition_lower_bound = 1
+ # Read end boundary
+ partition_upper_bound = 500
+ partition_num = 10
+ properties {
+ useSSL=false
+ }
+ }
+}
+```
+
+### parallel by Primary Key or Unique Index
+
+> Configuring `table_path` will turn on auto split; you can configure `split.*` to adjust the split strategy.
+
+```
+env {
+ parallelism = 10
+ job.mode = "BATCH"
+}
+source {
+ Jdbc {
+ url = "jdbc:mysql://localhost/test?serverTimezone=GMT%2b8"
+ driver = "com.mysql.cj.jdbc.Driver"
+ connection_check_timeout_sec = 100
+ user = "root"
+ password = "123456"
+ table_path = "testdb.table1"
+ query = "select * from testdb.table1"
+ split.size = 10000
+ }
+}
+
+sink {
+ Console {}
}
```
-multiple table read:
+### Multiple table read:
***Configuring `table_list` will turn on auto split, you can configure
`split.*` to adjust the split strategy***
@@ -285,7 +340,7 @@ Jdbc {
}
]
#where_condition= "where id > 100"
- #split.size = 8096
+ #split.size = 10000
#split.even-distribution.factor.upper-bound = 100
#split.even-distribution.factor.lower-bound = 0.05
#split.sample-sharding.threshold = 1000
diff --git a/docs/en/connector-v2/source/Mysql.md
b/docs/en/connector-v2/source/Mysql.md
index b1d7938725..e1fe6a3e8e 100644
--- a/docs/en/connector-v2/source/Mysql.md
+++ b/docs/en/connector-v2/source/Mysql.md
@@ -85,9 +85,67 @@ Read external data source data through JDBC.
| split.inverse-sampling.rate | Int | No | 1000
| The inverse of the sampling rate used in the sample sharding
strategy. For example, if this value is set to 1000, it means a 1/1000 sampling
rate is applied during the sampling process. This option provides flexibility
in controlling the granularity of the sampling, thus affecting the final number
of shards. It's especially useful when dealing with very large datasets where a
lower sampling rate is p [...]
| common-options | | No | -
| Source plugin common parameters, please refer to [Source Common
Options](common-options.md) for details
[...]
-### Tips
+## Parallel Reader
-> If partition_column is not set, it will run in single concurrency, and if
partition_column is set, it will be executed in parallel according to the
concurrency of tasks , When your shard read field is a large number type such
as bigint(30) and above and the data is not evenly distributed, it is
recommended to set the parallelism level to 1 to ensure that the data skew
problem is resolved
+The JDBC Source connector supports parallel reading of data from tables.
SeaTunnel will use certain rules to split the data in the table, which will be
handed over to readers for reading. The number of readers is determined by the
`parallelism` option.
+
+**Split Key Rules:**
+
+1. If `partition_column` is not null, it will be used to calculate the splits. The column must be one of the **supported split data types**.
+2. If `partition_column` is null, SeaTunnel will read the schema from the table and get the Primary Key and Unique Index. If there is more than one column in the Primary Key and Unique Index, the first column whose type is in the **supported split data types** will be used to split the data. For example, for a table with Primary Key `(nn guid, name varchar)`, because `guid` is not a supported split data type, the column `name` will be used to split the data.
+
+**Supported split data types:**
+* String
+* Number(int, bigint, decimal, ...)
+* Date
+
+### Options Related To Split
+
+#### split.size
+
+The number of rows in one split; captured tables are split into multiple splits when the table is read.
+
+#### split.even-distribution.factor.lower-bound
+
+> Not recommended for use
+
+The lower bound of the chunk key distribution factor. This factor is used to
determine whether the table data is evenly distributed. If the distribution
factor is calculated to be greater than or equal to this lower bound (i.e.,
(MAX(id) - MIN(id) + 1) / row count), the table chunks would be optimized for
even distribution. Otherwise, if the distribution factor is less, the table
will be considered as unevenly distributed and the sampling-based sharding
strategy will be used if the estim [...]
+
+#### split.even-distribution.factor.upper-bound
+
+> Not recommended for use
+
+The upper bound of the chunk key distribution factor. This factor is used to
determine whether the table data is evenly distributed. If the distribution
factor is calculated to be less than or equal to this upper bound (i.e.,
(MAX(id) - MIN(id) + 1) / row count), the table chunks would be optimized for
even distribution. Otherwise, if the distribution factor is greater, the table
will be considered as unevenly distributed and the sampling-based sharding
strategy will be used if the estim [...]
+
+#### split.sample-sharding.threshold
+
+This configuration specifies the threshold of estimated shard count to trigger
the sample sharding strategy. When the distribution factor is outside the
bounds specified by `chunk-key.even-distribution.factor.upper-bound` and
`chunk-key.even-distribution.factor.lower-bound`, and the estimated shard count
(calculated as approximate row count / chunk size) exceeds this threshold, the
sample sharding strategy will be used. This can help to handle large datasets
more efficiently. The default [...]
+
+#### split.inverse-sampling.rate
+
+The inverse of the sampling rate used in the sample sharding strategy. For
example, if this value is set to 1000, it means a 1/1000 sampling rate is
applied during the sampling process. This option provides flexibility in
controlling the granularity of the sampling, thus affecting the final number of
shards. It's especially useful when dealing with very large datasets where a
lower sampling rate is preferred. The default value is 1000.
+
+#### partition_column [string]
+
+The column name used to split the data.
+
+#### partition_upper_bound [BigDecimal]
+
+The maximum value of `partition_column` for the scan. If not set, SeaTunnel will query the database to get the max value.
+
+#### partition_lower_bound [BigDecimal]
+
+The minimum value of `partition_column` for the scan. If not set, SeaTunnel will query the database to get the min value.
+
+#### partition_num [int]
+
+> Not recommended for use. The correct approach is to control the number of splits through `split.size`.
+
+How many splits to create; only positive integers are supported. The default value is the job parallelism.
+
+## tips
+
+> If the table cannot be split (for example, the table has no Primary Key or Unique Index, and `partition_column` is not set), it will run in a single concurrency.
>
> Use `table_path` to replace `query` for single table reading. If you need to
> read multiple tables, use `table_list`.
@@ -100,7 +158,7 @@ Read external data source data through JDBC.
```
# Defining the runtime environment
env {
- parallelism = 2
+ parallelism = 4
job.mode = "BATCH"
}
source{
@@ -124,33 +182,57 @@ sink {
}
```
-### Parallel:
-
-> Read your query table in parallel with the shard field you configured and
the shard data You can do this if you want to read the whole table
+### parallel by partition_column
```
env {
- parallelism = 10
+ parallelism = 4
job.mode = "BATCH"
}
source {
Jdbc {
- url =
"jdbc:mysql://localhost:3306/test?serverTimezone=GMT%2b8&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
+ url = "jdbc:mysql://localhost/test?serverTimezone=GMT%2b8"
driver = "com.mysql.cj.jdbc.Driver"
connection_check_timeout_sec = 100
user = "root"
password = "123456"
- # Define query logic as required
query = "select * from type_bin"
- # Parallel sharding reads fields
partition_column = "id"
- # Number of fragments
- partition_num = 10
- properties {
- useSSL=false
- }
+ split.size = 10000
+ # Read start boundary
+ #partition_lower_bound = ...
+ # Read end boundary
+ #partition_upper_bound = ...
}
}
+
+sink {
+ Console {}
+}
+```
+
+### parallel by Primary Key or Unique Index
+
+> Configuring `table_path` will turn on auto split; you can configure `split.*` to adjust the split strategy.
+
+```
+env {
+ parallelism = 4
+ job.mode = "BATCH"
+}
+source {
+ Jdbc {
+ url = "jdbc:mysql://localhost/test?serverTimezone=GMT%2b8"
+ driver = "com.mysql.cj.jdbc.Driver"
+ connection_check_timeout_sec = 100
+ user = "root"
+ password = "123456"
+ table_path = "testdb.table1"
+ query = "select * from testdb.table1"
+ split.size = 10000
+ }
+}
+
sink {
Console {}
}
@@ -183,36 +265,6 @@ source {
}
```
-### Using `table_path` read:
-
-***Configuring `table_path` will turn on auto split, you can configure
`split.*` to adjust the split strategy***
-
-```hocon
-env {
- job.mode = "BATCH"
-}
-source {
- Jdbc {
- url = "jdbc:mysql://localhost/test?serverTimezone=GMT%2b8"
- driver = "com.mysql.cj.jdbc.Driver"
- connection_check_timeout_sec = 100
- user = "root"
- password = "123456"
-
- table_path = "testdb.table1"
- #split.size = 8096
- #split.even-distribution.factor.upper-bound = 100
- #split.even-distribution.factor.lower-bound = 0.05
- #split.sample-sharding.threshold = 1000
- #split.inverse-sampling.rate = 1000
- }
-}
-
-sink {
- Console {}
-}
-```
-
### Multiple table read:
***Configuring `table_list` will turn on auto split, you can configure
`split.*` to adjust the split strategy***
@@ -220,6 +272,7 @@ sink {
```hocon
env {
job.mode = "BATCH"
+ parallelism = 4
}
source {
Jdbc {
diff --git a/docs/en/connector-v2/source/Oracle.md
b/docs/en/connector-v2/source/Oracle.md
index ec1eddc5cd..46d1761967 100644
--- a/docs/en/connector-v2/source/Oracle.md
+++ b/docs/en/connector-v2/source/Oracle.md
@@ -68,11 +68,94 @@ Read external data source data through JDBC.
| partition_num | Int | No | job parallelism | The
number of partition count, only support positive integer. default value is job
parallelism
|
| fetch_size | Int | No | 0 | For
queries that return a large number of objects,you can configure<br/> the row
fetch size used in the query to improve performance by<br/> reducing the number of database hits required to satisfy the selection criteria.<br/> Zero means use
jdbc default value. |
| properties | Map | No | - |
Additional connection configuration parameters,when properties and URL have the
same parameters, the priority is determined by the <br/>specific implementation
of the driver. For example, in MySQL, properties take precedence over the URL.
|
-| common-options | | No | - |
Source plugin common parameters, please refer to [Source Common
Options](common-options.md) for details
|
-### Tips
+| Name | Type | Required |
Default |
Description
[...]
+|--------------------------------------------|------------|----------|-----------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
+| url | String | Yes | -
| The URL of the JDBC connection. Refer to a case:
jdbc:mysql://localhost:3306/test
[...]
+| driver | String | Yes | -
| The jdbc class name used to connect to the remote data source,<br/>
if you use MySQL the value is `com.mysql.cj.jdbc.Driver`.
[...]
+| user | String | No | -
| Connection instance user name
[...]
+| password | String | No | -
| Connection instance password
[...]
+| query | String | Yes | -
| Query statement
[...]
+| connection_check_timeout_sec | Int | No | 30
| The time in seconds to wait for the database operation used to
validate the connection to complete
[...]
+| partition_column | String | No | -
| The column name used to split data; only one column can be configured.
[...]
+| partition_lower_bound | BigDecimal | No | -
| The partition_column min value for scan, if not set SeaTunnel will
query database get min value.
[...]
+| partition_upper_bound | BigDecimal | No | -
| The partition_column max value for scan, if not set SeaTunnel will
query database get max value.
[...]
+| partition_num | Int | No | job
parallelism | The number of partition count, only support positive integer.
default value is job parallelism
[...]
+| fetch_size | Int | No | 0
| For queries that return a large number of objects, you can configure<br/> the row fetch size used in the query to improve performance by<br/> reducing the number of database hits required to satisfy the selection criteria.<br/> Zero means use the jdbc default value.
[...]
+| properties | Map | No | -
| Additional connection configuration parameters,when properties and
URL have the same parameters, the priority is determined by the <br/>specific
implementation of the driver. For example, in MySQL, properties take precedence
over the URL.
[...]
+| table_path | String | No | -
| The full path of the table; you can use this configuration instead of `query`. <br/>examples: <br/>mysql: "testdb.table1" <br/>oracle:
"test_schema.table1" <br/>sqlserver: "testdb.test_schema.table1"
<br/>postgresql: "testdb.test_schema.table1"
[...]
+| table_list | Array | No | -
| The list of tables to be read, you can use this configuration instead
of `table_path` example: ```[{ table_path = "testdb.table1"}, {table_path =
"testdb.table2", query = "select * id, name from testdb.table2"}]```
[...]
+| where_condition | String | No | -
| Common row filter conditions for all tables/queries, must start with
`where`. for example `where id > 100`
[...]
+| split.size | Int | No | 8096
| The split size (number of rows) of table, captured tables are split
into multiple splits when read of table.
[...]
+| split.even-distribution.factor.lower-bound | Double | No | 0.05
| The lower bound of the chunk key distribution factor. This factor is
used to determine whether the table data is evenly distributed. If the
distribution factor is calculated to be greater than or equal to this lower
bound (i.e., (MAX(id) - MIN(id) + 1) / row count), the table chunks would be
optimized for even distribution. Otherwise, if the distribution factor is less,
the table will be considered a [...]
+| split.even-distribution.factor.upper-bound | Double | No | 100
| The upper bound of the chunk key distribution factor. This factor is
used to determine whether the table data is evenly distributed. If the
distribution factor is calculated to be less than or equal to this upper bound
(i.e., (MAX(id) - MIN(id) + 1) / row count), the table chunks would be
optimized for even distribution. Otherwise, if the distribution factor is
greater, the table will be considered a [...]
+| split.sample-sharding.threshold | Int | No | 10000
| This configuration specifies the threshold of estimated shard count
to trigger the sample sharding strategy. When the distribution factor is
outside the bounds specified by
`chunk-key.even-distribution.factor.upper-bound` and
`chunk-key.even-distribution.factor.lower-bound`, and the estimated shard count
(calculated as approximate row count / chunk size) exceeds this threshold, the
sample sharding st [...]
+| split.inverse-sampling.rate | Int | No | 1000
| The inverse of the sampling rate used in the sample sharding
strategy. For example, if this value is set to 1000, it means a 1/1000 sampling
rate is applied during the sampling process. This option provides flexibility
in controlling the granularity of the sampling, thus affecting the final number
of shards. It's especially useful when dealing with very large datasets where a
lower sampling rate is p [...]
+| common-options | | No | -
| Source plugin common parameters, please refer to [Source Common
Options](common-options.md) for details
[...]
-> If partition_column is not set, it will run in single concurrency, and if
partition_column is set, it will be executed in parallel according to the
concurrency of tasks.
+## Parallel Reader
+
+The JDBC Source connector supports parallel reading of data from tables.
SeaTunnel will use certain rules to split the data in the table, which will be
handed over to readers for reading. The number of readers is determined by the
`parallelism` option.
+
+**Split Key Rules:**
+
+1. If `partition_column` is not null, it will be used to calculate the splits. The column must be one of the **supported split data types**.
+2. If `partition_column` is null, SeaTunnel will read the schema from the table and get the Primary Key and Unique Index. If there is more than one column in the Primary Key and Unique Index, the first column whose type is in the **supported split data types** will be used to split the data. For example, for a table with Primary Key `(nn guid, name varchar)`, because `guid` is not a supported split data type, the column `name` will be used to split the data.
+
+**Supported split data types:**
+* String
+* Number(int, bigint, decimal, ...)
+* Date
+
+### Options Related To Split
+
+#### split.size
+
+The number of rows in one split; captured tables are split into multiple splits when the table is read.
+
+#### split.even-distribution.factor.lower-bound
+
+> Not recommended for use
+
+The lower bound of the chunk key distribution factor. This factor is used to
determine whether the table data is evenly distributed. If the distribution
factor is calculated to be greater than or equal to this lower bound (i.e.,
(MAX(id) - MIN(id) + 1) / row count), the table chunks would be optimized for
even distribution. Otherwise, if the distribution factor is less, the table
will be considered as unevenly distributed and the sampling-based sharding
strategy will be used if the estim [...]
+
+#### split.even-distribution.factor.upper-bound
+
+> Not recommended for use
+
+The upper bound of the chunk key distribution factor. This factor is used to
determine whether the table data is evenly distributed. If the distribution
factor is calculated to be less than or equal to this upper bound (i.e.,
(MAX(id) - MIN(id) + 1) / row count), the table chunks would be optimized for
even distribution. Otherwise, if the distribution factor is greater, the table
will be considered as unevenly distributed and the sampling-based sharding
strategy will be used if the estim [...]
+
+#### split.sample-sharding.threshold
+
+This configuration specifies the threshold of estimated shard count to trigger
the sample sharding strategy. When the distribution factor is outside the
bounds specified by `chunk-key.even-distribution.factor.upper-bound` and
`chunk-key.even-distribution.factor.lower-bound`, and the estimated shard count
(calculated as approximate row count / chunk size) exceeds this threshold, the
sample sharding strategy will be used. This can help to handle large datasets
more efficiently. The default [...]
+
+#### split.inverse-sampling.rate
+
+The inverse of the sampling rate used in the sample sharding strategy. For
example, if this value is set to 1000, it means a 1/1000 sampling rate is
applied during the sampling process. This option provides flexibility in
controlling the granularity of the sampling, thus affecting the final number of
shards. It's especially useful when dealing with very large datasets where a
lower sampling rate is preferred. The default value is 1000.
+
+#### partition_column [string]
+
+The column name used to split the data.
+
+#### partition_upper_bound [BigDecimal]
+
+The maximum value of `partition_column` for the scan. If not set, SeaTunnel will query the database to get the max value.
+
+#### partition_lower_bound [BigDecimal]
+
+The minimum value of `partition_column` for the scan. If not set, SeaTunnel will query the database to get the min value.
+
+#### partition_num [int]
+
+> Not recommended for use. The correct approach is to control the number of splits through `split.size`.
+
+How many splits to create; only positive integers are supported. The default value is the job parallelism.
+
+## tips
+
+> If the table cannot be split (for example, the table has no Primary Key or Unique Index, and `partition_column` is not set), it will run in a single concurrency.
+>
+> Use `table_path` to replace `query` for single table reading. If you need to
read multiple tables, use `table_list`.
## Task Example
@@ -83,7 +166,7 @@ Read external data source data through JDBC.
```
# Defining the runtime environment
env {
- parallelism = 2
+ parallelism = 4
job.mode = "BATCH"
}
source{
@@ -106,13 +189,13 @@ sink {
}
```
-### Parallel:
+### parallel by partition_column
> Read your query table in parallel with the shard field you configured. You can do this if you want to read the whole table.
```
env {
- parallelism = 10
+ parallelism = 4
job.mode = "BATCH"
}
source {
@@ -138,6 +221,33 @@ sink {
}
```
+### parallel by Primary Key or Unique Index
+
+> Configuring `table_path` will turn on auto split; you can configure `split.*` to adjust the split strategy.
+
+```
+env {
+ parallelism = 4
+ job.mode = "BATCH"
+}
+source {
+ Jdbc {
+ url = "jdbc:oracle:thin:@datasource01:1523:xe"
+ driver = "oracle.jdbc.OracleDriver"
+ connection_check_timeout_sec = 100
+ user = "root"
+ password = "123456"
+ table_path = "DA.SCHEMA1.TABLE1"
+ query = "select * from SCHEMA1.TABLE1"
+ split.size = 10000
+ }
+}
+
+sink {
+ Console {}
+}
+```
+
### Parallel Boundary:
> It is more efficient to specify the data within the upper and lower bounds of the query; SeaTunnel will read your data source according to the boundaries you configure.
@@ -162,3 +272,41 @@ source {
}
```
+### Multiple table read:
+
+***Configuring `table_list` will turn on auto split; you can configure `split.*` to adjust the split strategy***
+
+```hocon
+env {
+ job.mode = "BATCH"
+ parallelism = 4
+}
+source {
+ Jdbc {
+ url = "jdbc:oracle:thin:@datasource01:1523:xe"
+ driver = "oracle.jdbc.OracleDriver"
+ connection_check_timeout_sec = 100
+ user = "root"
+ password = "123456"
+ "table_list"=[
+ {
+ "table_path"="XE.TEST.USER_INFO"
+ },
+ {
+ "table_path"="XE.TEST.YOURTABLENAME"
+ }
+ ]
+ #where_condition= "where id > 100"
+ split.size = 10000
+ #split.even-distribution.factor.upper-bound = 100
+ #split.even-distribution.factor.lower-bound = 0.05
+ #split.sample-sharding.threshold = 1000
+ #split.inverse-sampling.rate = 1000
+ }
+}
+
+sink {
+ Console {}
+}
+```
+
diff --git a/docs/en/connector-v2/source/PostgreSQL.md
b/docs/en/connector-v2/source/PostgreSQL.md
index a29b9b102c..e991f22c1f 100644
--- a/docs/en/connector-v2/source/PostgreSQL.md
+++ b/docs/en/connector-v2/source/PostgreSQL.md
@@ -77,11 +77,94 @@ Read external data source data through JDBC.
| partition_num | Int | No | job parallelism | The
number of partition count, only support positive integer. default value is job
parallelism
|
| fetch_size | Int | No | 0 | For
queries that return a large number of objects,you can configure<br/> the row
fetch size used in the query to improve performance by<br/> reducing the number of database hits required to satisfy the selection criteria.<br/> Zero means use
jdbc default value. |
| properties | Map | No | - |
Additional connection configuration parameters,when properties and URL have the
same parameters, the priority is determined by the <br/>specific implementation
of the driver. For example, in MySQL, properties take precedence over the URL.
|
-| common-options | | No | - |
Source plugin common parameters, please refer to [Source Common
Options](common-options.md) for details
|
-### Tips
+| Name | Type | Required |
Default |
Description
[...]
+|--------------------------------------------|------------|----------|-----------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
+| url | String | Yes | -
| The URL of the JDBC connection. Refer to a case: jdbc:postgresql://localhost:5432/test
[...]
+| driver | String | Yes | -
| The jdbc class name used to connect to the remote data source,<br/> if you use PostgreSQL the value is `org.postgresql.Driver`.
[...]
+| user | String | No | -
| Connection instance user name
[...]
+| password | String | No | -
| Connection instance password
[...]
+| query | String | Yes | -
| Query statement
[...]
+| connection_check_timeout_sec | Int | No | 30
| The time in seconds to wait for the database operation used to
validate the connection to complete
[...]
+| partition_column | String | No | -
| The column name for parallelism's partition; only numeric type primary keys are supported, and only one column can be configured.
[...]
+| partition_lower_bound | BigDecimal | No | -
| The partition_column min value for scan, if not set SeaTunnel will
query database get min value.
[...]
+| partition_upper_bound | BigDecimal | No | -
| The partition_column max value for scan, if not set SeaTunnel will
query database get max value.
[...]
+| partition_num | Int | No | job
parallelism | The number of partition count, only support positive integer.
default value is job parallelism
[...]
+| fetch_size | Int | No | 0
| For queries that return a large number of objects, you can configure<br/> the row fetch size used in the query to improve performance by<br/> reducing the number of database hits required to satisfy the selection criteria.<br/> Zero means use jdbc default value.
[...]
+| properties | Map | No | -
| Additional connection configuration parameters,when properties and
URL have the same parameters, the priority is determined by the <br/>specific
implementation of the driver. For example, in MySQL, properties take precedence
over the URL.
[...]
+| table_path | String | No | - | The full path of the table; you can use this configuration instead of `query`. <br/>Examples: <br/>mysql: "testdb.table1" <br/>oracle: "test_schema.table1" <br/>sqlserver: "testdb.test_schema.table1" <br/>postgresql: "testdb.test_schema.table1"
[...]
+| table_list | Array | No | - | The list of tables to be read; you can use this configuration instead of `table_path`. Example: ```[{ table_path = "testdb.table1"}, {table_path = "testdb.table2", query = "select id, name from testdb.table2"}]```
[...]
+| where_condition | String | No | -
| Common row filter conditions for all tables/queries; must start with `where`. For example, `where id > 100`.
[...]
+| split.size | Int | No | 8096
| The split size (number of rows) of the table; captured tables are split into multiple splits when the table is read.
[...]
+| split.even-distribution.factor.lower-bound | Double | No | 0.05
| The lower bound of the chunk key distribution factor. This factor is
used to determine whether the table data is evenly distributed. If the
distribution factor is calculated to be greater than or equal to this lower
bound (i.e., (MAX(id) - MIN(id) + 1) / row count), the table chunks would be
optimized for even distribution. Otherwise, if the distribution factor is less,
the table will be considered a [...]
+| split.even-distribution.factor.upper-bound | Double | No | 100
| The upper bound of the chunk key distribution factor. This factor is
used to determine whether the table data is evenly distributed. If the
distribution factor is calculated to be less than or equal to this upper bound
(i.e., (MAX(id) - MIN(id) + 1) / row count), the table chunks would be
optimized for even distribution. Otherwise, if the distribution factor is
greater, the table will be considered a [...]
+| split.sample-sharding.threshold | Int | No | 10000
| This configuration specifies the threshold of estimated shard count
to trigger the sample sharding strategy. When the distribution factor is
outside the bounds specified by
`chunk-key.even-distribution.factor.upper-bound` and
`chunk-key.even-distribution.factor.lower-bound`, and the estimated shard count
(calculated as approximate row count / chunk size) exceeds this threshold, the
sample sharding st [...]
+| split.inverse-sampling.rate | Int | No | 1000
| The inverse of the sampling rate used in the sample sharding
strategy. For example, if this value is set to 1000, it means a 1/1000 sampling
rate is applied during the sampling process. This option provides flexibility
in controlling the granularity of the sampling, thus affecting the final number
of shards. It's especially useful when dealing with very large datasets where a
lower sampling rate is p [...]
+| common-options | | No | -
| Source plugin common parameters, please refer to [Source Common
Options](common-options.md) for details
[...]
-> If partition_column is not set, it will run in single concurrency, and if
partition_column is set, it will be executed in parallel according to the
concurrency of tasks.
+## Parallel Reader
+
+The JDBC Source connector supports parallel reading of data from tables. SeaTunnel splits the table data into multiple chunks according to certain rules and hands the chunks to readers. The number of readers is determined by the `parallelism` option.
+
+**Split Key Rules:**
+
+1. If `partition_column` is not null, it will be used to calculate the splits. The column must be one of the **supported split data types**.
+2. If `partition_column` is null, SeaTunnel will read the schema from the table and get the Primary Key and Unique Index. If there is more than one column in the Primary Key and Unique Index, the first column whose type is in the **supported split data types** will be used to split the data. For example, if a table has Primary Key (`guid`, `name`) and `guid` is not in the **supported split data types**, the column `name` will be used to split the data.
+
+**Supported split data type:**
+* String
+* Number(int, bigint, decimal, ...)
+* Date
+
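For example, since Date is now a supported split type, a date column can serve as the split key. A minimal sketch, assuming a hypothetical `orders` table with a DATE column `order_date` (table and column names are illustrative, not from the source):

```hocon
source {
  Jdbc {
    url = "jdbc:postgresql://localhost:5432/test"
    driver = "org.postgresql.Driver"
    user = "root"
    password = "123456"
    query = "select * from public.orders"
    # hypothetical DATE column used as the split key
    partition_column = "order_date"
    split.size = 10000
  }
}
```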
+### Options Related To Split
+
+#### split.size
+
+The number of rows in one split; captured tables are split into multiple splits when the table is read.
+
+#### split.even-distribution.factor.lower-bound
+
+> Not recommended for use
+
+The lower bound of the chunk key distribution factor. This factor is used to
determine whether the table data is evenly distributed. If the distribution
factor is calculated to be greater than or equal to this lower bound (i.e.,
(MAX(id) - MIN(id) + 1) / row count), the table chunks would be optimized for
even distribution. Otherwise, if the distribution factor is less, the table
will be considered as unevenly distributed and the sampling-based sharding
strategy will be used if the estim [...]
+
+#### split.even-distribution.factor.upper-bound
+
+> Not recommended for use
+
+The upper bound of the chunk key distribution factor. This factor is used to
determine whether the table data is evenly distributed. If the distribution
factor is calculated to be less than or equal to this upper bound (i.e.,
(MAX(id) - MIN(id) + 1) / row count), the table chunks would be optimized for
even distribution. Otherwise, if the distribution factor is greater, the table
will be considered as unevenly distributed and the sampling-based sharding
strategy will be used if the estim [...]
+
+#### split.sample-sharding.threshold
+
+This configuration specifies the threshold of estimated shard count to trigger
the sample sharding strategy. When the distribution factor is outside the
bounds specified by `chunk-key.even-distribution.factor.upper-bound` and
`chunk-key.even-distribution.factor.lower-bound`, and the estimated shard count
(calculated as approximate row count / chunk size) exceeds this threshold, the
sample sharding strategy will be used. This can help to handle large datasets
more efficiently. The default [...]
+
+#### split.inverse-sampling.rate
+
+The inverse of the sampling rate used in the sample sharding strategy. For
example, if this value is set to 1000, it means a 1/1000 sampling rate is
applied during the sampling process. This option provides flexibility in
controlling the granularity of the sampling, thus affecting the final number of
shards. It's especially useful when dealing with very large datasets where a
lower sampling rate is preferred. The default value is 1000.
+
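Taken together, these options form a decision rule for choosing a split strategy. The following is a simplified, hypothetical sketch of that rule (constants mirror the defaults above; this is not SeaTunnel's actual implementation):

```java
public class SplitStrategySketch {
    // Defaults mirrored from the split.* options above.
    static final double FACTOR_LOWER_BOUND = 0.05;    // split.even-distribution.factor.lower-bound
    static final double FACTOR_UPPER_BOUND = 100;     // split.even-distribution.factor.upper-bound
    static final long SAMPLE_SHARDING_THRESHOLD = 10000; // split.sample-sharding.threshold

    /** Pick a strategy from the chunk key range, the row count, and split.size. */
    public static String chooseStrategy(long min, long max, long rowCount, long splitSize) {
        // distribution factor = (MAX(id) - MIN(id) + 1) / row count
        double factor = (max - min + 1.0) / rowCount;
        if (factor >= FACTOR_LOWER_BOUND && factor <= FACTOR_UPPER_BOUND) {
            return "evenly-sized range splits";
        }
        // Uneven distribution: fall back to sampling only when many shards are expected.
        long estimatedShards = rowCount / splitSize;
        return estimatedShards > SAMPLE_SHARDING_THRESHOLD
                ? "sample sharding"
                : "range splits (uneven)";
    }

    public static void main(String[] args) {
        // Dense ids 1..1_000_000: factor = 1.0, inside the bounds.
        System.out.println(chooseStrategy(1, 1_000_000, 1_000_000, 8096));
        // Sparse ids over a huge range: factor = 10_000, 20_000 estimated shards.
        System.out.println(chooseStrategy(1, 10_000_000_000L, 1_000_000, 50));
    }
}
```

In a real job the thresholds are tuned per table via the `split.*` options rather than hard-coded.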
+#### partition_column [string]
+
+The column name used to split the data.
+
+#### partition_upper_bound [BigDecimal]
+
+The partition_column max value for the scan; if not set, SeaTunnel will query the database to get the max value.
+
+#### partition_lower_bound [BigDecimal]
+
+The partition_column min value for the scan; if not set, SeaTunnel will query the database to get the min value.
+
+#### partition_num [int]
+
+> Not recommended for use; the correct approach is to control the number of splits through `split.size`
+
+The number of splits to create; only positive integers are supported. The default value is the job parallelism.
+
+## Tips
+
+> If the table cannot be split (for example, the table has no Primary Key or Unique Index, and `partition_column` is not set), it will run in single concurrency.
+>
+> Use `table_path` to replace `query` for single table reading. If you need to
read multiple tables, use `table_list`.
## Task Example
@@ -92,7 +175,7 @@ Read external data source data through JDBC.
```
# Defining the runtime environment
env {
- parallelism = 2
+ parallelism = 4
job.mode = "BATCH"
}
@@ -115,13 +198,13 @@ sink {
}
```
-### Parallel:
+### parallel by partition_column
> Read your query table in parallel using the shard field and shard data you configured. You can do this if you want to read the whole table.
```
env {
- parallelism = 10
+ parallelism = 4
job.mode = "BATCH"
}
source{
@@ -140,6 +223,33 @@ sink {
}
```
+### parallel by Primary Key or Unique Index
+
+> Configuring `table_path` will turn on auto split; you can configure `split.*` to adjust the split strategy
+
+```
+env {
+ parallelism = 4
+ job.mode = "BATCH"
+}
+source {
+ Jdbc {
+ url = "jdbc:postgresql://localhost:5432/test"
+ driver = "org.postgresql.Driver"
+ connection_check_timeout_sec = 100
+ user = "root"
+ password = "123456"
+ table_path = "test.public.AllDataType_1"
+ query = "select * from public.AllDataType_1"
+ split.size = 10000
+ }
+}
+
+sink {
+ Console {}
+}
+```
+
### Parallel Boundary:
> It is more efficient to read your data source within the upper and lower boundaries you configure for the query, rather than scanning the whole table.
@@ -163,3 +273,40 @@ source{
}
```
+### Multiple table read:
+
+***Configuring `table_list` will turn on auto split; you can configure `split.*` to adjust the split strategy.***
+
+```hocon
+env {
+ job.mode = "BATCH"
+ parallelism = 4
+}
+source {
+ Jdbc {
+ url="jdbc:postgresql://datasource01:5432/demo"
+ user="iDm82k6Q0Tq+wUprWnPsLQ=="
+ driver="org.postgresql.Driver"
+ password="iDm82k6Q0Tq+wUprWnPsLQ=="
+ "table_list"=[
+ {
+ "table_path"="demo.public.AllDataType_1"
+ },
+ {
+ "table_path"="demo.public.alldatatype"
+ }
+ ]
+ #where_condition= "where id > 100"
+ split.size = 10000
+ #split.even-distribution.factor.upper-bound = 100
+ #split.even-distribution.factor.lower-bound = 0.05
+ #split.sample-sharding.threshold = 1000
+ #split.inverse-sampling.rate = 1000
+ }
+}
+
+sink {
+ Console {}
+}
+```
+
diff --git a/docs/en/connector-v2/source/SqlServer.md
b/docs/en/connector-v2/source/SqlServer.md
index 01f49072b5..0cc6a0dacd 100644
--- a/docs/en/connector-v2/source/SqlServer.md
+++ b/docs/en/connector-v2/source/SqlServer.md
@@ -57,24 +57,93 @@ Read external data source data through JDBC.
## Source Options
-| name | type | required | default |
Description
|
-|------------------------------|--------|----------|-----------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| url | String | Yes | - | The URL
of the JDBC connection. Refer to a case:
jdbc:sqlserver://127.0.0.1:1434;database=TestDB
|
-| driver | String | Yes | - | The
jdbc class name used to connect to the remote data source,<br/> if you use
SQLserver the value is `com.microsoft.sqlserver.jdbc.SQLServerDriver`.
|
-| user | String | No | - |
Connection instance user name
|
-| password | String | No | - |
Connection instance password
|
-| query | String | Yes | - | Query
statement
|
-| connection_check_timeout_sec | Int | No | 30 | The
time in seconds to wait for the database operation used to validate the
connection to complete
|
-| partition_column | String | No | - | The
column name for parallelism's partition, only support numeric type.
|
-| partition_lower_bound | Long | No | - | The
partition_column min value for scan, if not set SeaTunnel will query database
get min value.
|
-| partition_upper_bound | Long | No | - | The
partition_column max value for scan, if not set SeaTunnel will query database
get max value.
|
-| partition_num | Int | No | job parallelism | The
number of partition count, only support positive integer. default value is job
parallelism
|
-| fetch_size | Int | No | 0 | For
queries that return a large number of objects,you can configure<br/> the row
fetch size used in the query toimprove performance by<br/> reducing the number
database hits required to satisfy the selection criteria.<br/> Zero means use
jdbc default value. |
-| common-options | | No | - | Source
plugin common parameters, please refer to [Source Common
Options](common-options.md) for details
|
+| name | type | required | default
|
Description
[...]
+|--------------------------------------------|--------|----------|-----------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
+| url | String | Yes | -
| The URL of the JDBC connection. Refer to a case:
jdbc:sqlserver://127.0.0.1:1434;database=TestDB
[...]
+| driver | String | Yes | -
| The jdbc class name used to connect to the remote data source,<br/> if
you use SQLserver the value is `com.microsoft.sqlserver.jdbc.SQLServerDriver`.
[...]
+| user | String | No | -
| Connection instance user name
[...]
+| password | String | No | -
| Connection instance password
[...]
+| query | String | Yes | -
| Query statement
[...]
+| connection_check_timeout_sec | Int | No | 30
| The time in seconds to wait for the database operation used to validate
the connection to complete
[...]
+| partition_column | String | No | -
| The column name for parallelism's partition; only numeric types are supported.
[...]
+| partition_lower_bound | Long | No | -
| The partition_column min value for scan, if not set SeaTunnel will query
database get min value.
[...]
+| partition_upper_bound | Long | No | -
| The partition_column max value for scan, if not set SeaTunnel will query
database get max value.
[...]
+| partition_num | Int | No | job
parallelism | The number of partition count, only support positive integer.
default value is job parallelism
[...]
+| fetch_size | Int | No | 0
| For queries that return a large number of objects, you can configure<br/> the row fetch size used in the query to improve performance by<br/> reducing the number of database hits required to satisfy the selection criteria.<br/> Zero means use jdbc default value.
[...]
+| properties | Map | No | -
| Additional connection configuration parameters,when properties and URL
have the same parameters, the priority is determined by the <br/>specific
implementation of the driver. For example, in MySQL, properties take precedence
over the URL.
[...]
+| table_path | String | No | - | The full path of the table; you can use this configuration instead of `query`. <br/>Examples: <br/>mysql: "testdb.table1" <br/>oracle: "test_schema.table1" <br/>sqlserver: "testdb.test_schema.table1" <br/>postgresql: "testdb.test_schema.table1"
[...]
+| table_list | Array | No | - | The list of tables to be read; you can use this configuration instead of `table_path`. Example: ```[{ table_path = "testdb.table1"}, {table_path = "testdb.table2", query = "select id, name from testdb.table2"}]```
[...]
+| where_condition | String | No | -
| Common row filter conditions for all tables/queries; must start with `where`. For example, `where id > 100`.
[...]
+| split.size | Int | No | 8096
| The split size (number of rows) of the table; captured tables are split into multiple splits when the table is read.
[...]
+| split.even-distribution.factor.lower-bound | Double | No | 0.05
| The lower bound of the chunk key distribution factor. This factor is used
to determine whether the table data is evenly distributed. If the distribution
factor is calculated to be greater than or equal to this lower bound (i.e.,
(MAX(id) - MIN(id) + 1) / row count), the table chunks would be optimized for
even distribution. Otherwise, if the distribution factor is less, the table
will be considered as un [...]
+| split.even-distribution.factor.upper-bound | Double | No | 100
| The upper bound of the chunk key distribution factor. This factor is used
to determine whether the table data is evenly distributed. If the distribution
factor is calculated to be less than or equal to this upper bound (i.e.,
(MAX(id) - MIN(id) + 1) / row count), the table chunks would be optimized for
even distribution. Otherwise, if the distribution factor is greater, the table
will be considered as un [...]
+| split.sample-sharding.threshold | Int | No | 10000
| This configuration specifies the threshold of estimated shard count to
trigger the sample sharding strategy. When the distribution factor is outside
the bounds specified by `chunk-key.even-distribution.factor.upper-bound` and
`chunk-key.even-distribution.factor.lower-bound`, and the estimated shard count
(calculated as approximate row count / chunk size) exceeds this threshold, the
sample sharding strate [...]
+| split.inverse-sampling.rate | Int | No | 1000
| The inverse of the sampling rate used in the sample sharding strategy.
For example, if this value is set to 1000, it means a 1/1000 sampling rate is
applied during the sampling process. This option provides flexibility in
controlling the granularity of the sampling, thus affecting the final number of
shards. It's especially useful when dealing with very large datasets where a
lower sampling rate is prefe [...]
+| common-options | | No | -
| Source plugin common parameters, please refer to [Source Common
Options](common-options.md) for details
[...]
+
+## Parallel Reader
+
+The JDBC Source connector supports parallel reading of data from tables. SeaTunnel splits the table data into multiple chunks according to certain rules and hands the chunks to readers. The number of readers is determined by the `parallelism` option.
+
+**Split Key Rules:**
+
+1. If `partition_column` is not null, it will be used to calculate the splits. The column must be one of the **supported split data types**.
+2. If `partition_column` is null, SeaTunnel will read the schema from the table and get the Primary Key and Unique Index. If there is more than one column in the Primary Key and Unique Index, the first column whose type is in the **supported split data types** will be used to split the data. For example, if a table has Primary Key (`guid`, `name`) and `guid` is not in the **supported split data types**, the column `name` will be used to split the data.
+
+**Supported split data type:**
+* String
+* Number(int, bigint, decimal, ...)
+* Date
+
+### Options Related To Split
+
+#### split.size
+
+The number of rows in one split; captured tables are split into multiple splits when the table is read.
+
+#### split.even-distribution.factor.lower-bound
+
+> Not recommended for use
+
+The lower bound of the chunk key distribution factor. This factor is used to
determine whether the table data is evenly distributed. If the distribution
factor is calculated to be greater than or equal to this lower bound (i.e.,
(MAX(id) - MIN(id) + 1) / row count), the table chunks would be optimized for
even distribution. Otherwise, if the distribution factor is less, the table
will be considered as unevenly distributed and the sampling-based sharding
strategy will be used if the estim [...]
+
+#### split.even-distribution.factor.upper-bound
+
+> Not recommended for use
+
+The upper bound of the chunk key distribution factor. This factor is used to
determine whether the table data is evenly distributed. If the distribution
factor is calculated to be less than or equal to this upper bound (i.e.,
(MAX(id) - MIN(id) + 1) / row count), the table chunks would be optimized for
even distribution. Otherwise, if the distribution factor is greater, the table
will be considered as unevenly distributed and the sampling-based sharding
strategy will be used if the estim [...]
+
+#### split.sample-sharding.threshold
+
+This configuration specifies the threshold of estimated shard count to trigger
the sample sharding strategy. When the distribution factor is outside the
bounds specified by `chunk-key.even-distribution.factor.upper-bound` and
`chunk-key.even-distribution.factor.lower-bound`, and the estimated shard count
(calculated as approximate row count / chunk size) exceeds this threshold, the
sample sharding strategy will be used. This can help to handle large datasets
more efficiently. The default [...]
+
+#### split.inverse-sampling.rate
+
+The inverse of the sampling rate used in the sample sharding strategy. For
example, if this value is set to 1000, it means a 1/1000 sampling rate is
applied during the sampling process. This option provides flexibility in
controlling the granularity of the sampling, thus affecting the final number of
shards. It's especially useful when dealing with very large datasets where a
lower sampling rate is preferred. The default value is 1000.
+
+#### partition_column [string]
+
+The column name used to split the data.
+
+#### partition_upper_bound [BigDecimal]
+
+The partition_column max value for the scan; if not set, SeaTunnel will query the database to get the max value.
+
+#### partition_lower_bound [BigDecimal]
+
+The partition_column min value for the scan; if not set, SeaTunnel will query the database to get the min value.
+
+#### partition_num [int]
+
+> Not recommended for use; the correct approach is to control the number of splits through `split.size`
+
+The number of splits to create; only positive integers are supported. The default value is the job parallelism.
## Tips
-> If partition_column is not set, it will run in single concurrency, and if
partition_column is set, it will be executed in parallel according to the
concurrency of tasks.
+> If the table cannot be split (for example, the table has no Primary Key or Unique Index, and `partition_column` is not set), it will run in single concurrency.
+>
+> Use `table_path` to replace `query` for single table reading. If you need to
read multiple tables, use `table_list`.
## Task Example
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
index 099d59d5c0..1562ac3b48 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
@@ -27,6 +27,9 @@ import
org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceTable;
import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.Serializable;
import java.sql.Connection;
import java.sql.PreparedStatement;
@@ -50,6 +53,8 @@ import static java.lang.String.format;
*/
public interface JdbcDialect extends Serializable {
+ Logger log = LoggerFactory.getLogger(JdbcDialect.class.getName());
+
/**
* Get the name of jdbc dialect.
*
@@ -316,6 +321,7 @@ public interface JdbcDialect extends Serializable {
try (Statement stmt = connection.createStatement()) {
stmt.setFetchSize(1024);
+            log.info("Split Chunk, approximateRowCntStatement: {}", sampleQuery);
try (ResultSet rs = stmt.executeQuery(sampleQuery)) {
int count = 0;
List<Object> results = new ArrayList<>();
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/SQLUtils.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/SQLUtils.java
index 2686a4e307..cc841149c9 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/SQLUtils.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/SQLUtils.java
@@ -17,16 +17,20 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect;
+import lombok.extern.slf4j.Slf4j;
+
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+@Slf4j
public class SQLUtils {
public static Long countForSubquery(Connection connection, String
subQuerySQL)
throws SQLException {
String sqlQuery = String.format("SELECT COUNT(*) FROM (%s) T",
subQuerySQL);
+ log.info("Split Chunk, countForSubquery: {}", sqlQuery);
try (Statement stmt = connection.createStatement()) {
try (ResultSet resultSet = stmt.executeQuery(sqlQuery)) {
if (resultSet.next()) {
@@ -40,6 +44,7 @@ public class SQLUtils {
public static Long countForTable(Connection connection, String tablePath)
throws SQLException {
String sqlQuery = String.format("SELECT COUNT(*) FROM %s", tablePath);
+ log.info("Split Chunk, countForTable: {}", sqlQuery);
try (Statement stmt = connection.createStatement()) {
try (ResultSet resultSet = stmt.executeQuery(sqlQuery)) {
if (resultSet.next()) {
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
index f2ce15e31e..ec53dd0d6c 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
@@ -28,6 +28,8 @@ import
org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceTable;
import org.apache.commons.lang3.StringUtils;
+import lombok.extern.slf4j.Slf4j;
+
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@@ -41,6 +43,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
+@Slf4j
public class MysqlDialect implements JdbcDialect {
public String fieldIde = FieldIdeEnum.ORIGINAL.getValue();
@@ -170,8 +173,11 @@ public class MysqlDialect implements JdbcDialect {
String.format("USE %s;",
quoteDatabaseIdentifier(tablePath.getDatabaseName()));
String rowCountQuery =
String.format("SHOW TABLE STATUS LIKE '%s';",
tablePath.getTableName());
+
try (Statement stmt = connection.createStatement()) {
+ log.info("Split Chunk, approximateRowCntStatement: {}",
useDatabaseStatement);
stmt.execute(useDatabaseStatement);
+ log.info("Split Chunk, approximateRowCntStatement: {}",
rowCountQuery);
try (ResultSet rs = stmt.executeQuery(rowCountQuery)) {
if (!rs.next() || rs.getMetaData().getColumnCount() < 5) {
throw new SQLException(
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
index a1a658ba06..9167fe07f4 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
@@ -28,6 +28,8 @@ import
org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceTable;
import org.apache.commons.lang3.StringUtils;
+import lombok.extern.slf4j.Slf4j;
+
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@@ -38,6 +40,7 @@ import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
+@Slf4j
public class OracleDialect implements JdbcDialect {
private static final int DEFAULT_ORACLE_FETCH_SIZE = 128;
@@ -188,7 +191,9 @@ public class OracleDialect implements JdbcDialect {
tablePath.getSchemaName(),
tablePath.getTableName());
try (Statement stmt = connection.createStatement()) {
+ log.info("Split Chunk, approximateRowCntStatement: {}",
analyzeTable);
stmt.execute(analyzeTable);
+ log.info("Split Chunk, approximateRowCntStatement: {}",
rowCountQuery);
try (ResultSet rs = stmt.executeQuery(rowCountQuery)) {
if (!rs.next()) {
throw new SQLException(
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java
index 9a4076fe2a..e73a8f0bb6 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java
@@ -28,6 +28,8 @@ import
org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceTable;
import org.apache.commons.lang3.StringUtils;
+import lombok.extern.slf4j.Slf4j;
+
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@@ -37,6 +39,7 @@ import java.util.Arrays;
import java.util.Optional;
import java.util.stream.Collectors;
+@Slf4j
public class PostgresDialect implements JdbcDialect {
public static final int DEFAULT_POSTGRES_FETCH_SIZE = 128;
@@ -152,6 +155,7 @@ public class PostgresDialect implements JdbcDialect {
"SELECT reltuples FROM pg_class r WHERE relkind =
'r' AND relname = '%s';",
table.getTablePath().getTableName());
try (Statement stmt = connection.createStatement()) {
+ log.info("Split Chunk, approximateRowCntStatement: {}",
rowCountQuery);
try (ResultSet rs = stmt.executeQuery(rowCountQuery)) {
if (!rs.next()) {
throw new SQLException(
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
index f8a46a0637..729ba17949 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
@@ -28,6 +28,8 @@ import
org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceTable;
import org.apache.commons.lang3.StringUtils;
+import lombok.extern.slf4j.Slf4j;
+
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@@ -38,6 +40,7 @@ import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
+@Slf4j
public class SqlServerDialect implements JdbcDialect {
public String fieldIde = FieldIdeEnum.ORIGINAL.getValue();
@@ -165,6 +168,7 @@ public class SqlServerDialect implements JdbcDialect {
String.format(
"USE %s;",
quoteDatabaseIdentifier(tablePath.getDatabaseName()));
+                log.info("Split Chunk, useDatabaseStatement: {}", useDatabaseStatement);
stmt.execute(useDatabaseStatement);
}
String rowCountQuery =
@@ -172,6 +176,7 @@ public class SqlServerDialect implements JdbcDialect {
"SELECT Total_Rows = SUM(st.row_count) FROM
sys"
+ ".dm_db_partition_stats st WHERE
object_name(object_id) = '%s' AND index_id < 2;",
tablePath.getTableName());
+            log.info("Split Chunk, approximateRowCntStatement: {}", rowCountQuery);
try (ResultSet rs = stmt.executeQuery(rowCountQuery)) {
if (!rs.next()) {
throw new SQLException(
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java
index ed7fac4d26..89ec64b817 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java
@@ -221,6 +221,7 @@ public abstract class ChunkSplitter implements
AutoCloseable, Serializable {
jdbcDialect.tableIdentifier(table.getTablePath()));
}
try (Statement stmt = getOrEstablishConnection().createStatement()) {
+ log.info("Split table, query min max: {}", sqlQuery);
try (ResultSet resultSet = stmt.executeQuery(sqlQuery)) {
if (resultSet.next()) {
Object min = resultSet.getObject(1);
@@ -251,7 +252,7 @@ public abstract class ChunkSplitter implements
AutoCloseable, Serializable {
"Partitioned column(%s) don't exist in the
table columns",
partitionColumn));
}
- if (!isEvenlySplitColumn(column)) {
+ if (!isSupportSplitColumn(column)) {
throw new JdbcConnectorException(
CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
String.format("%s is not numeric/string type",
partitionColumn));
@@ -266,7 +267,7 @@ public abstract class ChunkSplitter implements
AutoCloseable, Serializable {
if (pk != null) {
for (String pkField : pk.getColumnNames()) {
Column column = columnMap.get(pkField);
- if (isEvenlySplitColumn(column)) {
+ if (isSupportSplitColumn(column)) {
return Optional.of(
new SeaTunnelRowType(
new String[] {pkField},
@@ -290,7 +291,7 @@ public abstract class ChunkSplitter implements
AutoCloseable, Serializable {
uniqueKey.getColumnNames()) {
String uniqueKeyColumnName =
uniqueKeyColumn.getColumnName();
Column column = columnMap.get(uniqueKeyColumnName);
- if (isEvenlySplitColumn(column)) {
+ if (isSupportSplitColumn(column)) {
return Optional.of(
new SeaTunnelRowType(
new String[] {uniqueKeyColumnName},
@@ -305,19 +306,19 @@ public abstract class ChunkSplitter implements
AutoCloseable, Serializable {
return Optional.empty();
}
- protected boolean isEvenlySplitColumn(Column splitColumn) {
- return isEvenlySplitColumn(splitColumn.getDataType());
- }
-
- protected boolean isEvenlySplitColumn(SeaTunnelDataType columnType) {
+ protected boolean isSupportSplitColumn(Column splitColumn) {
+ SeaTunnelDataType<?> dataType = splitColumn.getDataType();
// currently, we only support these types.
- switch (columnType.getSqlType()) {
+ switch (dataType.getSqlType()) {
case TINYINT:
case SMALLINT:
case INT:
case BIGINT:
+ case DOUBLE:
+ case FLOAT:
case DECIMAL:
case STRING:
+ case DATE:
return true;
default:
return false;
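[Editor's note] The hunk above widens `isSupportSplitColumn` to accept `DOUBLE`, `FLOAT`, and `DATE` in addition to the previous integer/decimal/string types. A minimal standalone sketch of the check (the `SqlType` enum here is a stand-in with only the members used below, not SeaTunnel's full enum):

```java
import java.util.EnumSet;

public class SplitColumnCheck {
    // Stand-in for SeaTunnel's SqlType enum (assumption: only these members).
    enum SqlType { TINYINT, SMALLINT, INT, BIGINT, DOUBLE, FLOAT, DECIMAL, STRING, DATE, BYTES, TIMESTAMP }

    // The set of splittable types after this commit, matching the switch above.
    private static final EnumSet<SqlType> SPLITTABLE =
            EnumSet.of(
                    SqlType.TINYINT, SqlType.SMALLINT, SqlType.INT, SqlType.BIGINT,
                    SqlType.DOUBLE, SqlType.FLOAT, SqlType.DECIMAL, SqlType.STRING, SqlType.DATE);

    static boolean isSupportSplitColumn(SqlType type) {
        return SPLITTABLE.contains(type);
    }

    public static void main(String[] args) {
        System.out.println(isSupportSplitColumn(SqlType.DATE));  // true after this commit
        System.out.println(isSupportSplitColumn(SqlType.BYTES)); // false
    }
}
```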
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java
index da158dca44..62cc173702 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.source;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.ObjectUtils;
@@ -32,8 +33,11 @@ import lombok.extern.slf4j.Slf4j;
import java.io.Serializable;
import java.math.BigDecimal;
+import java.sql.Date;
import java.sql.PreparedStatement;
import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -97,7 +101,6 @@ public class DynamicChunkSplitter extends ChunkSplitter {
private List<ChunkRange> splitTableIntoChunks(
JdbcSourceTable table, String splitColumnName, SeaTunnelDataType
splitColumnType)
throws SQLException {
- TablePath tablePath = table.getTablePath();
Pair<Object, Object> minMax = queryMinMax(table, splitColumnName);
Object min = minMax.getLeft();
Object max = minMax.getRight();
@@ -107,6 +110,29 @@ public class DynamicChunkSplitter extends ChunkSplitter {
}
int chunkSize = config.getSplitSize();
+
+ switch (splitColumnType.getSqlType()) {
+ case TINYINT:
+ case SMALLINT:
+ case INT:
+ case BIGINT:
+ case DECIMAL:
+ case DOUBLE:
+ case FLOAT:
+ case STRING:
+                return evenlyColumnSplitChunks(table, splitColumnName, min, max, chunkSize);
+            case DATE:
+                return dateColumnSplitChunks(table, splitColumnName, min, max, chunkSize);
+            default:
+                throw CommonError.unsupportedDataType(
+                        "JDBC", splitColumnType.getSqlType().toString(), splitColumnName);
+ }
+ }
+
+ private List<ChunkRange> evenlyColumnSplitChunks(
+ JdbcSourceTable table, String splitColumnName, Object min, Object
max, int chunkSize)
+ throws SQLException {
+ TablePath tablePath = table.getTablePath();
double distributionFactorUpper =
config.getSplitEvenDistributionFactorUpperBound();
double distributionFactorLower =
config.getSplitEvenDistributionFactorLowerBound();
int sampleShardingThreshold = config.getSplitSampleShardingThreshold();
@@ -123,59 +149,54 @@ public class DynamicChunkSplitter extends ChunkSplitter {
distributionFactorLower,
sampleShardingThreshold);
- if (isEvenlySplitColumn(splitColumnType)) {
- long approximateRowCnt = queryApproximateRowCnt(table);
- double distributionFactor =
- calculateDistributionFactor(tablePath, min, max,
approximateRowCnt);
-
- boolean dataIsEvenlyDistributed =
- ObjectUtils.doubleCompare(distributionFactor,
distributionFactorLower) >= 0
- && ObjectUtils.doubleCompare(
- distributionFactor,
distributionFactorUpper)
- <= 0;
-
- if (dataIsEvenlyDistributed) {
- // the minimum dynamic chunk size is at least 1
- final int dynamicChunkSize = Math.max((int)
(distributionFactor * chunkSize), 1);
- return splitEvenlySizedChunks(
- tablePath, min, max, approximateRowCnt, chunkSize,
dynamicChunkSize);
- } else {
- int shardCount = (int) (approximateRowCnt / chunkSize);
- int inverseSamplingRate = config.getSplitInverseSamplingRate();
- if (sampleShardingThreshold < shardCount) {
- // It is necessary to ensure that the number of data rows
sampled by the
- // sampling rate is greater than the number of shards.
- // Otherwise, if the sampling rate is too low, it may
result in an insufficient
- // number of data rows for the shards, leading to an
inadequate number of
- // shards.
- // Therefore, inverseSamplingRate should be less than
chunkSize
- if (inverseSamplingRate > chunkSize) {
- log.warn(
- "The inverseSamplingRate is {}, which is
greater than chunkSize {}, so we set inverseSamplingRate to chunkSize",
- inverseSamplingRate,
- chunkSize);
- inverseSamplingRate = chunkSize;
- }
- log.info(
- "Use sampling sharding for table {}, the sampling
rate is {}",
- tablePath,
- inverseSamplingRate);
- Object[] sample =
- jdbcDialect.sampleDataFromColumn(
- getOrEstablishConnection(),
- table,
- splitColumnName,
- inverseSamplingRate);
- log.info(
- "Sample data from table {} end, the sample size is
{}",
- tablePath,
- sample.length);
- return efficientShardingThroughSampling(
- tablePath, sample, approximateRowCnt, shardCount);
+ long approximateRowCnt = queryApproximateRowCnt(table);
+ double distributionFactor =
+ calculateDistributionFactor(tablePath, min, max,
approximateRowCnt);
+
+ boolean dataIsEvenlyDistributed =
+ ObjectUtils.doubleCompare(distributionFactor,
distributionFactorLower) >= 0
+ && ObjectUtils.doubleCompare(distributionFactor,
distributionFactorUpper)
+ <= 0;
+
+ if (dataIsEvenlyDistributed) {
+ // the minimum dynamic chunk size is at least 1
+ final int dynamicChunkSize = Math.max((int) (distributionFactor *
chunkSize), 1);
+ return splitEvenlySizedChunks(
+ tablePath, min, max, approximateRowCnt, chunkSize,
dynamicChunkSize);
+ } else {
+ int shardCount = (int) (approximateRowCnt / chunkSize);
+ int inverseSamplingRate = config.getSplitInverseSamplingRate();
+ if (sampleShardingThreshold < shardCount) {
+ // It is necessary to ensure that the number of data rows
sampled by the
+ // sampling rate is greater than the number of shards.
+ // Otherwise, if the sampling rate is too low, it may result
in an insufficient
+ // number of data rows for the shards, leading to an
inadequate number of
+ // shards.
+ // Therefore, inverseSamplingRate should be less than chunkSize
+ if (inverseSamplingRate > chunkSize) {
+ log.warn(
+ "The inverseSamplingRate is {}, which is greater
than chunkSize {}, so we set inverseSamplingRate to chunkSize",
+ inverseSamplingRate,
+ chunkSize);
+ inverseSamplingRate = chunkSize;
}
- return splitUnevenlySizedChunks(table, splitColumnName, min,
max, chunkSize);
+ log.info(
+ "Use sampling sharding for table {}, the sampling rate
is {}",
+ tablePath,
+ inverseSamplingRate);
+ Object[] sample =
+ jdbcDialect.sampleDataFromColumn(
+ getOrEstablishConnection(),
+ table,
+ splitColumnName,
+ inverseSamplingRate);
+ log.info(
+ "Sample data from table {} end, the sample size is {}",
+ tablePath,
+ sample.length);
+ return efficientShardingThroughSampling(
+ tablePath, sample, approximateRowCnt, shardCount);
}
- } else {
return splitUnevenlySizedChunks(table, splitColumnName, min, max,
chunkSize);
}
}
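[Editor's note] The refactored branch above keeps the existing strategy: compute a distribution factor from the column's min/max and the approximate row count, then either cut evenly sized chunks or fall back to sampling. `calculateDistributionFactor` is not shown in this diff; the sketch below assumes the usual `(max - min + 1) / approximateRowCnt` definition:

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

public class DistributionFactorSketch {
    // Assumed formula (not shown in this diff): span of the split column
    // divided by the approximate row count of the table.
    static double distributionFactor(long min, long max, long approximateRowCnt) {
        if (approximateRowCnt == 0) {
            return Double.MAX_VALUE; // no rows: treat as maximally sparse
        }
        return BigDecimal.valueOf(max - min + 1)
                .divide(BigDecimal.valueOf(approximateRowCnt), 4, RoundingMode.CEILING)
                .doubleValue();
    }

    // Matches the diff: the minimum dynamic chunk size is at least 1.
    static int dynamicChunkSize(double factor, int chunkSize) {
        return Math.max((int) (factor * chunkSize), 1);
    }

    public static void main(String[] args) {
        // Dense ids 1..1000 over ~1000 rows: factor 1.0, chunk size unchanged.
        double f = distributionFactor(1, 1000, 1000);
        System.out.println(f);                          // 1.0
        System.out.println(dynamicChunkSize(f, 8096));  // 8096
    }
}
```

A very sparse column (factor near 0) collapses the dynamic chunk size toward 1, which is why the code then prefers the sampling-based sharding path.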
@@ -309,6 +330,71 @@ public class DynamicChunkSplitter extends ChunkSplitter {
return splits;
}
+    /**
+     * Split the table into chunks by a date-type column.
+     *
+     * @param table the source table to split
+     * @param splitColumnName the name of the date split column
+     * @param min the minimum value of the split column
+     * @param max the maximum value of the split column
+     * @param chunkSize the target number of rows per chunk
+     * @return the list of chunk ranges
+     * @throws SQLException if querying the approximate row count fails
+     */
+ private List<ChunkRange> dateColumnSplitChunks(
+ JdbcSourceTable table, String splitColumnName, Object min, Object
max, int chunkSize)
+ throws SQLException {
+ log.info("Use date chunks for table {}", table.getTablePath());
+ final List<ChunkRange> splits = new ArrayList<>();
+        Date sqlDateMin;
+        Date sqlDateMax;
+        if (min instanceof Date) {
+            sqlDateMin = (Date) min;
+            sqlDateMax = (Date) max;
+        } else if (min instanceof Timestamp) {
+            sqlDateMin = new Date(((Timestamp) min).getTime());
+            sqlDateMax = new Date(((Timestamp) max).getTime());
+        } else {
+            throw new SQLException(
+                    String.format(
+                            "Unsupported min/max type %s for date split column %s",
+                            min.getClass().getName(), splitColumnName));
+        }
+ List<LocalDate> dateRange =
+ getDateRange(sqlDateMin.toLocalDate(),
sqlDateMax.toLocalDate());
+ if (dateRange.size() > 20 * 365) {
+            // TODO: if dateRange spans more than 20 years, query the actual dates present in the table
+ }
+
+ Long rowCnt = queryApproximateRowCnt(table);
+ int step = 1;
+ if (rowCnt / dateRange.size() < chunkSize) {
+            int splitNum = (int) (rowCnt / chunkSize) + 1;
+            // step must be at least 1, otherwise the loop below never advances
+            step = Math.max(dateRange.size() / splitNum, 1);
+ }
+
+ for (int i = 0; i < dateRange.size(); i = i + step) {
+ if (i == 0) {
+ splits.add(ChunkRange.of(null, dateRange.get(i)));
+ } else {
+ splits.add(ChunkRange.of(dateRange.get(i - step),
dateRange.get(i)));
+ }
+
+ if ((i + step) >= dateRange.size()) {
+ splits.add(ChunkRange.of(dateRange.get(i), null));
+ }
+ }
+ return splits;
+ }
+
+ // obtaining date range
+ private static List<LocalDate> getDateRange(LocalDate startDate, LocalDate
endDate) {
+ List<LocalDate> dateRange = new ArrayList<>();
+
+ LocalDate currentDate = startDate;
+ while (!currentDate.isAfter(endDate)) {
+ dateRange.add(currentDate);
+ currentDate = currentDate.plusDays(1);
+ }
+
+ return dateRange;
+ }
+
private Object nextChunkEnd(
Object previousChunkEnd,
JdbcSourceTable table,
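[Editor's note] The date-splitting logic added above can be exercised in isolation. A minimal sketch (`ChunkRange` replaced by a two-element `LocalDate[]` where `null` stands for an unbounded side, and a guard added so `step` never drops to zero):

```java
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;

public class DateSplitSketch {
    // Mirrors getDateRange in the diff: every day from startDate to endDate inclusive.
    static List<LocalDate> getDateRange(LocalDate startDate, LocalDate endDate) {
        List<LocalDate> dateRange = new ArrayList<>();
        for (LocalDate d = startDate; !d.isAfter(endDate); d = d.plusDays(1)) {
            dateRange.add(d);
        }
        return dateRange;
    }

    // Mirrors the chunking loop: one chunk per `step` days, with open-ended
    // first and last ranges so no row falls outside the splits.
    static List<LocalDate[]> splitByDate(List<LocalDate> dateRange, long rowCnt, int chunkSize) {
        int step = 1;
        if (rowCnt / dateRange.size() < chunkSize) {
            int splitNum = (int) (rowCnt / chunkSize) + 1;
            step = Math.max(dateRange.size() / splitNum, 1); // guard: never 0
        }
        List<LocalDate[]> splits = new ArrayList<>();
        for (int i = 0; i < dateRange.size(); i += step) {
            splits.add(new LocalDate[] {
                i == 0 ? null : dateRange.get(i - step), dateRange.get(i)
            });
            if (i + step >= dateRange.size()) {
                splits.add(new LocalDate[] {dateRange.get(i), null});
            }
        }
        return splits;
    }

    public static void main(String[] args) {
        // Same shape as the JdbcMysqlSplitIT data: 100 consecutive dates from 2024-01-18.
        List<LocalDate> range =
                getDateRange(LocalDate.of(2024, 1, 18), LocalDate.of(2024, 4, 26));
        System.out.println(range.size());                   // 100
        // 100 rows, chunk size 10 -> splitNum 11, step 9 -> 13 splits,
        // matching the expected split count asserted in the new IT.
        System.out.println(splitByDate(range, 100, 10).size()); // 13
    }
}
```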
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/ObjectUtils.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/ObjectUtils.java
index 555e79072e..d5f9260879 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/ObjectUtils.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/ObjectUtils.java
@@ -30,6 +30,10 @@ public class ObjectUtils {
return Math.addExact((Integer) number, augend);
} else if (number instanceof Long) {
return Math.addExact((Long) number, augend);
+ } else if (number instanceof Float) {
+ return ((Float) number) + augend;
+ } else if (number instanceof Double) {
+ return ((Double) number) + augend;
} else if (number instanceof BigInteger) {
return ((BigInteger) number).add(BigInteger.valueOf(augend));
} else if (number instanceof BigDecimal) {
@@ -56,6 +60,12 @@ public class ObjectUtils {
} else if (minuend instanceof Long) {
return BigDecimal.valueOf((long) minuend)
.subtract(BigDecimal.valueOf((long) subtrahend));
+ } else if (minuend instanceof Float) {
+ return BigDecimal.valueOf((float) minuend)
+ .subtract(BigDecimal.valueOf((float) subtrahend));
+ } else if (minuend instanceof Double) {
+ return BigDecimal.valueOf((double) minuend)
+ .subtract(BigDecimal.valueOf((double) subtrahend));
} else if (minuend instanceof BigInteger) {
return new BigDecimal(
((BigInteger) minuend).subtract((BigInteger)
subtrahend).toString());
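[Editor's note] The ObjectUtils hunks above add `Float`/`Double` branches to `plus` and `minus` so floating-point split columns can advance and diff chunk boundaries. A self-contained sketch of just those branches (signatures simplified; the diff's `minus` assumes minuend and subtrahend share a type, and so does this sketch):

```java
import java.math.BigDecimal;
import java.math.BigInteger;

public class NumericOps {
    // Mirrors ObjectUtils.plus, including the new Float/Double branches.
    static Object plus(Object number, int augend) {
        if (number instanceof Integer) {
            return Math.addExact((Integer) number, augend);
        } else if (number instanceof Long) {
            return Math.addExact((Long) number, augend);
        } else if (number instanceof Float) {
            return ((Float) number) + augend;
        } else if (number instanceof Double) {
            return ((Double) number) + augend;
        } else if (number instanceof BigInteger) {
            return ((BigInteger) number).add(BigInteger.valueOf(augend));
        } else if (number instanceof BigDecimal) {
            return ((BigDecimal) number).add(BigDecimal.valueOf(augend));
        }
        throw new IllegalArgumentException("Unsupported type: " + number.getClass());
    }

    // Mirrors the new floating-point branches of ObjectUtils.minus: the
    // subtraction goes through BigDecimal so chunk-width arithmetic does not
    // accumulate binary floating-point error.
    static BigDecimal minus(Object minuend, Object subtrahend) {
        if (minuend instanceof Float) {
            return BigDecimal.valueOf((float) minuend)
                    .subtract(BigDecimal.valueOf((float) subtrahend));
        } else if (minuend instanceof Double) {
            return BigDecimal.valueOf((double) minuend)
                    .subtract(BigDecimal.valueOf((double) subtrahend));
        }
        throw new IllegalArgumentException("Unsupported type: " + minuend.getClass());
    }

    public static void main(String[] args) {
        System.out.println(plus(99.5d, 1));      // 100.5
        System.out.println(minus(99.5d, 0.5d));  // 99.0
    }
}
```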
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlSplitIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlSplitIT.java
new file mode 100644
index 0000000000..eab16faa42
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlSplitIT.java
@@ -0,0 +1,548 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+import org.apache.seatunnel.common.utils.ExceptionUtils;
+import org.apache.seatunnel.common.utils.JdbcUrlUtil;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MySqlCatalog;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceConfig;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.source.DynamicChunkSplitter;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceSplit;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceTable;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import com.google.common.collect.Lists;
+
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.given;
+
+public class JdbcMysqlSplitIT extends TestSuiteBase implements TestResource {
+ private static final Logger LOG =
LoggerFactory.getLogger(JdbcMysqlSplitIT.class);
+
+ private static final String MYSQL_IMAGE = "mysql:latest";
+ private static final String MYSQL_CONTAINER_HOST = "mysql-e2e";
+ private static final String MYSQL_DATABASE = "auto";
+ private static final String MYSQL_TABLE = "split_test";
+
+ private static final String MYSQL_USERNAME = "root";
+ private static final String MYSQL_PASSWORD = "Abc!@#135_seatunnel";
+ private static final int MYSQL_PORT = 3312;
+
+ private MySQLContainer<?> mysql_container;
+
+ LocalDate currentDateOld = LocalDate.of(2024, 1, 18);
+
+ private static final String CREATE_SQL =
+ "CREATE TABLE IF NOT EXISTS "
+ + MYSQL_TABLE
+ + "\n"
+ + "(\n"
+ + " `id` int NOT
NULL,\n"
+ + " `c_bit_1` bit(1)
DEFAULT NULL,\n"
+ + " `c_bit_8` bit(8)
DEFAULT NULL,\n"
+ + " `c_bit_16` bit(16)
DEFAULT NULL,\n"
+ + " `c_bit_32` bit(32)
DEFAULT NULL,\n"
+ + " `c_bit_64` bit(64)
DEFAULT NULL,\n"
+ + " `c_boolean` tinyint(1)
DEFAULT NULL,\n"
+ + " `c_tinyint` tinyint(4)
DEFAULT NULL,\n"
+ + " `c_tinyint_unsigned` tinyint(3) unsigned
DEFAULT NULL,\n"
+ + " `c_smallint` smallint(6)
DEFAULT NULL,\n"
+ + " `c_smallint_unsigned` smallint(5) unsigned
DEFAULT NULL,\n"
+ + " `c_mediumint` mediumint(9)
DEFAULT NULL,\n"
+ + " `c_mediumint_unsigned` mediumint(8) unsigned
DEFAULT NULL,\n"
+ + " `c_int` int(11)
DEFAULT NULL,\n"
+ + " `c_integer` int(11)
DEFAULT NULL,\n"
+ + " `c_bigint` bigint(20)
DEFAULT NULL,\n"
+ + " `c_bigint_unsigned` bigint(20) unsigned
DEFAULT NULL,\n"
+ + " `c_decimal` decimal(20, 0)
DEFAULT NULL,\n"
+ + " `c_decimal_unsigned` decimal(38, 10)
DEFAULT NULL,\n"
+ + " `c_float` float
DEFAULT NULL,\n"
+ + " `c_float_unsigned` float unsigned
DEFAULT NULL,\n"
+ + " `c_double` double
DEFAULT NULL,\n"
+ + " `c_double_unsigned` double unsigned
DEFAULT NULL,\n"
+ + " `c_char` char(1)
DEFAULT NULL,\n"
+ + " `c_tinytext` tinytext,\n"
+ + " `c_mediumtext` mediumtext,\n"
+ + " `c_text` text,\n"
+ + " `c_varchar` varchar(255)
DEFAULT NULL,\n"
+ + " `c_json` json
DEFAULT NULL,\n"
+ + " `c_longtext` longtext,\n"
+ + " `c_date` date
DEFAULT NULL,\n"
+ + " `c_datetime` datetime
DEFAULT NULL,\n"
+ + " `c_timestamp` timestamp NULL
DEFAULT NULL,\n"
+ + " `c_tinyblob` tinyblob,\n"
+ + " `c_mediumblob` mediumblob,\n"
+ + " `c_blob` blob,\n"
+ + " `c_longblob` longblob,\n"
+ + " `c_varbinary` varbinary(255)
DEFAULT NULL,\n"
+ + " `c_binary` binary(1)
DEFAULT NULL,\n"
+ + " `c_year` year(4)
DEFAULT NULL,\n"
+ + " `c_int_unsigned` int(10) unsigned
DEFAULT NULL,\n"
+ + " `c_integer_unsigned` int(10) unsigned
DEFAULT NULL,\n"
+ + " `c_bigint_30` BIGINT(40) unsigned
DEFAULT NULL,\n"
+ + " `c_decimal_unsigned_30` DECIMAL(30) unsigned
DEFAULT NULL,\n"
+ + " `c_decimal_30` DECIMAL(30)
DEFAULT NULL,\n"
+ + " PRIMARY KEY (`id`)\n"
+ + ");";
+
+ void initContainer() throws ClassNotFoundException {
+ // ============= mysql
+ DockerImageName imageName = DockerImageName.parse(MYSQL_IMAGE);
+ mysql_container =
+ new MySQLContainer<>(imageName)
+ .withUsername(MYSQL_USERNAME)
+ .withPassword(MYSQL_PASSWORD)
+ .withDatabaseName(MYSQL_DATABASE)
+ .withNetwork(NETWORK)
+ .withNetworkAliases(MYSQL_CONTAINER_HOST)
+ .withExposedPorts(MYSQL_PORT)
+ .waitingFor(Wait.forHealthcheck())
+ .withLogConsumer(
+ new
Slf4jLogConsumer(DockerLoggerFactory.getLogger(MYSQL_IMAGE)));
+ mysql_container.setPortBindings(
+ Lists.newArrayList(String.format("%s:%s", MYSQL_PORT, 3306)));
+
+ Startables.deepStart(Stream.of(mysql_container)).join();
+ }
+
+ @BeforeAll
+ @Override
+ public void startUp() throws Exception {
+ initContainer();
+ given().await()
+ .atLeast(100, TimeUnit.MILLISECONDS)
+ .pollInterval(500, TimeUnit.MILLISECONDS)
+ .atMost(2, TimeUnit.MINUTES)
+ .untilAsserted(this::initializeJdbcTable);
+ }
+
+ private void initializeJdbcTable() {
+ Pair<String[], List<SeaTunnelRow>> testDataSet = initTestData();
+ String[] fieldNames = testDataSet.getKey();
+ String insertSql = insertTable(MYSQL_DATABASE, MYSQL_TABLE,
fieldNames);
+ insertTestData(insertSql, testDataSet.getRight());
+ }
+
+ public String insertTable(String schema, String table, String... fields) {
+ String columns =
+
Arrays.stream(fields).map(this::quoteIdentifier).collect(Collectors.joining(",
"));
+ String placeholders = Arrays.stream(fields).map(f ->
"?").collect(Collectors.joining(", "));
+
+ return "INSERT INTO "
+ + schema
+ + "."
+ + table
+ + " ("
+ + columns
+ + " )"
+ + " VALUES ("
+ + placeholders
+ + ")";
+ }
+
+ protected void insertTestData(String insertSql, List<SeaTunnelRow> rows) {
+ try (Connection connection = getJdbcConnection();
+ PreparedStatement preparedStatement =
connection.prepareStatement(insertSql)) {
+
+ preparedStatement.execute(CREATE_SQL);
+ for (SeaTunnelRow row : rows) {
+ for (int index = 0; index < row.getArity(); index++) {
+ preparedStatement.setObject(index + 1,
row.getField(index));
+ }
+ preparedStatement.addBatch();
+ }
+
+ preparedStatement.executeBatch();
+
+ // ANALYZE TABLE
+ preparedStatement.execute("ANALYZE TABLE " + MYSQL_DATABASE + "."
+ MYSQL_TABLE);
+
+ } catch (Exception exception) {
+ LOG.error(ExceptionUtils.getMessage(exception));
+ throw new
SeaTunnelRuntimeException(JdbcITErrorCode.INSERT_DATA_FAILED, exception);
+ }
+ }
+
+ public String quoteIdentifier(String field) {
+ return "`" + field + "`";
+ }
+
+ private Connection getJdbcConnection() throws SQLException {
+ return DriverManager.getConnection(
+ mysql_container.getJdbcUrl(),
+ mysql_container.getUsername(),
+ mysql_container.getPassword());
+ }
+
+ Pair<String[], List<SeaTunnelRow>> initTestData() {
+ String[] fieldNames =
+ new String[] {
+ "id",
+ "c_bit_1",
+ "c_bit_8",
+ "c_bit_16",
+ "c_bit_32",
+ "c_bit_64",
+ "c_boolean",
+ "c_tinyint",
+ "c_tinyint_unsigned",
+ "c_smallint",
+ "c_smallint_unsigned",
+ "c_mediumint",
+ "c_mediumint_unsigned",
+ "c_int",
+ "c_integer",
+ "c_year",
+ "c_int_unsigned",
+ "c_integer_unsigned",
+ "c_bigint",
+ "c_bigint_unsigned",
+ "c_decimal",
+ "c_decimal_unsigned",
+ "c_float",
+ "c_float_unsigned",
+ "c_double",
+ "c_double_unsigned",
+ "c_char",
+ "c_tinytext",
+ "c_mediumtext",
+ "c_text",
+ "c_varchar",
+ "c_json",
+ "c_longtext",
+ "c_date",
+ "c_datetime",
+ "c_timestamp",
+ "c_tinyblob",
+ "c_mediumblob",
+ "c_blob",
+ "c_longblob",
+ "c_varbinary",
+ "c_binary",
+ "c_bigint_30",
+ "c_decimal_unsigned_30",
+ "c_decimal_30",
+ };
+
+ List<SeaTunnelRow> rows = new ArrayList<>();
+ BigDecimal bigintValue = new BigDecimal("2844674407371055000");
+ BigDecimal decimalValue = new
BigDecimal("999999999999999999999999999899");
+ LocalDate currentDate = LocalDate.of(2024, 1, 17);
+
+ for (int i = 0; i < 100; i++) {
+ currentDate = currentDate.plusDays(1);
+ byte byteArr = Integer.valueOf(i).byteValue();
+ SeaTunnelRow row =
+ new SeaTunnelRow(
+ new Object[] {
+ i,
+ i % 2 == 0 ? (byte) 1 : (byte) 0,
+ new byte[] {byteArr},
+ new byte[] {byteArr, byteArr},
+ new byte[] {byteArr, byteArr, byteArr,
byteArr},
+ new byte[] {
+ byteArr, byteArr, byteArr, byteArr,
byteArr, byteArr, byteArr,
+ byteArr
+ },
+ i % 2 == 0 ? Boolean.TRUE : Boolean.FALSE,
+ i,
+ i,
+ i,
+ i,
+ i,
+ i,
+ i,
+ i,
+ i,
+ Long.parseLong(i + ""),
+ Long.parseLong(i + ""),
+ Long.parseLong(i + ""),
+ BigDecimal.valueOf(i, 0),
+ BigDecimal.valueOf(i, 0),
+ BigDecimal.valueOf(i * 10000000000L, 10),
+ Float.parseFloat(i + ".1"),
+ Float.parseFloat(i + ".1"),
+ Double.parseDouble(i + ".1"),
+ Double.parseDouble(i + ".1"),
+ "f",
+ String.format("f1_%s", i),
+ String.format("f1_%s", i),
+ String.format("f1_%s", i),
+ String.format("f1_%s", i),
+ String.format("{\"aa\":\"bb_%s\"}", i),
+ String.format("f1_%s", i),
+ Date.valueOf(currentDate),
+ Timestamp.valueOf(LocalDateTime.now()),
+ new Timestamp(System.currentTimeMillis()),
+ "test".getBytes(),
+ "test".getBytes(),
+ "test".getBytes(),
+ "test".getBytes(),
+ "test".getBytes(),
+ "f".getBytes(),
+ bigintValue.add(BigDecimal.valueOf(i)),
+ decimalValue.add(BigDecimal.valueOf(i)),
+ decimalValue.add(BigDecimal.valueOf(i)),
+ });
+ rows.add(row);
+ }
+
+ return Pair.of(fieldNames, rows);
+ }
+
+ static JdbcUrlUtil.UrlInfo mysqlUrlInfo =
+ JdbcUrlUtil.getUrlInfo(
+
String.format("jdbc:mysql://localhost:%s/auto?useSSL=false", MYSQL_PORT));
+
+ @Test
+ public void testSplit() throws Exception {
+ Map<String, Object> configMap = new HashMap<>();
+ configMap.put("url", mysqlUrlInfo.getUrlWithDatabase().get());
+ configMap.put("driver", "com.mysql.cj.jdbc.Driver");
+ configMap.put("user", MYSQL_USERNAME);
+ configMap.put("password", MYSQL_PASSWORD);
+ configMap.put("table_path", MYSQL_DATABASE + "." + MYSQL_TABLE);
+ configMap.put("split.size", "10");
+ DynamicChunkSplitter splitter = getDynamicChunkSplitter(configMap);
+
+ TablePath tablePathMySql = TablePath.of(MYSQL_DATABASE, MYSQL_TABLE);
+ MySqlCatalog mySqlCatalog =
+ new MySqlCatalog("mysql", MYSQL_USERNAME, MYSQL_PASSWORD,
mysqlUrlInfo);
+ mySqlCatalog.open();
+ Assertions.assertTrue(mySqlCatalog.tableExists(tablePathMySql));
+ CatalogTable table = mySqlCatalog.getTable(tablePathMySql);
+
+ JdbcSourceTable jdbcSourceTable =
+ JdbcSourceTable.builder()
+ .tablePath(TablePath.of(MYSQL_DATABASE, MYSQL_TABLE))
+ .catalogTable(table)
+ .build();
+ Collection<JdbcSourceSplit> jdbcSourceSplits =
splitter.generateSplits(jdbcSourceTable);
+ Assertions.assertEquals(10, jdbcSourceSplits.size());
+ JdbcSourceSplit[] splitArray = jdbcSourceSplits.toArray(new
JdbcSourceSplit[0]);
+ Assertions.assertEquals("id", splitArray[0].getSplitKeyName());
+ assertNumSplit(splitArray, "");
+
+ // use tinyint column to split
+ splitArray = getCheckedSplitArray(configMap, table, "c_tinyint", 10);
+ assertNumSplit(splitArray, "");
+
+ // use tinyint_unsigned column to split
+ splitArray = getCheckedSplitArray(configMap, table,
"c_tinyint_unsigned", 10);
+ configMap.put("partition_column", "c_tinyint_unsigned");
+ assertNumSplit(splitArray, "");
+
+ // use smallint column to split
+ splitArray = getCheckedSplitArray(configMap, table, "c_smallint", 10);
+ configMap.put("partition_column", "c_smallint");
+ assertNumSplit(splitArray, "");
+
+ // use smallint_unsigned column to split
+ splitArray = getCheckedSplitArray(configMap, table,
"c_smallint_unsigned", 10);
+ configMap.put("partition_column", "c_smallint_unsigned");
+ assertNumSplit(splitArray, "");
+
+ // use int column to split
+ splitArray = getCheckedSplitArray(configMap, table, "c_int", 10);
+ configMap.put("partition_column", "c_int");
+ assertNumSplit(splitArray, "");
+
+        // use integer column to split
+ splitArray = getCheckedSplitArray(configMap, table, "c_integer", 10);
+ configMap.put("partition_column", "c_integer");
+ assertNumSplit(splitArray, "");
+
+ // use int_unsigned column to split
+ splitArray = getCheckedSplitArray(configMap, table, "c_int_unsigned",
10);
+ configMap.put("partition_column", "c_int_unsigned");
+ assertNumSplit(splitArray, "");
+
+ // use integer_unsigned column to split
+ splitArray = getCheckedSplitArray(configMap, table,
"c_integer_unsigned", 10);
+ configMap.put("partition_column", "c_integer_unsigned");
+ assertNumSplit(splitArray, "");
+
+        // use mediumint column to split
+ splitArray = getCheckedSplitArray(configMap, table, "c_mediumint", 10);
+ configMap.put("partition_column", "c_mediumint");
+ assertNumSplit(splitArray, "");
+
+        // use mediumint_unsigned column to split
+ splitArray = getCheckedSplitArray(configMap, table,
"c_mediumint_unsigned", 10);
+ configMap.put("partition_column", "c_mediumint_unsigned");
+ assertNumSplit(splitArray, "");
+
+ // use bigint column to split
+ splitArray = getCheckedSplitArray(configMap, table, "c_bigint", 10);
+ configMap.put("partition_column", "c_bigint");
+ assertNumSplit(splitArray, "");
+
+ // use bigint_unsigned column to split
+ splitArray = getCheckedSplitArray(configMap, table,
"c_bigint_unsigned", 10);
+ configMap.put("partition_column", "c_bigint_unsigned");
+ assertNumSplit(splitArray, "");
+
+ // use decimal column to split
+ splitArray = getCheckedSplitArray(configMap, table, "c_decimal", 10);
+ configMap.put("partition_column", "c_decimal");
+ assertNumSplit(splitArray, "");
+
+ // use decimal_unsigned column to split
+ splitArray = getCheckedSplitArray(configMap, table,
"c_decimal_unsigned", 10);
+ configMap.put("partition_column", "c_decimal_unsigned");
+ assertNumSplit(splitArray, ".0000000000");
+
+ // use double column to split
+ splitArray = getCheckedSplitArray(configMap, table, "c_double", 10);
+ configMap.put("partition_column", "c_double");
+ assertNumSplit(splitArray, ".1");
+
+ // use unsigned double column to split
+ splitArray = getCheckedSplitArray(configMap, table,
"c_double_unsigned", 10);
+ configMap.put("partition_column", "c_double_unsigned");
+ assertNumSplit(splitArray, ".1");
+
+ // use date column to split
+ configMap.put("partition_column", "c_date");
+ splitArray = getCheckedSplitArray(configMap, table, "c_date", 13);
+ configMap.put("partition_column", "c_date");
+ assertDateSplit(splitArray);
+
+ mySqlCatalog.close();
+ }
+
+ private JdbcSourceSplit[] getCheckedSplitArray(
+ Map<String, Object> configMap, CatalogTable table, String splitKey, int splitNum)
+ throws SQLException {
+ configMap.put("partition_column", splitKey);
+ DynamicChunkSplitter splitter = getDynamicChunkSplitter(configMap);
+
+ JdbcSourceTable jdbcSourceTable =
+ JdbcSourceTable.builder()
+ .tablePath(TablePath.of(MYSQL_DATABASE, MYSQL_TABLE))
+ .catalogTable(table)
+ .partitionColumn(splitKey)
+ .build();
+ Collection<JdbcSourceSplit> jdbcSourceSplits = splitter.generateSplits(jdbcSourceTable);
+ Assertions.assertEquals(splitNum, jdbcSourceSplits.size());
+ JdbcSourceSplit[] splitArray = jdbcSourceSplits.toArray(new JdbcSourceSplit[0]);
+ Assertions.assertEquals(splitKey, splitArray[0].getSplitKeyName());
+ return splitArray;
+ }
+
+ private void assertNumSplit(JdbcSourceSplit[] splitArray, String info) {
+ for (int i = 0; i < splitArray.length; i++) {
+ if (i == 0) {
+ Assertions.assertEquals(null, splitArray[i].getSplitStart());
+ Assertions.assertEquals("10" + info, splitArray[i].getSplitEnd().toString());
+ continue;
+ }
+
+ if (i == splitArray.length - 1 && i != 0) {
+ Assertions.assertEquals(10 * i + info, splitArray[i].getSplitStart().toString());
+ Assertions.assertEquals(null, splitArray[i].getSplitEnd());
+ continue;
+ }
+
+ Assertions.assertEquals(10 * i + info, splitArray[i].getSplitStart().toString());
+ Assertions.assertEquals(10 * (i + 1) + info, splitArray[i].getSplitEnd().toString());
+ }
+ }
+
+ private void assertDateSplit(JdbcSourceSplit[] splitArray) {
+ for (int i = 0; i < splitArray.length; i++) {
+ if (i == 0) {
+ Assertions.assertEquals(null, splitArray[i].getSplitStart());
+ Assertions.assertEquals(
+ currentDateOld.plusDays(i * 9).toString(),
+ splitArray[i].getSplitEnd().toString());
+ continue;
+ }
+
+ if (i == splitArray.length - 1 && i != 0) {
+ Assertions.assertEquals(
+ currentDateOld.plusDays((i - 1) * 9).toString(),
+ splitArray[i].getSplitStart().toString());
+ Assertions.assertEquals(null, splitArray[i].getSplitEnd());
+ continue;
+ }
+
+ Assertions.assertEquals(
+ currentDateOld.plusDays((i - 1) * 9).toString(),
+ splitArray[i].getSplitStart().toString());
+ Assertions.assertEquals(
+ currentDateOld.plusDays(i * 9).toString(),
+ splitArray[i].getSplitEnd().toString());
+ }
+ }
+
@NotNull private DynamicChunkSplitter getDynamicChunkSplitter(Map<String, Object> configMap) {
+ ReadonlyConfig readonlyConfig = ReadonlyConfig.fromMap(configMap);
+ JdbcSourceConfig sourceConfig = JdbcSourceConfig.of(readonlyConfig);
+ DynamicChunkSplitter splitter = new DynamicChunkSplitter(sourceConfig);
+ return splitter;
+ }
+
+ @Override
+ public void tearDown() throws Exception {
+ if (mysql_container != null) {
+ mysql_container.close();
+ dockerClient.removeContainerCmd(mysql_container.getContainerId()).exec();
+ }
+ }
+}
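For context, the splitting scheme these assertions encode — an open-ended first split, an open-ended last split, and evenly sized interior splits whose boundaries land on round multiples (10, 20, ...) — can be sketched as below. `SplitSketch` and `splitBoundaries` are hypothetical names for illustration only; this is not SeaTunnel's actual `DynamicChunkSplitter` implementation.

```java
import java.util.ArrayList;
import java.util.List;

public class SplitSketch {
    // Compute the interior boundaries for splitting [min, max] into n chunks.
    // Split i covers [boundary[i-1], boundary[i]); the first split has a null
    // lower bound and the last a null upper bound, matching assertNumSplit above.
    static List<Long> splitBoundaries(long min, long max, int n) {
        List<Long> bounds = new ArrayList<>();
        long step = (max - min + n - 1) / n; // ceiling division: chunk width
        for (long b = min + step; b < max; b += step) {
            bounds.add(b);
        }
        return bounds;
    }

    public static void main(String[] args) {
        // 10 splits over values 0..99 give 9 interior boundaries at 10, 20, ..., 90.
        System.out.println(splitBoundaries(0, 99, 10));
        // prints [10, 20, 30, 40, 50, 60, 70, 80, 90]
    }
}
```

With 9 interior boundaries, the splitter emits 10 splits: `(null, 10)`, `[10, 20)`, ..., `[90, null)`, which is exactly the shape `assertNumSplit` walks through.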
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
index 20de2932a1..3b81f2a655 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
@@ -141,7 +141,6 @@ class JobMetricsTest extends AbstractSeaTunnelServerTest {
.untilAsserted(
() -> {
JobMetrics jobMetrics = coordinatorService.getJobMetrics(jobId3);
- log.info(jobMetrics.toJsonString());
assertTrue(40 < (Long) jobMetrics.get(SINK_WRITE_COUNT).get(0).value());
assertTrue(40 < (Long) jobMetrics.get(SINK_WRITE_COUNT).get(1).value());
assertTrue(