nsivabalan commented on a change in pull request #1868:
URL: https://github.com/apache/hudi/pull/1868#discussion_r460534315



##########
File path: hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
##########
@@ -272,21 +275,44 @@ public int getPartition(Object key) {
       String partitionPath = keyLocation._1().getPartitionPath();
       List<InsertBucket> targetBuckets = partitionPathToInsertBuckets.get(partitionPath);
       // pick the target bucket to use based on the weights.
-      double totalWeight = 0.0;
       final long totalInserts = Math.max(1, profile.getWorkloadStat(partitionPath).getNumInserts());
       final long hashOfKey = NumericUtils.getMessageDigestHash("MD5", keyLocation._1().getRecordKey());
       final double r = 1.0 * Math.floorMod(hashOfKey, totalInserts) / totalInserts;
-      for (InsertBucket insertBucket : targetBuckets) {
-        totalWeight += insertBucket.weight;
-        if (r <= totalWeight) {
-          return insertBucket.bucketNumber;
-        }
+
+      int index = binarySearch(targetBuckets, r);
+      if (index >= 0) {
+        return targetBuckets.get(index).bucketNumber;
+      }
+
+      if (-1 * index - 1 < targetBuckets.size()) {

Review comment:
       Sorry, why do we need this? If not found, why not return the first bucket?
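
For context, a standalone sketch (using `java.util.Arrays.binarySearch` semantics as a stand-in for the PR's private helper, with made-up weights) of what falling back to the insertion point buys over returning the first bucket:

```java
import java.util.Arrays;

public class FallbackDemo {
  public static void main(String[] args) {
    // Hypothetical cumulative weights: bucket 0 owns (0.0, 0.25],
    // bucket 1 owns (0.25, 0.75], bucket 2 owns (0.75, 1.0].
    double[] cumulativeWeights = {0.25, 0.75, 1.0};
    double r = 0.5; // a hash fraction that hits no boundary exactly

    int index = Arrays.binarySearch(cumulativeWeights, r);
    // On a miss, binarySearch encodes the insertion point as -(point) - 1.
    // That insertion point is exactly the bucket whose range contains r,
    // so falling back to it preserves the weighted distribution; always
    // returning bucket 0 would funnel every missed hash into one bucket.
    int bucket = (index >= 0) ? index : -1 * index - 1;
    System.out.println(bucket); // 1
  }
}
```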

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertBucket.java
##########
@@ -18,24 +18,31 @@
 
 package org.apache.hudi.table.action.commit;
 
+import org.jetbrains.annotations.NotNull;
+
 import java.io.Serializable;
 
 /**
  * Helper class for an insert bucket along with the weight [0.0, 1.0] that defines the amount of incoming inserts that
  * should be allocated to the bucket.
  */
-public class InsertBucket implements Serializable {
+public class InsertBucket implements Serializable, Comparable<InsertBucket> {
 
   int bucketNumber;
-  // fraction of total inserts, that should go into this bucket
-  double weight;
+  // cumulate fraction of total inserts, that should go into this bucket and the previous bucket.
+  double cumulativeWeight;

Review comment:
       Somehow I don't feel comfortable adding cumulative weights to the InsertBucket class. An individual insert bucket has nothing to do with strict ordering w.r.t. other buckets; UpsertPartitioner imposes that ordering for its own purposes.
   
   Can we try something like this?
   partitionPathToInsertBucketInfo: Map<String, List/Array of Pair of <insert bucket number, cumulative weight>>
   insertBucketMap: Map of insert bucket number to InsertBucket object.
   
   getPartition(key) will first look at partitionPathToInsertBucketInfo to fetch the cumulative weights of all insert buckets along with their bucket numbers. Once we find the target bucket number, we can look it up in the other map.
   
   Or you can store the InsertBucket itself in the value of partitionPathToInsertBucketInfo to avoid the extra lookup: Pair<cumulativeWeight, InsertBucket>.
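
A rough sketch of the suggested layout, with hypothetical names (SimpleInsertBucket, buildCumulativeInfo; the actual Hudi types may differ): the cumulative weights live in a partitioner-owned map of (cumulative weight, bucket) pairs, while the bucket class itself stays order-free:

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stand-in for Hudi's InsertBucket: just the bucket number and its own
// (non-cumulative) weight, with no ordering baked into the class.
class SimpleInsertBucket {
  final int bucketNumber;
  final double weight;

  SimpleInsertBucket(int bucketNumber, double weight) {
    this.bucketNumber = bucketNumber;
    this.weight = weight;
  }
}

public class PartitionerSketch {
  // partitionPath -> list of (cumulativeWeight, bucket), built once so the
  // partitioner owns the ordering instead of the bucket class.
  static Map<String, List<SimpleImmutableEntry<Double, SimpleInsertBucket>>> buildCumulativeInfo(
      Map<String, List<SimpleInsertBucket>> partitionToBuckets) {
    Map<String, List<SimpleImmutableEntry<Double, SimpleInsertBucket>>> result = new HashMap<>();
    for (Map.Entry<String, List<SimpleInsertBucket>> e : partitionToBuckets.entrySet()) {
      double cumulative = 0.0;
      List<SimpleImmutableEntry<Double, SimpleInsertBucket>> info = new ArrayList<>();
      for (SimpleInsertBucket bucket : e.getValue()) {
        cumulative += bucket.weight;
        info.add(new SimpleImmutableEntry<>(cumulative, bucket));
      }
      result.put(e.getKey(), info);
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, List<SimpleInsertBucket>> in = new HashMap<>();
    List<SimpleInsertBucket> buckets = new ArrayList<>();
    buckets.add(new SimpleInsertBucket(0, 0.25));
    buckets.add(new SimpleInsertBucket(1, 0.75));
    in.put("2020/07/25", buckets);
    // Cumulative weights come out as 0.25 and 1.0 for the two buckets.
    System.out.println(buildCumulativeInfo(in).get("2020/07/25"));
  }
}
```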

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
##########
@@ -272,21 +275,44 @@ public int getPartition(Object key) {
       String partitionPath = keyLocation._1().getPartitionPath();
       List<InsertBucket> targetBuckets = partitionPathToInsertBuckets.get(partitionPath);
       // pick the target bucket to use based on the weights.
-      double totalWeight = 0.0;
       final long totalInserts = Math.max(1, profile.getWorkloadStat(partitionPath).getNumInserts());
       final long hashOfKey = NumericUtils.getMessageDigestHash("MD5", keyLocation._1().getRecordKey());
       final double r = 1.0 * Math.floorMod(hashOfKey, totalInserts) / totalInserts;
-      for (InsertBucket insertBucket : targetBuckets) {
-        totalWeight += insertBucket.weight;
-        if (r <= totalWeight) {
-          return insertBucket.bucketNumber;
-        }
+
+      int index = binarySearch(targetBuckets, r);

Review comment:
       Why not use Collections.binarySearch()?
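
For reference, a minimal self-contained sketch of how Collections.binarySearch could replace a hand-rolled search, assuming the list is sorted by cumulative weight (WeightedBucket and pickBucket are made-up names, not the actual Hudi types):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical bucket holding a cumulative weight, made Comparable so
// Collections.binarySearch can probe the sorted list with a dummy key.
class WeightedBucket implements Comparable<WeightedBucket> {
  final int bucketNumber;
  final double cumulativeWeight;

  WeightedBucket(int bucketNumber, double cumulativeWeight) {
    this.bucketNumber = bucketNumber;
    this.cumulativeWeight = cumulativeWeight;
  }

  @Override
  public int compareTo(WeightedBucket other) {
    return Double.compare(cumulativeWeight, other.cumulativeWeight);
  }
}

public class BucketSearchDemo {
  // Return the bucket whose cumulative-weight range contains r.
  static int pickBucket(List<WeightedBucket> buckets, double r) {
    int index = Collections.binarySearch(buckets, new WeightedBucket(-1, r));
    if (index < 0) {
      // Not an exact hit: binarySearch returns -(insertionPoint) - 1,
      // so recover the first bucket whose cumulative weight exceeds r.
      index = -index - 1;
    }
    // Clamp to the last bucket in case floating-point rounding keeps
    // the final cumulative weight fractionally below 1.0.
    index = Math.min(index, buckets.size() - 1);
    return buckets.get(index).bucketNumber;
  }

  public static void main(String[] args) {
    List<WeightedBucket> buckets = new ArrayList<>();
    buckets.add(new WeightedBucket(0, 0.25));
    buckets.add(new WeightedBucket(1, 0.75));
    buckets.add(new WeightedBucket(2, 1.0));
    System.out.println(pickBucket(buckets, 0.10)); // 0
    System.out.println(pickBucket(buckets, 0.75)); // exact hit: 1
    System.out.println(pickBucket(buckets, 0.90)); // 2
  }
}
```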

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
##########
@@ -272,21 +275,44 @@ public int getPartition(Object key) {
       String partitionPath = keyLocation._1().getPartitionPath();
       List<InsertBucket> targetBuckets = partitionPathToInsertBuckets.get(partitionPath);
       // pick the target bucket to use based on the weights.
-      double totalWeight = 0.0;
       final long totalInserts = Math.max(1, profile.getWorkloadStat(partitionPath).getNumInserts());
       final long hashOfKey = NumericUtils.getMessageDigestHash("MD5", keyLocation._1().getRecordKey());
       final double r = 1.0 * Math.floorMod(hashOfKey, totalInserts) / totalInserts;
-      for (InsertBucket insertBucket : targetBuckets) {
-        totalWeight += insertBucket.weight;
-        if (r <= totalWeight) {
-          return insertBucket.bucketNumber;
-        }
+
+      int index = binarySearch(targetBuckets, r);
+      if (index >= 0) {
+        return targetBuckets.get(index).bucketNumber;
+      }
+
+      if (-1 * index - 1 < targetBuckets.size()) {

Review comment:
       Also, ideally we should not hit this scenario at all. If our bin packing is good, the cumulative weight of the last entry should be 1.0.
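
A small check of that invariant with made-up weights (using Arrays.binarySearch's usual -(insertionPoint) - 1 convention as a stand-in for the helper): since r = floorMod(hash, n) / n is always strictly less than 1.0, a final cumulative weight of exactly 1.0 keeps the insertion point within bounds:

```java
import java.util.Arrays;

public class BoundsDemo {
  public static void main(String[] args) {
    // Good bin packing: cumulative weights end at exactly 1.0.
    double[] cumulativeWeights = {0.4, 1.0};

    // Worst case: r just below 1.0 (r can never equal 1.0, because
    // floorMod(hash, totalInserts) < totalInserts).
    int index = Arrays.binarySearch(cumulativeWeights, 0.999999);
    int insertionPoint = -1 * index - 1;

    // The insertion point lands on the last bucket, never past it, so
    // the out-of-range branch is unreachable when weights sum to 1.0.
    System.out.println(insertionPoint);                            // 1
    System.out.println(insertionPoint < cumulativeWeights.length); // true
  }
}
```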




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]