nsivabalan commented on code in PR #13742:
URL: https://github.com/apache/hudi/pull/13742#discussion_r2299386989


##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java:
##########
@@ -63,8 +63,15 @@ public interface HoodieRecordMerger extends Serializable {
    * It'd be associative operation: f(a, f(b, c)) = f(f(a, b), c) (which we can translate as having 3 versions A, B, C
    * of the single record, both orders of operations applications have to yield the same result)
    * This method takes only full records for merging.
+   *
+   * @param older     Older record in terms of commit time ordering.
+   * @param oldSchema The schema of the older record.
+   * @param newer     Newer record in terms of commit time ordering.
+   * @param newSchema The schema of the newer record.
+   * @param props     The additional properties for the merging operation.
+   * @return The merged record and schema. The record is expected to be non-null. If the record represents a deletion, the operation must be set as {@link HoodieOperation#DELETE}.

Review Comment:
   Thanks for the javadocs. Along similar lines, can we enhance the javadocs for the BufferedRecordMerger APIs as well, especially to document when Option.empty is expected to be returned?
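
   As a point of reference, here is a minimal sketch of the contract the javadoc above describes, using hypothetical stand-in types rather than the real Hudi classes: the merge must be associative, and a deletion is conveyed via the operation flag on a non-null result, never via an empty return value.

```java
// Illustrative sketch only -- hypothetical stand-in types, not the Hudi API.
public class MergeContractSketch {

  public enum Operation { INSERT, DELETE }

  // A toy "record": an ordering value plus an operation flag.
  public record Rec(int orderingValue, Operation op) {}

  // Latest-wins merge: keep the record with the higher ordering value,
  // preferring the newer record on ties.
  public static Rec merge(Rec older, Rec newer) {
    return newer.orderingValue() >= older.orderingValue() ? newer : older;
  }

  public static void main(String[] args) {
    Rec a = new Rec(1, Operation.INSERT);
    Rec b = new Rec(2, Operation.INSERT);
    Rec c = new Rec(3, Operation.DELETE);

    // Associativity: f(a, f(b, c)) == f(f(a, b), c).
    Rec left = merge(a, merge(b, c));
    Rec right = merge(merge(a, b), c);
    System.out.println(left.equals(right)); // true

    // The delete survives as a real record with op == DELETE, never as empty.
    System.out.println(left.op()); // DELETE
  }
}
```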
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteMergeHandle.java:
##########
@@ -236,19 +236,19 @@ public boolean isEmptyNewRecords() {
     return keyToNewRecords.isEmpty();
   }
 
-  protected boolean writeUpdateRecord(HoodieRecord<T> newRecord, HoodieRecord<T> oldRecord, Option<HoodieRecord> combineRecordOpt, Schema writerSchema) throws IOException {
+  protected boolean writeUpdateRecord(HoodieRecord<T> newRecord, HoodieRecord<T> oldRecord, HoodieRecord combineRecord, Schema writerSchema) throws IOException {
     boolean isDelete = false;
-    if (combineRecordOpt.isPresent()) {
-      if (oldRecord.getData() != combineRecordOpt.get().getData()) {
-        // the incoming record is chosen
-        isDelete = isDeleteRecord(newRecord);
-      } else {
-        // the incoming record is dropped
-        return false;
+    if (oldRecord.getData() != combineRecord.getData()) {
+      // the incoming record is chosen
+      isDelete = isDeleteRecord(combineRecord);
+      if (!isDelete) {
+        updatedRecordsWritten++;
       }
-      updatedRecordsWritten++;
+    } else {
+      // the incoming record is dropped

Review Comment:
   Can you help me understand something in this context.
   
   Say we have rk1_v1 and a new incoming rk1_v2, and v2 has a lower ordering value.
   Then L 241 is false (because the old record matches the combined record).
   Don't we need to write the old record to the new file? How come we return right away at L 249?
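
   To make the scenario in the question concrete, a hedged sketch (hypothetical types, not the actual Hudi classes) of an event-time-ordering merge where the incoming record loses: the combined record is then the same object as the old record, so this path drops the incoming update and the old record still needs to be copied into the new file by some other path.

```java
// Hypothetical sketch of the scenario discussed above; not the Hudi API.
public class OrderingValueSketch {

  public record Rec(String key, long orderingValue, String payload) {}

  // Event-time ordering: keep the record with the higher ordering value;
  // ties go to the incoming (newer-by-commit-time) record.
  public static Rec merge(Rec existing, Rec incoming) {
    return incoming.orderingValue() >= existing.orderingValue() ? incoming : existing;
  }

  public static void main(String[] args) {
    Rec v1 = new Rec("rk1", 10L, "v1"); // existing record in the base file
    Rec v2 = new Rec("rk1", 5L, "v2");  // incoming record, lower ordering value

    Rec combined = merge(v1, v2);

    // The old record "wins", so the combined record IS the old record.
    // The incoming update is dropped, and the old record must still be
    // written to the new base file (by the caller, not by this method).
    System.out.println(combined == v1); // true
  }
}
```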



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerger.java:
##########
@@ -43,9 +44,35 @@ public String getMergingStrategy() {
   }
 
   @Override
-  public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws IOException {
-    return combineAndGetUpdateValue(older, newer, oldSchema, newSchema, props)
-        .map(r -> Pair.of(new HoodieAvroIndexedRecord(r), r.getSchema()));
+  public Pair<HoodieRecord, Schema> merge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws IOException {

Review Comment:
   Can you confirm we added UTs for this change?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java:
##########
@@ -161,8 +160,7 @@ protected static <T> HoodieRecord<T> reduceRecords(TypedProperties props, Buffer
       HoodieRecord<T> reducedRecord = merged.map(bufferedRecord -> recordContext.constructHoodieRecord(bufferedRecord, next.getPartitionPath())).orElse(previous);
       boolean choosePrevious = merged.isEmpty();
       HoodieKey reducedKey = choosePrevious ? previous.getKey() : next.getKey();
-      HoodieOperation operation = choosePrevious ? previous.getOperation() : next.getOperation();
-      return reducedRecord.newInstance(reducedKey, operation);
+      return reducedRecord.newInstance(reducedKey);

Review Comment:
   Are we not interested in the operation for the deduping use-case, and is that why we are not carrying the operation over to the reducedRecord?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteMergeHandle.java:
##########
@@ -236,19 +236,19 @@ public boolean isEmptyNewRecords() {
     return keyToNewRecords.isEmpty();
   }
 
-  protected boolean writeUpdateRecord(HoodieRecord<T> newRecord, HoodieRecord<T> oldRecord, Option<HoodieRecord> combineRecordOpt, Schema writerSchema) throws IOException {
+  protected boolean writeUpdateRecord(HoodieRecord<T> newRecord, HoodieRecord<T> oldRecord, HoodieRecord combineRecord, Schema writerSchema) throws IOException {
     boolean isDelete = false;
-    if (combineRecordOpt.isPresent()) {
-      if (oldRecord.getData() != combineRecordOpt.get().getData()) {
-        // the incoming record is chosen
-        isDelete = isDeleteRecord(newRecord);
-      } else {
-        // the incoming record is dropped
-        return false;
+    if (oldRecord.getData() != combineRecord.getData()) {
+      // the incoming record is chosen
+      isDelete = isDeleteRecord(combineRecord);
+      if (!isDelete) {
+        updatedRecordsWritten++;
       }
-      updatedRecordsWritten++;
+    } else {
+      // the incoming record is dropped

Review Comment:
   Ok, I see the caller is handling the copying of the old record.
   Is there any different handling we do at the caller's end?
   Why not handle writing both the combined and the old record here at L 251 only?
   Maybe we have a valid reason; just trying to understand the reasoning.


##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java:
##########
@@ -338,16 +334,15 @@ public Option<DeleteRecord> deltaMerge(DeleteRecord deleteRecord, BufferedRecord
     public BufferedRecord<T> finalMerge(BufferedRecord<T> olderRecord, BufferedRecord<T> newerRecord) throws IOException {
       // TODO(HUDI-7843): decouple the merging logic from the merger
       //  and use the record merge mode to control how to merge partial updates
-      Option<Pair<HoodieRecord, Schema>> mergedRecord = recordMerger.get().partialMerge(
+      Pair<HoodieRecord, Schema> mergedRecord = recordMerger.get().partialMerge(
           recordContext.constructHoodieRecord(olderRecord), recordContext.getSchemaFromBufferRecord(olderRecord),
           recordContext.constructHoodieRecord(newerRecord), recordContext.getSchemaFromBufferRecord(newerRecord),
           readerSchema, props);
 
-      if (mergedRecord.isPresent()
-          && !mergedRecord.get().getLeft().isDelete(mergedRecord.get().getRight(), props)) {
-        HoodieRecord hoodieRecord = mergedRecord.get().getLeft();
-        if (!mergedRecord.get().getRight().equals(readerSchema)) {
-          hoodieRecord = hoodieRecord.rewriteRecordWithNewSchema(mergedRecord.get().getRight(), null, readerSchema);
+      if (!mergedRecord.getLeft().isDelete(mergedRecord.getRight(), props)) {
+        HoodieRecord hoodieRecord = mergedRecord.getLeft();
+        if (!mergedRecord.getRight().equals(readerSchema)) {
+          hoodieRecord = hoodieRecord.rewriteRecordWithNewSchema(mergedRecord.getRight(), null, readerSchema);

Review Comment:
   I remember there was a discussion around retaining ordering values for delete records as well.
   I assume we are not fixing that in this patch, right?
   For example, L 349 does not carry over the ordering value; just wanted to confirm my understanding.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java:
##########
@@ -465,46 +451,45 @@ public CustomPayloadRecordMerger(
     }
 
     @Override
-    public Option<BufferedRecord<T>> deltaMergeNonDeleteRecord(BufferedRecord<T> newRecord, BufferedRecord<T> existingRecord) throws IOException {
-      Option<Pair<HoodieRecord, Schema>> mergedRecordAndSchema = getMergedRecord(existingRecord, newRecord, false);
-      if (mergedRecordAndSchema.isEmpty()) {
-        // An empty Option indicates that the output represents a delete.
-        return Option.of(new BufferedRecord<>(newRecord.getRecordKey(), OrderingValues.getDefault(), null, null, HoodieOperation.DELETE));
-      }
-      HoodieRecord mergedRecord = mergedRecordAndSchema.get().getLeft();
-      Schema mergeResultSchema = mergedRecordAndSchema.get().getRight();
+    public Option<BufferedRecord<T>> deltaMergeRecords(BufferedRecord<T> newRecord, BufferedRecord<T> existingRecord) throws IOException {
+      Pair<HoodieRecord, Schema> mergedRecordAndSchema = getMergedRecord(existingRecord, newRecord, false);
+      HoodieRecord mergedRecord = mergedRecordAndSchema.getLeft();
+      Schema mergeResultSchema = mergedRecordAndSchema.getRight();
       // Special handling for SENTINEL record in Expression Payload. This is returned if the condition does not match.
       if (mergedRecord.getData() == HoodieRecord.SENTINEL) {
         return Option.empty();
       }
-      T combinedRecordData = recordContext.convertAvroRecord(mergedRecord.toIndexedRecord(mergeResultSchema, props).get().getData());
+      Option<T> combinedRecordData = (mergedRecord.toIndexedRecord(mergeResultSchema, props).map(indexedRecord -> recordContext.convertAvroRecord(indexedRecord.getData())));

Review Comment:
   Minor: now that we have a separate merger for ExpressionPayload, do we still need L 459 to 461?
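
   For readers following the SENTINEL check in the hunk above, a hedged sketch of the sentinel pattern (names are hypothetical; only the identity-comparison idea is taken from the diff): one shared marker object means "condition did not match", and it is detected by reference identity rather than equality, so no real payload can be mistaken for it.

```java
import java.util.Optional;

// Hypothetical sketch of the sentinel pattern; not the actual Hudi classes.
public class SentinelSketch {

  // One shared marker instance; identity comparison makes it unforgeable.
  public static final Object SENTINEL = new Object();

  // Returns the merged payload, or SENTINEL when the condition does not match.
  public static Object evaluate(boolean conditionMatches, Object merged) {
    return conditionMatches ? merged : SENTINEL;
  }

  // Callers translate the sentinel into "no output" via reference identity.
  public static Optional<Object> toResult(Object mergeOutput) {
    return mergeOutput == SENTINEL ? Optional.empty() : Optional.of(mergeOutput);
  }

  public static void main(String[] args) {
    System.out.println(toResult(evaluate(false, "merged")).isEmpty()); // true
    System.out.println(toResult(evaluate(true, "merged")).get());      // merged
  }
}
```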



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java:
##########
@@ -136,6 +136,7 @@ default Option<Pair<HoodieRecord, Schema>> partialMerge(HoodieRecord older, Sche
    *
    * <p> This interface is experimental and might be evolved in the future.
    **/
+  @Deprecated

Review Comment:
   I vote to clean it up.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java:
##########
@@ -338,16 +334,15 @@ public Option<DeleteRecord> deltaMerge(DeleteRecord deleteRecord, BufferedRecord
     public BufferedRecord<T> finalMerge(BufferedRecord<T> olderRecord, BufferedRecord<T> newerRecord) throws IOException {
       // TODO(HUDI-7843): decouple the merging logic from the merger
       //  and use the record merge mode to control how to merge partial updates
-      Option<Pair<HoodieRecord, Schema>> mergedRecord = recordMerger.get().partialMerge(
+      Pair<HoodieRecord, Schema> mergedRecord = recordMerger.get().partialMerge(

Review Comment:
   Minor: is it an intentional choice that we name it "combinedRecord" above and "mergedRecord" here? Why not use the same name everywhere?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.