[ 
https://issues.apache.org/jira/browse/HUDI-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17394439#comment-17394439
 ] 

ASF GitHub Bot commented on HUDI-2170:
--------------------------------------

swuferhong commented on a change in pull request #3401:
URL: https://github.com/apache/hudi/pull/3401#discussion_r683900152



##########
File path: 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
##########
@@ -231,6 +229,83 @@ public void 
testUpsertsForMultipleRecordsInSameFile(ExternalSpillableMap.DiskMap
     }
   }
 
+  @ParameterizedTest
+  @MethodSource("testArguments")
+  public void testHoodieMergeHandlePreCombine(ExternalSpillableMap.DiskMapType 
diskMapType,
+                                              boolean isCompressionEnabled) 
throws Exception {
+    // Create records in a single partition
+    String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0];
+    dataGen = new HoodieTestDataGenerator(new String[] {partitionPath});
+
+    // Build a common config with diff configs
+    Properties properties = new Properties();
+    properties.setProperty(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(), 
diskMapType.name());
+    
properties.setProperty(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
 String.valueOf(isCompressionEnabled));
+    
metaClient.getTableConfig().setValue(HoodieTableConfig.HOODIE_TABLE_PRECOMBINE_FIELD_PROP,
 "timestamp");
+
+    // Build a write config with insert parallelism set
+    HoodieWriteConfig cfg = getConfigBuilder()
+        .withProperties(properties)
+        .build();
+
+    // test1: combine before insert, small preCombineField value record will 
be merged and replaced.
+    cfg.setValue(COMBINE_BEFORE_INSERT_PROP, "true");
+    try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
+      String newCommitTime = "001";
+      client.startCommitWithTime(newCommitTime);
+
+      List<HoodieRecord> records = 
dataGen.generateRecordsWithTimestampInsert(newCommitTime);
+      JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+      List<WriteStatus> statuses = client.insert(writeRecords, 
newCommitTime).collect();
+      assertNoWriteErrors(statuses);
+
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      HoodieTimeline timeline = new 
HoodieActiveTimeline(metaClient).getCommitTimeline();
+      Dataset<Row> dataSet = HoodieClientTestUtils.readCommit(basePath, 
sqlContext, timeline, newCommitTime);
+      Row[] rows = (Row[]) dataSet.collect();
+      for (Row row : rows) {
+        if (row.getAs("_hoodie_record_key").equals("01")) {
+          assertEquals((Long) row.getAs("timestamp"), 4, "Record with greater 
preCombine field is chosen");
+        } else if (row.getAs("_hoodie_record_key").equals("02")) {
+          assertEquals((Long) row.getAs("timestamp"), 5, "Record with equal 
preCombine field, New arrival record is chosen");
+        } else if (row.getAs("_hoodie_record_key").equals("03")) {
+          assertEquals((Long) row.getAs("timestamp"), 6, "Record with greater 
preCombine field is chosen");
+        }
+      }
+      assertEquals(5, dataSet.count(),
+          "Must contain 5 records, because three records be combined");
+    }
+
+    //test2 not combine before insert, and combine will happen on 
HoodieMergeHandle.
+    cfg.setValue(COMBINE_BEFORE_INSERT_PROP, "false");
+    try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {
+      //Combine before insert be set false, and combine will happen on 
HoodieMergeHandle.
+      String newCommitTime = "002";
+      client.startCommitWithTime(newCommitTime);
+
+      List<HoodieRecord> records = 
dataGen.generateRecordsWithTimestampInsert(newCommitTime);
+      JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+      List<WriteStatus> statuses = client.insert(writeRecords, 
newCommitTime).collect();
+      assertNoWriteErrors(statuses);
+
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      HoodieTimeline timeline = new 
HoodieActiveTimeline(metaClient).getCommitTimeline();
+      Dataset<Row> dataSet = HoodieClientTestUtils.readCommit(basePath, 
sqlContext, timeline, newCommitTime);
+      Row[] rows = (Row[]) dataSet.collect();
+      for (Row row : rows) {

Review comment:
       > can we reuse the asserts from here?
   
   Ok

##########
File path: 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
##########
@@ -231,6 +229,83 @@ public void 
testUpsertsForMultipleRecordsInSameFile(ExternalSpillableMap.DiskMap
     }
   }
 
+  @ParameterizedTest
+  @MethodSource("testArguments")
+  public void testHoodieMergeHandlePreCombine(ExternalSpillableMap.DiskMapType 
diskMapType,
+                                              boolean isCompressionEnabled) 
throws Exception {
+    // Create records in a single partition
+    String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0];
+    dataGen = new HoodieTestDataGenerator(new String[] {partitionPath});
+
+    // Build a common config with diff configs
+    Properties properties = new Properties();
+    properties.setProperty(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(), 
diskMapType.name());
+    
properties.setProperty(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
 String.valueOf(isCompressionEnabled));
+    
metaClient.getTableConfig().setValue(HoodieTableConfig.HOODIE_TABLE_PRECOMBINE_FIELD_PROP,
 "timestamp");
+
+    // Build a write config with insert parallelism set
+    HoodieWriteConfig cfg = getConfigBuilder()
+        .withProperties(properties)
+        .build();
+
+    // test1: combine before insert, small preCombineField value record will 
be merged and replaced.
+    cfg.setValue(COMBINE_BEFORE_INSERT_PROP, "true");
+    try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
+      String newCommitTime = "001";
+      client.startCommitWithTime(newCommitTime);
+
+      List<HoodieRecord> records = 
dataGen.generateRecordsWithTimestampInsert(newCommitTime);
+      JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+      List<WriteStatus> statuses = client.insert(writeRecords, 
newCommitTime).collect();
+      assertNoWriteErrors(statuses);
+
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      HoodieTimeline timeline = new 
HoodieActiveTimeline(metaClient).getCommitTimeline();
+      Dataset<Row> dataSet = HoodieClientTestUtils.readCommit(basePath, 
sqlContext, timeline, newCommitTime);
+      Row[] rows = (Row[]) dataSet.collect();
+      for (Row row : rows) {
+        if (row.getAs("_hoodie_record_key").equals("01")) {

Review comment:
       > use teh `HoodieRecord.` members?
   
   Ok




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


> Always choose the latest record for HoodieRecordPayload
> -------------------------------------------------------
>
>                 Key: HUDI-2170
>                 URL: https://issues.apache.org/jira/browse/HUDI-2170
>             Project: Apache Hudi
>          Issue Type: Improvement
>          Components: Common Core
>            Reporter: Danny Chen
>            Assignee: Danny Chen
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 0.9.0
>
>
> Now in {{OverwriteWithLatestAvroPayload.preCombine}}, we still choose the old 
> record when the new record has the same preCombine field with the old one, 
> actually it is more natural to keep the new incoming record instead. The 
> {{DefaultHoodieRecordPayload.combineAndGetUpdateValue}} method already did 
> that.
> See issue: https://github.com/apache/hudi/issues/3266.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to