nsivabalan commented on a change in pull request #1868:
URL: https://github.com/apache/hudi/pull/1868#discussion_r460534315
##########
File path:
hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
##########
@@ -272,21 +275,44 @@ public int getPartition(Object key) {
String partitionPath = keyLocation._1().getPartitionPath();
List<InsertBucket> targetBuckets = partitionPathToInsertBuckets.get(partitionPath);
// pick the target bucket to use based on the weights.
- double totalWeight = 0.0;
final long totalInserts = Math.max(1, profile.getWorkloadStat(partitionPath).getNumInserts());
final long hashOfKey = NumericUtils.getMessageDigestHash("MD5", keyLocation._1().getRecordKey());
final double r = 1.0 * Math.floorMod(hashOfKey, totalInserts) / totalInserts;
- for (InsertBucket insertBucket : targetBuckets) {
-   totalWeight += insertBucket.weight;
-   if (r <= totalWeight) {
-     return insertBucket.bucketNumber;
-   }
+
+ int index = binarySearch(targetBuckets, r);
+ if (index >= 0) {
+   return targetBuckets.get(index).bucketNumber;
+ }
+
+ if (-1 * index - 1 < targetBuckets.size()) {
Review comment:
sorry, why do we need this? If the key is not found, why not just return the first bucket?
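For context on the `-1 * index - 1` expression under discussion: Java's binary search contract (both `java.util.Arrays` and `java.util.Collections`) returns `-(insertionPoint) - 1` when the key is absent, so negating and subtracting one recovers the index of the first element greater than the key. A minimal standalone sketch (the `pickBucket` helper is hypothetical, not Hudi code):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class BinarySearchConvention {
  // Pick the first bucket whose cumulative weight covers r in [0, 1).
  public static int pickBucket(List<Double> cumulativeWeights, double r) {
    int index = Collections.binarySearch(cumulativeWeights, r);
    if (index >= 0) {
      return index; // r landed exactly on a bucket boundary
    }
    // not found: binarySearch returned -(insertionPoint) - 1
    int insertionPoint = -index - 1; // index of first weight > r
    // clamp in case r falls past the last cumulative weight
    return Math.min(insertionPoint, cumulativeWeights.size() - 1);
  }

  public static void main(String[] args) {
    List<Double> weights = Arrays.asList(0.25, 0.5, 1.0);
    System.out.println(pickBucket(weights, 0.3));  // between 0.25 and 0.5 -> bucket 1
    System.out.println(pickBucket(weights, 0.5));  // exact match -> bucket 1
    System.out.println(pickBucket(weights, 0.99)); // below 1.0 -> bucket 2
  }
}
```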
##########
File path:
hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertBucket.java
##########
@@ -18,24 +18,31 @@
package org.apache.hudi.table.action.commit;
+import org.jetbrains.annotations.NotNull;
+
import java.io.Serializable;
/**
 * Helper class for an insert bucket along with the weight [0.0, 1.0] that defines the amount of incoming inserts that
 * should be allocated to the bucket.
*/
-public class InsertBucket implements Serializable {
+public class InsertBucket implements Serializable, Comparable<InsertBucket> {
int bucketNumber;
- // fraction of total inserts, that should go into this bucket
- double weight;
+ // cumulative fraction of total inserts that should go into this bucket and all previous buckets.
+ double cumulativeWeight;
Review comment:
Somehow I don't feel comfortable adding cumulative weights to the InsertBucket class. An individual insert bucket has nothing to do with any strict ordering w.r.t. other buckets; UpsertPartitioner imposes that ordering for its own purposes.
Can we try something like this:
partitionPathToInsertBucketInfo: Map<String, List/Array of Pair of <insert bucket number, cumulative weight>>
insertBucketMap: Map of insert bucket number to InsertBucket object.
getPartition(key) would first look at partitionPathToInsertBucketInfo to fetch the cumulative weights of all insert buckets along with their bucket numbers. Once we find the target bucket number, we look it up in the other map.
Or, to avoid the second lookup, you can store the InsertBucket itself in the value of partitionPathToInsertBucketInfo: Pair<cumulativeWeight, InsertBucket>.
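A rough sketch of what that suggested structure could look like, keeping cumulative weights out of the bucket class and inside the partitioner's own lookup table (all names here, such as `WeightedBucket` and `addPartition`, are hypothetical illustrations, not Hudi APIs):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CumulativeBucketIndex {
  // Pair of cumulative weight and bucket number; ordering lives here,
  // not in the bucket class itself.
  static final class WeightedBucket {
    final double cumulativeWeight;
    final int bucketNumber;
    WeightedBucket(double cumulativeWeight, int bucketNumber) {
      this.cumulativeWeight = cumulativeWeight;
      this.bucketNumber = bucketNumber;
    }
  }

  // partitionPath -> buckets sorted by ascending cumulative weight
  final Map<String, List<WeightedBucket>> partitionPathToInsertBucketInfo = new HashMap<>();

  // Build cumulative weights from per-bucket fractions, e.g. [0.25, 0.25, 0.5].
  void addPartition(String partitionPath, double[] weights) {
    List<WeightedBucket> buckets = new ArrayList<>();
    double cumulative = 0.0;
    for (int i = 0; i < weights.length; i++) {
      cumulative += weights[i];
      buckets.add(new WeightedBucket(cumulative, i));
    }
    partitionPathToInsertBucketInfo.put(partitionPath, buckets);
  }

  // Return the first bucket whose cumulative weight covers r in [0, 1).
  int pickBucket(String partitionPath, double r) {
    List<WeightedBucket> buckets = partitionPathToInsertBucketInfo.get(partitionPath);
    for (WeightedBucket b : buckets) {
      if (r <= b.cumulativeWeight) {
        return b.bucketNumber;
      }
    }
    // fall back to the last bucket if rounding left the total slightly below 1.0
    return buckets.get(buckets.size() - 1).bucketNumber;
  }
}
```

The linear scan is only for clarity; the same sorted list admits a binary search.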
##########
File path:
hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
##########
@@ -272,21 +275,44 @@ public int getPartition(Object key) {
String partitionPath = keyLocation._1().getPartitionPath();
List<InsertBucket> targetBuckets = partitionPathToInsertBuckets.get(partitionPath);
// pick the target bucket to use based on the weights.
- double totalWeight = 0.0;
final long totalInserts = Math.max(1, profile.getWorkloadStat(partitionPath).getNumInserts());
final long hashOfKey = NumericUtils.getMessageDigestHash("MD5", keyLocation._1().getRecordKey());
final double r = 1.0 * Math.floorMod(hashOfKey, totalInserts) / totalInserts;
- for (InsertBucket insertBucket : targetBuckets) {
-   totalWeight += insertBucket.weight;
-   if (r <= totalWeight) {
-     return insertBucket.bucketNumber;
-   }
+
+ int index = binarySearch(targetBuckets, r);
Review comment:
why not Collections.binarySearch()?
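For illustration, a hedged sketch of how `Collections.binarySearch` with a comparator could replace a hand-rolled search over a weight-sorted list (`WeightedBucket` here is a hypothetical stand-in, not Hudi's InsertBucket):

```java
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

public class LibraryBinarySearch {
  static final class WeightedBucket {
    final double cumulativeWeight;
    final int bucketNumber;
    WeightedBucket(double cumulativeWeight, int bucketNumber) {
      this.cumulativeWeight = cumulativeWeight;
      this.bucketNumber = bucketNumber;
    }
  }

  static int pickBucket(List<WeightedBucket> buckets, double r) {
    // the search key carries only the weight; bucketNumber is ignored by the comparator
    WeightedBucket key = new WeightedBucket(r, -1);
    int index = Collections.binarySearch(buckets, key,
        Comparator.comparingDouble(b -> b.cumulativeWeight));
    if (index < 0) {
      index = -index - 1; // insertion point: first bucket with cumulativeWeight > r
    }
    // clamp in case r falls past the last cumulative weight
    return buckets.get(Math.min(index, buckets.size() - 1)).bucketNumber;
  }
}
```

This avoids maintaining a bespoke `binarySearch` method and keeps the not-found convention identical to the JDK's.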
##########
File path:
hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
##########
@@ -272,21 +275,44 @@ public int getPartition(Object key) {
String partitionPath = keyLocation._1().getPartitionPath();
List<InsertBucket> targetBuckets = partitionPathToInsertBuckets.get(partitionPath);
// pick the target bucket to use based on the weights.
- double totalWeight = 0.0;
final long totalInserts = Math.max(1, profile.getWorkloadStat(partitionPath).getNumInserts());
final long hashOfKey = NumericUtils.getMessageDigestHash("MD5", keyLocation._1().getRecordKey());
final double r = 1.0 * Math.floorMod(hashOfKey, totalInserts) / totalInserts;
- for (InsertBucket insertBucket : targetBuckets) {
-   totalWeight += insertBucket.weight;
-   if (r <= totalWeight) {
-     return insertBucket.bucketNumber;
-   }
+
+ int index = binarySearch(targetBuckets, r);
+ if (index >= 0) {
+   return targetBuckets.get(index).bucketNumber;
+ }
+
+ if (-1 * index - 1 < targetBuckets.size()) {
Review comment:
also, ideally we should never hit this scenario: if our bin packing is correct, the cumulative weight of the last entry should be 1.0.
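To make that invariant concrete, here is a small sketch (hypothetical helper, assuming per-bucket weights are fractions summing to roughly 1.0) that builds the cumulative weights and pins the final entry to exactly 1.0, so that a draw r in [0, 1) can never fall past the last bucket due to floating-point drift:

```java
public class CumulativeWeightInvariant {
  // Convert per-bucket fractions into cumulative weights.
  static double[] toCumulative(double[] weights) {
    double[] cumulative = new double[weights.length];
    if (weights.length == 0) {
      return cumulative;
    }
    double sum = 0.0;
    for (int i = 0; i < weights.length; i++) {
      sum += weights[i];
      cumulative[i] = sum;
    }
    // with correct bin packing the total is 1.0 up to floating-point error;
    // pin the last entry so r in [0, 1) always lands inside some bucket
    if (Math.abs(sum - 1.0) < 1e-9) {
      cumulative[weights.length - 1] = 1.0;
    }
    return cumulative;
  }
}
```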
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]