yihua commented on code in PR #12938:
URL: https://github.com/apache/hudi/pull/12938#discussion_r1988101653


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/BucketizedBloomCheckPartitioner.java:
##########
@@ -75,15 +76,16 @@ public class BucketizedBloomCheckPartitioner extends Partitioner {
    * @param keysPerBucket maximum number of keys to pack in a single bucket
    */
   public BucketizedBloomCheckPartitioner(int targetPartitions, Map<HoodieFileGroupId, Long> fileGroupToComparisons,
-      int keysPerBucket) {
-    this.fileGroupToPartitions = new HashMap<>();
+      int keysPerBucket, boolean isBloomIndexParallelismConfigured) {
+    this.fileGroupToPartitions = new TreeMap<>();
 
-    Map<HoodieFileGroupId, Integer> bucketsPerFileGroup = new HashMap<>();
+    Map<HoodieFileGroupId, Integer> bucketsPerFileGroup = new TreeMap<>();
Review Comment:
   Does this have to be a tree map?
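
   Presumably the switch to `TreeMap` is for deterministic iteration order: `HashMap` iteration order can vary between runs, which matters if partition assignment is derived from it. A minimal stand-alone illustration (using `String` keys in place of `HoodieFileGroupId`, which is an assumption for brevity):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class MapOrderDemo {
    public static void main(String[] args) {
        // TreeMap iterates keys in sorted order, so any partition assignment
        // derived from iteration order is stable across JVMs and runs.
        Map<String, Integer> buckets = new TreeMap<>();
        buckets.put("fg-3", 2);
        buckets.put("fg-1", 4);
        buckets.put("fg-2", 1);
        System.out.println(buckets.keySet()); // [fg-1, fg-2, fg-3]

        // A HashMap with the same entries gives no such ordering guarantee.
        Map<String, Integer> unordered = new HashMap<>(buckets);
        System.out.println(unordered.size()); // 3, but key order is unspecified
    }
}
```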



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/BucketizedBloomCheckPartitioner.java:
##########
@@ -75,15 +76,16 @@ public class BucketizedBloomCheckPartitioner extends Partitioner {
    * @param keysPerBucket maximum number of keys to pack in a single bucket
    */
   public BucketizedBloomCheckPartitioner(int targetPartitions, Map<HoodieFileGroupId, Long> fileGroupToComparisons,
-      int keysPerBucket) {
-    this.fileGroupToPartitions = new HashMap<>();
+      int keysPerBucket, boolean isBloomIndexParallelismConfigured) {
+    this.fileGroupToPartitions = new TreeMap<>();
 
-    Map<HoodieFileGroupId, Integer> bucketsPerFileGroup = new HashMap<>();
+    Map<HoodieFileGroupId, Integer> bucketsPerFileGroup = new TreeMap<>();
     // Compute the buckets needed per file group, using simple uniform distribution
     fileGroupToComparisons.forEach((f, c) -> bucketsPerFileGroup.put(f, (int) Math.ceil((c * 1.0) / keysPerBucket)));
     int totalBuckets = bucketsPerFileGroup.values().stream().mapToInt(i -> i).sum();
-    // If totalBuckets > targetPartitions, no need to have extra partitions
-    this.partitions = Math.min(targetPartitions, totalBuckets);
+    // If totalBuckets > targetPartitions, no need to have extra partitions if parallelism is explicitly configured.
+    // if not explicitly configured, we will honor max of (totalBuckets or dynamically derived partition count)
+    this.partitions = isBloomIndexParallelismConfigured ? Math.min(targetPartitions, totalBuckets) : Math.max(targetPartitions, totalBuckets);

Review Comment:
   Rename `targetPartitions` to `configuredParallelism`? Both the docs and the code are confusing, e.g., in `parallelism is explicitly configured`, does `parallelism` refer to `targetPartitions`?



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/BucketizedBloomCheckPartitioner.java:
##########
@@ -75,15 +76,16 @@ public class BucketizedBloomCheckPartitioner extends Partitioner {
    * @param keysPerBucket maximum number of keys to pack in a single bucket
    */
   public BucketizedBloomCheckPartitioner(int targetPartitions, Map<HoodieFileGroupId, Long> fileGroupToComparisons,
-      int keysPerBucket) {
-    this.fileGroupToPartitions = new HashMap<>();
+      int keysPerBucket, boolean isBloomIndexParallelismConfigured) {
+    this.fileGroupToPartitions = new TreeMap<>();
 
-    Map<HoodieFileGroupId, Integer> bucketsPerFileGroup = new HashMap<>();
+    Map<HoodieFileGroupId, Integer> bucketsPerFileGroup = new TreeMap<>();
     // Compute the buckets needed per file group, using simple uniform distribution
     fileGroupToComparisons.forEach((f, c) -> bucketsPerFileGroup.put(f, (int) Math.ceil((c * 1.0) / keysPerBucket)));
     int totalBuckets = bucketsPerFileGroup.values().stream().mapToInt(i -> i).sum();
-    // If totalBuckets > targetPartitions, no need to have extra partitions
-    this.partitions = Math.min(targetPartitions, totalBuckets);
+    // If totalBuckets > targetPartitions, no need to have extra partitions if parallelism is explicitly configured.
+    // if not explicitly configured, we will honor max of (totalBuckets or dynamically derived partition count)
+    this.partitions = isBloomIndexParallelismConfigured ? Math.min(targetPartitions, totalBuckets) : Math.max(targetPartitions, totalBuckets);

Review Comment:
   If not configured, should we use `totalBuckets` directly, or is there a better way of passing in `targetPartitions`?
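
   For reference, the decision the diff encodes can be isolated into a small helper (the method name `resolvePartitions` is illustrative, not from the PR): when parallelism is explicitly configured, the partition count is capped at `totalBuckets`; otherwise the larger of the two values wins, so a small dynamically derived default does not throttle the bloom check.

```java
public class PartitionCountSketch {
    // Mirrors the ternary in the diff:
    //   configured  -> min(targetPartitions, totalBuckets)  (cap at what is needed)
    //   unconfigured -> max(targetPartitions, totalBuckets) (never go below the need)
    static int resolvePartitions(int targetPartitions, int totalBuckets,
                                 boolean isBloomIndexParallelismConfigured) {
        return isBloomIndexParallelismConfigured
            ? Math.min(targetPartitions, totalBuckets)
            : Math.max(targetPartitions, totalBuckets);
    }

    public static void main(String[] args) {
        System.out.println(resolvePartitions(100, 40, true));  // 40
        System.out.println(resolvePartitions(100, 40, false)); // 100
    }
}
```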



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
