SteNicholas commented on a change in pull request #2111:
URL: https://github.com/apache/hudi/pull/2111#discussion_r501726744
##########
File path:
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
##########
@@ -286,8 +303,48 @@ public void testUpsertPartitionerWithSmallInsertHandling()
throws Exception {
"Bucket 3 is INSERT");
assertEquals(4, insertBuckets.size(), "Total of 4 insert buckets");
- weights = new Double[] { 0.08, 0.31, 0.31, 0.31};
- cumulativeWeights = new Double[] { 0.08, 0.39, 0.69, 1.0};
+ weights = new Double[] {0.08, 0.31, 0.31, 0.31};
+ cumulativeWeights = new Double[] {0.08, 0.39, 0.69, 1.0};
+ assertInsertBuckets(weights, cumulativeWeights, insertBuckets);
+ }
+
+ @Test
+ public void testInsertPartitionerWithSmallInsertHandling() throws Exception {
+ final String testPartitionPath = "2016/09/26";
+ // Inserts .. Check updates go together & inserts subsplit, after
expanding smallest file
+ UpsertPartitioner partitioner = getInsertPartitioner(1000 * 1024, 400, 800
* 1024, testPartitionPath, false);
+ List<InsertBucketCumulativeWeightPair> insertBuckets =
partitioner.getInsertBuckets(testPartitionPath);
+
+ assertEquals(3, partitioner.numPartitions(), "Should have 3 partitions");
+ assertEquals(BucketType.INSERT, partitioner.getBucketInfo(0).bucketType,
+ "Bucket 0 is INSERT");
+ assertEquals(BucketType.INSERT, partitioner.getBucketInfo(1).bucketType,
+ "Bucket 1 is INSERT");
+ assertEquals(BucketType.INSERT, partitioner.getBucketInfo(2).bucketType,
+ "Bucket 2 is INSERT");
Review comment:
@leesf Yes, I would like to use a `foreach` assertion instead.
##########
File path:
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
##########
@@ -743,47 +743,37 @@ public void testSmallInsertHandlingForUpserts() throws
Exception {
*/
@Test
public void testSmallInsertHandlingForInserts() throws Exception {
-
final String testPartitionPath = "2016/09/26";
final int insertSplitLimit = 100;
// setup the small file handling params
HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit); //
hold upto 200 records max
dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath});
SparkRDDWriteClient client = getHoodieWriteClient(config, false);
- // Inserts => will write file1
String commitTime1 = "001";
client.startCommitWithTime(commitTime1);
List<HoodieRecord> inserts1 = dataGen.generateInserts(commitTime1,
insertSplitLimit); // this writes ~500kb
Set<String> keys1 = recordsToRecordKeySet(inserts1);
JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts1, 1);
List<WriteStatus> statuses = client.insert(insertRecordsRDD1,
commitTime1).collect();
-
assertNoWriteErrors(statuses);
- assertPartitionMetadata(new String[] {testPartitionPath}, fs);
-
+ assertPartitionMetadata(new String[]{testPartitionPath}, fs);
assertEquals(1, statuses.size(), "Just 1 file needs to be added.");
- String file1 = statuses.get(0).getFileId();
assertEquals(100,
readRowKeysFromParquet(hadoopConf, new Path(basePath,
statuses.get(0).getStat().getPath()))
.size(), "file should contain 100 records");
- // Second, set of Inserts should just expand file1
String commitTime2 = "002";
client.startCommitWithTime(commitTime2);
List<HoodieRecord> inserts2 = dataGen.generateInserts(commitTime2, 40);
Set<String> keys2 = recordsToRecordKeySet(inserts2);
JavaRDD<HoodieRecord> insertRecordsRDD2 = jsc.parallelize(inserts2, 1);
statuses = client.insert(insertRecordsRDD2, commitTime2).collect();
assertNoWriteErrors(statuses);
-
- assertEquals(1, statuses.size(), "Just 1 file needs to be updated.");
- assertEquals(file1, statuses.get(0).getFileId(), "Existing file should be
expanded");
- assertEquals(commitTime1, statuses.get(0).getStat().getPrevCommit(),
"Existing file should be expanded");
Review comment:
@leesf These asserts make no sense, because statuses.get(0).getFileId()
could be file1 + "-0" and statuses.get(0).getStat().getPrevCommit() could be null.
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
##########
@@ -112,16 +115,17 @@ private void assignUpdates(WorkloadProfile profile) {
for (Map.Entry<String, WorkloadStat> partitionStat : partitionStatEntries)
{
for (Map.Entry<String, Pair<String, Long>> updateLocEntry :
partitionStat.getValue().getUpdateLocationToCount().entrySet()) {
- addUpdateBucket(partitionStat.getKey(), updateLocEntry.getKey());
+ addUpdateBucket(partitionStat.getKey(), updateLocEntry.getKey(),
profile.getOperationType());
}
}
}
- private int addUpdateBucket(String partitionPath, String fileIdHint) {
+ private int addUpdateBucket(String partitionPath, String fileIdHint,
WriteOperationType operationType) {
int bucket = totalBuckets;
updateLocationToBucket.put(fileIdHint, bucket);
BucketInfo bucketInfo = new BucketInfo();
- bucketInfo.bucketType = BucketType.UPDATE;
+ bucketInfo.bucketType = operationType == null ||
isChangingRecords(operationType)
Review comment:
@leesf OK, I would move the logic outside of method `addUpdateBucket`.
##########
File path:
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
##########
@@ -90,14 +88,33 @@ private UpsertPartitioner getUpsertPartitioner(int
smallFileSize, int numInserts
List<HoodieRecord> records = new ArrayList<>();
records.addAll(insertRecords);
records.addAll(updateRecords);
- WorkloadProfile profile = new
WorkloadProfile(buildProfile(jsc.parallelize(records)));
+ WorkloadProfile profile = new
WorkloadProfile(buildProfile(jsc.parallelize(records)),
WriteOperationType.UPSERT);
UpsertPartitioner partitioner = new UpsertPartitioner(profile, context,
table, config);
assertEquals(0, partitioner.getPartition(
new Tuple2<>(updateRecords.get(0).getKey(),
Option.ofNullable(updateRecords.get(0).getCurrentLocation()))),
"Update record should have gone to the 1 update partition");
return partitioner;
}
+ private UpsertPartitioner getInsertPartitioner(int smallFileSize, int
numInserts, int fileSize, String testPartitionPath,
+ boolean autoSplitInserts) throws Exception {
+ HoodieWriteConfig config = makeHoodieClientConfigBuilder()
+
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(smallFileSize)
+
.insertSplitSize(100).autoTuneInsertSplits(autoSplitInserts).build())
Review comment:
@leesf I don't think `100` should be passed via a method parameter. You
could refer to the method `getUpsertPartitioner`.
##########
File path:
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
##########
@@ -286,8 +303,48 @@ public void testUpsertPartitionerWithSmallInsertHandling()
throws Exception {
"Bucket 3 is INSERT");
assertEquals(4, insertBuckets.size(), "Total of 4 insert buckets");
- weights = new Double[] { 0.08, 0.31, 0.31, 0.31};
- cumulativeWeights = new Double[] { 0.08, 0.39, 0.69, 1.0};
+ weights = new Double[] {0.08, 0.31, 0.31, 0.31};
+ cumulativeWeights = new Double[] {0.08, 0.39, 0.69, 1.0};
+ assertInsertBuckets(weights, cumulativeWeights, insertBuckets);
+ }
+
+ @Test
+ public void testInsertPartitionerWithSmallInsertHandling() throws Exception {
+ final String testPartitionPath = "2016/09/26";
+ // Inserts .. Check updates go together & inserts subsplit, after
expanding smallest file
+ UpsertPartitioner partitioner = getInsertPartitioner(1000 * 1024, 400, 800
* 1024, testPartitionPath, false);
+ List<InsertBucketCumulativeWeightPair> insertBuckets =
partitioner.getInsertBuckets(testPartitionPath);
+
+ assertEquals(3, partitioner.numPartitions(), "Should have 3 partitions");
Review comment:
@leesf I didn't get the point you mentioned. Do you mean that comments
should be added here to describe how the 3 expected partitions are derived?
##########
File path:
hudi-integ-test/src/test/java/org/apache/hudi/integ/command/ITTestHoodieSyncCommand.java
##########
@@ -52,7 +52,7 @@ public void testValidateSync() throws Exception {
executeCommandStringInDocker(ADHOC_1_CONTAINER, HUDI_CLI_TOOL + "
--cmdfile " + SYNC_VALIDATE_COMMANDS, true);
String expected = String.format("Count difference now is (count(%s) -
count(%s) == %d. Catch up count is %d",
- hiveTableName, hiveTableName2, 100, 200);
+ hiveTableName, hiveTableName2, 100, 100);
Review comment:
@leesf I don't think it is necessary to explain the change from 200 to 100
here, because this test case could use the insert operation to insert new
records regardless of small files.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]