[ 
https://issues.apache.org/jira/browse/HUDI-1082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17157341#comment-17157341
 ] 

Hong Shen edited comment on HUDI-1082 at 7/15/20, 1:12 AM:
-----------------------------------------------------------

It seems this does not matter. We only need to ensure that records are 
allocated to the file groups in proportion to their weights. The hash value of 
the key is large and effectively randomly distributed, so even though the 
global insert count is used, the records still end up distributed in 
proportion to the weights.
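
As a rough illustration of the argument above, here is a minimal sketch of weight-proportional bucket selection driven by a key hash. It is not Hudi's code: the class, the method names, the "key-N" record keys, and the use of the first 8 bytes of an MD5 digest as the hash are all assumptions made for the example. The modulus is deliberately the global insert count (2200), not the per-partition count (200).

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical stand-in for the partitioner's bucket choice; not Hudi's actual code.
final class WeightedBucketSketch {

  // Assumption: a 64-bit hash built from the first 8 bytes of the key's MD5 digest.
  static long hashOfKey(String recordKey) {
    try {
      byte[] digest = MessageDigest.getInstance("MD5")
          .digest(recordKey.getBytes(StandardCharsets.UTF_8));
      long hash = 0;
      for (int i = 0; i < 8; i++) {
        hash = (hash << 8) | (digest[i] & 0xffL);
      }
      return hash;
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException(e);
    }
  }

  // r = floorMod(hash, totalInserts) / totalInserts lies in [0, 1);
  // the first bucket whose cumulative weight exceeds r is chosen.
  static int pickBucket(long hash, long totalInserts, double[] weights) {
    double r = (double) Math.floorMod(hash, totalInserts) / totalInserts;
    double cumulative = 0.0;
    for (int i = 0; i < weights.length; i++) {
      cumulative += weights[i];
      if (r < cumulative) {
        return i;
      }
    }
    return weights.length - 1; // guards against floating-point rounding
  }

  // Counts how many of numRecords synthetic keys land in each bucket.
  static int[] countPerBucket(int numRecords, long totalInserts, double[] weights) {
    int[] counts = new int[weights.length];
    for (int i = 0; i < numRecords; i++) {
      counts[pickBucket(hashOfKey("key-" + i), totalInserts, weights)]++;
    }
    return counts;
  }

  public static void main(String[] args) {
    // 200 records for one partition, two equally weighted buckets, global modulus 2200.
    int[] counts = countPerBucket(200, 2200, new double[] {0.5, 0.5});
    System.out.println(counts[0] + " / " + counts[1]);
  }
}
```

Because the hash is spread over the full 64-bit range, the remainder is close to uniform for either modulus, which is the point being made here.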

I added a test case, testGetPartitioner2, with two partitions, "2016/09/26" 
and "2016/09/27". It generates 200 insert records for partition "2016/09/26" 
and 2000 insert records for "2016/09/27". Since the file size limit is 
1000 * 1024, partition "2016/09/26" gets 2 file groups and partition 
"2016/09/27" gets 20 file groups. Of the 200 insert records for partition 
"2016/09/26", approximately 100 go to fileGroup1 and the rest to fileGroup2.
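
The file group counts above can be reproduced with a little arithmetic. The sketch below assumes the bucket count is the number of inserts divided by the insertSplitSize(100) set in the test config, rounded up, and that each bucket gets an equal weight. The class and method names are made up for illustration and are not Hudi APIs.

```java
// Hypothetical helper; it only reproduces the arithmetic, not Hudi's internals.
final class InsertBucketCount {

  // Buckets needed for a partition: ceil(inserts / insertSplitSize).
  static int numBuckets(long inserts, long insertSplitSize) {
    return (int) ((inserts + insertSplitSize - 1) / insertSplitSize);
  }

  // Weight of each bucket when the inserts are spread evenly.
  static double evenWeight(int numBuckets) {
    return 1.0 / numBuckets;
  }

  public static void main(String[] args) {
    int buckets1 = numBuckets(200, 100);   // partition "2016/09/26"
    int buckets2 = numBuckets(2000, 100);  // partition "2016/09/27"
    System.out.println(buckets1 + " buckets, weight " + evenWeight(buckets1));
    System.out.println(buckets2 + " buckets, weight " + evenWeight(buckets2));
  }
}
```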

 
{code:java|title=TestUpsertPartitioner.java|borderStyle=solid}
@Test
public void testGetPartitioner2() throws Exception {
  String testPartitionPath1 = "2016/09/26";
  String testPartitionPath2 = "2016/09/27";

  // Insert split size of 100 records, with small-file handling and auto-tuning turned off.
  HoodieWriteConfig config = makeHoodieClientConfigBuilder()
      .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(0)
          .insertSplitSize(100).autoTuneInsertSplits(false).build())
      .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1000 * 1024).build())
      .build();

  HoodieClientTestUtils.fakeCommitFile(basePath, "001");

  metaClient = HoodieTableMetaClient.reload(metaClient);
  HoodieCopyOnWriteTable table =
      (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, hadoopConf);

  // 200 inserts for the first partition.
  HoodieTestDataGenerator dataGenerator1 =
      new HoodieTestDataGenerator(new String[] {testPartitionPath1});
  List<HoodieRecord> insertRecords1 = dataGenerator1.generateInserts("001", 200);
  List<HoodieRecord> records1 = new ArrayList<>();
  records1.addAll(insertRecords1);

  // 2000 inserts for the second partition, added to the same workload.
  HoodieTestDataGenerator dataGenerator2 =
      new HoodieTestDataGenerator(new String[] {testPartitionPath2});
  List<HoodieRecord> insertRecords2 = dataGenerator2.generateInserts("001", 2000);
  records1.addAll(insertRecords2);

  WorkloadProfile profile = new WorkloadProfile(jsc.parallelize(records1));
  UpsertPartitioner partitioner = new UpsertPartitioner(profile, jsc, table, config);

  // Count how many records of the first partition land in each bucket.
  Map<Integer, Integer> partition2numRecords = new HashMap<>();
  for (HoodieRecord hoodieRecord : insertRecords1) {
    int partition = partitioner.getPartition(new Tuple2<>(
        hoodieRecord.getKey(), Option.ofNullable(hoodieRecord.getCurrentLocation())));
    partition2numRecords.merge(partition, 1, Integer::sum);
  }
  System.out.println(partition2numRecords);
}
{code}
{{Running the test 5 times gives the following outputs:}}
{code:java}
{20=106, 21=94}

{20=97, 21=103}

{20=95, 21=105}

{20=107, 21=93}

{20=113, 21=87}
 {code}
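
To put the five runs in perspective, the sketch below compares them with the spread one would expect if each of the 200 records independently picked one of the two buckets with probability 0.5. That binomial model is an assumption made for this check, not something the test itself verifies, and the class and method names are made up. The observed values are the counts for bucket 20 from the outputs above.

```java
// Hypothetical check of the observed splits against a binomial(200, 0.5) model.
final class SplitSpreadCheck {

  // Standard deviation of a binomial(n, p) count.
  static double stdDev(int n, double p) {
    return Math.sqrt(n * p * (1 - p));
  }

  // Largest absolute deviation of the observed counts from the mean n * p.
  static double maxDeviation(int[] observed, int n, double p) {
    double mean = n * p;
    double max = 0.0;
    for (int count : observed) {
      max = Math.max(max, Math.abs(count - mean));
    }
    return max;
  }

  public static void main(String[] args) {
    int[] bucket20 = {106, 97, 95, 107, 113};
    double sigma = stdDev(200, 0.5);
    double worst = maxDeviation(bucket20, 200, 0.5);
    System.out.printf("sigma = %.2f, largest deviation = %.0f (%.2f sigma)%n",
        sigma, worst, worst / sigma);
  }
}
```

Under that model the standard deviation is about 7 records, so the largest deviation seen here (13 records) is a bit under two standard deviations.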



> Bug in deciding the upsert/insert buckets
> -----------------------------------------
>
>                 Key: HUDI-1082
>                 URL: https://issues.apache.org/jira/browse/HUDI-1082
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: Writer Core
>    Affects Versions: 0.6.0
>            Reporter: sivabalan narayanan
>            Assignee: Hong Shen
>            Priority: Major
>             Fix For: 0.6.1
>
>
> In 
> [UpsertPartitioner|[https://github.com/apache/hudi/blob/master/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java]],
>  when getPartition(Object key) is called, the logic that determines which 
> bucket a record is inserted into relies on globalInsertCounts, whereas it 
> should use perPartitionInsertCount.
>  
> This is because the weights for all targetInsert buckets are determined from 
> the total inserts going into the partition of interest. // check like 200. 
> When getPartition(key) is called, however, we use the global insert count to 
> determine the right bucket. 
>  
> For instance,
> P1: 3 insert buckets with weights 0.2, 0.5 and 0.3, with 100 records to be 
> inserted in total.
> P2: 4 insert buckets with weights 0.1, 0.8, 0.05 and 0.05, with 10025 records 
> to be inserted in total. 
> So the ideal allocation is approximately
> P1: B0 -> 20, B1 -> 50, B2 -> 30
> P2: B0 -> 1002, B1 -> 8020, B2 -> 501, B3 -> 502
>  
> getPartition() for a key is determined as follows:
> mod(hash value, inserts) / inserts.
> Instead of using the inserts for the partition of interest, we currently 
> use the global insert count.
> Let's say these are the hash values for insert records in P1:
> 5, 14, 21, 30, 90, 500, 1490, 10019.
> | record hash | expected bucket in P1 | actual bucket in P1 |
> | 5           | B0                    | B0                  |
> | 14          | B0                    | B0                  |
> | 21          | B1                    | B0                  |
> | 30          | B1                    | B0                  |
> | 90          | B2                    | B0                  |
> | 500         | B0                    | B0                  |
> | 1490        | B2                    | B0                  |
> | 10019       | B0                    | B2                  |
>  
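
The difference described in the issue can be reproduced with a small sketch that applies mod(hash value, inserts) / inserts to the P1 weights twice, once with the per-partition insert count (100) and once with the global insert count (100 + 10025 = 10125). The class and method names are made up for illustration, and the bucket walk over cumulative weights is an assumption about how the ratio is mapped to a bucket, not Hudi's actual code.

```java
// Hypothetical comparison of per-partition vs. global insert counts for partition P1.
final class InsertCountComparison {

  // Maps r = floorMod(hash, inserts) / inserts to the first bucket whose
  // cumulative weight exceeds r.
  static int bucketFor(long hash, long inserts, double[] weights) {
    double r = (double) Math.floorMod(hash, inserts) / inserts;
    double cumulative = 0.0;
    for (int i = 0; i < weights.length; i++) {
      cumulative += weights[i];
      if (r < cumulative) {
        return i;
      }
    }
    return weights.length - 1; // guards against floating-point rounding
  }

  public static void main(String[] args) {
    double[] p1Weights = {0.2, 0.5, 0.3};
    long p1Inserts = 100;             // inserts going into P1
    long globalInserts = 100 + 10025; // inserts going into P1 and P2 together
    long[] hashes = {5, 14, 21, 30, 90, 500, 1490, 10019};

    for (long hash : hashes) {
      int expected = bucketFor(hash, p1Inserts, p1Weights);
      int actual = bucketFor(hash, globalInserts, p1Weights);
      System.out.println(hash + " | B" + expected + " | B" + actual);
    }
  }
}
```

With hash values this small, the global modulus maps almost every record to a ratio near zero, which is why the first bucket dominates in the example. Real key hashes span a much larger range.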



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
