linliu-code commented on code in PR #13115:
URL: https://github.com/apache/hudi/pull/13115#discussion_r2035927580


##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -275,43 +274,47 @@ protected Option<Pair<Option<T>, Map<String, Object>>> 
doProcessNextDataRecord(T
       } else {
         switch (recordMergeMode) {
           case COMMIT_TIME_ORDERING:
-            return Option.empty();
+            return Option.of(Pair.of(Option.ofNullable(record), metadata));
           case EVENT_TIME_ORDERING:
-            Comparable existingOrderingValue = readerContext.getOrderingValue(
-                existingRecordMetadataPair.getLeft(), 
existingRecordMetadataPair.getRight(),
-                readerSchema, orderingFieldName);
-            if 
(isDeleteRecordWithNaturalOrder(existingRecordMetadataPair.getLeft(), 
existingOrderingValue)) {
-              return Option.empty();
-            }
-            Comparable incomingOrderingValue = readerContext.getOrderingValue(
-                Option.of(record), metadata, readerSchema, orderingFieldName);
-            if (incomingOrderingValue.compareTo(existingOrderingValue) > 0) {
+            if (shouldKeepNewerRecord(existingRecordMetadataPair.getLeft(), 
existingRecordMetadataPair.getRight(), Option.ofNullable(record), metadata)) {
               return Option.of(Pair.of(Option.of(record), metadata));
             }
             return Option.empty();
           case CUSTOM:
           default:
             // Merge and store the combined record
-            // Note that the incoming `record` is from an older commit, so it 
should be put as
-            // the `older` in the merge API
             if (payloadClass.isPresent()) {
+              if (existingRecordMetadataPair.getLeft().isEmpty()
+                  && 
shouldKeepNewerRecord(existingRecordMetadataPair.getLeft(), 
existingRecordMetadataPair.getRight(), Option.ofNullable(record), metadata)) {
+                // IMPORTANT:
+                // this is needed when the fallback HoodieAvroRecordMerger got 
used, the merger would
+                // return Option.empty when the old payload data is empty(a 
delete) and ignores its ordering value directly.
+                return Option.of(Pair.of(Option.of(record), metadata));
+              }
               Option<Pair<HoodieRecord, Schema>> combinedRecordAndSchemaOpt =
-                  getMergedRecord(Option.of(record), metadata, 
existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight());
+                  getMergedRecord(existingRecordMetadataPair.getLeft(), 
existingRecordMetadataPair.getRight(), Option.of(record), metadata);
               if (combinedRecordAndSchemaOpt.isPresent()) {
                 T combinedRecordData = 
readerContext.convertAvroRecord((IndexedRecord) 
combinedRecordAndSchemaOpt.get().getLeft().getData());
                 // If pre-combine does not return existing record, update it
-                if (combinedRecordData != 
existingRecordMetadataPair.getLeft().get()) {
+                if (combinedRecordData != 
existingRecordMetadataPair.getLeft().orElse(null)) {
                   return 
Option.of(Pair.of(Option.ofNullable(combinedRecordData), metadata));
                 }
               }
               return Option.empty();
             } else {
+              if (existingRecordMetadataPair.getLeft().isEmpty()
+                  && 
shouldKeepNewerRecord(existingRecordMetadataPair.getLeft(), 
existingRecordMetadataPair.getRight(), Option.ofNullable(record), metadata)) {
+                // IMPORTANT:
+                // this is needed when the fallback HoodieAvroRecordMerger got 
used, the merger would
+                // return Option.empty when the old payload data is empty(a 
delete) and ignores its ordering value directly.
+                return Option.of(Pair.of(Option.of(record), metadata));
+              }

Review Comment:
   This duplicates the check at L287-L293 (DRY); consider extracting the shared logic into a helper method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to