This is an automated email from the ASF dual-hosted git repository.
stevenwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 01ab0d9316 Flink: Custom partitioner for bucket partitions (#7161)
01ab0d9316 is described below
commit 01ab0d931644ffd4677e98c0d3ab3919edea0c62
Author: kengtin <[email protected]>
AuthorDate: Wed Aug 9 21:52:51 2023 -0700
Flink: Custom partitioner for bucket partitions (#7161)
---
.../flink/sink/BucketPartitionKeySelector.java | 70 ++++++
.../iceberg/flink/sink/BucketPartitioner.java | 103 +++++++++
.../iceberg/flink/sink/BucketPartitionerUtil.java | 125 +++++++++++
.../org/apache/iceberg/flink/sink/FlinkSink.java | 8 +-
.../iceberg/flink/HadoopCatalogExtension.java | 104 +++++++++
.../flink/sink/TestBucketPartitionKeySelector.java | 65 ++++++
.../iceberg/flink/sink/TestBucketPartitioner.java | 107 +++++++++
.../TestBucketPartitionerFlinkIcebergSink.java | 241 +++++++++++++++++++++
.../flink/sink/TestBucketPartitionerUtil.java | 126 +++++++++++
9 files changed, 948 insertions(+), 1 deletion(-)
diff --git
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/BucketPartitionKeySelector.java
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/BucketPartitionKeySelector.java
new file mode 100644
index 0000000000..1cb6e013bd
--- /dev/null
+++
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/BucketPartitionKeySelector.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import java.util.stream.IntStream;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.flink.RowDataWrapper;
+
+/**
+ * A {@link KeySelector} that extracts the bucketId from a data row's bucket
partition as the key.
+ * To be used with the {@link BucketPartitioner}.
+ */
+class BucketPartitionKeySelector implements KeySelector<RowData, Integer> {
+
+ private final Schema schema;
+ private final PartitionKey partitionKey;
+ private final RowType flinkSchema;
+ private final int bucketFieldPosition;
+
+ private transient RowDataWrapper rowDataWrapper;
+
+ BucketPartitionKeySelector(PartitionSpec partitionSpec, Schema schema,
RowType flinkSchema) {
+ this.schema = schema;
+ this.partitionKey = new PartitionKey(partitionSpec, schema);
+ this.flinkSchema = flinkSchema;
+ this.bucketFieldPosition = getBucketFieldPosition(partitionSpec);
+ }
+
+ private int getBucketFieldPosition(PartitionSpec partitionSpec) {
+ int bucketFieldId = BucketPartitionerUtil.getBucketFieldId(partitionSpec);
+ return IntStream.range(0, partitionSpec.fields().size())
+ .filter(i -> partitionSpec.fields().get(i).fieldId() == bucketFieldId)
+ .toArray()[0];
+ }
+
+ private RowDataWrapper lazyRowDataWrapper() {
+ if (rowDataWrapper == null) {
+ rowDataWrapper = new RowDataWrapper(flinkSchema, schema.asStruct());
+ }
+
+ return rowDataWrapper;
+ }
+
+ @Override
+ public Integer getKey(RowData rowData) {
+ partitionKey.partition(lazyRowDataWrapper().wrap(rowData));
+ return partitionKey.get(bucketFieldPosition, Integer.class);
+ }
+}
diff --git
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/BucketPartitioner.java
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/BucketPartitioner.java
new file mode 100644
index 0000000000..9c9a117906
--- /dev/null
+++
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/BucketPartitioner.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * This partitioner will redirect records to writers deterministically based
on the Bucket partition
+ * spec. It'll attempt to optimize the file size written depending on whether
numPartitions is
+ * greater, less or equal than the maxNumBuckets. Note: The current
implementation only supports ONE
+ * bucket in the partition spec.
+ */
+class BucketPartitioner implements Partitioner<Integer> {
+
+ static final String BUCKET_NULL_MESSAGE = "bucketId cannot be null";
+ static final String BUCKET_LESS_THAN_LOWER_BOUND_MESSAGE =
+ "Invalid bucket ID %s: must be non-negative.";
+ static final String BUCKET_GREATER_THAN_UPPER_BOUND_MESSAGE =
+ "Invalid bucket ID %s: must be less than bucket limit: %s.";
+
+ private final int maxNumBuckets;
+
+ // To hold the OFFSET of the next writer to use for any bucket, only used
when writers > the
+ // number of buckets
+ private final int[] currentBucketWriterOffset;
+
+ BucketPartitioner(PartitionSpec partitionSpec) {
+ this.maxNumBuckets = BucketPartitionerUtil.getMaxNumBuckets(partitionSpec);
+ this.currentBucketWriterOffset = new int[maxNumBuckets];
+ }
+
+ /**
+ * Determine the partition id based on the following criteria: If the number
of writers <= the
+ * number of buckets, an evenly distributed number of buckets will be
assigned to each writer (one
+ * writer -> many buckets). Conversely, if the number of writers > the
number of buckets the logic
+ * is handled by the {@link #getPartitionWithMoreWritersThanBuckets
+ * getPartitionWritersGreaterThanBuckets} method.
+ *
+ * @param bucketId the bucketId for each request
+ * @param numPartitions the total number of partitions
+ * @return the partition id (writer) to use for each request
+ */
+ @Override
+ public int partition(Integer bucketId, int numPartitions) {
+ Preconditions.checkNotNull(bucketId, BUCKET_NULL_MESSAGE);
+ Preconditions.checkArgument(bucketId >= 0,
BUCKET_LESS_THAN_LOWER_BOUND_MESSAGE, bucketId);
+ Preconditions.checkArgument(
+ bucketId < maxNumBuckets, BUCKET_GREATER_THAN_UPPER_BOUND_MESSAGE,
bucketId, maxNumBuckets);
+
+ if (numPartitions <= maxNumBuckets) {
+ return bucketId % numPartitions;
+ } else {
+ return getPartitionWithMoreWritersThanBuckets(bucketId, numPartitions);
+ }
+ }
+
+ /*-
+ * If the number of writers > the number of buckets each partitioner will
keep a state of multiple
+ * writers per bucket as evenly as possible, and will round-robin the
requests across them, in this
+ * case each writer will target only one bucket at all times (many writers
-> one bucket). Example:
+ * Configuration: numPartitions (writers) = 5, maxBuckets = 2
+ * Expected behavior:
+ * - Records for Bucket 0 will be "round robin" between Writers 0, 2 and 4
+ * - Records for Bucket 1 will always use Writer 1 and 3
+ * Notes:
+ * - maxNumWritersPerBucket determines when to reset the
currentBucketWriterOffset to 0 for this bucketId
+ * - When numPartitions is not evenly divisible by maxBuckets, some buckets
will have one more writer (extraWriter).
+ * In this example Bucket 0 has an "extra writer" to consider before
resetting its offset to 0.
+ *
+ * @return the destination partition index (writer subtask id)
+ */
+ private int getPartitionWithMoreWritersThanBuckets(int bucketId, int
numPartitions) {
+ int currentOffset = currentBucketWriterOffset[bucketId];
+ // Determine if this bucket requires an "extra writer"
+ int extraWriter = bucketId < (numPartitions % maxNumBuckets) ? 1 : 0;
+ // The max number of writers this bucket can have
+ int maxNumWritersPerBucket = (numPartitions / maxNumBuckets) + extraWriter;
+
+ // Increment the writer offset or reset if it's reached the max for this
bucket
+ int nextOffset = currentOffset == maxNumWritersPerBucket - 1 ? 0 :
currentOffset + 1;
+ currentBucketWriterOffset[bucketId] = nextOffset;
+
+ return bucketId + (maxNumBuckets * currentOffset);
+ }
+}
diff --git
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/BucketPartitionerUtil.java
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/BucketPartitionerUtil.java
new file mode 100644
index 0000000000..c33207728d
--- /dev/null
+++
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/BucketPartitionerUtil.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.transforms.PartitionSpecVisitor;
+
+final class BucketPartitionerUtil {
+ static final String BAD_NUMBER_OF_BUCKETS_ERROR_MESSAGE =
+ "Invalid number of buckets: %s (must be 1)";
+
+ private BucketPartitionerUtil() {}
+
+ /**
+ * Determines whether the PartitionSpec has one and only one Bucket
definition
+ *
+ * @param partitionSpec the partition spec in question
+ * @return whether the PartitionSpec has only one Bucket
+ */
+ static boolean hasOneBucketField(PartitionSpec partitionSpec) {
+ List<Tuple2<Integer, Integer>> bucketFields =
getBucketFields(partitionSpec);
+ return bucketFields != null && bucketFields.size() == 1;
+ }
+
+ /**
+ * Extracts the Bucket definition from a PartitionSpec.
+ *
+ * @param partitionSpec the partition spec in question
+ * @return the Bucket definition in the form of a tuple (fieldId,
maxNumBuckets)
+ */
+ private static Tuple2<Integer, Integer> getBucketFieldInfo(PartitionSpec
partitionSpec) {
+ List<Tuple2<Integer, Integer>> bucketFields =
getBucketFields(partitionSpec);
+ Preconditions.checkArgument(
+ bucketFields.size() == 1,
+ BucketPartitionerUtil.BAD_NUMBER_OF_BUCKETS_ERROR_MESSAGE,
+ bucketFields.size());
+ return bucketFields.get(0);
+ }
+
+ static int getBucketFieldId(PartitionSpec partitionSpec) {
+ return getBucketFieldInfo(partitionSpec).f0;
+ }
+
+ static int getMaxNumBuckets(PartitionSpec partitionSpec) {
+ return getBucketFieldInfo(partitionSpec).f1;
+ }
+
+ private static List<Tuple2<Integer, Integer>> getBucketFields(PartitionSpec
spec) {
+ return PartitionSpecVisitor.visit(spec, new
BucketPartitionSpecVisitor()).stream()
+ .filter(Objects::nonNull)
+ .collect(Collectors.toList());
+ }
+
+ private static class BucketPartitionSpecVisitor
+ implements PartitionSpecVisitor<Tuple2<Integer, Integer>> {
+ @Override
+ public Tuple2<Integer, Integer> identity(int fieldId, String sourceName,
int sourceId) {
+ return null;
+ }
+
+ @Override
+ public Tuple2<Integer, Integer> bucket(
+ int fieldId, String sourceName, int sourceId, int numBuckets) {
+ return new Tuple2<>(fieldId, numBuckets);
+ }
+
+ @Override
+ public Tuple2<Integer, Integer> truncate(
+ int fieldId, String sourceName, int sourceId, int width) {
+ return null;
+ }
+
+ @Override
+ public Tuple2<Integer, Integer> year(int fieldId, String sourceName, int
sourceId) {
+ return null;
+ }
+
+ @Override
+ public Tuple2<Integer, Integer> month(int fieldId, String sourceName, int
sourceId) {
+ return null;
+ }
+
+ @Override
+ public Tuple2<Integer, Integer> day(int fieldId, String sourceName, int
sourceId) {
+ return null;
+ }
+
+ @Override
+ public Tuple2<Integer, Integer> hour(int fieldId, String sourceName, int
sourceId) {
+ return null;
+ }
+
+ @Override
+ public Tuple2<Integer, Integer> alwaysNull(int fieldId, String sourceName,
int sourceId) {
+ return null;
+ }
+
+ @Override
+ public Tuple2<Integer, Integer> unknown(
+ int fieldId, String sourceName, int sourceId, String transform) {
+ return null;
+ }
+ }
+}
diff --git
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
index 844efd6ca6..0237027901 100644
---
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
+++
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
@@ -509,7 +509,13 @@ public class FlinkSink {
+ "and table is unpartitioned");
return input;
} else {
- return input.keyBy(new PartitionKeySelector(partitionSpec,
iSchema, flinkRowType));
+ if (BucketPartitionerUtil.hasOneBucketField(partitionSpec)) {
+ return input.partitionCustom(
+ new BucketPartitioner(partitionSpec),
+ new BucketPartitionKeySelector(partitionSpec, iSchema,
flinkRowType));
+ } else {
+ return input.keyBy(new PartitionKeySelector(partitionSpec,
iSchema, flinkRowType));
+ }
}
} else {
if (partitionSpec.isUnpartitioned()) {
diff --git
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/HadoopCatalogExtension.java
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/HadoopCatalogExtension.java
new file mode 100644
index 0000000000..d8e1325254
--- /dev/null
+++
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/HadoopCatalogExtension.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.UUID;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+public class HadoopCatalogExtension
+ implements BeforeAllCallback, BeforeEachCallback, AfterAllCallback,
AfterEachCallback {
+ protected final String database;
+ protected final String tableName;
+
+ protected Path temporaryFolder;
+ protected Catalog catalog;
+ protected CatalogLoader catalogLoader;
+ protected String warehouse;
+ protected TableLoader tableLoader;
+
+ public HadoopCatalogExtension(String database, String tableName) {
+ this.database = database;
+ this.tableName = tableName;
+ }
+
+ @Override
+ public void beforeAll(ExtensionContext context) throws Exception {
+ this.temporaryFolder = Files.createTempDirectory("junit5_hadoop_catalog-");
+ }
+
+ @Override
+ public void afterAll(ExtensionContext context) throws Exception {
+ FileUtils.deleteDirectory(temporaryFolder.toFile());
+ }
+
+ @Override
+ public void beforeEach(ExtensionContext context) throws Exception {
+ Assertions.assertThat(temporaryFolder).exists().isDirectory();
+ this.warehouse = "file:" + temporaryFolder + "/" + UUID.randomUUID();
+ this.catalogLoader =
+ CatalogLoader.hadoop(
+ "hadoop",
+ new Configuration(),
+ ImmutableMap.of(CatalogProperties.WAREHOUSE_LOCATION, warehouse));
+ this.catalog = catalogLoader.loadCatalog();
+ this.tableLoader =
+ TableLoader.fromCatalog(catalogLoader, TableIdentifier.of(database,
tableName));
+ }
+
+ @Override
+ public void afterEach(ExtensionContext context) throws Exception {
+ try {
+ catalog.dropTable(TableIdentifier.of(database, tableName));
+ ((HadoopCatalog) catalog).close();
+ tableLoader.close();
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to close catalog resource");
+ }
+ }
+
+ public TableLoader tableLoader() {
+ return tableLoader;
+ }
+
+ public Catalog catalog() {
+ return catalog;
+ }
+
+ public CatalogLoader catalogLoader() {
+ return catalogLoader;
+ }
+
+ public String warehouse() {
+ return warehouse;
+ }
+}
diff --git
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionKeySelector.java
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionKeySelector.java
new file mode 100644
index 0000000000..5ebcc6361c
--- /dev/null
+++
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionKeySelector.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.sink.TestBucketPartitionerUtil.TableSchemaType;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+public class TestBucketPartitionKeySelector {
+
+ @ParameterizedTest
+ @EnumSource(
+ value = TableSchemaType.class,
+ names = {"ONE_BUCKET", "IDENTITY_AND_BUCKET"})
+ public void testCorrectKeySelection(TableSchemaType tableSchemaType) {
+ int numBuckets = 60;
+
+ PartitionSpec partitionSpec = tableSchemaType.getPartitionSpec(numBuckets);
+ BucketPartitionKeySelector keySelector =
+ new BucketPartitionKeySelector(
+ partitionSpec, SimpleDataUtil.SCHEMA, SimpleDataUtil.ROW_TYPE);
+
+ TestBucketPartitionerUtil.generateRowsForBucketIdRange(2, numBuckets)
+ .forEach(
+ rowData -> {
+ int expectedBucketId =
+ TestBucketPartitionerUtil.computeBucketId(
+ numBuckets, rowData.getString(1).toString());
+ Integer key = keySelector.getKey(rowData);
+ Assertions.assertThat(key).isEqualTo(expectedBucketId);
+ });
+ }
+
+ @Test
+ public void testKeySelectorMultipleBucketsFail() {
+ PartitionSpec partitionSpec =
TableSchemaType.TWO_BUCKETS.getPartitionSpec(1);
+
+ Assertions.assertThatExceptionOfType(RuntimeException.class)
+ .isThrownBy(
+ () ->
+ new BucketPartitionKeySelector(
+ partitionSpec, SimpleDataUtil.SCHEMA,
SimpleDataUtil.ROW_TYPE))
+
.withMessage(BucketPartitionerUtil.BAD_NUMBER_OF_BUCKETS_ERROR_MESSAGE, 2);
+ }
+}
diff --git
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitioner.java
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitioner.java
new file mode 100644
index 0000000000..835713e6b4
--- /dev/null
+++
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitioner.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import static
org.apache.iceberg.flink.sink.BucketPartitioner.BUCKET_GREATER_THAN_UPPER_BOUND_MESSAGE;
+import static
org.apache.iceberg.flink.sink.BucketPartitioner.BUCKET_LESS_THAN_LOWER_BOUND_MESSAGE;
+import static
org.apache.iceberg.flink.sink.BucketPartitioner.BUCKET_NULL_MESSAGE;
+
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.flink.sink.TestBucketPartitionerUtil.TableSchemaType;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+
+public class TestBucketPartitioner {
+
+ static final int DEFAULT_NUM_BUCKETS = 60;
+
+ @ParameterizedTest
+ @CsvSource({"ONE_BUCKET,50", "IDENTITY_AND_BUCKET,50", "ONE_BUCKET,60",
"IDENTITY_AND_BUCKET,60"})
+ public void testPartitioningParallelismGreaterThanBuckets(
+ String schemaTypeStr, String numBucketsStr) {
+ int numPartitions = 500;
+ TableSchemaType tableSchemaType = TableSchemaType.valueOf(schemaTypeStr);
+ int numBuckets = Integer.parseInt(numBucketsStr);
+ PartitionSpec partitionSpec = tableSchemaType.getPartitionSpec(numBuckets);
+ BucketPartitioner bucketPartitioner = new BucketPartitioner(partitionSpec);
+
+ int bucketId = 0;
+ for (int expectedIndex = 0; expectedIndex < numPartitions;
expectedIndex++) {
+ int actualPartitionIndex = bucketPartitioner.partition(bucketId,
numPartitions);
+ Assertions.assertThat(actualPartitionIndex).isEqualTo(expectedIndex);
+ bucketId++;
+ if (bucketId == numBuckets) {
+ bucketId = 0;
+ }
+ }
+ }
+
+ @ParameterizedTest
+ @CsvSource({"ONE_BUCKET,50", "IDENTITY_AND_BUCKET,50", "ONE_BUCKET,60",
"IDENTITY_AND_BUCKET,60"})
+ public void testPartitioningParallelismEqualLessThanBuckets(
+ String schemaTypeStr, String numBucketsStr) {
+ int numPartitions = 30;
+ TableSchemaType tableSchemaType = TableSchemaType.valueOf(schemaTypeStr);
+ int numBuckets = Integer.parseInt(numBucketsStr);
+ PartitionSpec partitionSpec = tableSchemaType.getPartitionSpec(numBuckets);
+ BucketPartitioner bucketPartitioner = new BucketPartitioner(partitionSpec);
+
+ for (int bucketId = 0; bucketId < numBuckets; bucketId++) {
+ int actualPartitionIndex = bucketPartitioner.partition(bucketId,
numPartitions);
+ Assertions.assertThat(actualPartitionIndex).isEqualTo(bucketId %
numPartitions);
+ }
+ }
+
+ @Test
+ public void testPartitionerBucketIdNullFail() {
+ PartitionSpec partitionSpec =
TableSchemaType.ONE_BUCKET.getPartitionSpec(DEFAULT_NUM_BUCKETS);
+ BucketPartitioner bucketPartitioner = new BucketPartitioner(partitionSpec);
+
+ Assertions.assertThatExceptionOfType(RuntimeException.class)
+ .isThrownBy(() -> bucketPartitioner.partition(null,
DEFAULT_NUM_BUCKETS))
+ .withMessage(BUCKET_NULL_MESSAGE);
+ }
+
+ @Test
+ public void testPartitionerMultipleBucketsFail() {
+ PartitionSpec partitionSpec =
TableSchemaType.TWO_BUCKETS.getPartitionSpec(DEFAULT_NUM_BUCKETS);
+
+ Assertions.assertThatExceptionOfType(RuntimeException.class)
+ .isThrownBy(() -> new BucketPartitioner(partitionSpec))
+
.withMessage(BucketPartitionerUtil.BAD_NUMBER_OF_BUCKETS_ERROR_MESSAGE, 2);
+ }
+
+ @Test
+ public void testPartitionerBucketIdOutOfRangeFail() {
+ PartitionSpec partitionSpec =
TableSchemaType.ONE_BUCKET.getPartitionSpec(DEFAULT_NUM_BUCKETS);
+ BucketPartitioner bucketPartitioner = new BucketPartitioner(partitionSpec);
+
+ int negativeBucketId = -1;
+ Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+ .isThrownBy(() -> bucketPartitioner.partition(negativeBucketId, 1))
+ .withMessage(BUCKET_LESS_THAN_LOWER_BOUND_MESSAGE, negativeBucketId);
+
+ int tooBigBucketId = DEFAULT_NUM_BUCKETS;
+ Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+ .isThrownBy(() -> bucketPartitioner.partition(tooBigBucketId, 1))
+ .withMessage(BUCKET_GREATER_THAN_UPPER_BOUND_MESSAGE, tooBigBucketId,
DEFAULT_NUM_BUCKETS);
+ }
+}
diff --git
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java
new file mode 100644
index 0000000000..29a0898a1b
--- /dev/null
+++
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import static
org.apache.iceberg.flink.MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG;
+import static org.apache.iceberg.flink.TestFixtures.DATABASE;
+import static org.apache.iceberg.flink.TestFixtures.TABLE_IDENTIFIER;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.flink.HadoopCatalogExtension;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.sink.TestBucketPartitionerUtil.TableSchemaType;
+import org.apache.iceberg.flink.source.BoundedTestSource;
+import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
/**
 * End-to-end test of {@link BucketPartitioner} through {@link FlinkSink}: writes rows whose values
 * hash to known bucket ids and then inspects the produced data files to verify how records were
 * spread across writer subtasks and buckets.
 */
public class TestBucketPartitionerFlinkIcebergSink {

  private static final int NUMBER_TASK_MANAGERS = 1;
  private static final int SLOTS_PER_TASK_MANAGER = 8;

  @RegisterExtension
  private static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
      new MiniClusterExtension(
          new MiniClusterResourceConfiguration.Builder()
              .setNumberTaskManagers(NUMBER_TASK_MANAGERS)
              .setNumberSlotsPerTaskManager(SLOTS_PER_TASK_MANAGER)
              .setConfiguration(DISABLE_CLASSLOADER_CHECK_CONFIG)
              .build());

  @RegisterExtension
  private static final HadoopCatalogExtension catalogExtension =
      new HadoopCatalogExtension(DATABASE, TestFixtures.TABLE);

  private static final TypeInformation<Row> ROW_TYPE_INFO =
      new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes());

  // Parallelism = 8 (parallelism > numBuckets) throughout the test suite
  private final int parallelism = NUMBER_TASK_MANAGERS * SLOTS_PER_TASK_MANAGER;
  private final FileFormat format = FileFormat.PARQUET;
  private final int numBuckets = 4;

  // Per-test state, initialized by setupEnvironment(...)
  private Table table;
  private StreamExecutionEnvironment env;
  private TableLoader tableLoader;

  /**
   * Creates the Iceberg table for the given schema/partition-spec flavor and a checkpointing
   * streaming environment sized to the suite's fixed parallelism.
   *
   * @param tableSchemaType which bucket layout the table's partition spec should use
   */
  private void setupEnvironment(TableSchemaType tableSchemaType) {
    PartitionSpec partitionSpec = tableSchemaType.getPartitionSpec(numBuckets);
    table =
        catalogExtension
            .catalog()
            .createTable(
                TABLE_IDENTIFIER,
                SimpleDataUtil.SCHEMA,
                partitionSpec,
                ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()));
    env =
        StreamExecutionEnvironment.getExecutionEnvironment(DISABLE_CLASSLOADER_CHECK_CONFIG)
            .enableCheckpointing(100)
            .setParallelism(parallelism)
            .setMaxParallelism(parallelism * 2);
    tableLoader = catalogExtension.tableLoader();
  }

  /**
   * Runs a bounded Flink job that writes {@code allRows} to the table via {@link FlinkSink} with
   * HASH distribution, then asserts the table contents match the input.
   *
   * @param allRows rows to write; also the expected final table contents
   * @throws Exception if the Flink job fails
   */
  private void appendRowsToTable(List<RowData> allRows) throws Exception {
    DataFormatConverters.RowConverter converter =
        new DataFormatConverters.RowConverter(SimpleDataUtil.FLINK_SCHEMA.getFieldDataTypes());

    // Rows are fed as external Row objects and converted back to RowData inside the pipeline.
    DataStream<RowData> dataStream =
        env.addSource(
                new BoundedTestSource<>(
                    allRows.stream().map(converter::toExternal).toArray(Row[]::new)),
                ROW_TYPE_INFO)
            .map(converter::toInternal, FlinkCompatibilityUtil.toTypeInfo(SimpleDataUtil.ROW_TYPE));

    FlinkSink.forRowData(dataStream)
        .table(table)
        .tableLoader(tableLoader)
        .writeParallelism(parallelism)
        .distributionMode(DistributionMode.HASH)
        .append();

    env.execute("Test Iceberg DataStream");

    SimpleDataUtil.assertTableRows(table, allRows);
  }

  @ParameterizedTest
  @EnumSource(
      value = TableSchemaType.class,
      names = {"ONE_BUCKET", "IDENTITY_AND_BUCKET"})
  public void testSendRecordsToAllBucketsEvenly(TableSchemaType tableSchemaType) throws Exception {
    setupEnvironment(tableSchemaType);
    List<RowData> rows = generateTestDataRows();

    appendRowsToTable(rows);
    TableTestStats stats = extractPartitionResults(tableSchemaType);

    Assertions.assertThat(stats.totalRowCount).isEqualTo(rows.size());
    // All 4 buckets should've been written to
    Assertions.assertThat(stats.writersPerBucket.size()).isEqualTo(numBuckets);
    Assertions.assertThat(stats.numFilesPerBucket.size()).isEqualTo(numBuckets);
    // Writer expectation (2 writers per bucket):
    // - Bucket0 -> Writers [0, 4]
    // - Bucket1 -> Writers [1, 5]
    // - Bucket2 -> Writers [2, 6]
    // - Bucket3 -> Writers [3, 7]
    for (int i = 0, j = numBuckets; i < numBuckets; i++, j++) {
      Assertions.assertThat(stats.writersPerBucket.get(i)).hasSameElementsAs(Arrays.asList(i, j));
      // 2 files per bucket (one file is created by each writer)
      Assertions.assertThat(stats.numFilesPerBucket.get(i)).isEqualTo(2);
      // 2 rows per file (total of 16 rows across 8 files)
      // NOTE(review): this only checks writer ids 0..numBuckets-1 (i), never the second writer
      // of each bucket (j) — presumably intentional sampling, but confirm rowsPerWriter.get(j)
      // shouldn't also be asserted here.
      Assertions.assertThat(stats.rowsPerWriter.get(i)).isEqualTo(2);
    }
  }

  /**
   * Verifies the BucketPartitioner is not used when the PartitionSpec has more than 1 bucket, and
   * that it should fallback to input.keyBy
   */
  @ParameterizedTest
  @EnumSource(value = TableSchemaType.class, names = "TWO_BUCKETS")
  public void testMultipleBucketsFallback(TableSchemaType tableSchemaType) throws Exception {
    setupEnvironment(tableSchemaType);
    List<RowData> rows = generateTestDataRows();

    appendRowsToTable(rows);
    TableTestStats stats = extractPartitionResults(tableSchemaType);

    Assertions.assertThat(stats.totalRowCount).isEqualTo(rows.size());
    for (int i = 0, j = numBuckets; i < numBuckets; i++, j++) {
      // Only 1 file per bucket will be created when falling back to input.keyBy(...)
      Assertions.assertThat((int) stats.numFilesPerBucket.get(i)).isEqualTo(1);
    }
  }

  /**
   * Generating 16 rows to be sent uniformly to all writers (round-robin across 8 writers -> 4
   * buckets)
   */
  private List<RowData> generateTestDataRows() {
    int totalNumRows = parallelism * 2;
    int numRowsPerBucket = totalNumRows / numBuckets;
    return TestBucketPartitionerUtil.generateRowsForBucketIdRange(numRowsPerBucket, numBuckets);
  }

  /**
   * Scans the table's data files and aggregates per-bucket / per-writer statistics.
   *
   * <p>The writer (subtask) id is parsed from the leading component of each data file's name; the
   * bucket id is read from the file's partition tuple at the position reported by the schema type.
   *
   * @param tableSchemaType determines which partition column holds the bucket id
   * @return aggregated stats over all data files of the table
   * @throws IOException if the file scan fails
   */
  private TableTestStats extractPartitionResults(TableSchemaType tableSchemaType)
      throws IOException {
    int totalRecordCount = 0;
    Map<Integer, List<Integer>> writersPerBucket = Maps.newHashMap(); // <BucketId, List<WriterId>>
    Map<Integer, Integer> filesPerBucket = Maps.newHashMap(); // <BucketId, NumFiles>
    Map<Integer, Long> rowsPerWriter = Maps.newHashMap(); // <WriterId, NumRecords>

    try (CloseableIterable<FileScanTask> fileScanTasks = table.newScan().planFiles()) {
      for (FileScanTask scanTask : fileScanTasks) {
        long recordCountInFile = scanTask.file().recordCount();

        String[] splitFilePath = scanTask.file().path().toString().split("/");
        // Filename example: 00007-0-a7d3a29a-33e9-4740-88f4-0f494397d60c-00001.parquet
        // Writer ID: .......^^^^^
        String filename = splitFilePath[splitFilePath.length - 1];
        int writerId = Integer.parseInt(filename.split("-")[0]);

        totalRecordCount += recordCountInFile;
        int bucketId =
            scanTask
                .file()
                .partition()
                .get(tableSchemaType.bucketPartitionColumnPosition(), Integer.class);
        writersPerBucket.computeIfAbsent(bucketId, k -> Lists.newArrayList());
        writersPerBucket.get(bucketId).add(writerId);
        filesPerBucket.put(bucketId, filesPerBucket.getOrDefault(bucketId, 0) + 1);
        rowsPerWriter.put(writerId, rowsPerWriter.getOrDefault(writerId, 0L) + recordCountInFile);
      }
    }

    return new TableTestStats(totalRecordCount, writersPerBucket, filesPerBucket, rowsPerWriter);
  }

  /** DTO to hold Test Stats */
  private static class TableTestStats {
    final int totalRowCount;
    final Map<Integer, List<Integer>> writersPerBucket;
    final Map<Integer, Integer> numFilesPerBucket;
    final Map<Integer, Long> rowsPerWriter;

    TableTestStats(
        int totalRecordCount,
        Map<Integer, List<Integer>> writersPerBucket,
        Map<Integer, Integer> numFilesPerBucket,
        Map<Integer, Long> rowsPerWriter) {
      this.totalRowCount = totalRecordCount;
      this.writersPerBucket = writersPerBucket;
      this.numFilesPerBucket = numFilesPerBucket;
      this.rowsPerWriter = rowsPerWriter;
    }
  }
}
diff --git
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerUtil.java
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerUtil.java
new file mode 100644
index 0000000000..e1309bfac6
--- /dev/null
+++
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerUtil.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import java.util.List;
+import java.util.UUID;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.BucketUtil;
+
+final class TestBucketPartitionerUtil {
+
+ enum TableSchemaType {
+ ONE_BUCKET {
+ @Override
+ public int bucketPartitionColumnPosition() {
+ return 0;
+ }
+
+ @Override
+ public PartitionSpec getPartitionSpec(int numBuckets) {
+ return PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).bucket("data",
numBuckets).build();
+ }
+ },
+ IDENTITY_AND_BUCKET {
+ @Override
+ public int bucketPartitionColumnPosition() {
+ return 1;
+ }
+
+ @Override
+ public PartitionSpec getPartitionSpec(int numBuckets) {
+ return PartitionSpec.builderFor(SimpleDataUtil.SCHEMA)
+ .identity("id")
+ .bucket("data", numBuckets)
+ .build();
+ }
+ },
+ TWO_BUCKETS {
+ @Override
+ public int bucketPartitionColumnPosition() {
+ return 1;
+ }
+
+ @Override
+ public PartitionSpec getPartitionSpec(int numBuckets) {
+ return PartitionSpec.builderFor(SimpleDataUtil.SCHEMA)
+ .bucket("id", numBuckets)
+ .bucket("data", numBuckets)
+ .build();
+ }
+ };
+
+ public abstract int bucketPartitionColumnPosition();
+
+ public abstract PartitionSpec getPartitionSpec(int numBuckets);
+ }
+
+ private TestBucketPartitionerUtil() {}
+
+ /**
+ * Utility method to generate rows whose values will "hash" to a range of
bucketIds (from 0 to
+ * numBuckets - 1)
+ *
+ * @param numRowsPerBucket how many different rows should be generated per
bucket
+ * @param numBuckets max number of buckets to consider
+ * @return the list of rows whose data "hashes" to the desired bucketId
+ */
+ static List<RowData> generateRowsForBucketIdRange(int numRowsPerBucket, int
numBuckets) {
+ List<RowData> rows = Lists.newArrayListWithCapacity(numBuckets *
numRowsPerBucket);
+ // For some of our tests, this order of the generated rows matters
+ for (int i = 0; i < numRowsPerBucket; i++) {
+ for (int bucketId = 0; bucketId < numBuckets; bucketId++) {
+ String value = generateValueForBucketId(bucketId, numBuckets);
+ rows.add(GenericRowData.of(1, StringData.fromString(value)));
+ }
+ }
+ return rows;
+ }
+
+ /**
+ * Utility method to generate a UUID string that will "hash" to a desired
bucketId
+ *
+ * @param bucketId the desired bucketId
+ * @return the string data that "hashes" to the desired bucketId
+ */
+ private static String generateValueForBucketId(int bucketId, int numBuckets)
{
+ while (true) {
+ String uuid = UUID.randomUUID().toString();
+ if (computeBucketId(numBuckets, uuid) == bucketId) {
+ return uuid;
+ }
+ }
+ }
+
+ /**
+ * Utility that performs the same hashing/bucketing mechanism used by
Bucket.java
+ *
+ * @param numBuckets max number of buckets to consider
+ * @param value the string to compute the bucketId from
+ * @return the computed bucketId
+ */
+ static int computeBucketId(int numBuckets, String value) {
+ return (BucketUtil.hash(value) & Integer.MAX_VALUE) % numBuckets;
+ }
+}