SteNicholas commented on a change in pull request #2111:
URL: https://github.com/apache/hudi/pull/2111#discussion_r501726744



##########
File path: 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
##########
@@ -286,8 +303,48 @@ public void testUpsertPartitionerWithSmallInsertHandling() 
throws Exception {
         "Bucket 3 is INSERT");
     assertEquals(4, insertBuckets.size(), "Total of 4 insert buckets");
 
-    weights = new Double[] { 0.08, 0.31, 0.31, 0.31};
-    cumulativeWeights = new Double[] { 0.08, 0.39, 0.69, 1.0};
+    weights = new Double[] {0.08, 0.31, 0.31, 0.31};
+    cumulativeWeights = new Double[] {0.08, 0.39, 0.69, 1.0};
+    assertInsertBuckets(weights, cumulativeWeights, insertBuckets);
+  }
+
+  @Test
+  public void testInsertPartitionerWithSmallInsertHandling() throws Exception {
+    final String testPartitionPath = "2016/09/26";
+    // Inserts  .. Check updates go together & inserts subsplit, after 
expanding smallest file
+    UpsertPartitioner partitioner = getInsertPartitioner(1000 * 1024, 400, 800 
* 1024, testPartitionPath, false);
+    List<InsertBucketCumulativeWeightPair> insertBuckets = 
partitioner.getInsertBuckets(testPartitionPath);
+
+    assertEquals(3, partitioner.numPartitions(), "Should have 3 partitions");
+    assertEquals(BucketType.INSERT, partitioner.getBucketInfo(0).bucketType,
+        "Bucket 0 is INSERT");
+    assertEquals(BucketType.INSERT, partitioner.getBucketInfo(1).bucketType,
+        "Bucket 1 is INSERT");
+    assertEquals(BucketType.INSERT, partitioner.getBucketInfo(2).bucketType,
+        "Bucket 2 is INSERT");

Review comment:
       @leesf Yes, I would like to use a `foreach` assertion instead.

##########
File path: 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
##########
@@ -743,47 +743,37 @@ public void testSmallInsertHandlingForUpserts() throws 
Exception {
    */
   @Test
   public void testSmallInsertHandlingForInserts() throws Exception {
-
     final String testPartitionPath = "2016/09/26";
     final int insertSplitLimit = 100;
     // setup the small file handling params
     HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit); // 
hold upto 200 records max
     dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath});
     SparkRDDWriteClient client = getHoodieWriteClient(config, false);
 
-    // Inserts => will write file1
     String commitTime1 = "001";
     client.startCommitWithTime(commitTime1);
     List<HoodieRecord> inserts1 = dataGen.generateInserts(commitTime1, 
insertSplitLimit); // this writes ~500kb
     Set<String> keys1 = recordsToRecordKeySet(inserts1);
     JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts1, 1);
     List<WriteStatus> statuses = client.insert(insertRecordsRDD1, 
commitTime1).collect();
-
     assertNoWriteErrors(statuses);
-    assertPartitionMetadata(new String[] {testPartitionPath}, fs);
-
+    assertPartitionMetadata(new String[]{testPartitionPath}, fs);
     assertEquals(1, statuses.size(), "Just 1 file needs to be added.");
-    String file1 = statuses.get(0).getFileId();
     assertEquals(100,
         readRowKeysFromParquet(hadoopConf, new Path(basePath, 
statuses.get(0).getStat().getPath()))
             .size(), "file should contain 100 records");
 
-    // Second, set of Inserts should just expand file1
     String commitTime2 = "002";
     client.startCommitWithTime(commitTime2);
     List<HoodieRecord> inserts2 = dataGen.generateInserts(commitTime2, 40);
     Set<String> keys2 = recordsToRecordKeySet(inserts2);
     JavaRDD<HoodieRecord> insertRecordsRDD2 = jsc.parallelize(inserts2, 1);
     statuses = client.insert(insertRecordsRDD2, commitTime2).collect();
     assertNoWriteErrors(statuses);
-
-    assertEquals(1, statuses.size(), "Just 1 file needs to be updated.");
-    assertEquals(file1, statuses.get(0).getFileId(), "Existing file should be 
expanded");
-    assertEquals(commitTime1, statuses.get(0).getStat().getPrevCommit(), 
"Existing file should be expanded");

Review comment:
       @leesf These asserts make no sense, because statuses.get(0).getFileId() 
could be file1+"-0" and statuses.get(0).getStat().getPrevCommit() could be null.

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
##########
@@ -112,16 +115,17 @@ private void assignUpdates(WorkloadProfile profile) {
     for (Map.Entry<String, WorkloadStat> partitionStat : partitionStatEntries) 
{
       for (Map.Entry<String, Pair<String, Long>> updateLocEntry :
           partitionStat.getValue().getUpdateLocationToCount().entrySet()) {
-        addUpdateBucket(partitionStat.getKey(), updateLocEntry.getKey());
+        addUpdateBucket(partitionStat.getKey(), updateLocEntry.getKey(), 
profile.getOperationType());
       }
     }
   }
 
-  private int addUpdateBucket(String partitionPath, String fileIdHint) {
+  private int addUpdateBucket(String partitionPath, String fileIdHint, 
WriteOperationType operationType) {
     int bucket = totalBuckets;
     updateLocationToBucket.put(fileIdHint, bucket);
     BucketInfo bucketInfo = new BucketInfo();
-    bucketInfo.bucketType = BucketType.UPDATE;
+    bucketInfo.bucketType = operationType == null || 
isChangingRecords(operationType)

Review comment:
       @leesf OK, I will move the logic outside of the method `addUpdateBucket`.

##########
File path: 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
##########
@@ -90,14 +88,33 @@ private UpsertPartitioner getUpsertPartitioner(int 
smallFileSize, int numInserts
     List<HoodieRecord> records = new ArrayList<>();
     records.addAll(insertRecords);
     records.addAll(updateRecords);
-    WorkloadProfile profile = new 
WorkloadProfile(buildProfile(jsc.parallelize(records)));
+    WorkloadProfile profile = new 
WorkloadProfile(buildProfile(jsc.parallelize(records)), 
WriteOperationType.UPSERT);
     UpsertPartitioner partitioner = new UpsertPartitioner(profile, context, 
table, config);
     assertEquals(0, partitioner.getPartition(
         new Tuple2<>(updateRecords.get(0).getKey(), 
Option.ofNullable(updateRecords.get(0).getCurrentLocation()))),
         "Update record should have gone to the 1 update partition");
     return partitioner;
   }
 
+  private UpsertPartitioner getInsertPartitioner(int smallFileSize, int 
numInserts, int fileSize, String testPartitionPath,
+      boolean autoSplitInserts) throws Exception {
+    HoodieWriteConfig config = makeHoodieClientConfigBuilder()
+            
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(smallFileSize)
+                    
.insertSplitSize(100).autoTuneInsertSplits(autoSplitInserts).build())

Review comment:
       @leesf  I don't think `100` should be passed in as a method parameter. You 
could refer to the method `getUpsertPartitioner` for comparison.

##########
File path: 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
##########
@@ -286,8 +303,48 @@ public void testUpsertPartitionerWithSmallInsertHandling() 
throws Exception {
         "Bucket 3 is INSERT");
     assertEquals(4, insertBuckets.size(), "Total of 4 insert buckets");
 
-    weights = new Double[] { 0.08, 0.31, 0.31, 0.31};
-    cumulativeWeights = new Double[] { 0.08, 0.39, 0.69, 1.0};
+    weights = new Double[] {0.08, 0.31, 0.31, 0.31};
+    cumulativeWeights = new Double[] {0.08, 0.39, 0.69, 1.0};
+    assertInsertBuckets(weights, cumulativeWeights, insertBuckets);
+  }
+
+  @Test
+  public void testInsertPartitionerWithSmallInsertHandling() throws Exception {
+    final String testPartitionPath = "2016/09/26";
+    // Inserts  .. Check updates go together & inserts subsplit, after 
expanding smallest file
+    UpsertPartitioner partitioner = getInsertPartitioner(1000 * 1024, 400, 800 
* 1024, testPartitionPath, false);
+    List<InsertBucketCumulativeWeightPair> insertBuckets = 
partitioner.getInsertBuckets(testPartitionPath);
+
+    assertEquals(3, partitioner.numPartitions(), "Should have 3 partitions");

Review comment:
       @leesf I didn't get the point you mentioned. Do you mean that I should 
add comments describing how the 3 expected partitions are derived?

##########
File path: 
hudi-integ-test/src/test/java/org/apache/hudi/integ/command/ITTestHoodieSyncCommand.java
##########
@@ -52,7 +52,7 @@ public void testValidateSync() throws Exception {
         executeCommandStringInDocker(ADHOC_1_CONTAINER, HUDI_CLI_TOOL + " 
--cmdfile " + SYNC_VALIDATE_COMMANDS, true);
 
     String expected = String.format("Count difference now is (count(%s) - 
count(%s) == %d. Catch up count is %d",
-        hiveTableName, hiveTableName2, 100, 200);
+        hiveTableName, hiveTableName2, 100, 100);

Review comment:
       @leesf I don't think the change from 200 to 100 needs an explanation here, 
because this test case could use the insert operation to insert new records 
regardless of small files.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to