danny0405 commented on code in PR #13742:
URL: https://github.com/apache/hudi/pull/13742#discussion_r2300000482


##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java:
##########
@@ -567,42 +553,36 @@ public Option<BufferedRecord<T>> deltaMerge(BufferedRecord<T> newRecord, Buffere
       if (existingRecord == null) {
         return Option.of(newRecord);
       }
-      if (existingRecord.isDelete() || newRecord.isDelete()) {
-        if (shouldKeepNewerRecord(existingRecord, newRecord)) {
-          // IMPORTANT:
-          // this is needed when the fallback HoodieAvroRecordMerger got used, the merger would
-          // return Option.empty when the old payload data is empty(a delete) and ignores its ordering value directly.
-          return Option.of(newRecord);
-        } else {
-          return Option.empty();
-        }
-      }
-      return deltaMergeNonDeleteRecord(newRecord, existingRecord);
+      return deltaMergeRecords(newRecord, existingRecord);
     }
 
-    public abstract Option<BufferedRecord<T>> deltaMergeNonDeleteRecord(BufferedRecord<T> newRecord, BufferedRecord<T> existingRecord) throws IOException;
+    public abstract Option<BufferedRecord<T>> deltaMergeRecords(BufferedRecord<T> newRecord, BufferedRecord<T> existingRecord) throws IOException;
 
     @Override
     public Option<DeleteRecord> deltaMerge(DeleteRecord deleteRecord, BufferedRecord<T> existingRecord) {
-      return deltaMergeDeleteRecord(deleteRecord, existingRecord);
+      BufferedRecord<T> deleteBufferedRecord = BufferedRecords.fromDeleteRecord(deleteRecord, recordContext);
+      try {
+        Option<BufferedRecord<T>> merged = deltaMerge(deleteBufferedRecord, existingRecord);
+        // If the delete record is chosen, return an option with the delete record, otherwise return empty.
+        return merged.isPresent() ? Option.of(deleteRecord) : Option.empty();
+      } catch (IOException e) {
+        throw new HoodieIOException("Failed to process delete record", e);
+      }
     }
 
     @Override
     public BufferedRecord<T> finalMerge(BufferedRecord<T> olderRecord, BufferedRecord<T> newerRecord) throws IOException {
-      if (olderRecord.isDelete() || newerRecord.isDelete()) {
-        if (shouldKeepNewerRecord(olderRecord, newerRecord)) {
-          // IMPORTANT:
-          // this is needed when the fallback HoodieAvroRecordMerger got used, the merger would
-          // return Option.empty when the new payload data is empty(a delete) and ignores its ordering value directly.
-          return newerRecord;
-        } else {
-          return olderRecord;
-        }
+      if (olderRecord == null) {
+        return newerRecord;
+      }
+      // handle special case for deletes that are sent to older partitions in global-index, this delete takes precedence regardless of the previous value
+      if (newerRecord.getHoodieOperation() == HoodieOperation.UPDATE_BEFORE) {

Review Comment:
   We'd better not have this special handling: for Flink CDC inputs, each
record can carry an operation, and `#mergeRecords` should already handle it
correctly because the ordering value for commit-time ordering is `0`.
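
   To illustrate the point, here is a minimal sketch, not Hudi's actual
merger. `SketchRecord` and `CommitTimeMergeSketch` are hypothetical names, and
the sketch assumes that under commit-time ordering every record has ordering
value `0` and that ties go to the newer record. Under those assumptions an
`UPDATE_BEFORE` record wins without a dedicated branch:

```java
// Hypothetical, simplified model; this is NOT Hudi's BufferedRecord or merger API.
final class SketchRecord {
    final String key;
    final long orderingValue; // assumption: 0 for every record under commit-time ordering
    final String operation;   // e.g. "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE"

    SketchRecord(String key, long orderingValue, String operation) {
        this.key = key;
        this.orderingValue = orderingValue;
        this.operation = operation;
    }
}

final class CommitTimeMergeSketch {
    // Keeps the newer record unless the older one has a strictly greater
    // ordering value. The operation flag is never inspected.
    static SketchRecord merge(SketchRecord older, SketchRecord newer) {
        if (older == null) {
            return newer;
        }
        return older.orderingValue > newer.orderingValue ? older : newer;
    }

    public static void main(String[] args) {
        SketchRecord older = new SketchRecord("k1", 0L, "INSERT");
        SketchRecord newer = new SketchRecord("k1", 0L, "UPDATE_BEFORE");
        // Both ordering values are 0, so the tie goes to the newer record.
        System.out.println(merge(older, newer).operation); // prints UPDATE_BEFORE
    }
}
```

   If the real merger breaks ties differently, this reasoning would not hold
and is worth checking against `#mergeRecords`.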


