nsivabalan commented on code in PR #17604:
URL: https://github.com/apache/hudi/pull/17604#discussion_r2729557354


##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/PartialMergerWithKeepValues.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.engine.RecordContext;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.common.util.collection.Pair;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Class to assist with merging two versions of the record that may contain partial updates using
+ * {@link org.apache.hudi.common.table.PartialUpdateMode#KEEP_VALUES} mode.
+ */

Review Comment:
   We should call out the expectations from this class, with a simple example as well.
   Also, let's call out the expectations for nested fields.
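   For instance, the class javadoc could include something along these lines (column names are made up for illustration):
   ```
       // Illustrative only; column names are hypothetical.
       // Older record (full):    {id: 1, name: "alice", city: "sf"}
       // Newer record (partial): {id: 1, name: "bob"}             <- "city" column absent
       // KEEP_VALUES result:     {id: 1, name: "bob", city: "sf"} <- missing column kept from older record
   ```
   And for nested fields, it is worth stating that the merge operates per top-level column, so a nested struct present in the newer record replaces the older struct wholesale rather than being merged leaf by leaf (assuming that is the intended semantics here).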



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBuffer.java:
##########
@@ -101,6 +101,7 @@ protected FileGroupRecordBuffer(HoodieReaderContext<T> readerContext,
    this.readerSchema = HoodieSchemaCache.intern(readerContext.getSchemaHandler().getRequiredSchema());
     this.recordMergeMode = recordMergeMode;
     this.partialUpdateModeOpt = partialUpdateModeOpt;
+    this.enablePartialMerging = partialUpdateModeOpt.isPresent();

Review Comment:
   Let's be cautious in expanding the terminology now.
   Previously, `enablePartialMerging` was mainly referring to partial encoding from MOR.
   But now, with this patch, it also refers to the partial update mode (a table property).
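   One way to keep the two concepts separate (a sketch only; these field names are hypothetical):
   ```
       // set once from table config; never mutated
       private final boolean hasPartialUpdateTableProperty = partialUpdateModeOpt.isPresent();
       // flips to true only once a partially encoded (MOR) data block is seen
       private boolean seenPartialEncodedBlock = false;
   ```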
   
   



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/PartialUpdateHandler.java:
##########
@@ -82,17 +85,43 @@ BufferedRecord<T> partialMerge(BufferedRecord<T> highOrderRecord,
     }
 
     switch (partialUpdateMode) {
+      case KEEP_VALUES:
+        return reconcileBasedOnKeepValues(highOrderRecord, lowOrderRecord, highOrderSchema, lowOrderSchema, newSchema);
       case IGNORE_DEFAULTS:
         return reconcileDefaultValues(
            highOrderRecord, lowOrderRecord, highOrderSchema, lowOrderSchema, newSchema);
       case FILL_UNAVAILABLE:
         return reconcileMarkerValues(
            highOrderRecord, lowOrderRecord, highOrderSchema, lowOrderSchema, newSchema);
       default:
-        return highOrderRecord;
+        throw new HoodieIOException("Unsupported PartialUpdateMode " + partialUpdateMode + " detected");
     }
   }
 
+  /**
+   * Reconcile two versions of the record based on KEEP_VALUES,
+   * i.e., for each column, a value missing from the new record is picked from the older record; otherwise the value from the new record is picked.
+   * @param highOrderRecord record with higher commit time or higher ordering value
+   * @param lowOrderRecord  record with lower commit time or lower ordering value
+   * @param highOrderSchema The schema of highOrderRecord
+   * @param lowOrderSchema  The schema of the older record
+   * @param newSchema       The schema of the new incoming record
+   * @return the merged record of type {@link BufferedRecord}
+   */
+  BufferedRecord<T> reconcileBasedOnKeepValues(BufferedRecord<T> highOrderRecord,

Review Comment:
   private



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/PartialMergerWithKeepValues.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.engine.RecordContext;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.common.util.collection.Pair;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Class to assist with merging two versions of the record that may contain partial updates using
+ * {@link org.apache.hudi.common.table.PartialUpdateMode#KEEP_VALUES} mode.
+ */
+public class PartialMergerWithKeepValues<T> implements Serializable {
+  private final Map<HoodieSchema, Set<String>>
+      fieldNameCache = new HashMap<>();
+  private final Map<Pair<Pair<HoodieSchema, HoodieSchema>, HoodieSchema>, HoodieSchema>
+      mergedSchemaCache = new HashMap<>();
+
+  /**
+   * Merges records which can contain partial updates.
+   *
+   * @param lowOrderRecord  record with lower commit time or lower ordering value
+   * @param lowOrderSchema  The schema of the older record
+   * @param highOrderRecord record with higher commit time or higher ordering value
+   * @param highOrderSchema The schema of highOrderRecord
+   * @param newSchema       The schema of the new incoming record
+   * @return The merged record and schema.
+   */
+  Pair<BufferedRecord<T>, HoodieSchema> mergePartialRecords(BufferedRecord<T> lowOrderRecord,
+                                                             HoodieSchema lowOrderSchema,
+                                                             BufferedRecord<T> highOrderRecord,
+                                                             HoodieSchema highOrderSchema,
+                                                             HoodieSchema newSchema,
+                                                             RecordContext<T> recordContext) {
+    // The merged schema contains fields that appear in either the older and/or the newer record.
+    HoodieSchema mergedSchema =
+        getCachedMergedSchema(lowOrderSchema, highOrderSchema, newSchema);
+    boolean isNewerPartial = isPartial(highOrderSchema, mergedSchema);
+    if (!isNewerPartial) {
+      return Pair.of(highOrderRecord, highOrderSchema);
+    }
+    Set<String> fieldNamesInNewRecord =
+        getCachedFieldNames(highOrderSchema);
+    // Collect field values.
+    List<HoodieSchemaField> fields = mergedSchema.getFields();
+    Object[] fieldVals = new Object[fields.size()];
+    int idx = 0;
+    List<HoodieSchemaField> mergedSchemaFields = mergedSchema.getFields();
+    for (HoodieSchemaField mergedSchemaField : mergedSchemaFields) {
+      String fieldName = mergedSchemaField.name();
+      if (fieldNamesInNewRecord.contains(fieldName)) { // field present in newer record.
+        fieldVals[idx++] = recordContext.getValue(highOrderRecord.getRecord(), highOrderSchema, fieldName);
+      } else { // if not present in newer record pick from old record
+        fieldVals[idx++] = recordContext.getValue(lowOrderRecord.getRecord(), lowOrderSchema, fieldName);
+      }
+    }
+    // Build merged record.
+    T engineRecord = recordContext.constructEngineRecord(mergedSchema, fieldVals);
+    BufferedRecord<T> mergedRecord = new BufferedRecord<>(
+        highOrderRecord.getRecordKey(),
+        highOrderRecord.getOrderingValue(),
+        engineRecord,
+        recordContext.encodeSchema(mergedSchema),
+        highOrderRecord.getHoodieOperation());
+    return Pair.of(mergedRecord, mergedSchema);
+  }
+
+  /**
+   * @param hoodieSchema Hoodie schema.
+   * @return The set of field names.
+   */
+  Set<String> getCachedFieldNames(HoodieSchema hoodieSchema) {
+    return fieldNameCache.computeIfAbsent(hoodieSchema, schema -> {
+      Set<String> fieldNames = new HashSet<>();
+      for (HoodieSchemaField field : schema.getFields()) {
+        fieldNames.add(field.name());
+      }
+      return fieldNames;
+    });
+  }
+
+  /**
+   * Merges the two schemas so the merged schema contains all the fields from the two schemas,
+   * with the same ordering of fields based on the provided reader schema.
+   *
+   * @param oldSchema    Old schema.
+   * @param newSchema    New schema.
+   * @param readerSchema Reader schema containing all the fields to read.
+   * @return             The merged Avro schema.
+   */
+  HoodieSchema getCachedMergedSchema(HoodieSchema oldSchema,

Review Comment:
   private



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/PositionBasedFileGroupRecordBuffer.java:
##########
@@ -115,6 +115,9 @@ public void processDataBlock(HoodieDataBlock dataBlock, Option<KeySpec> keySpecO
      // When a data block contains partial updates, subsequent record merging must always use
      // partial merging.
       enablePartialMerging = true;
+      if (partialUpdateModeOpt.isEmpty()) {
+        this.partialUpdateModeOpt = Option.of(PartialUpdateMode.KEEP_VALUES);

Review Comment:
   Can you confirm this behavior is retained:
   
   File group: the base file and 2 log files have the full schema,
   and the 3rd log file contains a partially encoded schema (MOR partial encoding).
   
   When we are merging the different log files, we should enable `enablePartialMerging` and `partialUpdateModeOpt` only when the 3rd log file is processed, and not before that.
   
   And hence these lines
   ```
       this.bufferedRecordMerger = BufferedRecordMergerFactory.create(
           readerContext, recordMergeMode, enablePartialMerging, recordMerger, readerSchema, payloadClasses, props, partialUpdateModeOpt);
   ```
   
   in FileGroupRecordBuffer should reflect that accordingly.
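   i.e. something along these lines (a sketch only, reusing the existing `containsPartialUpdates()` check; the exact wiring may differ):
   ```
       // Flip the flags and rebuild the merger lazily, only once a partially
       // encoded block is actually seen, so earlier full-schema blocks keep
       // using the non-partial merger.
       if (dataBlock.containsPartialUpdates() && !enablePartialMerging) {
         enablePartialMerging = true;
         if (partialUpdateModeOpt.isEmpty()) {
           this.partialUpdateModeOpt = Option.of(PartialUpdateMode.KEEP_VALUES);
         }
         this.bufferedRecordMerger = BufferedRecordMergerFactory.create(
             readerContext, recordMergeMode, enablePartialMerging, recordMerger,
             readerSchema, payloadClasses, props, partialUpdateModeOpt);
       }
   ```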
   
   



##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveRecordContext.java:
##########
@@ -97,16 +104,41 @@ public HoodieRecord<ArrayWritable> constructHoodieRecord(BufferedRecord<ArrayWri
 
  @Override
  public ArrayWritable constructEngineRecord(HoodieSchema recordSchema, Object[] fieldValues) {
-    return new ArrayWritable(Writable.class, (Writable[]) fieldValues);
+    Schema avroSchema = recordSchema.toAvroSchema();
+    List<Schema.Field> fields = avroSchema.getFields();
+    if (fields.size() != fieldValues.length) {
+      throw new IllegalArgumentException("Mismatch between schema fields and values");
+    }
+
+    Writable[] writables = new Writable[fields.size()];
+    for (int i = 0; i < fields.size(); i++) {
+      Schema fieldSchema = resolveUnion(fields.get(i).schema());
+      writables[i] = convertToWritable(fieldSchema, fieldValues[i]);
+    }
+    return new ArrayWritable(Writable.class, writables);
   }
 
   @Override
   public ArrayWritable mergeWithEngineRecord(HoodieSchema schema,
                                              Map<Integer, Object> updateValues,
                                             BufferedRecord<ArrayWritable> baseRecord) {
     Writable[] engineRecord = baseRecord.getRecord().get();
+    Schema avroSchema = schema.toAvroSchema();
+    List<Schema.Field> fields = avroSchema.getFields();
     for (Map.Entry<Integer, Object> value : updateValues.entrySet()) {
-      engineRecord[value.getKey()] = (Writable) value.getValue();
+      int pos = value.getKey();
+      Object updateValue = value.getValue();
+
+      // If value is already a Writable, use it directly
+      if (updateValue instanceof Writable) {

Review Comment:
   Do we have test coverage for this?
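   If not, a unit test along these lines would help (a sketch; `newContext()`, `newBufferedRecord()`, and `schema` are hypothetical test fixtures):
   ```
       @Test
       public void testMergeWithEngineRecordHandlesWritableAndRawValues() {
         HiveRecordContext ctx = newContext();
         BufferedRecord<ArrayWritable> base = newBufferedRecord(
             schema, new Writable[] {new Text("a"), new IntWritable(1)});
         Map<Integer, Object> updates = new HashMap<>();
         updates.put(0, new Text("b")); // already a Writable: should pass through unchanged
         updates.put(1, 2);             // raw Java value: should be converted to a Writable
         ArrayWritable merged = ctx.mergeWithEngineRecord(schema, updates, base);
         assertEquals(new Text("b"), merged.get()[0]);
         assertEquals(new IntWritable(2), merged.get()[1]);
       }
   ```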



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/PartialMergerWithKeepValues.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.engine.RecordContext;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.common.util.collection.Pair;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Class to assist with merging two versions of the record that may contain 
partial updates using
+ * {@link org.apache.hudi.common.table.PartialUpdateMode#KEEP_VALUES} mode.
+ */

Review Comment:
   Let's file a follow-up ticket.
   Looks like in this patch we are mainly focusing on MOR partial encoding,
   but I am thinking we should expand the functionality to COW as well.
   
   For example, in Spark data source writes, users should be able to supply only a subset of columns and partially update the table.
   We can take it up as a follow-up patch.
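   For example, the end state for Spark data source writes could look like this (a sketch; the paths, table name, and the partial-update option key are hypothetical):
   ```
       // upsert a DataFrame that carries only a subset of the table's columns
       Dataset<Row> updates = spark.read().parquet("/tmp/partial_updates")
           .select("uuid", "partition_path", "price"); // key, partition, one updated column
       updates.write()
           .format("hudi")
           .option("hoodie.table.name", "trips")
           .option("hoodie.datasource.write.operation", "upsert")
           // hypothetical option key selecting KEEP_VALUES semantics for this write
           .option("hoodie.write.partial.update.mode", "KEEP_VALUES")
           .mode(SaveMode.Append)
           .save("/tmp/trips");
   ```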



##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/format/FlinkRecordContext.java:
##########
@@ -148,7 +148,17 @@ public HoodieRecord<RowData> constructHoodieRecord(BufferedRecord<RowData> buffe
 
   @Override
   public RowData constructEngineRecord(HoodieSchema recordSchema, Object[] fieldValues) {
-    return GenericRowData.of(fieldValues);
+    List<HoodieSchemaField> fields = recordSchema.getFields();

Review Comment:
   Yes, I also feel this is too heavy, since we are doing this for every record. We have to be cautious about adding any per-record validation in general.
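   If we want to keep any check at all, maybe gate it so the hot path stays cheap, e.g. (a sketch only):
   ```
       @Override
       public RowData constructEngineRecord(HoodieSchema recordSchema, Object[] fieldValues) {
         // only runs when JVM assertions are enabled; no cost in production
         assert recordSchema.getFields().size() == fieldValues.length
             : "field count mismatch between schema and values";
         return GenericRowData.of(fieldValues);
       }
   ```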



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/KeyBasedFileGroupRecordBuffer.java:
##########
@@ -77,6 +77,9 @@ public void processDataBlock(HoodieDataBlock dataBlock, Option<KeySpec> keySpecO
      // When a data block contains partial updates, subsequent record merging must always use
      // partial merging.
       enablePartialMerging = true;
+      if (partialUpdateModeOpt.isEmpty()) {
+        this.partialUpdateModeOpt = Option.of(PartialUpdateMode.KEEP_VALUES);

Review Comment:
   Since we are standardizing on `partialUpdateModeOpt`, why do we need `enablePartialMerging` anymore?
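   i.e. if `partialUpdateModeOpt` becomes the single source of truth, the flag could simply be derived (a sketch):
   ```
       // derived instead of stored; avoids the two getting out of sync
       private boolean enablePartialMerging() {
         return partialUpdateModeOpt.isPresent();
       }
   ```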



##########
hudi-common/src/main/java/org/apache/hudi/common/table/PartialUpdateMode.java:
##########
@@ -22,10 +22,14 @@
 import org.apache.hudi.common.config.EnumFieldDescription;
 
 public enum PartialUpdateMode {
+  @EnumFieldDescription(
+      "For any column values missing in current record, pick value from 
previous version of the record.")

Review Comment:
   We should call out the nested fields as well.
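   e.g. the description could read something like this (assuming the wholesale-replacement semantics of the current implementation):
   ```
       @EnumFieldDescription(
           "For any top-level column value missing in the current record, pick the value from "
               + "the previous version of the record. Nested fields are not merged individually: "
               + "a nested struct present in the current record replaces the previous one wholesale.")
       KEEP_VALUES,
   ```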



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestPartialUpdateForMergeInto.scala:
##########
@@ -680,6 +689,59 @@ class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase {
     )
   }
 
+  test("Test MERGE INTO with KEEP_VALUES partial update mode on MOR table with 
Avro log format") {

Review Comment:
   If we have covered the different combinations with TestPartialUpdateHandler, TestBufferedRecordMerger, or elsewhere, we don't need end-to-end functional tests for every one of them.
   
   It's worth having 1 or 2 end-to-end functional tests, but not for all combinations.
   For example, with event-time-ordering merge mode:
   - MOR, partial encoding and no partial update mode set in table config.
   - MOR, partial encoding + partial update mode set in table config.
   
   These 2 should suffice for end-to-end functional coverage,
   but let's ensure all combinations are tested at lower levels.
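   A lower-level parameterized matrix could look like this (a sketch; `runMergeAndAssert` is a hypothetical helper):
   ```
       @ParameterizedTest
       @CsvSource({
           "EVENT_TIME_ORDERING, true,",              // MOR partial encoding, no table property
           "EVENT_TIME_ORDERING, true, KEEP_VALUES",  // MOR partial encoding + table property
           "COMMIT_TIME_ORDERING, true,",
           "COMMIT_TIME_ORDERING, true, KEEP_VALUES",
       })
       void testPartialMergeCombinations(RecordMergeMode mergeMode, boolean partialEncoding,
                                         PartialUpdateMode updateMode) {
         // empty CSV cell -> null updateMode, i.e. no partial update mode set in table config
         runMergeAndAssert(mergeMode, partialEncoding, Option.ofNullable(updateMode));
       }
   ```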
   
   



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java:
##########
@@ -63,28 +63,18 @@ public static <T> BufferedRecordMerger<T> create(HoodieReaderContext<T> readerCo
                                                   Option<Pair<String, String>> payloadClasses,
                                                   TypedProperties props,
                                                   Option<PartialUpdateMode> partialUpdateModeOpt) {
-    /**
-     * This part implements KEEP_VALUES partial update mode, which merges two records that do not have all columns.
-     * Other Partial update modes, like IGNORE_DEFAULTS assume all columns exists in the record,
-     * but some columns contain specific values that should be replaced by that from older version of the record.
-     */
-    if (enablePartialMerging) {
-      BufferedRecordMerger<T> deleteRecordMerger = create(
-          readerContext, recordMergeMode, false, recordMerger, readerSchema, payloadClasses, props, Option.empty());
-      return new PartialUpdateBufferedRecordMerger<>(readerContext.getRecordContext(), recordMerger, deleteRecordMerger, readerSchema, props);
-    }
 
     switch (recordMergeMode) {
       case COMMIT_TIME_ORDERING:
         if (partialUpdateModeOpt.isEmpty()) {
           return new CommitTimeRecordMerger<>();
         }
-        return new CommitTimePartialRecordMerger<>(readerContext.getRecordContext(), partialUpdateModeOpt.get(), props);
+        return new CommitTimePartialRecordMerger<>(readerContext.getRecordContext(), partialUpdateModeOpt.get(), readerSchema, props);
       case EVENT_TIME_ORDERING:
         if (partialUpdateModeOpt.isEmpty()) {
           return new EventTimeRecordMerger<>(readerContext.getRecordContext());
         }
-        return new EventTimePartialRecordMerger<>(readerContext.getRecordContext(), partialUpdateModeOpt.get(), props);
+        return new EventTimePartialRecordMerger<>(readerContext.getRecordContext(), partialUpdateModeOpt.get(), readerSchema, props);

Review Comment:
   @yihua : ping.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
