This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 635c24e8b2 [Bugfix][JDBC、CDC] Fix Spliter Error in Case of Extensive 
Duplicate Data (#6026)
635c24e8b2 is described below

commit 635c24e8b28d22602ba29d9117d63c2fbdf4af2e
Author: ic4y <[email protected]>
AuthorDate: Thu Jan 18 13:40:18 2024 +0800

    [Bugfix][JDBC、CDC] Fix Spliter Error in Case of Extensive Duplicate Data 
(#6026)
---
 .../splitter/AbstractJdbcSourceChunkSplitter.java  |  26 ++-
 .../AbstractJdbcSourceChunkSplitterTest.java       | 232 +++++++++++++++++++++
 .../jdbc/source/DynamicChunkSplitter.java          |  30 ++-
 .../jdbc/source/DynamicChunkSplitterTest.java      | 191 +++++++++++++++++
 4 files changed, 461 insertions(+), 18 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java
index 151e003ecd..f124e5fc71 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java
@@ -262,22 +262,32 @@ public abstract class AbstractJdbcSourceChunkSplitter 
implements JdbcSourceChunk
 
         double approxSamplePerShard = (double) sampleData.length / shardCount;
 
+        Object lastEnd = null;
         if (approxSamplePerShard <= 1) {
-
             splits.add(ChunkRange.of(null, sampleData[0]));
-            for (int i = 0; i < sampleData.length - 1; i++) {
-                splits.add(ChunkRange.of(sampleData[i], sampleData[i + 1]));
+            lastEnd = sampleData[0];
+            for (int i = 1; i < sampleData.length; i++) {
+                // avoid split duplicate data
+                if (!sampleData[i].equals(lastEnd)) {
+                    splits.add(ChunkRange.of(lastEnd, sampleData[i]));
+                    lastEnd = sampleData[i];
+                }
             }
-            splits.add(ChunkRange.of(sampleData[sampleData.length - 1], null));
+
+            splits.add(ChunkRange.of(lastEnd, null));
+
         } else {
-            // Calculate the shard boundaries
             for (int i = 0; i < shardCount; i++) {
-                Object chunkStart = i == 0 ? null : sampleData[(int) (i * 
approxSamplePerShard)];
+                Object chunkStart = lastEnd;
                 Object chunkEnd =
-                        i < shardCount - 1
+                        (i < shardCount - 1)
                                 ? sampleData[(int) ((i + 1) * 
approxSamplePerShard)]
                                 : null;
-                splits.add(ChunkRange.of(chunkStart, chunkEnd));
+                // avoid split duplicate data
+                if (i == 0 || i == shardCount - 1 || !Objects.equals(chunkEnd, 
chunkStart)) {
+                    splits.add(ChunkRange.of(chunkStart, chunkEnd));
+                    lastEnd = chunkEnd;
+                }
             }
         }
         return splits;
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitterTest.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitterTest.java
new file mode 100644
index 0000000000..076bafae15
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitterTest.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import org.junit.jupiter.api.Test;
+
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.relational.Column;
+import io.debezium.relational.TableId;
+
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class AbstractJdbcSourceChunkSplitterTest {
+
+    @Test
+    public void testEfficientShardingThroughSampling() throws 
NoSuchMethodException {
+
+        UtJdbcSourceChunkSplitter utJdbcSourceChunkSplitter = new 
UtJdbcSourceChunkSplitter();
+
+        check(
+                utJdbcSourceChunkSplitter.efficientShardingThroughSampling(
+                        null, new Object[] {1, 1, 1, 1, 1, 1}, 1000, 2),
+                Arrays.asList(ChunkRange.of(null, 1), ChunkRange.of(1, null)));
+
+        check(
+                utJdbcSourceChunkSplitter.efficientShardingThroughSampling(
+                        null, new Object[] {1, 1, 1, 1, 1, 1}, 1000, 1),
+                Arrays.asList(ChunkRange.of(null, null)));
+
+        check(
+                utJdbcSourceChunkSplitter.efficientShardingThroughSampling(
+                        null, new Object[] {1, 1, 1, 1, 1, 1}, 1000, 10),
+                Arrays.asList(ChunkRange.of(null, 1), ChunkRange.of(1, null)));
+
+        check(
+                utJdbcSourceChunkSplitter.efficientShardingThroughSampling(
+                        null, new Object[] {1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2}, 
1000, 10),
+                Arrays.asList(ChunkRange.of(null, 1), ChunkRange.of(1, 2), 
ChunkRange.of(2, null)));
+
+        check(
+                utJdbcSourceChunkSplitter.efficientShardingThroughSampling(
+                        null, new Object[] {1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2}, 
1000, 1),
+                Arrays.asList(ChunkRange.of(null, null)));
+
+        check(
+                utJdbcSourceChunkSplitter.efficientShardingThroughSampling(
+                        null, new Object[] {1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2}, 
1000, 2),
+                Arrays.asList(ChunkRange.of(null, 1), ChunkRange.of(1, null)));
+
+        check(
+                utJdbcSourceChunkSplitter.efficientShardingThroughSampling(
+                        null, new Object[] {1}, 1000, 1),
+                Arrays.asList(ChunkRange.of(null, 1), ChunkRange.of(1, null)));
+
+        check(
+                utJdbcSourceChunkSplitter.efficientShardingThroughSampling(
+                        null, new Object[] {1}, 1000, 2),
+                Arrays.asList(ChunkRange.of(null, 1), ChunkRange.of(1, null)));
+
+        check(
+                utJdbcSourceChunkSplitter.efficientShardingThroughSampling(
+                        null, new Object[] {1, 2, 3}, 1000, 2),
+                Arrays.asList(ChunkRange.of(null, 2), ChunkRange.of(2, null)));
+
+        check(
+                utJdbcSourceChunkSplitter.efficientShardingThroughSampling(
+                        null, new Object[] {1, 2, 3}, 1000, 1),
+                Arrays.asList(ChunkRange.of(null, null)));
+
+        check(
+                utJdbcSourceChunkSplitter.efficientShardingThroughSampling(
+                        null, new Object[] {1, 2, 3}, 1000, 3),
+                Arrays.asList(
+                        ChunkRange.of(null, 1),
+                        ChunkRange.of(1, 2),
+                        ChunkRange.of(2, 3),
+                        ChunkRange.of(3, null)));
+
+        check(
+                utJdbcSourceChunkSplitter.efficientShardingThroughSampling(
+                        null, new Object[] {1, 2, 3, 4, 5}, 1000, 3),
+                Arrays.asList(ChunkRange.of(null, 2), ChunkRange.of(2, 4), 
ChunkRange.of(4, null)));
+        check(
+                utJdbcSourceChunkSplitter.efficientShardingThroughSampling(
+                        null, new Object[] {1, 2, 3, 4, 5}, 1000, 2),
+                Arrays.asList(ChunkRange.of(null, 3), ChunkRange.of(3, null)));
+
+        check(
+                utJdbcSourceChunkSplitter.efficientShardingThroughSampling(
+                        null, new Object[] {1, 2, 3, 4, 5, 6}, 1000, 1),
+                Arrays.asList(ChunkRange.of(null, null)));
+
+        check(
+                utJdbcSourceChunkSplitter.efficientShardingThroughSampling(
+                        null, new Object[] {1, 2, 3, 4, 5, 6}, 1000, 3),
+                Arrays.asList(ChunkRange.of(null, 3), ChunkRange.of(3, 5), 
ChunkRange.of(5, null)));
+        check(
+                utJdbcSourceChunkSplitter.efficientShardingThroughSampling(
+                        null, new Object[] {1, 2, 3, 4, 5, 6}, 1000, 4),
+                Arrays.asList(
+                        ChunkRange.of(null, 2),
+                        ChunkRange.of(2, 4),
+                        ChunkRange.of(4, 5),
+                        ChunkRange.of(5, null)));
+        check(
+                utJdbcSourceChunkSplitter.efficientShardingThroughSampling(
+                        null, new Object[] {1, 2, 3, 4, 5, 6}, 1000, 5),
+                Arrays.asList(
+                        ChunkRange.of(null, 2),
+                        ChunkRange.of(2, 3),
+                        ChunkRange.of(3, 4),
+                        ChunkRange.of(4, 5),
+                        ChunkRange.of(5, null)));
+        check(
+                utJdbcSourceChunkSplitter.efficientShardingThroughSampling(
+                        null, new Object[] {1, 2, 3, 4, 5, 6}, 1000, 6),
+                Arrays.asList(
+                        ChunkRange.of(null, 1),
+                        ChunkRange.of(1, 2),
+                        ChunkRange.of(2, 3),
+                        ChunkRange.of(3, 4),
+                        ChunkRange.of(4, 5),
+                        ChunkRange.of(5, 6),
+                        ChunkRange.of(6, null)));
+    }
+
+    private void check(List<ChunkRange> a, List<ChunkRange> b) {
+        checkRule(b);
+        assertEquals(a, b);
+    }
+
+    private void checkRule(List<ChunkRange> a) {
+        for (int i = 0; i < a.size(); i++) {
+            if (i == 0) {
+                assertNull(a.get(i).getChunkStart());
+            }
+            if (i == a.size() - 1) {
+                assertNull(a.get(i).getChunkEnd());
+            }
+            // current chunk start should be equal to previous chunk end
+            if (i > 0) {
+                assertEquals(a.get(i - 1).getChunkEnd(), 
a.get(i).getChunkStart());
+            }
+            if (i > 0 && i < a.size() - 1) {
+                // current chunk end should be greater than current chunk start
+                assertTrue((int) a.get(i).getChunkEnd() > (int) 
a.get(i).getChunkStart());
+            }
+        }
+    }
+
+    public static class UtJdbcSourceChunkSplitter extends 
AbstractJdbcSourceChunkSplitter {
+
+        public UtJdbcSourceChunkSplitter() {
+            super(null, null);
+        }
+
+        @Override
+        public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, 
String columnName)
+                throws SQLException {
+            return new Object[0];
+        }
+
+        @Override
+        public Object queryMin(
+                JdbcConnection jdbc, TableId tableId, String columnName, 
Object excludedLowerBound)
+                throws SQLException {
+            return null;
+        }
+
+        @Override
+        public Object[] sampleDataFromColumn(
+                JdbcConnection jdbc, TableId tableId, String columnName, int 
samplingRate)
+                throws SQLException {
+            return new Object[0];
+        }
+
+        @Override
+        public Object queryNextChunkMax(
+                JdbcConnection jdbc,
+                TableId tableId,
+                String columnName,
+                int chunkSize,
+                Object includedLowerBound)
+                throws SQLException {
+            return null;
+        }
+
+        @Override
+        public Long queryApproximateRowCnt(JdbcConnection jdbc, TableId 
tableId)
+                throws SQLException {
+            return null;
+        }
+
+        @Override
+        public String buildSplitScanQuery(
+                TableId tableId,
+                SeaTunnelRowType splitKeyType,
+                boolean isFirstSplit,
+                boolean isLastSplit) {
+            return null;
+        }
+
+        @Override
+        public SeaTunnelDataType<?> fromDbzColumn(Column splitColumn) {
+            return null;
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java
index 62cc173702..12f78b170a 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java
@@ -268,7 +268,7 @@ public class DynamicChunkSplitter extends ChunkSplitter {
         return splits;
     }
 
-    private List<ChunkRange> efficientShardingThroughSampling(
+    public static List<ChunkRange> efficientShardingThroughSampling(
             TablePath tablePath, Object[] sampleData, long approximateRowCnt, 
int shardCount) {
         log.info(
                 "Use efficient sharding through sampling optimization for 
table {}, the approximate row count is {}, the shardCount is {}",
@@ -285,22 +285,32 @@ public class DynamicChunkSplitter extends ChunkSplitter {
 
         double approxSamplePerShard = (double) sampleData.length / shardCount;
 
+        Object lastEnd = null;
         if (approxSamplePerShard <= 1) {
-
             splits.add(ChunkRange.of(null, sampleData[0]));
-            for (int i = 0; i < sampleData.length - 1; i++) {
-                splits.add(ChunkRange.of(sampleData[i], sampleData[i + 1]));
+            lastEnd = sampleData[0];
+            for (int i = 1; i < sampleData.length; i++) {
+                // avoid split duplicate data
+                if (!sampleData[i].equals(lastEnd)) {
+                    splits.add(ChunkRange.of(lastEnd, sampleData[i]));
+                    lastEnd = sampleData[i];
+                }
             }
-            splits.add(ChunkRange.of(sampleData[sampleData.length - 1], null));
+
+            splits.add(ChunkRange.of(lastEnd, null));
+
         } else {
-            // Calculate the shard boundaries
             for (int i = 0; i < shardCount; i++) {
-                Object chunkStart = i == 0 ? null : sampleData[(int) (i * 
approxSamplePerShard)];
+                Object chunkStart = lastEnd;
                 Object chunkEnd =
-                        i < shardCount - 1
+                        (i < shardCount - 1)
                                 ? sampleData[(int) ((i + 1) * 
approxSamplePerShard)]
                                 : null;
-                splits.add(ChunkRange.of(chunkStart, chunkEnd));
+                // avoid split duplicate data
+                if (i == 0 || i == shardCount - 1 || !Objects.equals(chunkEnd, 
chunkStart)) {
+                    splits.add(ChunkRange.of(chunkStart, chunkEnd));
+                    lastEnd = chunkEnd;
+                }
             }
         }
         return splits;
@@ -530,7 +540,7 @@ public class DynamicChunkSplitter extends ChunkSplitter {
 
     @Data
     @EqualsAndHashCode
-    private static class ChunkRange implements Serializable {
+    public static class ChunkRange implements Serializable {
         private final Object chunkStart;
         private final Object chunkEnd;
 
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitterTest.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitterTest.java
new file mode 100644
index 0000000000..8896e5f960
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitterTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.source;
+
+import org.apache.seatunnel.api.table.catalog.TablePath;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class DynamicChunkSplitterTest {
+
+    @Test
+    public void testEfficientShardingThroughSampling() throws 
NoSuchMethodException {
+        TablePath tablePath = new TablePath("db", "xe", "table");
+
+        check(
+                DynamicChunkSplitter.efficientShardingThroughSampling(
+                        tablePath, new Object[] {1, 1, 1, 1, 1, 1}, 1000, 2),
+                Arrays.asList(
+                        DynamicChunkSplitter.ChunkRange.of(null, 1),
+                        DynamicChunkSplitter.ChunkRange.of(1, null)));
+
+        check(
+                DynamicChunkSplitter.efficientShardingThroughSampling(
+                        tablePath, new Object[] {1, 1, 1, 1, 1, 1}, 1000, 1),
+                Arrays.asList(DynamicChunkSplitter.ChunkRange.of(null, null)));
+
+        check(
+                DynamicChunkSplitter.efficientShardingThroughSampling(
+                        tablePath, new Object[] {1, 1, 1, 1, 1, 1}, 1000, 10),
+                Arrays.asList(
+                        DynamicChunkSplitter.ChunkRange.of(null, 1),
+                        DynamicChunkSplitter.ChunkRange.of(1, null)));
+
+        check(
+                DynamicChunkSplitter.efficientShardingThroughSampling(
+                        tablePath, new Object[] {1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 
2}, 1000, 10),
+                Arrays.asList(
+                        DynamicChunkSplitter.ChunkRange.of(null, 1),
+                        DynamicChunkSplitter.ChunkRange.of(1, 2),
+                        DynamicChunkSplitter.ChunkRange.of(2, null)));
+
+        check(
+                DynamicChunkSplitter.efficientShardingThroughSampling(
+                        tablePath, new Object[] {1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 
2}, 1000, 1),
+                Arrays.asList(DynamicChunkSplitter.ChunkRange.of(null, null)));
+
+        check(
+                DynamicChunkSplitter.efficientShardingThroughSampling(
+                        tablePath, new Object[] {1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 
2}, 1000, 2),
+                Arrays.asList(
+                        DynamicChunkSplitter.ChunkRange.of(null, 1),
+                        DynamicChunkSplitter.ChunkRange.of(1, null)));
+
+        check(
+                DynamicChunkSplitter.efficientShardingThroughSampling(
+                        tablePath, new Object[] {1}, 1000, 1),
+                Arrays.asList(
+                        DynamicChunkSplitter.ChunkRange.of(null, 1),
+                        DynamicChunkSplitter.ChunkRange.of(1, null)));
+
+        check(
+                DynamicChunkSplitter.efficientShardingThroughSampling(
+                        tablePath, new Object[] {1}, 1000, 2),
+                Arrays.asList(
+                        DynamicChunkSplitter.ChunkRange.of(null, 1),
+                        DynamicChunkSplitter.ChunkRange.of(1, null)));
+
+        check(
+                DynamicChunkSplitter.efficientShardingThroughSampling(
+                        tablePath, new Object[] {1, 2, 3}, 1000, 2),
+                Arrays.asList(
+                        DynamicChunkSplitter.ChunkRange.of(null, 2),
+                        DynamicChunkSplitter.ChunkRange.of(2, null)));
+
+        check(
+                DynamicChunkSplitter.efficientShardingThroughSampling(
+                        tablePath, new Object[] {1, 2, 3}, 1000, 1),
+                Arrays.asList(DynamicChunkSplitter.ChunkRange.of(null, null)));
+
+        check(
+                DynamicChunkSplitter.efficientShardingThroughSampling(
+                        tablePath, new Object[] {1, 2, 3}, 1000, 3),
+                Arrays.asList(
+                        DynamicChunkSplitter.ChunkRange.of(null, 1),
+                        DynamicChunkSplitter.ChunkRange.of(1, 2),
+                        DynamicChunkSplitter.ChunkRange.of(2, 3),
+                        DynamicChunkSplitter.ChunkRange.of(3, null)));
+
+        check(
+                DynamicChunkSplitter.efficientShardingThroughSampling(
+                        tablePath, new Object[] {1, 2, 3, 4, 5}, 1000, 3),
+                Arrays.asList(
+                        DynamicChunkSplitter.ChunkRange.of(null, 2),
+                        DynamicChunkSplitter.ChunkRange.of(2, 4),
+                        DynamicChunkSplitter.ChunkRange.of(4, null)));
+        check(
+                DynamicChunkSplitter.efficientShardingThroughSampling(
+                        tablePath, new Object[] {1, 2, 3, 4, 5}, 1000, 2),
+                Arrays.asList(
+                        DynamicChunkSplitter.ChunkRange.of(null, 3),
+                        DynamicChunkSplitter.ChunkRange.of(3, null)));
+
+        check(
+                DynamicChunkSplitter.efficientShardingThroughSampling(
+                        tablePath, new Object[] {1, 2, 3, 4, 5, 6}, 1000, 1),
+                Arrays.asList(DynamicChunkSplitter.ChunkRange.of(null, null)));
+
+        check(
+                DynamicChunkSplitter.efficientShardingThroughSampling(
+                        tablePath, new Object[] {1, 2, 3, 4, 5, 6}, 1000, 3),
+                Arrays.asList(
+                        DynamicChunkSplitter.ChunkRange.of(null, 3),
+                        DynamicChunkSplitter.ChunkRange.of(3, 5),
+                        DynamicChunkSplitter.ChunkRange.of(5, null)));
+        check(
+                DynamicChunkSplitter.efficientShardingThroughSampling(
+                        tablePath, new Object[] {1, 2, 3, 4, 5, 6}, 1000, 4),
+                Arrays.asList(
+                        DynamicChunkSplitter.ChunkRange.of(null, 2),
+                        DynamicChunkSplitter.ChunkRange.of(2, 4),
+                        DynamicChunkSplitter.ChunkRange.of(4, 5),
+                        DynamicChunkSplitter.ChunkRange.of(5, null)));
+        check(
+                DynamicChunkSplitter.efficientShardingThroughSampling(
+                        tablePath, new Object[] {1, 2, 3, 4, 5, 6}, 1000, 5),
+                Arrays.asList(
+                        DynamicChunkSplitter.ChunkRange.of(null, 2),
+                        DynamicChunkSplitter.ChunkRange.of(2, 3),
+                        DynamicChunkSplitter.ChunkRange.of(3, 4),
+                        DynamicChunkSplitter.ChunkRange.of(4, 5),
+                        DynamicChunkSplitter.ChunkRange.of(5, null)));
+        check(
+                DynamicChunkSplitter.efficientShardingThroughSampling(
+                        tablePath, new Object[] {1, 2, 3, 4, 5, 6}, 1000, 6),
+                Arrays.asList(
+                        DynamicChunkSplitter.ChunkRange.of(null, 1),
+                        DynamicChunkSplitter.ChunkRange.of(1, 2),
+                        DynamicChunkSplitter.ChunkRange.of(2, 3),
+                        DynamicChunkSplitter.ChunkRange.of(3, 4),
+                        DynamicChunkSplitter.ChunkRange.of(4, 5),
+                        DynamicChunkSplitter.ChunkRange.of(5, 6),
+                        DynamicChunkSplitter.ChunkRange.of(6, null)));
+    }
+
+    private void check(
+            List<DynamicChunkSplitter.ChunkRange> a, 
List<DynamicChunkSplitter.ChunkRange> b) {
+        checkRule(b);
+        assertEquals(a, b);
+    }
+
+    private void checkRule(List<DynamicChunkSplitter.ChunkRange> a) {
+        for (int i = 0; i < a.size(); i++) {
+            if (i == 0) {
+                assertNull(a.get(i).getChunkStart());
+            }
+            if (i == a.size() - 1) {
+                assertNull(a.get(i).getChunkEnd());
+            }
+            // current chunk start should be equal to previous chunk end
+            if (i > 0) {
+                assertEquals(a.get(i - 1).getChunkEnd(), 
a.get(i).getChunkStart());
+            }
+            if (i > 0 && i < a.size() - 1) {
+                // current chunk end should be greater than current chunk start
+                assertTrue((int) a.get(i).getChunkEnd() > (int) 
a.get(i).getChunkStart());
+            }
+        }
+    }
+}

Reply via email to