vinothchandar commented on a change in pull request #3401:
URL: https://github.com/apache/hudi/pull/3401#discussion_r683820637



##########
File path: 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
##########
@@ -231,6 +229,83 @@ public void 
testUpsertsForMultipleRecordsInSameFile(ExternalSpillableMap.DiskMap
     }
   }
 
+  @ParameterizedTest
+  @MethodSource("testArguments")
+  public void testHoodieMergeHandlePreCombine(ExternalSpillableMap.DiskMapType 
diskMapType,
+                                              boolean isCompressionEnabled) 
throws Exception {
+    // Create records in a single partition
+    String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0];
+    dataGen = new HoodieTestDataGenerator(new String[] {partitionPath});
+
+    // Build a common config with diff configs
+    Properties properties = new Properties();
+    properties.setProperty(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(), 
diskMapType.name());
+    
properties.setProperty(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
 String.valueOf(isCompressionEnabled));
+    
metaClient.getTableConfig().setValue(HoodieTableConfig.HOODIE_TABLE_PRECOMBINE_FIELD_PROP,
 "timestamp");
+
+    // Build a write config with insert parallelism set
+    HoodieWriteConfig cfg = getConfigBuilder()
+        .withProperties(properties)
+        .build();
+
+    // test1: combine before insert, small preCombineField value record will 
be merged and replaced.
+    cfg.setValue(COMBINE_BEFORE_INSERT_PROP, "true");
+    try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
+      String newCommitTime = "001";
+      client.startCommitWithTime(newCommitTime);
+
+      List<HoodieRecord> records = 
dataGen.generateRecordsWithTimestampInsert(newCommitTime);
+      JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+      List<WriteStatus> statuses = client.insert(writeRecords, 
newCommitTime).collect();
+      assertNoWriteErrors(statuses);
+
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      HoodieTimeline timeline = new 
HoodieActiveTimeline(metaClient).getCommitTimeline();
+      Dataset<Row> dataSet = HoodieClientTestUtils.readCommit(basePath, 
sqlContext, timeline, newCommitTime);
+      Row[] rows = (Row[]) dataSet.collect();
+      for (Row row : rows) {
+        if (row.getAs("_hoodie_record_key").equals("01")) {
+          assertEquals((Long) row.getAs("timestamp"), 4, "Record with greater 
preCombine field is chosen");
+        } else if (row.getAs("_hoodie_record_key").equals("02")) {
+          assertEquals((Long) row.getAs("timestamp"), 5, "Record with equal 
preCombine field, New arrival record is chosen");
+        } else if (row.getAs("_hoodie_record_key").equals("03")) {
+          assertEquals((Long) row.getAs("timestamp"), 6, "Record with greater 
preCombine field is chosen");
+        }
+      }
+      assertEquals(5, dataSet.count(),
+          "Must contain 5 records, because three records be combined");
+    }
+
+    //test2 not combine before insert, and combine will happen on 
HoodieMergeHandle.
+    cfg.setValue(COMBINE_BEFORE_INSERT_PROP, "false");
+    try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {
+      //Combine before insert be set false, and combine will happen on 
HoodieMergeHandle.
+      String newCommitTime = "002";
+      client.startCommitWithTime(newCommitTime);
+
+      List<HoodieRecord> records = 
dataGen.generateRecordsWithTimestampInsert(newCommitTime);
+      JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+      List<WriteStatus> statuses = client.insert(writeRecords, 
newCommitTime).collect();
+      assertNoWriteErrors(statuses);
+
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      HoodieTimeline timeline = new 
HoodieActiveTimeline(metaClient).getCommitTimeline();
+      Dataset<Row> dataSet = HoodieClientTestUtils.readCommit(basePath, 
sqlContext, timeline, newCommitTime);
+      Row[] rows = (Row[]) dataSet.collect();
+      for (Row row : rows) {

Review comment:
       can we reuse the asserts from here?

##########
File path: 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
##########
@@ -231,6 +229,83 @@ public void 
testUpsertsForMultipleRecordsInSameFile(ExternalSpillableMap.DiskMap
     }
   }
 
+  @ParameterizedTest
+  @MethodSource("testArguments")
+  public void testHoodieMergeHandlePreCombine(ExternalSpillableMap.DiskMapType 
diskMapType,
+                                              boolean isCompressionEnabled) 
throws Exception {
+    // Create records in a single partition
+    String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0];
+    dataGen = new HoodieTestDataGenerator(new String[] {partitionPath});
+
+    // Build a common config with diff configs
+    Properties properties = new Properties();
+    properties.setProperty(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(), 
diskMapType.name());
+    
properties.setProperty(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
 String.valueOf(isCompressionEnabled));
+    
metaClient.getTableConfig().setValue(HoodieTableConfig.HOODIE_TABLE_PRECOMBINE_FIELD_PROP,
 "timestamp");
+
+    // Build a write config with insert parallelism set
+    HoodieWriteConfig cfg = getConfigBuilder()
+        .withProperties(properties)
+        .build();
+
+    // test1: combine before insert, small preCombineField value record will 
be merged and replaced.
+    cfg.setValue(COMBINE_BEFORE_INSERT_PROP, "true");
+    try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
+      String newCommitTime = "001";
+      client.startCommitWithTime(newCommitTime);
+
+      List<HoodieRecord> records = 
dataGen.generateRecordsWithTimestampInsert(newCommitTime);
+      JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+      List<WriteStatus> statuses = client.insert(writeRecords, 
newCommitTime).collect();
+      assertNoWriteErrors(statuses);
+
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      HoodieTimeline timeline = new 
HoodieActiveTimeline(metaClient).getCommitTimeline();
+      Dataset<Row> dataSet = HoodieClientTestUtils.readCommit(basePath, 
sqlContext, timeline, newCommitTime);
+      Row[] rows = (Row[]) dataSet.collect();
+      for (Row row : rows) {
+        if (row.getAs("_hoodie_record_key").equals("01")) {

Review comment:
       use teh `HoodieRecord.` members?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
##########
@@ -70,6 +70,10 @@
   private int type = 0;
   private Map<String, HoodieMetadataFileInfo> filesystemMetadata = null;
 
+  public HoodieMetadataPayload(GenericRecord record, Comparable<?> 
orderingVal) {

Review comment:
       would all existing payloads be forced to implement this new constructor?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to