This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ab11a1f8e6 [HUDI-7804] Improve flink bucket index partitioner (#11346)
2ab11a1f8e6 is described below

commit 2ab11a1f8e6c14d31eccb0b35f29d769dee032cd
Author: Manu <[email protected]>
AuthorDate: Wed May 29 18:45:06 2024 +0800

    [HUDI-7804] Improve flink bucket index partitioner (#11346)
---
 .../sink/bucket/BucketStreamWriteFunction.java     | 14 +++--
 .../sink/partitioner/BucketIndexPartitioner.java   | 11 +++-
 .../apache/hudi/sink/utils/BucketIndexUtil.java    | 64 ++++++++++++++++++++++
 3 files changed, 82 insertions(+), 7 deletions(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
index 0129396ea52..f32f192062a 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
@@ -21,6 +21,7 @@ package org.apache.hudi.sink.bucket;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.util.Functions;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.index.bucket.BucketIdentifier;
@@ -30,6 +31,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.util.Collector;
+import org.apache.hudi.sink.utils.BucketIndexUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -73,6 +75,11 @@ public class BucketStreamWriteFunction<I> extends 
StreamWriteFunction<I> {
    */
   private Set<String> incBucketIndex;
 
+  /**
+   * Functions for calculating the task partition to dispatch.
+   */
+  private Functions.Function2<String, Integer, Integer> partitionIndexFunc;
+
   /**
    * Constructs a BucketStreamWriteFunction.
    *
@@ -92,6 +99,7 @@ public class BucketStreamWriteFunction<I> extends 
StreamWriteFunction<I> {
     this.parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
     this.bucketIndex = new HashMap<>();
     this.incBucketIndex = new HashSet<>();
+    this.partitionIndexFunc = BucketIndexUtil.getPartitionIndexFunc(bucketNum, 
parallelism);
   }
 
   @Override
@@ -135,12 +143,10 @@ public class BucketStreamWriteFunction<I> extends 
StreamWriteFunction<I> {
 
   /**
    * Determine whether the current fileID belongs to the current task.
-   * (partition + curBucket) % numPartitions == this taskID belongs to this 
task.
+   * partitionIndex == this taskID belongs to this task.
    */
   public boolean isBucketToLoad(int bucketNumber, String partition) {
-    final int partitionIndex = (partition.hashCode() & Integer.MAX_VALUE) % 
parallelism;
-    int globalIndex = partitionIndex + bucketNumber;
-    return BucketIdentifier.mod(globalIndex, parallelism)  == taskID;
+    return this.partitionIndexFunc.apply(partition, bucketNumber) == taskID;
   }
 
   /**
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java
index 4e0c08b1046..0ac54e9a4a9 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java
@@ -19,9 +19,11 @@
 package org.apache.hudi.sink.partitioner;
 
 import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.util.Functions;
 import org.apache.hudi.index.bucket.BucketIdentifier;
 
 import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.hudi.sink.utils.BucketIndexUtil;
 
 /**
  * Bucket index input partitioner.
@@ -34,6 +36,8 @@ public class BucketIndexPartitioner<T extends HoodieKey> 
implements Partitioner<
   private final int bucketNum;
   private final String indexKeyFields;
 
+  private Functions.Function2<String, Integer, Integer> partitionIndexFunc;
+
   public BucketIndexPartitioner(int bucketNum, String indexKeyFields) {
     this.bucketNum = bucketNum;
     this.indexKeyFields = indexKeyFields;
@@ -41,9 +45,10 @@ public class BucketIndexPartitioner<T extends HoodieKey> 
implements Partitioner<
 
   @Override
   public int partition(HoodieKey key, int numPartitions) {
+    if (this.partitionIndexFunc == null) {
+      this.partitionIndexFunc = 
BucketIndexUtil.getPartitionIndexFunc(bucketNum, numPartitions);
+    }
     int curBucket = BucketIdentifier.getBucketId(key, indexKeyFields, 
bucketNum);
-    int partitionIndex = (key.getPartitionPath().hashCode() & 
Integer.MAX_VALUE) % numPartitions;
-    int globalIndex = partitionIndex + curBucket;
-    return BucketIdentifier.mod(globalIndex, numPartitions);
+    return this.partitionIndexFunc.apply(key.getPartitionPath(), curBucket);
   }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/BucketIndexUtil.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/BucketIndexUtil.java
new file mode 100644
index 00000000000..cba7e22c68a
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/BucketIndexUtil.java
@@ -0,0 +1,64 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *      http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.hudi.sink.utils;
+
+import org.apache.hudi.common.util.Functions;
+import org.apache.hudi.index.bucket.BucketIdentifier;
+
+/**
+ * Utility class for bucket index.
+ */
+public class BucketIndexUtil {
+
+  /**
+   * This method is used to get the partition index calculation function of a 
bucket.
+   * "partition.hashCode() / (parallelism / bucketNum) * bucketNum" divides 
the parallelism into
+   * sub-intervals of length bucket_num, different partitions will be mapped 
to different sub-interval,
+   * ensure that the data across multiple partitions is evenly distributed.
+   *
+   * @param bucketNum   Bucket number per partition
+   * @param parallelism Parallelism of the task
+   * @return The partition index of this bucket.
+   */
+  public static Functions.Function2<String, Integer, Integer> 
getPartitionIndexFunc(int bucketNum, int parallelism) {
+    if (parallelism < bucketNum) {
+      return (partition, curBucket) -> {
+        int partitionIndex = (partition.hashCode() & Integer.MAX_VALUE) / 
parallelism * bucketNum;
+        int globalIndex = partitionIndex + curBucket;
+        return BucketIdentifier.mod(globalIndex, parallelism);
+      };
+    } else {
+      if (parallelism % bucketNum == 0) {
+        return (partition, curBucket) -> {
+          int partitionIndex = (partition.hashCode() & Integer.MAX_VALUE) / 
(parallelism / bucketNum) * bucketNum;
+          int globalIndex = partitionIndex + curBucket;
+          return BucketIdentifier.mod(globalIndex, parallelism);
+        };
+      } else {
+        return (partition, curBucket) -> {
+          int partitionIndex = (partition.hashCode() & Integer.MAX_VALUE) / 
(parallelism / bucketNum + 1) * bucketNum;
+          int globalIndex = partitionIndex + curBucket;
+          return BucketIdentifier.mod(globalIndex, parallelism);
+        };
+      }
+    }
+  }
+}

Reply via email to