This is an automated email from the ASF dual-hosted git repository.
stevenwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 6e7113a529 Flink: a few small fixes or tuning for range partitioner
(#10823)
6e7113a529 is described below
commit 6e7113a5291dffad38ffacc7d264456a2366a707
Author: Steven Zhen Wu <[email protected]>
AuthorDate: Thu Aug 1 14:10:37 2024 -0700
Flink: a few small fixes or tuning for range partitioner (#10823)
---
.../sink/shuffle/AggregatedStatisticsTracker.java | 23 ++++--
.../flink/sink/shuffle/CompletedStatistics.java | 8 ++
.../sink/shuffle/DataStatisticsCoordinator.java | 45 ++++++-----
.../flink/sink/shuffle/RangePartitioner.java | 8 +-
.../flink/sink/shuffle/SketchRangePartitioner.java | 19 +----
.../iceberg/flink/sink/shuffle/SketchUtil.java | 17 +++++
.../iceberg/flink/sink/shuffle/SortKeyUtil.java | 59 +++++++++++++++
.../flink/sink/shuffle/TestRangePartitioner.java | 65 ++++++++++++++++
.../sink/shuffle/TestSketchRangePartitioner.java | 88 ++++++++++++++++++++++
.../iceberg/flink/sink/shuffle/TestSketchUtil.java | 64 +++++++++++++++-
.../flink/sink/shuffle/TestSortKeyUtil.java | 73 ++++++++++++++++++
11 files changed, 420 insertions(+), 49 deletions(-)
diff --git
a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/AggregatedStatisticsTracker.java
b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/AggregatedStatisticsTracker.java
index 338523b7b0..5525f02c87 100644
---
a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/AggregatedStatisticsTracker.java
+++
b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/AggregatedStatisticsTracker.java
@@ -223,7 +223,9 @@ class AggregatedStatisticsTracker {
convertCoordinatorToSketch();
}
- sketchStatistics.update(taskSketch);
+ if (taskSketch.getNumSamples() > 0) {
+ sketchStatistics.update(taskSketch);
+ }
}
}
@@ -242,13 +244,18 @@ class AggregatedStatisticsTracker {
return CompletedStatistics.fromKeyFrequency(checkpointId,
mapStatistics);
} else {
ReservoirItemsSketch<SortKey> sketch = sketchStatistics.getResult();
- LOG.info(
- "Completed sketch statistics aggregation: "
- + "reservoir size = {}, number of items seen = {}, number of
samples = {}",
- sketch.getK(),
- sketch.getN(),
- sketch.getNumSamples());
- return CompletedStatistics.fromKeySamples(checkpointId,
sketch.getSamples());
+ if (sketch != null) {
+ LOG.info(
+ "Completed sketch statistics aggregation: "
+ + "reservoir size = {}, number of items seen = {}, number of
samples = {}",
+ sketch.getK(),
+ sketch.getN(),
+ sketch.getNumSamples());
+ return CompletedStatistics.fromKeySamples(checkpointId,
sketch.getSamples());
+ } else {
+ LOG.info("Empty sketch statistics.");
+ return CompletedStatistics.fromKeySamples(checkpointId, new
SortKey[0]);
+ }
}
}
}
diff --git
a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/CompletedStatistics.java
b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/CompletedStatistics.java
index c0e228965d..e4cba174f0 100644
---
a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/CompletedStatistics.java
+++
b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/CompletedStatistics.java
@@ -100,4 +100,12 @@ class CompletedStatistics {
SortKey[] keySamples() {
return keySamples;
}
+
+ boolean isEmpty() {
+ if (type == StatisticsType.Sketch) {
+ return keySamples.length == 0;
+ } else {
+ return keyFrequency().isEmpty();
+ }
+ }
}
diff --git
a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java
b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java
index 3b21fbae31..4bfde7204a 100644
---
a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java
+++
b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java
@@ -38,11 +38,11 @@ import org.apache.flink.util.ThrowableCatchingRunnable;
import org.apache.flink.util.function.ThrowingRunnable;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
-import org.apache.iceberg.SortOrderComparators;
import org.apache.iceberg.StructLike;
import
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Comparators;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
@@ -91,7 +91,7 @@ class DataStatisticsCoordinator implements
OperatorCoordinator {
this.context = context;
this.schema = schema;
this.sortOrder = sortOrder;
- this.comparator = SortOrderComparators.forSchema(schema, sortOrder);
+ this.comparator = Comparators.forType(SortKeyUtil.sortKeySchema(schema,
sortOrder).asStruct());
this.downstreamParallelism = downstreamParallelism;
this.statisticsType = statisticsType;
this.closeFileCostWeightPercentage = closeFileCostWeightPercentage;
@@ -202,17 +202,23 @@ class DataStatisticsCoordinator implements
OperatorCoordinator {
aggregatedStatisticsTracker.updateAndCheckCompletion(subtask, event);
if (maybeCompletedStatistics != null) {
- // completedStatistics contains the complete samples, which is needed to
compute
- // the range bounds in globalStatistics if downstreamParallelism changed.
- this.completedStatistics = maybeCompletedStatistics;
- // globalStatistics only contains assignment calculated based on Map or
Sketch statistics
- this.globalStatistics =
- globalStatistics(
- maybeCompletedStatistics,
- downstreamParallelism,
- comparator,
- closeFileCostWeightPercentage);
- sendGlobalStatisticsToSubtasks(globalStatistics);
+ if (maybeCompletedStatistics.isEmpty()) {
+ LOG.info(
+ "Skip aggregated statistics for checkpoint {} as it is empty.",
event.checkpointId());
+ } else {
+ LOG.info("Completed statistics aggregation for checkpoint {}",
event.checkpointId());
+ // completedStatistics contains the complete samples, which is needed
to compute
+ // the range bounds in globalStatistics if downstreamParallelism
changed.
+ this.completedStatistics = maybeCompletedStatistics;
+ // globalStatistics only contains assignment calculated based on Map
or Sketch statistics
+ this.globalStatistics =
+ globalStatistics(
+ maybeCompletedStatistics,
+ downstreamParallelism,
+ comparator,
+ closeFileCostWeightPercentage);
+ sendGlobalStatisticsToSubtasks(globalStatistics);
+ }
}
}
@@ -324,9 +330,14 @@ class DataStatisticsCoordinator implements
OperatorCoordinator {
"Snapshotting data statistics coordinator {} for checkpoint {}",
operatorName,
checkpointId);
- resultFuture.complete(
- StatisticsUtil.serializeCompletedStatistics(
- completedStatistics, completedStatisticsSerializer));
+ if (completedStatistics == null) {
+ // null checkpoint result is not allowed, hence supply an empty
byte array
+ resultFuture.complete(new byte[0]);
+ } else {
+ resultFuture.complete(
+ StatisticsUtil.serializeCompletedStatistics(
+ completedStatistics, completedStatisticsSerializer));
+ }
},
String.format("taking checkpoint %d", checkpointId));
}
@@ -338,7 +349,7 @@ class DataStatisticsCoordinator implements
OperatorCoordinator {
public void resetToCheckpoint(long checkpointId, byte[] checkpointData) {
Preconditions.checkState(
!started, "The coordinator %s can only be reset if it was not yet
started", operatorName);
- if (checkpointData == null) {
+ if (checkpointData == null || checkpointData.length == 0) {
LOG.info(
"Data statistic coordinator {} has nothing to restore from
checkpoint {}",
operatorName,
diff --git
a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/RangePartitioner.java
b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/RangePartitioner.java
index 83a9461233..6608b938f5 100644
---
a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/RangePartitioner.java
+++
b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/RangePartitioner.java
@@ -23,12 +23,13 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/** The wrapper class */
+/** This custom partitioner implements the {@link DistributionMode#RANGE} for
Flink sink. */
@Internal
public class RangePartitioner implements Partitioner<StatisticsOrRecord> {
private static final Logger LOG =
LoggerFactory.getLogger(RangePartitioner.class);
@@ -94,9 +95,8 @@ public class RangePartitioner implements
Partitioner<StatisticsOrRecord> {
if (numPartitionsStatsCalculation <= numPartitions) {
// no rescale or scale-up case.
// new subtasks are ignored and not assigned any keys, which is
sub-optimal and only
- // transient.
- // when rescale is detected, operator requests new statistics from
coordinator upon
- // initialization.
+ // transient. when rescale is detected, operator requests new statistics
from
+ // coordinator upon initialization.
return partition;
} else {
// scale-down case.
diff --git
a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SketchRangePartitioner.java
b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SketchRangePartitioner.java
index af78271ea5..dddb0d8722 100644
---
a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SketchRangePartitioner.java
+++
b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SketchRangePartitioner.java
@@ -18,17 +18,16 @@
*/
package org.apache.iceberg.flink.sink.shuffle;
-import java.util.Arrays;
import java.util.Comparator;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortKey;
import org.apache.iceberg.SortOrder;
-import org.apache.iceberg.SortOrderComparators;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.RowDataWrapper;
+import org.apache.iceberg.types.Comparators;
class SketchRangePartitioner implements Partitioner<RowData> {
private final SortKey sortKey;
@@ -38,7 +37,7 @@ class SketchRangePartitioner implements Partitioner<RowData> {
SketchRangePartitioner(Schema schema, SortOrder sortOrder, SortKey[]
rangeBounds) {
this.sortKey = new SortKey(schema, sortOrder);
- this.comparator = SortOrderComparators.forSchema(schema, sortOrder);
+ this.comparator = Comparators.forType(SortKeyUtil.sortKeySchema(schema,
sortOrder).asStruct());
this.rangeBounds = rangeBounds;
this.rowDataWrapper = new RowDataWrapper(FlinkSchemaUtil.convert(schema),
schema.asStruct());
}
@@ -47,18 +46,6 @@ class SketchRangePartitioner implements Partitioner<RowData>
{
public int partition(RowData row, int numPartitions) {
// reuse the sortKey and rowDataWrapper
sortKey.wrap(rowDataWrapper.wrap(row));
- int partition = Arrays.binarySearch(rangeBounds, sortKey, comparator);
-
- // binarySearch either returns the match location or -[insertion point]-1
- if (partition < 0) {
- partition = -partition - 1;
- }
-
- if (partition > rangeBounds.length) {
- partition = rangeBounds.length;
- }
-
- return RangePartitioner.adjustPartitionWithRescale(
- partition, rangeBounds.length + 1, numPartitions);
+ return SketchUtil.partition(sortKey, numPartitions, rangeBounds,
comparator);
}
}
diff --git
a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SketchUtil.java
b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SketchUtil.java
index a58310611e..871ef9ef11 100644
---
a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SketchUtil.java
+++
b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SketchUtil.java
@@ -139,4 +139,21 @@ class SketchUtil {
}
});
}
+
+ static int partition(
+ SortKey key, int numPartitions, SortKey[] rangeBounds,
Comparator<StructLike> comparator) {
+ int partition = Arrays.binarySearch(rangeBounds, key, comparator);
+
+ // binarySearch either returns the match location or -[insertion point]-1
+ if (partition < 0) {
+ partition = -partition - 1;
+ }
+
+ if (partition > rangeBounds.length) {
+ partition = rangeBounds.length;
+ }
+
+ return RangePartitioner.adjustPartitionWithRescale(
+ partition, rangeBounds.length + 1, numPartitions);
+ }
}
diff --git
a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SortKeyUtil.java
b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SortKeyUtil.java
new file mode 100644
index 0000000000..1e5bdbbac3
--- /dev/null
+++
b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SortKeyUtil.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.List;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortField;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+
+class SortKeyUtil {
+ private SortKeyUtil() {}
+
+ /** Compute the result schema of {@code SortKey} transformation */
+ static Schema sortKeySchema(Schema schema, SortOrder sortOrder) {
+ List<SortField> sortFields = sortOrder.fields();
+ int size = sortFields.size();
+ List<Types.NestedField> transformedFields =
Lists.newArrayListWithCapacity(size);
+ for (int i = 0; i < size; ++i) {
+ int sourceFieldId = sortFields.get(i).sourceId();
+ Types.NestedField sourceField = schema.findField(sourceFieldId);
+ Preconditions.checkArgument(
+ sourceField != null, "Cannot find source field: %s", sourceFieldId);
+ Type transformedType =
sortFields.get(i).transform().getResultType(sourceField.type());
+ // There could be multiple transformations on the same source column,
like in the PartitionKey
+ // case. To resolve the collision, field id is set to transform index
and field name is set to
+ // sourceFieldName_transformIndex
+ Types.NestedField transformedField =
+ Types.NestedField.of(
+ i,
+ sourceField.isOptional(),
+ sourceField.name() + '_' + i,
+ transformedType,
+ sourceField.doc());
+ transformedFields.add(transformedField);
+ }
+
+ return new Schema(transformedFields);
+ }
+}
diff --git
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestRangePartitioner.java
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestRangePartitioner.java
new file mode 100644
index 0000000000..0485fdb7fa
--- /dev/null
+++
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestRangePartitioner.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import static org.apache.iceberg.flink.sink.shuffle.Fixtures.CHAR_KEYS;
+import static org.apache.iceberg.flink.sink.shuffle.Fixtures.SCHEMA;
+import static org.apache.iceberg.flink.sink.shuffle.Fixtures.SORT_ORDER;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Set;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.iceberg.SortKey;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.junit.jupiter.api.Test;
+
+public class TestRangePartitioner {
+ private final int numPartitions = 4;
+
+ @Test
+ public void testRoundRobinRecordsBeforeStatisticsAvailable() {
+ RangePartitioner partitioner = new RangePartitioner(SCHEMA, SORT_ORDER);
+ Set<Integer> results = Sets.newHashSetWithExpectedSize(numPartitions);
+ for (int i = 0; i < numPartitions; ++i) {
+ results.add(
+ partitioner.partition(
+
StatisticsOrRecord.fromRecord(GenericRowData.of(StringData.fromString("a"), 1)),
+ numPartitions));
+ }
+
+ // round-robin. every partition should get an assignment
+ assertThat(results).containsExactlyInAnyOrder(0, 1, 2, 3);
+ }
+
+ @Test
+ public void testRoundRobinStatisticsWrapper() {
+ RangePartitioner partitioner = new RangePartitioner(SCHEMA, SORT_ORDER);
+ Set<Integer> results = Sets.newHashSetWithExpectedSize(numPartitions);
+ for (int i = 0; i < numPartitions; ++i) {
+ GlobalStatistics statistics =
+ GlobalStatistics.fromRangeBounds(1L, new SortKey[]
{CHAR_KEYS.get("a")});
+ results.add(
+ partitioner.partition(StatisticsOrRecord.fromStatistics(statistics),
numPartitions));
+ }
+
+ // round-robin. every partition should get an assignment
+ assertThat(results).containsExactlyInAnyOrder(0, 1, 2, 3);
+ }
+}
diff --git
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestSketchRangePartitioner.java
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestSketchRangePartitioner.java
new file mode 100644
index 0000000000..378c6afff0
--- /dev/null
+++
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestSketchRangePartitioner.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.SortKey;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.RowDataWrapper;
+import org.apache.iceberg.flink.TestFixtures;
+import org.junit.jupiter.api.Test;
+
+public class TestSketchRangePartitioner {
+ // sort on the long id field
+ private static final SortOrder SORT_ORDER =
+ SortOrder.builderFor(TestFixtures.SCHEMA).asc("id").build();
+ private static final SortKey SORT_KEY = new SortKey(TestFixtures.SCHEMA,
SORT_ORDER);
+ private static final RowType ROW_TYPE =
FlinkSchemaUtil.convert(TestFixtures.SCHEMA);
+ private static final int NUM_PARTITIONS = 16;
+ private static final long RANGE_STEP = 1_000;
+ private static final long MAX_ID = RANGE_STEP * NUM_PARTITIONS;
+ private static final SortKey[] RANGE_BOUNDS = createRangeBounds();
+
+ /**
+ * To understand how range bounds are used in range partitioning, here is an
example for human
+ * ages with 4 partitions: [15, 32, 60]. The 4 ranges would be
+ *
+ * <ul>
+ * <li>age <= 15
+ * <li>age > 15 && age <= 32
+ * <li>age >32 && age <= 60
+ * <li>age > 60
+ * </ul>
+ */
+ private static SortKey[] createRangeBounds() {
+ SortKey[] rangeBounds = new SortKey[NUM_PARTITIONS - 1];
+ for (int i = 0; i < NUM_PARTITIONS - 1; ++i) {
+ RowData rowData =
+ GenericRowData.of(
+ StringData.fromString("data"),
+ RANGE_STEP * (i + 1),
+ StringData.fromString("2023-06-20"));
+ RowDataWrapper keyWrapper = new RowDataWrapper(ROW_TYPE,
TestFixtures.SCHEMA.asStruct());
+ keyWrapper.wrap(rowData);
+ SortKey sortKey = new SortKey(TestFixtures.SCHEMA, SORT_ORDER);
+ sortKey.wrap(keyWrapper);
+ rangeBounds[i] = sortKey;
+ }
+
+ return rangeBounds;
+ }
+
+ @Test
+ public void testRangePartitioningWithRangeBounds() {
+ SketchRangePartitioner partitioner =
+ new SketchRangePartitioner(TestFixtures.SCHEMA, SORT_ORDER,
RANGE_BOUNDS);
+ GenericRowData row =
+ GenericRowData.of(StringData.fromString("data"), 0L,
StringData.fromString("2023-06-20"));
+ for (long id = 0; id < MAX_ID; ++id) {
+ row.setField(1, id);
+ int partition = partitioner.partition(row, NUM_PARTITIONS);
+
assertThat(partition).isGreaterThanOrEqualTo(0).isLessThan(NUM_PARTITIONS);
+ int expectedPartition = id == 0L ? 0 : (int) ((id - 1) / RANGE_STEP);
+ assertThat(partition).isEqualTo(expectedPartition);
+ }
+ }
+}
diff --git
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestSketchUtil.java
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestSketchUtil.java
index 31dae5c76a..16202c075e 100644
---
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestSketchUtil.java
+++
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestSketchUtil.java
@@ -19,10 +19,13 @@
package org.apache.iceberg.flink.sink.shuffle;
import static org.apache.iceberg.flink.sink.shuffle.Fixtures.CHAR_KEYS;
+import static
org.apache.iceberg.flink.sink.shuffle.Fixtures.SORT_ORDER_COMPARTOR;
import static org.assertj.core.api.Assertions.assertThat;
import org.apache.iceberg.SortKey;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
public class TestSketchUtil {
@Test
@@ -55,7 +58,7 @@ public class TestSketchUtil {
assertThat(
SketchUtil.rangeBounds(
1,
- Fixtures.SORT_ORDER_COMPARTOR,
+ SORT_ORDER_COMPARTOR,
new SortKey[] {
CHAR_KEYS.get("a"),
CHAR_KEYS.get("b"),
@@ -72,7 +75,7 @@ public class TestSketchUtil {
assertThat(
SketchUtil.rangeBounds(
3,
- Fixtures.SORT_ORDER_COMPARTOR,
+ SORT_ORDER_COMPARTOR,
new SortKey[] {
CHAR_KEYS.get("a"),
CHAR_KEYS.get("b"),
@@ -90,7 +93,7 @@ public class TestSketchUtil {
assertThat(
SketchUtil.rangeBounds(
4,
- Fixtures.SORT_ORDER_COMPARTOR,
+ SORT_ORDER_COMPARTOR,
new SortKey[] {
CHAR_KEYS.get("a"),
CHAR_KEYS.get("b"),
@@ -113,7 +116,7 @@ public class TestSketchUtil {
assertThat(
SketchUtil.rangeBounds(
4,
- Fixtures.SORT_ORDER_COMPARTOR,
+ SORT_ORDER_COMPARTOR,
new SortKey[] {
CHAR_KEYS.get("a"),
CHAR_KEYS.get("b"),
@@ -130,4 +133,57 @@ public class TestSketchUtil {
// skipped duplicate c's
.containsExactly(CHAR_KEYS.get("c"), CHAR_KEYS.get("g"),
CHAR_KEYS.get("j"));
}
+
+ @ParameterizedTest
+ @ValueSource(ints = {4, 6})
+ public void testPartitioningAndScaleUp(int numPartitions) {
+ // Range bounds are calculated based on 4 partitions
+ SortKey[] rangeBounds =
+ new SortKey[] {CHAR_KEYS.get("c"), CHAR_KEYS.get("j"),
CHAR_KEYS.get("m")};
+
+ // <= c
+ assertPartition(0, CHAR_KEYS.get("a"), numPartitions, rangeBounds);
+ assertPartition(0, CHAR_KEYS.get("c"), numPartitions, rangeBounds);
+ // > c && <= j
+ assertPartition(1, CHAR_KEYS.get("d"), numPartitions, rangeBounds);
+ assertPartition(1, CHAR_KEYS.get("i"), numPartitions, rangeBounds);
+ assertPartition(1, CHAR_KEYS.get("j"), numPartitions, rangeBounds);
+ // > j && <= m
+ assertPartition(2, CHAR_KEYS.get("k"), numPartitions, rangeBounds);
+ assertPartition(2, CHAR_KEYS.get("l"), numPartitions, rangeBounds);
+ assertPartition(2, CHAR_KEYS.get("m"), numPartitions, rangeBounds);
+ // > m
+ assertPartition(3, CHAR_KEYS.get("n"), numPartitions, rangeBounds);
+ assertPartition(3, CHAR_KEYS.get("z"), numPartitions, rangeBounds);
+ }
+
+ @Test
+ public void testPartitionScaleDown() {
+ // Range bounds are calculated based on 4 partitions
+ SortKey[] rangeBounds =
+ new SortKey[] {CHAR_KEYS.get("c"), CHAR_KEYS.get("j"),
CHAR_KEYS.get("m")};
+ int numPartitions = 3;
+
+ // <= c
+ assertPartition(0, CHAR_KEYS.get("a"), numPartitions, rangeBounds);
+ assertPartition(0, CHAR_KEYS.get("c"), numPartitions, rangeBounds);
+ // > c && <= j
+ assertPartition(1, CHAR_KEYS.get("d"), numPartitions, rangeBounds);
+ assertPartition(1, CHAR_KEYS.get("i"), numPartitions, rangeBounds);
+ assertPartition(1, CHAR_KEYS.get("j"), numPartitions, rangeBounds);
+ // > j && <= m
+ assertPartition(2, CHAR_KEYS.get("k"), numPartitions, rangeBounds);
+ assertPartition(2, CHAR_KEYS.get("l"), numPartitions, rangeBounds);
+ assertPartition(2, CHAR_KEYS.get("m"), numPartitions, rangeBounds);
+ // > m
+ // reassigns out-of-range partitions via mod (% 3 in this case)
+ assertPartition(0, CHAR_KEYS.get("n"), numPartitions, rangeBounds);
+ assertPartition(0, CHAR_KEYS.get("z"), numPartitions, rangeBounds);
+ }
+
+ private static void assertPartition(
+ int expectedPartition, SortKey key, int numPartitions, SortKey[]
rangeBounds) {
+ assertThat(SketchUtil.partition(key, numPartitions, rangeBounds,
SORT_ORDER_COMPARTOR))
+ .isEqualTo(expectedPartition);
+ }
}
diff --git
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestSortKeyUtil.java
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestSortKeyUtil.java
new file mode 100644
index 0000000000..1be7e27f2c
--- /dev/null
+++
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestSortKeyUtil.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.apache.iceberg.NullOrder;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortDirection;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.Test;
+
+public class TestSortKeyUtil {
+ @Test
+ public void testResultSchema() {
+ Schema schema =
+ new Schema(
+ Types.NestedField.required(1, "id", Types.StringType.get()),
+ Types.NestedField.required(2, "ratio", Types.DoubleType.get()),
+ Types.NestedField.optional(
+ 3,
+ "user",
+ Types.StructType.of(
+ Types.NestedField.required(11, "name",
Types.StringType.get()),
+ Types.NestedField.required(12, "ts",
Types.TimestampType.withoutZone()),
+ Types.NestedField.optional(13, "device_id",
Types.UUIDType.get()),
+ Types.NestedField.optional(
+ 14,
+ "location",
+ Types.StructType.of(
+ Types.NestedField.required(101, "lat",
Types.FloatType.get()),
+ Types.NestedField.required(102, "long",
Types.FloatType.get()),
+ Types.NestedField.required(103, "blob",
Types.BinaryType.get()))))));
+
+ SortOrder sortOrder =
+ SortOrder.builderFor(schema)
+ .asc("ratio")
+ .sortBy(Expressions.hour("user.ts"), SortDirection.ASC,
NullOrder.NULLS_FIRST)
+ .sortBy(
+ Expressions.bucket("user.device_id", 16), SortDirection.ASC,
NullOrder.NULLS_FIRST)
+ .sortBy(
+ Expressions.truncate("user.location.blob", 16),
+ SortDirection.ASC,
+ NullOrder.NULLS_FIRST)
+ .build();
+
+ assertThat(SortKeyUtil.sortKeySchema(schema, sortOrder).asStruct())
+ .isEqualTo(
+ Types.StructType.of(
+ Types.NestedField.required(0, "ratio_0",
Types.DoubleType.get()),
+ Types.NestedField.required(1, "ts_1", Types.IntegerType.get()),
+ Types.NestedField.optional(2, "device_id_2",
Types.IntegerType.get()),
+ Types.NestedField.required(3, "blob_3",
Types.BinaryType.get())));
+ }
+}