linliu-code commented on code in PR #13742:
URL: https://github.com/apache/hudi/pull/13742#discussion_r2302038255


##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java:
##########
@@ -567,42 +553,36 @@ public Option<BufferedRecord<T>> 
deltaMerge(BufferedRecord<T> newRecord, Buffere
       if (existingRecord == null) {
         return Option.of(newRecord);
       }
-      if (existingRecord.isDelete() || newRecord.isDelete()) {
-        if (shouldKeepNewerRecord(existingRecord, newRecord)) {
-          // IMPORTANT:
-          // this is needed when the fallback HoodieAvroRecordMerger got used, 
the merger would
-          // return Option.empty when the old payload data is empty(a delete) 
and ignores its ordering value directly.
-          return Option.of(newRecord);
-        } else {
-          return Option.empty();
-        }
-      }
-      return deltaMergeNonDeleteRecord(newRecord, existingRecord);
+      return deltaMergeRecords(newRecord, existingRecord);
     }
 
-    public abstract Option<BufferedRecord<T>> 
deltaMergeNonDeleteRecord(BufferedRecord<T> newRecord, BufferedRecord<T> 
existingRecord) throws IOException;
+    public abstract Option<BufferedRecord<T>> 
deltaMergeRecords(BufferedRecord<T> newRecord, BufferedRecord<T> 
existingRecord) throws IOException;
 
     @Override
     public Option<DeleteRecord> deltaMerge(DeleteRecord deleteRecord, 
BufferedRecord<T> existingRecord) {
-      return deltaMergeDeleteRecord(deleteRecord, existingRecord);
+      BufferedRecord<T> deleteBufferedRecord = 
BufferedRecords.fromDeleteRecord(deleteRecord, recordContext);
+      try {
+        Option<BufferedRecord<T>> merged = deltaMerge(deleteBufferedRecord, 
existingRecord);
+        // If the delete record is chosen, return an option with the delete 
record, otherwise return empty.
+        return merged.isPresent() ? Option.of(deleteRecord) : Option.empty();
+      } catch (IOException e) {
+        throw new HoodieIOException("Failed to process delete record", e);
+      }
     }
 
     @Override
     public BufferedRecord<T> finalMerge(BufferedRecord<T> olderRecord, 
BufferedRecord<T> newerRecord) throws IOException {
-      if (olderRecord.isDelete() || newerRecord.isDelete()) {
-        if (shouldKeepNewerRecord(olderRecord, newerRecord)) {
-          // IMPORTANT:
-          // this is needed when the fallback HoodieAvroRecordMerger got used, 
the merger would
-          // return Option.empty when the new payload data is empty(a delete) 
and ignores its ordering value directly.
-          return newerRecord;
-        } else {
-          return olderRecord;
-        }
+      if (olderRecord == null) {

Review Comment:
   Based on the current implements, both older and newer record arguments here 
can not be not null.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to