danny0405 commented on code in PR #13742:
URL: https://github.com/apache/hudi/pull/13742#discussion_r2299957029


##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java:
##########
@@ -465,46 +451,45 @@ public CustomPayloadRecordMerger(
     }
 
     @Override
-    public Option<BufferedRecord<T>> deltaMergeNonDeleteRecord(BufferedRecord<T> newRecord, BufferedRecord<T> existingRecord) throws IOException {
-      Option<Pair<HoodieRecord, Schema>> mergedRecordAndSchema = getMergedRecord(existingRecord, newRecord, false);
-      if (mergedRecordAndSchema.isEmpty()) {
-        // An empty Option indicates that the output represents a delete.
-        return Option.of(new BufferedRecord<>(newRecord.getRecordKey(), OrderingValues.getDefault(), null, null, HoodieOperation.DELETE));
-      }
-      HoodieRecord mergedRecord = mergedRecordAndSchema.get().getLeft();
-      Schema mergeResultSchema = mergedRecordAndSchema.get().getRight();
+    public Option<BufferedRecord<T>> deltaMergeRecords(BufferedRecord<T> newRecord, BufferedRecord<T> existingRecord) throws IOException {
+      Pair<HoodieRecord, Schema> mergedRecordAndSchema = getMergedRecord(existingRecord, newRecord, false);
+      HoodieRecord mergedRecord = mergedRecordAndSchema.getLeft();
+      Schema mergeResultSchema = mergedRecordAndSchema.getRight();
       // Special handling for SENTINEL record in Expression Payload. This is returned if the condition does not match.
       if (mergedRecord.getData() == HoodieRecord.SENTINEL) {
         return Option.empty();
       }
-      T combinedRecordData = recordContext.convertAvroRecord(mergedRecord.toIndexedRecord(mergeResultSchema, props).get().getData());
+      Option<T> combinedRecordData = (mergedRecord.toIndexedRecord(mergeResultSchema, props).map(indexedRecord -> recordContext.convertAvroRecord(indexedRecord.getData())));
       // If pre-combine does not return existing record, update it
-      if (combinedRecordData != existingRecord.getRecord()) {
+      if (combinedRecordData.map(record -> record != existingRecord.getRecord()).orElse(true)) {
         // For pkless we need to use record key from existing record
-        return Option.of(BufferedRecords.fromEngineRecord(combinedRecordData, mergeResultSchema, recordContext, orderingFieldNames,
-            existingRecord.getRecordKey(), mergedRecord.isDelete(mergeResultSchema, props)));
+        boolean isDelete = mergedRecord.isDelete(readerSchema, props);
+        Comparable orderingValue = mergedRecord.getOrderingValue(mergeResultSchema, props, orderingFieldNames);
+        T mergedEngineRecord = mergedRecord.toIndexedRecord(mergeResultSchema, props)

Review Comment:
   Why convert to an Avro record and then to an engine record again?
   `#convertAvroRecord` already returns an engine record, so isn't
   `combinedRecordData` already a `T`?
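
A minimal sketch of the reuse this comment points at, not the actual Hudi code. It assumes the conversion result can simply be kept and reused. `ConvertOnceSketch`, `Outcome`, and `merge` are hypothetical names, `java.util.Optional` stands in for Hudi's `Option`, and the `convert` function stands in for `recordContext.convertAvroRecord`.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Illustrative only: these names are hypothetical and are not Hudi APIs.
final class ConvertOnceSketch {

  // Result of the merge step: whether the buffer needs updating, and with what.
  static final class Outcome<T> {
    final boolean needsUpdate;
    final T engineRecord; // null when the merged record carries no data

    Outcome(boolean needsUpdate, T engineRecord) {
      this.needsUpdate = needsUpdate;
      this.engineRecord = engineRecord;
    }
  }

  // `convert` stands in for recordContext.convertAvroRecord (an assumption).
  static <A, T> Outcome<T> merge(Optional<A> avroData, Function<A, T> convert, T existing) {
    // Convert exactly once; an empty input stays empty.
    Optional<T> combined = avroData.map(convert);
    // Reference comparison, mirroring the check in the diff.
    boolean needsUpdate = combined.map(r -> r != existing).orElse(true);
    // Reuse the converted value instead of converting the Avro data a second time.
    return new Outcome<>(needsUpdate, combined.orElse(null));
  }

  public static void main(String[] args) {
    AtomicInteger calls = new AtomicInteger();
    Function<String, StringBuilder> convert = s -> {
      calls.incrementAndGet();
      return new StringBuilder(s);
    };
    Outcome<StringBuilder> out = merge(Optional.of("row"), convert, new StringBuilder("row"));
    System.out.println(out.needsUpdate + " " + out.engineRecord + " conversions=" + calls.get());
  }
}
```

In this sketch the converted value feeds both the identity check and the returned record, so the conversion function runs once per merge.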


