This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new c3279cd [HUDI-1082] Fix minor bug in deciding the insert buckets (#1838)
c3279cd is described below
commit c3279cd5989805946267b046007ea23ba4b615c2
Author: Shen Hong <[email protected]>
AuthorDate: Thu Jul 23 20:31:49 2020 +0800
[HUDI-1082] Fix minor bug in deciding the insert buckets (#1838)
---
.../table/action/commit/UpsertPartitioner.java | 11 +++--
.../table/action/commit/TestUpsertPartitioner.java | 57 ++++++++++++++++++++++
2 files changed, 63 insertions(+), 5 deletions(-)
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
index 8857fc3..755854b 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
@@ -67,9 +67,9 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends Partiti
*/
private int totalBuckets = 0;
/**
- * Stat for the current workload. Helps in determining total inserts,
upserts etc.
+ * Stat for the current workload. Helps in determining inserts, upserts etc.
*/
- private WorkloadStat globalStat;
+ private WorkloadProfile profile;
/**
* Helps decide which bucket an incoming update should go to.
*/
@@ -92,7 +92,7 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends Partiti
updateLocationToBucket = new HashMap<>();
partitionPathToInsertBuckets = new HashMap<>();
bucketInfoMap = new HashMap<>();
- globalStat = profile.getGlobalStat();
+ this.profile = profile;
this.table = table;
this.config = config;
assignUpdates(profile);
@@ -269,10 +269,11 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends Partiti
HoodieRecordLocation location = keyLocation._2().get();
return updateLocationToBucket.get(location.getFileId());
} else {
- List<InsertBucket> targetBuckets =
partitionPathToInsertBuckets.get(keyLocation._1().getPartitionPath());
+ String partitionPath = keyLocation._1().getPartitionPath();
+ List<InsertBucket> targetBuckets =
partitionPathToInsertBuckets.get(partitionPath);
// pick the target bucket to use based on the weights.
double totalWeight = 0.0;
- final long totalInserts = Math.max(1, globalStat.getNumInserts());
+ final long totalInserts = Math.max(1,
profile.getWorkloadStat(partitionPath).getNumInserts());
final long hashOfKey = NumericUtils.getMessageDigestHash("MD5",
keyLocation._1().getRecordKey());
final double r = 1.0 * Math.floorMod(hashOfKey, totalInserts) /
totalInserts;
for (InsertBucket insertBucket : targetBuckets) {
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
index 09be8f1..c526ad1 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
@@ -45,14 +45,17 @@ import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import scala.Tuple2;
import static
org.apache.hudi.common.testutils.HoodieTestUtils.generateFakeHoodieWriteStat;
import static
org.apache.hudi.table.action.commit.UpsertPartitioner.averageBytesPerRecord;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -180,6 +183,60 @@ public class TestUpsertPartitioner extends HoodieClientTestBase {
}
@Test
+ public void testPartitionWeight() throws Exception {
+ final String testPartitionPath = "2016/09/26";
+ int totalInsertNum = 2000;
+
+ HoodieWriteConfig config = makeHoodieClientConfigBuilder()
+
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(0)
+ .insertSplitSize(totalInsertNum /
2).autoTuneInsertSplits(false).build()).build();
+
+ HoodieClientTestUtils.fakeCommit(basePath, "001");
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable)
HoodieTable.create(metaClient, config, hadoopConf);
+ HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new
String[] {testPartitionPath});
+ List<HoodieRecord> insertRecords = dataGenerator.generateInserts("001",
totalInsertNum);
+
+ WorkloadProfile profile = new
WorkloadProfile(jsc.parallelize(insertRecords));
+ UpsertPartitioner partitioner = new UpsertPartitioner(profile, jsc, table,
config);
+ List<InsertBucket> insertBuckets =
partitioner.getInsertBuckets(testPartitionPath);
+
+ float bucket0Weight = 0.2f;
+ InsertBucket newInsertBucket0 = new InsertBucket();
+ newInsertBucket0.bucketNumber = insertBuckets.get(0).bucketNumber;
+ newInsertBucket0.weight = bucket0Weight;
+ insertBuckets.remove(0);
+ insertBuckets.add(0, newInsertBucket0);
+
+ InsertBucket newInsertBucket1 = new InsertBucket();
+ newInsertBucket1.bucketNumber = insertBuckets.get(1).bucketNumber;
+ newInsertBucket1.weight = 1 - bucket0Weight;
+ insertBuckets.remove(1);
+ insertBuckets.add(1, newInsertBucket1);
+
+ Map<Integer, Integer> partition2numRecords = new HashMap<Integer,
Integer>();
+ for (HoodieRecord hoodieRecord: insertRecords) {
+ int partition = partitioner.getPartition(new Tuple2<>(
+ hoodieRecord.getKey(),
Option.ofNullable(hoodieRecord.getCurrentLocation())));
+ if (!partition2numRecords.containsKey(partition)) {
+ partition2numRecords.put(partition, 0);
+ }
+ partition2numRecords.put(partition, partition2numRecords.get(partition)
+ 1);
+ }
+
+ assertTrue(partition2numRecords.get(0) < partition2numRecords.get(1),
+ "The insert num of bucket1 should more than bucket0");
+ assertTrue(partition2numRecords.get(0) + partition2numRecords.get(1) ==
totalInsertNum,
+ "The total insert records should be " + totalInsertNum);
+ assertEquals(String.valueOf(bucket0Weight),
+ String.format("%.1f", (partition2numRecords.get(0) * 1.0f /
totalInsertNum)),
+ "The weight of bucket0 should be " + bucket0Weight);
+ assertEquals(String.valueOf(1 - bucket0Weight),
+ String.format("%.1f", (partition2numRecords.get(1) * 1.0f /
totalInsertNum)),
+ "The weight of bucket1 should be " + (1 - bucket0Weight));
+ }
+
+ @Test
public void testUpsertPartitionerWithSmallInsertHandling() throws Exception {
final String testPartitionPath = "2016/09/26";
// Inserts + Updates .. Check updates go together & inserts subsplit,
after expanding