This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 2ab11a1f8e6 [HUDI-7804] Improve flink bucket index partitioner (#11346)
2ab11a1f8e6 is described below
commit 2ab11a1f8e6c14d31eccb0b35f29d769dee032cd
Author: Manu <[email protected]>
AuthorDate: Wed May 29 18:45:06 2024 +0800
[HUDI-7804] Improve flink bucket index partitioner (#11346)
---
.../sink/bucket/BucketStreamWriteFunction.java | 14 +++--
.../sink/partitioner/BucketIndexPartitioner.java | 11 +++-
.../apache/hudi/sink/utils/BucketIndexUtil.java | 64 ++++++++++++++++++++++
3 files changed, 82 insertions(+), 7 deletions(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
index 0129396ea52..f32f192062a 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
@@ -21,6 +21,7 @@ package org.apache.hudi.sink.bucket;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.util.Functions;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.index.bucket.BucketIdentifier;
@@ -30,6 +31,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
+import org.apache.hudi.sink.utils.BucketIndexUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,6 +75,11 @@ public class BucketStreamWriteFunction<I> extends
StreamWriteFunction<I> {
*/
private Set<String> incBucketIndex;
+ /**
+ * Functions for calculating the task partition to dispatch.
+ */
+ private Functions.Function2<String, Integer, Integer> partitionIndexFunc;
+
/**
* Constructs a BucketStreamWriteFunction.
*
@@ -92,6 +99,7 @@ public class BucketStreamWriteFunction<I> extends
StreamWriteFunction<I> {
this.parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
this.bucketIndex = new HashMap<>();
this.incBucketIndex = new HashSet<>();
+ this.partitionIndexFunc = BucketIndexUtil.getPartitionIndexFunc(bucketNum,
parallelism);
}
@Override
@@ -135,12 +143,10 @@ public class BucketStreamWriteFunction<I> extends
StreamWriteFunction<I> {
/**
* Determine whether the current fileID belongs to the current task.
- * (partition + curBucket) % numPartitions == this taskID belongs to this
task.
+ * A bucket belongs to this task when its computed partition index equals this taskID.
*/
public boolean isBucketToLoad(int bucketNumber, String partition) {
- final int partitionIndex = (partition.hashCode() & Integer.MAX_VALUE) %
parallelism;
- int globalIndex = partitionIndex + bucketNumber;
- return BucketIdentifier.mod(globalIndex, parallelism) == taskID;
+ return this.partitionIndexFunc.apply(partition, bucketNumber) == taskID;
}
/**
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java
index 4e0c08b1046..0ac54e9a4a9 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java
@@ -19,9 +19,11 @@
package org.apache.hudi.sink.partitioner;
import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.util.Functions;
import org.apache.hudi.index.bucket.BucketIdentifier;
import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.hudi.sink.utils.BucketIndexUtil;
/**
* Bucket index input partitioner.
@@ -34,6 +36,8 @@ public class BucketIndexPartitioner<T extends HoodieKey>
implements Partitioner<
private final int bucketNum;
private final String indexKeyFields;
+ private Functions.Function2<String, Integer, Integer> partitionIndexFunc;
+
public BucketIndexPartitioner(int bucketNum, String indexKeyFields) {
this.bucketNum = bucketNum;
this.indexKeyFields = indexKeyFields;
@@ -41,9 +45,10 @@ public class BucketIndexPartitioner<T extends HoodieKey>
implements Partitioner<
@Override
public int partition(HoodieKey key, int numPartitions) {
+ if (this.partitionIndexFunc == null) {
+ this.partitionIndexFunc =
BucketIndexUtil.getPartitionIndexFunc(bucketNum, numPartitions);
+ }
int curBucket = BucketIdentifier.getBucketId(key, indexKeyFields,
bucketNum);
- int partitionIndex = (key.getPartitionPath().hashCode() &
Integer.MAX_VALUE) % numPartitions;
- int globalIndex = partitionIndex + curBucket;
- return BucketIdentifier.mod(globalIndex, numPartitions);
+ return this.partitionIndexFunc.apply(key.getPartitionPath(), curBucket);
}
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/BucketIndexUtil.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/BucketIndexUtil.java
new file mode 100644
index 00000000000..cba7e22c68a
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/BucketIndexUtil.java
@@ -0,0 +1,64 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.hudi.sink.utils;
+
+import org.apache.hudi.common.util.Functions;
+import org.apache.hudi.index.bucket.BucketIdentifier;
+
+/**
+ * Utility class for bucket index.
+ */
+public class BucketIndexUtil {
+
+ /**
+ * This method is used to get the partition index calculation function of a
bucket.
+ * "partition.hashCode() / (parallelism / bucketNum) * bucketNum" divides
the parallelism into
+ * sub-intervals of length bucket_num, different partitions will be mapped
to different sub-intervals,
+ * ensuring that the data across multiple partitions is evenly distributed.
+ *
+ * @param bucketNum Bucket number per partition
+ * @param parallelism Parallelism of the task
+ * @return A function that computes the partition index for a given partition path and bucket number.
+ */
+ public static Functions.Function2<String, Integer, Integer>
getPartitionIndexFunc(int bucketNum, int parallelism) {
+ if (parallelism < bucketNum) {
+ return (partition, curBucket) -> {
+ int partitionIndex = (partition.hashCode() & Integer.MAX_VALUE) /
parallelism * bucketNum;
+ int globalIndex = partitionIndex + curBucket;
+ return BucketIdentifier.mod(globalIndex, parallelism);
+ };
+ } else {
+ if (parallelism % bucketNum == 0) {
+ return (partition, curBucket) -> {
+ int partitionIndex = (partition.hashCode() & Integer.MAX_VALUE) /
(parallelism / bucketNum) * bucketNum;
+ int globalIndex = partitionIndex + curBucket;
+ return BucketIdentifier.mod(globalIndex, parallelism);
+ };
+ } else {
+ return (partition, curBucket) -> {
+ int partitionIndex = (partition.hashCode() & Integer.MAX_VALUE) /
(parallelism / bucketNum + 1) * bucketNum;
+ int globalIndex = partitionIndex + curBucket;
+ return BucketIdentifier.mod(globalIndex, parallelism);
+ };
+ }
+ }
+ }
+}