danny0405 commented on code in PR #13742:
URL: https://github.com/apache/hudi/pull/13742#discussion_r2299957785


##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java:
##########
@@ -465,46 +451,45 @@ public CustomPayloadRecordMerger(
     }
 
     @Override
-    public Option<BufferedRecord<T>> 
deltaMergeNonDeleteRecord(BufferedRecord<T> newRecord, BufferedRecord<T> 
existingRecord) throws IOException {
-      Option<Pair<HoodieRecord, Schema>> mergedRecordAndSchema = 
getMergedRecord(existingRecord, newRecord, false);
-      if (mergedRecordAndSchema.isEmpty()) {
-        // An empty Option indicates that the output represents a delete.
-        return Option.of(new BufferedRecord<>(newRecord.getRecordKey(), 
OrderingValues.getDefault(), null, null, HoodieOperation.DELETE));
-      }
-      HoodieRecord mergedRecord = mergedRecordAndSchema.get().getLeft();
-      Schema mergeResultSchema = mergedRecordAndSchema.get().getRight();
+    public Option<BufferedRecord<T>> deltaMergeRecords(BufferedRecord<T> 
newRecord, BufferedRecord<T> existingRecord) throws IOException {
+      Pair<HoodieRecord, Schema> mergedRecordAndSchema = 
getMergedRecord(existingRecord, newRecord, false);
+      HoodieRecord mergedRecord = mergedRecordAndSchema.getLeft();
+      Schema mergeResultSchema = mergedRecordAndSchema.getRight();
       // Special handling for SENTINEL record in Expression Payload. This is 
returned if the condition does not match.
       if (mergedRecord.getData() == HoodieRecord.SENTINEL) {
         return Option.empty();
       }
-      T combinedRecordData = 
recordContext.convertAvroRecord(mergedRecord.toIndexedRecord(mergeResultSchema, 
props).get().getData());
+      Option<T> combinedRecordData = 
(mergedRecord.toIndexedRecord(mergeResultSchema, props).map(indexedRecord -> 
recordContext.convertAvroRecord(indexedRecord.getData())));
       // If pre-combine does not return existing record, update it
-      if (combinedRecordData != existingRecord.getRecord()) {
+      if (combinedRecordData.map(record -> record != 
existingRecord.getRecord()).orElse(true)) {
         // For pkless we need to use record key from existing record
-        return Option.of(BufferedRecords.fromEngineRecord(combinedRecordData, 
mergeResultSchema, recordContext, orderingFieldNames,
-            existingRecord.getRecordKey(), 
mergedRecord.isDelete(mergeResultSchema, props)));
+        boolean isDelete = mergedRecord.isDelete(readerSchema, props);
+        Comparable orderingValue = 
mergedRecord.getOrderingValue(mergeResultSchema, props, orderingFieldNames);
+        T mergedEngineRecord = mergedRecord.toIndexedRecord(mergeResultSchema, 
props)
+            .map(hoodieAvroIndexedRecord -> 
recordContext.convertAvroRecord(hoodieAvroIndexedRecord.getData()))
+            .orElse(null);
+        return Option.of(BufferedRecords.fromEngineRecord(mergedEngineRecord, 
readerSchema, recordContext, orderingValue, existingRecord.getRecordKey(), 
isDelete));

Review Comment:
   readerSchema or mergeResultSchema?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to