nsivabalan commented on code in PR #13498:
URL: https://github.com/apache/hudi/pull/13498#discussion_r2196323251


##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/PartialUpdateStrategy.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.PartialUpdateMode;
+import org.apache.hudi.common.util.StringUtils;
+
+import org.apache.avro.Schema;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.hudi.common.model.HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER;
+
+public class PartialUpdateStrategy<T> {
+  private final HoodieReaderContext<T> readerContext;
+  private final PartialUpdateMode partialUpdateMode;
+  private Map<String, String> partialUpdateProperties;
+
+  public PartialUpdateStrategy(HoodieReaderContext<T> readerContext,
+                               PartialUpdateMode partialUpdateMode,
+                               TypedProperties props) {
+    this.readerContext = readerContext;
+    this.partialUpdateMode = partialUpdateMode;
+    this.partialUpdateProperties = parsePartialUpdateProperties(props);
+  }
+
+  /**
+   * Merge records based on partial update mode.
+   * Note that {@param newRecord} refers to the record with higher commit time 
if COMMIT_TIME_ORDERING mode is used,
+   * or higher event time if EVENT_TIME_ORDERING mode us used.
+   */
+  BufferedRecord<T> reconcileFieldsWithOldRecord(BufferedRecord<T> newRecord,
+                                                 BufferedRecord<T> oldRecord,
+                                                 Schema newSchema,
+                                                 Schema oldSchema,
+                                                 boolean 
keepOldMetadataColumns) {
+    // Note that: When either newRecord or oldRecord is a delete record,
+    //            skip partial update since delete records do not have 
meaningful columns.
+    if (partialUpdateMode == PartialUpdateMode.NONE
+        || null == oldRecord
+        || newRecord.isDelete()
+        || oldRecord.isDelete()) {
+      return newRecord;
+    }
+
+    switch (partialUpdateMode) {
+      case KEEP_VALUES:
+      case FILL_DEFAULTS:
+        return newRecord;
+      case IGNORE_DEFAULTS:
+        return reconcileDefaultValues(
+            newRecord, oldRecord, newSchema, oldSchema, 
keepOldMetadataColumns);
+      case IGNORE_MARKERS:
+        return reconcileMarkerValues(
+            newRecord, oldRecord, newSchema, oldSchema);
+      default:
+        return newRecord;
+    }
+  }
+
+  /**
+   * @param newRecord              The newer record determined by the merge 
mode.
+   * @param oldRecord              The older record determined by the merge 
mode.
+   * @param newSchema              The schema of the newer record.
+   * @param oldSchema              The schema of the older record.
+   * @param keepOldMetadataColumns Keep the metadata columns from the older 
record.
+   * @return
+   */
+  BufferedRecord<T> reconcileDefaultValues(BufferedRecord<T> newRecord,
+                                           BufferedRecord<T> oldRecord,
+                                           Schema newSchema,
+                                           Schema oldSchema,
+                                           boolean keepOldMetadataColumns) {
+    List<Schema.Field> fields = newSchema.getFields();
+    Map<Integer, Object> updateValues = new HashMap<>();
+    T engineRecord;
+    // The default value only from the top-level data type is validated. That 
means,
+    // for nested columns, we do not check the leaf level data type defaults.
+    for (Schema.Field field : fields) {
+      String fieldName = field.name();
+      Object defaultValue = field.defaultVal();
+      Object newValue = readerContext.getValue(
+          newRecord.getRecord(), newSchema, fieldName);
+      if (defaultValue == newValue
+          || (keepOldMetadataColumns && 
HOODIE_META_COLUMNS_NAME_TO_POS.containsKey(fieldName))) {
+        updateValues.put(field.pos(), 
readerContext.getValue(oldRecord.getRecord(), oldSchema, fieldName));
+      }
+    }
+    if (updateValues.isEmpty()) {
+      return newRecord;
+    }
+    engineRecord = readerContext.constructEngineRecord(newSchema, 
updateValues, newRecord);
+    return new BufferedRecord<>(
+        newRecord.getRecordKey(),
+        newRecord.getOrderingValue(),
+        engineRecord,
+        newRecord.getSchemaId(),
+        newRecord.isDelete());
+  }
+
+  BufferedRecord<T> reconcileMarkerValues(BufferedRecord<T> newRecord,
+                                          BufferedRecord<T> oldRecord,
+                                          Schema newSchema,
+                                          Schema oldSchema) {
+    List<Schema.Field> fields = newSchema.getFields();
+    Map<Integer, Object> updateValues = new HashMap<>();
+    T engineRecord;
+    String partialUpdateCustomMarker = 
partialUpdateProperties.get(PARTIAL_UPDATE_CUSTOM_MARKER);
+    for (Schema.Field field : fields) {
+      String fieldName = field.name();
+      Object newValue = readerContext.getValue(newRecord.getRecord(), 
newSchema, fieldName);
+      if ((isStringTyped(field) || isBytesTyped(field))
+          && 
(partialUpdateCustomMarker.equals(readerContext.getTypeHandler().castToString(newValue))))
 {
+        updateValues.put(field.pos(), 
readerContext.getValue(oldRecord.getRecord(), oldSchema, fieldName));
+      }
+    }
+    if (updateValues.isEmpty()) {
+      return newRecord;
+    }
+    engineRecord = readerContext.constructEngineRecord(newSchema, 
updateValues, newRecord);
+    return new BufferedRecord<>(
+        newRecord.getRecordKey(),
+        newRecord.getOrderingValue(),
+        engineRecord,
+        newRecord.getSchemaId(),
+        newRecord.isDelete());
+  }
+
+  static boolean isStringTyped(Schema.Field field) {
+    return hasTargetType(field.schema(), Schema.Type.STRING);
+  }
+
+  static boolean isBytesTyped(Schema.Field field) {
+    return hasTargetType(field.schema(), Schema.Type.BYTES);
+  }
+
+  static boolean hasTargetType(Schema schema, Schema.Type targetType) {
+    if (schema.getType() == targetType) {
+      return true;
+    } else if (schema.getType() == Schema.Type.UNION) {
+      // Stream is lazy, so this is efficient even with multiple types
+      return schema.getTypes().stream().anyMatch(s -> s.getType() == 
targetType);
+    }
+    return false;
+  }
+
+  static Map<String, String> parsePartialUpdateProperties(TypedProperties 
props) {
+    Map<String, String> properties = new HashMap<>();
+    String raw = 
props.getProperty(HoodieTableConfig.PARTIAL_UPDATE_PROPERTIES.key());
+    if (StringUtils.isNullOrEmpty(raw)) {
+      return properties;
+    }
+    String[] entries = raw.split(",");

Review Comment:
   Don't we already have a util method for this?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java:
##########
@@ -45,27 +46,34 @@
  * Factory to create a {@link BufferedRecordMerger}.
  */
 public class BufferedRecordMergerFactory {
+
+  private BufferedRecordMergerFactory() {
+  }
+
   public static <T> BufferedRecordMerger<T> create(HoodieReaderContext<T> 
readerContext,
                                                    RecordMergeMode 
recordMergeMode,
                                                    boolean 
enablePartialMerging,
                                                    Option<HoodieRecordMerger> 
recordMerger,
                                                    Option<String> 
orderingFieldName,
                                                    Option<String> payloadClass,
                                                    Schema readerSchema,
-                                                   TypedProperties props) {
+                                                   TypedProperties props,
+                                                   PartialUpdateMode 
partialUpdateMode) {
     if (enablePartialMerging) {
       BufferedRecordMerger<T> deleteRecordMerger = create(
-          readerContext, recordMergeMode, false, recordMerger, 
orderingFieldName, payloadClass, readerSchema, props);
+          readerContext, recordMergeMode, false, recordMerger, 
orderingFieldName, payloadClass, readerSchema, props, partialUpdateMode);

Review Comment:
   btw, is your plan to change this to use KEEP_VALUES once you have the impl
ready? Or are we not looking to fix SQL MIT in 1.1?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -1071,6 +1108,24 @@ public Set<String> getMetadataPartitions() {
             CONFIG_VALUES_DELIMITER));
   }
 
+  public PartialUpdateMode getPartialUpdateMode() {
+    if (getTableVersion().greaterThanOrEquals(HoodieTableVersion.NINE)) {
+      String payloadClass = getPayloadClass();
+      if (StringUtils.isNullOrEmpty(payloadClass)) {
+        return 
PartialUpdateMode.valueOf(getStringOrDefault(PARTIAL_UPDATE_MODE));
+      } else if 
(payloadClass.equals(OverwriteNonDefaultsWithLatestAvroPayload.class.getName())

Review Comment:
   during upgrade, won't we be setting `PARTIAL_UPDATE_MODE`? And so
shouldn't the logic be as follows:
   
   ```
   if (tbl v > 9 ){
     return PartialUpdateMode.valueOf(getStringOrDefault(PARTIAL_UPDATE_MODE));
   } else {
    return PartialUpdateMode.NONE;
   }
   ```
   or am I missing something. 



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java:
##########
@@ -45,27 +46,34 @@
  * Factory to create a {@link BufferedRecordMerger}.
  */
 public class BufferedRecordMergerFactory {
+
+  private BufferedRecordMergerFactory() {
+  }
+
   public static <T> BufferedRecordMerger<T> create(HoodieReaderContext<T> 
readerContext,
                                                    RecordMergeMode 
recordMergeMode,
                                                    boolean 
enablePartialMerging,
                                                    Option<HoodieRecordMerger> 
recordMerger,
                                                    Option<String> 
orderingFieldName,
                                                    Option<String> payloadClass,
                                                    Schema readerSchema,
-                                                   TypedProperties props) {
+                                                   TypedProperties props,
+                                                   PartialUpdateMode 
partialUpdateMode) {
     if (enablePartialMerging) {
       BufferedRecordMerger<T> deleteRecordMerger = create(
-          readerContext, recordMergeMode, false, recordMerger, 
orderingFieldName, payloadClass, readerSchema, props);
+          readerContext, recordMergeMode, false, recordMerger, 
orderingFieldName, payloadClass, readerSchema, props, partialUpdateMode);
       return new PartialUpdateBufferedRecordMerger<>(readerContext, 
recordMerger, deleteRecordMerger, readerSchema, props);
     }
+
     switch (recordMergeMode) {
       case COMMIT_TIME_ORDERING:
-        return new CommitTimeBufferedRecordMerger<>();
+        return new CommitTimeBufferedRecordMerger<>(readerContext, 
partialUpdateMode, props, readerSchema);

Review Comment:
   yes, agree. 
   then we can keep PartialUpdateStrategy out of 
CommitTimeBufferedRecordMerger, and only add it to 
CommitTimeBufferedRecordPartialUpdateMerger 
   



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java:
##########
@@ -98,12 +122,28 @@ public Pair<Boolean, T> finalMerge(BufferedRecord<T> 
olderRecord, BufferedRecord
    * based on {@code EVENT_TIME_ORDERING} merge mode.
    */
   private static class EventTimeBufferedRecordMerger<T> implements 
BufferedRecordMerger<T> {
+    private final PartialUpdateStrategy<T> partialUpdateStrategy;
+    private final Schema readerSchema;
+
+    public EventTimeBufferedRecordMerger(HoodieReaderContext<T> readerContext,
+                                         PartialUpdateMode partialUpdateMode,
+                                         TypedProperties props,
+                                         Schema readerSchema) {
+      this.partialUpdateStrategy = new PartialUpdateStrategy<>(readerContext, 
partialUpdateMode, props);
+      this.readerSchema = readerSchema;
+    }
+
     @Override
     public Option<BufferedRecord<T>> deltaMerge(BufferedRecord<T> newRecord, 
BufferedRecord<T> existingRecord) {
       if (existingRecord == null || shouldKeepNewerRecord(existingRecord, 
newRecord)) {
+        newRecord = partialUpdateStrategy.reconcileFieldsWithOldRecord(
+            newRecord, existingRecord, readerSchema, readerSchema, false);
         return Option.of(newRecord);
+      } else {
+        existingRecord = partialUpdateStrategy.reconcileFieldsWithOldRecord(

Review Comment:
   lets be very cautious to not make any changes to existing full record merge 
logic 



##########
hudi-common/src/main/java/org/apache/hudi/common/engine/ReaderContextTypeHandler.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.engine;
+
+/**
+ * Helper class that handle cell level operation.
+ */
+public class ReaderContextTypeHandler {

Review Comment:
   +1



##########
hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java:
##########
@@ -280,7 +286,8 @@ public ClosableIterator<T> getFileRecordIterator(
   public void initRecordMerger(TypedProperties properties) {
     RecordMergeMode recordMergeMode = tableConfig.getRecordMergeMode();
     String mergeStrategyId = tableConfig.getRecordMergeStrategyId();
-    if 
(!tableConfig.getTableVersion().greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
+    if 
(tableConfig.getTableVersion().greaterThanOrEquals(HoodieTableVersion.NINE)
+        || tableConfig.getTableVersion().lesserThan(HoodieTableVersion.EIGHT)) 
{

Review Comment:
   not sure I get you, Danny.
   Previously, we were inferring just for tbl v 6, and w/ our payload-to-merge-mode
migration, we have to infer for tbl v 8 as well.
   
   Anyways, I guess what Lin has coded above is wrong:
   we need to do inference for tables w/ versions <= 8.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -312,6 +318,21 @@ public class HoodieTableConfig extends HoodieConfig {
       .sinceVersion("1.0.0")
       .withDocumentation("When set to true, the table can support reading and 
writing multiple base file formats.");
 
+  public static final ConfigProperty<PartialUpdateMode> PARTIAL_UPDATE_MODE = 
ConfigProperty
+      .key("hoodie.write.partial.update.mode")
+      .defaultValue(PartialUpdateMode.NONE)
+      .sinceVersion("1.1.0")
+      .withDocumentation("This property when set, will define how two versions 
of the record will be "
+          + "merged together where the later contains only partial set of 
values and not entire record.");
+
+  public static final ConfigProperty<String> PARTIAL_UPDATE_PROPERTIES = 
ConfigProperty
+      .key("hoodie.write.partial.update.properties")
+      .noDefaultValue()

Review Comment:
   why have "partial" in the naming?
   We are looking to leverage this even for full record merges, right? For e.g.,
in case of AWSDmsAvroPayload, we will be setting some custom delete marker
configs.
   
   So, how about `hoodie.table.merge.properties`?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java:
##########
@@ -89,6 +111,12 @@ public Option<DeleteRecord> deltaMerge(DeleteRecord 
deleteRecord, BufferedRecord
 
     @Override
     public Pair<Boolean, T> finalMerge(BufferedRecord<T> olderRecord, 
BufferedRecord<T> newerRecord) {
+      if (newerRecord == null) {
+        return Pair.of(olderRecord.isDelete(), olderRecord.getRecord());
+      }
+
+      newerRecord = partialUpdateStrategy.reconcileFieldsWithOldRecord(

Review Comment:
   why not call it `partialUpdateStrategy.partialMerge()`
   this will be inline w/ other methods we have named like  deltaMerge(), 
finalMerge() and now partialMerge() 
   



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java:
##########
@@ -98,12 +122,28 @@ public Pair<Boolean, T> finalMerge(BufferedRecord<T> 
olderRecord, BufferedRecord
    * based on {@code EVENT_TIME_ORDERING} merge mode.
    */
   private static class EventTimeBufferedRecordMerger<T> implements 
BufferedRecordMerger<T> {
+    private final PartialUpdateStrategy<T> partialUpdateStrategy;
+    private final Schema readerSchema;
+
+    public EventTimeBufferedRecordMerger(HoodieReaderContext<T> readerContext,
+                                         PartialUpdateMode partialUpdateMode,
+                                         TypedProperties props,
+                                         Schema readerSchema) {
+      this.partialUpdateStrategy = new PartialUpdateStrategy<>(readerContext, 
partialUpdateMode, props);
+      this.readerSchema = readerSchema;
+    }
+
     @Override
     public Option<BufferedRecord<T>> deltaMerge(BufferedRecord<T> newRecord, 
BufferedRecord<T> existingRecord) {
       if (existingRecord == null || shouldKeepNewerRecord(existingRecord, 
newRecord)) {
+        newRecord = partialUpdateStrategy.reconcileFieldsWithOldRecord(
+            newRecord, existingRecord, readerSchema, readerSchema, false);
         return Option.of(newRecord);
+      } else {
+        existingRecord = partialUpdateStrategy.reconcileFieldsWithOldRecord(

Review Comment:
   Let's add a comment here noting that we swap the old and new records (because
of the lower ordering value) before calling
partialUpdateStrategy.reconcileFieldsWithOldRecord() in this `else` block.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java:
##########
@@ -77,8 +85,22 @@ public static <T> BufferedRecordMerger<T> 
create(HoodieReaderContext<T> readerCo
    * based on {@code COMMIT_TIME_ORDERING} merge mode.
    */
   private static class CommitTimeBufferedRecordMerger<T> implements 
BufferedRecordMerger<T> {
+    private final PartialUpdateStrategy<T> partialUpdateStrategy;
+    private final Schema readerSchema;
+
+    public CommitTimeBufferedRecordMerger(HoodieReaderContext<T> readerContext,
+                                          PartialUpdateMode partialUpdateMode,
+                                          TypedProperties props,
+                                          Schema readerSchema) {
+      this.partialUpdateStrategy = new PartialUpdateStrategy<>(readerContext, 
partialUpdateMode, props);
+      this.readerSchema = readerSchema;
+    }
+
     @Override
-    public Option<BufferedRecord<T>> deltaMerge(BufferedRecord<T> newRecord, 
BufferedRecord<T> existingRecord) {
+    public Option<BufferedRecord<T>> deltaMerge(BufferedRecord<T> newRecord,
+                                                BufferedRecord<T> 
existingRecord) {
+      newRecord = partialUpdateStrategy.reconcileFieldsWithOldRecord(

Review Comment:
   Let's not touch existing record mergers, as proposed above.
   We can add new ones for partial updates.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -312,6 +318,21 @@ public class HoodieTableConfig extends HoodieConfig {
       .sinceVersion("1.0.0")
       .withDocumentation("When set to true, the table can support reading and 
writing multiple base file formats.");
 
+  public static final ConfigProperty<PartialUpdateMode> PARTIAL_UPDATE_MODE = 
ConfigProperty
+      .key("hoodie.write.partial.update.mode")

Review Comment:
   yes, lets name this `hoodie.table.partial.update.mode` since its a table 
property



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/PartialUpdateStrategy.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.PartialUpdateMode;
+import org.apache.hudi.common.util.StringUtils;
+
+import org.apache.avro.Schema;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.hudi.common.model.HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER;
+
+public class PartialUpdateStrategy<T> {

Review Comment:
   java docs please



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/PartialUpdateStrategy.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.PartialUpdateMode;
+import org.apache.hudi.common.util.StringUtils;
+
+import org.apache.avro.Schema;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.hudi.common.model.HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER;
+
+public class PartialUpdateStrategy<T> {
+  private final HoodieReaderContext<T> readerContext;
+  private final PartialUpdateMode partialUpdateMode;
+  private Map<String, String> partialUpdateProperties;
+
+  public PartialUpdateStrategy(HoodieReaderContext<T> readerContext,
+                               PartialUpdateMode partialUpdateMode,
+                               TypedProperties props) {
+    this.readerContext = readerContext;
+    this.partialUpdateMode = partialUpdateMode;
+    this.partialUpdateProperties = parsePartialUpdateProperties(props);
+  }
+
+  /**
+   * Merge records based on partial update mode.
+   * Note that {@param newRecord} refers to the record with higher commit time 
if COMMIT_TIME_ORDERING mode is used,
+   * or higher event time if EVENT_TIME_ORDERING mode us used.
+   */
+  BufferedRecord<T> reconcileFieldsWithOldRecord(BufferedRecord<T> newRecord,
+                                                 BufferedRecord<T> oldRecord,
+                                                 Schema newSchema,
+                                                 Schema oldSchema,
+                                                 boolean 
keepOldMetadataColumns) {
+    // Note that: When either newRecord or oldRecord is a delete record,
+    //            skip partial update since delete records do not have 
meaningful columns.
+    if (partialUpdateMode == PartialUpdateMode.NONE
+        || null == oldRecord
+        || newRecord.isDelete()
+        || oldRecord.isDelete()) {
+      return newRecord;
+    }
+
+    switch (partialUpdateMode) {
+      case KEEP_VALUES:
+      case FILL_DEFAULTS:
+        return newRecord;
+      case IGNORE_DEFAULTS:
+        return reconcileDefaultValues(
+            newRecord, oldRecord, newSchema, oldSchema, 
keepOldMetadataColumns);
+      case IGNORE_MARKERS:
+        return reconcileMarkerValues(
+            newRecord, oldRecord, newSchema, oldSchema);
+      default:
+        return newRecord;
+    }
+  }
+
+  /**
+   * @param newRecord              The newer record determined by the merge 
mode.
+   * @param oldRecord              The older record determined by the merge 
mode.
+   * @param newSchema              The schema of the newer record.
+   * @param oldSchema              The schema of the older record.
+   * @param keepOldMetadataColumns Keep the metadata columns from the older 
record.
+   * @return
+   */
+  BufferedRecord<T> reconcileDefaultValues(BufferedRecord<T> newRecord,
+                                           BufferedRecord<T> oldRecord,
+                                           Schema newSchema,
+                                           Schema oldSchema,
+                                           boolean keepOldMetadataColumns) {
+    List<Schema.Field> fields = newSchema.getFields();

Review Comment:
   We need to fix HoodieMergeHandle for COW to also use the FG reader. That's the
next work item that either Lin or Lokesh will be working on :)
   



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java:
##########
@@ -45,27 +46,34 @@
  * Factory to create a {@link BufferedRecordMerger}.
  */
 public class BufferedRecordMergerFactory {
+
+  private BufferedRecordMergerFactory() {
+  }
+
   public static <T> BufferedRecordMerger<T> create(HoodieReaderContext<T> 
readerContext,
                                                    RecordMergeMode 
recordMergeMode,
                                                    boolean 
enablePartialMerging,
                                                    Option<HoodieRecordMerger> 
recordMerger,
                                                    Option<String> 
orderingFieldName,
                                                    Option<String> payloadClass,
                                                    Schema readerSchema,
-                                                   TypedProperties props) {
+                                                   TypedProperties props,
+                                                   PartialUpdateMode 
partialUpdateMode) {
     if (enablePartialMerging) {
       BufferedRecordMerger<T> deleteRecordMerger = create(
-          readerContext, recordMergeMode, false, recordMerger, 
orderingFieldName, payloadClass, readerSchema, props);
+          readerContext, recordMergeMode, false, recordMerger, 
orderingFieldName, payloadClass, readerSchema, props, partialUpdateMode);

Review Comment:
   Can we add javadocs here explaining how this code block is different from the
partialUpdateMode used below in L 70, 72, etc.?
   
   I don't think it's apparent.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/PartialUpdateStrategy.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.PartialUpdateMode;
+import org.apache.hudi.common.util.StringUtils;
+
+import org.apache.avro.Schema;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.hudi.common.model.HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER;
+
+public class PartialUpdateStrategy<T> {
+  private final HoodieReaderContext<T> readerContext;
+  private final PartialUpdateMode partialUpdateMode;
+  private Map<String, String> partialUpdateProperties;
+
+  public PartialUpdateStrategy(HoodieReaderContext<T> readerContext,
+                               PartialUpdateMode partialUpdateMode,
+                               TypedProperties props) {
+    this.readerContext = readerContext;
+    this.partialUpdateMode = partialUpdateMode;
+    this.partialUpdateProperties = parsePartialUpdateProperties(props);
+  }
+
+  /**
+   * Merge records based on partial update mode.
+   * Note that {@param newRecord} refers to the record with higher commit time 
if COMMIT_TIME_ORDERING mode is used,
+   * or higher event time if EVENT_TIME_ORDERING mode us used.
+   */
+  BufferedRecord<T> reconcileFieldsWithOldRecord(BufferedRecord<T> newRecord,
+                                                 BufferedRecord<T> oldRecord,
+                                                 Schema newSchema,
+                                                 Schema oldSchema,
+                                                 boolean 
keepOldMetadataColumns) {
+    // Note that: When either newRecord or oldRecord is a delete record,
+    //            skip partial update since delete records do not have 
meaningful columns.
+    if (partialUpdateMode == PartialUpdateMode.NONE
+        || null == oldRecord
+        || newRecord.isDelete()
+        || oldRecord.isDelete()) {
+      return newRecord;
+    }
+
+    switch (partialUpdateMode) {
+      case KEEP_VALUES:
+      case FILL_DEFAULTS:
+        return newRecord;
+      case IGNORE_DEFAULTS:
+        return reconcileDefaultValues(
+            newRecord, oldRecord, newSchema, oldSchema, 
keepOldMetadataColumns);
+      case IGNORE_MARKERS:
+        return reconcileMarkerValues(
+            newRecord, oldRecord, newSchema, oldSchema);
+      default:
+        return newRecord;
+    }
+  }
+
+  /**
+   * @param newRecord              The newer record determined by the merge 
mode.
+   * @param oldRecord              The older record determined by the merge 
mode.
+   * @param newSchema              The schema of the newer record.
+   * @param oldSchema              The schema of the older record.
+   * @param keepOldMetadataColumns Keep the metadata columns from the older 
record.
+   * @return
+   */
+  BufferedRecord<T> reconcileDefaultValues(BufferedRecord<T> newRecord,
+                                           BufferedRecord<T> oldRecord,
+                                           Schema newSchema,
+                                           Schema oldSchema,
+                                           boolean keepOldMetadataColumns) {
+    List<Schema.Field> fields = newSchema.getFields();
+    Map<Integer, Object> updateValues = new HashMap<>();
+    T engineRecord;
+    // The default value only from the top-level data type is validated. That 
means,
+    // for nested columns, we do not check the leaf level data type defaults.
+    for (Schema.Field field : fields) {
+      String fieldName = field.name();
+      Object defaultValue = field.defaultVal();
+      Object newValue = readerContext.getValue(
+          newRecord.getRecord(), newSchema, fieldName);
+      if (defaultValue == newValue
+          || (keepOldMetadataColumns && 
HOODIE_META_COLUMNS_NAME_TO_POS.containsKey(fieldName))) {
+        updateValues.put(field.pos(), 
readerContext.getValue(oldRecord.getRecord(), oldSchema, fieldName));
+      }
+    }
+    if (updateValues.isEmpty()) {
+      return newRecord;
+    }
+    engineRecord = readerContext.constructEngineRecord(newSchema, 
updateValues, newRecord);
+    return new BufferedRecord<>(
+        newRecord.getRecordKey(),
+        newRecord.getOrderingValue(),
+        engineRecord,
+        newRecord.getSchemaId(),
+        newRecord.isDelete());
+  }
+
+  BufferedRecord<T> reconcileMarkerValues(BufferedRecord<T> newRecord,
+                                          BufferedRecord<T> oldRecord,
+                                          Schema newSchema,
+                                          Schema oldSchema) {
+    List<Schema.Field> fields = newSchema.getFields();
+    Map<Integer, Object> updateValues = new HashMap<>();
+    T engineRecord;
+    String partialUpdateCustomMarker = 
partialUpdateProperties.get(PARTIAL_UPDATE_CUSTOM_MARKER);
+    for (Schema.Field field : fields) {
+      String fieldName = field.name();
+      Object newValue = readerContext.getValue(newRecord.getRecord(), 
newSchema, fieldName);
+      if ((isStringTyped(field) || isBytesTyped(field))
+          && 
(partialUpdateCustomMarker.equals(readerContext.getTypeHandler().castToString(newValue))))
 {
+        updateValues.put(field.pos(), 
readerContext.getValue(oldRecord.getRecord(), oldSchema, fieldName));
+      }
+    }
+    if (updateValues.isEmpty()) {
+      return newRecord;
+    }
+    engineRecord = readerContext.constructEngineRecord(newSchema, 
updateValues, newRecord);
+    return new BufferedRecord<>(
+        newRecord.getRecordKey(),
+        newRecord.getOrderingValue(),
+        engineRecord,
+        newRecord.getSchemaId(),
+        newRecord.isDelete());
+  }
+
+  static boolean isStringTyped(Schema.Field field) {
+    return hasTargetType(field.schema(), Schema.Type.STRING);
+  }
+
+  static boolean isBytesTyped(Schema.Field field) {
+    return hasTargetType(field.schema(), Schema.Type.BYTES);
+  }
+
+  static boolean hasTargetType(Schema schema, Schema.Type targetType) {
+    if (schema.getType() == targetType) {
+      return true;
+    } else if (schema.getType() == Schema.Type.UNION) {
+      // Stream is lazy, so this is efficient even with multiple types
+      return schema.getTypes().stream().anyMatch(s -> s.getType() == 
targetType);
+    }
+    return false;
+  }
+
+  static Map<String, String> parsePartialUpdateProperties(TypedProperties 
props) {
+    Map<String, String> properties = new HashMap<>();
+    String raw = 
props.getProperty(HoodieTableConfig.PARTIAL_UPDATE_PROPERTIES.key());
+    if (StringUtils.isNullOrEmpty(raw)) {
+      return properties;
+    }
+    String[] entries = raw.split(",");

Review Comment:
   
https://github.com/apache/hudi/blob/aa3dcd83fe660daff4d061be7459ddf02d038696/hudi-hadoop-common/src/main/java/org/apache/hudi/common/config/DFSPropertiesConfiguration.java#L243
 
   
   Can we try to move this to a util class and re-use it? 



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java:
##########
@@ -116,12 +160,18 @@ public Pair<Boolean, T> finalMerge(BufferedRecord<T> 
olderRecord, BufferedRecord
       if (newerRecord.isCommitTimeOrderingDelete()) {
         return Pair.of(true, newerRecord.getRecord());
       }
+
       Comparable newOrderingValue = newerRecord.getOrderingValue();
       Comparable oldOrderingValue = olderRecord.getOrderingValue();
       if (!olderRecord.isCommitTimeOrderingDelete()
           && oldOrderingValue.compareTo(newOrderingValue) > 0) {
+        olderRecord = partialUpdateStrategy.reconcileFieldsWithOldRecord(
+            olderRecord, newerRecord, readerSchema, readerSchema, true);

Review Comment:
   Same comment as above about the schema per record. 



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java:
##########
@@ -110,6 +112,23 @@ public HoodieRecord<InternalRow> 
constructHoodieRecord(BufferedRecord<InternalRo
     return new HoodieSparkRecord(hoodieKey, row, 
HoodieInternalRowUtils.getCachedSchema(schema), false);
   }
 
+  @Override
+  public InternalRow constructEngineRecord(Schema schema,
+                                           Map<Integer, Object> updateValues,
+                                           BufferedRecord<InternalRow> 
baseRecord) {
+    List<Schema.Field> fields = schema.getFields();
+    Object[] values = new Object[fields.size()];
+    for (Schema.Field field : fields) {
+      int pos = field.pos();
+      if (updateValues.containsKey(pos)) {
+        values[pos] = updateValues.get(pos);
+      } else {
+        values[pos] = getValue(baseRecord.getRecord(), schema, field.name());
+      }
+    }
+    return toBinaryRow(schema, new GenericInternalRow(values));

Review Comment:
   +1 



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java:
##########
@@ -98,12 +126,28 @@ public Pair<Boolean, T> finalMerge(BufferedRecord<T> 
olderRecord, BufferedRecord
    * based on {@code EVENT_TIME_ORDERING} merge mode.
    */
   private static class EventTimeBufferedRecordMerger<T> implements 
BufferedRecordMerger<T> {
+    private final PartialUpdateStrategy<T> partialUpdateStrategy;
+    private final Schema readerSchema;
+
+    public EventTimeBufferedRecordMerger(HoodieReaderContext<T> readerContext,
+                                         PartialUpdateMode partialUpdateMode,
+                                         TypedProperties props,
+                                         Schema readerSchema) {
+      this.partialUpdateStrategy = new PartialUpdateStrategy<>(readerContext, 
partialUpdateMode, props);
+      this.readerSchema = readerSchema;
+    }
+
     @Override
     public Option<BufferedRecord<T>> deltaMerge(BufferedRecord<T> newRecord, 
BufferedRecord<T> existingRecord) {
       if (existingRecord == null || shouldKeepNewerRecord(existingRecord, 
newRecord)) {
+        newRecord = partialUpdateStrategy.reconcileFieldsWithOldRecord(
+            newRecord, existingRecord, readerSchema, readerSchema, false);
         return Option.of(newRecord);
+      } else {
+        existingRecord = partialUpdateStrategy.reconcileFieldsWithOldRecord(
+            existingRecord, newRecord, readerSchema, readerSchema, true);

Review Comment:
   How come we are passing in `readerSchema` for both the old and new records? 
   In case of partial merges (even before this patch), won't we fetch the 
schema at the per-record level? 
   
   ref:
   
https://github.com/apache/hudi/blob/aa3dcd83fe660daff4d061be7459ddf02d038696/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java#L165
 
   
   Can we ensure we don't cause any regression to any of existing code



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/PartialUpdateStrategy.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.PartialUpdateMode;
+import org.apache.hudi.common.util.StringUtils;
+
+import org.apache.avro.Schema;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.hudi.common.model.HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER;
+
+public class PartialUpdateStrategy<T> {
+  private final HoodieReaderContext<T> readerContext;
+  private final PartialUpdateMode partialUpdateMode;
+  private Map<String, String> partialUpdateProperties;
+
+  public PartialUpdateStrategy(HoodieReaderContext<T> readerContext,
+                               PartialUpdateMode partialUpdateMode,
+                               TypedProperties props) {
+    this.readerContext = readerContext;
+    this.partialUpdateMode = partialUpdateMode;
+    this.partialUpdateProperties = parsePartialUpdateProperties(props);
+  }
+
+  /**
+   * Merge records based on partial update mode.
+   * Note that {@param newRecord} refers to the record with higher commit time 
if COMMIT_TIME_ORDERING mode is used,
+   * or higher event time if EVENT_TIME_ORDERING mode us used.
+   */
+  BufferedRecord<T> reconcileFieldsWithOldRecord(BufferedRecord<T> newRecord,
+                                                 BufferedRecord<T> oldRecord,
+                                                 Schema newSchema,
+                                                 Schema oldSchema,
+                                                 boolean 
keepOldMetadataColumns) {
+    // Note that: When either newRecord or oldRecord is a delete record,
+    //            skip partial update since delete records do not have 
meaningful columns.
+    if (partialUpdateMode == PartialUpdateMode.NONE
+        || null == oldRecord
+        || newRecord.isDelete()
+        || oldRecord.isDelete()) {
+      return newRecord;
+    }
+
+    switch (partialUpdateMode) {
+      case KEEP_VALUES:
+      case FILL_DEFAULTS:
+        return newRecord;

Review Comment:
   I guess Lin is fixing KEEP_VALUES in a follow-up PR. 



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/PartialUpdateStrategy.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.PartialUpdateMode;
+import org.apache.hudi.common.util.StringUtils;
+
+import org.apache.avro.Schema;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.hudi.common.model.HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER;
+
+public class PartialUpdateStrategy<T> {
+  private final HoodieReaderContext<T> readerContext;
+  private final PartialUpdateMode partialUpdateMode;
+  private Map<String, String> partialUpdateProperties;
+
+  public PartialUpdateStrategy(HoodieReaderContext<T> readerContext,
+                               PartialUpdateMode partialUpdateMode,
+                               TypedProperties props) {
+    this.readerContext = readerContext;
+    this.partialUpdateMode = partialUpdateMode;
+    this.partialUpdateProperties = parsePartialUpdateProperties(props);
+  }
+
+  /**
+   * Merge records based on partial update mode.
+   * Note that {@param newRecord} refers to the record with higher commit time 
if COMMIT_TIME_ORDERING mode is used,
+   * or higher event time if EVENT_TIME_ORDERING mode us used.
+   */
+  BufferedRecord<T> reconcileFieldsWithOldRecord(BufferedRecord<T> newRecord,
+                                                 BufferedRecord<T> oldRecord,
+                                                 Schema newSchema,
+                                                 Schema oldSchema,
+                                                 boolean 
keepOldMetadataColumns) {
+    // Note that: When either newRecord or oldRecord is a delete record,
+    //            skip partial update since delete records do not have 
meaningful columns.
+    if (partialUpdateMode == PartialUpdateMode.NONE
+        || null == oldRecord
+        || newRecord.isDelete()
+        || oldRecord.isDelete()) {
+      return newRecord;
+    }
+
+    switch (partialUpdateMode) {
+      case KEEP_VALUES:
+      case FILL_DEFAULTS:
+        return newRecord;
+      case IGNORE_DEFAULTS:
+        return reconcileDefaultValues(
+            newRecord, oldRecord, newSchema, oldSchema, 
keepOldMetadataColumns);
+      case IGNORE_MARKERS:
+        return reconcileMarkerValues(
+            newRecord, oldRecord, newSchema, oldSchema);
+      default:
+        return newRecord;
+    }
+  }
+
+  /**
+   * @param newRecord              The newer record determined by the merge 
mode.
+   * @param oldRecord              The older record determined by the merge 
mode.
+   * @param newSchema              The schema of the newer record.
+   * @param oldSchema              The schema of the older record.
+   * @param keepOldMetadataColumns Keep the metadata columns from the older 
record.
+   * @return
+   */
+  BufferedRecord<T> reconcileDefaultValues(BufferedRecord<T> newRecord,
+                                           BufferedRecord<T> oldRecord,
+                                           Schema newSchema,
+                                           Schema oldSchema,
+                                           boolean keepOldMetadataColumns) {
+    List<Schema.Field> fields = newSchema.getFields();
+    Map<Integer, Object> updateValues = new HashMap<>();
+    T engineRecord;
+    // The default value only from the top-level data type is validated. That 
means,
+    // for nested columns, we do not check the leaf level data type defaults.
+    for (Schema.Field field : fields) {
+      String fieldName = field.name();
+      Object defaultValue = field.defaultVal();
+      Object newValue = readerContext.getValue(
+          newRecord.getRecord(), newSchema, fieldName);
+      if (defaultValue == newValue
+          || (keepOldMetadataColumns && 
HOODIE_META_COLUMNS_NAME_TO_POS.containsKey(fieldName))) {
+        updateValues.put(field.pos(), 
readerContext.getValue(oldRecord.getRecord(), oldSchema, fieldName));
+      }
+    }
+    if (updateValues.isEmpty()) {
+      return newRecord;
+    }
+    engineRecord = readerContext.constructEngineRecord(newSchema, 
updateValues, newRecord);
+    return new BufferedRecord<>(
+        newRecord.getRecordKey(),
+        newRecord.getOrderingValue(),
+        engineRecord,
+        newRecord.getSchemaId(),
+        newRecord.isDelete());
+  }
+
+  BufferedRecord<T> reconcileMarkerValues(BufferedRecord<T> newRecord,
+                                          BufferedRecord<T> oldRecord,
+                                          Schema newSchema,
+                                          Schema oldSchema) {
+    List<Schema.Field> fields = newSchema.getFields();
+    Map<Integer, Object> updateValues = new HashMap<>();
+    T engineRecord;
+    String partialUpdateCustomMarker = 
partialUpdateProperties.get(PARTIAL_UPDATE_CUSTOM_MARKER);

Review Comment:
   can we do this one time in the constructor. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to