This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 16ee6bc4e32 [HUDI-7977] Improve bucket index partitioner algorithm 
(#11608)
16ee6bc4e32 is described below

commit 16ee6bc4e329e6ecd7887039cb5216aecf571e8c
Author: KnightChess <[email protected]>
AuthorDate: Thu Jul 11 08:11:48 2024 +0800

    [HUDI-7977] Improve bucket index partitioner algorithm (#11608)
---
 .../hudi/common/util/hash/BucketIndexUtil.java     |  26 +---
 .../hudi/common/util/hash/TestBucketIndexUtil.java | 152 +++++++++++++++++++++
 2 files changed, 157 insertions(+), 21 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/hash/BucketIndexUtil.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/util/hash/BucketIndexUtil.java
index adfdd4540d8..ea3f6a2a12c 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/util/hash/BucketIndexUtil.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/util/hash/BucketIndexUtil.java
@@ -36,26 +36,10 @@ public class BucketIndexUtil {
    * @return The partition index of this bucket.
    */
   public static Functions.Function2<String, Integer, Integer> 
getPartitionIndexFunc(int bucketNum, int parallelism) {
-    if (parallelism < bucketNum) {
-      return (partition, curBucket) -> {
-        int partitionIndex = (partition.hashCode() & Integer.MAX_VALUE) / 
parallelism * bucketNum;
-        int globalIndex = partitionIndex + curBucket;
-        return globalIndex % parallelism;
-      };
-    } else {
-      if (parallelism % bucketNum == 0) {
-        return (partition, curBucket) -> {
-          int partitionIndex = (partition.hashCode() & Integer.MAX_VALUE) / 
(parallelism / bucketNum) * bucketNum;
-          int globalIndex = partitionIndex + curBucket;
-          return globalIndex % parallelism;
-        };
-      } else {
-        return (partition, curBucket) -> {
-          int partitionIndex = (partition.hashCode() & Integer.MAX_VALUE) / 
(parallelism / bucketNum + 1) * bucketNum;
-          int globalIndex = partitionIndex + curBucket;
-          return globalIndex % parallelism;
-        };
-      }
-    }
+    return (partition, curBucket) -> {
+      int partitionIndex = (partition.hashCode() & Integer.MAX_VALUE) % 
parallelism * bucketNum;
+      int globalIndex = partitionIndex + curBucket;
+      return globalIndex % parallelism;
+    };
   }
 }
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/util/hash/TestBucketIndexUtil.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/util/hash/TestBucketIndexUtil.java
new file mode 100644
index 00000000000..91c0da003f4
--- /dev/null
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/util/hash/TestBucketIndexUtil.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.util.hash;
+
+import org.apache.hudi.common.util.Functions;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestBucketIndexUtil {
+
+  private static Stream<Arguments> partitionParams() {
+    List<Arguments> argsList = new ArrayList<>();
+    argsList.add(Arguments.of(10, 5, true));
+    argsList.add(Arguments.of(20, 5, true));
+    argsList.add(Arguments.of(21, 5, true));
+    argsList.add(Arguments.of(40, 5, true));
+    argsList.add(Arguments.of(41, 5, true));
+    argsList.add(Arguments.of(100, 5, true));
+    argsList.add(Arguments.of(101, 5, true));
+    argsList.add(Arguments.of(20, 100, true));
+    argsList.add(Arguments.of(21, 100, true));
+    argsList.add(Arguments.of(100, 100, true));
+    argsList.add(Arguments.of(101, 100, true));
+    argsList.add(Arguments.of(200, 100, true));
+    argsList.add(Arguments.of(201, 100, true));
+    argsList.add(Arguments.of(400, 1000, true));
+    argsList.add(Arguments.of(401, 1000, true));
+
+    return argsList.stream();
+  }
+
+  private static Stream<Arguments> noPartitionParams() {
+    List<Arguments> argsList = new ArrayList<>();
+
+    argsList.add(Arguments.of(10, 50, false));
+    argsList.add(Arguments.of(11, 50, false));
+    argsList.add(Arguments.of(100, 50, false));
+    argsList.add(Arguments.of(101, 50, false));
+
+    return argsList.stream();
+  }
+
+  @ParameterizedTest
+  @MethodSource("partitionParams")
+  void testPartition(int parallelism, int bucketNumber, boolean partitioned) {
+    Map<Integer, Integer> parallelism2TaskCount = new HashMap<>();
+    final Functions.Function2<String, Integer, Integer> partitionIndexFunc =
+        BucketIndexUtil.getPartitionIndexFunc(bucketNumber, parallelism);
+    initPartitionData(parallelism2TaskCount, bucketNumber, partitionIndexFunc);
+    checkResult(parallelism2TaskCount, parallelism, bucketNumber, partitioned);
+  }
+
+  @ParameterizedTest
+  @MethodSource("noPartitionParams")
+  void testNoPartition(int parallelism, int bucketNumber, boolean partitioned) 
{
+    Map<Integer, Integer> parallelism2TaskCount = new HashMap<>();
+    final Functions.Function2<String, Integer, Integer> partitionIndexFunc =
+        BucketIndexUtil.getPartitionIndexFunc(bucketNumber, parallelism);
+    initNoPartitionData(parallelism2TaskCount, bucketNumber, 
partitionIndexFunc);
+    checkResult(parallelism2TaskCount, parallelism, bucketNumber, partitioned);
+  }
+
+  private static void putIndexCount(Map<Integer, Integer> 
parallelism2TaskCount, int workIndex) {
+    if (parallelism2TaskCount.containsKey(workIndex)) {
+      parallelism2TaskCount.put(workIndex, 
parallelism2TaskCount.get(workIndex) + 1);
+    } else {
+      parallelism2TaskCount.put(workIndex, 1);
+    }
+  }
+
+  private void checkResult(Map<Integer, Integer> parallelism2TaskCount, int 
parallelism, int bucketNumber, boolean partitioned) {
+    int sum = 0;
+    for (int v : parallelism2TaskCount.values()) {
+      sum = sum + v;
+    }
+    int avg = sum / parallelism;
+    double minToleranceValue = avg * 0.8;
+    double maxToleranceValue = avg * 1.2;
+    final ArrayList<Integer> outOfLimit = new ArrayList<>();
+    final ArrayList<Integer> inLimit = new ArrayList<>();
+    for (int v : parallelism2TaskCount.values()) {
+      // if parallelism is too big, the first condition will be false even 
though the diff is only 1
+      // for example, if avg is 4, then v of 5 or 3 would both be out of 
limit, so add the second condition
+      if ((v >= minToleranceValue) && (v <= maxToleranceValue) || ((Math.abs(v 
- avg) <= 2))) {
+        inLimit.add(v);
+      } else {
+        outOfLimit.add(v);
+      }
+    }
+    assertEquals(0, outOfLimit.size());
+
+    // parallelism2TaskCount indicates the number of data splits assigned to 
each parallelism,
+    // so its size needs to be close to parallelism or (bucketNumber * 8), to 
take advantage of available resources as much as possible
+    int totalBucketFileNumber = bucketNumber;
+    if (partitioned) {
+      totalBucketFileNumber = bucketNumber * 8;
+    }
+    if (parallelism >= totalBucketFileNumber) {
+      assertTrue(parallelism2TaskCount.size() >= (totalBucketFileNumber * 
0.9));
+    } else {
+      assertTrue(parallelism2TaskCount.size() >= parallelism * 0.9);
+    }
+  }
+
+  private void initPartitionData(Map<Integer, Integer> parallelism2TaskCount, 
int bucketNumber,
+                                 Functions.Function2<String, Integer, Integer> 
partitionIndexFunc) {
+    parallelism2TaskCount.clear();
+    Arrays.asList("year=2021/month=01/day=01", "year=2021/month=01/day=02", 
"year=2021/month=01/day=03", "year=2021/month=01/day=04",
+        "year=2021/month=01/day=05", "year=2021/month=01/day=06", 
"year=2021/month=01/day=07", "year=2021/month=01/day=08").forEach(partition -> {
+          for (int bucketIndex = 0; bucketIndex < bucketNumber; bucketIndex++) 
{
+            putIndexCount(parallelism2TaskCount, 
partitionIndexFunc.apply(partition, bucketIndex));
+          }
+        });
+  }
+
+  private void initNoPartitionData(Map<Integer, Integer> 
parallelism2TaskCount, int bucketNumber,
+                                   Functions.Function2<String, Integer, 
Integer> partitionIndexFunc) {
+    parallelism2TaskCount.clear();
+    for (int bucketIndex = 0; bucketIndex < bucketNumber; bucketIndex++) {
+      putIndexCount(parallelism2TaskCount, partitionIndexFunc.apply("", 
bucketIndex));
+    }
+  }
+}

Reply via email to