danny0405 commented on code in PR #13742:
URL: https://github.com/apache/hudi/pull/13742#discussion_r2299977999


##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java:
##########
@@ -465,46 +451,45 @@ public CustomPayloadRecordMerger(
     }
 
     @Override
-    public Option<BufferedRecord<T>> 
deltaMergeNonDeleteRecord(BufferedRecord<T> newRecord, BufferedRecord<T> 
existingRecord) throws IOException {
-      Option<Pair<HoodieRecord, Schema>> mergedRecordAndSchema = 
getMergedRecord(existingRecord, newRecord, false);
-      if (mergedRecordAndSchema.isEmpty()) {
-        // An empty Option indicates that the output represents a delete.
-        return Option.of(new BufferedRecord<>(newRecord.getRecordKey(), 
OrderingValues.getDefault(), null, null, HoodieOperation.DELETE));
-      }
-      HoodieRecord mergedRecord = mergedRecordAndSchema.get().getLeft();
-      Schema mergeResultSchema = mergedRecordAndSchema.get().getRight();
+    public Option<BufferedRecord<T>> deltaMergeRecords(BufferedRecord<T> 
newRecord, BufferedRecord<T> existingRecord) throws IOException {
+      Pair<HoodieRecord, Schema> mergedRecordAndSchema = 
getMergedRecord(existingRecord, newRecord, false);
+      HoodieRecord mergedRecord = mergedRecordAndSchema.getLeft();
+      Schema mergeResultSchema = mergedRecordAndSchema.getRight();
       // Special handling for SENTINEL record in Expression Payload. This is 
returned if the condition does not match.
       if (mergedRecord.getData() == HoodieRecord.SENTINEL) {
         return Option.empty();
       }
-      T combinedRecordData = 
recordContext.convertAvroRecord(mergedRecord.toIndexedRecord(mergeResultSchema, 
props).get().getData());
+      Option<T> combinedRecordData = 
(mergedRecord.toIndexedRecord(mergeResultSchema, props).map(indexedRecord -> 
recordContext.convertAvroRecord(indexedRecord.getData())));
       // If pre-combine does not return existing record, update it
-      if (combinedRecordData != existingRecord.getRecord()) {
+      if (combinedRecordData.map(record -> record != 
existingRecord.getRecord()).orElse(true)) {
         // For pkless we need to use record key from existing record
-        return Option.of(BufferedRecords.fromEngineRecord(combinedRecordData, 
mergeResultSchema, recordContext, orderingFieldNames,
-            existingRecord.getRecordKey(), 
mergedRecord.isDelete(mergeResultSchema, props)));
+        boolean isDelete = mergedRecord.isDelete(readerSchema, props);
+        Comparable orderingValue = 
mergedRecord.getOrderingValue(mergeResultSchema, props, orderingFieldNames);
+        T mergedEngineRecord = mergedRecord.toIndexedRecord(mergeResultSchema, 
props)
+            .map(hoodieAvroIndexedRecord -> 
recordContext.convertAvroRecord(hoodieAvroIndexedRecord.getData()))
+            .orElse(null);
+        return Option.of(BufferedRecords.fromEngineRecord(mergedEngineRecord, 
readerSchema, recordContext, orderingValue, existingRecord.getRecordKey(), 
isDelete));
+
       }
       return Option.empty();
     }
 
     @Override
-    public BufferedRecord<T> mergeNonDeleteRecord(BufferedRecord<T> 
olderRecord, BufferedRecord<T> newerRecord) throws IOException {
-      Option<Pair<HoodieRecord, Schema>> mergedRecordAndSchema = 
getMergedRecord(olderRecord, newerRecord, true);
-      if (mergedRecordAndSchema.isEmpty()) {
-        return BufferedRecords.createDelete(newerRecord.getRecordKey());
-      }
-      HoodieRecord mergedRecord = mergedRecordAndSchema.get().getLeft();
-      Schema mergeResultSchema = mergedRecordAndSchema.get().getRight();
+    public BufferedRecord<T> mergeRecords(BufferedRecord<T> olderRecord, 
BufferedRecord<T> newerRecord) throws IOException {
+      Pair<HoodieRecord, Schema> mergedRecordAndSchema = 
getMergedRecord(olderRecord, newerRecord, true);
+
+      HoodieRecord mergedRecord = mergedRecordAndSchema.getLeft();
+      Schema mergeResultSchema = mergedRecordAndSchema.getRight();
       // Special handling for SENTINEL record in Expression Payload
       if (mergedRecord.getData() == HoodieRecord.SENTINEL) {
         return olderRecord;
       }
-      if (!mergedRecord.isDelete(mergeResultSchema, props)) {
-        IndexedRecord indexedRecord = (IndexedRecord) mergedRecord.getData();
-        return BufferedRecords.fromEngineRecord(
-            recordContext.convertAvroRecord(indexedRecord), mergeResultSchema, 
recordContext, orderingFieldNames, newerRecord.getRecordKey(), false);
-      }
-      return BufferedRecords.createDelete(newerRecord.getRecordKey());
+      boolean isDelete = mergedRecord.isDelete(readerSchema, props);
+      Comparable orderingValue = 
mergedRecord.getOrderingValue(mergeResultSchema, props, orderingFieldNames);
+      T mergedEngineRecord = mergedRecord.toIndexedRecord(mergeResultSchema, 
props)

Review Comment:
   The `HoodieRecord` -> `toIndexedRecord` -> `recordContext.convertAvroRecord` 
chain works, but it incurs an unnecessary Avro conversion for engine-native 
`HoodieRecord`s. Maybe we should introduce a new API on `HoodieRecord`, such as 
`HoodieRecord.toEngineRecord` or `HoodieRecord.getEngineRow`, for payload-based 
`HoodieRecord`s?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to