stevenzwu commented on code in PR #7161:
URL: https://github.com/apache/iceberg/pull/7161#discussion_r1186975564


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/BucketPartitioner.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * This partitioner will redirect records to writers deterministically based on the Bucket partition
+ * spec. It'll attempt to optimize the file size written depending on whether numPartitions is
+ * greater, less or equal than the maxNumBuckets. Note: The current implementation only supports ONE
+ * bucket in the partition spec.
+ */
+class BucketPartitioner implements Partitioner<Integer> {
+
+  static final String BUCKET_OUT_OF_RANGE_MESSAGE_PREFIX = "bucketId out of range: ";
+
+  private final int maxNumBuckets;
+
+  // To hold the OFFSET of the next writer to use for any bucket
+  private final int[] currentBucketWriterOffset;
+
+  BucketPartitioner(PartitionSpec partitionSpec) {
+    Tuple2<Integer, Integer> bucketFieldInfo =
+        BucketPartitionerUtils.getBucketFieldInfo(partitionSpec);
+
+    this.maxNumBuckets = bucketFieldInfo.f1;
+    this.currentBucketWriterOffset = new int[this.maxNumBuckets];
+  }
+
+  /**
+   * Determine the partition id based on the following criteria: If the number of writers <= the
+   * number of buckets, an evenly distributed number of buckets will be assigned to each writer (one
+   * writer -> many buckets). Conversely, if the number of writers > the number of buckets the logic
+   * is handled by the {@link #getPartitionWritersGreaterThanBuckets
+   * getPartitionWritersGreaterThanBuckets} method.
+   *
+   * @param bucketId the bucketId for each request
+   * @param numPartitions the total number of partitions
+   * @return the partition id (writer) to use for each request
+   */
+  @Override
+  public int partition(Integer bucketId, int numPartitions) {
+    Preconditions.checkArgument(
+        bucketId >= 0, BUCKET_OUT_OF_RANGE_MESSAGE_PREFIX + bucketId + " (must be >= 0)");

Review Comment:
   Nit: error msg could be `Invalid bucket ID: %s. Must be non-negative.`. Note that `Preconditions` supports arg formatting.



##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/BucketPartitionKeySelector.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import java.util.stream.IntStream;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.flink.RowDataWrapper;
+
+/**
+ * A {@link KeySelector} that extracts the bucketId from a data row's bucket partition as the key.
+ * To be used with the {@link BucketPartitioner}.
+ */
+class BucketPartitionKeySelector implements KeySelector<RowData, Integer> {
+
+  private final Schema schema;
+  private final PartitionKey partitionKey;
+  private final RowType flinkSchema;
+  private final int bucketFieldPosition;
+
+  private transient RowDataWrapper rowDataWrapper;
+
+  BucketPartitionKeySelector(PartitionSpec partitionSpec, Schema schema, RowType flinkSchema) {
+    Tuple2<Integer, Integer> bucketFieldInfo =
+        BucketPartitionerUtils.getBucketFieldInfo(partitionSpec);
+
+    int bucketFieldId = bucketFieldInfo.f0;
+    this.schema = schema;
+    this.partitionKey = new PartitionKey(partitionSpec, schema);
+    this.flinkSchema = flinkSchema;
+    this.bucketFieldPosition =
+        IntStream.range(0, partitionSpec.fields().size())
+            .filter(i -> partitionSpec.fields().get(i).fieldId() == bucketFieldId)
+            .toArray()[0];
+  }
+
+  /**

Review Comment:
   nit: this comment is probably not necessary. Also, try to avoid using words like `we`.



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerUtils.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import java.util.List;
+import java.util.UUID;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.BucketUtil;
+
+final class TestBucketPartitionerUtils {
+
+  enum TableSchemaType {
+    ONE_BUCKET,
+    IDENTITY_AND_BUCKET,
+    TWO_BUCKETS;
+  }
+
+  private TestBucketPartitionerUtils() {}
+
+  static final DataFormatConverters.RowConverter CONVERTER =
+      new DataFormatConverters.RowConverter(SimpleDataUtil.FLINK_SCHEMA.getFieldDataTypes());
+
+  static PartitionSpec getPartitionSpec(TableSchemaType tableSchemaType, int numBuckets) {
+    PartitionSpec partitionSpec = null;
+
+    switch (tableSchemaType) {
+      case ONE_BUCKET:
+        partitionSpec =
+            PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).bucket("data", numBuckets).build();
+        break;
+      case IDENTITY_AND_BUCKET:
+        partitionSpec =
+            PartitionSpec.builderFor(SimpleDataUtil.SCHEMA)
+                .identity("id")
+                .bucket("data", numBuckets)
+                .build();
+        break;
+      case TWO_BUCKETS:
+        partitionSpec =
+            PartitionSpec.builderFor(SimpleDataUtil.SCHEMA)
+                .bucket("id", numBuckets)
+                .bucket("data", numBuckets)
+                .build();
+        break;
+    }
+
+    Preconditions.checkNotNull(
+        partitionSpec, "Invalid tableSchemaType provided: " + tableSchemaType);
+    return partitionSpec;
+  }
+
+  /**
+   * Utility method to generate rows whose values will "hash" to a range of bucketIds (from 0 to
+   * numBuckets - 1)
+   *
+   * @param numRowsPerBucket how many different rows should be generated per bucket
+   * @param numBuckets max number of buckets to consider
+   * @return the list of rows whose data "hashes" to the desired bucketId
+   */
+  static List<Row> generateRowsForBucketIdRange(int numRowsPerBucket, int numBuckets) {
+    List<Row> rows = Lists.newArrayListWithCapacity(numBuckets * numRowsPerBucket);
+    // For some of our tests, this order of the generated rows matters
+    for (int i = 0; i < numRowsPerBucket; i++) {
+      for (int bucketId = 0; bucketId < numBuckets; bucketId++) {
+        String value = generateValueForBucketId(bucketId, numBuckets);
+        rows.add(Row.of(1, value));
+      }
+    }
+    return rows;
+  }
+
+  /**
+   * Utility method to generate a UUID string that will "hash" to a desired bucketId
+   *
+   * @param bucketId the desired bucketId
+   * @return the string data that "hashes" to the desired bucketId
+   */
+  private static String generateValueForBucketId(int bucketId, int numBuckets) {
+    String value = "";
+    while (true) {
+      String uuid = UUID.randomUUID().toString();
+      if (computeBucketId(numBuckets, uuid) == bucketId) {
+        value = uuid;
+        break;
+      }
+    }
+    return value;
+  }
+
+  /**
+   * Utility that performs the same hashing/bucketing mechanism used by Bucket.java

Review Comment:
   Looking at Bucket.java, we also need to add test coverage for a null bucket id.
   
   ```
     public Integer apply(T value) {
       if (value == null) {
         return null;
       }
       return (hash(value) & Integer.MAX_VALUE) % numBuckets;
     }
   ```
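   
   As a rough sketch of that kind of coverage (test name, bucket counts, and the expected exception are assumptions, not part of this PR; the actual expectation depends on how null ends up being handled):
   
   ```java
   @Test
   public void testPartitionerNullBucketId() {
     PartitionSpec partitionSpec =
         TestBucketPartitionerUtils.getPartitionSpec(
             TestBucketPartitionerUtils.TableSchemaType.ONE_BUCKET, 16);
     BucketPartitioner bucketPartitioner = new BucketPartitioner(partitionSpec);

     // with the current precondition, a null Integer fails on unboxing in `bucketId >= 0`
     Assertions.assertThatThrownBy(() -> bucketPartitioner.partition(null, 8))
         .isInstanceOf(NullPointerException.class);
   }
   ```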



##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/BucketPartitioner.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * This partitioner will redirect records to writers deterministically based on the Bucket partition
+ * spec. It'll attempt to optimize the file size written depending on whether numPartitions is
+ * greater, less or equal than the maxNumBuckets. Note: The current implementation only supports ONE
+ * bucket in the partition spec.
+ */
+class BucketPartitioner implements Partitioner<Integer> {
+
+  static final String BUCKET_OUT_OF_RANGE_MESSAGE_PREFIX = "bucketId out of range: ";
+
+  private final int maxNumBuckets;
+
+  // To hold the OFFSET of the next writer to use for any bucket
+  private final int[] currentBucketWriterOffset;
+
+  BucketPartitioner(PartitionSpec partitionSpec) {
+    Tuple2<Integer, Integer> bucketFieldInfo =
+        BucketPartitionerUtils.getBucketFieldInfo(partitionSpec);
+
+    this.maxNumBuckets = bucketFieldInfo.f1;
+    this.currentBucketWriterOffset = new int[this.maxNumBuckets];
+  }
+
+  /**
+   * Determine the partition id based on the following criteria: If the number of writers <= the
+   * number of buckets, an evenly distributed number of buckets will be assigned to each writer (one
+   * writer -> many buckets). Conversely, if the number of writers > the number of buckets the logic
+   * is handled by the {@link #getPartitionWritersGreaterThanBuckets
+   * getPartitionWritersGreaterThanBuckets} method.
+   *
+   * @param bucketId the bucketId for each request
+   * @param numPartitions the total number of partitions
+   * @return the partition id (writer) to use for each request
+   */
+  @Override
+  public int partition(Integer bucketId, int numPartitions) {
+    Preconditions.checkArgument(
+        bucketId >= 0, BUCKET_OUT_OF_RANGE_MESSAGE_PREFIX + bucketId + " (must be >= 0)");
+    Preconditions.checkArgument(
+        bucketId < maxNumBuckets,
+        BUCKET_OUT_OF_RANGE_MESSAGE_PREFIX

Review Comment:
   nit: error msg could be `Invalid bucket ID: %s. Must be less than bucket limit: %s`



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitioner.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import static org.apache.iceberg.flink.sink.BucketPartitioner.BUCKET_OUT_OF_RANGE_MESSAGE_PREFIX;
+
+import org.apache.iceberg.PartitionSpec;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+public class TestBucketPartitioner {
+
+  static final int NUM_BUCKETS = 60;
+
+  @ParameterizedTest
+  @EnumSource(
+      value = TestBucketPartitionerUtils.TableSchemaType.class,
+      names = {"ONE_BUCKET", "IDENTITY_AND_BUCKET"})
+  public void testPartitioningParallelismGreaterThanBuckets(
+      TestBucketPartitionerUtils.TableSchemaType tableSchemaType) {
+    final int numPartitions = 500;
+
+    PartitionSpec partitionSpec =
+        TestBucketPartitionerUtils.getPartitionSpec(tableSchemaType, NUM_BUCKETS);
+
+    BucketPartitioner bucketPartitioner = new BucketPartitioner(partitionSpec);
+
+    for (int expectedIdx = 0, bucketId = 0; expectedIdx < numPartitions; expectedIdx++) {
+      int actualIdx = bucketPartitioner.partition(bucketId, numPartitions);
+      Assertions.assertThat(actualIdx).isEqualTo(expectedIdx);
+      if (++bucketId == NUM_BUCKETS) {
+        bucketId = 0;
+      }
+    }
+  }
+
+  @ParameterizedTest
+  @EnumSource(
+      value = TestBucketPartitionerUtils.TableSchemaType.class,
+      names = {"ONE_BUCKET", "IDENTITY_AND_BUCKET"})
+  public void testPartitioningParallelismEqualLessThanBuckets(

Review Comment:
   similarly, test both the divisible and non-divisible cases
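   
   For example, one possible sketch (the parallelism values 30 and 27 are arbitrary illustrations of a divisible and a non-divisible case relative to `NUM_BUCKETS = 60`):
   
   ```java
   @ParameterizedTest
   @ValueSource(ints = {30, 27})
   public void testPartitioningParallelismLessThanBuckets(int numPartitions) {
     PartitionSpec partitionSpec =
         TestBucketPartitionerUtils.getPartitionSpec(
             TestBucketPartitionerUtils.TableSchemaType.ONE_BUCKET, NUM_BUCKETS);
     BucketPartitioner bucketPartitioner = new BucketPartitioner(partitionSpec);

     for (int bucketId = 0; bucketId < NUM_BUCKETS; bucketId++) {
       // with fewer writers than buckets, each bucket should map to bucketId % numPartitions
       Assertions.assertThat(bucketPartitioner.partition(bucketId, numPartitions))
           .isEqualTo(bucketId % numPartitions);
     }
   }
   ```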



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitioner.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import static org.apache.iceberg.flink.sink.BucketPartitioner.BUCKET_OUT_OF_RANGE_MESSAGE_PREFIX;
+
+import org.apache.iceberg.PartitionSpec;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+public class TestBucketPartitioner {
+
+  static final int NUM_BUCKETS = 60;
+
+  @ParameterizedTest
+  @EnumSource(
+      value = TestBucketPartitionerUtils.TableSchemaType.class,
+      names = {"ONE_BUCKET", "IDENTITY_AND_BUCKET"})
+  public void testPartitioningParallelismGreaterThanBuckets(
+      TestBucketPartitionerUtils.TableSchemaType tableSchemaType) {
+    final int numPartitions = 500;
+
+    PartitionSpec partitionSpec =
+        TestBucketPartitionerUtils.getPartitionSpec(tableSchemaType, NUM_BUCKETS);
+
+    BucketPartitioner bucketPartitioner = new BucketPartitioner(partitionSpec);
+
+    for (int expectedIdx = 0, bucketId = 0; expectedIdx < numPartitions; expectedIdx++) {
+      int actualIdx = bucketPartitioner.partition(bucketId, numPartitions);
+      Assertions.assertThat(actualIdx).isEqualTo(expectedIdx);
+      if (++bucketId == NUM_BUCKETS) {
+        bucketId = 0;
+      }
+    }
+  }
+
+  @ParameterizedTest
+  @EnumSource(
+      value = TestBucketPartitionerUtils.TableSchemaType.class,
+      names = {"ONE_BUCKET", "IDENTITY_AND_BUCKET"})
+  public void testPartitioningParallelismEqualLessThanBuckets(
+      TestBucketPartitionerUtils.TableSchemaType tableSchemaType) {
+    final int numPartitions = 30;
+
+    PartitionSpec partitionSpec =
+        TestBucketPartitionerUtils.getPartitionSpec(tableSchemaType, NUM_BUCKETS);
+
+    BucketPartitioner bucketPartitioner = new BucketPartitioner(partitionSpec);
+
+    for (int bucketId = 0; bucketId < NUM_BUCKETS; bucketId++) {
+      int actualIdx = bucketPartitioner.partition(bucketId, numPartitions);
+      Assertions.assertThat(actualIdx).isEqualTo(bucketId % numPartitions);
+    }
+  }
+
+  @ParameterizedTest

Review Comment:
   parameterized test seems unnecessary here



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java:
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import static org.apache.iceberg.flink.MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG;
+import static org.apache.iceberg.flink.TestFixtures.DATABASE;
+import static org.apache.iceberg.flink.TestFixtures.TABLE_IDENTIFIER;
+import static org.apache.iceberg.flink.sink.TestBucketPartitionerUtils.CONVERTER;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.flink.HadoopCatalogExtension;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.sink.TestBucketPartitionerUtils.TableSchemaType;
+import org.apache.iceberg.flink.source.BoundedTestSource;
+import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.assertj.core.api.Assertions;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+public class TestBucketPartitionerFlinkIcebergSink {
+
+  private static final int NUMBER_TASK_MANAGERS = 1;
+  private static final int SLOTS_PER_TASK_MANAGER = 8;
+
+  @RegisterExtension
+  private static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+      new MiniClusterExtension(
+          new MiniClusterResourceConfiguration.Builder()
+              .setNumberTaskManagers(NUMBER_TASK_MANAGERS)
+              .setNumberSlotsPerTaskManager(SLOTS_PER_TASK_MANAGER)
+              .setConfiguration(DISABLE_CLASSLOADER_CHECK_CONFIG)
+              .build());
+
+  @RegisterExtension
+  private static final HadoopCatalogExtension catalogExtension =
+      new HadoopCatalogExtension(DATABASE, TestFixtures.TABLE);
+
+  private static final TypeInformation<Row> ROW_TYPE_INFO =
+      new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes());
+
+  // Parallelism = 8 (parallelism > numBuckets) throughout the test suite
+  private final int parallelism = NUMBER_TASK_MANAGERS * SLOTS_PER_TASK_MANAGER;
+  private final FileFormat format = FileFormat.PARQUET;
+  private final int numBuckets = 4;
+
+  private Table table;
+  private StreamExecutionEnvironment env;
+  private TableLoader tableLoader;
+
+  private void setupEnvironment(TableSchemaType tableSchemaType) {
+    table = getTable(tableSchemaType);
+    env =
+        StreamExecutionEnvironment.getExecutionEnvironment(DISABLE_CLASSLOADER_CHECK_CONFIG)
+            .enableCheckpointing(100)
+            .setParallelism(parallelism)
+            .setMaxParallelism(parallelism * 2);
+    tableLoader = catalogExtension.tableLoader();
+  }
+
+  private Table getTable(TableSchemaType tableSchemaType) {
+    PartitionSpec partitionSpec =
+        TestBucketPartitionerUtils.getPartitionSpec(tableSchemaType, numBuckets);
+
+    return catalogExtension
+        .catalog()
+        .createTable(
+            TABLE_IDENTIFIER,
+            SimpleDataUtil.SCHEMA,
+            partitionSpec,
+            ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()));
+  }
+
+  private List<RowData> convertToRowData(List<Row> rows) {
+    return rows.stream().map(CONVERTER::toInternal).collect(Collectors.toList());
+  }
+
+  private BoundedTestSource<Row> createBoundedSource(List<Row> rows) {
+    return new BoundedTestSource<>(rows.toArray(new Row[0]));
+  }
+
+  private TableTestStats extractTableTestStats(TableSchemaType tableSchemaType) throws IOException {
+    int totalRecordCount = 0;
+    Map<Integer, List<Integer>> writersPerBucket = Maps.newHashMap(); // <BucketId, List<WriterId>>
+    Map<Integer, Integer> filesPerBucket = Maps.newHashMap(); // <BucketId, NumFiles>
+    Map<Integer, Long> recordsPerFile = new TreeMap<>(); // <WriterId, NumRecords>
+
+    try (CloseableIterable<FileScanTask> fileScanTasks = table.newScan().planFiles()) {
+      for (FileScanTask scanTask : fileScanTasks) {
+        long recordCountInFile = scanTask.file().recordCount();
+
+        String[] splitFilePath = scanTask.file().path().toString().split("/");
+        String filename = splitFilePath[splitFilePath.length - 1];
+        int writerId = Integer.parseInt(filename.split("-")[0]);
+
+        totalRecordCount += recordCountInFile;
+        int bucketId =
+            scanTask
+                .file()
+                .partition()
+                .get(tableSchemaType == TableSchemaType.ONE_BUCKET ? 0 : 1, Integer.class);

Review Comment:
   > tableSchemaType == TableSchemaType.ONE_BUCKET ? 0 : 1
   
   This code is not intuitive. You can move it inside the enum class, maybe exposing a method like `bucketPartitionColumnPosition()`.
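   
   A sketch of what that could look like (the position values just mirror the specs built by `getPartitionSpec`; the method name follows the suggestion above):
   
   ```java
   enum TableSchemaType {
     ONE_BUCKET,
     IDENTITY_AND_BUCKET,
     TWO_BUCKETS;

     int bucketPartitionColumnPosition() {
       // ONE_BUCKET has the bucket as its only partition field;
       // the other schema types place it after the first partition field
       return this == ONE_BUCKET ? 0 : 1;
     }
   }
   ```
   
   The call site would then read `scanTask.file().partition().get(tableSchemaType.bucketPartitionColumnPosition(), Integer.class)`.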



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java:
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import static org.apache.iceberg.flink.MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG;
+import static org.apache.iceberg.flink.TestFixtures.DATABASE;
+import static org.apache.iceberg.flink.TestFixtures.TABLE_IDENTIFIER;
+import static org.apache.iceberg.flink.sink.TestBucketPartitionerUtils.CONVERTER;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.flink.HadoopCatalogExtension;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.sink.TestBucketPartitionerUtils.TableSchemaType;
+import org.apache.iceberg.flink.source.BoundedTestSource;
+import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.assertj.core.api.Assertions;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+public class TestBucketPartitionerFlinkIcebergSink {
+
+  private static final int NUMBER_TASK_MANAGERS = 1;
+  private static final int SLOTS_PER_TASK_MANAGER = 8;
+
+  @RegisterExtension
+  private static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+      new MiniClusterExtension(
+          new MiniClusterResourceConfiguration.Builder()
+              .setNumberTaskManagers(NUMBER_TASK_MANAGERS)
+              .setNumberSlotsPerTaskManager(SLOTS_PER_TASK_MANAGER)
+              .setConfiguration(DISABLE_CLASSLOADER_CHECK_CONFIG)
+              .build());
+
+  @RegisterExtension
+  private static final HadoopCatalogExtension catalogExtension =
+      new HadoopCatalogExtension(DATABASE, TestFixtures.TABLE);
+
+  private static final TypeInformation<Row> ROW_TYPE_INFO =
+      new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes());
+
+  // Parallelism = 8 (parallelism > numBuckets) throughout the test suite
+  private final int parallelism = NUMBER_TASK_MANAGERS * SLOTS_PER_TASK_MANAGER;
+  private final FileFormat format = FileFormat.PARQUET;
+  private final int numBuckets = 4;
+
+  private Table table;
+  private StreamExecutionEnvironment env;
+  private TableLoader tableLoader;
+
+  private void setupEnvironment(TableSchemaType tableSchemaType) {
+    table = getTable(tableSchemaType);
+    env =
+        StreamExecutionEnvironment.getExecutionEnvironment(DISABLE_CLASSLOADER_CHECK_CONFIG)
+            .enableCheckpointing(100)
+            .setParallelism(parallelism)
+            .setMaxParallelism(parallelism * 2);
+    tableLoader = catalogExtension.tableLoader();
+  }
+
+  private Table getTable(TableSchemaType tableSchemaType) {
+    PartitionSpec partitionSpec =
+        TestBucketPartitionerUtils.getPartitionSpec(tableSchemaType, numBuckets);
+
+    return catalogExtension
+        .catalog()
+        .createTable(
+            TABLE_IDENTIFIER,
+            SimpleDataUtil.SCHEMA,
+            partitionSpec,
+            ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()));
+  }
+
+  private List<RowData> convertToRowData(List<Row> rows) {
+    return rows.stream().map(CONVERTER::toInternal).collect(Collectors.toList());
+  }
+
+  private BoundedTestSource<Row> createBoundedSource(List<Row> rows) {
+    return new BoundedTestSource<>(rows.toArray(new Row[0]));
+  }
+
+  private TableTestStats extractTableTestStats(TableSchemaType tableSchemaType) throws IOException {

Review Comment:
   move the util method after the `@Test` methods



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/HadoopCatalogExtension.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.UUID;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+public class HadoopCatalogExtension
+    implements BeforeAllCallback, BeforeEachCallback, AfterAllCallback, AfterEachCallback {
+  protected final String database;
+  protected final String tableName;
+
+  protected Path temporaryFolder;

Review Comment:
   This can be replaced with `@TempDir private File tempDir;`. Then we can remove the beforeAll and afterAll callbacks.
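   
   For reference, a minimal illustration of the `@TempDir` mechanism in a hypothetical test class (not code from this PR): JUnit 5 creates the directory before the test and deletes it afterwards, so the manual `Files.createTempDirectory`/`FileUtils` cleanup done in the callbacks goes away.
   
   ```java
   import java.nio.file.Path;
   import org.junit.jupiter.api.Test;
   import org.junit.jupiter.api.io.TempDir;

   class TempDirExample {
     @TempDir Path temporaryFolder;

     @Test
     void directoryIsInjected() {
       // temporaryFolder already exists here and is cleaned up automatically
       System.out.println(temporaryFolder.toAbsolutePath());
     }
   }
   ```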



##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/BucketPartitioner.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * This partitioner will redirect records to writers deterministically based on the Bucket partition
+ * spec. It'll attempt to optimize the file size written depending on whether numPartitions is
+ * greater, less or equal than the maxNumBuckets. Note: The current implementation only supports ONE
+ * bucket in the partition spec.
+ */
+class BucketPartitioner implements Partitioner<Integer> {
+
+  static final String BUCKET_OUT_OF_RANGE_MESSAGE_PREFIX = "bucketId out of range: ";
+
+  private final int maxNumBuckets;
+
+  // To hold the OFFSET of the next writer to use for any bucket
+  private final int[] currentBucketWriterOffset;
+
+  BucketPartitioner(PartitionSpec partitionSpec) {
+    Tuple2<Integer, Integer> bucketFieldInfo =
+        BucketPartitionerUtils.getBucketFieldInfo(partitionSpec);
+
+    this.maxNumBuckets = bucketFieldInfo.f1;
+    this.currentBucketWriterOffset = new int[this.maxNumBuckets];
+  }
+
+  /**
+   * Determine the partition id based on the following criteria: If the number of writers <= the
+   * number of buckets, an evenly distributed number of buckets will be assigned to each writer (one
+   * writer -> many buckets). Conversely, if the number of writers > the number of buckets the logic
+   * is handled by the {@link #getPartitionWritersGreaterThanBuckets
+   * getPartitionWritersGreaterThanBuckets} method.
+   *
+   * @param bucketId the bucketId for each request
+   * @param numPartitions the total number of partitions
+   * @return the partition id (writer) to use for each request
+   */
+  @Override
+  public int partition(Integer bucketId, int numPartitions) {
+    Preconditions.checkArgument(
+        bucketId >= 0, BUCKET_OUT_OF_RANGE_MESSAGE_PREFIX + bucketId + " (must be >= 0)");
+    Preconditions.checkArgument(
+        bucketId < maxNumBuckets,
+        BUCKET_OUT_OF_RANGE_MESSAGE_PREFIX
+            + bucketId
+            + " (must be >= 0), maxNumBuckets: "
+            + maxNumBuckets);
+
+    if (numPartitions <= maxNumBuckets) {
+      return bucketId % numPartitions;
+    } else {
+      return getPartitionWritersGreaterThanBuckets(bucketId, numPartitions);
+    }
+  }
+
+  /*-
+   * If the number of writers > the number of buckets each partitioner will keep a state of multiple
+   * writers per bucket as evenly as possible, and will round-robin the requests across them, in this
+   * case each writer will target only one bucket at all times (many writers -> one bucket). Example:
+   * Configuration: numPartitions (writers) = 5, maxBuckets = 2
+   * Expected behavior:
+   * - Records for Bucket 0 will be "round robin" between Writers 0, 2 and 4
+   * - Records for Bucket 1 will always use Writer 1 and 3
+   * Notes:
+   * - maxNumWritersPerBucket determines when to reset the currentBucketWriterOffset to 0 for this bucketId
+   * - When numPartitions is not evenly divisible by maxBuckets, some buckets will have one more writer (extraWriter).
+   * In this example Bucket 0 has an "extra writer" to consider before resetting its offset to 0.
+   *
+   * @param bucketId the bucketId for each request
+   * @param numPartitions the total number of partitions
+   * @return the partition index (writer) to use for each request

Review Comment:
   nit: `@return the destination partition index (writer subtask id)`.
   
   `to use for each request` is a bit inaccurate



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerUtils.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import java.util.List;
+import java.util.UUID;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.BucketUtil;
+
+final class TestBucketPartitionerUtils {
+
+  enum TableSchemaType {
+    ONE_BUCKET,
+    IDENTITY_AND_BUCKET,
+    TWO_BUCKETS;
+  }
+
+  private TestBucketPartitionerUtils() {}
+
+  static final DataFormatConverters.RowConverter CONVERTER =
+      new DataFormatConverters.RowConverter(SimpleDataUtil.FLINK_SCHEMA.getFieldDataTypes());
+
+  static PartitionSpec getPartitionSpec(TableSchemaType tableSchemaType, int numBuckets) {

Review Comment:
   It is probably a bit cleaner to move it into the enum class.
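   
   A sketch of the suggested shape (the method name is an assumption; the bodies are just the existing switch branches moved onto the enum):
   
   ```java
   enum TableSchemaType {
     ONE_BUCKET,
     IDENTITY_AND_BUCKET,
     TWO_BUCKETS;

     PartitionSpec getPartitionSpec(int numBuckets) {
       switch (this) {
         case ONE_BUCKET:
           return PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).bucket("data", numBuckets).build();
         case IDENTITY_AND_BUCKET:
           return PartitionSpec.builderFor(SimpleDataUtil.SCHEMA)
               .identity("id")
               .bucket("data", numBuckets)
               .build();
         case TWO_BUCKETS:
           return PartitionSpec.builderFor(SimpleDataUtil.SCHEMA)
               .bucket("id", numBuckets)
               .bucket("data", numBuckets)
               .build();
         default:
           throw new IllegalArgumentException("Unsupported table schema type: " + this);
       }
     }
   }
   ```
   
   Callers would then use `tableSchemaType.getPartitionSpec(numBuckets)` instead of the static util method.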



##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/BucketPartitioner.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * This partitioner will redirect records to writers deterministically based on the Bucket partition
+ * spec. It'll attempt to optimize the file size written depending on whether numPartitions is
+ * greater, less or equal than the maxNumBuckets. Note: The current implementation only supports ONE
+ * bucket in the partition spec.
+ */
+class BucketPartitioner implements Partitioner<Integer> {
+
+  static final String BUCKET_OUT_OF_RANGE_MESSAGE_PREFIX = "bucketId out of range: ";
+
+  private final int maxNumBuckets;
+
+  // To hold the OFFSET of the next writer to use for any bucket
+  private final int[] currentBucketWriterOffset;
+
+  BucketPartitioner(PartitionSpec partitionSpec) {
+    Tuple2<Integer, Integer> bucketFieldInfo =
+        BucketPartitionerUtils.getBucketFieldInfo(partitionSpec);
+
+    this.maxNumBuckets = bucketFieldInfo.f1;
+    this.currentBucketWriterOffset = new int[this.maxNumBuckets];
+  }
+
+  /**
+   * Determine the partition id based on the following criteria: If the number of writers <= the
+   * number of buckets, an evenly distributed number of buckets will be assigned to each writer (one
+   * writer -> many buckets). Conversely, if the number of writers > the number of buckets the logic
+   * is handled by the {@link #getPartitionWritersGreaterThanBuckets
+   * getPartitionWritersGreaterThanBuckets} method.
+   *
+   * @param bucketId the bucketId for each request
+   * @param numPartitions the total number of partitions
+   * @return the partition id (writer) to use for each request
+   */
+  @Override
+  public int partition(Integer bucketId, int numPartitions) {
+    Preconditions.checkArgument(
+        bucketId >= 0, BUCKET_OUT_OF_RANGE_MESSAGE_PREFIX + bucketId + " (must be >= 0)");
+    Preconditions.checkArgument(
+        bucketId < maxNumBuckets,
+        BUCKET_OUT_OF_RANGE_MESSAGE_PREFIX
+            + bucketId
+            + " (must be >= 0), maxNumBuckets: "
+            + maxNumBuckets);
+
+    if (numPartitions <= maxNumBuckets) {
+      return bucketId % numPartitions;
+    } else {
+      return getPartitionWritersGreaterThanBuckets(bucketId, numPartitions);
+    }
+  }
+
+  /*-
+   * If the number of writers > the number of buckets each partitioner will keep a state of multiple
+   * writers per bucket as evenly as possible, and will round-robin the requests across them, in this
+   * case each writer will target only one bucket at all times (many writers -> one bucket). Example:
+   * Configuration: numPartitions (writers) = 5, maxBuckets = 2
+   * Expected behavior:
+   * - Records for Bucket 0 will be "round robin" between Writers 0, 2 and 4
+   * - Records for Bucket 1 will always use Writer 1 and 3
+   * Notes:
+   * - maxNumWritersPerBucket determines when to reset the currentBucketWriterOffset to 0 for this bucketId
+   * - When numPartitions is not evenly divisible by maxBuckets, some buckets will have one more writer (extraWriter).
+   * In this example Bucket 0 has an "extra writer" to consider before resetting its offset to 0.
+   *
+   * @param bucketId the bucketId for each request

Review Comment:
   it seems to me that the two `@param` are not useful. maybe remove them.



##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/BucketPartitioner.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * This partitioner will redirect records to writers deterministically based on the Bucket partition
+ * spec. It'll attempt to optimize the file size written depending on whether numPartitions is
+ * greater, less or equal than the maxNumBuckets. Note: The current implementation only supports ONE
+ * bucket in the partition spec.
+ */
+class BucketPartitioner implements Partitioner<Integer> {
+
+  static final String BUCKET_OUT_OF_RANGE_MESSAGE_PREFIX = "bucketId out of range: ";
+
+  private final int maxNumBuckets;
+
+  // To hold the OFFSET of the next writer to use for any bucket
+  private final int[] currentBucketWriterOffset;
+
+  BucketPartitioner(PartitionSpec partitionSpec) {
+    Tuple2<Integer, Integer> bucketFieldInfo =
+        BucketPartitionerUtils.getBucketFieldInfo(partitionSpec);
+
+    this.maxNumBuckets = bucketFieldInfo.f1;
+    this.currentBucketWriterOffset = new int[this.maxNumBuckets];
+  }
+
+  /**
+   * Determine the partition id based on the following criteria: If the number of writers <= the
+   * number of buckets, an evenly distributed number of buckets will be assigned to each writer (one
+   * writer -> many buckets). Conversely, if the number of writers > the number of buckets the logic
+   * is handled by the {@link #getPartitionWritersGreaterThanBuckets
+   * getPartitionWritersGreaterThanBuckets} method.
+   *
+   * @param bucketId the bucketId for each request
+   * @param numPartitions the total number of partitions
+   * @return the partition id (writer) to use for each request
+   */
+  @Override
+  public int partition(Integer bucketId, int numPartitions) {
+    Preconditions.checkArgument(
+        bucketId >= 0, BUCKET_OUT_OF_RANGE_MESSAGE_PREFIX + bucketId + " (must be >= 0)");
+    Preconditions.checkArgument(
+        bucketId < maxNumBuckets,
+        BUCKET_OUT_OF_RANGE_MESSAGE_PREFIX
+            + bucketId
+            + " (must be >= 0), maxNumBuckets: "
+            + maxNumBuckets);
+
+    if (numPartitions <= maxNumBuckets) {
+      return bucketId % numPartitions;
+    } else {
+      return getPartitionWritersGreaterThanBuckets(bucketId, numPartitions);
+    }
+  }
+
+  /*-
+   * If the number of writers > the number of buckets each partitioner will keep a state of multiple

Review Comment:
   very nice explanation here



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerUtils.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import java.util.List;
+import java.util.UUID;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.BucketUtil;
+
+final class TestBucketPartitionerUtils {
+
+  enum TableSchemaType {
+    ONE_BUCKET,
+    IDENTITY_AND_BUCKET,
+    TWO_BUCKETS;
+  }
+
+  private TestBucketPartitionerUtils() {}
+
+  static final DataFormatConverters.RowConverter CONVERTER =
+      new DataFormatConverters.RowConverter(SimpleDataUtil.FLINK_SCHEMA.getFieldDataTypes());
+
+  static PartitionSpec getPartitionSpec(TableSchemaType tableSchemaType, int numBuckets) {
+    PartitionSpec partitionSpec = null;
+
+    switch (tableSchemaType) {
+      case ONE_BUCKET:
+        partitionSpec =
+            PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).bucket("data", numBuckets).build();
+        break;
+      case IDENTITY_AND_BUCKET:
+        partitionSpec =
+            PartitionSpec.builderFor(SimpleDataUtil.SCHEMA)
+                .identity("id")
+                .bucket("data", numBuckets)
+                .build();
+        break;
+      case TWO_BUCKETS:
+        partitionSpec =
+            PartitionSpec.builderFor(SimpleDataUtil.SCHEMA)
+                .bucket("id", numBuckets)
+                .bucket("data", numBuckets)
+                .build();
+        break;
+    }
+
+    Preconditions.checkNotNull(
+        partitionSpec, "Invalid tableSchemaType provided: " + tableSchemaType);
+    return partitionSpec;
+  }
+
+  /**
+   * Utility method to generate rows whose values will "hash" to a range of bucketIds (from 0 to
+   * numBuckets - 1)
+   *
+   * @param numRowsPerBucket how many different rows should be generated per bucket
+   * @param numBuckets max number of buckets to consider
+   * @return the list of rows whose data "hashes" to the desired bucketId
+   */
+  static List<Row> generateRowsForBucketIdRange(int numRowsPerBucket, int numBuckets) {

Review Comment:
   It should return `RowData`, right? You can use `GenericRowData`. Then we don't need `CONVERTER`.
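   
   A sketch of the `GenericRowData` variant (assuming Flink's `org.apache.flink.table.data.GenericRowData` and `StringData`; the helper otherwise keeps its shape):
   
   ```java
   static List<RowData> generateRowsForBucketIdRange(int numRowsPerBucket, int numBuckets) {
     List<RowData> rows = Lists.newArrayListWithCapacity(numBuckets * numRowsPerBucket);
     // same ordering as the Row-based version, since some tests rely on it
     for (int i = 0; i < numRowsPerBucket; i++) {
       for (int bucketId = 0; bucketId < numBuckets; bucketId++) {
         String value = generateValueForBucketId(bucketId, numBuckets);
         rows.add(GenericRowData.of(1, StringData.fromString(value)));
       }
     }
     return rows;
   }
   ```
   
   With this, the `CONVERTER` field and the `Row -> RowData` mapping in the sink test would no longer be needed.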



##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/BucketPartitioner.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * This partitioner will redirect records to writers deterministically based on the Bucket partition
+ * spec. It'll attempt to optimize the file size written depending on whether numPartitions is
+ * greater, less or equal than the maxNumBuckets. Note: The current implementation only supports ONE
+ * bucket in the partition spec.
+ */
+class BucketPartitioner implements Partitioner<Integer> {
+
+  static final String BUCKET_OUT_OF_RANGE_MESSAGE_PREFIX = "bucketId out of range: ";
+
+  private final int maxNumBuckets;
+
+  // To hold the OFFSET of the next writer to use for any bucket
+  private final int[] currentBucketWriterOffset;
+
+  BucketPartitioner(PartitionSpec partitionSpec) {
+    Tuple2<Integer, Integer> bucketFieldInfo =
+        BucketPartitionerUtils.getBucketFieldInfo(partitionSpec);
+
+    this.maxNumBuckets = bucketFieldInfo.f1;
+    this.currentBucketWriterOffset = new int[this.maxNumBuckets];
+  }
+
+  /**
+   * Determine the partition id based on the following criteria: If the number of writers <= the
+   * number of buckets, an evenly distributed number of buckets will be assigned to each writer (one
+   * writer -> many buckets). Conversely, if the number of writers > the number of buckets the logic
+   * is handled by the {@link #getPartitionWritersGreaterThanBuckets
+   * getPartitionWritersGreaterThanBuckets} method.
+   *
+   * @param bucketId the bucketId for each request
+   * @param numPartitions the total number of partitions
+   * @return the partition id (writer) to use for each request
+   */
+  @Override
+  public int partition(Integer bucketId, int numPartitions) {
+    Preconditions.checkArgument(
+        bucketId >= 0, BUCKET_OUT_OF_RANGE_MESSAGE_PREFIX + bucketId + " (must be >= 0)");
+    Preconditions.checkArgument(
+        bucketId < maxNumBuckets,
+        BUCKET_OUT_OF_RANGE_MESSAGE_PREFIX
+            + bucketId
+            + " (must be >= 0), maxNumBuckets: "
+            + maxNumBuckets);
+
+    if (numPartitions <= maxNumBuckets) {
+      return bucketId % numPartitions;
+    } else {
+      return getPartitionWritersGreaterThanBuckets(bucketId, numPartitions);
+    }
+  }
+
+  /*-
+   * If the number of writers > the number of buckets each partitioner will keep a state of multiple
+   * writers per bucket as evenly as possible, and will round-robin the requests across them, in this
+   * case each writer will target only one bucket at all times (many writers -> one bucket). Example:
+   * Configuration: numPartitions (writers) = 5, maxBuckets = 2
+   * Expected behavior:
+   * - Records for Bucket 0 will be "round robin" between Writers 0, 2 and 4
+   * - Records for Bucket 1 will always use Writer 1 and 3
+   * Notes:
+   * - maxNumWritersPerBucket determines when to reset the currentBucketWriterOffset to 0 for this bucketId
+   * - When numPartitions is not evenly divisible by maxBuckets, some buckets will have one more writer (extraWriter).
+   * In this example Bucket 0 has an "extra writer" to consider before resetting its offset to 0.
+   *
+   * @param bucketId the bucketId for each request
+   * @param numPartitions the total number of partitions
+   * @return the partition index (writer) to use for each request
+   */
+  private int getPartitionWritersGreaterThanBuckets(int bucketId, int numPartitions) {

Review Comment:
   nit: I would call this `partitionWithMoreWritersThanBuckets`



##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/BucketPartitioner.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * This partitioner will redirect records to writers deterministically based on the Bucket partition
+ * spec. It'll attempt to optimize the file size written depending on whether numPartitions is
+ * greater, less or equal than the maxNumBuckets. Note: The current implementation only supports ONE
+ * bucket in the partition spec.
+ */
+class BucketPartitioner implements Partitioner<Integer> {
+
+  static final String BUCKET_OUT_OF_RANGE_MESSAGE_PREFIX = "bucketId out of range: ";
+
+  private final int maxNumBuckets;
+
+  // To hold the OFFSET of the next writer to use for any bucket

Review Comment:
   nit: add to the comment that this is only used for the case where the number 
of buckets is less than the writer parallelism
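
   One possible wording for the expanded field comment, purely illustrative:

   ```java
   // Stores the OFFSET of the next writer to use for each bucket. Only relevant when the
   // number of buckets is less than the writer parallelism (numPartitions > maxNumBuckets).
   private final int[] currentBucketWriterOffset;
   ```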



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerUtils.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import java.util.List;
+import java.util.UUID;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.BucketUtil;
+
+final class TestBucketPartitionerUtils {
+
+  enum TableSchemaType {
+    ONE_BUCKET,
+    IDENTITY_AND_BUCKET,
+    TWO_BUCKETS;
+  }
+
+  private TestBucketPartitionerUtils() {}
+
+  static final DataFormatConverters.RowConverter CONVERTER =
+      new DataFormatConverters.RowConverter(SimpleDataUtil.FLINK_SCHEMA.getFieldDataTypes());
+
+  static PartitionSpec getPartitionSpec(TableSchemaType tableSchemaType, int numBuckets) {
+    PartitionSpec partitionSpec = null;
+
+    switch (tableSchemaType) {
+      case ONE_BUCKET:
+        partitionSpec =
+            PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).bucket("data", numBuckets).build();
+        break;
+      case IDENTITY_AND_BUCKET:
+        partitionSpec =
+            PartitionSpec.builderFor(SimpleDataUtil.SCHEMA)
+                .identity("id")
+                .bucket("data", numBuckets)
+                .build();
+        break;
+      case TWO_BUCKETS:
+        partitionSpec =
+            PartitionSpec.builderFor(SimpleDataUtil.SCHEMA)
+                .bucket("id", numBuckets)
+                .bucket("data", numBuckets)
+                .build();
+        break;
+    }
+
+    Preconditions.checkNotNull(
+        partitionSpec, "Invalid tableSchemaType provided: " + tableSchemaType);
+    return partitionSpec;
+  }
+
+  /**
+   * Utility method to generate rows whose values will "hash" to a range of bucketIds (from 0 to
+   * numBuckets - 1)
+   *
+   * @param numRowsPerBucket how many different rows should be generated per bucket
+   * @param numBuckets max number of buckets to consider
+   * @return the list of rows whose data "hashes" to the desired bucketId
+   */
+  static List<Row> generateRowsForBucketIdRange(int numRowsPerBucket, int numBuckets) {
+    List<Row> rows = Lists.newArrayListWithCapacity(numBuckets * numRowsPerBucket);
+    // For some of our tests, this order of the generated rows matters
+    for (int i = 0; i < numRowsPerBucket; i++) {
+      for (int bucketId = 0; bucketId < numBuckets; bucketId++) {
+        String value = generateValueForBucketId(bucketId, numBuckets);
+        rows.add(Row.of(1, value));
+      }
+    }
+    return rows;
+  }
+
+  /**
+   * Utility method to generate a UUID string that will "hash" to a desired bucketId
+   *
+   * @param bucketId the desired bucketId
+   * @return the string data that "hashes" to the desired bucketId
+   */
+  private static String generateValueForBucketId(int bucketId, int numBuckets) {
+    String value = "";
+    while (true) {
+      String uuid = UUID.randomUUID().toString();
+      if (computeBucketId(numBuckets, uuid) == bucketId) {
+        value = uuid;
+        break;

Review Comment:
   nit: just return the uuid? avoided the need of value variable.
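
   A sketch of the suggested simplification, keeping the rest of the method as shown in the diff:

   ```java
   private static String generateValueForBucketId(int bucketId, int numBuckets) {
     while (true) {
       String uuid = UUID.randomUUID().toString();
       if (computeBucketId(numBuckets, uuid) == bucketId) {
         return uuid;
       }
     }
   }
   ```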



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerUtils.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import java.util.List;
+import java.util.UUID;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.BucketUtil;
+
+final class TestBucketPartitionerUtils {
+
+  enum TableSchemaType {
+    ONE_BUCKET,
+    IDENTITY_AND_BUCKET,
+    TWO_BUCKETS;
+  }
+
+  private TestBucketPartitionerUtils() {}
+
+  static final DataFormatConverters.RowConverter CONVERTER =
+      new DataFormatConverters.RowConverter(SimpleDataUtil.FLINK_SCHEMA.getFieldDataTypes());
+
+  static PartitionSpec getPartitionSpec(TableSchemaType tableSchemaType, int numBuckets) {
+    PartitionSpec partitionSpec = null;
+
+    switch (tableSchemaType) {
+      case ONE_BUCKET:
+        partitionSpec =
+            PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).bucket("data", numBuckets).build();
+        break;
+      case IDENTITY_AND_BUCKET:
+        partitionSpec =
+            PartitionSpec.builderFor(SimpleDataUtil.SCHEMA)
+                .identity("id")
+                .bucket("data", numBuckets)
+                .build();
+        break;
+      case TWO_BUCKETS:
+        partitionSpec =
+            PartitionSpec.builderFor(SimpleDataUtil.SCHEMA)
+                .bucket("id", numBuckets)
+                .bucket("data", numBuckets)
+                .build();
+        break;
+    }
+
+    Preconditions.checkNotNull(
+        partitionSpec, "Invalid tableSchemaType provided: " + tableSchemaType);
+    return partitionSpec;
+  }
+
+  /**
+   * Utility method to generate rows whose values will "hash" to a range of bucketIds (from 0 to
+   * numBuckets - 1)
+   *
+   * @param numRowsPerBucket how many different rows should be generated per bucket
+   * @param numBuckets max number of buckets to consider
+   * @return the list of rows whose data "hashes" to the desired bucketId
+   */
+  static List<Row> generateRowsForBucketIdRange(int numRowsPerBucket, int numBuckets) {
+    List<Row> rows = Lists.newArrayListWithCapacity(numBuckets * numRowsPerBucket);
+    // For some of our tests, this order of the generated rows matters
+    for (int i = 0; i < numRowsPerBucket; i++) {
+      for (int bucketId = 0; bucketId < numBuckets; bucketId++) {
+        String value = generateValueForBucketId(bucketId, numBuckets);
+        rows.add(Row.of(1, value));
+      }
+    }
+    return rows;
+  }
+
+  /**
+   * Utility method to generate a UUID string that will "hash" to a desired bucketId
+   *
+   * @param bucketId the desired bucketId
+   * @return the string data that "hashes" to the desired bucketId
+   */
+  private static String generateValueForBucketId(int bucketId, int numBuckets) {
+    String value = "";
+    while (true) {
+      String uuid = UUID.randomUUID().toString();
+      if (computeBucketId(numBuckets, uuid) == bucketId) {
+        value = uuid;
+        break;
+      }
+    }
+    return value;

Review Comment:
   nit: Iceberg style adds an empty line after a control block `}`
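
   For reference, the shape the reviewer is pointing at (a blank line between the closing brace of
   the loop and the return), shown on the code as currently written:

   ```java
       while (true) {
         String uuid = UUID.randomUUID().toString();
         if (computeBucketId(numBuckets, uuid) == bucketId) {
           value = uuid;
           break;
         }
       }

       return value;
   ```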



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitioner.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import static org.apache.iceberg.flink.sink.BucketPartitioner.BUCKET_OUT_OF_RANGE_MESSAGE_PREFIX;
+
+import org.apache.iceberg.PartitionSpec;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+public class TestBucketPartitioner {
+
+  static final int NUM_BUCKETS = 60;
+
+  @ParameterizedTest
+  @EnumSource(
+      value = TestBucketPartitionerUtils.TableSchemaType.class,
+      names = {"ONE_BUCKET", "IDENTITY_AND_BUCKET"})
+  public void testPartitioningParallelismGreaterThanBuckets(
+      TestBucketPartitionerUtils.TableSchemaType tableSchemaType) {
+    final int numPartitions = 500;
+
+    PartitionSpec partitionSpec =
+        TestBucketPartitionerUtils.getPartitionSpec(tableSchemaType, NUM_BUCKETS);
+
+    BucketPartitioner bucketPartitioner = new BucketPartitioner(partitionSpec);
+
+    for (int expectedIdx = 0, bucketId = 0; expectedIdx < numPartitions; expectedIdx++) {

Review Comment:
   move `bucketId` initialization before the for loop. would make it easier to 
see the for loop
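
   A sketch of that reshuffle, with everything else unchanged:

   ```java
   int bucketId = 0;
   for (int expectedIdx = 0; expectedIdx < numPartitions; expectedIdx++) {
     int actualIdx = bucketPartitioner.partition(bucketId, numPartitions);
     Assertions.assertThat(actualIdx).isEqualTo(expectedIdx);
     if (++bucketId == NUM_BUCKETS) {
       bucketId = 0;
     }
   }
   ```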



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitioner.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import static org.apache.iceberg.flink.sink.BucketPartitioner.BUCKET_OUT_OF_RANGE_MESSAGE_PREFIX;
+
+import org.apache.iceberg.PartitionSpec;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+public class TestBucketPartitioner {
+
+  static final int NUM_BUCKETS = 60;
+
+  @ParameterizedTest
+  @EnumSource(
+      value = TestBucketPartitionerUtils.TableSchemaType.class,
+      names = {"ONE_BUCKET", "IDENTITY_AND_BUCKET"})
+  public void testPartitioningParallelismGreaterThanBuckets(

Review Comment:
   I feel we need to test two separate scenarios: dividable or non-dividable
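
   A hypothetical way to cover both scenarios with the existing fixtures; the parallelism values
   (120 is evenly divisible by the 60 buckets, 500 is not, so the extra-writer path is exercised)
   and the test body are assumptions:

   ```java
   @ParameterizedTest
   @ValueSource(ints = {120, 500}) // divisible and non-divisible by NUM_BUCKETS
   public void testPartitioningParallelismGreaterThanBuckets(int numPartitions) {
     PartitionSpec partitionSpec =
         TestBucketPartitionerUtils.getPartitionSpec(
             TestBucketPartitionerUtils.TableSchemaType.ONE_BUCKET, NUM_BUCKETS);
     BucketPartitioner bucketPartitioner = new BucketPartitioner(partitionSpec);

     int bucketId = 0;
     for (int expectedIndex = 0; expectedIndex < numPartitions; expectedIndex++) {
       int actualIndex = bucketPartitioner.partition(bucketId, numPartitions);
       Assertions.assertThat(actualIndex).isEqualTo(expectedIndex);
       bucketId = (bucketId == NUM_BUCKETS - 1) ? 0 : bucketId + 1;
     }
   }
   ```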



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitioner.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import static org.apache.iceberg.flink.sink.BucketPartitioner.BUCKET_OUT_OF_RANGE_MESSAGE_PREFIX;
+
+import org.apache.iceberg.PartitionSpec;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+public class TestBucketPartitioner {
+
+  static final int NUM_BUCKETS = 60;
+
+  @ParameterizedTest
+  @EnumSource(
+      value = TestBucketPartitionerUtils.TableSchemaType.class,
+      names = {"ONE_BUCKET", "IDENTITY_AND_BUCKET"})
+  public void testPartitioningParallelismGreaterThanBuckets(
+      TestBucketPartitionerUtils.TableSchemaType tableSchemaType) {
+    final int numPartitions = 500;
+
+    PartitionSpec partitionSpec =
+        TestBucketPartitionerUtils.getPartitionSpec(tableSchemaType, NUM_BUCKETS);
+

Review Comment:
   nit: the empty lines at lines 40 and 43 seem unnecessary



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitioner.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import static org.apache.iceberg.flink.sink.BucketPartitioner.BUCKET_OUT_OF_RANGE_MESSAGE_PREFIX;
+
+import org.apache.iceberg.PartitionSpec;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+public class TestBucketPartitioner {
+
+  static final int NUM_BUCKETS = 60;
+
+  @ParameterizedTest
+  @EnumSource(
+      value = TestBucketPartitionerUtils.TableSchemaType.class,
+      names = {"ONE_BUCKET", "IDENTITY_AND_BUCKET"})
+  public void testPartitioningParallelismGreaterThanBuckets(
+      TestBucketPartitionerUtils.TableSchemaType tableSchemaType) {
+    final int numPartitions = 500;
+
+    PartitionSpec partitionSpec =
+        TestBucketPartitionerUtils.getPartitionSpec(tableSchemaType, NUM_BUCKETS);
+
+    BucketPartitioner bucketPartitioner = new BucketPartitioner(partitionSpec);
+
+    for (int expectedIdx = 0, bucketId = 0; expectedIdx < numPartitions; expectedIdx++) {
+      int actualIdx = bucketPartitioner.partition(bucketId, numPartitions);

Review Comment:
   unless very obvious, Iceberg style doesn't use acronym. so maybe change 
`actualIdx` to `actualPartitionIndex`. 



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitioner.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import static org.apache.iceberg.flink.sink.BucketPartitioner.BUCKET_OUT_OF_RANGE_MESSAGE_PREFIX;
+
+import org.apache.iceberg.PartitionSpec;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+public class TestBucketPartitioner {
+
+  static final int NUM_BUCKETS = 60;
+
+  @ParameterizedTest
+  @EnumSource(
+      value = TestBucketPartitionerUtils.TableSchemaType.class,
+      names = {"ONE_BUCKET", "IDENTITY_AND_BUCKET"})
+  public void testPartitioningParallelismGreaterThanBuckets(
+      TestBucketPartitionerUtils.TableSchemaType tableSchemaType) {
+    final int numPartitions = 500;
+
+    PartitionSpec partitionSpec =
+        TestBucketPartitionerUtils.getPartitionSpec(tableSchemaType, NUM_BUCKETS);
+
+    BucketPartitioner bucketPartitioner = new BucketPartitioner(partitionSpec);
+
+    for (int expectedIdx = 0, bucketId = 0; expectedIdx < numPartitions; expectedIdx++) {
+      int actualIdx = bucketPartitioner.partition(bucketId, numPartitions);
+      Assertions.assertThat(actualIdx).isEqualTo(expectedIdx);
+      if (++bucketId == NUM_BUCKETS) {
+        bucketId = 0;
+      }
+    }
+  }
+
+  @ParameterizedTest
+  @EnumSource(
+      value = TestBucketPartitionerUtils.TableSchemaType.class,
+      names = {"ONE_BUCKET", "IDENTITY_AND_BUCKET"})
+  public void testPartitioningParallelismEqualLessThanBuckets(
+      TestBucketPartitionerUtils.TableSchemaType tableSchemaType) {
+    final int numPartitions = 30;
+
+    PartitionSpec partitionSpec =
+        TestBucketPartitionerUtils.getPartitionSpec(tableSchemaType, NUM_BUCKETS);
+
+    BucketPartitioner bucketPartitioner = new BucketPartitioner(partitionSpec);
+
+    for (int bucketId = 0; bucketId < NUM_BUCKETS; bucketId++) {
+      int actualIdx = bucketPartitioner.partition(bucketId, numPartitions);
+      Assertions.assertThat(actualIdx).isEqualTo(bucketId % numPartitions);
+    }
+  }
+
+  @ParameterizedTest
+  @EnumSource(value = TestBucketPartitionerUtils.TableSchemaType.class, names 
= "TWO_BUCKETS")
+  public void testPartitionerMultipleBucketsFail(
+      TestBucketPartitionerUtils.TableSchemaType tableSchemaType) {
+    PartitionSpec partitionSpec =
+        TestBucketPartitionerUtils.getPartitionSpec(tableSchemaType, NUM_BUCKETS);
+
+    Assertions.assertThatExceptionOfType(RuntimeException.class)
+        .isThrownBy(() -> new BucketPartitioner(partitionSpec))
+        .withMessageContaining(BucketPartitionerUtils.BAD_NUMBER_OF_BUCKETS_ERROR_MESSAGE);
+  }
+
+  @ParameterizedTest

Review Comment:
   parameterized test seems unnecessary here
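
   Following that suggestion could look like this (a sketch; it would also need an
   org.junit.jupiter.api.Test import, which the file does not show yet):

   ```java
   @Test
   public void testPartitionerMultipleBucketsFail() {
     PartitionSpec partitionSpec =
         TestBucketPartitionerUtils.getPartitionSpec(
             TestBucketPartitionerUtils.TableSchemaType.TWO_BUCKETS, NUM_BUCKETS);

     Assertions.assertThatExceptionOfType(RuntimeException.class)
         .isThrownBy(() -> new BucketPartitioner(partitionSpec))
         .withMessageContaining(BucketPartitionerUtils.BAD_NUMBER_OF_BUCKETS_ERROR_MESSAGE);
   }
   ```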



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitioner.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import static org.apache.iceberg.flink.sink.BucketPartitioner.BUCKET_OUT_OF_RANGE_MESSAGE_PREFIX;
+
+import org.apache.iceberg.PartitionSpec;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+public class TestBucketPartitioner {
+
+  static final int NUM_BUCKETS = 60;
+
+  @ParameterizedTest
+  @EnumSource(
+      value = TestBucketPartitionerUtils.TableSchemaType.class,
+      names = {"ONE_BUCKET", "IDENTITY_AND_BUCKET"})
+  public void testPartitioningParallelismGreaterThanBuckets(
+      TestBucketPartitionerUtils.TableSchemaType tableSchemaType) {
+    final int numPartitions = 500;
+
+    PartitionSpec partitionSpec =
+        TestBucketPartitionerUtils.getPartitionSpec(tableSchemaType, NUM_BUCKETS);
+
+    BucketPartitioner bucketPartitioner = new BucketPartitioner(partitionSpec);
+
+    for (int expectedIdx = 0, bucketId = 0; expectedIdx < numPartitions; expectedIdx++) {
+      int actualIdx = bucketPartitioner.partition(bucketId, numPartitions);
+      Assertions.assertThat(actualIdx).isEqualTo(expectedIdx);
+      if (++bucketId == NUM_BUCKETS) {
+        bucketId = 0;
+      }
+    }
+  }
+
+  @ParameterizedTest
+  @EnumSource(
+      value = TestBucketPartitionerUtils.TableSchemaType.class,
+      names = {"ONE_BUCKET", "IDENTITY_AND_BUCKET"})
+  public void testPartitioningParallelismEqualLessThanBuckets(
+      TestBucketPartitionerUtils.TableSchemaType tableSchemaType) {
+    final int numPartitions = 30;
+
+    PartitionSpec partitionSpec =
+        TestBucketPartitionerUtils.getPartitionSpec(tableSchemaType, NUM_BUCKETS);
+
+    BucketPartitioner bucketPartitioner = new BucketPartitioner(partitionSpec);
+
+    for (int bucketId = 0; bucketId < NUM_BUCKETS; bucketId++) {
+      int actualIdx = bucketPartitioner.partition(bucketId, numPartitions);
+      Assertions.assertThat(actualIdx).isEqualTo(bucketId % numPartitions);
+    }
+  }
+
+  @ParameterizedTest
+  @EnumSource(value = TestBucketPartitionerUtils.TableSchemaType.class, names = "TWO_BUCKETS")
+  public void testPartitionerMultipleBucketsFail(
+      TestBucketPartitionerUtils.TableSchemaType tableSchemaType) {
+    PartitionSpec partitionSpec =
+        TestBucketPartitionerUtils.getPartitionSpec(tableSchemaType, NUM_BUCKETS);
+
+    Assertions.assertThatExceptionOfType(RuntimeException.class)
+        .isThrownBy(() -> new BucketPartitioner(partitionSpec))
+        .withMessageContaining(BucketPartitionerUtils.BAD_NUMBER_OF_BUCKETS_ERROR_MESSAGE);
+  }
+
+  @ParameterizedTest
+  @ValueSource(ints = {-1, NUM_BUCKETS})
+  public void testPartitionerBucketIdOutOfRangeFail(int bucketId) {
+    PartitionSpec partitionSpec =
+        TestBucketPartitionerUtils.getPartitionSpec(
+            TestBucketPartitionerUtils.TableSchemaType.ONE_BUCKET, NUM_BUCKETS);
+
+    BucketPartitioner bucketPartitioner = new BucketPartitioner(partitionSpec);
+
+    Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+        .isThrownBy(() -> bucketPartitioner.partition(bucketId, 1))
+        .withMessageContaining(BUCKET_OUT_OF_RANGE_MESSAGE_PREFIX);

Review Comment:
   nit: I haven't seen Iceberg code use constant string for unit test error 
message. maybe also assert the exact error msg.
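
   For example, the negative-id case could assert the full message inline; the literal below
   reflects the message as currently built in the diff and would need to track any rewording:

   ```java
   Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
       .isThrownBy(() -> bucketPartitioner.partition(-1, 1))
       .withMessage("bucketId out of range: -1 (must be >= 0)");
   ```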



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitioner.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import static org.apache.iceberg.flink.sink.BucketPartitioner.BUCKET_OUT_OF_RANGE_MESSAGE_PREFIX;
+
+import org.apache.iceberg.PartitionSpec;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+public class TestBucketPartitioner {
+
+  static final int NUM_BUCKETS = 60;
+
+  @ParameterizedTest
+  @EnumSource(
+      value = TestBucketPartitionerUtils.TableSchemaType.class,
+      names = {"ONE_BUCKET", "IDENTITY_AND_BUCKET"})
+  public void testPartitioningParallelismGreaterThanBuckets(
+      TestBucketPartitionerUtils.TableSchemaType tableSchemaType) {
+    final int numPartitions = 500;
+
+    PartitionSpec partitionSpec =
+        TestBucketPartitionerUtils.getPartitionSpec(tableSchemaType, NUM_BUCKETS);
+
+    BucketPartitioner bucketPartitioner = new BucketPartitioner(partitionSpec);
+
+    for (int expectedIdx = 0, bucketId = 0; expectedIdx < numPartitions; expectedIdx++) {
+      int actualIdx = bucketPartitioner.partition(bucketId, numPartitions);
+      Assertions.assertThat(actualIdx).isEqualTo(expectedIdx);
+      if (++bucketId == NUM_BUCKETS) {

Review Comment:
   Iceberg style doesn't use `++` with side effect. you can use the ternary `? 
: ` operator here
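
   For example, the wrap-around could be written without the embedded increment:

   ```java
   bucketId = (bucketId == NUM_BUCKETS - 1) ? 0 : bucketId + 1;
   ```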



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java:
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import static org.apache.iceberg.flink.MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG;
+import static org.apache.iceberg.flink.TestFixtures.DATABASE;
+import static org.apache.iceberg.flink.TestFixtures.TABLE_IDENTIFIER;
+import static org.apache.iceberg.flink.sink.TestBucketPartitionerUtils.CONVERTER;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.flink.HadoopCatalogExtension;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.sink.TestBucketPartitionerUtils.TableSchemaType;
+import org.apache.iceberg.flink.source.BoundedTestSource;
+import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.assertj.core.api.Assertions;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+public class TestBucketPartitionerFlinkIcebergSink {
+
+  private static final int NUMBER_TASK_MANAGERS = 1;
+  private static final int SLOTS_PER_TASK_MANAGER = 8;
+
+  @RegisterExtension
+  private static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+      new MiniClusterExtension(
+          new MiniClusterResourceConfiguration.Builder()
+              .setNumberTaskManagers(NUMBER_TASK_MANAGERS)
+              .setNumberSlotsPerTaskManager(SLOTS_PER_TASK_MANAGER)
+              .setConfiguration(DISABLE_CLASSLOADER_CHECK_CONFIG)
+              .build());
+
+  @RegisterExtension
+  private static final HadoopCatalogExtension catalogExtension =
+      new HadoopCatalogExtension(DATABASE, TestFixtures.TABLE);
+
+  private static final TypeInformation<Row> ROW_TYPE_INFO =
+      new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes());
+
+  // Parallelism = 8 (parallelism > numBuckets) throughout the test suite
+  private final int parallelism = NUMBER_TASK_MANAGERS * SLOTS_PER_TASK_MANAGER;
+  private final FileFormat format = FileFormat.PARQUET;
+  private final int numBuckets = 4;
+
+  private Table table;
+  private StreamExecutionEnvironment env;
+  private TableLoader tableLoader;
+
+  private void setupEnvironment(TableSchemaType tableSchemaType) {
+    table = getTable(tableSchemaType);
+    env =
+        StreamExecutionEnvironment.getExecutionEnvironment(DISABLE_CLASSLOADER_CHECK_CONFIG)
+            .enableCheckpointing(100)
+            .setParallelism(parallelism)
+            .setMaxParallelism(parallelism * 2);
+    tableLoader = catalogExtension.tableLoader();
+  }
+
+  private Table getTable(TableSchemaType tableSchemaType) {
+    PartitionSpec partitionSpec =
+        TestBucketPartitionerUtils.getPartitionSpec(tableSchemaType, numBuckets);
+
+    return catalogExtension
+        .catalog()
+        .createTable(
+            TABLE_IDENTIFIER,
+            SimpleDataUtil.SCHEMA,
+            partitionSpec,
+            ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()));
+  }
+
+  private List<RowData> convertToRowData(List<Row> rows) {
+    return rows.stream().map(CONVERTER::toInternal).collect(Collectors.toList());
+  }
+
+  private BoundedTestSource<Row> createBoundedSource(List<Row> rows) {

Review Comment:
   this util method seems not necessary. it is better to stay with `RowData`
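
   A sketch of staying in RowData end to end; the helper name is made up, and GenericRowData /
   StringData would need imports from org.apache.flink.table.data:

   ```java
   // Build internal rows directly so the Row -> RowData conversion step can be dropped.
   private static RowData rowDataOf(int id, String data) {
     return GenericRowData.of(id, StringData.fromString(data));
   }

   private BoundedTestSource<RowData> createBoundedSource(List<RowData> rows) {
     return new BoundedTestSource<>(rows.toArray(new RowData[0]));
   }
   ```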



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java:
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import static org.apache.iceberg.flink.MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG;
+import static org.apache.iceberg.flink.TestFixtures.DATABASE;
+import static org.apache.iceberg.flink.TestFixtures.TABLE_IDENTIFIER;
+import static org.apache.iceberg.flink.sink.TestBucketPartitionerUtils.CONVERTER;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.flink.HadoopCatalogExtension;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.sink.TestBucketPartitionerUtils.TableSchemaType;
+import org.apache.iceberg.flink.source.BoundedTestSource;
+import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.assertj.core.api.Assertions;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+public class TestBucketPartitionerFlinkIcebergSink {
+
+  private static final int NUMBER_TASK_MANAGERS = 1;
+  private static final int SLOTS_PER_TASK_MANAGER = 8;
+
+  @RegisterExtension
+  private static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+      new MiniClusterExtension(
+          new MiniClusterResourceConfiguration.Builder()
+              .setNumberTaskManagers(NUMBER_TASK_MANAGERS)
+              .setNumberSlotsPerTaskManager(SLOTS_PER_TASK_MANAGER)
+              .setConfiguration(DISABLE_CLASSLOADER_CHECK_CONFIG)
+              .build());
+
+  @RegisterExtension
+  private static final HadoopCatalogExtension catalogExtension =
+      new HadoopCatalogExtension(DATABASE, TestFixtures.TABLE);
+
+  private static final TypeInformation<Row> ROW_TYPE_INFO =
+      new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes());
+
+  // Parallelism = 8 (parallelism > numBuckets) throughout the test suite
+  private final int parallelism = NUMBER_TASK_MANAGERS * SLOTS_PER_TASK_MANAGER;
+  private final FileFormat format = FileFormat.PARQUET;
+  private final int numBuckets = 4;
+
+  private Table table;
+  private StreamExecutionEnvironment env;
+  private TableLoader tableLoader;
+
+  private void setupEnvironment(TableSchemaType tableSchemaType) {
+    table = getTable(tableSchemaType);
+    env =
+        StreamExecutionEnvironment.getExecutionEnvironment(DISABLE_CLASSLOADER_CHECK_CONFIG)
+            .enableCheckpointing(100)
+            .setParallelism(parallelism)
+            .setMaxParallelism(parallelism * 2);
+    tableLoader = catalogExtension.tableLoader();
+  }
+
+  private Table getTable(TableSchemaType tableSchemaType) {
+    PartitionSpec partitionSpec =
+        TestBucketPartitionerUtils.getPartitionSpec(tableSchemaType, numBuckets);
+
+    return catalogExtension
+        .catalog()
+        .createTable(
+            TABLE_IDENTIFIER,
+            SimpleDataUtil.SCHEMA,
+            partitionSpec,
+            ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()));
+  }
+
+  private List<RowData> convertToRowData(List<Row> rows) {
+    return rows.stream().map(CONVERTER::toInternal).collect(Collectors.toList());
+  }
+
+  private BoundedTestSource<Row> createBoundedSource(List<Row> rows) {
+    return new BoundedTestSource<>(rows.toArray(new Row[0]));
+  }
+
+  private TableTestStats extractTableTestStats(TableSchemaType tableSchemaType) throws IOException {
+    int totalRecordCount = 0;
+    Map<Integer, List<Integer>> writersPerBucket = Maps.newHashMap(); // <BucketId, List<WriterId>>
+    Map<Integer, Integer> filesPerBucket = Maps.newHashMap(); // <BucketId, NumFiles>
+    Map<Integer, Long> recordsPerFile = new TreeMap<>(); // <WriterId, NumRecords>
+
+    try (CloseableIterable<FileScanTask> fileScanTasks = table.newScan().planFiles()) {
+      for (FileScanTask scanTask : fileScanTasks) {
+        long recordCountInFile = scanTask.file().recordCount();
+
+        String[] splitFilePath = scanTask.file().path().toString().split("/");
+        String filename = splitFilePath[splitFilePath.length - 1];
+        int writerId = Integer.parseInt(filename.split("-")[0]);

Review Comment:
    We can also disable checkpoint so that there is only one flush/commit in 
the test.
   
   case 1: number of buckets (like 2) is less than the number of writers (like 
5). with keyBy, there will be two files committed. with bucket partitioner, 
there should be 5 files.
   
   case 2: number of buckets (like 4) is greater than the number of writers (like 
2). both keyBy and bucket partition will write 4 files. this is the case where 
we need the extra complexity of parsing the writer id. hopefully/likely, the 
keyBy will result in unbalanced assignment. e.g. one writer handles 3 buckets 
while the other handles 1 bucket. that will demonstrate the benefit and 
expected behavior of bucket partitioner
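
   A rough sketch of the kind of check case 1 implies, using only the scan APIs already present in
   this test (2 buckets, 5 writers, checkpointing disabled so there is a single commit):

   ```java
   // With the bucket partitioner every one of the 5 writers should produce its own data file.
   int dataFileCount = 0;
   try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
     for (FileScanTask ignored : tasks) {
       dataFileCount += 1;
     }
   }
   Assertions.assertThat(dataFileCount).isEqualTo(5);
   ```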



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java:
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import static org.apache.iceberg.flink.MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG;
+import static org.apache.iceberg.flink.TestFixtures.DATABASE;
+import static org.apache.iceberg.flink.TestFixtures.TABLE_IDENTIFIER;
+import static org.apache.iceberg.flink.sink.TestBucketPartitionerUtils.CONVERTER;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.flink.HadoopCatalogExtension;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.sink.TestBucketPartitionerUtils.TableSchemaType;
+import org.apache.iceberg.flink.source.BoundedTestSource;
+import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.assertj.core.api.Assertions;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+public class TestBucketPartitionerFlinkIcebergSink {
+
+  private static final int NUMBER_TASK_MANAGERS = 1;
+  private static final int SLOTS_PER_TASK_MANAGER = 8;
+
+  @RegisterExtension
+  private static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+      new MiniClusterExtension(
+          new MiniClusterResourceConfiguration.Builder()
+              .setNumberTaskManagers(NUMBER_TASK_MANAGERS)
+              .setNumberSlotsPerTaskManager(SLOTS_PER_TASK_MANAGER)
+              .setConfiguration(DISABLE_CLASSLOADER_CHECK_CONFIG)
+              .build());
+
+  @RegisterExtension
+  private static final HadoopCatalogExtension catalogExtension =
+      new HadoopCatalogExtension(DATABASE, TestFixtures.TABLE);
+
+  private static final TypeInformation<Row> ROW_TYPE_INFO =
+      new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes());
+
+  // Parallelism = 8 (parallelism > numBuckets) throughout the test suite
+  private final int parallelism = NUMBER_TASK_MANAGERS * SLOTS_PER_TASK_MANAGER;
+  private final FileFormat format = FileFormat.PARQUET;
+  private final int numBuckets = 4;
+
+  private Table table;
+  private StreamExecutionEnvironment env;
+  private TableLoader tableLoader;
+
+  private void setupEnvironment(TableSchemaType tableSchemaType) {
+    table = getTable(tableSchemaType);
+    env =
+        StreamExecutionEnvironment.getExecutionEnvironment(DISABLE_CLASSLOADER_CHECK_CONFIG)
+            .enableCheckpointing(100)
+            .setParallelism(parallelism)
+            .setMaxParallelism(parallelism * 2);
+    tableLoader = catalogExtension.tableLoader();
+  }
+
+  private Table getTable(TableSchemaType tableSchemaType) {
+    PartitionSpec partitionSpec =
+        TestBucketPartitionerUtils.getPartitionSpec(tableSchemaType, numBuckets);
+
+    return catalogExtension
+        .catalog()
+        .createTable(
+            TABLE_IDENTIFIER,
+            SimpleDataUtil.SCHEMA,
+            partitionSpec,
+            ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()));
+  }
+
+  private List<RowData> convertToRowData(List<Row> rows) {
+    return rows.stream().map(CONVERTER::toInternal).collect(Collectors.toList());
+  }
+
+  private BoundedTestSource<Row> createBoundedSource(List<Row> rows) {
+    return new BoundedTestSource<>(rows.toArray(new Row[0]));
+  }
+
+  private TableTestStats extractTableTestStats(TableSchemaType tableSchemaType) throws IOException {

Review Comment:
   also the method name is not intuitive. it seems like `PartitionResult` to me



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
