leesf commented on a change in pull request #2111:
URL: https://github.com/apache/hudi/pull/2111#discussion_r501659751



##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
##########
@@ -112,16 +115,17 @@ private void assignUpdates(WorkloadProfile profile) {
     for (Map.Entry<String, WorkloadStat> partitionStat : partitionStatEntries) 
{
       for (Map.Entry<String, Pair<String, Long>> updateLocEntry :
           partitionStat.getValue().getUpdateLocationToCount().entrySet()) {
-        addUpdateBucket(partitionStat.getKey(), updateLocEntry.getKey());
+        addUpdateBucket(partitionStat.getKey(), updateLocEntry.getKey(), 
profile.getOperationType());
       }
     }
   }
 
-  private int addUpdateBucket(String partitionPath, String fileIdHint) {
+  private int addUpdateBucket(String partitionPath, String fileIdHint, 
WriteOperationType operationType) {
     int bucket = totalBuckets;
     updateLocationToBucket.put(fileIdHint, bucket);
     BucketInfo bucketInfo = new BucketInfo();
-    bucketInfo.bucketType = BucketType.UPDATE;
+    bucketInfo.bucketType = operationType == null || 
isChangingRecords(operationType)

Review comment:
      The method is called `addUpdateBucket`, but the resulting BucketType could be 
`INSERT`, which is misleading. Could we move this logic outside of the 
method instead?

##########
File path: 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
##########
@@ -743,47 +743,37 @@ public void testSmallInsertHandlingForUpserts() throws 
Exception {
    */
   @Test
   public void testSmallInsertHandlingForInserts() throws Exception {
-
     final String testPartitionPath = "2016/09/26";
     final int insertSplitLimit = 100;
     // setup the small file handling params
     HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit); // 
hold upto 200 records max
     dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath});
     SparkRDDWriteClient client = getHoodieWriteClient(config, false);
 
-    // Inserts => will write file1
     String commitTime1 = "001";
     client.startCommitWithTime(commitTime1);
     List<HoodieRecord> inserts1 = dataGen.generateInserts(commitTime1, 
insertSplitLimit); // this writes ~500kb
     Set<String> keys1 = recordsToRecordKeySet(inserts1);
     JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts1, 1);
     List<WriteStatus> statuses = client.insert(insertRecordsRDD1, 
commitTime1).collect();
-
     assertNoWriteErrors(statuses);
-    assertPartitionMetadata(new String[] {testPartitionPath}, fs);
-
+    assertPartitionMetadata(new String[]{testPartitionPath}, fs);
     assertEquals(1, statuses.size(), "Just 1 file needs to be added.");
-    String file1 = statuses.get(0).getFileId();
     assertEquals(100,
         readRowKeysFromParquet(hadoopConf, new Path(basePath, 
statuses.get(0).getStat().getPath()))
             .size(), "file should contain 100 records");
 
-    // Second, set of Inserts should just expand file1
     String commitTime2 = "002";
     client.startCommitWithTime(commitTime2);
     List<HoodieRecord> inserts2 = dataGen.generateInserts(commitTime2, 40);
     Set<String> keys2 = recordsToRecordKeySet(inserts2);
     JavaRDD<HoodieRecord> insertRecordsRDD2 = jsc.parallelize(inserts2, 1);
     statuses = client.insert(insertRecordsRDD2, commitTime2).collect();
     assertNoWriteErrors(statuses);
-
-    assertEquals(1, statuses.size(), "Just 1 file needs to be updated.");
-    assertEquals(file1, statuses.get(0).getFileId(), "Existing file should be 
expanded");
-    assertEquals(commitTime1, statuses.get(0).getStat().getPrevCommit(), 
"Existing file should be expanded");

Review comment:
      Do these asserts have to be removed? Please explain.

##########
File path: 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
##########
@@ -90,14 +88,33 @@ private UpsertPartitioner getUpsertPartitioner(int 
smallFileSize, int numInserts
     List<HoodieRecord> records = new ArrayList<>();
     records.addAll(insertRecords);
     records.addAll(updateRecords);
-    WorkloadProfile profile = new 
WorkloadProfile(buildProfile(jsc.parallelize(records)));
+    WorkloadProfile profile = new 
WorkloadProfile(buildProfile(jsc.parallelize(records)), 
WriteOperationType.UPSERT);
     UpsertPartitioner partitioner = new UpsertPartitioner(profile, context, 
table, config);
     assertEquals(0, partitioner.getPartition(
         new Tuple2<>(updateRecords.get(0).getKey(), 
Option.ofNullable(updateRecords.get(0).getCurrentLocation()))),
         "Update record should have gone to the 1 update partition");
     return partitioner;
   }
 
+  private UpsertPartitioner getInsertPartitioner(int smallFileSize, int 
numInserts, int fileSize, String testPartitionPath,
+      boolean autoSplitInserts) throws Exception {
+    HoodieWriteConfig config = makeHoodieClientConfigBuilder()
+            
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(smallFileSize)
+                    
.insertSplitSize(100).autoTuneInsertSplits(autoSplitInserts).build())

Review comment:
      `100` should be passed in via a method parameter.

##########
File path: 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
##########
@@ -286,8 +303,48 @@ public void testUpsertPartitionerWithSmallInsertHandling() 
throws Exception {
         "Bucket 3 is INSERT");
     assertEquals(4, insertBuckets.size(), "Total of 4 insert buckets");
 
-    weights = new Double[] { 0.08, 0.31, 0.31, 0.31};
-    cumulativeWeights = new Double[] { 0.08, 0.39, 0.69, 1.0};
+    weights = new Double[] {0.08, 0.31, 0.31, 0.31};
+    cumulativeWeights = new Double[] {0.08, 0.39, 0.69, 1.0};
+    assertInsertBuckets(weights, cumulativeWeights, insertBuckets);
+  }
+
+  @Test
+  public void testInsertPartitionerWithSmallInsertHandling() throws Exception {
+    final String testPartitionPath = "2016/09/26";
+    // Inserts  .. Check updates go together & inserts subsplit, after 
expanding smallest file
+    UpsertPartitioner partitioner = getInsertPartitioner(1000 * 1024, 400, 800 
* 1024, testPartitionPath, false);
+    List<InsertBucketCumulativeWeightPair> insertBuckets = 
partitioner.getInsertBuckets(testPartitionPath);
+
+    assertEquals(3, partitioner.numPartitions(), "Should have 3 partitions");

Review comment:
      It would be better if you described how the expected `3` 
partitions are derived.

##########
File path: 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
##########
@@ -286,8 +303,48 @@ public void testUpsertPartitionerWithSmallInsertHandling() 
throws Exception {
         "Bucket 3 is INSERT");
     assertEquals(4, insertBuckets.size(), "Total of 4 insert buckets");
 
-    weights = new Double[] { 0.08, 0.31, 0.31, 0.31};
-    cumulativeWeights = new Double[] { 0.08, 0.39, 0.69, 1.0};
+    weights = new Double[] {0.08, 0.31, 0.31, 0.31};
+    cumulativeWeights = new Double[] {0.08, 0.39, 0.69, 1.0};
+    assertInsertBuckets(weights, cumulativeWeights, insertBuckets);
+  }
+
+  @Test
+  public void testInsertPartitionerWithSmallInsertHandling() throws Exception {
+    final String testPartitionPath = "2016/09/26";
+    // Inserts  .. Check updates go together & inserts subsplit, after 
expanding smallest file
+    UpsertPartitioner partitioner = getInsertPartitioner(1000 * 1024, 400, 800 
* 1024, testPartitionPath, false);
+    List<InsertBucketCumulativeWeightPair> insertBuckets = 
partitioner.getInsertBuckets(testPartitionPath);
+
+    assertEquals(3, partitioner.numPartitions(), "Should have 3 partitions");
+    assertEquals(BucketType.INSERT, partitioner.getBucketInfo(0).bucketType,
+        "Bucket 0 is INSERT");
+    assertEquals(BucketType.INSERT, partitioner.getBucketInfo(1).bucketType,
+        "Bucket 1 is INSERT");
+    assertEquals(BucketType.INSERT, partitioner.getBucketInfo(2).bucketType,
+        "Bucket 2 is INSERT");
+    assertEquals(3, insertBuckets.size(), "Total of 3 insert buckets");
+
+    Double[] weights = {0.5, 0.25, 0.25};
+    Double[] cumulativeWeights = {0.5, 0.75, 1.0};
+    assertInsertBuckets(weights, cumulativeWeights, insertBuckets);
+
+    // Now with insert split size auto tuned
+    partitioner = getInsertPartitioner(1000 * 1024, 2400, 800 * 1024, 
testPartitionPath, true);
+    insertBuckets = partitioner.getInsertBuckets(testPartitionPath);
+
+    assertEquals(4, partitioner.numPartitions(), "Should have 4 partitions");

Review comment:
       ditto

##########
File path: 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
##########
@@ -286,8 +303,48 @@ public void testUpsertPartitionerWithSmallInsertHandling() 
throws Exception {
         "Bucket 3 is INSERT");
     assertEquals(4, insertBuckets.size(), "Total of 4 insert buckets");
 
-    weights = new Double[] { 0.08, 0.31, 0.31, 0.31};
-    cumulativeWeights = new Double[] { 0.08, 0.39, 0.69, 1.0};
+    weights = new Double[] {0.08, 0.31, 0.31, 0.31};
+    cumulativeWeights = new Double[] {0.08, 0.39, 0.69, 1.0};
+    assertInsertBuckets(weights, cumulativeWeights, insertBuckets);
+  }
+
+  @Test
+  public void testInsertPartitionerWithSmallInsertHandling() throws Exception {
+    final String testPartitionPath = "2016/09/26";
+    // Inserts  .. Check updates go together & inserts subsplit, after 
expanding smallest file
+    UpsertPartitioner partitioner = getInsertPartitioner(1000 * 1024, 400, 800 
* 1024, testPartitionPath, false);
+    List<InsertBucketCumulativeWeightPair> insertBuckets = 
partitioner.getInsertBuckets(testPartitionPath);
+
+    assertEquals(3, partitioner.numPartitions(), "Should have 3 partitions");
+    assertEquals(BucketType.INSERT, partitioner.getBucketInfo(0).bucketType,
+        "Bucket 0 is INSERT");
+    assertEquals(BucketType.INSERT, partitioner.getBucketInfo(1).bucketType,
+        "Bucket 1 is INSERT");
+    assertEquals(BucketType.INSERT, partitioner.getBucketInfo(2).bucketType,
+        "Bucket 2 is INSERT");

Review comment:
      Would you please use a `forEach` assertion instead of listing every bucket individually?

##########
File path: 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
##########
@@ -286,8 +303,48 @@ public void testUpsertPartitionerWithSmallInsertHandling() 
throws Exception {
         "Bucket 3 is INSERT");
     assertEquals(4, insertBuckets.size(), "Total of 4 insert buckets");
 
-    weights = new Double[] { 0.08, 0.31, 0.31, 0.31};
-    cumulativeWeights = new Double[] { 0.08, 0.39, 0.69, 1.0};
+    weights = new Double[] {0.08, 0.31, 0.31, 0.31};
+    cumulativeWeights = new Double[] {0.08, 0.39, 0.69, 1.0};
+    assertInsertBuckets(weights, cumulativeWeights, insertBuckets);
+  }
+
+  @Test
+  public void testInsertPartitionerWithSmallInsertHandling() throws Exception {
+    final String testPartitionPath = "2016/09/26";
+    // Inserts  .. Check updates go together & inserts subsplit, after 
expanding smallest file
+    UpsertPartitioner partitioner = getInsertPartitioner(1000 * 1024, 400, 800 
* 1024, testPartitionPath, false);
+    List<InsertBucketCumulativeWeightPair> insertBuckets = 
partitioner.getInsertBuckets(testPartitionPath);
+
+    assertEquals(3, partitioner.numPartitions(), "Should have 3 partitions");
+    assertEquals(BucketType.INSERT, partitioner.getBucketInfo(0).bucketType,
+        "Bucket 0 is INSERT");
+    assertEquals(BucketType.INSERT, partitioner.getBucketInfo(1).bucketType,
+        "Bucket 1 is INSERT");
+    assertEquals(BucketType.INSERT, partitioner.getBucketInfo(2).bucketType,
+        "Bucket 2 is INSERT");
+    assertEquals(3, insertBuckets.size(), "Total of 3 insert buckets");
+
+    Double[] weights = {0.5, 0.25, 0.25};
+    Double[] cumulativeWeights = {0.5, 0.75, 1.0};
+    assertInsertBuckets(weights, cumulativeWeights, insertBuckets);
+
+    // Now with insert split size auto tuned
+    partitioner = getInsertPartitioner(1000 * 1024, 2400, 800 * 1024, 
testPartitionPath, true);
+    insertBuckets = partitioner.getInsertBuckets(testPartitionPath);
+
+    assertEquals(4, partitioner.numPartitions(), "Should have 4 partitions");
+    assertEquals(BucketType.INSERT, partitioner.getBucketInfo(0).bucketType,
+        "Bucket 0 is INSERT");
+    assertEquals(BucketType.INSERT, partitioner.getBucketInfo(1).bucketType,
+        "Bucket 1 is INSERT");
+    assertEquals(BucketType.INSERT, partitioner.getBucketInfo(2).bucketType,
+        "Bucket 2 is INSERT");
+    assertEquals(BucketType.INSERT, partitioner.getBucketInfo(3).bucketType,
+        "Bucket 3 is INSERT");

Review comment:
       ditto

##########
File path: 
hudi-integ-test/src/test/java/org/apache/hudi/integ/command/ITTestHoodieSyncCommand.java
##########
@@ -52,7 +52,7 @@ public void testValidateSync() throws Exception {
         executeCommandStringInDocker(ADHOC_1_CONTAINER, HUDI_CLI_TOOL + " 
--cmdfile " + SYNC_VALIDATE_COMMANDS, true);
 
     String expected = String.format("Count difference now is (count(%s) - 
count(%s) == %d. Catch up count is %d",
-        hiveTableName, hiveTableName2, 100, 200);
+        hiveTableName, hiveTableName2, 100, 100);

Review comment:
      Would you please explain why this was changed from `200` to `100`?

##########
File path: 
hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
##########
@@ -211,7 +211,7 @@ class TestMORDataSource extends HoodieClientTestBase {
     val hudiSnapshotDF5 = spark.read.format("org.apache.hudi")
       .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, 
DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
       .load(basePath + "/*/*/*/*")
-    assertEquals(200, hudiSnapshotDF5.count())
+    assertEquals(300, hudiSnapshotDF5.count())

Review comment:
       ditto




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to