nsivabalan commented on code in PR #13498:
URL: https://github.com/apache/hudi/pull/13498#discussion_r2181089525


##########
hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java:
##########
@@ -164,6 +165,20 @@ public HoodieRecord<IndexedRecord> constructHoodieRecord(BufferedRecord<IndexedR
     return new HoodieAvroIndexedRecord(hoodieKey, bufferedRecord.getRecord());
   }
 
+  @Override
+  public IndexedRecord constructEngineRecord(Schema schema, List<Object> values) {

Review Comment:
   do we have UTs for this?
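   For reference, a test would presumably pin down two behaviors: values land on the right fields in order, and a field-count/values-size mismatch fails fast. A stdlib-only sketch of that contract (a Map stands in for the real Avro IndexedRecord; all names are hypothetical):

```java
import java.util.*;

public class ConstructEngineRecordContractSketch {
  // Stand-in for constructEngineRecord: positional values keyed by field name.
  static Map<String, Object> constructRecord(List<String> fieldNames, List<Object> values) {
    if (fieldNames.size() != values.size()) {
      throw new IllegalArgumentException("Schema field count and values size must match.");
    }
    Map<String, Object> record = new LinkedHashMap<>();
    for (int i = 0; i < fieldNames.size(); i++) {
      record.put(fieldNames.get(i), values.get(i));
    }
    return record;
  }

  public static void main(String[] args) {
    Map<String, Object> rec = constructRecord(Arrays.asList("id", "name"), Arrays.asList(1, "a"));
    System.out.println(rec.get("id"));   // 1
    System.out.println(rec.get("name")); // a
    try {
      constructRecord(Arrays.asList("id"), Arrays.asList(1, 2));
      System.out.println("no exception");
    } catch (IllegalArgumentException e) {
      System.out.println("mismatch rejected");
    }
  }
}
```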



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -841,6 +841,13 @@ public class HoodieWriteConfig extends HoodieConfig {
       .sinceVersion("1.0.0")
       .withDocumentation("Whether to enable incremental table service. So far Clustering and Compaction support incremental processing.");
 
+  public static final ConfigProperty<Boolean> EVENT_TIME_WATERMARK_METADATA_ENABLED = ConfigProperty
+      .key("hoodie.write.event.time.watermark.metadata.enabled")
+      .defaultValue(true)
+      .markAdvanced()
+      .sinceVersion("1.1.0")
+      .withDocumentation("Record event time in record metadata during runtime.");

Review Comment:
   more documentation please



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java:
##########
@@ -110,6 +112,14 @@ public HoodieRecord<InternalRow> constructHoodieRecord(BufferedRecord<InternalRo
     return new HoodieSparkRecord(hoodieKey, row, HoodieInternalRowUtils.getCachedSchema(schema), false);
   }
 
+  @Override
+  public InternalRow constructEngineRecord(Schema schema, List<Object> values) {
+    if (schema.getFields().size() != values.size()) {
+      throw new IllegalArgumentException("Schema field count and values size must match.");
+    }
+    return new GenericInternalRow(values.toArray());

Review Comment:
   We use UnsafeRow in most places. 
   
https://github.com/apache/hudi/blob/e3f93425cd3b5d5fc07d1b39f6396ab403f9b76d/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java#L383
 
   
   @yihua : can you take a look please 



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -304,6 +310,20 @@ public class HoodieTableConfig extends HoodieConfig {
       .sinceVersion("1.0.0")
       .withDocumentation("When set to true, the table can support reading and writing multiple base file formats.");
 
+  public static final ConfigProperty<PartialUpdateMode> PARTIAL_UPDATE_MODE = ConfigProperty
+      .key("hoodie.write.partial.update.mode")
+      .defaultValue(PartialUpdateMode.NONE)
+      .sinceVersion("1.1.0")
+      .withDocumentation("This property when set, will define how two versions of the record will be "
+          + "merged together where the latter contains only a partial set of values and not the entire record.");
+
+  public static final ConfigProperty<String> PARTIAL_UPDATE_PROPERTIES = ConfigProperty
+      .key("hoodie.write.partial.update.properties")

Review Comment:
   can you update PR description to call this out. 



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -263,15 +297,20 @@ protected Option<BufferedRecord<T>> doProcessNextDataRecord(BufferedRecord<T> ne
 
         // If pre-combine returns existing record, no need to update it
         if (combinedRecord.getData() != existingRecord.getRecord()) {
-          return Option.of(BufferedRecord.forRecordWithContext(combinedRecord, combinedRecordAndSchema.getRight(), readerContext, props));
+          return Option.of(BufferedRecord.forRecordWithContext(
+              combinedRecord, combinedRecordAndSchema.getRight(), readerContext, props));
         }
         return Option.empty();
       } else {
         switch (recordMergeMode) {
           case COMMIT_TIME_ORDERING:
+            newRecord = updatePartiallyIfNeeded(
+                newRecord, existingRecord, readerSchema, readerSchema, partialUpdateMode);
             return Option.of(newRecord);
           case EVENT_TIME_ORDERING:
             if (shouldKeepNewerRecord(existingRecord, newRecord)) {
+              newRecord = updatePartiallyIfNeeded(
+                  newRecord, existingRecord, readerSchema, readerSchema, partialUpdateMode);

Review Comment:
   even if we are choosing the older record for the ordering field, don't we still need to merge w/ the new incoming record in case of partial updates? 
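   To make the concern concrete, here is a minimal stdlib sketch (Maps stand in for engine records, null for an unset partial field): whichever record wins on ordering, the loser's fields may still need to back-fill the winner's unset ones.

```java
import java.util.*;

public class PartialMergeBothWaysSketch {
  // Hypothetical partial merge: keep the winner's set values, but fill any
  // field the winner left unset (null here) from the loser.
  static Map<String, Object> mergePartial(Map<String, Object> winner, Map<String, Object> loser) {
    Map<String, Object> merged = new LinkedHashMap<>(loser);
    for (Map.Entry<String, Object> e : winner.entrySet()) {
      if (e.getValue() != null) {
        merged.put(e.getKey(), e.getValue());
      }
    }
    return merged;
  }

  public static void main(String[] args) {
    Map<String, Object> existing = new LinkedHashMap<>();
    existing.put("id", 1); existing.put("price", 10); existing.put("qty", null);
    Map<String, Object> incoming = new LinkedHashMap<>();
    incoming.put("id", 1); incoming.put("price", null); incoming.put("qty", 5);

    // Existing record wins on ordering, but the incoming partial fields still apply.
    System.out.println(mergePartial(existing, incoming)); // {id=1, price=10, qty=5}
  }
}
```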



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -849,16 +869,25 @@ public static Triple<RecordMergeMode, String, String> inferCorrectMergingBehavio
     return Triple.of(inferredRecordMergeMode, inferredPayloadClassName, inferredRecordMergeStrategyId);
   }
 
-  public static RecordMergeMode inferRecordMergeModeFromPayloadClass(String payloadClassName) {
+  public static RecordMergeMode inferRecordMergeModeFromPayloadClass(String payloadClassName, HoodieTableVersion tableVersion) {
     if (isNullOrEmpty(payloadClassName)) {
       return null;
     }
+    // TODO: make this only for version > 8 after upgrade table version.
+    if (tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)) {

Review Comment:
   I would say this is unreachable code. 
   
   inferRecordMergeModeFromPayloadClass should never be invoked for table version 8 with either event time ordering or commit time ordering.
   
   ``` 
       // Inferring record merge mode
       if (isNullOrEmpty(payloadClassName) && isNullOrEmpty(recordMergeStrategyId)) {
         // If nothing is set on record merge mode, payload class, or record merge strategy ID,
         // use the default merge mode determined by whether the ordering field name is set.
         inferredRecordMergeMode = recordMergeMode != null
             ? recordMergeMode
             : (isNullOrEmpty(orderingFieldName) ? COMMIT_TIME_ORDERING : EVENT_TIME_ORDERING);
       } else {
         // Infer the merge mode from either the payload class or record merge strategy ID
         RecordMergeMode modeBasedOnPayload = inferRecordMergeModeFromPayloadClass(payloadClassName);
   .
   .
   ```
   
   We need to account for the case where, after upgrade, we might have both the payload property and the merge mode property in the table config. 
   So, in case of table version 9 and above, if both exist, we should only consider the merge mode. 
   
   
   
   



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -331,6 +369,98 @@ protected Option<BufferedRecord<T>> doProcessNextDataRecord(BufferedRecord<T> ne
     }
   }
 
+  /**
+   * Merge records based on partial update mode.
+   * Note that {@param newRecord} refers to the record with higher commit time
+   * if COMMIT_TIME_ORDERING mode is used, or higher event time if EVENT_TIME_ORDERING mode is used.
+   */
+  private BufferedRecord<T> updatePartiallyIfNeeded(BufferedRecord<T> newRecord,
+                                                    BufferedRecord<T> oldRecord,
+                                                    Schema newSchema,
+                                                    Schema oldSchema,
+                                                    PartialUpdateMode partialUpdateMode) {
+    if (newRecord.isDelete()) {
+      return newRecord;
+    }
+
+    List<Schema.Field> fields = newSchema.getFields();
+    List<Object> values = new ArrayList<>();
+    T engineRecord;
+    switch (partialUpdateMode) {
+      case NONE:
+      case KEEP_VALUES:
+      case FILL_DEFAULTS:
+      case COLUMN_FAMILY:
+        return newRecord;
+
+      case IGNORE_DEFAULTS:
+        for (Schema.Field field : fields) {
+          String fieldName = field.name();
+          Object defaultValue = field.defaultVal();
+          Object newValue = readerContext.getValue(
+              newRecord.getRecord(), newSchema, fieldName);
+          if (defaultValue == newValue) {
+            values.add(readerContext.getValue(oldRecord.getRecord(), oldSchema, fieldName));
+          } else {
+            values.add(readerContext.getValue(newRecord.getRecord(), newSchema, fieldName));
+          }
+        }
+        engineRecord = readerContext.constructEngineRecord(newSchema, values);
+        return new BufferedRecord<>(

Review Comment:
   can we introduce private methods for each of the switch cases if it's more than 1 or 2 lines. 
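   The suggested shape might look like this (hypothetical names, stdlib-only sketch):

```java
import java.util.*;

public class ExtractSwitchCasesSketch {
  enum Mode { NONE, IGNORE_DEFAULTS, IGNORE_MARKERS }

  // The switch only dispatches; the per-mode logic lives in private helpers.
  static String merge(Mode mode, String rec) {
    switch (mode) {
      case IGNORE_DEFAULTS:
        return mergeIgnoringDefaults(rec);
      case IGNORE_MARKERS:
        return mergeIgnoringMarkers(rec);
      default:
        return rec; // NONE and pass-through modes
    }
  }

  private static String mergeIgnoringDefaults(String rec) { return rec + ":defaults"; }
  private static String mergeIgnoringMarkers(String rec) { return rec + ":markers"; }

  public static void main(String[] args) {
    System.out.println(merge(Mode.IGNORE_DEFAULTS, "r")); // r:defaults
    System.out.println(merge(Mode.NONE, "r"));            // r
  }
}
```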
   



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -331,6 +369,98 @@ protected Option<BufferedRecord<T>> doProcessNextDataRecord(BufferedRecord<T> ne
     }
   }
 
+  /**
+   * Merge records based on partial update mode.
+   * Note that {@param newRecord} refers to the record with higher commit time
+   * if COMMIT_TIME_ORDERING mode is used, or higher event time if EVENT_TIME_ORDERING mode is used.
+   */
+  private BufferedRecord<T> updatePartiallyIfNeeded(BufferedRecord<T> newRecord,
+                                                    BufferedRecord<T> oldRecord,
+                                                    Schema newSchema,
+                                                    Schema oldSchema,
+                                                    PartialUpdateMode partialUpdateMode) {
+    if (newRecord.isDelete()) {
+      return newRecord;
+    }
+
+    List<Schema.Field> fields = newSchema.getFields();
+    List<Object> values = new ArrayList<>();
+    T engineRecord;
+    switch (partialUpdateMode) {
+      case NONE:
+      case KEEP_VALUES:
+      case FILL_DEFAULTS:
+      case COLUMN_FAMILY:
+        return newRecord;
+
+      case IGNORE_DEFAULTS:
+        for (Schema.Field field : fields) {
+          String fieldName = field.name();
+          Object defaultValue = field.defaultVal();
+          Object newValue = readerContext.getValue(
+              newRecord.getRecord(), newSchema, fieldName);
+          if (defaultValue == newValue) {
+            values.add(readerContext.getValue(oldRecord.getRecord(), oldSchema, fieldName));
+          } else {
+            values.add(readerContext.getValue(newRecord.getRecord(), newSchema, fieldName));
+          }
+        }
+        engineRecord = readerContext.constructEngineRecord(newSchema, values);
+        return new BufferedRecord<>(
+            newRecord.getRecordKey(),
+            newRecord.getOrderingValue(),
+            engineRecord,
+            newRecord.getSchemaId(),
+            newRecord.isDelete());
+
+      case IGNORE_MARKERS:
+        String partialUpdateCustomMarker = partialUpdateProperties.get(PARTIAL_UPDATE_CUSTOM_MARKER);
+        if (StringUtils.isNullOrEmpty(partialUpdateCustomMarker)) {
+          throw new IllegalStateException(
+              "For 'IGNORE_MARKERS' mode, custom marker '"
+                  + PARTIAL_UPDATE_CUSTOM_MARKER + "' must be configured");
+        }
+        for (Schema.Field field : fields) {
+          String fieldName = field.name();
+          Object newValue = readerContext.getValue(newRecord.getRecord(), newSchema, fieldName);
+          if ((isStringTyped(field) || isBytesTyped(field))

Review Comment:
   are we accounting for nested fields as well? 
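   If nested records need the same treatment, the default-vs-value check would have to recurse. A stdlib sketch of the idea (Maps stand in for nested records; names are hypothetical):

```java
import java.util.*;

public class NestedDefaultCheckSketch {
  // Recursively replace values equal to the schema default with the old value,
  // descending into nested records (modeled here as Maps).
  @SuppressWarnings("unchecked")
  static Object fillFromOld(Object newVal, Object oldVal, Object defaultVal) {
    if (newVal instanceof Map && oldVal instanceof Map && defaultVal instanceof Map) {
      Map<String, Object> n = (Map<String, Object>) newVal;
      Map<String, Object> o = (Map<String, Object>) oldVal;
      Map<String, Object> d = (Map<String, Object>) defaultVal;
      Map<String, Object> out = new LinkedHashMap<>();
      for (String k : n.keySet()) {
        out.put(k, fillFromOld(n.get(k), o.get(k), d.get(k)));
      }
      return out;
    }
    return Objects.equals(newVal, defaultVal) ? oldVal : newVal;
  }

  public static void main(String[] args) {
    Map<String, Object> newRec = new LinkedHashMap<>();
    newRec.put("a", 0);                 // 0 matches the default -> take old value
    Map<String, Object> inner = new LinkedHashMap<>();
    inner.put("b", "x");                // default inside the nested record
    newRec.put("nested", inner);

    Map<String, Object> oldRec = new LinkedHashMap<>();
    oldRec.put("a", 7);
    Map<String, Object> oldInner = new LinkedHashMap<>();
    oldInner.put("b", "kept");
    oldRec.put("nested", oldInner);

    Map<String, Object> defaults = new LinkedHashMap<>();
    defaults.put("a", 0);
    Map<String, Object> defInner = new LinkedHashMap<>();
    defInner.put("b", "x");
    defaults.put("nested", defInner);

    System.out.println(fillFromOld(newRec, oldRec, defaults)); // {a=7, nested={b=kept}}
  }
}
```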



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -849,16 +869,25 @@ public static Triple<RecordMergeMode, String, String> inferCorrectMergingBehavio
     return Triple.of(inferredRecordMergeMode, inferredPayloadClassName, inferredRecordMergeStrategyId);
   }
 
-  public static RecordMergeMode inferRecordMergeModeFromPayloadClass(String payloadClassName) {
+  public static RecordMergeMode inferRecordMergeModeFromPayloadClass(String payloadClassName, HoodieTableVersion tableVersion) {
     if (isNullOrEmpty(payloadClassName)) {
       return null;
    }
+    // TODO: make this only for version > 8 after upgrade table version.
+    if (tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)) {

Review Comment:
   also, isn't the version NINE instead of EIGHT? 



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -75,10 +80,12 @@
 import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.INSTANT_TIME;
 
 public abstract class FileGroupRecordBuffer<T> implements HoodieFileGroupRecordBuffer<T> {
+  private static final String PARTIAL_UPDATE_CUSTOM_MARKER = "hoodie.write.partial.update.custom.marker";

Review Comment:
   we should define these as ConfigProperties. 
   for now, add it to HoodieWriteConfig. 
   @yihua @danny0405 : is it worth adding a new config class named HoodieRecordMergeConfig?
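   If it lands in HoodieWriteConfig, it would presumably follow the builder pattern used elsewhere in this PR (the key is the constant defined above; the rest of the builder chain here is an assumption, not the final definition):

   ```
   public static final ConfigProperty<String> PARTIAL_UPDATE_CUSTOM_MARKER = ConfigProperty
       .key("hoodie.write.partial.update.custom.marker")
       .noDefaultValue()
       .markAdvanced()
       .sinceVersion("1.1.0")
       .withDocumentation("Marker string that identifies a field as unset in a partial update record.");
   ```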
   
   



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -141,6 +151,30 @@ protected FileGroupRecordBuffer(HoodieReaderContext<T> readerContext,
         readerContext.getSchemaHandler().getCustomDeleteMarkerKeyValue().isPresent();
     this.shouldCheckBuiltInDeleteMarker =
         readerContext.getSchemaHandler().hasBuiltInDelete();
+    this.partialUpdateProperties = parsePartialUpdateProperties(props);
+  }
+
+  public static Map<String, String> parsePartialUpdateProperties(TypedProperties props) {

Review Comment:
   why public static? 
   why not private? 



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -331,6 +369,98 @@ protected Option<BufferedRecord<T>> doProcessNextDataRecord(BufferedRecord<T> ne
     }
   }
 
+  /**
+   * Merge records based on partial update mode.
+   * Note that {@param newRecord} refers to the record with higher commit time
+   * if COMMIT_TIME_ORDERING mode is used, or higher event time if EVENT_TIME_ORDERING mode is used.
+   */
+  private BufferedRecord<T> updatePartiallyIfNeeded(BufferedRecord<T> newRecord,
+                                                    BufferedRecord<T> oldRecord,
+                                                    Schema newSchema,
+                                                    Schema oldSchema,
+                                                    PartialUpdateMode partialUpdateMode) {
+    if (newRecord.isDelete()) {
+      return newRecord;
+    }
+
+    List<Schema.Field> fields = newSchema.getFields();
+    List<Object> values = new ArrayList<>();
+    T engineRecord;
+    switch (partialUpdateMode) {
+      case NONE:
+      case KEEP_VALUES:
+      case FILL_DEFAULTS:
+      case COLUMN_FAMILY:
+        return newRecord;
+
+      case IGNORE_DEFAULTS:
+        for (Schema.Field field : fields) {
+          String fieldName = field.name();
+          Object defaultValue = field.defaultVal();
+          Object newValue = readerContext.getValue(
+              newRecord.getRecord(), newSchema, fieldName);
+          if (defaultValue == newValue) {
+            values.add(readerContext.getValue(oldRecord.getRecord(), oldSchema, fieldName));
+          } else {
+            values.add(readerContext.getValue(newRecord.getRecord(), newSchema, fieldName));
+          }
+        }
+        engineRecord = readerContext.constructEngineRecord(newSchema, values);
+        return new BufferedRecord<>(
+            newRecord.getRecordKey(),
+            newRecord.getOrderingValue(),
+            engineRecord,
+            newRecord.getSchemaId(),
+            newRecord.isDelete());
+
+      case IGNORE_MARKERS:
+        String partialUpdateCustomMarker = partialUpdateProperties.get(PARTIAL_UPDATE_CUSTOM_MARKER);
+        if (StringUtils.isNullOrEmpty(partialUpdateCustomMarker)) {

Review Comment:
   why can't we do this validation once somewhere in the constructor, and not per record? 
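   A sketch of hoisting the check into the constructor (hypothetical class and names; stdlib only):

```java
import java.util.*;

public class ValidateOnceSketch {
  private final String marker;

  // Validate the marker once at construction instead of per record.
  ValidateOnceSketch(Map<String, String> props, boolean ignoreMarkersMode) {
    this.marker = props.get("hoodie.write.partial.update.custom.marker");
    if (ignoreMarkersMode && (marker == null || marker.isEmpty())) {
      throw new IllegalStateException("IGNORE_MARKERS mode requires a custom marker");
    }
  }

  boolean isMarked(Object value) {
    return marker != null && marker.equals(String.valueOf(value));
  }

  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    props.put("hoodie.write.partial.update.custom.marker", "__unset__");
    ValidateOnceSketch buf = new ValidateOnceSketch(props, true);
    System.out.println(buf.isMarked("__unset__")); // true
    try {
      new ValidateOnceSketch(new HashMap<>(), true);
      System.out.println("no exception");
    } catch (IllegalStateException e) {
      System.out.println("rejected at construction");
    }
  }
}
```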
   



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -331,6 +369,98 @@ protected Option<BufferedRecord<T>> doProcessNextDataRecord(BufferedRecord<T> ne
     }
   }
 
+  /**
+   * Merge records based on partial update mode.
+   * Note that {@param newRecord} refers to the record with higher commit time
+   * if COMMIT_TIME_ORDERING mode is used, or higher event time if EVENT_TIME_ORDERING mode is used.
+   */
+  private BufferedRecord<T> updatePartiallyIfNeeded(BufferedRecord<T> newRecord,
+                                                    BufferedRecord<T> oldRecord,
+                                                    Schema newSchema,
+                                                    Schema oldSchema,
+                                                    PartialUpdateMode partialUpdateMode) {
+    if (newRecord.isDelete()) {
+      return newRecord;
+    }
+
+    List<Schema.Field> fields = newSchema.getFields();
+    List<Object> values = new ArrayList<>();
+    T engineRecord;
+    switch (partialUpdateMode) {
+      case NONE:
+      case KEEP_VALUES:
+      case FILL_DEFAULTS:
+      case COLUMN_FAMILY:
+        return newRecord;
+
+      case IGNORE_DEFAULTS:
+        for (Schema.Field field : fields) {
+          String fieldName = field.name();
+          Object defaultValue = field.defaultVal();
+          Object newValue = readerContext.getValue(
+              newRecord.getRecord(), newSchema, fieldName);
+          if (defaultValue == newValue) {
+            values.add(readerContext.getValue(oldRecord.getRecord(), oldSchema, fieldName));
+          } else {
+            values.add(readerContext.getValue(newRecord.getRecord(), newSchema, fieldName));
+          }
+        }
+        engineRecord = readerContext.constructEngineRecord(newSchema, values);
+        return new BufferedRecord<>(
+            newRecord.getRecordKey(),
+            newRecord.getOrderingValue(),
+            engineRecord,
+            newRecord.getSchemaId(),
+            newRecord.isDelete());
+
+      case IGNORE_MARKERS:
+        String partialUpdateCustomMarker = partialUpdateProperties.get(PARTIAL_UPDATE_CUSTOM_MARKER);
+        if (StringUtils.isNullOrEmpty(partialUpdateCustomMarker)) {
+          throw new IllegalStateException(
+              "For 'IGNORE_MARKERS' mode, custom marker '"
+                  + PARTIAL_UPDATE_CUSTOM_MARKER + "' must be configured");
+        }
+        for (Schema.Field field : fields) {
+          String fieldName = field.name();
+          Object newValue = readerContext.getValue(newRecord.getRecord(), newSchema, fieldName);
+          if ((isStringTyped(field) || isBytesTyped(field))
+              && partialUpdateCustomMarker.equals(readerContext.getTypeHandler().castToString(newValue))) {
+            values.add(readerContext.getValue(oldRecord.getRecord(), oldSchema, fieldName));
+          } else {
+            values.add(readerContext.getValue(newRecord.getRecord(), newSchema, fieldName));
+          }
+        }

Review Comment:
   if there are no markers, can we return the incoming record as-is w/o constructing a new record altogether? 
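   One way to do that is to track whether any marker was actually hit and short-circuit. A stdlib sketch (Maps stand in for engine records; names are hypothetical):

```java
import java.util.*;

public class MarkerShortCircuitSketch {
  // Only build a merged record when at least one marker was actually hit;
  // otherwise hand back the incoming record untouched.
  static Map<String, Object> mergeMarkers(Map<String, Object> incoming,
                                          Map<String, Object> old,
                                          String marker) {
    List<Object> values = new ArrayList<>();
    boolean anyMarker = false;
    for (Map.Entry<String, Object> e : incoming.entrySet()) {
      if (marker.equals(e.getValue())) {
        anyMarker = true;
        values.add(old.get(e.getKey()));
      } else {
        values.add(e.getValue());
      }
    }
    if (!anyMarker) {
      return incoming; // no new record constructed
    }
    Map<String, Object> merged = new LinkedHashMap<>();
    int i = 0;
    for (String k : incoming.keySet()) {
      merged.put(k, values.get(i++));
    }
    return merged;
  }

  public static void main(String[] args) {
    Map<String, Object> incoming = new LinkedHashMap<>();
    incoming.put("a", "x");
    incoming.put("b", "y");
    Map<String, Object> old = new LinkedHashMap<>();
    old.put("a", "1");
    old.put("b", "2");
    // No markers hit: the same object comes back, unchanged.
    System.out.println(mergeMarkers(incoming, old, "__unset__") == incoming); // true
    incoming.put("b", "__unset__");
    System.out.println(mergeMarkers(incoming, old, "__unset__")); // {a=x, b=2}
  }
}
```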
   



##########
hudi-common/src/main/java/org/apache/hudi/common/table/PartialUpdateMode.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table;
+
+import org.apache.hudi.common.config.EnumFieldDescription;
+
+public enum PartialUpdateMode {
+  @EnumFieldDescription(
+      "No partial update logic should be employed.")
+  NONE,
+
+  @EnumFieldDescription(
+      "For any column values missing in current record, pick value from previous version of the record.")
+  KEEP_VALUES,
+
+  @EnumFieldDescription(
+      "For column values missing in current record, pick the default value from the schema.")
+  FILL_DEFAULTS,
+
+  @EnumFieldDescription(
+      "For columns having default values set in current record, pick the value from previous version of the record.")
+  IGNORE_DEFAULTS,
+
+  @EnumFieldDescription(
+      "For columns having marker values in the current record, pick value from previous version of the record. "
+      + "Marker value can be defined using hoodie.write.partial.custom.marker, which is a table property.")

Review Comment:
   we should also call out how to set these props. it's a writer property that one needs to set when creating the table for the first time. 
   during upgrades, it will automatically get added to the table config 
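   For example (hedged sketch: the property keys are from this PR; the exact DDL surface for setting table properties may differ by engine, and the marker value is illustrative):

   ```sql
   -- Writer/table property, set once at table creation time:
   CREATE TABLE orders (id INT, price DOUBLE, ts BIGINT) USING hudi
   TBLPROPERTIES (
     'hoodie.write.partial.update.mode' = 'IGNORE_MARKERS',
     'hoodie.write.partial.update.custom.marker' = '__unset__'
   );
   ```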



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -331,6 +369,98 @@ protected Option<BufferedRecord<T>> doProcessNextDataRecord(BufferedRecord<T> ne
     }
   }
 
+  /**
+   * Merge records based on partial update mode.
+   * Note that {@param newRecord} refers to the record with higher commit time
+   * if COMMIT_TIME_ORDERING mode is used, or higher event time if EVENT_TIME_ORDERING mode is used.
+   */
+  private BufferedRecord<T> updatePartiallyIfNeeded(BufferedRecord<T> newRecord,
+                                                    BufferedRecord<T> oldRecord,
+                                                    Schema newSchema,
+                                                    Schema oldSchema,
+                                                    PartialUpdateMode partialUpdateMode) {
+    if (newRecord.isDelete()) {
+      return newRecord;
+    }
+
+    List<Schema.Field> fields = newSchema.getFields();
+    List<Object> values = new ArrayList<>();
+    T engineRecord;
+    switch (partialUpdateMode) {
+      case NONE:
+      case KEEP_VALUES:
+      case FILL_DEFAULTS:
+      case COLUMN_FAMILY:
+        return newRecord;
+
+      case IGNORE_DEFAULTS:
+        for (Schema.Field field : fields) {
+          String fieldName = field.name();
+          Object defaultValue = field.defaultVal();
+          Object newValue = readerContext.getValue(
+              newRecord.getRecord(), newSchema, fieldName);
+          if (defaultValue == newValue) {
+            values.add(readerContext.getValue(oldRecord.getRecord(), oldSchema, fieldName));
+          } else {
+            values.add(readerContext.getValue(newRecord.getRecord(), newSchema, fieldName));
+          }
+        }
+        engineRecord = readerContext.constructEngineRecord(newSchema, values);
+        return new BufferedRecord<>(
+            newRecord.getRecordKey(),
+            newRecord.getOrderingValue(),
+            engineRecord,
+            newRecord.getSchemaId(),
+            newRecord.isDelete());
+
+      case IGNORE_MARKERS:
+        String partialUpdateCustomMarker = partialUpdateProperties.get(PARTIAL_UPDATE_CUSTOM_MARKER);
+        if (StringUtils.isNullOrEmpty(partialUpdateCustomMarker)) {
+          throw new IllegalStateException(
+              "For 'IGNORE_MARKERS' mode, custom marker '"
+                  + PARTIAL_UPDATE_CUSTOM_MARKER + "' must be configured");
+        }
+        for (Schema.Field field : fields) {
+          String fieldName = field.name();
+          Object newValue = readerContext.getValue(newRecord.getRecord(), newSchema, fieldName);
+          if ((isStringTyped(field) || isBytesTyped(field))
+              && partialUpdateCustomMarker.equals(readerContext.getTypeHandler().castToString(newValue))) {
+            values.add(readerContext.getValue(oldRecord.getRecord(), oldSchema, fieldName));
+          } else {
+            values.add(readerContext.getValue(newRecord.getRecord(), newSchema, fieldName));
+          }
+        }
+        engineRecord = readerContext.constructEngineRecord(newSchema, values);
+        return new BufferedRecord<>(
+            newRecord.getRecordKey(),
+            newRecord.getOrderingValue(),
+            engineRecord,
+            newRecord.getSchemaId(),
+            newRecord.isDelete());
+
+      default:
+        return newRecord;
+    }
+  }
+
+  static boolean isStringTyped(Schema.Field field) {

Review Comment:
   why not make these private? 



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java:
##########
@@ -110,6 +112,14 @@ public HoodieRecord<InternalRow> constructHoodieRecord(BufferedRecord<InternalRo
     return new HoodieSparkRecord(hoodieKey, row, HoodieInternalRowUtils.getCachedSchema(schema), false);
   }
 
+  @Override
+  public InternalRow constructEngineRecord(Schema schema, List<Object> values) {
+    if (schema.getFields().size() != values.size()) {
+      throw new IllegalArgumentException("Schema field count and values size must match.");
+    }
+    return new GenericInternalRow(values.toArray());

Review Comment:
   @linliu-code : can you sync up w/ ethan and get an alignment here



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -446,6 +576,8 @@ protected Pair<Boolean, T> merge(BufferedRecord<T> olderRecord, BufferedRecord<T
     } else {
       switch (recordMergeMode) {
         case COMMIT_TIME_ORDERING:
+          newerRecord = updatePartiallyIfNeeded(

Review Comment:
   can we name this more generically.
   `mergeRecords` or something. 
   and internally, if the partial update mode is NONE, let's return right away (1st line w/n the method).
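   A sketch of the suggested shape (hypothetical generic name, NONE short-circuit on the first line; stdlib only):

```java
import java.util.*;

public class MergeRecordsEarlyReturnSketch {
  enum PartialUpdateMode { NONE, IGNORE_DEFAULTS, IGNORE_MARKERS }

  // Generic name, and the NONE check bails out before any per-field work.
  static Map<String, Object> mergeRecords(Map<String, Object> newer,
                                          Map<String, Object> older,
                                          PartialUpdateMode mode) {
    if (mode == PartialUpdateMode.NONE) {
      return newer; // early return: no merging needed
    }
    Map<String, Object> merged = new LinkedHashMap<>(older);
    newer.forEach((k, v) -> { if (v != null) merged.put(k, v); });
    return merged;
  }

  public static void main(String[] args) {
    Map<String, Object> newer = new LinkedHashMap<>();
    newer.put("a", 1); newer.put("b", null);
    Map<String, Object> older = new LinkedHashMap<>();
    older.put("a", 0); older.put("b", 2);
    System.out.println(mergeRecords(newer, older, PartialUpdateMode.NONE) == newer);        // true
    System.out.println(mergeRecords(newer, older, PartialUpdateMode.IGNORE_DEFAULTS));      // {a=1, b=2}
  }
}
```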



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to