This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 34ff039da5a [HUDI-9399] Improve bloom filter bucketizing in Spark 
(#13284)
34ff039da5a is described below

commit 34ff039da5a9605335016aec0ab8094093de7f46
Author: Y Ethan Guo <[email protected]>
AuthorDate: Fri May 9 21:47:17 2025 -0700

    [HUDI-9399] Improve bloom filter bucketizing in Spark (#13284)
---
 .../org/apache/hudi/config/HoodieIndexConfig.java  |  14 ++
 .../org/apache/hudi/config/HoodieWriteConfig.java  |   4 +
 .../bloom/BucketizedBloomCheckPartitioner.java     |  34 ++++-
 .../index/bloom/SparkHoodieBloomIndexHelper.java   |   6 +-
 .../bloom/TestBucketizedBloomCheckPartitioner.java | 170 ++++++++++++++++++---
 5 files changed, 196 insertions(+), 32 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
index 6458c02daaa..9aac9cf3534 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
@@ -148,6 +148,20 @@ public class HoodieIndexConfig extends HoodieConfig {
           + "When true, bucketized bloom filtering is enabled. "
           + "This reduces skew seen in sort based bloom index lookup");
 
+  public static final ConfigProperty<String> 
BLOOM_INDEX_BUCKETIZED_CHECKING_ENABLE_DYNAMIC_PARALLELISM = ConfigProperty
+      .key("hoodie.bloom.index.bucketized.checking.enable.dynamic.parallelism")
+      .defaultValue("false")
+      .markAdvanced()
+      .sinceVersion("1.1.0")
+      .withDocumentation("Only applies if index type is BLOOM and the 
bucketized bloom filtering "
+          + "is enabled. When true, the index parallelism is determined by the 
number of file "
+          + "groups to look up and the number of keys per bucket to split 
comparisons within a "
+          + "file group; otherwise, the index parallelism is limited by the 
input parallelism. "
+          + "PLEASE NOTE that if the bloom index parallelism (" + 
BLOOM_INDEX_PARALLELISM.key()
+          + ") is configured, the bloom index parallelism takes effect instead 
of the input "
+          + "parallelism and always limits the number of buckets calculated 
based on the number "
+          + "of keys per bucket in the bucketized bloom filtering.");
+
   public static final ConfigProperty<String> 
BLOOM_INDEX_FILE_GROUP_ID_KEY_SORTING = ConfigProperty
       .key("hoodie.bloom.index.fileid.key.sorting.enable")
       .defaultValue("false")
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 812d083a70e..fc38c85adb6 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -2123,6 +2123,10 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getBoolean(HoodieIndexConfig.BLOOM_INDEX_BUCKETIZED_CHECKING);
   }
 
+  public boolean useBloomIndexBucketizedCheckingWithDynamicParallelism() {
+    return 
getBoolean(HoodieIndexConfig.BLOOM_INDEX_BUCKETIZED_CHECKING_ENABLE_DYNAMIC_PARALLELISM);
+  }
+
   public boolean isBloomIndexFileGroupIdKeySortingEnabled() {
     return getBoolean(HoodieIndexConfig.BLOOM_INDEX_FILE_GROUP_ID_KEY_SORTING);
   }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/BucketizedBloomCheckPartitioner.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/BucketizedBloomCheckPartitioner.java
index dad51f42d01..b367b2f8562 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/BucketizedBloomCheckPartitioner.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/BucketizedBloomCheckPartitioner.java
@@ -70,20 +70,40 @@ public class BucketizedBloomCheckPartitioner extends 
Partitioner {
   /**
    * Create a partitioner that computes a plan based on provided workload 
characteristics.
    *
-   * @param targetPartitions maximum number of partitions to target
-   * @param fileGroupToComparisons number of expected comparisons per file 
group
-   * @param keysPerBucket maximum number of keys to pack in a single bucket
+   * @param configuredBloomIndexParallelism configured bloom index parallelism;
+   *                                        0 means not configured by the user
+   * @param inputParallelism                input parallelism
+   * @param fileGroupToComparisons          number of expected comparisons per 
file group
+   * @param keysPerBucket                   maximum number of keys to pack in 
a single bucket
+   * @param shouldUseDynamicParallelism     whether the parallelism should be 
determined
+   *                                        by the keys per bucket
    */
-  public BucketizedBloomCheckPartitioner(int targetPartitions, 
Map<HoodieFileGroupId, Long> fileGroupToComparisons,
-      int keysPerBucket) {
+  public BucketizedBloomCheckPartitioner(
+      int configuredBloomIndexParallelism,
+      int inputParallelism,
+      Map<HoodieFileGroupId, Long> fileGroupToComparisons,
+      int keysPerBucket,
+      boolean shouldUseDynamicParallelism) {
     this.fileGroupToPartitions = new HashMap<>();
 
     Map<HoodieFileGroupId, Integer> bucketsPerFileGroup = new HashMap<>();
     // Compute the buckets needed per file group, using simple uniform 
distribution
     fileGroupToComparisons.forEach((f, c) -> bucketsPerFileGroup.put(f, (int) 
Math.ceil((c * 1.0) / keysPerBucket)));
     int totalBuckets = bucketsPerFileGroup.values().stream().mapToInt(i -> 
i).sum();
-    // If totalBuckets > targetPartitions, no need to have extra partitions
-    this.partitions = Math.min(targetPartitions, totalBuckets);
+
+    if (configuredBloomIndexParallelism > 0) {
+      // If bloom index parallelism is configured, the number of buckets is
+      // limited by the configured bloom index parallelism
+      this.partitions = Math.min(configuredBloomIndexParallelism, 
totalBuckets);
+    } else if (shouldUseDynamicParallelism) {
+      // If bloom index parallelism is not configured, and dynamic buckets are 
enabled,
+      // honor the number of buckets calculated based on the keys per bucket
+      this.partitions = totalBuckets;
+    } else {
+      // If bloom index parallelism is not configured, and dynamic buckets are 
disabled,
+      // honor the input parallelism as the max number of buckets to use
+      this.partitions = Math.min(inputParallelism, totalBuckets);
+    }
 
     // PHASE 1 : start filling upto minimum number of buckets into partitions, 
taking all but one bucket from each file
     // This tries to first optimize for goal 1 above, with knowledge that each 
partition needs a certain minimum number
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java
index 07133edcc49..8376c932f4d 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java
@@ -163,8 +163,10 @@ public class SparkHoodieBloomIndexHelper extends 
BaseHoodieBloomIndexHelper {
     } else if (config.useBloomIndexBucketizedChecking()) {
       Map<HoodieFileGroupId, Long> comparisonsPerFileGroup = 
computeComparisonsPerFileGroup(
           config, recordsPerPartition, partitionToFileInfo, 
fileComparisonsRDD, context);
-      Partitioner partitioner = new 
BucketizedBloomCheckPartitioner(targetParallelism, comparisonsPerFileGroup,
-          config.getBloomIndexKeysPerBucket());
+      Partitioner partitioner = new BucketizedBloomCheckPartitioner(
+          configuredBloomIndexParallelism, inputParallelism, 
comparisonsPerFileGroup,
+          config.getBloomIndexKeysPerBucket(),
+          config.useBloomIndexBucketizedCheckingWithDynamicParallelism());
 
       keyLookupResultRDD = fileComparisonsRDD.mapToPair(fileGroupAndRecordKey 
-> new Tuple2<>(fileGroupAndRecordKey, false))
           .repartitionAndSortWithinPartitions(partitioner, new 
FileGroupIdComparator())
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestBucketizedBloomCheckPartitioner.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestBucketizedBloomCheckPartitioner.java
index 69af259e603..e2929704514 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestBucketizedBloomCheckPartitioner.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestBucketizedBloomCheckPartitioner.java
@@ -22,12 +22,17 @@ import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.util.collection.Pair;
 
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import java.util.stream.Stream;
 
 import scala.Tuple2;
 
@@ -36,28 +41,107 @@ import static 
org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class TestBucketizedBloomCheckPartitioner {
+  private static Stream<Arguments> partitioningTestCases() {
+    // Case 1
+    Map<HoodieFileGroupId, Long> fileToComparisons = 
constructFileToComparisons(
+        Pair.of(new HoodieFileGroupId("p1", "f1"), 30L),
+        Pair.of(new HoodieFileGroupId("p1", "f2"), 35L),
+        Pair.of(new HoodieFileGroupId("p1", "f3"), 20L)
+    );
+    // partitioning based on parallelism of 4
+    Map<HoodieFileGroupId, List<Integer>> partitioning1 = 
constructPartitioning(
+        Pair.of(new HoodieFileGroupId("p1", "f1"), new Integer[] {0, 0, 3}),
+        Pair.of(new HoodieFileGroupId("p1", "f2"), new Integer[] {2, 2, 3, 1}),
+        Pair.of(new HoodieFileGroupId("p1", "f3"), new Integer[] {1, 0})
+    );
+    List<LookUpKeyAndResult> lookUpKeyAndResults1 = 
constructLookUpKeyAndResults(
+        LookUpKeyAndResult.of("p1", "f1", "k1", 3),
+        LookUpKeyAndResult.of("p1", "f1", "k2", 0),
+        LookUpKeyAndResult.of("p1", "f2", "k4", 2),
+        LookUpKeyAndResult.of("p1", "f3", "k7", 1));
+    // partitioning based on keys per bucket
+    Map<HoodieFileGroupId, List<Integer>> partitioning2 = 
constructPartitioning(
+        Pair.of(new HoodieFileGroupId("p1", "f1"), new Integer[] {0, 1, 6}),
+        Pair.of(new HoodieFileGroupId("p1", "f2"), new Integer[] {3, 4, 5, 8}),
+        Pair.of(new HoodieFileGroupId("p1", "f3"), new Integer[] {2, 7})
+    );
+    List<LookUpKeyAndResult> lookUpKeyAndResults2 = 
constructLookUpKeyAndResults(
+        LookUpKeyAndResult.of("p1", "f1", "k1", 6),
+        LookUpKeyAndResult.of("p1", "f1", "k2", 0),
+        LookUpKeyAndResult.of("p1", "f2", "k4", 4),
+        LookUpKeyAndResult.of("p1", "f3", "k6", 7));
+    // keys per bucket is very large
+    Map<HoodieFileGroupId, List<Integer>> partitioning3 = 
constructPartitioning(
+        Pair.of(new HoodieFileGroupId("p1", "f1"), new Integer[] {0}),
+        Pair.of(new HoodieFileGroupId("p1", "f2"), new Integer[] {2}),
+        Pair.of(new HoodieFileGroupId("p1", "f3"), new Integer[] {1})
+    );
+    List<LookUpKeyAndResult> lookUpKeyAndResults3 = 
constructLookUpKeyAndResults(
+        LookUpKeyAndResult.of("p1", "f1", "k1", 0),
+        LookUpKeyAndResult.of("p1", "f1", "k2", 0),
+        LookUpKeyAndResult.of("p1", "f2", "k4", 2),
+        LookUpKeyAndResult.of("p1", "f3", "k6", 1));
+    // keys per bucket is in the middle
+    Map<HoodieFileGroupId, List<Integer>> partitioning4 = 
constructPartitioning(
+        Pair.of(new HoodieFileGroupId("p1", "f1"), new Integer[] {0, 3}),
+        Pair.of(new HoodieFileGroupId("p1", "f2"), new Integer[] {2, 0}),
+        Pair.of(new HoodieFileGroupId("p1", "f3"), new Integer[] {1})
+    );
+    List<LookUpKeyAndResult> lookUpKeyAndResults4 = 
constructLookUpKeyAndResults(
+        LookUpKeyAndResult.of("p1", "f1", "k1", 0),
+        LookUpKeyAndResult.of("p1", "f1", "k2", 3),
+        LookUpKeyAndResult.of("p1", "f2", "k4", 0),
+        LookUpKeyAndResult.of("p1", "f3", "k6", 1));
+    Map<HoodieFileGroupId, List<Integer>> partitioning5 = 
constructPartitioning(
+        Pair.of(new HoodieFileGroupId("p1", "f1"), new Integer[] {0, 3}),
+        Pair.of(new HoodieFileGroupId("p1", "f2"), new Integer[] {2, 4}),
+        Pair.of(new HoodieFileGroupId("p1", "f3"), new Integer[] {1})
+    );
+    List<LookUpKeyAndResult> lookUpKeyAndResults5 = 
constructLookUpKeyAndResults(
+        LookUpKeyAndResult.of("p1", "f1", "k1", 0),
+        LookUpKeyAndResult.of("p1", "f1", "k2", 3),
+        LookUpKeyAndResult.of("p1", "f2", "k4", 4),
+        LookUpKeyAndResult.of("p1", "f3", "k6", 1));
 
-  @Test
-  public void testAssignmentCorrectness() {
-    HoodieFileGroupId fg1 = new HoodieFileGroupId("p1", "f1");
-    HoodieFileGroupId fg2 = new HoodieFileGroupId("p1", "f2");
-    HoodieFileGroupId fg3 = new HoodieFileGroupId("p1", "f3");
+    return Arrays.stream(new Arguments[] {
+        // Configured parallelism should take effect
+        Arguments.of(4, 6, fileToComparisons, 10, false, 4, partitioning1, 
lookUpKeyAndResults1),
+        Arguments.of(4, 2, fileToComparisons, 10, false, 4, partitioning1, 
lookUpKeyAndResults1),
+        // Input parallelism should take effect
+        Arguments.of(0, 4, fileToComparisons, 10, false, 4, partitioning1, 
lookUpKeyAndResults1),
+        // Dynamic parallelism based on the keys per bucket should kick in
+        Arguments.of(0, 4, fileToComparisons, 10, true, 9, partitioning2, 
lookUpKeyAndResults2),
+        // Dynamic parallelism based on the keys per bucket that is large
+        Arguments.of(0, 4, fileToComparisons, 50, false, 3, partitioning3, 
lookUpKeyAndResults3),
+        Arguments.of(0, 4, fileToComparisons, 50, true, 3, partitioning3, 
lookUpKeyAndResults3),
+        // Dynamic parallelism based on the keys per bucket that is in the 
middle
+        Arguments.of(0, 4, fileToComparisons, 25, false, 4, partitioning4, 
lookUpKeyAndResults4),
+        Arguments.of(0, 4, fileToComparisons, 25, true, 5, partitioning5, 
lookUpKeyAndResults5)
+    });
+  }
 
-    Map<HoodieFileGroupId, Long> fileToComparisons = new 
HashMap<HoodieFileGroupId, Long>() {
-      {
-        put(fg1, 40L);
-        put(fg2, 35L);
-        put(fg3, 20L);
-      }
-    };
-    BucketizedBloomCheckPartitioner p = new BucketizedBloomCheckPartitioner(4, 
fileToComparisons, 10);
-    Map<HoodieFileGroupId, List<Integer>> assignments = 
p.getFileGroupToPartitions();
-    assertEquals(4, assignments.get(fg1).size(), "f1 should have 4 buckets");
-    assertEquals(4, assignments.get(fg2).size(), "f2 should have 4 buckets");
-    assertEquals(2, assignments.get(fg3).size(), "f3 should have 2 buckets");
-    assertArrayEquals(new Integer[] {0, 0, 1, 3}, 
assignments.get(fg1).toArray(), "f1 spread across 3 partitions");
-    assertArrayEquals(new Integer[] {2, 2, 3, 1}, 
assignments.get(fg2).toArray(), "f2 spread across 3 partitions");
-    assertArrayEquals(new Integer[] {1, 0}, assignments.get(fg3).toArray(), 
"f3 spread across 2 partitions");
+  @ParameterizedTest
+  @MethodSource("partitioningTestCases")
+  void testPartitioning(int configuredParallelism,
+                        int inputParallelism,
+                        Map<HoodieFileGroupId, Long> fileToComparisons,
+                        int keysPerBucket,
+                        boolean shouldUseDynamicParallelism,
+                        int expectedNumPartitions,
+                        Map<HoodieFileGroupId, List<Integer>> 
expectedPartitioning,
+                        List<LookUpKeyAndResult> lookUpKeyAndResults) {
+    BucketizedBloomCheckPartitioner partitioner = new 
BucketizedBloomCheckPartitioner(
+        configuredParallelism, inputParallelism, fileToComparisons, 
keysPerBucket, shouldUseDynamicParallelism);
+    assertEquals(expectedNumPartitions, partitioner.numPartitions());
+    Map<HoodieFileGroupId, List<Integer>> actualPartitioning = 
partitioner.getFileGroupToPartitions();
+    assertEquals(expectedPartitioning.size(), actualPartitioning.size());
+    for (HoodieFileGroupId id : actualPartitioning.keySet()) {
+      assertTrue(expectedPartitioning.containsKey(id));
+      assertArrayEquals(expectedPartitioning.get(id).toArray(), 
actualPartitioning.get(id).toArray());
+    }
+    lookUpKeyAndResults.forEach(lookUpKeyAndResult ->
+        assertEquals(lookUpKeyAndResult.expectedPartitionId, 
partitioner.getPartition(
+            Tuple2.apply(lookUpKeyAndResult.fileGroupId, 
lookUpKeyAndResult.recordKey))));
   }
 
   @Test
@@ -68,7 +152,8 @@ public class TestBucketizedBloomCheckPartitioner {
         IntStream.range(0, 10).forEach(f -> put(new HoodieFileGroupId("p1", 
"f" + f), 100L));
       }
     };
-    BucketizedBloomCheckPartitioner partitioner = new 
BucketizedBloomCheckPartitioner(100, comparisons1, 10);
+    BucketizedBloomCheckPartitioner partitioner = new 
BucketizedBloomCheckPartitioner(
+        100, 100, comparisons1, 10, false);
     Map<HoodieFileGroupId, List<Integer>> assignments = 
partitioner.getFileGroupToPartitions();
     assignments.forEach((key, value) -> assertEquals(10, value.size()));
     Map<Integer, Long> partitionToNumBuckets =
@@ -84,7 +169,8 @@ public class TestBucketizedBloomCheckPartitioner {
         IntStream.range(0, 10).forEach(f -> put(new HoodieFileGroupId("p1", 
"f" + f), 100L));
       }
     };
-    BucketizedBloomCheckPartitioner p = new 
BucketizedBloomCheckPartitioner(10000, comparisons1, 10);
+    BucketizedBloomCheckPartitioner p = new BucketizedBloomCheckPartitioner(
+        10000, 10000, comparisons1, 10, false);
     assertEquals(100, p.numPartitions(), "num partitions must equal total 
buckets");
   }
 
@@ -95,7 +181,8 @@ public class TestBucketizedBloomCheckPartitioner {
         IntStream.range(0, 100000).forEach(f -> put(new 
HoodieFileGroupId("p1", "f" + f), 100L));
       }
     };
-    BucketizedBloomCheckPartitioner p = new 
BucketizedBloomCheckPartitioner(1000, comparisons1, 10);
+    BucketizedBloomCheckPartitioner p = new BucketizedBloomCheckPartitioner(
+        1000, 1000, comparisons1, 10, false);
 
     IntStream.range(0, 100000).forEach(f -> {
       int partition = p.getPartition(Tuple2.apply(new HoodieFileGroupId("p1", 
"f" + f), "value"));
@@ -103,4 +190,41 @@ public class TestBucketizedBloomCheckPartitioner {
     });
   }
 
+  private static Map<HoodieFileGroupId, Long> 
constructFileToComparisons(Pair<HoodieFileGroupId, Long>... entries) {
+    Map<HoodieFileGroupId, Long> result = new HashMap<>();
+    Arrays.stream(entries).forEach(e -> result.put(e.getKey(), e.getValue()));
+    return result;
+  }
+
+  private static Map<HoodieFileGroupId, List<Integer>> 
constructPartitioning(Pair<HoodieFileGroupId, Integer[]>... entries) {
+    Map<HoodieFileGroupId, List<Integer>> result = new HashMap<>();
+    Arrays.stream(entries).forEach(e -> result.put(e.getKey(), 
Arrays.stream(e.getValue()).collect(Collectors.toList())));
+    return result;
+  }
+
+  private static List<LookUpKeyAndResult> 
constructLookUpKeyAndResults(LookUpKeyAndResult... entries) {
+    return Arrays.stream(entries).collect(Collectors.toList());
+  }
+
+  static class LookUpKeyAndResult {
+    HoodieFileGroupId fileGroupId;
+    String recordKey;
+    int expectedPartitionId;
+
+    private LookUpKeyAndResult(String partitionPath,
+                               String fileId,
+                               String recordKey,
+                               int expectedPartitionId) {
+      this.fileGroupId = new HoodieFileGroupId(partitionPath, fileId);
+      this.recordKey = recordKey;
+      this.expectedPartitionId = expectedPartitionId;
+    }
+
+    public static LookUpKeyAndResult of(String partitionPath,
+                                        String fileId,
+                                        String recordKey,
+                                        int expectedPartitionId) {
+      return new LookUpKeyAndResult(partitionPath, fileId, recordKey, 
expectedPartitionId);
+    }
+  }
 }

Reply via email to