This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 34ff039da5a [HUDI-9399] Improve bloom filter bucketizing in Spark
(#13284)
34ff039da5a is described below
commit 34ff039da5a9605335016aec0ab8094093de7f46
Author: Y Ethan Guo <[email protected]>
AuthorDate: Fri May 9 21:47:17 2025 -0700
[HUDI-9399] Improve bloom filter bucketizing in Spark (#13284)
---
.../org/apache/hudi/config/HoodieIndexConfig.java | 14 ++
.../org/apache/hudi/config/HoodieWriteConfig.java | 4 +
.../bloom/BucketizedBloomCheckPartitioner.java | 34 ++++-
.../index/bloom/SparkHoodieBloomIndexHelper.java | 6 +-
.../bloom/TestBucketizedBloomCheckPartitioner.java | 170 ++++++++++++++++++---
5 files changed, 196 insertions(+), 32 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
index 6458c02daaa..9aac9cf3534 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
@@ -148,6 +148,20 @@ public class HoodieIndexConfig extends HoodieConfig {
+ "When true, bucketized bloom filtering is enabled. "
+ "This reduces skew seen in sort based bloom index lookup");
+ public static final ConfigProperty<String>
BLOOM_INDEX_BUCKETIZED_CHECKING_ENABLE_DYNAMIC_PARALLELISM = ConfigProperty
+ .key("hoodie.bloom.index.bucketized.checking.enable.dynamic.parallelism")
+ .defaultValue("false")
+ .markAdvanced()
+ .sinceVersion("1.1.0")
+ .withDocumentation("Only applies if index type is BLOOM and the
bucketized bloom filtering "
+ + "is enabled. When true, the index parallelism is determined by the
number of file "
+ + "groups to look up and the number of keys per bucket to split
comparisons within a "
+ + "file group; otherwise, the index parallelism is limited by the
input parallelism. "
+ + "PLEASE NOTE that if the bloom index parallelism (" +
BLOOM_INDEX_PARALLELISM.key()
+ + ") is configured, the bloom index parallelism takes effect instead
of the input "
+ + "parallelism and always limits the number of buckets calculated
based on the number "
+ + "of keys per bucket in the bucketized bloom filtering.");
+
public static final ConfigProperty<String>
BLOOM_INDEX_FILE_GROUP_ID_KEY_SORTING = ConfigProperty
.key("hoodie.bloom.index.fileid.key.sorting.enable")
.defaultValue("false")
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 812d083a70e..fc38c85adb6 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -2123,6 +2123,10 @@ public class HoodieWriteConfig extends HoodieConfig {
return getBoolean(HoodieIndexConfig.BLOOM_INDEX_BUCKETIZED_CHECKING);
}
+ public boolean useBloomIndexBucketizedCheckingWithDynamicParallelism() {
+ return
getBoolean(HoodieIndexConfig.BLOOM_INDEX_BUCKETIZED_CHECKING_ENABLE_DYNAMIC_PARALLELISM);
+ }
+
public boolean isBloomIndexFileGroupIdKeySortingEnabled() {
return getBoolean(HoodieIndexConfig.BLOOM_INDEX_FILE_GROUP_ID_KEY_SORTING);
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/BucketizedBloomCheckPartitioner.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/BucketizedBloomCheckPartitioner.java
index dad51f42d01..b367b2f8562 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/BucketizedBloomCheckPartitioner.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/BucketizedBloomCheckPartitioner.java
@@ -70,20 +70,40 @@ public class BucketizedBloomCheckPartitioner extends
Partitioner {
/**
* Create a partitioner that computes a plan based on provided workload
characteristics.
*
- * @param targetPartitions maximum number of partitions to target
- * @param fileGroupToComparisons number of expected comparisons per file
group
- * @param keysPerBucket maximum number of keys to pack in a single bucket
+ * @param configuredBloomIndexParallelism configured bloom index parallelism;
+ * 0 means not configured by the user
+ * @param inputParallelism input parallelism
+ * @param fileGroupToComparisons number of expected comparisons per
file group
+ * @param keysPerBucket maximum number of keys to pack in
a single bucket
+ * @param shouldUseDynamicParallelism whether the parallelism should be
determined
+ * by the keys per bucket
*/
- public BucketizedBloomCheckPartitioner(int targetPartitions,
Map<HoodieFileGroupId, Long> fileGroupToComparisons,
- int keysPerBucket) {
+ public BucketizedBloomCheckPartitioner(
+ int configuredBloomIndexParallelism,
+ int inputParallelism,
+ Map<HoodieFileGroupId, Long> fileGroupToComparisons,
+ int keysPerBucket,
+ boolean shouldUseDynamicParallelism) {
this.fileGroupToPartitions = new HashMap<>();
Map<HoodieFileGroupId, Integer> bucketsPerFileGroup = new HashMap<>();
// Compute the buckets needed per file group, using simple uniform
distribution
fileGroupToComparisons.forEach((f, c) -> bucketsPerFileGroup.put(f, (int)
Math.ceil((c * 1.0) / keysPerBucket)));
int totalBuckets = bucketsPerFileGroup.values().stream().mapToInt(i ->
i).sum();
- // If totalBuckets > targetPartitions, no need to have extra partitions
- this.partitions = Math.min(targetPartitions, totalBuckets);
+
+ if (configuredBloomIndexParallelism > 0) {
+ // If bloom index parallelism is configured, the number of buckets is
+ // limited by the configured bloom index parallelism
+ this.partitions = Math.min(configuredBloomIndexParallelism,
totalBuckets);
+ } else if (shouldUseDynamicParallelism) {
+ // If bloom index parallelism is not configured, and dynamic buckets are
enabled,
+ // honor the number of buckets calculated based on the keys per bucket
+ this.partitions = totalBuckets;
+ } else {
+ // If bloom index parallelism is not configured, and dynamic buckets are
disabled,
+ // honor the input parallelism as the max number of buckets to use
+ this.partitions = Math.min(inputParallelism, totalBuckets);
+ }
// PHASE 1 : start filling upto minimum number of buckets into partitions,
taking all but one bucket from each file
// This tries to first optimize for goal 1 above, with knowledge that each
partition needs a certain minimum number
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java
index 07133edcc49..8376c932f4d 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java
@@ -163,8 +163,10 @@ public class SparkHoodieBloomIndexHelper extends
BaseHoodieBloomIndexHelper {
} else if (config.useBloomIndexBucketizedChecking()) {
Map<HoodieFileGroupId, Long> comparisonsPerFileGroup =
computeComparisonsPerFileGroup(
config, recordsPerPartition, partitionToFileInfo,
fileComparisonsRDD, context);
- Partitioner partitioner = new
BucketizedBloomCheckPartitioner(targetParallelism, comparisonsPerFileGroup,
- config.getBloomIndexKeysPerBucket());
+ Partitioner partitioner = new BucketizedBloomCheckPartitioner(
+ configuredBloomIndexParallelism, inputParallelism,
comparisonsPerFileGroup,
+ config.getBloomIndexKeysPerBucket(),
+ config.useBloomIndexBucketizedCheckingWithDynamicParallelism());
keyLookupResultRDD = fileComparisonsRDD.mapToPair(fileGroupAndRecordKey
-> new Tuple2<>(fileGroupAndRecordKey, false))
.repartitionAndSortWithinPartitions(partitioner, new
FileGroupIdComparator())
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestBucketizedBloomCheckPartitioner.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestBucketizedBloomCheckPartitioner.java
index 69af259e603..e2929704514 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestBucketizedBloomCheckPartitioner.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestBucketizedBloomCheckPartitioner.java
@@ -22,12 +22,17 @@ import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.util.collection.Pair;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import java.util.stream.Stream;
import scala.Tuple2;
@@ -36,28 +41,107 @@ import static
org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestBucketizedBloomCheckPartitioner {
+ private static Stream<Arguments> partitioningTestCases() {
+ // Case 1
+ Map<HoodieFileGroupId, Long> fileToComparisons =
constructFileToComparisons(
+ Pair.of(new HoodieFileGroupId("p1", "f1"), 30L),
+ Pair.of(new HoodieFileGroupId("p1", "f2"), 35L),
+ Pair.of(new HoodieFileGroupId("p1", "f3"), 20L)
+ );
+ // partitioning based on parallelism of 4
+ Map<HoodieFileGroupId, List<Integer>> partitioning1 =
constructPartitioning(
+ Pair.of(new HoodieFileGroupId("p1", "f1"), new Integer[] {0, 0, 3}),
+ Pair.of(new HoodieFileGroupId("p1", "f2"), new Integer[] {2, 2, 3, 1}),
+ Pair.of(new HoodieFileGroupId("p1", "f3"), new Integer[] {1, 0})
+ );
+ List<LookUpKeyAndResult> lookUpKeyAndResults1 =
constructLookUpKeyAndResults(
+ LookUpKeyAndResult.of("p1", "f1", "k1", 3),
+ LookUpKeyAndResult.of("p1", "f1", "k2", 0),
+ LookUpKeyAndResult.of("p1", "f2", "k4", 2),
+ LookUpKeyAndResult.of("p1", "f3", "k7", 1));
+ // partitioning based on keys per bucket
+ Map<HoodieFileGroupId, List<Integer>> partitioning2 =
constructPartitioning(
+ Pair.of(new HoodieFileGroupId("p1", "f1"), new Integer[] {0, 1, 6}),
+ Pair.of(new HoodieFileGroupId("p1", "f2"), new Integer[] {3, 4, 5, 8}),
+ Pair.of(new HoodieFileGroupId("p1", "f3"), new Integer[] {2, 7})
+ );
+ List<LookUpKeyAndResult> lookUpKeyAndResults2 =
constructLookUpKeyAndResults(
+ LookUpKeyAndResult.of("p1", "f1", "k1", 6),
+ LookUpKeyAndResult.of("p1", "f1", "k2", 0),
+ LookUpKeyAndResult.of("p1", "f2", "k4", 4),
+ LookUpKeyAndResult.of("p1", "f3", "k6", 7));
+ // keys per bucket is very large
+ Map<HoodieFileGroupId, List<Integer>> partitioning3 =
constructPartitioning(
+ Pair.of(new HoodieFileGroupId("p1", "f1"), new Integer[] {0}),
+ Pair.of(new HoodieFileGroupId("p1", "f2"), new Integer[] {2}),
+ Pair.of(new HoodieFileGroupId("p1", "f3"), new Integer[] {1})
+ );
+ List<LookUpKeyAndResult> lookUpKeyAndResults3 =
constructLookUpKeyAndResults(
+ LookUpKeyAndResult.of("p1", "f1", "k1", 0),
+ LookUpKeyAndResult.of("p1", "f1", "k2", 0),
+ LookUpKeyAndResult.of("p1", "f2", "k4", 2),
+ LookUpKeyAndResult.of("p1", "f3", "k6", 1));
+ // keys per bucket is in the middle
+ Map<HoodieFileGroupId, List<Integer>> partitioning4 =
constructPartitioning(
+ Pair.of(new HoodieFileGroupId("p1", "f1"), new Integer[] {0, 3}),
+ Pair.of(new HoodieFileGroupId("p1", "f2"), new Integer[] {2, 0}),
+ Pair.of(new HoodieFileGroupId("p1", "f3"), new Integer[] {1})
+ );
+ List<LookUpKeyAndResult> lookUpKeyAndResults4 =
constructLookUpKeyAndResults(
+ LookUpKeyAndResult.of("p1", "f1", "k1", 0),
+ LookUpKeyAndResult.of("p1", "f1", "k2", 3),
+ LookUpKeyAndResult.of("p1", "f2", "k4", 0),
+ LookUpKeyAndResult.of("p1", "f3", "k6", 1));
+ Map<HoodieFileGroupId, List<Integer>> partitioning5 =
constructPartitioning(
+ Pair.of(new HoodieFileGroupId("p1", "f1"), new Integer[] {0, 3}),
+ Pair.of(new HoodieFileGroupId("p1", "f2"), new Integer[] {2, 4}),
+ Pair.of(new HoodieFileGroupId("p1", "f3"), new Integer[] {1})
+ );
+ List<LookUpKeyAndResult> lookUpKeyAndResults5 =
constructLookUpKeyAndResults(
+ LookUpKeyAndResult.of("p1", "f1", "k1", 0),
+ LookUpKeyAndResult.of("p1", "f1", "k2", 3),
+ LookUpKeyAndResult.of("p1", "f2", "k4", 4),
+ LookUpKeyAndResult.of("p1", "f3", "k6", 1));
- @Test
- public void testAssignmentCorrectness() {
- HoodieFileGroupId fg1 = new HoodieFileGroupId("p1", "f1");
- HoodieFileGroupId fg2 = new HoodieFileGroupId("p1", "f2");
- HoodieFileGroupId fg3 = new HoodieFileGroupId("p1", "f3");
+ return Arrays.stream(new Arguments[] {
+ // Configured parallelism should take effect
+ Arguments.of(4, 6, fileToComparisons, 10, false, 4, partitioning1,
lookUpKeyAndResults1),
+ Arguments.of(4, 2, fileToComparisons, 10, false, 4, partitioning1,
lookUpKeyAndResults1),
+ // Input parallelism should take effect
+ Arguments.of(0, 4, fileToComparisons, 10, false, 4, partitioning1,
lookUpKeyAndResults1),
+ // Dynamic parallelism based on the keys per bucket should kick in
+ Arguments.of(0, 4, fileToComparisons, 10, true, 9, partitioning2,
lookUpKeyAndResults2),
+ // Dynamic parallelism based on the keys per bucket that is large
+ Arguments.of(0, 4, fileToComparisons, 50, false, 3, partitioning3,
lookUpKeyAndResults3),
+ Arguments.of(0, 4, fileToComparisons, 50, true, 3, partitioning3,
lookUpKeyAndResults3),
+ // Dynamic parallelism based on the keys per bucket that is in the
middle
+ Arguments.of(0, 4, fileToComparisons, 25, false, 4, partitioning4,
lookUpKeyAndResults4),
+ Arguments.of(0, 4, fileToComparisons, 25, true, 5, partitioning5,
lookUpKeyAndResults5)
+ });
+ }
- Map<HoodieFileGroupId, Long> fileToComparisons = new
HashMap<HoodieFileGroupId, Long>() {
- {
- put(fg1, 40L);
- put(fg2, 35L);
- put(fg3, 20L);
- }
- };
- BucketizedBloomCheckPartitioner p = new BucketizedBloomCheckPartitioner(4,
fileToComparisons, 10);
- Map<HoodieFileGroupId, List<Integer>> assignments =
p.getFileGroupToPartitions();
- assertEquals(4, assignments.get(fg1).size(), "f1 should have 4 buckets");
- assertEquals(4, assignments.get(fg2).size(), "f2 should have 4 buckets");
- assertEquals(2, assignments.get(fg3).size(), "f3 should have 2 buckets");
- assertArrayEquals(new Integer[] {0, 0, 1, 3},
assignments.get(fg1).toArray(), "f1 spread across 3 partitions");
- assertArrayEquals(new Integer[] {2, 2, 3, 1},
assignments.get(fg2).toArray(), "f2 spread across 3 partitions");
- assertArrayEquals(new Integer[] {1, 0}, assignments.get(fg3).toArray(),
"f3 spread across 2 partitions");
+ @ParameterizedTest
+ @MethodSource("partitioningTestCases")
+ void testPartitioning(int configuredParallelism,
+ int inputParallelism,
+ Map<HoodieFileGroupId, Long> fileToComparisons,
+ int keysPerBucket,
+ boolean shouldUseDynamicParallelism,
+ int expectedNumPartitions,
+ Map<HoodieFileGroupId, List<Integer>>
expectedPartitioning,
+ List<LookUpKeyAndResult> lookUpKeyAndResults) {
+ BucketizedBloomCheckPartitioner partitioner = new
BucketizedBloomCheckPartitioner(
+ configuredParallelism, inputParallelism, fileToComparisons,
keysPerBucket, shouldUseDynamicParallelism);
+ assertEquals(expectedNumPartitions, partitioner.numPartitions());
+ Map<HoodieFileGroupId, List<Integer>> actualPartitioning =
partitioner.getFileGroupToPartitions();
+ assertEquals(expectedPartitioning.size(), actualPartitioning.size());
+ for (HoodieFileGroupId id : actualPartitioning.keySet()) {
+ assertTrue(expectedPartitioning.containsKey(id));
+ assertArrayEquals(expectedPartitioning.get(id).toArray(),
actualPartitioning.get(id).toArray());
+ }
+ lookUpKeyAndResults.forEach(lookUpKeyAndResult ->
+ assertEquals(lookUpKeyAndResult.expectedPartitionId,
partitioner.getPartition(
+ Tuple2.apply(lookUpKeyAndResult.fileGroupId,
lookUpKeyAndResult.recordKey))));
}
@Test
@@ -68,7 +152,8 @@ public class TestBucketizedBloomCheckPartitioner {
IntStream.range(0, 10).forEach(f -> put(new HoodieFileGroupId("p1",
"f" + f), 100L));
}
};
- BucketizedBloomCheckPartitioner partitioner = new
BucketizedBloomCheckPartitioner(100, comparisons1, 10);
+ BucketizedBloomCheckPartitioner partitioner = new
BucketizedBloomCheckPartitioner(
+ 100, 100, comparisons1, 10, false);
Map<HoodieFileGroupId, List<Integer>> assignments =
partitioner.getFileGroupToPartitions();
assignments.forEach((key, value) -> assertEquals(10, value.size()));
Map<Integer, Long> partitionToNumBuckets =
@@ -84,7 +169,8 @@ public class TestBucketizedBloomCheckPartitioner {
IntStream.range(0, 10).forEach(f -> put(new HoodieFileGroupId("p1",
"f" + f), 100L));
}
};
- BucketizedBloomCheckPartitioner p = new
BucketizedBloomCheckPartitioner(10000, comparisons1, 10);
+ BucketizedBloomCheckPartitioner p = new BucketizedBloomCheckPartitioner(
+ 10000, 10000, comparisons1, 10, false);
assertEquals(100, p.numPartitions(), "num partitions must equal total
buckets");
}
@@ -95,7 +181,8 @@ public class TestBucketizedBloomCheckPartitioner {
IntStream.range(0, 100000).forEach(f -> put(new
HoodieFileGroupId("p1", "f" + f), 100L));
}
};
- BucketizedBloomCheckPartitioner p = new
BucketizedBloomCheckPartitioner(1000, comparisons1, 10);
+ BucketizedBloomCheckPartitioner p = new BucketizedBloomCheckPartitioner(
+ 1000, 1000, comparisons1, 10, false);
IntStream.range(0, 100000).forEach(f -> {
int partition = p.getPartition(Tuple2.apply(new HoodieFileGroupId("p1",
"f" + f), "value"));
@@ -103,4 +190,41 @@ public class TestBucketizedBloomCheckPartitioner {
});
}
+ private static Map<HoodieFileGroupId, Long>
constructFileToComparisons(Pair<HoodieFileGroupId, Long>... entries) {
+ Map<HoodieFileGroupId, Long> result = new HashMap<>();
+ Arrays.stream(entries).forEach(e -> result.put(e.getKey(), e.getValue()));
+ return result;
+ }
+
+ private static Map<HoodieFileGroupId, List<Integer>>
constructPartitioning(Pair<HoodieFileGroupId, Integer[]>... entries) {
+ Map<HoodieFileGroupId, List<Integer>> result = new HashMap<>();
+ Arrays.stream(entries).forEach(e -> result.put(e.getKey(),
Arrays.stream(e.getValue()).collect(Collectors.toList())));
+ return result;
+ }
+
+ private static List<LookUpKeyAndResult>
constructLookUpKeyAndResults(LookUpKeyAndResult... entries) {
+ return Arrays.stream(entries).collect(Collectors.toList());
+ }
+
+ static class LookUpKeyAndResult {
+ HoodieFileGroupId fileGroupId;
+ String recordKey;
+ int expectedPartitionId;
+
+ private LookUpKeyAndResult(String partitionPath,
+ String fileId,
+ String recordKey,
+ int expectedPartitionId) {
+ this.fileGroupId = new HoodieFileGroupId(partitionPath, fileId);
+ this.recordKey = recordKey;
+ this.expectedPartitionId = expectedPartitionId;
+ }
+
+ public static LookUpKeyAndResult of(String partitionPath,
+ String fileId,
+ String recordKey,
+ int expectedPartitionId) {
+ return new LookUpKeyAndResult(partitionPath, fileId, recordKey,
expectedPartitionId);
+ }
+ }
}