This is an automated email from the ASF dual-hosted git repository. gaojun2048 pushed a commit to branch 010801_cp_split_by_date in repository https://gitbox.apache.org/repos/asf/seatunnel.git
commit 47cc9ade1c7ff5d35f3db4988a2778ccb3787b6f Author: Eric <[email protected]> AuthorDate: Mon Dec 18 11:27:08 2023 +0800 Support Split By Date Type Column --- .../jdbc/internal/dialect/JdbcDialect.java | 4 + .../seatunnel/jdbc/internal/dialect/SQLUtils.java | 5 + .../jdbc/internal/dialect/mysql/MysqlDialect.java | 7 + .../internal/dialect/oracle/OracleDialect.java | 5 + .../internal/dialect/psql/PostgresDialect.java | 4 + .../dialect/sqlserver/SqlServerDialect.java | 5 + .../seatunnel/jdbc/source/ChunkSplitter.java | 17 +- .../jdbc/source/DynamicChunkSplitter.java | 191 +++++++++++++++------ 8 files changed, 177 insertions(+), 61 deletions(-) diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java index 099d59d5c0..d204cf4fc4 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java @@ -40,6 +40,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.logging.Logger; import java.util.stream.Collectors; import static java.lang.String.format; @@ -50,6 +51,8 @@ import static java.lang.String.format; */ public interface JdbcDialect extends Serializable { + Logger log = Logger.getLogger(JdbcDialect.class.getName()); + /** * Get the name of jdbc dialect. * @@ -316,6 +319,7 @@ public interface JdbcDialect extends Serializable { try (Statement stmt = connection.createStatement()) { stmt.setFetchSize(1024); + log.info(String.format("Split Chunk, approximateRowCntStatement: %s", sampleQuery)); try (ResultSet rs = stmt.executeQuery(sampleQuery)) { int count = 0; List<Object> results = new ArrayList<>(); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/SQLUtils.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/SQLUtils.java index 2686a4e307..cc841149c9 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/SQLUtils.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/SQLUtils.java @@ -17,16 +17,20 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect; +import lombok.extern.slf4j.Slf4j; + import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +@Slf4j public class SQLUtils { public static Long countForSubquery(Connection connection, String subQuerySQL) throws SQLException { String sqlQuery = String.format("SELECT COUNT(*) FROM (%s) T", subQuerySQL); + log.info("Split Chunk, countForSubquery: {}", sqlQuery); try (Statement stmt = connection.createStatement()) { try (ResultSet resultSet = stmt.executeQuery(sqlQuery)) { if (resultSet.next()) { @@ -40,6 +44,7 @@ public class SQLUtils { public static Long countForTable(Connection connection, String tablePath) throws SQLException { String sqlQuery = String.format("SELECT COUNT(*) FROM %s", tablePath); + log.info("Split Chunk, countForTable: {}", sqlQuery); try (Statement stmt = connection.createStatement()) { try (ResultSet resultSet = stmt.executeQuery(sqlQuery)) { if (resultSet.next()) { diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java index f2ce15e31e..58573c7d08 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java @@ -28,6 +28,9 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceTable; import org.apache.commons.lang3.StringUtils; +import com.mysql.cj.MysqlType; +import lombok.extern.slf4j.Slf4j; + import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -41,6 +44,7 @@ import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; +@Slf4j public class MysqlDialect implements JdbcDialect { public String fieldIde = FieldIdeEnum.ORIGINAL.getValue(); @@ -170,8 +174,11 @@ public class MysqlDialect implements JdbcDialect { String.format("USE %s;", quoteDatabaseIdentifier(tablePath.getDatabaseName())); String rowCountQuery = String.format("SHOW TABLE STATUS LIKE '%s';", tablePath.getTableName()); + try (Statement stmt = connection.createStatement()) { + log.info("Split Chunk, approximateRowCntStatement: {}", useDatabaseStatement); stmt.execute(useDatabaseStatement); + log.info("Split Chunk, approximateRowCntStatement: {}", rowCountQuery); try (ResultSet rs = stmt.executeQuery(rowCountQuery)) { if (!rs.next() || rs.getMetaData().getColumnCount() < 5) { throw new SQLException( diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java index a1a658ba06..9167fe07f4 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java @@ -28,6 +28,8 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceTable; import org.apache.commons.lang3.StringUtils; +import lombok.extern.slf4j.Slf4j; + import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -38,6 +40,7 @@ import java.util.List; import java.util.Optional; import java.util.stream.Collectors; +@Slf4j public class OracleDialect implements JdbcDialect { private static final int DEFAULT_ORACLE_FETCH_SIZE = 128; @@ -188,7 +191,9 @@ public class OracleDialect implements JdbcDialect { tablePath.getSchemaName(), tablePath.getTableName()); try (Statement stmt = connection.createStatement()) { + log.info("Split Chunk, approximateRowCntStatement: {}", analyzeTable); stmt.execute(analyzeTable); + log.info("Split Chunk, approximateRowCntStatement: {}", rowCountQuery); try (ResultSet rs = stmt.executeQuery(rowCountQuery)) { if (!rs.next()) { throw new SQLException( diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java index 9a4076fe2a..e73a8f0bb6 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java @@ -28,6 +28,8 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceTable; import org.apache.commons.lang3.StringUtils; +import lombok.extern.slf4j.Slf4j; + import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -37,6 +39,7 @@ import java.util.Arrays; import java.util.Optional; import java.util.stream.Collectors; +@Slf4j public class PostgresDialect implements JdbcDialect { public static final int DEFAULT_POSTGRES_FETCH_SIZE = 128; @@ -152,6 +155,7 @@ public class PostgresDialect implements JdbcDialect { "SELECT reltuples FROM pg_class r WHERE relkind = 'r' AND relname = '%s';", table.getTablePath().getTableName()); try (Statement stmt = connection.createStatement()) { + log.info("Split Chunk, approximateRowCntStatement: {}", rowCountQuery); try (ResultSet rs = stmt.executeQuery(rowCountQuery)) { if (!rs.next()) { throw new SQLException( diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java index f8a46a0637..729ba17949 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java @@ -28,6 +28,8 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceTable; import org.apache.commons.lang3.StringUtils; +import lombok.extern.slf4j.Slf4j; + import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -38,6 +40,7 @@ import java.util.List; import java.util.Optional; import java.util.stream.Collectors; +@Slf4j public class SqlServerDialect implements JdbcDialect { public String fieldIde = FieldIdeEnum.ORIGINAL.getValue(); @@ -165,6 +168,7 @@ public class SqlServerDialect implements JdbcDialect { String.format( "USE %s;", quoteDatabaseIdentifier(tablePath.getDatabaseName())); + log.info("Split Chunk, approximateRowCntStatement: {}", useDatabaseStatement); stmt.execute(useDatabaseStatement); } String rowCountQuery = @@ -172,6 +176,7 @@ public class SqlServerDialect implements JdbcDialect { "SELECT Total_Rows = SUM(st.row_count) FROM sys" + ".dm_db_partition_stats st WHERE object_name(object_id) = '%s' AND index_id < 2;", tablePath.getTableName()); + log.info("Split Chunk, approximateRowCntStatement: {}", rowCountQuery); try (ResultSet rs = stmt.executeQuery(rowCountQuery)) { if (!rs.next()) { throw new SQLException( diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java index ed7fac4d26..7a67514fc7 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java @@ -221,6 +221,7 @@ public abstract class ChunkSplitter implements AutoCloseable, Serializable { jdbcDialect.tableIdentifier(table.getTablePath())); } try (Statement stmt = getOrEstablishConnection().createStatement()) { + log.info("Split table, query min max: {}", sqlQuery); try (ResultSet resultSet = stmt.executeQuery(sqlQuery)) { if (resultSet.next()) { Object min = resultSet.getObject(1); @@ -251,7 +252,7 @@ public abstract class ChunkSplitter implements AutoCloseable, Serializable { "Partitioned column(%s) don't exist in the table columns", partitionColumn)); } - if (!isEvenlySplitColumn(column)) { + if (!isSupportSplitColumn(column)) { throw new JdbcConnectorException( CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, String.format("%s is not numeric/string type", partitionColumn)); @@ -266,7 +267,7 @@ public abstract class ChunkSplitter implements AutoCloseable, Serializable { if (pk != null) { for (String pkField : pk.getColumnNames()) { Column column = columnMap.get(pkField); - if (isEvenlySplitColumn(column)) { + if (isSupportSplitColumn(column)) { return Optional.of( new SeaTunnelRowType( new String[] {pkField}, @@ -290,7 +291,7 @@ public abstract class ChunkSplitter implements AutoCloseable, Serializable { uniqueKey.getColumnNames()) { String uniqueKeyColumnName = uniqueKeyColumn.getColumnName(); Column column = columnMap.get(uniqueKeyColumnName); - if (isEvenlySplitColumn(column)) { + if (isSupportSplitColumn(column)) { return Optional.of( new SeaTunnelRowType( new String[] {uniqueKeyColumnName}, @@ -305,19 +306,17 @@ public abstract class ChunkSplitter implements AutoCloseable, Serializable { return Optional.empty(); } - protected boolean isEvenlySplitColumn(Column splitColumn) { - return isEvenlySplitColumn(splitColumn.getDataType()); - } - - protected boolean isEvenlySplitColumn(SeaTunnelDataType columnType) { + protected boolean isSupportSplitColumn(Column splitColumn) { + SeaTunnelDataType<?> dataType = splitColumn.getDataType(); // currently, we only support these types. - switch (columnType.getSqlType()) { + switch (dataType.getSqlType()) { case TINYINT: case SMALLINT: case INT: case BIGINT: case DECIMAL: case STRING: + case DATE: return true; default: return false; diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java index da158dca44..e1224a7fa1 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java @@ -20,7 +20,9 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.source; import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.exception.CommonErrorCode; import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceConfig; +import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException; import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.ObjectUtils; import org.apache.commons.lang3.StringUtils; @@ -32,8 +34,11 @@ import lombok.extern.slf4j.Slf4j; import java.io.Serializable; import java.math.BigDecimal; +import java.sql.Date; import java.sql.PreparedStatement; import java.sql.SQLException; +import java.sql.Timestamp; +import java.time.LocalDate; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -97,7 +102,6 @@ public class DynamicChunkSplitter extends ChunkSplitter { private List<ChunkRange> splitTableIntoChunks( JdbcSourceTable table, String splitColumnName, SeaTunnelDataType splitColumnType) throws SQLException { - TablePath tablePath = table.getTablePath(); Pair<Object, Object> minMax = queryMinMax(table, splitColumnName); Object min = minMax.getLeft(); Object max = minMax.getRight(); @@ -107,6 +111,29 @@ public class DynamicChunkSplitter extends ChunkSplitter { } int chunkSize = config.getSplitSize(); + + switch (splitColumnType.getSqlType()) { + case TINYINT: + case SMALLINT: + case INT: + case BIGINT: + case DECIMAL: + case STRING: + return evenlyColumnSplitChunks(table, splitColumnName, min, max, chunkSize); + case DATE: + return dateColumnSplitChunks(table, splitColumnName, min, max, chunkSize); + default: + throw new JdbcConnectorException( + CommonErrorCode.ILLEGAL_ARGUMENT, + String.format( + "%s is not numeric/string type", splitColumnType.getSqlType())); + } + } + + private List<ChunkRange> evenlyColumnSplitChunks( + JdbcSourceTable table, String splitColumnName, Object min, Object max, int chunkSize) + throws SQLException { + TablePath tablePath = table.getTablePath(); double distributionFactorUpper = config.getSplitEvenDistributionFactorUpperBound(); double distributionFactorLower = config.getSplitEvenDistributionFactorLowerBound(); int sampleShardingThreshold = config.getSplitSampleShardingThreshold(); @@ -123,59 +150,54 @@ public class DynamicChunkSplitter extends ChunkSplitter { distributionFactorLower, sampleShardingThreshold); - if (isEvenlySplitColumn(splitColumnType)) { - long approximateRowCnt = queryApproximateRowCnt(table); - double distributionFactor = - calculateDistributionFactor(tablePath, min, max, approximateRowCnt); - - boolean dataIsEvenlyDistributed = - ObjectUtils.doubleCompare(distributionFactor, distributionFactorLower) >= 0 - && ObjectUtils.doubleCompare( - distributionFactor, distributionFactorUpper) - <= 0; - - if (dataIsEvenlyDistributed) { - // the minimum dynamic chunk size is at least 1 - final int dynamicChunkSize = Math.max((int) (distributionFactor * chunkSize), 1); - return splitEvenlySizedChunks( - tablePath, min, max, approximateRowCnt, chunkSize, dynamicChunkSize); - } else { - int shardCount = (int) (approximateRowCnt / chunkSize); - int inverseSamplingRate = config.getSplitInverseSamplingRate(); - if (sampleShardingThreshold < shardCount) { - // It is necessary to ensure that the number of data rows sampled by the - // sampling rate is greater than the number of shards. - // Otherwise, if the sampling rate is too low, it may result in an insufficient - // number of data rows for the shards, leading to an inadequate number of - // shards. - // Therefore, inverseSamplingRate should be less than chunkSize - if (inverseSamplingRate > chunkSize) { - log.warn( - "The inverseSamplingRate is {}, which is greater than chunkSize {}, so we set inverseSamplingRate to chunkSize", - inverseSamplingRate, - chunkSize); - inverseSamplingRate = chunkSize; - } - log.info( - "Use sampling sharding for table {}, the sampling rate is {}", - tablePath, - inverseSamplingRate); - Object[] sample = - jdbcDialect.sampleDataFromColumn( - getOrEstablishConnection(), - table, - splitColumnName, - inverseSamplingRate); - log.info( - "Sample data from table {} end, the sample size is {}", - tablePath, - sample.length); - return efficientShardingThroughSampling( - tablePath, sample, approximateRowCnt, shardCount); + long approximateRowCnt = queryApproximateRowCnt(table); + double distributionFactor = + calculateDistributionFactor(tablePath, min, max, approximateRowCnt); + + boolean dataIsEvenlyDistributed = + ObjectUtils.doubleCompare(distributionFactor, distributionFactorLower) >= 0 + && ObjectUtils.doubleCompare(distributionFactor, distributionFactorUpper) + <= 0; + + if (dataIsEvenlyDistributed) { + // the minimum dynamic chunk size is at least 1 + final int dynamicChunkSize = Math.max((int) (distributionFactor * chunkSize), 1); + return splitEvenlySizedChunks( + tablePath, min, max, approximateRowCnt, chunkSize, dynamicChunkSize); + } else { + int shardCount = (int) (approximateRowCnt / chunkSize); + int inverseSamplingRate = config.getSplitInverseSamplingRate(); + if (sampleShardingThreshold < shardCount) { + // It is necessary to ensure that the number of data rows sampled by the + // sampling rate is greater than the number of shards. + // Otherwise, if the sampling rate is too low, it may result in an insufficient + // number of data rows for the shards, leading to an inadequate number of + // shards. + // Therefore, inverseSamplingRate should be less than chunkSize + if (inverseSamplingRate > chunkSize) { + log.warn( + "The inverseSamplingRate is {}, which is greater than chunkSize {}, so we set inverseSamplingRate to chunkSize", + inverseSamplingRate, + chunkSize); + inverseSamplingRate = chunkSize; } - return splitUnevenlySizedChunks(table, splitColumnName, min, max, chunkSize); + log.info( + "Use sampling sharding for table {}, the sampling rate is {}", + tablePath, + inverseSamplingRate); + Object[] sample = + jdbcDialect.sampleDataFromColumn( + getOrEstablishConnection(), + table, + splitColumnName, + inverseSamplingRate); + log.info( + "Sample data from table {} end, the sample size is {}", + tablePath, + sample.length); + return efficientShardingThroughSampling( + tablePath, sample, approximateRowCnt, shardCount); } - } else { return splitUnevenlySizedChunks(table, splitColumnName, min, max, chunkSize); } } @@ -309,6 +331,71 @@ public class DynamicChunkSplitter extends ChunkSplitter { return splits; } + /** + * split by date type column + * + * @param table + * @param splitColumnName + * @param min + * @param max + * @param chunkSize + * @return + * @throws SQLException + */ + private List<ChunkRange> dateColumnSplitChunks( + JdbcSourceTable table, String splitColumnName, Object min, Object max, int chunkSize) + throws SQLException { + log.info("Use date chunks for table {}", table.getTablePath()); + final List<ChunkRange> splits = new ArrayList<>(); + Date sqlDateMin = null; + Date sqlDateMax = null; + if (min instanceof Date) { + sqlDateMin = (Date) min; + sqlDateMax = (Date) max; + } else if (min instanceof Timestamp) { + sqlDateMin = new Date(((Timestamp) min).getTime()); + sqlDateMax = new Date(((Timestamp) max).getTime()); + } + List<LocalDate> dateRange = + getDateRange(sqlDateMin.toLocalDate(), sqlDateMax.toLocalDate()); + if (dateRange.size() > 20 * 365) { + // TODO: If dateRange granter than 20 year, need get the real date in the table + } + + Long rowCnt = queryApproximateRowCnt(table); + int step = 1; + if (rowCnt / dateRange.size() < chunkSize) { + int splitNum = (int) (rowCnt / chunkSize) + 1; + step = dateRange.size() / splitNum; + } + + for (int i = 0; i < dateRange.size(); i = i + step) { + if (i == 0) { + splits.add(ChunkRange.of(null, dateRange.get(i))); + } else { + splits.add(ChunkRange.of(dateRange.get(i - step), dateRange.get(i))); + } + + if ((i + step) >= dateRange.size()) { + splits.add(ChunkRange.of(dateRange.get(i), null)); + } + } + return splits; + } + + // obtaining date range + private static List<LocalDate> getDateRange(LocalDate startDate, LocalDate endDate) { + List<LocalDate> dateRange = new ArrayList<>(); + + LocalDate currentDate = startDate; + while (!currentDate.isAfter(endDate)) { + dateRange.add(currentDate); + currentDate = currentDate.plusDays(1); + } + + return dateRange; + } + private Object nextChunkEnd( Object previousChunkEnd, JdbcSourceTable table,
