This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch 010801_cp_split_by_date
in repository https://gitbox.apache.org/repos/asf/seatunnel.git

commit 47cc9ade1c7ff5d35f3db4988a2778ccb3787b6f
Author: Eric <[email protected]>
AuthorDate: Mon Dec 18 11:27:08 2023 +0800

    Support Split By Date Type Column
---
 .../jdbc/internal/dialect/JdbcDialect.java         |   4 +
 .../seatunnel/jdbc/internal/dialect/SQLUtils.java  |   5 +
 .../jdbc/internal/dialect/mysql/MysqlDialect.java  |   7 +
 .../internal/dialect/oracle/OracleDialect.java     |   5 +
 .../internal/dialect/psql/PostgresDialect.java     |   4 +
 .../dialect/sqlserver/SqlServerDialect.java        |   5 +
 .../seatunnel/jdbc/source/ChunkSplitter.java       |  17 +-
 .../jdbc/source/DynamicChunkSplitter.java          | 191 +++++++++++++++------
 8 files changed, 177 insertions(+), 61 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
index 099d59d5c0..d204cf4fc4 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
@@ -40,6 +40,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.logging.Logger;
 import java.util.stream.Collectors;
 
 import static java.lang.String.format;
@@ -50,6 +51,8 @@ import static java.lang.String.format;
  */
 public interface JdbcDialect extends Serializable {
 
+    Logger log = Logger.getLogger(JdbcDialect.class.getName());
+
     /**
      * Get the name of jdbc dialect.
      *
@@ -316,6 +319,7 @@ public interface JdbcDialect extends Serializable {
 
         try (Statement stmt = connection.createStatement()) {
             stmt.setFetchSize(1024);
+            log.info(String.format("Split Chunk, approximateRowCntStatement: 
%s", sampleQuery));
             try (ResultSet rs = stmt.executeQuery(sampleQuery)) {
                 int count = 0;
                 List<Object> results = new ArrayList<>();
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/SQLUtils.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/SQLUtils.java
index 2686a4e307..cc841149c9 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/SQLUtils.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/SQLUtils.java
@@ -17,16 +17,20 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect;
 
+import lombok.extern.slf4j.Slf4j;
+
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 
+@Slf4j
 public class SQLUtils {
 
     public static Long countForSubquery(Connection connection, String 
subQuerySQL)
             throws SQLException {
         String sqlQuery = String.format("SELECT COUNT(*) FROM (%s) T", 
subQuerySQL);
+        log.info("Split Chunk, countForSubquery: {}", sqlQuery);
         try (Statement stmt = connection.createStatement()) {
             try (ResultSet resultSet = stmt.executeQuery(sqlQuery)) {
                 if (resultSet.next()) {
@@ -40,6 +44,7 @@ public class SQLUtils {
 
     public static Long countForTable(Connection connection, String tablePath) 
throws SQLException {
         String sqlQuery = String.format("SELECT COUNT(*) FROM %s", tablePath);
+        log.info("Split Chunk, countForTable: {}", sqlQuery);
         try (Statement stmt = connection.createStatement()) {
             try (ResultSet resultSet = stmt.executeQuery(sqlQuery)) {
                 if (resultSet.next()) {
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
index f2ce15e31e..58573c7d08 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
@@ -28,6 +28,9 @@ import 
org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceTable;
 
 import org.apache.commons.lang3.StringUtils;
 
+import com.mysql.cj.MysqlType;
+import lombok.extern.slf4j.Slf4j;
+
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -41,6 +44,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
+@Slf4j
 public class MysqlDialect implements JdbcDialect {
     public String fieldIde = FieldIdeEnum.ORIGINAL.getValue();
 
@@ -170,8 +174,11 @@ public class MysqlDialect implements JdbcDialect {
                     String.format("USE %s;", 
quoteDatabaseIdentifier(tablePath.getDatabaseName()));
             String rowCountQuery =
                     String.format("SHOW TABLE STATUS LIKE '%s';", 
tablePath.getTableName());
+
             try (Statement stmt = connection.createStatement()) {
+                log.info("Split Chunk, approximateRowCntStatement: {}", 
useDatabaseStatement);
                 stmt.execute(useDatabaseStatement);
+                log.info("Split Chunk, approximateRowCntStatement: {}", 
rowCountQuery);
                 try (ResultSet rs = stmt.executeQuery(rowCountQuery)) {
                     if (!rs.next() || rs.getMetaData().getColumnCount() < 5) {
                         throw new SQLException(
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
index a1a658ba06..9167fe07f4 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
@@ -28,6 +28,8 @@ import 
org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceTable;
 
 import org.apache.commons.lang3.StringUtils;
 
+import lombok.extern.slf4j.Slf4j;
+
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -38,6 +40,7 @@ import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
+@Slf4j
 public class OracleDialect implements JdbcDialect {
 
     private static final int DEFAULT_ORACLE_FETCH_SIZE = 128;
@@ -188,7 +191,9 @@ public class OracleDialect implements JdbcDialect {
                             tablePath.getSchemaName(), 
tablePath.getTableName());
 
             try (Statement stmt = connection.createStatement()) {
+                log.info("Split Chunk, approximateRowCntStatement: {}", 
analyzeTable);
                 stmt.execute(analyzeTable);
+                log.info("Split Chunk, approximateRowCntStatement: {}", 
rowCountQuery);
                 try (ResultSet rs = stmt.executeQuery(rowCountQuery)) {
                     if (!rs.next()) {
                         throw new SQLException(
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java
index 9a4076fe2a..e73a8f0bb6 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java
@@ -28,6 +28,8 @@ import 
org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceTable;
 
 import org.apache.commons.lang3.StringUtils;
 
+import lombok.extern.slf4j.Slf4j;
+
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -37,6 +39,7 @@ import java.util.Arrays;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
+@Slf4j
 public class PostgresDialect implements JdbcDialect {
 
     public static final int DEFAULT_POSTGRES_FETCH_SIZE = 128;
@@ -152,6 +155,7 @@ public class PostgresDialect implements JdbcDialect {
                             "SELECT reltuples FROM pg_class r WHERE relkind = 
'r' AND relname = '%s';",
                             table.getTablePath().getTableName());
             try (Statement stmt = connection.createStatement()) {
+                log.info("Split Chunk, approximateRowCntStatement: {}", 
rowCountQuery);
                 try (ResultSet rs = stmt.executeQuery(rowCountQuery)) {
                     if (!rs.next()) {
                         throw new SQLException(
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
index f8a46a0637..729ba17949 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
@@ -28,6 +28,8 @@ import 
org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceTable;
 
 import org.apache.commons.lang3.StringUtils;
 
+import lombok.extern.slf4j.Slf4j;
+
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -38,6 +40,7 @@ import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
+@Slf4j
 public class SqlServerDialect implements JdbcDialect {
 
     public String fieldIde = FieldIdeEnum.ORIGINAL.getValue();
@@ -165,6 +168,7 @@ public class SqlServerDialect implements JdbcDialect {
                             String.format(
                                     "USE %s;",
                                     
quoteDatabaseIdentifier(tablePath.getDatabaseName()));
+                    log.info("Split Chunk, approximateRowCntStatement: {}", 
useDatabaseStatement);
                     stmt.execute(useDatabaseStatement);
                 }
                 String rowCountQuery =
@@ -172,6 +176,7 @@ public class SqlServerDialect implements JdbcDialect {
                                 "SELECT Total_Rows = SUM(st.row_count) FROM 
sys"
                                         + ".dm_db_partition_stats st WHERE 
object_name(object_id) = '%s' AND index_id < 2;",
                                 tablePath.getTableName());
+                log.info("Split Chunk, approximateRowCntStatement: {}", 
rowCountQuery);
                 try (ResultSet rs = stmt.executeQuery(rowCountQuery)) {
                     if (!rs.next()) {
                         throw new SQLException(
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java
index ed7fac4d26..7a67514fc7 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java
@@ -221,6 +221,7 @@ public abstract class ChunkSplitter implements 
AutoCloseable, Serializable {
                             jdbcDialect.tableIdentifier(table.getTablePath()));
         }
         try (Statement stmt = getOrEstablishConnection().createStatement()) {
+            log.info("Split table, query min max: {}", sqlQuery);
             try (ResultSet resultSet = stmt.executeQuery(sqlQuery)) {
                 if (resultSet.next()) {
                     Object min = resultSet.getObject(1);
@@ -251,7 +252,7 @@ public abstract class ChunkSplitter implements 
AutoCloseable, Serializable {
                                 "Partitioned column(%s) don't exist in the 
table columns",
                                 partitionColumn));
             }
-            if (!isEvenlySplitColumn(column)) {
+            if (!isSupportSplitColumn(column)) {
                 throw new JdbcConnectorException(
                         CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
                         String.format("%s is not numeric/string type", 
partitionColumn));
@@ -266,7 +267,7 @@ public abstract class ChunkSplitter implements 
AutoCloseable, Serializable {
         if (pk != null) {
             for (String pkField : pk.getColumnNames()) {
                 Column column = columnMap.get(pkField);
-                if (isEvenlySplitColumn(column)) {
+                if (isSupportSplitColumn(column)) {
                     return Optional.of(
                             new SeaTunnelRowType(
                                     new String[] {pkField},
@@ -290,7 +291,7 @@ public abstract class ChunkSplitter implements 
AutoCloseable, Serializable {
                             uniqueKey.getColumnNames()) {
                         String uniqueKeyColumnName = 
uniqueKeyColumn.getColumnName();
                         Column column = columnMap.get(uniqueKeyColumnName);
-                        if (isEvenlySplitColumn(column)) {
+                        if (isSupportSplitColumn(column)) {
                             return Optional.of(
                                     new SeaTunnelRowType(
                                             new String[] {uniqueKeyColumnName},
@@ -305,19 +306,17 @@ public abstract class ChunkSplitter implements 
AutoCloseable, Serializable {
         return Optional.empty();
     }
 
-    protected boolean isEvenlySplitColumn(Column splitColumn) {
-        return isEvenlySplitColumn(splitColumn.getDataType());
-    }
-
-    protected boolean isEvenlySplitColumn(SeaTunnelDataType columnType) {
+    protected boolean isSupportSplitColumn(Column splitColumn) {
+        SeaTunnelDataType<?> dataType = splitColumn.getDataType();
         // currently, we only support these types.
-        switch (columnType.getSqlType()) {
+        switch (dataType.getSqlType()) {
             case TINYINT:
             case SMALLINT:
             case INT:
             case BIGINT:
             case DECIMAL:
             case STRING:
+            case DATE:
                 return true;
             default:
                 return false;
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java
index da158dca44..e1224a7fa1 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java
@@ -20,7 +20,9 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.source;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.ObjectUtils;
 
 import org.apache.commons.lang3.StringUtils;
@@ -32,8 +34,11 @@ import lombok.extern.slf4j.Slf4j;
 
 import java.io.Serializable;
 import java.math.BigDecimal;
+import java.sql.Date;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.time.LocalDate;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -97,7 +102,6 @@ public class DynamicChunkSplitter extends ChunkSplitter {
     private List<ChunkRange> splitTableIntoChunks(
             JdbcSourceTable table, String splitColumnName, SeaTunnelDataType 
splitColumnType)
             throws SQLException {
-        TablePath tablePath = table.getTablePath();
         Pair<Object, Object> minMax = queryMinMax(table, splitColumnName);
         Object min = minMax.getLeft();
         Object max = minMax.getRight();
@@ -107,6 +111,29 @@ public class DynamicChunkSplitter extends ChunkSplitter {
         }
 
         int chunkSize = config.getSplitSize();
+
+        switch (splitColumnType.getSqlType()) {
+            case TINYINT:
+            case SMALLINT:
+            case INT:
+            case BIGINT:
+            case DECIMAL:
+            case STRING:
+                return evenlyColumnSplitChunks(table, splitColumnName, min, 
max, chunkSize);
+            case DATE:
+                return dateColumnSplitChunks(table, splitColumnName, min, max, 
chunkSize);
+            default:
+                throw new JdbcConnectorException(
+                        CommonErrorCode.ILLEGAL_ARGUMENT,
+                        String.format(
+                                "%s is not numeric/string type", 
splitColumnType.getSqlType()));
+        }
+    }
+
+    private List<ChunkRange> evenlyColumnSplitChunks(
+            JdbcSourceTable table, String splitColumnName, Object min, Object 
max, int chunkSize)
+            throws SQLException {
+        TablePath tablePath = table.getTablePath();
         double distributionFactorUpper = 
config.getSplitEvenDistributionFactorUpperBound();
         double distributionFactorLower = 
config.getSplitEvenDistributionFactorLowerBound();
         int sampleShardingThreshold = config.getSplitSampleShardingThreshold();
@@ -123,59 +150,54 @@ public class DynamicChunkSplitter extends ChunkSplitter {
                 distributionFactorLower,
                 sampleShardingThreshold);
 
-        if (isEvenlySplitColumn(splitColumnType)) {
-            long approximateRowCnt = queryApproximateRowCnt(table);
-            double distributionFactor =
-                    calculateDistributionFactor(tablePath, min, max, 
approximateRowCnt);
-
-            boolean dataIsEvenlyDistributed =
-                    ObjectUtils.doubleCompare(distributionFactor, 
distributionFactorLower) >= 0
-                            && ObjectUtils.doubleCompare(
-                                            distributionFactor, 
distributionFactorUpper)
-                                    <= 0;
-
-            if (dataIsEvenlyDistributed) {
-                // the minimum dynamic chunk size is at least 1
-                final int dynamicChunkSize = Math.max((int) 
(distributionFactor * chunkSize), 1);
-                return splitEvenlySizedChunks(
-                        tablePath, min, max, approximateRowCnt, chunkSize, 
dynamicChunkSize);
-            } else {
-                int shardCount = (int) (approximateRowCnt / chunkSize);
-                int inverseSamplingRate = config.getSplitInverseSamplingRate();
-                if (sampleShardingThreshold < shardCount) {
-                    // It is necessary to ensure that the number of data rows 
sampled by the
-                    // sampling rate is greater than the number of shards.
-                    // Otherwise, if the sampling rate is too low, it may 
result in an insufficient
-                    // number of data rows for the shards, leading to an 
inadequate number of
-                    // shards.
-                    // Therefore, inverseSamplingRate should be less than 
chunkSize
-                    if (inverseSamplingRate > chunkSize) {
-                        log.warn(
-                                "The inverseSamplingRate is {}, which is 
greater than chunkSize {}, so we set inverseSamplingRate to chunkSize",
-                                inverseSamplingRate,
-                                chunkSize);
-                        inverseSamplingRate = chunkSize;
-                    }
-                    log.info(
-                            "Use sampling sharding for table {}, the sampling 
rate is {}",
-                            tablePath,
-                            inverseSamplingRate);
-                    Object[] sample =
-                            jdbcDialect.sampleDataFromColumn(
-                                    getOrEstablishConnection(),
-                                    table,
-                                    splitColumnName,
-                                    inverseSamplingRate);
-                    log.info(
-                            "Sample data from table {} end, the sample size is 
{}",
-                            tablePath,
-                            sample.length);
-                    return efficientShardingThroughSampling(
-                            tablePath, sample, approximateRowCnt, shardCount);
+        long approximateRowCnt = queryApproximateRowCnt(table);
+        double distributionFactor =
+                calculateDistributionFactor(tablePath, min, max, 
approximateRowCnt);
+
+        boolean dataIsEvenlyDistributed =
+                ObjectUtils.doubleCompare(distributionFactor, 
distributionFactorLower) >= 0
+                        && ObjectUtils.doubleCompare(distributionFactor, 
distributionFactorUpper)
+                                <= 0;
+
+        if (dataIsEvenlyDistributed) {
+            // the minimum dynamic chunk size is at least 1
+            final int dynamicChunkSize = Math.max((int) (distributionFactor * 
chunkSize), 1);
+            return splitEvenlySizedChunks(
+                    tablePath, min, max, approximateRowCnt, chunkSize, 
dynamicChunkSize);
+        } else {
+            int shardCount = (int) (approximateRowCnt / chunkSize);
+            int inverseSamplingRate = config.getSplitInverseSamplingRate();
+            if (sampleShardingThreshold < shardCount) {
+                // It is necessary to ensure that the number of data rows 
sampled by the
+                // sampling rate is greater than the number of shards.
+                // Otherwise, if the sampling rate is too low, it may result 
in an insufficient
+                // number of data rows for the shards, leading to an 
inadequate number of
+                // shards.
+                // Therefore, inverseSamplingRate should be less than chunkSize
+                if (inverseSamplingRate > chunkSize) {
+                    log.warn(
+                            "The inverseSamplingRate is {}, which is greater 
than chunkSize {}, so we set inverseSamplingRate to chunkSize",
+                            inverseSamplingRate,
+                            chunkSize);
+                    inverseSamplingRate = chunkSize;
                 }
-                return splitUnevenlySizedChunks(table, splitColumnName, min, 
max, chunkSize);
+                log.info(
+                        "Use sampling sharding for table {}, the sampling rate 
is {}",
+                        tablePath,
+                        inverseSamplingRate);
+                Object[] sample =
+                        jdbcDialect.sampleDataFromColumn(
+                                getOrEstablishConnection(),
+                                table,
+                                splitColumnName,
+                                inverseSamplingRate);
+                log.info(
+                        "Sample data from table {} end, the sample size is {}",
+                        tablePath,
+                        sample.length);
+                return efficientShardingThroughSampling(
+                        tablePath, sample, approximateRowCnt, shardCount);
             }
-        } else {
             return splitUnevenlySizedChunks(table, splitColumnName, min, max, 
chunkSize);
         }
     }
@@ -309,6 +331,71 @@ public class DynamicChunkSplitter extends ChunkSplitter {
         return splits;
     }
 
+    /**
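+     * Splits the table into chunks along a date-typed split column.
+     *
+     * @param table the source table to split
+     * @param splitColumnName the name of the date column used for splitting
+     * @param min the minimum value of the split column
+     * @param max the maximum value of the split column
+     * @param chunkSize the target number of rows per chunk
+     * @return the chunk ranges covering the column's date range
+     * @throws SQLException if querying the approximate row count fails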
+     */
+    private List<ChunkRange> dateColumnSplitChunks(
+            JdbcSourceTable table, String splitColumnName, Object min, Object 
max, int chunkSize)
+            throws SQLException {
+        log.info("Use date chunks for table {}", table.getTablePath());
+        final List<ChunkRange> splits = new ArrayList<>();
+        Date sqlDateMin = null;
+        Date sqlDateMax = null;
+        if (min instanceof Date) {
+            sqlDateMin = (Date) min;
+            sqlDateMax = (Date) max;
+        } else if (min instanceof Timestamp) {
+            sqlDateMin = new Date(((Timestamp) min).getTime());
+            sqlDateMax = new Date(((Timestamp) max).getTime());
+        }
+        List<LocalDate> dateRange =
+                getDateRange(sqlDateMin.toLocalDate(), 
sqlDateMax.toLocalDate());
+        if (dateRange.size() > 20 * 365) {
+            // TODO: If dateRange is greater than 20 years, need to get the real dates 
in the table
+        }
+
+        Long rowCnt = queryApproximateRowCnt(table);
+        int step = 1;
+        if (rowCnt / dateRange.size() < chunkSize) {
+            int splitNum = (int) (rowCnt / chunkSize) + 1;
+            step = dateRange.size() / splitNum;
+        }
+
+        for (int i = 0; i < dateRange.size(); i = i + step) {
+            if (i == 0) {
+                splits.add(ChunkRange.of(null, dateRange.get(i)));
+            } else {
+                splits.add(ChunkRange.of(dateRange.get(i - step), 
dateRange.get(i)));
+            }
+
+            if ((i + step) >= dateRange.size()) {
+                splits.add(ChunkRange.of(dateRange.get(i), null));
+            }
+        }
+        return splits;
+    }
+
+    // Returns every date from startDate to endDate, inclusive, in ascending order.
+    private static List<LocalDate> getDateRange(LocalDate startDate, LocalDate 
endDate) {
+        List<LocalDate> dateRange = new ArrayList<>();
+
+        LocalDate currentDate = startDate;
+        while (!currentDate.isAfter(endDate)) {
+            dateRange.add(currentDate);
+            currentDate = currentDate.plusDays(1);
+        }
+
+        return dateRange;
+    }
+
     private Object nextChunkEnd(
             Object previousChunkEnd,
             JdbcSourceTable table,

Reply via email to