nsivabalan commented on code in PR #13742:
URL: https://github.com/apache/hudi/pull/13742#discussion_r2301141732


##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveRecordContext.java:
##########
@@ -126,6 +125,25 @@ public Comparable convertValueToEngineType(Comparable 
value) {
     return value;
   }
 
+  public Comparable convertValueFromEngineType(Comparable value) {

Review Comment:
   Can you confirm we have added UTs for these?
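A minimal round-trip check for the new conversion method could look like the sketch below. `SimpleContext` and its widen/narrow rules are hypothetical stand-ins for `HiveRecordContext`'s `convertValueToEngineType`/`convertValueFromEngineType` pair, not the real conversion logic.

```java
// Hypothetical stand-in for HiveRecordContext; the widen/narrow rules here
// are illustrative only, not Hudi's actual Hive type mapping.
class SimpleContext {
  // Stand-in for convertValueToEngineType: widen Integer to Long.
  Comparable<?> convertValueToEngineType(Comparable<?> value) {
    return (value instanceof Integer) ? Long.valueOf((Integer) value) : value;
  }

  // Stand-in for convertValueFromEngineType: narrow Long back to Integer.
  Comparable<?> convertValueFromEngineType(Comparable<?> value) {
    return (value instanceof Long) ? Integer.valueOf(((Long) value).intValue()) : value;
  }
}

public class ConvertValueRoundTripSketch {
  public static void main(String[] args) {
    SimpleContext ctx = new SimpleContext();
    Comparable<?> original = 42;
    // A UT for the pair would assert the round trip preserves the value.
    Comparable<?> roundTripped =
        ctx.convertValueFromEngineType(ctx.convertValueToEngineType(original));
    if (!original.equals(roundTripped)) {
      throw new AssertionError("round trip should preserve the value");
    }
    System.out.println("round-trip OK");
  }
}
```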



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java:
##########
@@ -465,46 +451,45 @@ public CustomPayloadRecordMerger(
     }
 
     @Override
-    public Option<BufferedRecord<T>> 
deltaMergeNonDeleteRecord(BufferedRecord<T> newRecord, BufferedRecord<T> 
existingRecord) throws IOException {
-      Option<Pair<HoodieRecord, Schema>> mergedRecordAndSchema = 
getMergedRecord(existingRecord, newRecord, false);
-      if (mergedRecordAndSchema.isEmpty()) {
-        // An empty Option indicates that the output represents a delete.
-        return Option.of(new BufferedRecord<>(newRecord.getRecordKey(), 
OrderingValues.getDefault(), null, null, HoodieOperation.DELETE));
-      }
-      HoodieRecord mergedRecord = mergedRecordAndSchema.get().getLeft();
-      Schema mergeResultSchema = mergedRecordAndSchema.get().getRight();
+    public Option<BufferedRecord<T>> deltaMergeRecords(BufferedRecord<T> 
newRecord, BufferedRecord<T> existingRecord) throws IOException {
+      Pair<HoodieRecord, Schema> mergedRecordAndSchema = 
getMergedRecord(existingRecord, newRecord, false);
+      HoodieRecord mergedRecord = mergedRecordAndSchema.getLeft();
+      Schema mergeResultSchema = mergedRecordAndSchema.getRight();
       // Special handling for SENTINEL record in Expression Payload. This is 
returned if the condition does not match.
       if (mergedRecord.getData() == HoodieRecord.SENTINEL) {
         return Option.empty();
       }
-      T combinedRecordData = 
recordContext.convertAvroRecord(mergedRecord.toIndexedRecord(mergeResultSchema, 
props).get().getData());
+      Option<T> combinedRecordData = 
(mergedRecord.toIndexedRecord(mergeResultSchema, props).map(indexedRecord -> 
recordContext.convertAvroRecord(indexedRecord.getData())));
       // If pre-combine does not return existing record, update it
-      if (combinedRecordData != existingRecord.getRecord()) {
+      if (combinedRecordData.map(record -> record != 
existingRecord.getRecord()).orElse(true)) {
         // For pkless we need to use record key from existing record
-        return Option.of(BufferedRecords.fromEngineRecord(combinedRecordData, 
mergeResultSchema, recordContext, orderingFieldNames,
-            existingRecord.getRecordKey(), 
mergedRecord.isDelete(mergeResultSchema, props)));
+        boolean isDelete = mergedRecord.isDelete(readerSchema, props);

Review Comment:
   readerSchema -> mergeResultSchema



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/HoodieSparkRecordMerger.java:
##########
@@ -39,33 +41,33 @@ public HoodieRecord.HoodieRecordType getRecordType() {
    * Basic handling of deletes that is used by many of the spark mergers
    * returns null if merger specific logic should be used
    */
-  protected Option<Pair<HoodieRecord, Schema>> handleDeletes(HoodieRecord 
older, Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties 
props) {
+  protected Pair<HoodieRecord, Schema> handleDeletes(HoodieRecord older, 
Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) {
     ValidationUtils.checkArgument(older.getRecordType() == 
HoodieRecord.HoodieRecordType.SPARK);
     ValidationUtils.checkArgument(newer.getRecordType() == 
HoodieRecord.HoodieRecordType.SPARK);
 
     if (newer instanceof HoodieSparkRecord) {
       HoodieSparkRecord newSparkRecord = (HoodieSparkRecord) newer;
       if (newSparkRecord.isDelete(newSchema, props)) {
         // Delete record
-        return Option.empty();
+        return Pair.of(new HoodieEmptyRecord<>(newer.getKey(), 
HoodieOperation.DELETE, OrderingValues.getDefault(), 
HoodieRecord.HoodieRecordType.SPARK), newSchema);

Review Comment:
   Can you track all these inconsistencies in a JIRA so that we can revisit them at some point?



##########
hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java:
##########
@@ -74,11 +74,7 @@ public OverwriteWithLatestAvroPayload 
preCombine(OverwriteWithLatestAvroPayload
 
   @Override
   public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord 
currentValue, Schema schema, Properties properties) throws IOException {
-    if (recordBytes.length == 0) {
-      return Option.empty();
-    }
-
-    GenericRecord incomingRecord = HoodieAvroUtils.bytesToAvro(recordBytes, 
schema);
+    Option<IndexedRecord> incomingRecord = recordBytes.length == 0 ? 
Option.empty() : Option.of(HoodieAvroUtils.bytesToAvro(recordBytes, schema));

Review Comment:
   Danny: the diff is that previously we returned right away if the bytes were empty, but now we do not; we proceed past line 77, and that is where the fix Tim responded with above applies.
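The control-flow difference being discussed can be sketched as follows, with simplified types standing in for the real payload API (`deserialize`/`postProcess` are hypothetical stand-ins for the Avro decoding and the logic after line 77):

```java
import java.util.Optional;

public class EmptyBytesFlowSketch {
  static int postProcessCalls = 0;

  static String deserialize(byte[] bytes) {
    return new String(bytes);
  }

  // Stand-in for the shared logic after line 77; counts invocations so the
  // control-flow difference is observable.
  static Optional<String> postProcess(Optional<String> incoming) {
    postProcessCalls++;
    return incoming;
  }

  // Before: an empty payload returned immediately, skipping postProcess.
  static Optional<String> combineOld(byte[] recordBytes) {
    if (recordBytes.length == 0) {
      return Optional.empty();
    }
    return postProcess(Optional.of(deserialize(recordBytes)));
  }

  // After: an empty payload becomes an empty Option, but execution continues
  // into postProcess, which now runs for deletes as well.
  static Optional<String> combineNew(byte[] recordBytes) {
    Optional<String> incoming = recordBytes.length == 0
        ? Optional.<String>empty()
        : Optional.of(deserialize(recordBytes));
    return postProcess(incoming);
  }

  public static void main(String[] args) {
    byte[] empty = new byte[0];
    combineOld(empty);
    if (postProcessCalls != 0) {
      throw new AssertionError("old path skips postProcess for empty bytes");
    }
    combineNew(empty);
    if (postProcessCalls != 1) {
      throw new AssertionError("new path reaches postProcess even for empty bytes");
    }
    System.out.println("ok");
  }
}
```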



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java:
##########
@@ -567,42 +553,36 @@ public Option<BufferedRecord<T>> 
deltaMerge(BufferedRecord<T> newRecord, Buffere
       if (existingRecord == null) {
         return Option.of(newRecord);
       }
-      if (existingRecord.isDelete() || newRecord.isDelete()) {
-        if (shouldKeepNewerRecord(existingRecord, newRecord)) {
-          // IMPORTANT:
-          // this is needed when the fallback HoodieAvroRecordMerger got used, 
the merger would
-          // return Option.empty when the old payload data is empty(a delete) 
and ignores its ordering value directly.
-          return Option.of(newRecord);
-        } else {
-          return Option.empty();
-        }
-      }
-      return deltaMergeNonDeleteRecord(newRecord, existingRecord);
+      return deltaMergeRecords(newRecord, existingRecord);
     }
 
-    public abstract Option<BufferedRecord<T>> 
deltaMergeNonDeleteRecord(BufferedRecord<T> newRecord, BufferedRecord<T> 
existingRecord) throws IOException;
+    public abstract Option<BufferedRecord<T>> 
deltaMergeRecords(BufferedRecord<T> newRecord, BufferedRecord<T> 
existingRecord) throws IOException;
 
     @Override
     public Option<DeleteRecord> deltaMerge(DeleteRecord deleteRecord, 
BufferedRecord<T> existingRecord) {
-      return deltaMergeDeleteRecord(deleteRecord, existingRecord);
+      BufferedRecord<T> deleteBufferedRecord = 
BufferedRecords.fromDeleteRecord(deleteRecord, recordContext);
+      try {
+        Option<BufferedRecord<T>> merged = deltaMerge(deleteBufferedRecord, 
existingRecord);
+        // If the delete record is chosen, return an option with the delete 
record, otherwise return empty.
+        return merged.isPresent() ? Option.of(deleteRecord) : Option.empty();
+      } catch (IOException e) {
+        throw new HoodieIOException("Failed to process delete record", e);
+      }
     }
 
     @Override
     public BufferedRecord<T> finalMerge(BufferedRecord<T> olderRecord, 
BufferedRecord<T> newerRecord) throws IOException {
-      if (olderRecord.isDelete() || newerRecord.isDelete()) {
-        if (shouldKeepNewerRecord(olderRecord, newerRecord)) {
-          // IMPORTANT:
-          // this is needed when the fallback HoodieAvroRecordMerger got used, 
the merger would
-          // return Option.empty when the new payload data is empty(a delete) 
and ignores its ordering value directly.
-          return newerRecord;
-        } else {
-          return olderRecord;
-        }
+      if (olderRecord == null) {
+        return newerRecord;
+      }
+      // handle special case for deletes that are sent to older partitions in 
global-index, this delete takes precedence regardless of the previous value
+      if (newerRecord.getHoodieOperation() == HoodieOperation.UPDATE_BEFORE) {

Review Comment:
   In the case of the global index, won't the delete record have a commit-time ordering value? So merging with existing records in older partitions should work without this special handling, is what I thought.
   Can you help explain why we need it?
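A toy sketch of the branch in question, with hypothetical simplified types in place of `BufferedRecord`/`HoodieOperation`: it shows that if the routed delete arrives with a lower (or default) ordering value than the existing record, plain ordering-value merging would drop it, which may be the motivation for the special case.

```java
// Simplified stand-ins for HoodieOperation and BufferedRecord; the real
// merger works with Hudi's types and ordering-value comparators.
enum Op { INSERT, UPDATE_BEFORE, DELETE }

class Rec {
  final Op op;
  final long orderingValue;

  Rec(Op op, long orderingValue) {
    this.op = op;
    this.orderingValue = orderingValue;
  }
}

public class DeletePrecedenceSketch {
  // Mirrors the branch under review: a delete tagged UPDATE_BEFORE (e.g. one
  // routed to an older partition by a global index) wins regardless of
  // ordering values; otherwise fall back to ordering-value comparison.
  static Rec finalMerge(Rec older, Rec newer) {
    if (older == null) {
      return newer;
    }
    if (newer.op == Op.UPDATE_BEFORE) {
      return newer;
    }
    return newer.orderingValue >= older.orderingValue ? newer : older;
  }

  public static void main(String[] args) {
    Rec existing = new Rec(Op.INSERT, 10);
    // The delete carries a LOWER ordering value, so plain ordering-value
    // comparison would keep the existing record; the special case does not.
    Rec routedDelete = new Rec(Op.UPDATE_BEFORE, 5);
    if (finalMerge(existing, routedDelete) != routedDelete) {
      throw new AssertionError("UPDATE_BEFORE delete should take precedence");
    }
    System.out.println("ok");
  }
}
```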



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java:
##########
@@ -465,46 +451,45 @@ public CustomPayloadRecordMerger(
     }
 
     @Override
-    public Option<BufferedRecord<T>> 
deltaMergeNonDeleteRecord(BufferedRecord<T> newRecord, BufferedRecord<T> 
existingRecord) throws IOException {
-      Option<Pair<HoodieRecord, Schema>> mergedRecordAndSchema = 
getMergedRecord(existingRecord, newRecord, false);
-      if (mergedRecordAndSchema.isEmpty()) {
-        // An empty Option indicates that the output represents a delete.
-        return Option.of(new BufferedRecord<>(newRecord.getRecordKey(), 
OrderingValues.getDefault(), null, null, HoodieOperation.DELETE));
-      }
-      HoodieRecord mergedRecord = mergedRecordAndSchema.get().getLeft();
-      Schema mergeResultSchema = mergedRecordAndSchema.get().getRight();
+    public Option<BufferedRecord<T>> deltaMergeRecords(BufferedRecord<T> 
newRecord, BufferedRecord<T> existingRecord) throws IOException {
+      Pair<HoodieRecord, Schema> mergedRecordAndSchema = 
getMergedRecord(existingRecord, newRecord, false);
+      HoodieRecord mergedRecord = mergedRecordAndSchema.getLeft();
+      Schema mergeResultSchema = mergedRecordAndSchema.getRight();
       // Special handling for SENTINEL record in Expression Payload. This is 
returned if the condition does not match.
       if (mergedRecord.getData() == HoodieRecord.SENTINEL) {
         return Option.empty();
       }
-      T combinedRecordData = 
recordContext.convertAvroRecord(mergedRecord.toIndexedRecord(mergeResultSchema, 
props).get().getData());
+      Option<T> combinedRecordData = 
(mergedRecord.toIndexedRecord(mergeResultSchema, props).map(indexedRecord -> 
recordContext.convertAvroRecord(indexedRecord.getData())));

Review Comment:
   Essentially, wherever we are handling SENTINEL, can we remove it from the other mergers?
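One way to do that: funnel every merger's output through a single sentinel check so the per-merger branches can go away. A toy sketch (the marker object mimics `HoodieRecord.SENTINEL`'s identity semantics; the helper name is hypothetical):

```java
import java.util.Optional;

public class SentinelCheckSketch {
  // Identity-based marker, mimicking HoodieRecord.SENTINEL.
  static final Object SENTINEL = new Object();

  // Shared post-merge step: if every merger routes its result through this
  // helper, individual mergers no longer need their own SENTINEL branches.
  static Optional<Object> filterSentinel(Object mergedData) {
    // Intentionally an identity comparison, not equals().
    return mergedData == SENTINEL ? Optional.empty() : Optional.of(mergedData);
  }

  public static void main(String[] args) {
    if (filterSentinel(SENTINEL).isPresent()) {
      throw new AssertionError("SENTINEL should be filtered out");
    }
    if (!filterSentinel("real record").isPresent()) {
      throw new AssertionError("real records should pass through");
    }
    System.out.println("ok");
  }
}
```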



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java:
##########
@@ -465,46 +451,45 @@ public CustomPayloadRecordMerger(
     }
 
     @Override
-    public Option<BufferedRecord<T>> 
deltaMergeNonDeleteRecord(BufferedRecord<T> newRecord, BufferedRecord<T> 
existingRecord) throws IOException {
-      Option<Pair<HoodieRecord, Schema>> mergedRecordAndSchema = 
getMergedRecord(existingRecord, newRecord, false);
-      if (mergedRecordAndSchema.isEmpty()) {
-        // An empty Option indicates that the output represents a delete.
-        return Option.of(new BufferedRecord<>(newRecord.getRecordKey(), 
OrderingValues.getDefault(), null, null, HoodieOperation.DELETE));
-      }
-      HoodieRecord mergedRecord = mergedRecordAndSchema.get().getLeft();
-      Schema mergeResultSchema = mergedRecordAndSchema.get().getRight();
+    public Option<BufferedRecord<T>> deltaMergeRecords(BufferedRecord<T> 
newRecord, BufferedRecord<T> existingRecord) throws IOException {
+      Pair<HoodieRecord, Schema> mergedRecordAndSchema = 
getMergedRecord(existingRecord, newRecord, false);
+      HoodieRecord mergedRecord = mergedRecordAndSchema.getLeft();
+      Schema mergeResultSchema = mergedRecordAndSchema.getRight();
       // Special handling for SENTINEL record in Expression Payload. This is 
returned if the condition does not match.
       if (mergedRecord.getData() == HoodieRecord.SENTINEL) {
         return Option.empty();
       }
-      T combinedRecordData = 
recordContext.convertAvroRecord(mergedRecord.toIndexedRecord(mergeResultSchema, 
props).get().getData());
+      Option<T> combinedRecordData = 
(mergedRecord.toIndexedRecord(mergeResultSchema, props).map(indexedRecord -> 
recordContext.convertAvroRecord(indexedRecord.getData())));
       // If pre-combine does not return existing record, update it
-      if (combinedRecordData != existingRecord.getRecord()) {
+      if (combinedRecordData.map(record -> record != 
existingRecord.getRecord()).orElse(true)) {
         // For pkless we need to use record key from existing record
-        return Option.of(BufferedRecords.fromEngineRecord(combinedRecordData, 
mergeResultSchema, recordContext, orderingFieldNames,
-            existingRecord.getRecordKey(), 
mergedRecord.isDelete(mergeResultSchema, props)));
+        boolean isDelete = mergedRecord.isDelete(readerSchema, props);
+        Comparable orderingValue = 
mergedRecord.getOrderingValue(mergeResultSchema, props, orderingFieldNames);
+        T mergedEngineRecord = mergedRecord.toIndexedRecord(mergeResultSchema, 
props)
+            .map(hoodieAvroIndexedRecord -> 
recordContext.convertAvroRecord(hoodieAvroIndexedRecord.getData()))
+            .orElse(null);
+        return Option.of(BufferedRecords.fromEngineRecord(mergedEngineRecord, 
readerSchema, recordContext, orderingValue, existingRecord.getRecordKey(), 
isDelete));
+
       }
       return Option.empty();
     }
 
     @Override
-    public BufferedRecord<T> mergeNonDeleteRecord(BufferedRecord<T> 
olderRecord, BufferedRecord<T> newerRecord) throws IOException {
-      Option<Pair<HoodieRecord, Schema>> mergedRecordAndSchema = 
getMergedRecord(olderRecord, newerRecord, true);
-      if (mergedRecordAndSchema.isEmpty()) {
-        return BufferedRecords.createDelete(newerRecord.getRecordKey());
-      }
-      HoodieRecord mergedRecord = mergedRecordAndSchema.get().getLeft();
-      Schema mergeResultSchema = mergedRecordAndSchema.get().getRight();
+    public BufferedRecord<T> mergeRecords(BufferedRecord<T> olderRecord, 
BufferedRecord<T> newerRecord) throws IOException {
+      Pair<HoodieRecord, Schema> mergedRecordAndSchema = 
getMergedRecord(olderRecord, newerRecord, true);
+
+      HoodieRecord mergedRecord = mergedRecordAndSchema.getLeft();
+      Schema mergeResultSchema = mergedRecordAndSchema.getRight();
       // Special handling for SENTINEL record in Expression Payload
       if (mergedRecord.getData() == HoodieRecord.SENTINEL) {
         return olderRecord;
       }
-      if (!mergedRecord.isDelete(mergeResultSchema, props)) {
-        IndexedRecord indexedRecord = (IndexedRecord) mergedRecord.getData();
-        return BufferedRecords.fromEngineRecord(
-            recordContext.convertAvroRecord(indexedRecord), mergeResultSchema, 
recordContext, orderingFieldNames, newerRecord.getRecordKey(), false);
-      }
-      return BufferedRecords.createDelete(newerRecord.getRecordKey());
+      boolean isDelete = mergedRecord.isDelete(readerSchema, props);
+      Comparable orderingValue = 
mergedRecord.getOrderingValue(mergeResultSchema, props, orderingFieldNames);
+      T mergedEngineRecord = mergedRecord.toIndexedRecord(mergeResultSchema, 
props)

Review Comment:
   I am with you, Danny; we should definitely fix such unnecessary conversions.
   One thing I am worried about is the time it takes to get this patch merged and release 1.1.
   In general, let's categorize the issues as functional vs. performance, and within performance, whether each one regresses existing behavior or not.
   
   For perf enhancements that are on par with the existing code, I would recommend we track them in a follow-up JIRA and attend to them post 1.1.
   If we try to get everything done by squeezing in every performance issue we spot, it keeps delaying the release.
   I am pretty sure that if we introduce the changes you are proposing, we would have to fix a non-trivial number of places.



##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/DefaultHiveRecordMerger.java:
##########
@@ -39,33 +41,33 @@ public class DefaultHiveRecordMerger extends 
HoodieHiveRecordMerger {
   private String[] orderingFields;
 
   @Override
-  public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema 
oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws 
IOException {
+  public Pair<HoodieRecord, Schema> merge(HoodieRecord older, Schema 
oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws 
IOException {
     ValidationUtils.checkArgument(older.getRecordType() == 
HoodieRecord.HoodieRecordType.HIVE);
     ValidationUtils.checkArgument(newer.getRecordType() == 
HoodieRecord.HoodieRecordType.HIVE);
     if (newer instanceof HoodieHiveRecord) {
       HoodieHiveRecord newHiveRecord = (HoodieHiveRecord) newer;
       if (newHiveRecord.isDelete(newSchema, props)) {
-        return Option.empty();
+        return Pair.of(new HoodieEmptyRecord<>(newer.getKey(), 
HoodieOperation.DELETE, OrderingValues.getDefault(), 
HoodieRecord.HoodieRecordType.HIVE), newSchema);

Review Comment:
   I also noticed this gap somewhere else.
   If it is not breaking existing behavior, can we file a follow-up JIRA and revisit the entire deletes + ordering-value gap separately rather than trying to fix it in the same patch?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java:
##########
@@ -465,46 +451,45 @@ public CustomPayloadRecordMerger(
     }
 
     @Override
-    public Option<BufferedRecord<T>> 
deltaMergeNonDeleteRecord(BufferedRecord<T> newRecord, BufferedRecord<T> 
existingRecord) throws IOException {
-      Option<Pair<HoodieRecord, Schema>> mergedRecordAndSchema = 
getMergedRecord(existingRecord, newRecord, false);
-      if (mergedRecordAndSchema.isEmpty()) {
-        // An empty Option indicates that the output represents a delete.
-        return Option.of(new BufferedRecord<>(newRecord.getRecordKey(), 
OrderingValues.getDefault(), null, null, HoodieOperation.DELETE));
-      }
-      HoodieRecord mergedRecord = mergedRecordAndSchema.get().getLeft();
-      Schema mergeResultSchema = mergedRecordAndSchema.get().getRight();
+    public Option<BufferedRecord<T>> deltaMergeRecords(BufferedRecord<T> 
newRecord, BufferedRecord<T> existingRecord) throws IOException {
+      Pair<HoodieRecord, Schema> mergedRecordAndSchema = 
getMergedRecord(existingRecord, newRecord, false);
+      HoodieRecord mergedRecord = mergedRecordAndSchema.getLeft();
+      Schema mergeResultSchema = mergedRecordAndSchema.getRight();
       // Special handling for SENTINEL record in Expression Payload. This is 
returned if the condition does not match.
       if (mergedRecord.getData() == HoodieRecord.SENTINEL) {
         return Option.empty();
       }
-      T combinedRecordData = 
recordContext.convertAvroRecord(mergedRecord.toIndexedRecord(mergeResultSchema, 
props).get().getData());
+      Option<T> combinedRecordData = 
(mergedRecord.toIndexedRecord(mergeResultSchema, props).map(indexedRecord -> 
recordContext.convertAvroRecord(indexedRecord.getData())));
       // If pre-combine does not return existing record, update it
-      if (combinedRecordData != existingRecord.getRecord()) {
+      if (combinedRecordData.map(record -> record != 
existingRecord.getRecord()).orElse(true)) {
         // For pkless we need to use record key from existing record
-        return Option.of(BufferedRecords.fromEngineRecord(combinedRecordData, 
mergeResultSchema, recordContext, orderingFieldNames,
-            existingRecord.getRecordKey(), 
mergedRecord.isDelete(mergeResultSchema, props)));
+        boolean isDelete = mergedRecord.isDelete(readerSchema, props);
+        Comparable orderingValue = 
mergedRecord.getOrderingValue(mergeResultSchema, props, orderingFieldNames);
+        T mergedEngineRecord = mergedRecord.toIndexedRecord(mergeResultSchema, 
props)

Review Comment:
   Yes, combinedRecordData at L462 and mergedEngineRecord at L468 look to be the same; we should avoid that.
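The duplicate conversion can be avoided by caching the converted record and reusing it for both the changed-check and the returned record. A minimal sketch, with `java.util.Optional` and `String` standing in for Hudi's `Option` and engine record types, and `equals` in place of the reference check the diff uses:

```java
import java.util.Optional;

public class SingleConversionSketch {
  static int conversions = 0;

  // Stand-in for recordContext.convertAvroRecord; counts invocations so the
  // "convert exactly once" property is observable.
  static String convertAvroRecord(String indexedRecord) {
    conversions++;
    return "converted:" + indexedRecord;
  }

  // Convert the merged record once and reuse the cached result for both the
  // changed-check and the returned record, instead of deriving
  // combinedRecordData and mergedEngineRecord separately as in the diff.
  static Optional<String> mergeOnce(Optional<String> mergedIndexedRecord, String existing) {
    Optional<String> combined =
        mergedIndexedRecord.map(SingleConversionSketch::convertAvroRecord);
    boolean changed = combined.map(r -> !r.equals(existing)).orElse(true);
    // Delete/ordering-value handling from the real merger is elided here.
    return changed ? combined : Optional.empty();
  }

  public static void main(String[] args) {
    Optional<String> out = mergeOnce(Optional.of("r2"), "converted:r1");
    if (!out.isPresent() || !out.get().equals("converted:r2")) {
      throw new AssertionError("changed record should be returned");
    }
    if (conversions != 1) {
      throw new AssertionError("conversion should happen exactly once");
    }
    System.out.println("ok");
  }
}
```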



##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/CustomPayload.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.Option;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+
+import java.io.IOException;
+
+public class CustomPayload implements HoodieRecordPayload<CustomPayload> {
+  private final GenericRecord record;
+  private final Comparable orderingValue;
+
+  public CustomPayload(GenericRecord record, Comparable orderingValue) {
+    this.record = record;
+    this.orderingValue = orderingValue;
+  }
+
+  @Override
+  public CustomPayload preCombine(CustomPayload other) {
+    return this; // No-op for this test
+  }
+
+  @Override
+  public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord 
currentValue, Schema schema) {
+    Long olderTimestamp = (Long) ((GenericRecord) 
currentValue).get("timestamp");
+    Long newerTimestamp = orderingValue == null ? (Long) 
record.get("timestamp") : (Long) orderingValue;
+    if (olderTimestamp.equals(newerTimestamp)) {
+      // If the timestamps are the same, we do not update
+      return handleDeleteRecord(currentValue);
+    } else if (olderTimestamp < newerTimestamp) {
+      // Custom merger chooses record with lower ordering value

Review Comment:
   Minor: "Custom merger" -> "Custom payload"



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to