nsivabalan commented on code in PR #17604:
URL: https://github.com/apache/hudi/pull/17604#discussion_r2692971729


##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java:
##########
@@ -63,28 +63,18 @@ public static <T> BufferedRecordMerger<T> create(HoodieReaderContext<T> readerCo
                                                    Option<Pair<String, String>> payloadClasses,
                                                    TypedProperties props,
                                                    Option<PartialUpdateMode> partialUpdateModeOpt) {
-    /**
-     * This part implements KEEP_VALUES partial update mode, which merges two records that do not have all columns.
-     * Other Partial update modes, like IGNORE_DEFAULTS assume all columns exists in the record,
-     * but some columns contain specific values that should be replaced by that from older version of the record.
-     */
-    if (enablePartialMerging) {
-      BufferedRecordMerger<T> deleteRecordMerger = create(
-          readerContext, recordMergeMode, false, recordMerger, readerSchema, payloadClasses, props, Option.empty());
-      return new PartialUpdateBufferedRecordMerger<>(readerContext.getRecordContext(), recordMerger, deleteRecordMerger, readerSchema, props);
-    }
 
     switch (recordMergeMode) {
       case COMMIT_TIME_ORDERING:
         if (partialUpdateModeOpt.isEmpty()) {
           return new CommitTimeRecordMerger<>();
         }
-        return new CommitTimePartialRecordMerger<>(readerContext.getRecordContext(), partialUpdateModeOpt.get(), props);
+        return new CommitTimePartialRecordMerger<>(readerContext.getRecordContext(), partialUpdateModeOpt.get(), readerSchema, props);
       case EVENT_TIME_ORDERING:
         if (partialUpdateModeOpt.isEmpty()) {
           return new EventTimeRecordMerger<>(readerContext.getRecordContext());
         }
-        return new EventTimePartialRecordMerger<>(readerContext.getRecordContext(), partialUpdateModeOpt.get(), props);
+        return new EventTimePartialRecordMerger<>(readerContext.getRecordContext(), partialUpdateModeOpt.get(), readerSchema, props);

Review Comment:
   hey @yihua: it looks like partial encoding for MergeInto is only supported when one of the built-in merge modes (commit-time or event-time ordering) is used, and not for custom mergers. Was that intentional, or did we want to add support down the line?
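   To illustrate what I mean, here is a stripped-down sketch of the factory's dispatch (class names mirror the diff; the default branch and return values are illustrative only, not the actual implementation):

   ```java
   // Illustrative sketch: partial-update merger variants exist only for the
   // two built-in orderings; a custom merge mode falls through without a
   // partial-aware counterpart, even if a PartialUpdateMode is configured.
   enum RecordMergeMode { COMMIT_TIME_ORDERING, EVENT_TIME_ORDERING, CUSTOM }

   class MergerSelection {
     static String select(RecordMergeMode mode, boolean hasPartialUpdateMode) {
       switch (mode) {
         case COMMIT_TIME_ORDERING:
           return hasPartialUpdateMode ? "CommitTimePartialRecordMerger" : "CommitTimeRecordMerger";
         case EVENT_TIME_ORDERING:
           return hasPartialUpdateMode ? "EventTimePartialRecordMerger" : "EventTimeRecordMerger";
         default:
           // CUSTOM: no partial-aware merger is selected here.
           return "custom RecordMerger (no partial support)";
       }
     }
   }
   ```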
   



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/PartialMergerWithKeepValues.java:
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.engine.RecordContext;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.common.util.collection.Pair;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Class to assist with merging two versions of the record that may contain partial updates using
+ * {@link org.apache.hudi.common.table.PartialUpdateMode#KEEP_VALUES} mode.
+ */
+public class KeepValuesPartialMergingUtils<T> {
+  static KeepValuesPartialMergingUtils INSTANCE = new KeepValuesPartialMergingUtils();
+  private static final Map<HoodieSchema, Map<String, Integer>>
+      FIELD_NAME_TO_ID_MAPPING_CACHE = new ConcurrentHashMap<>();
+  private static final Map<Pair<Pair<HoodieSchema, HoodieSchema>, HoodieSchema>, HoodieSchema>
+      MERGED_SCHEMA_CACHE = new ConcurrentHashMap<>();
+
+  /**
+   * Merges records which can contain partial updates.
+   *
+   * @param lowOrderRecord  record with lower commit time or lower ordering value
+   * @param lowOrderSchema  The schema of the older record
+   * @param highOrderRecord record with higher commit time or higher ordering value
+   * @param highOrderSchema The schema of highOrderRecord
+   * @param newSchema       The schema of the new incoming record
+   * @return The merged record and schema.
+   */
+  Pair<BufferedRecord<T>, HoodieSchema> mergePartialRecords(BufferedRecord<T> lowOrderRecord,
+                                                             HoodieSchema lowOrderSchema,
+                                                             BufferedRecord<T> highOrderRecord,
+                                                             HoodieSchema highOrderSchema,
+                                                             HoodieSchema newSchema,
+                                                             RecordContext<T> recordContext) {
+    // The merged schema contains fields that only appear in either older and/or newer record.
+    HoodieSchema mergedSchema =
+        getCachedMergedSchema(lowOrderSchema, highOrderSchema, newSchema);
+    boolean isNewerPartial = isPartial(highOrderSchema, mergedSchema);
+    if (!isNewerPartial) {
+      return Pair.of(highOrderRecord, highOrderSchema);
+    }
+    Set<String> fieldNamesInNewRecord =
+        getCachedFieldNameToIdMapping(highOrderSchema).keySet();
+    // Collect field values.
+    List<HoodieSchemaField> fields = mergedSchema.getFields();
+    Object[] fieldVals = new Object[fields.size()];
+    int idx = 0;
+    List<HoodieSchemaField> mergedSchemaFields = mergedSchema.getFields();
+    for (HoodieSchemaField mergedSchemaField : mergedSchemaFields) {

Review Comment:
   Can we create a follow-up issue for nested fields? We should add that support. The payloads we migrated only supported top-level fields, so we tried to maintain parity, but that should not block us from adding support for new users.
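   For illustration only (records modeled as plain maps, not the actual `BufferedRecord` API), nested support would mean recursing into struct-valued fields instead of replacing them wholesale, while keeping the top-level KEEP_VALUES semantics that a field missing from the newer record keeps the older value:

   ```java
   import java.util.HashMap;
   import java.util.Map;

   // Illustrative sketch of KEEP_VALUES with nested-field support:
   // fields absent from the newer record keep the older values; struct
   // fields present in both are merged field-by-field rather than overwritten.
   class NestedKeepValues {
     @SuppressWarnings("unchecked")
     static Map<String, Object> merge(Map<String, Object> older, Map<String, Object> newer) {
       Map<String, Object> out = new HashMap<>(older); // start from older values
       for (Map.Entry<String, Object> e : newer.entrySet()) {
         Object oldVal = out.get(e.getKey());
         if (e.getValue() instanceof Map && oldVal instanceof Map) {
           // nested struct: recurse instead of replacing wholesale
           out.put(e.getKey(), merge((Map<String, Object>) oldVal, (Map<String, Object>) e.getValue()));
         } else {
           out.put(e.getKey(), e.getValue()); // newer value wins when present
         }
       }
       return out;
     }
   }
   ```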
   



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -365,6 +365,12 @@ public static final String getDefaultPayloadClassName() {
       .sinceVersion("1.1.0")
      .withDocumentation("This property when set, will define how two versions of the record will be merged together when records are partially formed");
 
+  public static final ConfigProperty<String> MERGE_PROPERTIES = ConfigProperty

Review Comment:
   We changed the logic on this: all merge properties now share a common prefix. You can refer to https://github.com/apache/hudi/blob/066887ae5b88989b6869c57a3c56137319bbd029/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java#L902 for the merge props.
   
   Maybe in the first version of this PR, when it was raised, we had this property, but we later changed course.
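   Roughly, the prefix-based approach looks like this (the prefix string and class name here are assumptions for illustration, not the exact constants in HoodieTableConfig): any key under the common prefix is collected and handed to the merger with the prefix stripped, instead of packing everything into one MERGE_PROPERTIES config.

   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.Properties;

   // Sketch of prefix-scoped merge properties. MERGE_PROPS_PREFIX is an
   // assumed name for illustration; see HoodieTableConfig for the real one.
   class MergePropsExtraction {
     static final String MERGE_PROPS_PREFIX = "hoodie.table.merge.properties."; // assumed prefix

     static Map<String, String> extractMergeProps(Properties props) {
       Map<String, String> mergeProps = new HashMap<>();
       for (String key : props.stringPropertyNames()) {
         if (key.startsWith(MERGE_PROPS_PREFIX)) {
           // keep only keys under the common prefix, with the prefix stripped
           mergeProps.put(key.substring(MERGE_PROPS_PREFIX.length()), props.getProperty(key));
         }
       }
       return mergeProps;
     }
   }
   ```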



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to