This is an automated email from the ASF dual-hosted git repository.

stevenwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 6e7113a529 Flink: a few small fixes or tuning for range partitioner (#10823)
6e7113a529 is described below

commit 6e7113a5291dffad38ffacc7d264456a2366a707
Author: Steven Zhen Wu <[email protected]>
AuthorDate: Thu Aug 1 14:10:37 2024 -0700

    Flink: a few small fixes or tuning for range partitioner (#10823)
---
 .../sink/shuffle/AggregatedStatisticsTracker.java  | 23 ++++--
 .../flink/sink/shuffle/CompletedStatistics.java    |  8 ++
 .../sink/shuffle/DataStatisticsCoordinator.java    | 45 ++++++-----
 .../flink/sink/shuffle/RangePartitioner.java       |  8 +-
 .../flink/sink/shuffle/SketchRangePartitioner.java | 19 +----
 .../iceberg/flink/sink/shuffle/SketchUtil.java     | 17 +++++
 .../iceberg/flink/sink/shuffle/SortKeyUtil.java    | 59 +++++++++++++++
 .../flink/sink/shuffle/TestRangePartitioner.java   | 65 ++++++++++++++++
 .../sink/shuffle/TestSketchRangePartitioner.java   | 88 ++++++++++++++++++++++
 .../iceberg/flink/sink/shuffle/TestSketchUtil.java | 64 +++++++++++++++-
 .../flink/sink/shuffle/TestSortKeyUtil.java        | 73 ++++++++++++++++++
 11 files changed, 420 insertions(+), 49 deletions(-)

diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/AggregatedStatisticsTracker.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/AggregatedStatisticsTracker.java
index 338523b7b0..5525f02c87 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/AggregatedStatisticsTracker.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/AggregatedStatisticsTracker.java
@@ -223,7 +223,9 @@ class AggregatedStatisticsTracker {
           convertCoordinatorToSketch();
         }
 
-        sketchStatistics.update(taskSketch);
+        if (taskSketch.getNumSamples() > 0) {
+          sketchStatistics.update(taskSketch);
+        }
       }
     }
 
@@ -242,13 +244,18 @@ class AggregatedStatisticsTracker {
         return CompletedStatistics.fromKeyFrequency(checkpointId, 
mapStatistics);
       } else {
         ReservoirItemsSketch<SortKey> sketch = sketchStatistics.getResult();
-        LOG.info(
-            "Completed sketch statistics aggregation: "
-                + "reservoir size = {}, number of items seen = {}, number of 
samples = {}",
-            sketch.getK(),
-            sketch.getN(),
-            sketch.getNumSamples());
-        return CompletedStatistics.fromKeySamples(checkpointId, 
sketch.getSamples());
+        if (sketch != null) {
+          LOG.info(
+              "Completed sketch statistics aggregation: "
+                  + "reservoir size = {}, number of items seen = {}, number of 
samples = {}",
+              sketch.getK(),
+              sketch.getN(),
+              sketch.getNumSamples());
+          return CompletedStatistics.fromKeySamples(checkpointId, 
sketch.getSamples());
+        } else {
+          LOG.info("Empty sketch statistics.");
+          return CompletedStatistics.fromKeySamples(checkpointId, new 
SortKey[0]);
+        }
       }
     }
   }
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/CompletedStatistics.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/CompletedStatistics.java
index c0e228965d..e4cba174f0 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/CompletedStatistics.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/CompletedStatistics.java
@@ -100,4 +100,12 @@ class CompletedStatistics {
   SortKey[] keySamples() {
     return keySamples;
   }
+
+  boolean isEmpty() {
+    if (type == StatisticsType.Sketch) {
+      return keySamples.length == 0;
+    } else {
+      return keyFrequency().isEmpty();
+    }
+  }
 }
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java
index 3b21fbae31..4bfde7204a 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java
@@ -38,11 +38,11 @@ import org.apache.flink.util.ThrowableCatchingRunnable;
 import org.apache.flink.util.function.ThrowingRunnable;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SortOrder;
-import org.apache.iceberg.SortOrderComparators;
 import org.apache.iceberg.StructLike;
 import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Comparators;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 import org.slf4j.Logger;
@@ -91,7 +91,7 @@ class DataStatisticsCoordinator implements 
OperatorCoordinator {
     this.context = context;
     this.schema = schema;
     this.sortOrder = sortOrder;
-    this.comparator = SortOrderComparators.forSchema(schema, sortOrder);
+    this.comparator = Comparators.forType(SortKeyUtil.sortKeySchema(schema, 
sortOrder).asStruct());
     this.downstreamParallelism = downstreamParallelism;
     this.statisticsType = statisticsType;
     this.closeFileCostWeightPercentage = closeFileCostWeightPercentage;
@@ -202,17 +202,23 @@ class DataStatisticsCoordinator implements 
OperatorCoordinator {
         aggregatedStatisticsTracker.updateAndCheckCompletion(subtask, event);
 
     if (maybeCompletedStatistics != null) {
-      // completedStatistics contains the complete samples, which is needed to compute
-      // the range bounds in globalStatistics if downstreamParallelism changed.
-      this.completedStatistics = maybeCompletedStatistics;
-      // globalStatistics only contains assignment calculated based on Map or Sketch statistics
-      this.globalStatistics =
-          globalStatistics(
-              maybeCompletedStatistics,
-              downstreamParallelism,
-              comparator,
-              closeFileCostWeightPercentage);
-      sendGlobalStatisticsToSubtasks(globalStatistics);
+      if (maybeCompletedStatistics.isEmpty()) {
+        LOG.info(
+            "Skip aggregated statistics for checkpoint {} as it is empty.", 
event.checkpointId());
+      } else {
+        LOG.info("Completed statistics aggregation for checkpoint {}", 
event.checkpointId());
+        // completedStatistics contains the complete samples, which is needed to compute
+        // the range bounds in globalStatistics if downstreamParallelism changed.
+        this.completedStatistics = maybeCompletedStatistics;
+        // globalStatistics only contains assignment calculated based on Map or Sketch statistics
+        this.globalStatistics =
+            globalStatistics(
+                maybeCompletedStatistics,
+                downstreamParallelism,
+                comparator,
+                closeFileCostWeightPercentage);
+        sendGlobalStatisticsToSubtasks(globalStatistics);
+      }
     }
   }
 
@@ -324,9 +330,14 @@ class DataStatisticsCoordinator implements 
OperatorCoordinator {
               "Snapshotting data statistics coordinator {} for checkpoint {}",
               operatorName,
               checkpointId);
-          resultFuture.complete(
-              StatisticsUtil.serializeCompletedStatistics(
-                  completedStatistics, completedStatisticsSerializer));
+          if (completedStatistics == null) {
+            // null checkpoint result is not allowed, hence supply an empty byte array
+            resultFuture.complete(new byte[0]);
+          } else {
+            resultFuture.complete(
+                StatisticsUtil.serializeCompletedStatistics(
+                    completedStatistics, completedStatisticsSerializer));
+          }
         },
         String.format("taking checkpoint %d", checkpointId));
   }
@@ -338,7 +349,7 @@ class DataStatisticsCoordinator implements 
OperatorCoordinator {
   public void resetToCheckpoint(long checkpointId, byte[] checkpointData) {
     Preconditions.checkState(
         !started, "The coordinator %s can only be reset if it was not yet 
started", operatorName);
-    if (checkpointData == null) {
+    if (checkpointData == null || checkpointData.length == 0) {
       LOG.info(
           "Data statistic coordinator {} has nothing to restore from 
checkpoint {}",
           operatorName,
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/RangePartitioner.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/RangePartitioner.java
index 83a9461233..6608b938f5 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/RangePartitioner.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/RangePartitioner.java
@@ -23,12 +23,13 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.DistributionMode;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SortOrder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** The wrapper class */
+/** This custom partitioner implements the {@link DistributionMode#RANGE} for Flink sink. */
 @Internal
 public class RangePartitioner implements Partitioner<StatisticsOrRecord> {
   private static final Logger LOG = 
LoggerFactory.getLogger(RangePartitioner.class);
@@ -94,9 +95,8 @@ public class RangePartitioner implements 
Partitioner<StatisticsOrRecord> {
     if (numPartitionsStatsCalculation <= numPartitions) {
       // no rescale or scale-up case.
       // new subtasks are ignored and not assigned any keys, which is sub-optimal and only
-      // transient.
-      // when rescale is detected, operator requests new statistics from coordinator upon
-      // initialization.
+      // transient. when rescale is detected, operator requests new statistics from
+      // coordinator upon initialization.
       return partition;
     } else {
       // scale-down case.
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SketchRangePartitioner.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SketchRangePartitioner.java
index af78271ea5..dddb0d8722 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SketchRangePartitioner.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SketchRangePartitioner.java
@@ -18,17 +18,16 @@
  */
 package org.apache.iceberg.flink.sink.shuffle;
 
-import java.util.Arrays;
 import java.util.Comparator;
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.table.data.RowData;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SortKey;
 import org.apache.iceberg.SortOrder;
-import org.apache.iceberg.SortOrderComparators;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.RowDataWrapper;
+import org.apache.iceberg.types.Comparators;
 
 class SketchRangePartitioner implements Partitioner<RowData> {
   private final SortKey sortKey;
@@ -38,7 +37,7 @@ class SketchRangePartitioner implements Partitioner<RowData> {
 
   SketchRangePartitioner(Schema schema, SortOrder sortOrder, SortKey[] 
rangeBounds) {
     this.sortKey = new SortKey(schema, sortOrder);
-    this.comparator = SortOrderComparators.forSchema(schema, sortOrder);
+    this.comparator = Comparators.forType(SortKeyUtil.sortKeySchema(schema, 
sortOrder).asStruct());
     this.rangeBounds = rangeBounds;
     this.rowDataWrapper = new RowDataWrapper(FlinkSchemaUtil.convert(schema), 
schema.asStruct());
   }
@@ -47,18 +46,6 @@ class SketchRangePartitioner implements Partitioner<RowData> 
{
   public int partition(RowData row, int numPartitions) {
     // reuse the sortKey and rowDataWrapper
     sortKey.wrap(rowDataWrapper.wrap(row));
-    int partition = Arrays.binarySearch(rangeBounds, sortKey, comparator);
-
-    // binarySearch either returns the match location or -[insertion point]-1
-    if (partition < 0) {
-      partition = -partition - 1;
-    }
-
-    if (partition > rangeBounds.length) {
-      partition = rangeBounds.length;
-    }
-
-    return RangePartitioner.adjustPartitionWithRescale(
-        partition, rangeBounds.length + 1, numPartitions);
+    return SketchUtil.partition(sortKey, numPartitions, rangeBounds, 
comparator);
   }
 }
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SketchUtil.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SketchUtil.java
index a58310611e..871ef9ef11 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SketchUtil.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SketchUtil.java
@@ -139,4 +139,21 @@ class SketchUtil {
           }
         });
   }
+
+  static int partition(
+      SortKey key, int numPartitions, SortKey[] rangeBounds, 
Comparator<StructLike> comparator) {
+    int partition = Arrays.binarySearch(rangeBounds, key, comparator);
+
+    // binarySearch either returns the match location or -[insertion point]-1
+    if (partition < 0) {
+      partition = -partition - 1;
+    }
+
+    if (partition > rangeBounds.length) {
+      partition = rangeBounds.length;
+    }
+
+    return RangePartitioner.adjustPartitionWithRescale(
+        partition, rangeBounds.length + 1, numPartitions);
+  }
 }
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SortKeyUtil.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SortKeyUtil.java
new file mode 100644
index 0000000000..1e5bdbbac3
--- /dev/null
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SortKeyUtil.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.List;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortField;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+
+class SortKeyUtil {
+  private SortKeyUtil() {}
+
+  /** Compute the result schema of {@code SortKey} transformation */
+  static Schema sortKeySchema(Schema schema, SortOrder sortOrder) {
+    List<SortField> sortFields = sortOrder.fields();
+    int size = sortFields.size();
+    List<Types.NestedField> transformedFields = 
Lists.newArrayListWithCapacity(size);
+    for (int i = 0; i < size; ++i) {
+      int sourceFieldId = sortFields.get(i).sourceId();
+      Types.NestedField sourceField = schema.findField(sourceFieldId);
+      Preconditions.checkArgument(
+          sourceField != null, "Cannot find source field: %s", sourceFieldId);
+      Type transformedType = 
sortFields.get(i).transform().getResultType(sourceField.type());
+      // There could be multiple transformations on the same source column, like in the PartitionKey
+      // case. To resolve the collision, field id is set to transform index and field name is set to
+      // sourceFieldName_transformIndex
+      Types.NestedField transformedField =
+          Types.NestedField.of(
+              i,
+              sourceField.isOptional(),
+              sourceField.name() + '_' + i,
+              transformedType,
+              sourceField.doc());
+      transformedFields.add(transformedField);
+    }
+
+    return new Schema(transformedFields);
+  }
+}
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestRangePartitioner.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestRangePartitioner.java
new file mode 100644
index 0000000000..0485fdb7fa
--- /dev/null
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestRangePartitioner.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import static org.apache.iceberg.flink.sink.shuffle.Fixtures.CHAR_KEYS;
+import static org.apache.iceberg.flink.sink.shuffle.Fixtures.SCHEMA;
+import static org.apache.iceberg.flink.sink.shuffle.Fixtures.SORT_ORDER;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Set;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.iceberg.SortKey;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.junit.jupiter.api.Test;
+
+public class TestRangePartitioner {
+  private final int numPartitions = 4;
+
+  @Test
+  public void testRoundRobinRecordsBeforeStatisticsAvailable() {
+    RangePartitioner partitioner = new RangePartitioner(SCHEMA, SORT_ORDER);
+    Set<Integer> results = Sets.newHashSetWithExpectedSize(numPartitions);
+    for (int i = 0; i < numPartitions; ++i) {
+      results.add(
+          partitioner.partition(
+              
StatisticsOrRecord.fromRecord(GenericRowData.of(StringData.fromString("a"), 1)),
+              numPartitions));
+    }
+
+    // round-robin. every partition should get an assignment
+    assertThat(results).containsExactlyInAnyOrder(0, 1, 2, 3);
+  }
+
+  @Test
+  public void testRoundRobinStatisticsWrapper() {
+    RangePartitioner partitioner = new RangePartitioner(SCHEMA, SORT_ORDER);
+    Set<Integer> results = Sets.newHashSetWithExpectedSize(numPartitions);
+    for (int i = 0; i < numPartitions; ++i) {
+      GlobalStatistics statistics =
+          GlobalStatistics.fromRangeBounds(1L, new SortKey[] 
{CHAR_KEYS.get("a")});
+      results.add(
+          partitioner.partition(StatisticsOrRecord.fromStatistics(statistics), 
numPartitions));
+    }
+
+    // round-robin. every partition should get an assignment
+    assertThat(results).containsExactlyInAnyOrder(0, 1, 2, 3);
+  }
+}
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestSketchRangePartitioner.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestSketchRangePartitioner.java
new file mode 100644
index 0000000000..378c6afff0
--- /dev/null
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestSketchRangePartitioner.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.SortKey;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.RowDataWrapper;
+import org.apache.iceberg.flink.TestFixtures;
+import org.junit.jupiter.api.Test;
+
+public class TestSketchRangePartitioner {
+  // sort on the long id field
+  private static final SortOrder SORT_ORDER =
+      SortOrder.builderFor(TestFixtures.SCHEMA).asc("id").build();
+  private static final SortKey SORT_KEY = new SortKey(TestFixtures.SCHEMA, 
SORT_ORDER);
+  private static final RowType ROW_TYPE = 
FlinkSchemaUtil.convert(TestFixtures.SCHEMA);
+  private static final int NUM_PARTITIONS = 16;
+  private static final long RANGE_STEP = 1_000;
+  private static final long MAX_ID = RANGE_STEP * NUM_PARTITIONS;
+  private static final SortKey[] RANGE_BOUNDS = createRangeBounds();
+
+  /**
+   * To understand how range bounds are used in range partitioning, here is an example for human
+   * ages with 4 partitions: [15, 32, 60]. The 4 ranges would be
+   *
+   * <ul>
+   *   <li>age <= 15
+   *   <li>age > 15 && age <= 32
+   *   <li>age >32 && age <= 60
+   *   <li>age > 60
+   * </ul>
+   */
+  private static SortKey[] createRangeBounds() {
+    SortKey[] rangeBounds = new SortKey[NUM_PARTITIONS - 1];
+    for (int i = 0; i < NUM_PARTITIONS - 1; ++i) {
+      RowData rowData =
+          GenericRowData.of(
+              StringData.fromString("data"),
+              RANGE_STEP * (i + 1),
+              StringData.fromString("2023-06-20"));
+      RowDataWrapper keyWrapper = new RowDataWrapper(ROW_TYPE, 
TestFixtures.SCHEMA.asStruct());
+      keyWrapper.wrap(rowData);
+      SortKey sortKey = new SortKey(TestFixtures.SCHEMA, SORT_ORDER);
+      sortKey.wrap(keyWrapper);
+      rangeBounds[i] = sortKey;
+    }
+
+    return rangeBounds;
+  }
+
+  @Test
+  public void testRangePartitioningWithRangeBounds() {
+    SketchRangePartitioner partitioner =
+        new SketchRangePartitioner(TestFixtures.SCHEMA, SORT_ORDER, 
RANGE_BOUNDS);
+    GenericRowData row =
+        GenericRowData.of(StringData.fromString("data"), 0L, 
StringData.fromString("2023-06-20"));
+    for (long id = 0; id < MAX_ID; ++id) {
+      row.setField(1, id);
+      int partition = partitioner.partition(row, NUM_PARTITIONS);
+      
assertThat(partition).isGreaterThanOrEqualTo(0).isLessThan(NUM_PARTITIONS);
+      int expectedPartition = id == 0L ? 0 : (int) ((id - 1) / RANGE_STEP);
+      assertThat(partition).isEqualTo(expectedPartition);
+    }
+  }
+}
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestSketchUtil.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestSketchUtil.java
index 31dae5c76a..16202c075e 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestSketchUtil.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestSketchUtil.java
@@ -19,10 +19,13 @@
 package org.apache.iceberg.flink.sink.shuffle;
 
 import static org.apache.iceberg.flink.sink.shuffle.Fixtures.CHAR_KEYS;
+import static 
org.apache.iceberg.flink.sink.shuffle.Fixtures.SORT_ORDER_COMPARTOR;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import org.apache.iceberg.SortKey;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 public class TestSketchUtil {
   @Test
@@ -55,7 +58,7 @@ public class TestSketchUtil {
     assertThat(
             SketchUtil.rangeBounds(
                 1,
-                Fixtures.SORT_ORDER_COMPARTOR,
+                SORT_ORDER_COMPARTOR,
                 new SortKey[] {
                   CHAR_KEYS.get("a"),
                   CHAR_KEYS.get("b"),
@@ -72,7 +75,7 @@ public class TestSketchUtil {
     assertThat(
             SketchUtil.rangeBounds(
                 3,
-                Fixtures.SORT_ORDER_COMPARTOR,
+                SORT_ORDER_COMPARTOR,
                 new SortKey[] {
                   CHAR_KEYS.get("a"),
                   CHAR_KEYS.get("b"),
@@ -90,7 +93,7 @@ public class TestSketchUtil {
     assertThat(
             SketchUtil.rangeBounds(
                 4,
-                Fixtures.SORT_ORDER_COMPARTOR,
+                SORT_ORDER_COMPARTOR,
                 new SortKey[] {
                   CHAR_KEYS.get("a"),
                   CHAR_KEYS.get("b"),
@@ -113,7 +116,7 @@ public class TestSketchUtil {
     assertThat(
             SketchUtil.rangeBounds(
                 4,
-                Fixtures.SORT_ORDER_COMPARTOR,
+                SORT_ORDER_COMPARTOR,
                 new SortKey[] {
                   CHAR_KEYS.get("a"),
                   CHAR_KEYS.get("b"),
@@ -130,4 +133,57 @@ public class TestSketchUtil {
         // skipped duplicate c's
         .containsExactly(CHAR_KEYS.get("c"), CHAR_KEYS.get("g"), 
CHAR_KEYS.get("j"));
   }
+
+  @ParameterizedTest
+  @ValueSource(ints = {4, 6})
+  public void testPartitioningAndScaleUp(int numPartitions) {
+    // Range bounds are calculated based on 4 partitions
+    SortKey[] rangeBounds =
+        new SortKey[] {CHAR_KEYS.get("c"), CHAR_KEYS.get("j"), 
CHAR_KEYS.get("m")};
+
+    // <= c
+    assertPartition(0, CHAR_KEYS.get("a"), numPartitions, rangeBounds);
+    assertPartition(0, CHAR_KEYS.get("c"), numPartitions, rangeBounds);
+    // > c && <= j
+    assertPartition(1, CHAR_KEYS.get("d"), numPartitions, rangeBounds);
+    assertPartition(1, CHAR_KEYS.get("i"), numPartitions, rangeBounds);
+    assertPartition(1, CHAR_KEYS.get("j"), numPartitions, rangeBounds);
+    // > j && <= m
+    assertPartition(2, CHAR_KEYS.get("k"), numPartitions, rangeBounds);
+    assertPartition(2, CHAR_KEYS.get("l"), numPartitions, rangeBounds);
+    assertPartition(2, CHAR_KEYS.get("m"), numPartitions, rangeBounds);
+    // > m
+    assertPartition(3, CHAR_KEYS.get("n"), numPartitions, rangeBounds);
+    assertPartition(3, CHAR_KEYS.get("z"), numPartitions, rangeBounds);
+  }
+
+  @Test
+  public void testPartitionScaleDown() {
+    // Range bounds are calculated based on 4 partitions
+    SortKey[] rangeBounds =
+        new SortKey[] {CHAR_KEYS.get("c"), CHAR_KEYS.get("j"), 
CHAR_KEYS.get("m")};
+    int numPartitions = 3;
+
+    // <= c
+    assertPartition(0, CHAR_KEYS.get("a"), numPartitions, rangeBounds);
+    assertPartition(0, CHAR_KEYS.get("c"), numPartitions, rangeBounds);
+    // > c && <= j
+    assertPartition(1, CHAR_KEYS.get("d"), numPartitions, rangeBounds);
+    assertPartition(1, CHAR_KEYS.get("i"), numPartitions, rangeBounds);
+    assertPartition(1, CHAR_KEYS.get("j"), numPartitions, rangeBounds);
+    // > j && <= m
+    assertPartition(2, CHAR_KEYS.get("k"), numPartitions, rangeBounds);
+    assertPartition(2, CHAR_KEYS.get("l"), numPartitions, rangeBounds);
+    assertPartition(2, CHAR_KEYS.get("m"), numPartitions, rangeBounds);
+    // > m
+    // reassigns out-of-range partitions via mod (% 3 in this case)
+    assertPartition(0, CHAR_KEYS.get("n"), numPartitions, rangeBounds);
+    assertPartition(0, CHAR_KEYS.get("z"), numPartitions, rangeBounds);
+  }
+
+  private static void assertPartition(
+      int expectedPartition, SortKey key, int numPartitions, SortKey[] 
rangeBounds) {
+    assertThat(SketchUtil.partition(key, numPartitions, rangeBounds, 
SORT_ORDER_COMPARTOR))
+        .isEqualTo(expectedPartition);
+  }
 }
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestSortKeyUtil.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestSortKeyUtil.java
new file mode 100644
index 0000000000..1be7e27f2c
--- /dev/null
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestSortKeyUtil.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.apache.iceberg.NullOrder;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortDirection;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.Test;
+
+public class TestSortKeyUtil {
+  @Test
+  public void testResultSchema() {
+    Schema schema =
+        new Schema(
+            Types.NestedField.required(1, "id", Types.StringType.get()),
+            Types.NestedField.required(2, "ratio", Types.DoubleType.get()),
+            Types.NestedField.optional(
+                3,
+                "user",
+                Types.StructType.of(
+                    Types.NestedField.required(11, "name", 
Types.StringType.get()),
+                    Types.NestedField.required(12, "ts", 
Types.TimestampType.withoutZone()),
+                    Types.NestedField.optional(13, "device_id", 
Types.UUIDType.get()),
+                    Types.NestedField.optional(
+                        14,
+                        "location",
+                        Types.StructType.of(
+                            Types.NestedField.required(101, "lat", 
Types.FloatType.get()),
+                            Types.NestedField.required(102, "long", 
Types.FloatType.get()),
+                            Types.NestedField.required(103, "blob", 
Types.BinaryType.get()))))));
+
+    SortOrder sortOrder =
+        SortOrder.builderFor(schema)
+            .asc("ratio")
+            .sortBy(Expressions.hour("user.ts"), SortDirection.ASC, 
NullOrder.NULLS_FIRST)
+            .sortBy(
+                Expressions.bucket("user.device_id", 16), SortDirection.ASC, 
NullOrder.NULLS_FIRST)
+            .sortBy(
+                Expressions.truncate("user.location.blob", 16),
+                SortDirection.ASC,
+                NullOrder.NULLS_FIRST)
+            .build();
+
+    assertThat(SortKeyUtil.sortKeySchema(schema, sortOrder).asStruct())
+        .isEqualTo(
+            Types.StructType.of(
+                Types.NestedField.required(0, "ratio_0", 
Types.DoubleType.get()),
+                Types.NestedField.required(1, "ts_1", Types.IntegerType.get()),
+                Types.NestedField.optional(2, "device_id_2", 
Types.IntegerType.get()),
+                Types.NestedField.required(3, "blob_3", 
Types.BinaryType.get())));
+  }
+}

Reply via email to