nsivabalan commented on code in PR #13623:
URL: https://github.com/apache/hudi/pull/13623#discussion_r2231227284


##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/PartialUpdateStrategy.java:
##########
@@ -63,44 +68,82 @@ BufferedRecord<T> partialMerge(BufferedRecord<T> newRecord,
                                  BufferedRecord<T> oldRecord,
                                  Schema newSchema,
                                  Schema oldSchema,
-                                 boolean keepOldMetadataColumns) {
+                                 Schema readerSchema,
+                                 boolean keepOldMetadataColumns,
+                                 TypedProperties props) {
     // Note that: When either newRecord or oldRecord is a delete record,
     //            skip partial update since delete records do not have 
meaningful columns.
-    if (partialUpdateMode == PartialUpdateMode.NONE
-        || null == oldRecord
+    if (null == oldRecord
         || newRecord.isDelete()
         || oldRecord.isDelete()) {
       return newRecord;
     }
 
     switch (partialUpdateMode) {
       case KEEP_VALUES:
-      case FILL_DEFAULTS:
-        return newRecord;
+        return reconcileBasedOnKeepValues(newRecord, oldRecord, newSchema, 
oldSchema, readerSchema, props);
       case IGNORE_DEFAULTS:
         return reconcileDefaultValues(
-            newRecord, oldRecord, newSchema, oldSchema, 
keepOldMetadataColumns);
-      case IGNORE_MARKERS:
+            newRecord, oldRecord, newSchema, oldSchema, 
keepOldMetadataColumns, false);
+      case IGNORE_DEFAULTS_NULLS:
+        return reconcileDefaultValues(newRecord, oldRecord, newSchema, 
oldSchema, keepOldMetadataColumns, true);
+      case FILL_UNAVAILABLE:
         return reconcileMarkerValues(
             newRecord, oldRecord, newSchema, oldSchema);
       default:
-        return newRecord;
+        throw new HoodieIOException("Unsupported PartialUpdateMode " + 
partialUpdateMode + " detected");
     }
   }
 
+  BufferedRecord<T> reconcileBasedOnKeepValues(BufferedRecord<T> newRecord,
+                                               BufferedRecord<T> oldRecord,
+                                               Schema newSchema,
+                                               Schema oldSchema,
+                                               Schema readerSchema,
+                                               TypedProperties props) {
+
+    // Merge and store the combined record
+    Option<Pair<BufferedRecord, Schema>> deleteHandlingResult = 
handleDeletes(oldRecord, oldSchema, newRecord, newSchema, props);
+    if (deleteHandlingResult != null) {
+      return newRecord; // if deleted, return newRecord. newRecord.isDelete() 
will return anyways.
+    }
+
+    return (BufferedRecord<T>) 
keepValuesPartialMergingUtils.mergePartialRecords(oldRecord, oldSchema, 
newRecord, newSchema, readerSchema, readerContext).getLeft();
+  }
+
+  /**
+   * Basic handling of deletes that is used by many of the spark mergers
+   * returns null if merger specific logic should be used
+   */
+  protected Option<Pair<BufferedRecord, Schema>> handleDeletes(BufferedRecord 
older, Schema oldSchema, BufferedRecord newer, Schema newSchema, 
TypedProperties props) {
+
+    if (newer.isDelete()) {
+      // Delete record
+      return Option.empty();
+    }
+
+    // old record
+    if (older.isDelete()) {
+      return Option.of(Pair.of(newer, newSchema));
+    }
+    return null;

Review Comment:
   actually we can remove this. at the beginning of 
PartialUpdateHandler.partialMerge(), we already account for all delete records
   ```
     /**
      * Merge records based on partial update mode.
      * Note that {@param newRecord} refers to the record with higher commit 
time if COMMIT_TIME_ORDERING mode is used,
      * or higher event time if EVENT_TIME_ORDERING mode us used.
      */
     BufferedRecord<T> partialMerge(BufferedRecord<T> newRecord,
                                    BufferedRecord<T> oldRecord,
                                    Schema newSchema,
                                    Schema oldSchema,
                                    Schema readerSchema,
                                    boolean keepOldMetadataColumns) {
       // Note that: When either newRecord or oldRecord is a delete record,
       //            skip partial update since delete records do not have 
meaningful columns.
       if (null == oldRecord
           || newRecord.isDelete()
           || oldRecord.isDelete()) {
         return newRecord;
       }
   .
   .
   
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to