linliu-code commented on code in PR #12597:
URL: https://github.com/apache/hudi/pull/12597#discussion_r1934953770


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/DefaultSparkRecordMerger.java:
##########
@@ -44,40 +47,46 @@ public String getMergingStrategy() {
   }
 
   @Override
-  public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema 
oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws 
IOException {
+  public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older,
+                                                  Schema oldSchema,
+                                                  HoodieRecord newer,
+                                                  Schema newSchema,
+                                                  TypedProperties props) 
throws IOException {
     ValidationUtils.checkArgument(older.getRecordType() == 
HoodieRecordType.SPARK);
     ValidationUtils.checkArgument(newer.getRecordType() == 
HoodieRecordType.SPARK);
 
-    if (newer instanceof HoodieSparkRecord) {
-      HoodieSparkRecord newSparkRecord = (HoodieSparkRecord) newer;
-      if (newSparkRecord.isDelete(newSchema, props)) {
-        // Delete record
-        return Option.empty();
-      }
-    } else {
-      if (newer.getData() == null) {
-        // Delete record
-        return Option.empty();
-      }
+    if (newer instanceof HoodieEmptyRecord) {
+      return Option.empty();
+    }
+    HoodieSparkRecord newRecord = (HoodieSparkRecord) newer;
+    if (older instanceof HoodieEmptyRecord) {
+      return newRecord.isDelete(newSchema, props)
+          ? Option.empty() : Option.of(Pair.of(newer, newSchema));
     }
+    HoodieSparkRecord oldRecord = (HoodieSparkRecord) older;
+    Comparable newOrderingVal = newRecord.getOrderingValue(newSchema, props);
+    Comparable oldOrderingVal = oldRecord.getOrderingValue(oldSchema, props);
 
-    if (older instanceof HoodieSparkRecord) {
-      HoodieSparkRecord oldSparkRecord = (HoodieSparkRecord) older;
-      if (oldSparkRecord.isDelete(oldSchema, props)) {
-        // use natural order for delete record
-        return Option.of(Pair.of(newer, newSchema));
-      }
-    } else {
-      if (older.getData() == null) {
-        // use natural order for delete record
-        return Option.of(Pair.of(newer, newSchema));
-      }
+    // The same logic as fg reader.
+    // CASE 1: New record is a delete record with natural order.
+    if (newOrderingVal.equals(DEFAULT_ORDERING_VALUE)
+        && newRecord.isDelete(newSchema, props)) {
+      return Option.empty();
     }
-    if (older.getOrderingValue(oldSchema, 
props).compareTo(newer.getOrderingValue(newSchema, props)) > 0) {
-      return Option.of(Pair.of(older, oldSchema));
-    } else {
-      return Option.of(Pair.of(newer, newSchema));
+    // When old record has valid ordering value, and its value
+    // is higher than that of new record,
+    // Case 2: old record is a delete record, return empty.
+    // Case 3: old record is not a delete record, return old record.
+    if (!oldOrderingVal.equals(DEFAULT_ORDERING_VALUE)
+        && oldOrderingVal.compareTo(newOrderingVal) > 0) {
+      return oldRecord.isDelete(oldSchema, props)
+          ? Option.empty() : Option.of(Pair.of(older, oldSchema));
     }
+    // Otherwise, the return value is determined by new record.
+    // Case 4: new record is a delete record, return empty.
+    // Case 5: new record is not a delete record, return new.
+    return newRecord.isDelete(newSchema, props)
+        ? Option.empty() : Option.of(Pair.of(newer, newSchema));

Review Comment:
   This is the same logic as that of the file group (fg) reader.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to