danny0405 commented on code in PR #13742:
URL: https://github.com/apache/hudi/pull/13742#discussion_r2299933233


##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerger.java:
##########
@@ -43,9 +44,35 @@ public String getMergingStrategy() {
   }
 
   @Override
-  public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema 
oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws 
IOException {
-    return combineAndGetUpdateValue(older, newer, oldSchema, newSchema, props)
-        .map(r -> Pair.of(new HoodieAvroIndexedRecord(r), r.getSchema()));
+  public Pair<HoodieRecord, Schema> merge(HoodieRecord older, Schema 
oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws 
IOException {
+    Option<IndexedRecord> previousAvroData = older.toIndexedRecord(oldSchema, 
props).map(HoodieAvroIndexedRecord::getData);
+    HoodieRecordPayload payload = ((HoodieAvroRecord) newer).getData();
+    Option<IndexedRecord> updatedValue;
+    Comparable orderingVal;
+    if (previousAvroData.isEmpty()) {
+      updatedValue = payload.getInsertValue(newSchema, props);
+      orderingVal = payload.getOrderingValue();
+    } else {
+      updatedValue = ((HoodieAvroRecord) 
newer).getData().combineAndGetUpdateValue(previousAvroData.get(), newSchema, 
props);
+      if (updatedValue.map(value -> value == 
previousAvroData.get()).orElse(false)) {
+        // Ordering value is set during record creation if one is required for 
this payload, so ordering fields are not required here
+        return Pair.of(older, oldSchema);
+      } else {
+        orderingVal = payload.getOrderingValue();
+      }
+    }
+    if (updatedValue.isEmpty()) {
+      // pass through the record that results in the delete operation. If both 
newer and older are deletes, we return the newer one.
+      if (newer.isDelete(newSchema, props)) {
+        return Pair.of(newer.newInstance(newer.getKey(), 
HoodieOperation.DELETE), newSchema);
+      } else if (older.isDelete(oldSchema, props)) {
+        // if older is delete, return it
+        return Pair.of(older.newInstance(newer.getKey(), 
HoodieOperation.DELETE), oldSchema);
+      }
+      // If neither of the records are a delete, return a delete record with 
an empty value and default ordering value.
+      return Pair.of(new HoodieEmptyRecord<>(newer.getKey(), 
HoodieOperation.DELETE, OrderingValues.getDefault(), HoodieRecordType.AVRO), 
newSchema);
+    }
+    return Pair.of(new HoodieAvroIndexedRecord(newer.getKey(), 
updatedValue.get(), orderingVal), updatedValue.get().getSchema());

Review Comment:
   not sure if we need to keep the hoodie operation too.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to