This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 635c24e8b2 [Bugfix][JDBC,CDC] Fix Splitter Error in Case of Extensive
Duplicate Data (#6026)
635c24e8b2 is described below
commit 635c24e8b28d22602ba29d9117d63c2fbdf4af2e
Author: ic4y <[email protected]>
AuthorDate: Thu Jan 18 13:40:18 2024 +0800
[Bugfix][JDBC,CDC] Fix Splitter Error in Case of Extensive Duplicate Data
(#6026)
---
.../splitter/AbstractJdbcSourceChunkSplitter.java | 26 ++-
.../AbstractJdbcSourceChunkSplitterTest.java | 232 +++++++++++++++++++++
.../jdbc/source/DynamicChunkSplitter.java | 30 ++-
.../jdbc/source/DynamicChunkSplitterTest.java | 191 +++++++++++++++++
4 files changed, 461 insertions(+), 18 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java
index 151e003ecd..f124e5fc71 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java
@@ -262,22 +262,32 @@ public abstract class AbstractJdbcSourceChunkSplitter
implements JdbcSourceChunk
double approxSamplePerShard = (double) sampleData.length / shardCount;
+ Object lastEnd = null;
if (approxSamplePerShard <= 1) {
-
splits.add(ChunkRange.of(null, sampleData[0]));
- for (int i = 0; i < sampleData.length - 1; i++) {
- splits.add(ChunkRange.of(sampleData[i], sampleData[i + 1]));
+ lastEnd = sampleData[0];
+ for (int i = 1; i < sampleData.length; i++) {
+ // avoid split duplicate data
+ if (!sampleData[i].equals(lastEnd)) {
+ splits.add(ChunkRange.of(lastEnd, sampleData[i]));
+ lastEnd = sampleData[i];
+ }
}
- splits.add(ChunkRange.of(sampleData[sampleData.length - 1], null));
+
+ splits.add(ChunkRange.of(lastEnd, null));
+
} else {
- // Calculate the shard boundaries
for (int i = 0; i < shardCount; i++) {
- Object chunkStart = i == 0 ? null : sampleData[(int) (i *
approxSamplePerShard)];
+ Object chunkStart = lastEnd;
Object chunkEnd =
- i < shardCount - 1
+ (i < shardCount - 1)
? sampleData[(int) ((i + 1) *
approxSamplePerShard)]
: null;
- splits.add(ChunkRange.of(chunkStart, chunkEnd));
+ // avoid split duplicate data
+ if (i == 0 || i == shardCount - 1 || !Objects.equals(chunkEnd,
chunkStart)) {
+ splits.add(ChunkRange.of(chunkStart, chunkEnd));
+ lastEnd = chunkEnd;
+ }
}
}
return splits;
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitterTest.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitterTest.java
new file mode 100644
index 0000000000..076bafae15
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitterTest.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import org.junit.jupiter.api.Test;
+
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.relational.Column;
+import io.debezium.relational.TableId;
+
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class AbstractJdbcSourceChunkSplitterTest {
+
+ @Test
+ public void testEfficientShardingThroughSampling() throws
NoSuchMethodException {
+
+ UtJdbcSourceChunkSplitter utJdbcSourceChunkSplitter = new
UtJdbcSourceChunkSplitter();
+
+ check(
+ utJdbcSourceChunkSplitter.efficientShardingThroughSampling(
+ null, new Object[] {1, 1, 1, 1, 1, 1}, 1000, 2),
+ Arrays.asList(ChunkRange.of(null, 1), ChunkRange.of(1, null)));
+
+ check(
+ utJdbcSourceChunkSplitter.efficientShardingThroughSampling(
+ null, new Object[] {1, 1, 1, 1, 1, 1}, 1000, 1),
+ Arrays.asList(ChunkRange.of(null, null)));
+
+ check(
+ utJdbcSourceChunkSplitter.efficientShardingThroughSampling(
+ null, new Object[] {1, 1, 1, 1, 1, 1}, 1000, 10),
+ Arrays.asList(ChunkRange.of(null, 1), ChunkRange.of(1, null)));
+
+ check(
+ utJdbcSourceChunkSplitter.efficientShardingThroughSampling(
+ null, new Object[] {1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2},
1000, 10),
+ Arrays.asList(ChunkRange.of(null, 1), ChunkRange.of(1, 2),
ChunkRange.of(2, null)));
+
+ check(
+ utJdbcSourceChunkSplitter.efficientShardingThroughSampling(
+ null, new Object[] {1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2},
1000, 1),
+ Arrays.asList(ChunkRange.of(null, null)));
+
+ check(
+ utJdbcSourceChunkSplitter.efficientShardingThroughSampling(
+ null, new Object[] {1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2},
1000, 2),
+ Arrays.asList(ChunkRange.of(null, 1), ChunkRange.of(1, null)));
+
+ check(
+ utJdbcSourceChunkSplitter.efficientShardingThroughSampling(
+ null, new Object[] {1}, 1000, 1),
+ Arrays.asList(ChunkRange.of(null, 1), ChunkRange.of(1, null)));
+
+ check(
+ utJdbcSourceChunkSplitter.efficientShardingThroughSampling(
+ null, new Object[] {1}, 1000, 2),
+ Arrays.asList(ChunkRange.of(null, 1), ChunkRange.of(1, null)));
+
+ check(
+ utJdbcSourceChunkSplitter.efficientShardingThroughSampling(
+ null, new Object[] {1, 2, 3}, 1000, 2),
+ Arrays.asList(ChunkRange.of(null, 2), ChunkRange.of(2, null)));
+
+ check(
+ utJdbcSourceChunkSplitter.efficientShardingThroughSampling(
+ null, new Object[] {1, 2, 3}, 1000, 1),
+ Arrays.asList(ChunkRange.of(null, null)));
+
+ check(
+ utJdbcSourceChunkSplitter.efficientShardingThroughSampling(
+ null, new Object[] {1, 2, 3}, 1000, 3),
+ Arrays.asList(
+ ChunkRange.of(null, 1),
+ ChunkRange.of(1, 2),
+ ChunkRange.of(2, 3),
+ ChunkRange.of(3, null)));
+
+ check(
+ utJdbcSourceChunkSplitter.efficientShardingThroughSampling(
+ null, new Object[] {1, 2, 3, 4, 5}, 1000, 3),
+ Arrays.asList(ChunkRange.of(null, 2), ChunkRange.of(2, 4),
ChunkRange.of(4, null)));
+ check(
+ utJdbcSourceChunkSplitter.efficientShardingThroughSampling(
+ null, new Object[] {1, 2, 3, 4, 5}, 1000, 2),
+ Arrays.asList(ChunkRange.of(null, 3), ChunkRange.of(3, null)));
+
+ check(
+ utJdbcSourceChunkSplitter.efficientShardingThroughSampling(
+ null, new Object[] {1, 2, 3, 4, 5, 6}, 1000, 1),
+ Arrays.asList(ChunkRange.of(null, null)));
+
+ check(
+ utJdbcSourceChunkSplitter.efficientShardingThroughSampling(
+ null, new Object[] {1, 2, 3, 4, 5, 6}, 1000, 3),
+ Arrays.asList(ChunkRange.of(null, 3), ChunkRange.of(3, 5),
ChunkRange.of(5, null)));
+ check(
+ utJdbcSourceChunkSplitter.efficientShardingThroughSampling(
+ null, new Object[] {1, 2, 3, 4, 5, 6}, 1000, 4),
+ Arrays.asList(
+ ChunkRange.of(null, 2),
+ ChunkRange.of(2, 4),
+ ChunkRange.of(4, 5),
+ ChunkRange.of(5, null)));
+ check(
+ utJdbcSourceChunkSplitter.efficientShardingThroughSampling(
+ null, new Object[] {1, 2, 3, 4, 5, 6}, 1000, 5),
+ Arrays.asList(
+ ChunkRange.of(null, 2),
+ ChunkRange.of(2, 3),
+ ChunkRange.of(3, 4),
+ ChunkRange.of(4, 5),
+ ChunkRange.of(5, null)));
+ check(
+ utJdbcSourceChunkSplitter.efficientShardingThroughSampling(
+ null, new Object[] {1, 2, 3, 4, 5, 6}, 1000, 6),
+ Arrays.asList(
+ ChunkRange.of(null, 1),
+ ChunkRange.of(1, 2),
+ ChunkRange.of(2, 3),
+ ChunkRange.of(3, 4),
+ ChunkRange.of(4, 5),
+ ChunkRange.of(5, 6),
+ ChunkRange.of(6, null)));
+ }
+
+ private void check(List<ChunkRange> a, List<ChunkRange> b) {
+ checkRule(b);
+ assertEquals(a, b);
+ }
+
+ private void checkRule(List<ChunkRange> a) {
+ for (int i = 0; i < a.size(); i++) {
+ if (i == 0) {
+ assertNull(a.get(i).getChunkStart());
+ }
+ if (i == a.size() - 1) {
+ assertNull(a.get(i).getChunkEnd());
+ }
+ // current chunk start should be equal to previous chunk end
+ if (i > 0) {
+ assertEquals(a.get(i - 1).getChunkEnd(),
a.get(i).getChunkStart());
+ }
+ if (i > 0 && i < a.size() - 1) {
+ // current chunk end should be greater than current chunk start
+ assertTrue((int) a.get(i).getChunkEnd() > (int)
a.get(i).getChunkStart());
+ }
+ }
+ }
+
+ public static class UtJdbcSourceChunkSplitter extends
AbstractJdbcSourceChunkSplitter {
+
+ public UtJdbcSourceChunkSplitter() {
+ super(null, null);
+ }
+
+ @Override
+ public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId,
String columnName)
+ throws SQLException {
+ return new Object[0];
+ }
+
+ @Override
+ public Object queryMin(
+ JdbcConnection jdbc, TableId tableId, String columnName,
Object excludedLowerBound)
+ throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Object[] sampleDataFromColumn(
+ JdbcConnection jdbc, TableId tableId, String columnName, int
samplingRate)
+ throws SQLException {
+ return new Object[0];
+ }
+
+ @Override
+ public Object queryNextChunkMax(
+ JdbcConnection jdbc,
+ TableId tableId,
+ String columnName,
+ int chunkSize,
+ Object includedLowerBound)
+ throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Long queryApproximateRowCnt(JdbcConnection jdbc, TableId
tableId)
+ throws SQLException {
+ return null;
+ }
+
+ @Override
+ public String buildSplitScanQuery(
+ TableId tableId,
+ SeaTunnelRowType splitKeyType,
+ boolean isFirstSplit,
+ boolean isLastSplit) {
+ return null;
+ }
+
+ @Override
+ public SeaTunnelDataType<?> fromDbzColumn(Column splitColumn) {
+ return null;
+ }
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java
index 62cc173702..12f78b170a 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java
@@ -268,7 +268,7 @@ public class DynamicChunkSplitter extends ChunkSplitter {
return splits;
}
- private List<ChunkRange> efficientShardingThroughSampling(
+ public static List<ChunkRange> efficientShardingThroughSampling(
TablePath tablePath, Object[] sampleData, long approximateRowCnt,
int shardCount) {
log.info(
"Use efficient sharding through sampling optimization for
table {}, the approximate row count is {}, the shardCount is {}",
@@ -285,22 +285,32 @@ public class DynamicChunkSplitter extends ChunkSplitter {
double approxSamplePerShard = (double) sampleData.length / shardCount;
+ Object lastEnd = null;
if (approxSamplePerShard <= 1) {
-
splits.add(ChunkRange.of(null, sampleData[0]));
- for (int i = 0; i < sampleData.length - 1; i++) {
- splits.add(ChunkRange.of(sampleData[i], sampleData[i + 1]));
+ lastEnd = sampleData[0];
+ for (int i = 1; i < sampleData.length; i++) {
+ // avoid split duplicate data
+ if (!sampleData[i].equals(lastEnd)) {
+ splits.add(ChunkRange.of(lastEnd, sampleData[i]));
+ lastEnd = sampleData[i];
+ }
}
- splits.add(ChunkRange.of(sampleData[sampleData.length - 1], null));
+
+ splits.add(ChunkRange.of(lastEnd, null));
+
} else {
- // Calculate the shard boundaries
for (int i = 0; i < shardCount; i++) {
- Object chunkStart = i == 0 ? null : sampleData[(int) (i *
approxSamplePerShard)];
+ Object chunkStart = lastEnd;
Object chunkEnd =
- i < shardCount - 1
+ (i < shardCount - 1)
? sampleData[(int) ((i + 1) *
approxSamplePerShard)]
: null;
- splits.add(ChunkRange.of(chunkStart, chunkEnd));
+ // avoid split duplicate data
+ if (i == 0 || i == shardCount - 1 || !Objects.equals(chunkEnd,
chunkStart)) {
+ splits.add(ChunkRange.of(chunkStart, chunkEnd));
+ lastEnd = chunkEnd;
+ }
}
}
return splits;
@@ -530,7 +540,7 @@ public class DynamicChunkSplitter extends ChunkSplitter {
@Data
@EqualsAndHashCode
- private static class ChunkRange implements Serializable {
+ public static class ChunkRange implements Serializable {
private final Object chunkStart;
private final Object chunkEnd;
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitterTest.java
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitterTest.java
new file mode 100644
index 0000000000..8896e5f960
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitterTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.source;
+
+import org.apache.seatunnel.api.table.catalog.TablePath;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class DynamicChunkSplitterTest {
+
+ @Test
+ public void testEfficientShardingThroughSampling() throws
NoSuchMethodException {
+ TablePath tablePath = new TablePath("db", "xe", "table");
+
+ check(
+ DynamicChunkSplitter.efficientShardingThroughSampling(
+ tablePath, new Object[] {1, 1, 1, 1, 1, 1}, 1000, 2),
+ Arrays.asList(
+ DynamicChunkSplitter.ChunkRange.of(null, 1),
+ DynamicChunkSplitter.ChunkRange.of(1, null)));
+
+ check(
+ DynamicChunkSplitter.efficientShardingThroughSampling(
+ tablePath, new Object[] {1, 1, 1, 1, 1, 1}, 1000, 1),
+ Arrays.asList(DynamicChunkSplitter.ChunkRange.of(null, null)));
+
+ check(
+ DynamicChunkSplitter.efficientShardingThroughSampling(
+ tablePath, new Object[] {1, 1, 1, 1, 1, 1}, 1000, 10),
+ Arrays.asList(
+ DynamicChunkSplitter.ChunkRange.of(null, 1),
+ DynamicChunkSplitter.ChunkRange.of(1, null)));
+
+ check(
+ DynamicChunkSplitter.efficientShardingThroughSampling(
+ tablePath, new Object[] {1, 1, 1, 1, 1, 1, 2, 2, 2, 2,
2}, 1000, 10),
+ Arrays.asList(
+ DynamicChunkSplitter.ChunkRange.of(null, 1),
+ DynamicChunkSplitter.ChunkRange.of(1, 2),
+ DynamicChunkSplitter.ChunkRange.of(2, null)));
+
+ check(
+ DynamicChunkSplitter.efficientShardingThroughSampling(
+ tablePath, new Object[] {1, 1, 1, 1, 1, 1, 2, 2, 2, 2,
2}, 1000, 1),
+ Arrays.asList(DynamicChunkSplitter.ChunkRange.of(null, null)));
+
+ check(
+ DynamicChunkSplitter.efficientShardingThroughSampling(
+ tablePath, new Object[] {1, 1, 1, 1, 1, 1, 2, 2, 2, 2,
2}, 1000, 2),
+ Arrays.asList(
+ DynamicChunkSplitter.ChunkRange.of(null, 1),
+ DynamicChunkSplitter.ChunkRange.of(1, null)));
+
+ check(
+ DynamicChunkSplitter.efficientShardingThroughSampling(
+ tablePath, new Object[] {1}, 1000, 1),
+ Arrays.asList(
+ DynamicChunkSplitter.ChunkRange.of(null, 1),
+ DynamicChunkSplitter.ChunkRange.of(1, null)));
+
+ check(
+ DynamicChunkSplitter.efficientShardingThroughSampling(
+ tablePath, new Object[] {1}, 1000, 2),
+ Arrays.asList(
+ DynamicChunkSplitter.ChunkRange.of(null, 1),
+ DynamicChunkSplitter.ChunkRange.of(1, null)));
+
+ check(
+ DynamicChunkSplitter.efficientShardingThroughSampling(
+ tablePath, new Object[] {1, 2, 3}, 1000, 2),
+ Arrays.asList(
+ DynamicChunkSplitter.ChunkRange.of(null, 2),
+ DynamicChunkSplitter.ChunkRange.of(2, null)));
+
+ check(
+ DynamicChunkSplitter.efficientShardingThroughSampling(
+ tablePath, new Object[] {1, 2, 3}, 1000, 1),
+ Arrays.asList(DynamicChunkSplitter.ChunkRange.of(null, null)));
+
+ check(
+ DynamicChunkSplitter.efficientShardingThroughSampling(
+ tablePath, new Object[] {1, 2, 3}, 1000, 3),
+ Arrays.asList(
+ DynamicChunkSplitter.ChunkRange.of(null, 1),
+ DynamicChunkSplitter.ChunkRange.of(1, 2),
+ DynamicChunkSplitter.ChunkRange.of(2, 3),
+ DynamicChunkSplitter.ChunkRange.of(3, null)));
+
+ check(
+ DynamicChunkSplitter.efficientShardingThroughSampling(
+ tablePath, new Object[] {1, 2, 3, 4, 5}, 1000, 3),
+ Arrays.asList(
+ DynamicChunkSplitter.ChunkRange.of(null, 2),
+ DynamicChunkSplitter.ChunkRange.of(2, 4),
+ DynamicChunkSplitter.ChunkRange.of(4, null)));
+ check(
+ DynamicChunkSplitter.efficientShardingThroughSampling(
+ tablePath, new Object[] {1, 2, 3, 4, 5}, 1000, 2),
+ Arrays.asList(
+ DynamicChunkSplitter.ChunkRange.of(null, 3),
+ DynamicChunkSplitter.ChunkRange.of(3, null)));
+
+ check(
+ DynamicChunkSplitter.efficientShardingThroughSampling(
+ tablePath, new Object[] {1, 2, 3, 4, 5, 6}, 1000, 1),
+ Arrays.asList(DynamicChunkSplitter.ChunkRange.of(null, null)));
+
+ check(
+ DynamicChunkSplitter.efficientShardingThroughSampling(
+ tablePath, new Object[] {1, 2, 3, 4, 5, 6}, 1000, 3),
+ Arrays.asList(
+ DynamicChunkSplitter.ChunkRange.of(null, 3),
+ DynamicChunkSplitter.ChunkRange.of(3, 5),
+ DynamicChunkSplitter.ChunkRange.of(5, null)));
+ check(
+ DynamicChunkSplitter.efficientShardingThroughSampling(
+ tablePath, new Object[] {1, 2, 3, 4, 5, 6}, 1000, 4),
+ Arrays.asList(
+ DynamicChunkSplitter.ChunkRange.of(null, 2),
+ DynamicChunkSplitter.ChunkRange.of(2, 4),
+ DynamicChunkSplitter.ChunkRange.of(4, 5),
+ DynamicChunkSplitter.ChunkRange.of(5, null)));
+ check(
+ DynamicChunkSplitter.efficientShardingThroughSampling(
+ tablePath, new Object[] {1, 2, 3, 4, 5, 6}, 1000, 5),
+ Arrays.asList(
+ DynamicChunkSplitter.ChunkRange.of(null, 2),
+ DynamicChunkSplitter.ChunkRange.of(2, 3),
+ DynamicChunkSplitter.ChunkRange.of(3, 4),
+ DynamicChunkSplitter.ChunkRange.of(4, 5),
+ DynamicChunkSplitter.ChunkRange.of(5, null)));
+ check(
+ DynamicChunkSplitter.efficientShardingThroughSampling(
+ tablePath, new Object[] {1, 2, 3, 4, 5, 6}, 1000, 6),
+ Arrays.asList(
+ DynamicChunkSplitter.ChunkRange.of(null, 1),
+ DynamicChunkSplitter.ChunkRange.of(1, 2),
+ DynamicChunkSplitter.ChunkRange.of(2, 3),
+ DynamicChunkSplitter.ChunkRange.of(3, 4),
+ DynamicChunkSplitter.ChunkRange.of(4, 5),
+ DynamicChunkSplitter.ChunkRange.of(5, 6),
+ DynamicChunkSplitter.ChunkRange.of(6, null)));
+ }
+
+ private void check(
+ List<DynamicChunkSplitter.ChunkRange> a,
List<DynamicChunkSplitter.ChunkRange> b) {
+ checkRule(b);
+ assertEquals(a, b);
+ }
+
+ private void checkRule(List<DynamicChunkSplitter.ChunkRange> a) {
+ for (int i = 0; i < a.size(); i++) {
+ if (i == 0) {
+ assertNull(a.get(i).getChunkStart());
+ }
+ if (i == a.size() - 1) {
+ assertNull(a.get(i).getChunkEnd());
+ }
+ // current chunk start should be equal to previous chunk end
+ if (i > 0) {
+ assertEquals(a.get(i - 1).getChunkEnd(),
a.get(i).getChunkStart());
+ }
+ if (i > 0 && i < a.size() - 1) {
+ // current chunk end should be greater than current chunk start
+ assertTrue((int) a.get(i).getChunkEnd() > (int)
a.get(i).getChunkStart());
+ }
+ }
+ }
+}