This is an automated email from the ASF dual-hosted git repository.

stevenwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 01ab0d9316 Flink: Custom partitioner for bucket partitions (#7161)
01ab0d9316 is described below

commit 01ab0d931644ffd4677e98c0d3ab3919edea0c62
Author: kengtin <[email protected]>
AuthorDate: Wed Aug 9 21:52:51 2023 -0700

    Flink: Custom partitioner for bucket partitions (#7161)
---
 .../flink/sink/BucketPartitionKeySelector.java     |  70 ++++++
 .../iceberg/flink/sink/BucketPartitioner.java      | 103 +++++++++
 .../iceberg/flink/sink/BucketPartitionerUtil.java  | 125 +++++++++++
 .../org/apache/iceberg/flink/sink/FlinkSink.java   |   8 +-
 .../iceberg/flink/HadoopCatalogExtension.java      | 104 +++++++++
 .../flink/sink/TestBucketPartitionKeySelector.java |  65 ++++++
 .../iceberg/flink/sink/TestBucketPartitioner.java  | 107 +++++++++
 .../TestBucketPartitionerFlinkIcebergSink.java     | 241 +++++++++++++++++++++
 .../flink/sink/TestBucketPartitionerUtil.java      | 126 +++++++++++
 9 files changed, 948 insertions(+), 1 deletion(-)

diff --git 
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/BucketPartitionKeySelector.java
 
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/BucketPartitionKeySelector.java
new file mode 100644
index 0000000000..1cb6e013bd
--- /dev/null
+++ 
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/BucketPartitionKeySelector.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import java.util.stream.IntStream;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.flink.RowDataWrapper;
+
+/**
+ * A {@link KeySelector} that extracts the bucketId from a data row's bucket 
partition as the key.
+ * To be used with the {@link BucketPartitioner}.
+ */
+class BucketPartitionKeySelector implements KeySelector<RowData, Integer> {
+
+  private final Schema schema;
+  private final PartitionKey partitionKey;
+  private final RowType flinkSchema;
+  private final int bucketFieldPosition;
+
+  private transient RowDataWrapper rowDataWrapper;
+
+  BucketPartitionKeySelector(PartitionSpec partitionSpec, Schema schema, 
RowType flinkSchema) {
+    this.schema = schema;
+    this.partitionKey = new PartitionKey(partitionSpec, schema);
+    this.flinkSchema = flinkSchema;
+    this.bucketFieldPosition = getBucketFieldPosition(partitionSpec);
+  }
+
+  private int getBucketFieldPosition(PartitionSpec partitionSpec) {
+    int bucketFieldId = BucketPartitionerUtil.getBucketFieldId(partitionSpec);
+    return IntStream.range(0, partitionSpec.fields().size())
+        .filter(i -> partitionSpec.fields().get(i).fieldId() == bucketFieldId)
+        .toArray()[0];
+  }
+
+  private RowDataWrapper lazyRowDataWrapper() {
+    if (rowDataWrapper == null) {
+      rowDataWrapper = new RowDataWrapper(flinkSchema, schema.asStruct());
+    }
+
+    return rowDataWrapper;
+  }
+
+  @Override
+  public Integer getKey(RowData rowData) {
+    partitionKey.partition(lazyRowDataWrapper().wrap(rowData));
+    return partitionKey.get(bucketFieldPosition, Integer.class);
+  }
+}
diff --git 
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/BucketPartitioner.java
 
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/BucketPartitioner.java
new file mode 100644
index 0000000000..9c9a117906
--- /dev/null
+++ 
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/BucketPartitioner.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * This partitioner will redirect records to writers deterministically based 
on the Bucket partition
+ * spec. It'll attempt to optimize the file size written depending on whether 
numPartitions is
+ * greater, less or equal than the maxNumBuckets. Note: The current 
implementation only supports ONE
+ * bucket in the partition spec.
+ */
+class BucketPartitioner implements Partitioner<Integer> {
+
+  static final String BUCKET_NULL_MESSAGE = "bucketId cannot be null";
+  static final String BUCKET_LESS_THAN_LOWER_BOUND_MESSAGE =
+      "Invalid bucket ID %s: must be non-negative.";
+  static final String BUCKET_GREATER_THAN_UPPER_BOUND_MESSAGE =
+      "Invalid bucket ID %s: must be less than bucket limit: %s.";
+
+  private final int maxNumBuckets;
+
+  // To hold the OFFSET of the next writer to use for any bucket, only used 
when writers > the
+  // number of buckets
+  private final int[] currentBucketWriterOffset;
+
+  BucketPartitioner(PartitionSpec partitionSpec) {
+    this.maxNumBuckets = BucketPartitionerUtil.getMaxNumBuckets(partitionSpec);
+    this.currentBucketWriterOffset = new int[maxNumBuckets];
+  }
+
+  /**
+   * Determine the partition id based on the following criteria: If the number 
of writers <= the
+   * number of buckets, an evenly distributed number of buckets will be 
assigned to each writer (one
+   * writer -> many buckets). Conversely, if the number of writers > the 
number of buckets the logic
+   * is handled by the {@link #getPartitionWithMoreWritersThanBuckets
+   * getPartitionWritersGreaterThanBuckets} method.
+   *
+   * @param bucketId the bucketId for each request
+   * @param numPartitions the total number of partitions
+   * @return the partition id (writer) to use for each request
+   */
+  @Override
+  public int partition(Integer bucketId, int numPartitions) {
+    Preconditions.checkNotNull(bucketId, BUCKET_NULL_MESSAGE);
+    Preconditions.checkArgument(bucketId >= 0, 
BUCKET_LESS_THAN_LOWER_BOUND_MESSAGE, bucketId);
+    Preconditions.checkArgument(
+        bucketId < maxNumBuckets, BUCKET_GREATER_THAN_UPPER_BOUND_MESSAGE, 
bucketId, maxNumBuckets);
+
+    if (numPartitions <= maxNumBuckets) {
+      return bucketId % numPartitions;
+    } else {
+      return getPartitionWithMoreWritersThanBuckets(bucketId, numPartitions);
+    }
+  }
+
+  /*-
+   * If the number of writers > the number of buckets each partitioner will 
keep a state of multiple
+   * writers per bucket as evenly as possible, and will round-robin the 
requests across them, in this
+   * case each writer will target only one bucket at all times (many writers 
-> one bucket). Example:
+   * Configuration: numPartitions (writers) = 5, maxBuckets = 2
+   * Expected behavior:
+   * - Records for Bucket 0 will be "round robin" between Writers 0, 2 and 4
+   * - Records for Bucket 1 will always use Writer 1 and 3
+   * Notes:
+   * - maxNumWritersPerBucket determines when to reset the 
currentBucketWriterOffset to 0 for this bucketId
+   * - When numPartitions is not evenly divisible by maxBuckets, some buckets 
will have one more writer (extraWriter).
+   * In this example Bucket 0 has an "extra writer" to consider before 
resetting its offset to 0.
+   *
+   * @return the destination partition index (writer subtask id)
+   */
+  private int getPartitionWithMoreWritersThanBuckets(int bucketId, int 
numPartitions) {
+    int currentOffset = currentBucketWriterOffset[bucketId];
+    // Determine if this bucket requires an "extra writer"
+    int extraWriter = bucketId < (numPartitions % maxNumBuckets) ? 1 : 0;
+    // The max number of writers this bucket can have
+    int maxNumWritersPerBucket = (numPartitions / maxNumBuckets) + extraWriter;
+
+    // Increment the writer offset or reset if it's reached the max for this 
bucket
+    int nextOffset = currentOffset == maxNumWritersPerBucket - 1 ? 0 : 
currentOffset + 1;
+    currentBucketWriterOffset[bucketId] = nextOffset;
+
+    return bucketId + (maxNumBuckets * currentOffset);
+  }
+}
diff --git 
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/BucketPartitionerUtil.java
 
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/BucketPartitionerUtil.java
new file mode 100644
index 0000000000..c33207728d
--- /dev/null
+++ 
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/BucketPartitionerUtil.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.transforms.PartitionSpecVisitor;
+
+final class BucketPartitionerUtil {
+  static final String BAD_NUMBER_OF_BUCKETS_ERROR_MESSAGE =
+      "Invalid number of buckets: %s (must be 1)";
+
+  private BucketPartitionerUtil() {}
+
+  /**
+   * Determines whether the PartitionSpec has one and only one Bucket 
definition
+   *
+   * @param partitionSpec the partition spec in question
+   * @return whether the PartitionSpec has only one Bucket
+   */
+  static boolean hasOneBucketField(PartitionSpec partitionSpec) {
+    List<Tuple2<Integer, Integer>> bucketFields = 
getBucketFields(partitionSpec);
+    return bucketFields != null && bucketFields.size() == 1;
+  }
+
+  /**
+   * Extracts the Bucket definition from a PartitionSpec.
+   *
+   * @param partitionSpec the partition spec in question
+   * @return the Bucket definition in the form of a tuple (fieldId, 
maxNumBuckets)
+   */
+  private static Tuple2<Integer, Integer> getBucketFieldInfo(PartitionSpec 
partitionSpec) {
+    List<Tuple2<Integer, Integer>> bucketFields = 
getBucketFields(partitionSpec);
+    Preconditions.checkArgument(
+        bucketFields.size() == 1,
+        BucketPartitionerUtil.BAD_NUMBER_OF_BUCKETS_ERROR_MESSAGE,
+        bucketFields.size());
+    return bucketFields.get(0);
+  }
+
+  static int getBucketFieldId(PartitionSpec partitionSpec) {
+    return getBucketFieldInfo(partitionSpec).f0;
+  }
+
+  static int getMaxNumBuckets(PartitionSpec partitionSpec) {
+    return getBucketFieldInfo(partitionSpec).f1;
+  }
+
+  private static List<Tuple2<Integer, Integer>> getBucketFields(PartitionSpec 
spec) {
+    return PartitionSpecVisitor.visit(spec, new 
BucketPartitionSpecVisitor()).stream()
+        .filter(Objects::nonNull)
+        .collect(Collectors.toList());
+  }
+
+  private static class BucketPartitionSpecVisitor
+      implements PartitionSpecVisitor<Tuple2<Integer, Integer>> {
+    @Override
+    public Tuple2<Integer, Integer> identity(int fieldId, String sourceName, 
int sourceId) {
+      return null;
+    }
+
+    @Override
+    public Tuple2<Integer, Integer> bucket(
+        int fieldId, String sourceName, int sourceId, int numBuckets) {
+      return new Tuple2<>(fieldId, numBuckets);
+    }
+
+    @Override
+    public Tuple2<Integer, Integer> truncate(
+        int fieldId, String sourceName, int sourceId, int width) {
+      return null;
+    }
+
+    @Override
+    public Tuple2<Integer, Integer> year(int fieldId, String sourceName, int 
sourceId) {
+      return null;
+    }
+
+    @Override
+    public Tuple2<Integer, Integer> month(int fieldId, String sourceName, int 
sourceId) {
+      return null;
+    }
+
+    @Override
+    public Tuple2<Integer, Integer> day(int fieldId, String sourceName, int 
sourceId) {
+      return null;
+    }
+
+    @Override
+    public Tuple2<Integer, Integer> hour(int fieldId, String sourceName, int 
sourceId) {
+      return null;
+    }
+
+    @Override
+    public Tuple2<Integer, Integer> alwaysNull(int fieldId, String sourceName, 
int sourceId) {
+      return null;
+    }
+
+    @Override
+    public Tuple2<Integer, Integer> unknown(
+        int fieldId, String sourceName, int sourceId, String transform) {
+      return null;
+    }
+  }
+}
diff --git 
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java 
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
index 844efd6ca6..0237027901 100644
--- 
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
+++ 
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
@@ -509,7 +509,13 @@ public class FlinkSink {
                       + "and table is unpartitioned");
               return input;
             } else {
-              return input.keyBy(new PartitionKeySelector(partitionSpec, 
iSchema, flinkRowType));
+              if (BucketPartitionerUtil.hasOneBucketField(partitionSpec)) {
+                return input.partitionCustom(
+                    new BucketPartitioner(partitionSpec),
+                    new BucketPartitionKeySelector(partitionSpec, iSchema, 
flinkRowType));
+              } else {
+                return input.keyBy(new PartitionKeySelector(partitionSpec, 
iSchema, flinkRowType));
+              }
             }
           } else {
             if (partitionSpec.isUnpartitioned()) {
diff --git 
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/HadoopCatalogExtension.java
 
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/HadoopCatalogExtension.java
new file mode 100644
index 0000000000..d8e1325254
--- /dev/null
+++ 
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/HadoopCatalogExtension.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.UUID;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+public class HadoopCatalogExtension
+    implements BeforeAllCallback, BeforeEachCallback, AfterAllCallback, 
AfterEachCallback {
+  protected final String database;
+  protected final String tableName;
+
+  protected Path temporaryFolder;
+  protected Catalog catalog;
+  protected CatalogLoader catalogLoader;
+  protected String warehouse;
+  protected TableLoader tableLoader;
+
+  public HadoopCatalogExtension(String database, String tableName) {
+    this.database = database;
+    this.tableName = tableName;
+  }
+
+  @Override
+  public void beforeAll(ExtensionContext context) throws Exception {
+    this.temporaryFolder = Files.createTempDirectory("junit5_hadoop_catalog-");
+  }
+
+  @Override
+  public void afterAll(ExtensionContext context) throws Exception {
+    FileUtils.deleteDirectory(temporaryFolder.toFile());
+  }
+
+  @Override
+  public void beforeEach(ExtensionContext context) throws Exception {
+    Assertions.assertThat(temporaryFolder).exists().isDirectory();
+    this.warehouse = "file:" + temporaryFolder + "/" + UUID.randomUUID();
+    this.catalogLoader =
+        CatalogLoader.hadoop(
+            "hadoop",
+            new Configuration(),
+            ImmutableMap.of(CatalogProperties.WAREHOUSE_LOCATION, warehouse));
+    this.catalog = catalogLoader.loadCatalog();
+    this.tableLoader =
+        TableLoader.fromCatalog(catalogLoader, TableIdentifier.of(database, 
tableName));
+  }
+
+  @Override
+  public void afterEach(ExtensionContext context) throws Exception {
+    try {
+      catalog.dropTable(TableIdentifier.of(database, tableName));
+      ((HadoopCatalog) catalog).close();
+      tableLoader.close();
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to close catalog resource");
+    }
+  }
+
+  public TableLoader tableLoader() {
+    return tableLoader;
+  }
+
+  public Catalog catalog() {
+    return catalog;
+  }
+
+  public CatalogLoader catalogLoader() {
+    return catalogLoader;
+  }
+
+  public String warehouse() {
+    return warehouse;
+  }
+}
diff --git 
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionKeySelector.java
 
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionKeySelector.java
new file mode 100644
index 0000000000..5ebcc6361c
--- /dev/null
+++ 
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionKeySelector.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.sink.TestBucketPartitionerUtil.TableSchemaType;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+public class TestBucketPartitionKeySelector {
+
+  @ParameterizedTest
+  @EnumSource(
+      value = TableSchemaType.class,
+      names = {"ONE_BUCKET", "IDENTITY_AND_BUCKET"})
+  public void testCorrectKeySelection(TableSchemaType tableSchemaType) {
+    int numBuckets = 60;
+
+    PartitionSpec partitionSpec = tableSchemaType.getPartitionSpec(numBuckets);
+    BucketPartitionKeySelector keySelector =
+        new BucketPartitionKeySelector(
+            partitionSpec, SimpleDataUtil.SCHEMA, SimpleDataUtil.ROW_TYPE);
+
+    TestBucketPartitionerUtil.generateRowsForBucketIdRange(2, numBuckets)
+        .forEach(
+            rowData -> {
+              int expectedBucketId =
+                  TestBucketPartitionerUtil.computeBucketId(
+                      numBuckets, rowData.getString(1).toString());
+              Integer key = keySelector.getKey(rowData);
+              Assertions.assertThat(key).isEqualTo(expectedBucketId);
+            });
+  }
+
+  @Test
+  public void testKeySelectorMultipleBucketsFail() {
+    PartitionSpec partitionSpec = 
TableSchemaType.TWO_BUCKETS.getPartitionSpec(1);
+
+    Assertions.assertThatExceptionOfType(RuntimeException.class)
+        .isThrownBy(
+            () ->
+                new BucketPartitionKeySelector(
+                    partitionSpec, SimpleDataUtil.SCHEMA, 
SimpleDataUtil.ROW_TYPE))
+        
.withMessage(BucketPartitionerUtil.BAD_NUMBER_OF_BUCKETS_ERROR_MESSAGE, 2);
+  }
+}
diff --git 
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitioner.java
 
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitioner.java
new file mode 100644
index 0000000000..835713e6b4
--- /dev/null
+++ 
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitioner.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import static 
org.apache.iceberg.flink.sink.BucketPartitioner.BUCKET_GREATER_THAN_UPPER_BOUND_MESSAGE;
+import static 
org.apache.iceberg.flink.sink.BucketPartitioner.BUCKET_LESS_THAN_LOWER_BOUND_MESSAGE;
+import static 
org.apache.iceberg.flink.sink.BucketPartitioner.BUCKET_NULL_MESSAGE;
+
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.flink.sink.TestBucketPartitionerUtil.TableSchemaType;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+
+public class TestBucketPartitioner {
+
+  static final int DEFAULT_NUM_BUCKETS = 60;
+
+  @ParameterizedTest
+  @CsvSource({"ONE_BUCKET,50", "IDENTITY_AND_BUCKET,50", "ONE_BUCKET,60", 
"IDENTITY_AND_BUCKET,60"})
+  public void testPartitioningParallelismGreaterThanBuckets(
+      String schemaTypeStr, String numBucketsStr) {
+    int numPartitions = 500;
+    TableSchemaType tableSchemaType = TableSchemaType.valueOf(schemaTypeStr);
+    int numBuckets = Integer.parseInt(numBucketsStr);
+    PartitionSpec partitionSpec = tableSchemaType.getPartitionSpec(numBuckets);
+    BucketPartitioner bucketPartitioner = new BucketPartitioner(partitionSpec);
+
+    int bucketId = 0;
+    for (int expectedIndex = 0; expectedIndex < numPartitions; 
expectedIndex++) {
+      int actualPartitionIndex = bucketPartitioner.partition(bucketId, 
numPartitions);
+      Assertions.assertThat(actualPartitionIndex).isEqualTo(expectedIndex);
+      bucketId++;
+      if (bucketId == numBuckets) {
+        bucketId = 0;
+      }
+    }
+  }
+
+  @ParameterizedTest
+  @CsvSource({"ONE_BUCKET,50", "IDENTITY_AND_BUCKET,50", "ONE_BUCKET,60", 
"IDENTITY_AND_BUCKET,60"})
+  public void testPartitioningParallelismEqualLessThanBuckets(
+      String schemaTypeStr, String numBucketsStr) {
+    int numPartitions = 30;
+    TableSchemaType tableSchemaType = TableSchemaType.valueOf(schemaTypeStr);
+    int numBuckets = Integer.parseInt(numBucketsStr);
+    PartitionSpec partitionSpec = tableSchemaType.getPartitionSpec(numBuckets);
+    BucketPartitioner bucketPartitioner = new BucketPartitioner(partitionSpec);
+
+    for (int bucketId = 0; bucketId < numBuckets; bucketId++) {
+      int actualPartitionIndex = bucketPartitioner.partition(bucketId, 
numPartitions);
+      Assertions.assertThat(actualPartitionIndex).isEqualTo(bucketId % 
numPartitions);
+    }
+  }
+
+  @Test
+  public void testPartitionerBucketIdNullFail() {
+    PartitionSpec partitionSpec = 
TableSchemaType.ONE_BUCKET.getPartitionSpec(DEFAULT_NUM_BUCKETS);
+    BucketPartitioner bucketPartitioner = new BucketPartitioner(partitionSpec);
+
+    Assertions.assertThatExceptionOfType(RuntimeException.class)
+        .isThrownBy(() -> bucketPartitioner.partition(null, 
DEFAULT_NUM_BUCKETS))
+        .withMessage(BUCKET_NULL_MESSAGE);
+  }
+
+  @Test
+  public void testPartitionerMultipleBucketsFail() {
+    PartitionSpec partitionSpec = 
TableSchemaType.TWO_BUCKETS.getPartitionSpec(DEFAULT_NUM_BUCKETS);
+
+    Assertions.assertThatExceptionOfType(RuntimeException.class)
+        .isThrownBy(() -> new BucketPartitioner(partitionSpec))
+        
.withMessage(BucketPartitionerUtil.BAD_NUMBER_OF_BUCKETS_ERROR_MESSAGE, 2);
+  }
+
+  @Test
+  public void testPartitionerBucketIdOutOfRangeFail() {
+    PartitionSpec partitionSpec = 
TableSchemaType.ONE_BUCKET.getPartitionSpec(DEFAULT_NUM_BUCKETS);
+    BucketPartitioner bucketPartitioner = new BucketPartitioner(partitionSpec);
+
+    int negativeBucketId = -1;
+    Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+        .isThrownBy(() -> bucketPartitioner.partition(negativeBucketId, 1))
+        .withMessage(BUCKET_LESS_THAN_LOWER_BOUND_MESSAGE, negativeBucketId);
+
+    int tooBigBucketId = DEFAULT_NUM_BUCKETS;
+    Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+        .isThrownBy(() -> bucketPartitioner.partition(tooBigBucketId, 1))
+        .withMessage(BUCKET_GREATER_THAN_UPPER_BOUND_MESSAGE, tooBigBucketId, 
DEFAULT_NUM_BUCKETS);
+  }
+}
diff --git 
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java
 
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java
new file mode 100644
index 0000000000..29a0898a1b
--- /dev/null
+++ 
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import static 
org.apache.iceberg.flink.MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG;
+import static org.apache.iceberg.flink.TestFixtures.DATABASE;
+import static org.apache.iceberg.flink.TestFixtures.TABLE_IDENTIFIER;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.flink.HadoopCatalogExtension;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.sink.TestBucketPartitionerUtil.TableSchemaType;
+import org.apache.iceberg.flink.source.BoundedTestSource;
+import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+public class TestBucketPartitionerFlinkIcebergSink {
+
+  private static final int NUMBER_TASK_MANAGERS = 1;
+  private static final int SLOTS_PER_TASK_MANAGER = 8;
+
+  @RegisterExtension
+  private static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+      new MiniClusterExtension(
+          new MiniClusterResourceConfiguration.Builder()
+              .setNumberTaskManagers(NUMBER_TASK_MANAGERS)
+              .setNumberSlotsPerTaskManager(SLOTS_PER_TASK_MANAGER)
+              .setConfiguration(DISABLE_CLASSLOADER_CHECK_CONFIG)
+              .build());
+
+  @RegisterExtension
+  private static final HadoopCatalogExtension catalogExtension =
+      new HadoopCatalogExtension(DATABASE, TestFixtures.TABLE);
+
+  private static final TypeInformation<Row> ROW_TYPE_INFO =
+      new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes());
+
+  // Parallelism = 8 (parallelism > numBuckets) throughout the test suite
+  private final int parallelism = NUMBER_TASK_MANAGERS * 
SLOTS_PER_TASK_MANAGER;
+  private final FileFormat format = FileFormat.PARQUET;
+  private final int numBuckets = 4;
+
+  private Table table;
+  private StreamExecutionEnvironment env;
+  private TableLoader tableLoader;
+
+  private void setupEnvironment(TableSchemaType tableSchemaType) {
+    PartitionSpec partitionSpec = tableSchemaType.getPartitionSpec(numBuckets);
+    table =
+        catalogExtension
+            .catalog()
+            .createTable(
+                TABLE_IDENTIFIER,
+                SimpleDataUtil.SCHEMA,
+                partitionSpec,
+                ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, 
format.name()));
+    env =
+        
StreamExecutionEnvironment.getExecutionEnvironment(DISABLE_CLASSLOADER_CHECK_CONFIG)
+            .enableCheckpointing(100)
+            .setParallelism(parallelism)
+            .setMaxParallelism(parallelism * 2);
+    tableLoader = catalogExtension.tableLoader();
+  }
+
+  private void appendRowsToTable(List<RowData> allRows) throws Exception {
+    DataFormatConverters.RowConverter converter =
+        new 
DataFormatConverters.RowConverter(SimpleDataUtil.FLINK_SCHEMA.getFieldDataTypes());
+
+    DataStream<RowData> dataStream =
+        env.addSource(
+                new BoundedTestSource<>(
+                    
allRows.stream().map(converter::toExternal).toArray(Row[]::new)),
+                ROW_TYPE_INFO)
+            .map(converter::toInternal, 
FlinkCompatibilityUtil.toTypeInfo(SimpleDataUtil.ROW_TYPE));
+
+    FlinkSink.forRowData(dataStream)
+        .table(table)
+        .tableLoader(tableLoader)
+        .writeParallelism(parallelism)
+        .distributionMode(DistributionMode.HASH)
+        .append();
+
+    env.execute("Test Iceberg DataStream");
+
+    SimpleDataUtil.assertTableRows(table, allRows);
+  }
+
+  @ParameterizedTest
+  @EnumSource(
+      value = TableSchemaType.class,
+      names = {"ONE_BUCKET", "IDENTITY_AND_BUCKET"})
+  public void testSendRecordsToAllBucketsEvenly(TableSchemaType 
tableSchemaType) throws Exception {
+    setupEnvironment(tableSchemaType);
+    List<RowData> rows = generateTestDataRows();
+
+    appendRowsToTable(rows);
+    TableTestStats stats = extractPartitionResults(tableSchemaType);
+
+    Assertions.assertThat(stats.totalRowCount).isEqualTo(rows.size());
+    // All 4 buckets should've been written to
+    Assertions.assertThat(stats.writersPerBucket.size()).isEqualTo(numBuckets);
+    
Assertions.assertThat(stats.numFilesPerBucket.size()).isEqualTo(numBuckets);
+    // Writer expectation (2 writers per bucket):
+    // - Bucket0 -> Writers [0, 4]
+    // - Bucket1 -> Writers [1, 5]
+    // - Bucket2 -> Writers [2, 6]
+    // - Bucket3 -> Writers [3, 7]
+    for (int i = 0, j = numBuckets; i < numBuckets; i++, j++) {
+      
Assertions.assertThat(stats.writersPerBucket.get(i)).hasSameElementsAs(Arrays.asList(i,
 j));
+      // 2 files per bucket (one file is created by each writer)
+      Assertions.assertThat(stats.numFilesPerBucket.get(i)).isEqualTo(2);
+      // 2 rows per file (total of 16 rows across 8 files)
+      Assertions.assertThat(stats.rowsPerWriter.get(i)).isEqualTo(2);
+    }
+  }
+
+  /**
+   * Verifies the BucketPartitioner is not used when the PartitionSpec has 
more than 1 bucket, and
+   * that it should fallback to input.keyBy
+   */
+  @ParameterizedTest
+  @EnumSource(value = TableSchemaType.class, names = "TWO_BUCKETS")
+  public void testMultipleBucketsFallback(TableSchemaType tableSchemaType) 
throws Exception {
+    setupEnvironment(tableSchemaType);
+    List<RowData> rows = generateTestDataRows();
+
+    appendRowsToTable(rows);
+    TableTestStats stats = extractPartitionResults(tableSchemaType);
+
+    Assertions.assertThat(stats.totalRowCount).isEqualTo(rows.size());
+    for (int i = 0, j = numBuckets; i < numBuckets; i++, j++) {
+      // Only 1 file per bucket will be created when falling back to 
input.keyBy(...)
+      Assertions.assertThat((int) stats.numFilesPerBucket.get(i)).isEqualTo(1);
+    }
+  }
+
+  /**
+   * Generating 16 rows to be sent uniformly to all writers (round-robin 
across 8 writers -> 4
+   * buckets)
+   */
+  private List<RowData> generateTestDataRows() {
+    int totalNumRows = parallelism * 2;
+    int numRowsPerBucket = totalNumRows / numBuckets;
+    return 
TestBucketPartitionerUtil.generateRowsForBucketIdRange(numRowsPerBucket, 
numBuckets);
+  }
+
+  private TableTestStats extractPartitionResults(TableSchemaType 
tableSchemaType)
+      throws IOException {
+    int totalRecordCount = 0;
+    Map<Integer, List<Integer>> writersPerBucket = Maps.newHashMap(); // 
<BucketId, List<WriterId>>
+    Map<Integer, Integer> filesPerBucket = Maps.newHashMap(); // <BucketId, 
NumFiles>
+    Map<Integer, Long> rowsPerWriter = Maps.newHashMap(); // <WriterId, 
NumRecords>
+
+    try (CloseableIterable<FileScanTask> fileScanTasks = 
table.newScan().planFiles()) {
+      for (FileScanTask scanTask : fileScanTasks) {
+        long recordCountInFile = scanTask.file().recordCount();
+
+        String[] splitFilePath = scanTask.file().path().toString().split("/");
+        // Filename example: 
00007-0-a7d3a29a-33e9-4740-88f4-0f494397d60c-00001.parquet
+        // Writer ID: .......^^^^^
+        String filename = splitFilePath[splitFilePath.length - 1];
+        int writerId = Integer.parseInt(filename.split("-")[0]);
+
+        totalRecordCount += recordCountInFile;
+        int bucketId =
+            scanTask
+                .file()
+                .partition()
+                .get(tableSchemaType.bucketPartitionColumnPosition(), 
Integer.class);
+        writersPerBucket.computeIfAbsent(bucketId, k -> Lists.newArrayList());
+        writersPerBucket.get(bucketId).add(writerId);
+        filesPerBucket.put(bucketId, filesPerBucket.getOrDefault(bucketId, 0) 
+ 1);
+        rowsPerWriter.put(writerId, rowsPerWriter.getOrDefault(writerId, 0L) + 
recordCountInFile);
+      }
+    }
+
+    return new TableTestStats(totalRecordCount, writersPerBucket, 
filesPerBucket, rowsPerWriter);
+  }
+
+  /** DTO to hold Test Stats */
+  private static class TableTestStats {
+    final int totalRowCount;
+    final Map<Integer, List<Integer>> writersPerBucket;
+    final Map<Integer, Integer> numFilesPerBucket;
+    final Map<Integer, Long> rowsPerWriter;
+
+    TableTestStats(
+        int totalRecordCount,
+        Map<Integer, List<Integer>> writersPerBucket,
+        Map<Integer, Integer> numFilesPerBucket,
+        Map<Integer, Long> rowsPerWriter) {
+      this.totalRowCount = totalRecordCount;
+      this.writersPerBucket = writersPerBucket;
+      this.numFilesPerBucket = numFilesPerBucket;
+      this.rowsPerWriter = rowsPerWriter;
+    }
+  }
+}
diff --git 
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerUtil.java
 
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerUtil.java
new file mode 100644
index 0000000000..e1309bfac6
--- /dev/null
+++ 
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerUtil.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import java.util.List;
+import java.util.UUID;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.BucketUtil;
+
+final class TestBucketPartitionerUtil {
+
+  enum TableSchemaType {
+    ONE_BUCKET {
+      @Override
+      public int bucketPartitionColumnPosition() {
+        return 0;
+      }
+
+      @Override
+      public PartitionSpec getPartitionSpec(int numBuckets) {
+        return PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).bucket("data", 
numBuckets).build();
+      }
+    },
+    IDENTITY_AND_BUCKET {
+      @Override
+      public int bucketPartitionColumnPosition() {
+        return 1;
+      }
+
+      @Override
+      public PartitionSpec getPartitionSpec(int numBuckets) {
+        return PartitionSpec.builderFor(SimpleDataUtil.SCHEMA)
+            .identity("id")
+            .bucket("data", numBuckets)
+            .build();
+      }
+    },
+    TWO_BUCKETS {
+      @Override
+      public int bucketPartitionColumnPosition() {
+        return 1;
+      }
+
+      @Override
+      public PartitionSpec getPartitionSpec(int numBuckets) {
+        return PartitionSpec.builderFor(SimpleDataUtil.SCHEMA)
+            .bucket("id", numBuckets)
+            .bucket("data", numBuckets)
+            .build();
+      }
+    };
+
+    public abstract int bucketPartitionColumnPosition();
+
+    public abstract PartitionSpec getPartitionSpec(int numBuckets);
+  }
+
+  private TestBucketPartitionerUtil() {}
+
+  /**
+   * Utility method to generate rows whose values will "hash" to a range of 
bucketIds (from 0 to
+   * numBuckets - 1)
+   *
+   * @param numRowsPerBucket how many different rows should be generated per 
bucket
+   * @param numBuckets max number of buckets to consider
+   * @return the list of rows whose data "hashes" to the desired bucketId
+   */
+  static List<RowData> generateRowsForBucketIdRange(int numRowsPerBucket, int 
numBuckets) {
+    List<RowData> rows = Lists.newArrayListWithCapacity(numBuckets * 
numRowsPerBucket);
+    // For some of our tests, this order of the generated rows matters
+    for (int i = 0; i < numRowsPerBucket; i++) {
+      for (int bucketId = 0; bucketId < numBuckets; bucketId++) {
+        String value = generateValueForBucketId(bucketId, numBuckets);
+        rows.add(GenericRowData.of(1, StringData.fromString(value)));
+      }
+    }
+    return rows;
+  }
+
+  /**
+   * Utility method to generate a UUID string that will "hash" to a desired 
bucketId
+   *
+   * @param bucketId the desired bucketId
+   * @return the string data that "hashes" to the desired bucketId
+   */
+  private static String generateValueForBucketId(int bucketId, int numBuckets) 
{
+    while (true) {
+      String uuid = UUID.randomUUID().toString();
+      if (computeBucketId(numBuckets, uuid) == bucketId) {
+        return uuid;
+      }
+    }
+  }
+
+  /**
+   * Utility that performs the same hashing/bucketing mechanism used by 
Bucket.java
+   *
+   * @param numBuckets max number of buckets to consider
+   * @param value the string to compute the bucketId from
+   * @return the computed bucketId
+   */
+  static int computeBucketId(int numBuckets, String value) {
+    return (BucketUtil.hash(value) & Integer.MAX_VALUE) % numBuckets;
+  }
+}


Reply via email to