leesf commented on a change in pull request #2111:
URL: https://github.com/apache/hudi/pull/2111#discussion_r501659751
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
##########
@@ -112,16 +115,17 @@ private void assignUpdates(WorkloadProfile profile) {
for (Map.Entry<String, WorkloadStat> partitionStat : partitionStatEntries)
{
for (Map.Entry<String, Pair<String, Long>> updateLocEntry :
partitionStat.getValue().getUpdateLocationToCount().entrySet()) {
- addUpdateBucket(partitionStat.getKey(), updateLocEntry.getKey());
+ addUpdateBucket(partitionStat.getKey(), updateLocEntry.getKey(),
profile.getOperationType());
}
}
}
- private int addUpdateBucket(String partitionPath, String fileIdHint) {
+ private int addUpdateBucket(String partitionPath, String fileIdHint,
WriteOperationType operationType) {
int bucket = totalBuckets;
updateLocationToBucket.put(fileIdHint, bucket);
BucketInfo bucketInfo = new BucketInfo();
- bucketInfo.bucketType = BucketType.UPDATE;
+ bucketInfo.bucketType = operationType == null ||
isChangingRecords(operationType)
Review comment:
The method is called `addUpdateBucket`, but the BucketType could end up being
`INSERT`, which is not very suitable. Could we move this logic outside of the
method?
##########
File path:
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
##########
@@ -743,47 +743,37 @@ public void testSmallInsertHandlingForUpserts() throws
Exception {
*/
@Test
public void testSmallInsertHandlingForInserts() throws Exception {
-
final String testPartitionPath = "2016/09/26";
final int insertSplitLimit = 100;
// setup the small file handling params
HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit); //
hold upto 200 records max
dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath});
SparkRDDWriteClient client = getHoodieWriteClient(config, false);
- // Inserts => will write file1
String commitTime1 = "001";
client.startCommitWithTime(commitTime1);
List<HoodieRecord> inserts1 = dataGen.generateInserts(commitTime1,
insertSplitLimit); // this writes ~500kb
Set<String> keys1 = recordsToRecordKeySet(inserts1);
JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts1, 1);
List<WriteStatus> statuses = client.insert(insertRecordsRDD1,
commitTime1).collect();
-
assertNoWriteErrors(statuses);
- assertPartitionMetadata(new String[] {testPartitionPath}, fs);
-
+ assertPartitionMetadata(new String[]{testPartitionPath}, fs);
assertEquals(1, statuses.size(), "Just 1 file needs to be added.");
- String file1 = statuses.get(0).getFileId();
assertEquals(100,
readRowKeysFromParquet(hadoopConf, new Path(basePath,
statuses.get(0).getStat().getPath()))
.size(), "file should contain 100 records");
- // Second, set of Inserts should just expand file1
String commitTime2 = "002";
client.startCommitWithTime(commitTime2);
List<HoodieRecord> inserts2 = dataGen.generateInserts(commitTime2, 40);
Set<String> keys2 = recordsToRecordKeySet(inserts2);
JavaRDD<HoodieRecord> insertRecordsRDD2 = jsc.parallelize(inserts2, 1);
statuses = client.insert(insertRecordsRDD2, commitTime2).collect();
assertNoWriteErrors(statuses);
-
- assertEquals(1, statuses.size(), "Just 1 file needs to be updated.");
- assertEquals(file1, statuses.get(0).getFileId(), "Existing file should be
expanded");
- assertEquals(commitTime1, statuses.get(0).getStat().getPrevCommit(),
"Existing file should be expanded");
Review comment:
Must these asserts be removed? Please explain why they are no longer valid.
##########
File path:
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
##########
@@ -90,14 +88,33 @@ private UpsertPartitioner getUpsertPartitioner(int
smallFileSize, int numInserts
List<HoodieRecord> records = new ArrayList<>();
records.addAll(insertRecords);
records.addAll(updateRecords);
- WorkloadProfile profile = new
WorkloadProfile(buildProfile(jsc.parallelize(records)));
+ WorkloadProfile profile = new
WorkloadProfile(buildProfile(jsc.parallelize(records)),
WriteOperationType.UPSERT);
UpsertPartitioner partitioner = new UpsertPartitioner(profile, context,
table, config);
assertEquals(0, partitioner.getPartition(
new Tuple2<>(updateRecords.get(0).getKey(),
Option.ofNullable(updateRecords.get(0).getCurrentLocation()))),
"Update record should have gone to the 1 update partition");
return partitioner;
}
+ private UpsertPartitioner getInsertPartitioner(int smallFileSize, int
numInserts, int fileSize, String testPartitionPath,
+ boolean autoSplitInserts) throws Exception {
+ HoodieWriteConfig config = makeHoodieClientConfigBuilder()
+
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(smallFileSize)
+
.insertSplitSize(100).autoTuneInsertSplits(autoSplitInserts).build())
Review comment:
`100` should be passed in via a method parameter.
##########
File path:
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
##########
@@ -286,8 +303,48 @@ public void testUpsertPartitionerWithSmallInsertHandling()
throws Exception {
"Bucket 3 is INSERT");
assertEquals(4, insertBuckets.size(), "Total of 4 insert buckets");
- weights = new Double[] { 0.08, 0.31, 0.31, 0.31};
- cumulativeWeights = new Double[] { 0.08, 0.39, 0.69, 1.0};
+ weights = new Double[] {0.08, 0.31, 0.31, 0.31};
+ cumulativeWeights = new Double[] {0.08, 0.39, 0.69, 1.0};
+ assertInsertBuckets(weights, cumulativeWeights, insertBuckets);
+ }
+
+ @Test
+ public void testInsertPartitionerWithSmallInsertHandling() throws Exception {
+ final String testPartitionPath = "2016/09/26";
+ // Inserts .. Check updates go together & inserts subsplit, after
expanding smallest file
+ UpsertPartitioner partitioner = getInsertPartitioner(1000 * 1024, 400, 800
* 1024, testPartitionPath, false);
+ List<InsertBucketCumulativeWeightPair> insertBuckets =
partitioner.getInsertBuckets(testPartitionPath);
+
+ assertEquals(3, partitioner.numPartitions(), "Should have 3 partitions");
Review comment:
It would be better if you described how the expected value of `3`
partitions is derived.
##########
File path:
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
##########
@@ -286,8 +303,48 @@ public void testUpsertPartitionerWithSmallInsertHandling()
throws Exception {
"Bucket 3 is INSERT");
assertEquals(4, insertBuckets.size(), "Total of 4 insert buckets");
- weights = new Double[] { 0.08, 0.31, 0.31, 0.31};
- cumulativeWeights = new Double[] { 0.08, 0.39, 0.69, 1.0};
+ weights = new Double[] {0.08, 0.31, 0.31, 0.31};
+ cumulativeWeights = new Double[] {0.08, 0.39, 0.69, 1.0};
+ assertInsertBuckets(weights, cumulativeWeights, insertBuckets);
+ }
+
+ @Test
+ public void testInsertPartitionerWithSmallInsertHandling() throws Exception {
+ final String testPartitionPath = "2016/09/26";
+ // Inserts .. Check updates go together & inserts subsplit, after
expanding smallest file
+ UpsertPartitioner partitioner = getInsertPartitioner(1000 * 1024, 400, 800
* 1024, testPartitionPath, false);
+ List<InsertBucketCumulativeWeightPair> insertBuckets =
partitioner.getInsertBuckets(testPartitionPath);
+
+ assertEquals(3, partitioner.numPartitions(), "Should have 3 partitions");
+ assertEquals(BucketType.INSERT, partitioner.getBucketInfo(0).bucketType,
+ "Bucket 0 is INSERT");
+ assertEquals(BucketType.INSERT, partitioner.getBucketInfo(1).bucketType,
+ "Bucket 1 is INSERT");
+ assertEquals(BucketType.INSERT, partitioner.getBucketInfo(2).bucketType,
+ "Bucket 2 is INSERT");
+ assertEquals(3, insertBuckets.size(), "Total of 3 insert buckets");
+
+ Double[] weights = {0.5, 0.25, 0.25};
+ Double[] cumulativeWeights = {0.5, 0.75, 1.0};
+ assertInsertBuckets(weights, cumulativeWeights, insertBuckets);
+
+ // Now with insert split size auto tuned
+ partitioner = getInsertPartitioner(1000 * 1024, 2400, 800 * 1024,
testPartitionPath, true);
+ insertBuckets = partitioner.getInsertBuckets(testPartitionPath);
+
+ assertEquals(4, partitioner.numPartitions(), "Should have 4 partitions");
Review comment:
ditto
##########
File path:
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
##########
@@ -286,8 +303,48 @@ public void testUpsertPartitionerWithSmallInsertHandling()
throws Exception {
"Bucket 3 is INSERT");
assertEquals(4, insertBuckets.size(), "Total of 4 insert buckets");
- weights = new Double[] { 0.08, 0.31, 0.31, 0.31};
- cumulativeWeights = new Double[] { 0.08, 0.39, 0.69, 1.0};
+ weights = new Double[] {0.08, 0.31, 0.31, 0.31};
+ cumulativeWeights = new Double[] {0.08, 0.39, 0.69, 1.0};
+ assertInsertBuckets(weights, cumulativeWeights, insertBuckets);
+ }
+
+ @Test
+ public void testInsertPartitionerWithSmallInsertHandling() throws Exception {
+ final String testPartitionPath = "2016/09/26";
+ // Inserts .. Check updates go together & inserts subsplit, after
expanding smallest file
+ UpsertPartitioner partitioner = getInsertPartitioner(1000 * 1024, 400, 800
* 1024, testPartitionPath, false);
+ List<InsertBucketCumulativeWeightPair> insertBuckets =
partitioner.getInsertBuckets(testPartitionPath);
+
+ assertEquals(3, partitioner.numPartitions(), "Should have 3 partitions");
+ assertEquals(BucketType.INSERT, partitioner.getBucketInfo(0).bucketType,
+ "Bucket 0 is INSERT");
+ assertEquals(BucketType.INSERT, partitioner.getBucketInfo(1).bucketType,
+ "Bucket 1 is INSERT");
+ assertEquals(BucketType.INSERT, partitioner.getBucketInfo(2).bucketType,
+ "Bucket 2 is INSERT");
Review comment:
Would you please use a `forEach` assertion instead of listing all the buckets individually?
##########
File path:
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
##########
@@ -286,8 +303,48 @@ public void testUpsertPartitionerWithSmallInsertHandling()
throws Exception {
"Bucket 3 is INSERT");
assertEquals(4, insertBuckets.size(), "Total of 4 insert buckets");
- weights = new Double[] { 0.08, 0.31, 0.31, 0.31};
- cumulativeWeights = new Double[] { 0.08, 0.39, 0.69, 1.0};
+ weights = new Double[] {0.08, 0.31, 0.31, 0.31};
+ cumulativeWeights = new Double[] {0.08, 0.39, 0.69, 1.0};
+ assertInsertBuckets(weights, cumulativeWeights, insertBuckets);
+ }
+
+ @Test
+ public void testInsertPartitionerWithSmallInsertHandling() throws Exception {
+ final String testPartitionPath = "2016/09/26";
+ // Inserts .. Check updates go together & inserts subsplit, after
expanding smallest file
+ UpsertPartitioner partitioner = getInsertPartitioner(1000 * 1024, 400, 800
* 1024, testPartitionPath, false);
+ List<InsertBucketCumulativeWeightPair> insertBuckets =
partitioner.getInsertBuckets(testPartitionPath);
+
+ assertEquals(3, partitioner.numPartitions(), "Should have 3 partitions");
+ assertEquals(BucketType.INSERT, partitioner.getBucketInfo(0).bucketType,
+ "Bucket 0 is INSERT");
+ assertEquals(BucketType.INSERT, partitioner.getBucketInfo(1).bucketType,
+ "Bucket 1 is INSERT");
+ assertEquals(BucketType.INSERT, partitioner.getBucketInfo(2).bucketType,
+ "Bucket 2 is INSERT");
+ assertEquals(3, insertBuckets.size(), "Total of 3 insert buckets");
+
+ Double[] weights = {0.5, 0.25, 0.25};
+ Double[] cumulativeWeights = {0.5, 0.75, 1.0};
+ assertInsertBuckets(weights, cumulativeWeights, insertBuckets);
+
+ // Now with insert split size auto tuned
+ partitioner = getInsertPartitioner(1000 * 1024, 2400, 800 * 1024,
testPartitionPath, true);
+ insertBuckets = partitioner.getInsertBuckets(testPartitionPath);
+
+ assertEquals(4, partitioner.numPartitions(), "Should have 4 partitions");
+ assertEquals(BucketType.INSERT, partitioner.getBucketInfo(0).bucketType,
+ "Bucket 0 is INSERT");
+ assertEquals(BucketType.INSERT, partitioner.getBucketInfo(1).bucketType,
+ "Bucket 1 is INSERT");
+ assertEquals(BucketType.INSERT, partitioner.getBucketInfo(2).bucketType,
+ "Bucket 2 is INSERT");
+ assertEquals(BucketType.INSERT, partitioner.getBucketInfo(3).bucketType,
+ "Bucket 3 is INSERT");
Review comment:
ditto
##########
File path:
hudi-integ-test/src/test/java/org/apache/hudi/integ/command/ITTestHoodieSyncCommand.java
##########
@@ -52,7 +52,7 @@ public void testValidateSync() throws Exception {
executeCommandStringInDocker(ADHOC_1_CONTAINER, HUDI_CLI_TOOL + "
--cmdfile " + SYNC_VALIDATE_COMMANDS, true);
String expected = String.format("Count difference now is (count(%s) -
count(%s) == %d. Catch up count is %d",
- hiveTableName, hiveTableName2, 100, 200);
+ hiveTableName, hiveTableName2, 100, 100);
Review comment:
would you please explain why change from `200` to `100` ?
##########
File path:
hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
##########
@@ -211,7 +211,7 @@ class TestMORDataSource extends HoodieClientTestBase {
val hudiSnapshotDF5 = spark.read.format("org.apache.hudi")
.option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,
DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
.load(basePath + "/*/*/*/*")
- assertEquals(200, hudiSnapshotDF5.count())
+ assertEquals(300, hudiSnapshotDF5.count())
Review comment:
ditto
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]