yihua commented on code in PR #13213:
URL: https://github.com/apache/hudi/pull/13213#discussion_r2057903288


##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -337,34 +332,31 @@ && shouldKeepNewerRecord(existingRecordMetadataPair.getLeft(), existingRecordMet
       // NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific
       //       payload pointing into a shared, mutable (underlying) buffer we get a clean copy of
       //       it since these records will be put into records(Map).
-      return Option.of(Pair.of(Option.ofNullable(newRecord), metadata));
+      return Option.of(newRecord);
     }
   }
 
   /**
    * Merge a delete record with another record (data, or delete).
    *
    * @param deleteRecord               The delete record
-   * @param existingRecordMetadataPair The existing record metadata pair
+   * @param existingRecord             The existing record metadata pair

Review Comment:
   nit: update the Javadoc, since the parameter is now a buffered record rather than a record-metadata pair.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -498,32 +482,32 @@ protected Option<T> merge(Option<T> older, Map<String, Object> olderInfoMap,
               } else {
                 indexedRecord = (IndexedRecord) mergedRecord.get().getLeft().getData();
               }
-              return Option.ofNullable(readerContext.convertAvroRecord(indexedRecord));
+              return Pair.of(false, readerContext.convertAvroRecord(indexedRecord));
             }
-            return Option.empty();
+            return null;

Review Comment:
   Similar here: avoid returning `null` if possible.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -498,32 +482,32 @@ protected Option<T> merge(Option<T> older, Map<String, Object> olderInfoMap,
               } else {
                 indexedRecord = (IndexedRecord) mergedRecord.get().getLeft().getData();
               }
-              return Option.ofNullable(readerContext.convertAvroRecord(indexedRecord));
+              return Pair.of(false, readerContext.convertAvroRecord(indexedRecord));
             }
-            return Option.empty();
+            return null;
           } else {
-            if (older.isEmpty() || newer.isEmpty()) {
-              if (shouldKeepNewerRecord(older, olderInfoMap, newer, newerInfoMap)) {
+            if (olderRecord.isDelete() || newerRecord.isDelete()) {
+              if (shouldKeepNewerRecord(olderRecord, newerRecord)) {
                 // IMPORTANT:
                 // this is needed when the fallback HoodieAvroRecordMerger got used, the merger would
                 // return Option.empty when the new payload data is empty(a delete) and ignores its ordering value directly.
-                return newer;
+                return Pair.of(newerRecord.isDelete(), newerRecord.getRecord());
               } else {
-                return older;
+                return Pair.of(olderRecord.isDelete(), olderRecord.getRecord());
               }
             }
             Option<Pair<HoodieRecord, Schema>> mergedRecord = recordMerger.get().merge(
-                readerContext.constructHoodieRecord(older, olderInfoMap), readerContext.getSchemaFromMetadata(olderInfoMap),
-                readerContext.constructHoodieRecord(newer, newerInfoMap), readerContext.getSchemaFromMetadata(newerInfoMap), props);
+                readerContext.constructHoodieRecord(olderRecord), readerContext.getSchemaFromBufferRecord(olderRecord),
+                readerContext.constructHoodieRecord(newerRecord), readerContext.getSchemaFromBufferRecord(newerRecord), props);
             if (mergedRecord.isPresent()
                 && !mergedRecord.get().getLeft().isDelete(mergedRecord.get().getRight(), props)) {
+              HoodieRecord hoodieRecord = mergedRecord.get().getLeft();
               if (!mergedRecord.get().getRight().equals(readerSchema)) {
-                return Option.ofNullable((T) mergedRecord.get().getLeft().rewriteRecordWithNewSchema(mergedRecord.get().getRight(), null, readerSchema).getData());
+                return Pair.of(false, (T) hoodieRecord.rewriteRecordWithNewSchema(mergedRecord.get().getRight(), null, readerSchema).getData());
               }
-              return Option.ofNullable((T) mergedRecord.get().getLeft().getData());
+              return Pair.of(false, (T) hoodieRecord.getData());
             }
-
-            return Option.empty();
+            return null;

Review Comment:
   Similar here: avoid returning `null` if possible.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -498,32 +482,32 @@ protected Option<T> merge(Option<T> older, Map<String, Object> olderInfoMap,
               } else {
                 indexedRecord = (IndexedRecord) mergedRecord.get().getLeft().getData();
               }
-              return Option.ofNullable(readerContext.convertAvroRecord(indexedRecord));
+              return Pair.of(false, readerContext.convertAvroRecord(indexedRecord));
             }
-            return Option.empty();
+            return null;
           } else {
-            if (older.isEmpty() || newer.isEmpty()) {
-              if (shouldKeepNewerRecord(older, olderInfoMap, newer, newerInfoMap)) {
+            if (olderRecord.isDelete() || newerRecord.isDelete()) {
+              if (shouldKeepNewerRecord(olderRecord, newerRecord)) {
                 // IMPORTANT:
                 // this is needed when the fallback HoodieAvroRecordMerger got used, the merger would
                 // return Option.empty when the new payload data is empty(a delete) and ignores its ordering value directly.
-                return newer;
+                return Pair.of(newerRecord.isDelete(), newerRecord.getRecord());
               } else {
-                return older;
+                return Pair.of(olderRecord.isDelete(), olderRecord.getRecord());
               }
             }
             Option<Pair<HoodieRecord, Schema>> mergedRecord = recordMerger.get().merge(
-                readerContext.constructHoodieRecord(older, olderInfoMap), readerContext.getSchemaFromMetadata(olderInfoMap),
-                readerContext.constructHoodieRecord(newer, newerInfoMap), readerContext.getSchemaFromMetadata(newerInfoMap), props);
+                readerContext.constructHoodieRecord(olderRecord), readerContext.getSchemaFromBufferRecord(olderRecord),

Review Comment:
   One thing to validate later: a custom merger implementation can return `HoodieEmptyRecord` to indicate a delete, and that case should be handled properly in the CUSTOM merge mode.
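   A minimal sketch of what that handling could look like. `SketchRecord`, `EmptySketchRecord` and `CustomMergeDeleteCheck` are hypothetical stand-ins for `HoodieRecord`, `HoodieEmptyRecord` and the CUSTOM-mode check, not Hudi APIs. The merger output is treated as a delete when it is absent, when it is the empty-record marker, or when it reports itself as a delete, so the outcome does not hinge on how one record class implements its delete check.

```java
// Hypothetical stand-in for HoodieRecord: only the payload and a delete flag matter here.
class SketchRecord {
  private final String data;

  SketchRecord(String data) {
    this.data = data;
  }

  String getData() {
    return data;
  }

  // A regular record is not a delete unless a subclass says otherwise.
  boolean isDelete() {
    return false;
  }
}

// Hypothetical stand-in for HoodieEmptyRecord. It deliberately does not override isDelete(),
// to show that checking the marker type explicitly still catches it.
final class EmptySketchRecord extends SketchRecord {
  EmptySketchRecord() {
    super(null);
  }
}

final class CustomMergeDeleteCheck {
  private CustomMergeDeleteCheck() {
  }

  // Treat the custom merger's output as a delete when it is absent, is the empty-record
  // marker, or reports itself as a delete.
  static boolean isDeleteResult(java.util.Optional<SketchRecord> merged) {
    if (!merged.isPresent()) {
      return true;
    }
    SketchRecord result = merged.get();
    return result instanceof EmptySketchRecord || result.isDelete();
  }
}
```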



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -428,68 +420,60 @@ protected Option<Pair<Function<T, T>, Schema>> composeEvolvedSchemaTransformer(
   /**
    * Merge two records using the configured record merger.
    *
-   * @param older
-   * @param olderInfoMap
-   * @param newer
-   * @param newerInfoMap
+   * @param olderRecord
+   * @param newerRecord
    * @return
    * @throws IOException
    */
-  protected Option<T> merge(Option<T> older, Map<String, Object> olderInfoMap,
-                            Option<T> newer, Map<String, Object> newerInfoMap) throws IOException {
-    if (!older.isPresent()) {
-      return isDeleteRecord(newer, readerContext.getSchemaFromMetadata(newerInfoMap)) ? Option.empty() : newer;
-    }
-
+  protected Pair<Boolean, T> merge(BufferedRecord<T> olderRecord, BufferedRecord<T> newerRecord) throws IOException {
     if (enablePartialMerging) {
       // TODO(HUDI-7843): decouple the merging logic from the merger
       //  and use the record merge mode to control how to merge partial updates
       Option<Pair<HoodieRecord, Schema>> mergedRecord = recordMerger.get().partialMerge(
-          readerContext.constructHoodieRecord(older, olderInfoMap), readerContext.getSchemaFromMetadata(olderInfoMap),
-          readerContext.constructHoodieRecord(newer, newerInfoMap), readerContext.getSchemaFromMetadata(newerInfoMap),
+          readerContext.constructHoodieRecord(olderRecord), readerContext.getSchemaFromBufferRecord(olderRecord),
+          readerContext.constructHoodieRecord(newerRecord), readerContext.getSchemaFromBufferRecord(newerRecord),
           readerSchema, props);
 
       if (mergedRecord.isPresent()
           && !mergedRecord.get().getLeft().isDelete(mergedRecord.get().getRight(), props)) {
+        HoodieRecord hoodieRecord = mergedRecord.get().getLeft();
         if (!mergedRecord.get().getRight().equals(readerSchema)) {
-          return Option.ofNullable((T) mergedRecord.get().getLeft().rewriteRecordWithNewSchema(mergedRecord.get().getRight(), null, readerSchema).getData());
+          T data = (T) hoodieRecord.rewriteRecordWithNewSchema(mergedRecord.get().getRight(), null, readerSchema).getData();
+          return Pair.of(false, data);
         }
-        return Option.ofNullable((T) mergedRecord.get().getLeft().getData());
+        return Pair.of(false, (T) hoodieRecord.getData());
       }
-      return Option.empty();
+      return null;

Review Comment:
   Should this be `Pair.of(true, null)` so that return value is never `null`?
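   A minimal sketch of that convention, assuming a small stand-in for the `Pair` type used in the diff and a hypothetical `toMergeResult` helper that receives the merger's output plus the result of its delete check. Both "no merged record" and "merged record is a delete" map to `(true, null)`, so the method itself never returns `null`.

```java
// Minimal stand-in for the Pair type used in the diff; only of/getLeft/getRight are modeled.
final class Pair<L, R> {
  private final L left;
  private final R right;

  private Pair(L left, R right) {
    this.left = left;
    this.right = right;
  }

  static <L, R> Pair<L, R> of(L left, R right) {
    return new Pair<>(left, right);
  }

  L getLeft() {
    return left;
  }

  R getRight() {
    return right;
  }
}

final class NonNullMergeSketch {
  private NonNullMergeSketch() {
  }

  // Hypothetical helper: an empty Optional models "the merger returned Option.empty()", and
  // mergedIsDelete models the isDelete(...) check on the merged record in the diff.
  static Pair<Boolean, String> toMergeResult(java.util.Optional<String> merged, boolean mergedIsDelete) {
    if (!merged.isPresent() || mergedIsDelete) {
      // Report the delete through the flag instead of returning null.
      return Pair.of(true, null);
    }
    return Pair.of(false, merged.get());
  }
}
```

   Callers can then branch on `getLeft()` instead of null-checking the whole result.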



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -428,68 +420,60 @@ protected Option<Pair<Function<T, T>, Schema>> composeEvolvedSchemaTransformer(
   /**
    * Merge two records using the configured record merger.
    *
-   * @param older
-   * @param olderInfoMap
-   * @param newer
-   * @param newerInfoMap
+   * @param olderRecord
+   * @param newerRecord
    * @return

Review Comment:
   Update the Javadoc to describe the return value.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -265,56 +257,59 @@ protected Option<Pair<Option<T>, Map<String, Object>>> doProcessNextDataRecord(T
         HoodieRecord<T> combinedRecord = combinedRecordAndSchema.getLeft();
 
         // If pre-combine returns existing record, no need to update it
-        if (combinedRecord.getData() != existingRecordMetadataPair.getLeft().orElse(null)) {
-          return Option.of(Pair.of(
-              Option.ofNullable(combinedRecord.getData()),
-              readerContext.updateSchemaAndResetOrderingValInMetadata(metadata, combinedRecordAndSchema.getRight())));
+        if (combinedRecord.getData() != existingRecord.getRecord()) {
+          return Option.of(BufferedRecord.forRecordWithContext(combinedRecord, combinedRecordAndSchema.getRight(), readerContext, props));
         }
         return Option.empty();
       } else {
         switch (recordMergeMode) {
           case COMMIT_TIME_ORDERING:
-            return Option.of(Pair.of(Option.ofNullable(newRecord), metadata));
+            return Option.of(newRecord);
           case EVENT_TIME_ORDERING:
-            if (shouldKeepNewerRecord(existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight(), Option.ofNullable(newRecord), metadata)) {
-              return Option.of(Pair.of(Option.of(newRecord), metadata));
+            if (shouldKeepNewerRecord(existingRecord, newRecord)) {
+              return Option.of(newRecord);
             }
             return Option.empty();
           case CUSTOM:
           default:
             // Merge and store the combined record
             if (payloadClass.isPresent()) {
-              if (existingRecordMetadataPair.getLeft().isEmpty()
-                  && shouldKeepNewerRecord(existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight(), Option.ofNullable(newRecord), metadata)) {
-                // IMPORTANT:
-                // this is needed when the fallback HoodieAvroRecordMerger got used, the merger would
-                // return Option.empty when the old payload data is empty(a delete) and ignores its ordering value directly.
-                return Option.of(Pair.of(Option.of(newRecord), metadata));
+              if (existingRecord.isDelete() || newRecord.isDelete()) {
+                if (shouldKeepNewerRecord(existingRecord, newRecord)) {
+                  // IMPORTANT:
+                  // this is needed when the fallback HoodieAvroRecordMerger got used, the merger would
+                  // return Option.empty when the old payload data is empty(a delete) and ignores its ordering value directly.
+                  return Option.of(newRecord);
+                } else {
+                  return Option.empty();
+                }

Review Comment:
   This part can be extracted into a helper, since it is identical in both the payload-based and merger-based branches of the if-else.
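   A rough sketch of such a helper, assuming hypothetical stand-ins `BufferedRecordSketch` (for `BufferedRecord`) and `DeleteResolution`, and assuming the newer record wins ties on ordering value (not verified against Hudi's `shouldKeepNewerRecord`). It returns the winning record when either side is a delete and an empty result otherwise, leaving each branch to adapt the winner to its own return type.

```java
// Hypothetical stand-in for BufferedRecord<T>: an ordering value, a delete flag and a payload.
final class BufferedRecordSketch {
  final long orderingValue;
  final boolean isDelete;
  final String payload;

  BufferedRecordSketch(long orderingValue, boolean isDelete, String payload) {
    this.orderingValue = orderingValue;
    this.isDelete = isDelete;
    this.payload = payload;
  }
}

final class DeleteResolution {
  private DeleteResolution() {
  }

  // Assumption: the newer record wins ties (>=); this is not necessarily Hudi's exact rule.
  static boolean shouldKeepNewerRecord(BufferedRecordSketch older, BufferedRecordSketch newer) {
    return newer.orderingValue >= older.orderingValue;
  }

  // Shared by both branches: if either record is a delete, return the winner by ordering value;
  // otherwise return empty so the caller falls through to the payload- or merger-based merge.
  static java.util.Optional<BufferedRecordSketch> resolveDelete(BufferedRecordSketch older,
                                                                 BufferedRecordSketch newer) {
    if (!older.isDelete && !newer.isDelete) {
      return java.util.Optional.empty();
    }
    return java.util.Optional.of(shouldKeepNewerRecord(older, newer) ? newer : older);
  }
}
```

   The payload-based branch would return `Option.of(newRecord)` when the newer record wins and `Option.empty()` otherwise, while the merger-based branch would wrap the winner as `Pair.of(winner.isDelete(), winner.getRecord())`, as the two branches in the diff already do.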



-- 
This is an automated message from the Apache Git Service.