This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 26ecb287684 [HUDI-6801] Implement merging partial updates from log files for MOR tables (#9883)
26ecb287684 is described below

commit 26ecb2876842fa2d6c79aa1a0e6f55277f25cba8
Author: Y Ethan Guo <[email protected]>
AuthorDate: Mon Oct 30 23:26:30 2023 -0700

    [HUDI-6801] Implement merging partial updates from log files for MOR tables (#9883)
    
    This commit adds the logic for merging partial updates in the new file group reader with the Spark record merger.
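    
    For illustration only (not part of this change), here is a minimal, standalone Java sketch of the
    field-level semantics of partial-update merging: fields present in the newer, partial record win,
    and all other fields are carried over from the older record. The example data (id/ts/name/price/tags)
    mirrors the examples in the Javadoc added below; the class and method names here are hypothetical.
    
    import java.util.LinkedHashMap;
    import java.util.Map;
    
    public class PartialMergeSketch {
      // Fields present in the newer (partial) record override the older values;
      // fields absent from the newer record keep the older record's values.
      static Map<String, Object> mergePartial(Map<String, Object> older, Map<String, Object> newer) {
        Map<String, Object> merged = new LinkedHashMap<>(older);
        merged.putAll(newer);
        return merged;
      }
    
      public static void main(String[] args) {
        Map<String, Object> older = new LinkedHashMap<>();
        older.put("id", "1");
        older.put("ts", 10L);
        older.put("name", "apple");
        older.put("price", 2.3);
        older.put("tags", "fruit");
    
        Map<String, Object> newer = new LinkedHashMap<>();
        newer.put("ts", 16L);
        newer.put("price", 2.8);
    
        // Prints: {id=1, ts=16, name=apple, price=2.8, tags=fruit}
        System.out.println(mergePartial(older, newer));
      }
    }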
---
 .../org/apache/hudi/HoodieSparkRecordMerger.java   |  40 ++++
 .../hudi/common/model/HoodieSparkRecord.java       |  19 +-
 .../io/storage/HoodieSparkFileReaderFactory.java   |   2 +-
 .../hudi/io/storage/HoodieSparkParquetReader.java  |   2 +-
 .../apache/hudi/merge/SparkRecordMergingUtils.java | 234 +++++++++++++++++++++
 .../hudi/BaseSparkInternalRowReaderContext.java    |   4 +-
 .../SparkFileFormatInternalRowReaderContext.scala  |  52 ++++-
 .../hudi/util/CloseableInternalRowIterator.scala   |   1 +
 .../hudi/common/engine/HoodieReaderContext.java    |  15 +-
 .../java/org/apache/hudi/common/fs/FSUtils.java    |   5 +-
 .../hudi/common/model/HoodieRecordMerger.java      |  62 +++++-
 .../table/log/block/HoodieAvroDataBlock.java       |   2 +-
 .../common/table/log/block/HoodieDataBlock.java    |  14 +-
 .../read/HoodieBaseFileGroupRecordBuffer.java      |  43 +++-
 .../read/HoodieKeyBasedFileGroupRecordBuffer.java  |  20 +-
 .../HoodiePositionBasedFileGroupRecordBuffer.java  |  18 +-
 .../org/apache/hudi/common/fs/TestFSUtils.java     |   3 +
 .../table/read/TestHoodieFileGroupReaderBase.java  |  35 +--
 .../read/TestHoodieFileGroupReaderOnSpark.scala    |  19 +-
 .../sql/hudi/TestPartialUpdateForMergeInto.scala   | 225 +++++++++++---------
 20 files changed, 643 insertions(+), 172 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/HoodieSparkRecordMerger.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/HoodieSparkRecordMerger.java
index 4b6bdbae46d..a488e7174e5 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/HoodieSparkRecordMerger.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/HoodieSparkRecordMerger.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.model.HoodieSparkRecord;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.merge.SparkRecordMergingUtils;
 
 import org.apache.avro.Schema;
 
@@ -76,6 +77,45 @@ public class HoodieSparkRecordMerger implements HoodieRecordMerger {
     }
   }
 
+  @Override
+  public Option<Pair<HoodieRecord, Schema>> partialMerge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, Schema readerSchema, TypedProperties props) throws IOException {
+    ValidationUtils.checkArgument(older.getRecordType() == HoodieRecordType.SPARK);
+    ValidationUtils.checkArgument(newer.getRecordType() == HoodieRecordType.SPARK);
+
+    if (newer instanceof HoodieSparkRecord) {
+      HoodieSparkRecord newSparkRecord = (HoodieSparkRecord) newer;
+      if (newSparkRecord.isDeleted()) {
+        // Delete record
+        return Option.empty();
+      }
+    } else {
+      if (newer.getData() == null) {
+        // Delete record
+        return Option.empty();
+      }
+    }
+
+    if (older instanceof HoodieSparkRecord) {
+      HoodieSparkRecord oldSparkRecord = (HoodieSparkRecord) older;
+      if (oldSparkRecord.isDeleted()) {
+        // use natural order for delete record
+        return Option.of(Pair.of(newer, newSchema));
+      }
+    } else {
+      if (older.getData() == null) {
+        // use natural order for delete record
+        return Option.of(Pair.of(newer, newSchema));
+      }
+    }
+    if (older.getOrderingValue(oldSchema, props).compareTo(newer.getOrderingValue(newSchema, props)) > 0) {
+      return Option.of(SparkRecordMergingUtils.mergePartialRecords(
+          (HoodieSparkRecord) newer, newSchema, (HoodieSparkRecord) older, oldSchema, readerSchema, props));
+    } else {
+      return Option.of(SparkRecordMergingUtils.mergePartialRecords(
+          (HoodieSparkRecord) older, oldSchema, (HoodieSparkRecord) newer, newSchema, readerSchema, props));
+    }
+  }
+
   @Override
   public HoodieRecordType getRecordType() {
     return HoodieRecordType.SPARK;
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
index 127759c8a22..3d59ad27257 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
@@ -18,10 +18,6 @@
 
 package org.apache.hudi.common.model;
 
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import org.apache.avro.Schema;
 import org.apache.hudi.SparkAdapterSupport$;
 import org.apache.hudi.client.model.HoodieInternalRow;
 import org.apache.hudi.common.util.ConfigUtils;
@@ -32,23 +28,30 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.keygen.SparkKeyGeneratorInterface;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.avro.Schema;
 import org.apache.spark.sql.HoodieInternalRowUtils;
 import org.apache.spark.sql.HoodieUnsafeRowUtils;
 import org.apache.spark.sql.HoodieUnsafeRowUtils.NestedFieldPath;
 import org.apache.spark.sql.catalyst.CatalystTypeConverters;
 import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
 import org.apache.spark.sql.catalyst.expressions.JoinedRow;
 import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.unsafe.types.UTF8String;
-import scala.Function1;
 
 import java.io.IOException;
 import java.util.Map;
 import java.util.Properties;
 
+import scala.Function1;
+
 import static org.apache.hudi.common.table.HoodieTableConfig.POPULATE_META_FIELDS;
 import static org.apache.spark.sql.HoodieInternalRowUtils.getCachedUnsafeProjection;
 import static org.apache.spark.sql.types.DataTypes.BooleanType;
@@ -434,13 +437,17 @@ public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
     // NOTE: [[HoodieSparkRecord]] is expected to hold either
     //          - Instance of [[UnsafeRow]] or
     //          - Instance of [[HoodieInternalRow]] or
+    //          - Instance of [[GenericInternalRow]]
     //          - Instance of [[ColumnarBatchRow]]
     //
     //       In case provided row is anything but [[UnsafeRow]], it's expected that the
     //       corresponding schema has to be provided as well so that it could be properly
     //       serialized (in case it would need to be)
     boolean isValid = data == null || data instanceof UnsafeRow
-        || schema != null && (data instanceof HoodieInternalRow || SparkAdapterSupport$.MODULE$.sparkAdapter().isColumnarBatchRow(data));
+        || schema != null && (
+        data instanceof HoodieInternalRow
+            || data instanceof GenericInternalRow
+            || SparkAdapterSupport$.MODULE$.sparkAdapter().isColumnarBatchRow(data));
 
     ValidationUtils.checkState(isValid);
   }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileReaderFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileReaderFactory.java
index de7810be8ae..c34e2037dac 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileReaderFactory.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileReaderFactory.java
@@ -29,7 +29,7 @@ import java.io.IOException;
 
 public class HoodieSparkFileReaderFactory extends HoodieFileReaderFactory {
 
-  protected HoodieFileReader newParquetFileReader(Configuration conf, Path path) {
+  public HoodieFileReader newParquetFileReader(Configuration conf, Path path) {
     conf.setIfUnset(SQLConf.PARQUET_BINARY_AS_STRING().key(), SQLConf.PARQUET_BINARY_AS_STRING().defaultValueString());
     conf.setIfUnset(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), SQLConf.PARQUET_INT96_AS_TIMESTAMP().defaultValueString());
     conf.setIfUnset(SQLConf.CASE_SENSITIVE().key(), SQLConf.CASE_SENSITIVE().defaultValueString());
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
index 046fa98fbea..79444d179ce 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
@@ -115,7 +115,7 @@ public class HoodieSparkParquetReader implements HoodieSparkFileReader {
     });
   }
 
-  private ClosableIterator<InternalRow> getInternalRowIterator(Schema readerSchema, Schema requestedSchema) throws IOException {
+  public ClosableIterator<InternalRow> getInternalRowIterator(Schema readerSchema, Schema requestedSchema) throws IOException {
     if (requestedSchema == null) {
       requestedSchema = readerSchema;
     }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/merge/SparkRecordMergingUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/merge/SparkRecordMergingUtils.java
new file mode 100644
index 00000000000..f02ebfc8293
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/merge/SparkRecordMergingUtils.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.merge;
+
+import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.HoodieSparkRecord;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.avro.Schema;
+import org.apache.spark.sql.HoodieInternalRowUtils;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Util class to merge records that may contain partial updates.
+ * This can be used by any Spark {@link HoodieRecordMerger} implementation.
+ */
+public class SparkRecordMergingUtils {
+  private static final Map<Schema, Map<Integer, StructField>> FIELD_ID_TO_FIELD_MAPPING_CACHE = new ConcurrentHashMap<>();
+  private static final Map<Schema, Map<String, Integer>> FIELD_NAME_TO_ID_MAPPING_CACHE = new ConcurrentHashMap<>();
+  private static final Map<Pair<Pair<Schema, Schema>, Schema>,
+      Pair<Map<Integer, StructField>, Pair<StructType, Schema>>> MERGED_SCHEMA_CACHE = new ConcurrentHashMap<>();
+
+  /**
+   * Merges records which can contain partial updates.
+   * <p>
+   * For example, the reader schema is
+   * {[
+   * {"name":"id", "type":"string"},
+   * {"name":"ts", "type":"long"},
+   * {"name":"name", "type":"string"},
+   * {"name":"price", "type":"double"},
+   * {"name":"tags", "type":"string"}
+   * ]}
+   * The older and newer records can be (omitting Hudi meta fields):
+   * <p>
+   * (1) older (complete record update):
+   * id | ts | name  | price | tags
+   * 1 | 10 | apple |  2.3  | fruit
+   * <p>
+   * newer (partial record update):
+   * ts | price
+   * 16 |  2.8
+   * <p>
+   * The merging result is (updated values from the newer record replace the ones in the older):
+   * <p>
+   * id | ts | name  | price | tags
+   * 1 | 16 | apple |  2.8  | fruit
+   * <p>
+   * (2) older (partial record update):
+   * ts | price
+   * 10 | 2.8
+   * <p>
+   * newer (partial record update):
+   * ts | tag
+   * 16 | fruit,juicy
+   * <p>
+   * The merging result is (two partial updates are merged together, and values of overlapped
+   * fields come from the newer):
+   * <p>
+   * ts | price | tags
+   * 16 |  2.8  | fruit,juicy
+   *
+   * @param older        Older {@link HoodieSparkRecord}.
+   * @param oldSchema    Schema of the older record.
+   * @param newer        Newer {@link HoodieSparkRecord}.
+   * @param newSchema    Schema of the newer record.
+   * @param readerSchema Reader schema containing all the fields to read. This is used to maintain
+   *                     the ordering of the fields of the merged record.
+   * @param props        Configuration in {@link TypedProperties}.
+   * @return The merged record and schema.
+   */
+  public static Pair<HoodieRecord, Schema> mergePartialRecords(HoodieSparkRecord older,
+                                                               Schema oldSchema,
+                                                               HoodieSparkRecord newer,
+                                                               Schema newSchema,
+                                                               Schema readerSchema,
+                                                               TypedProperties props) {
+    // The merged schema contains the fields that appear in either or both of the older and newer records
+    Pair<Map<Integer, StructField>, Pair<StructType, Schema>> mergedSchemaPair =
+        getCachedMergedSchema(oldSchema, newSchema, readerSchema);
+    boolean isNewerPartial = isPartial(newSchema, mergedSchemaPair.getRight().getRight());
+    if (isNewerPartial) {
+      InternalRow oldRow = older.getData();
+      InternalRow newPartialRow = newer.getData();
+
+      Map<Integer, StructField> mergedIdToFieldMapping = mergedSchemaPair.getLeft();
+      Map<String, Integer> newPartialNameToIdMapping = getCachedFieldNameToIdMapping(newSchema);
+      List<Object> values = new ArrayList<>(mergedIdToFieldMapping.size());
+      for (int fieldId = 0; fieldId < mergedIdToFieldMapping.size(); fieldId++) {
+        StructField structField = mergedIdToFieldMapping.get(fieldId);
+        Integer ordInPartialUpdate = newPartialNameToIdMapping.get(structField.name());
+        if (ordInPartialUpdate != null) {
+          // The field exists in the newer record; picks the value from newer record
+          values.add(newPartialRow.get(ordInPartialUpdate, structField.dataType()));
+        } else {
+          // The field does not exist in the newer record; picks the value from older record
+          values.add(oldRow.get(fieldId, structField.dataType()));
+        }
+      }
+      InternalRow mergedRow = new GenericInternalRow(values.toArray());
+
+      HoodieSparkRecord mergedSparkRecord = new HoodieSparkRecord(
+          mergedRow, mergedSchemaPair.getRight().getLeft());
+      return Pair.of(mergedSparkRecord, mergedSchemaPair.getRight().getRight());
+    } else {
+      return Pair.of(newer, newSchema);
+    }
+  }
+
+  /**
+   * @param avroSchema Avro schema.
+   * @return The field ID to {@link StructField} instance mapping.
+   */
+  public static Map<Integer, StructField> getCachedFieldIdToFieldMapping(Schema avroSchema) {
+    return FIELD_ID_TO_FIELD_MAPPING_CACHE.computeIfAbsent(avroSchema, schema -> {
+      StructType structType = HoodieInternalRowUtils.getCachedSchema(schema);
+      Map<Integer, StructField> schemaFieldIdMapping = new HashMap<>();
+      int fieldId = 0;
+
+      for (StructField field : structType.fields()) {
+        schemaFieldIdMapping.put(fieldId, field);
+        fieldId++;
+      }
+
+      return schemaFieldIdMapping;
+    });
+  }
+
+  /**
+   * @param avroSchema Avro schema.
+   * @return The field name to ID mapping.
+   */
+  public static Map<String, Integer> getCachedFieldNameToIdMapping(Schema avroSchema) {
+    return FIELD_NAME_TO_ID_MAPPING_CACHE.computeIfAbsent(avroSchema, schema -> {
+      StructType structType = HoodieInternalRowUtils.getCachedSchema(schema);
+      Map<String, Integer> schemaFieldIdMapping = new HashMap<>();
+      int fieldId = 0;
+
+      for (StructField field : structType.fields()) {
+        schemaFieldIdMapping.put(field.name(), fieldId);
+        fieldId++;
+      }
+
+      return schemaFieldIdMapping;
+    });
+  }
+
+  /**
+   * Merges the two schemas so the merged schema contains all the fields from the two schemas,
+   * with the same ordering of fields based on the provided reader schema.
+   *
+   * @param oldSchema    Old schema.
+   * @param newSchema    New schema.
+   * @param readerSchema Reader schema containing all the fields to read.
+   * @return The ID to {@link StructField} instance mapping of the merged schema, and the
+   * {@link StructType} and Avro schema of the merged schema.
+   */
+  public static Pair<Map<Integer, StructField>, Pair<StructType, Schema>> getCachedMergedSchema(Schema oldSchema,
+                                                                                                Schema newSchema,
+                                                                                                Schema readerSchema) {
+    return MERGED_SCHEMA_CACHE.computeIfAbsent(
+        Pair.of(Pair.of(oldSchema, newSchema), readerSchema), schemaPair -> {
+          Schema schema1 = schemaPair.getLeft().getLeft();
+          Schema schema2 = schemaPair.getLeft().getRight();
+          Schema refSchema = schemaPair.getRight();
+          Map<String, Integer> nameToIdMapping1 = getCachedFieldNameToIdMapping(schema1);
+          Map<String, Integer> nameToIdMapping2 = getCachedFieldNameToIdMapping(schema2);
+          // Mapping of field ID/position to the StructField instance of the readerSchema
+          Map<Integer, StructField> refFieldIdToFieldMapping = getCachedFieldIdToFieldMapping(refSchema);
+          // This field name set contains all the fields that appear
+          // either in the oldSchema and/or the newSchema
+          Set<String> fieldNameSet = new HashSet<>();
+          fieldNameSet.addAll(nameToIdMapping1.keySet());
+          fieldNameSet.addAll(nameToIdMapping2.keySet());
+          int fieldId = 0;
+          Map<Integer, StructField> mergedMapping = new HashMap<>();
+          List<StructField> mergedFieldList = new ArrayList<>();
+          // Iterates over the fields based on the original ordering of the fields of the
+          // readerSchema using the field ID/position from 0
+          for (int i = 0; i < refFieldIdToFieldMapping.size(); i++) {
+            StructField field = refFieldIdToFieldMapping.get(i);
+            if (fieldNameSet.contains(field.name())) {
+              mergedMapping.put(fieldId, field);
+              mergedFieldList.add(field);
+              fieldId++;
+            }
+          }
+          StructType mergedStructType = new StructType(mergedFieldList.toArray(new StructField[0]));
+          Schema mergedSchema = AvroConversionUtils.convertStructTypeToAvroSchema(
+              mergedStructType, readerSchema.getName(), readerSchema.getNamespace());
+          return Pair.of(mergedMapping, Pair.of(mergedStructType, mergedSchema));
+        });
+  }
+
+  /**
+   * @param schema       Avro schema to check.
+   * @param mergedSchema The merged schema for the merged record.
+   * @return whether the Avro schema is partial compared to the merged schema.
+   */
+  public static boolean isPartial(Schema schema, Schema mergedSchema) {
+    return !schema.equals(mergedSchema);
+  }
+}
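
For illustration only (not part of the patch above), a standalone Java sketch of the merged-schema ordering rule that getCachedMergedSchema implements: the merged schema keeps the reader schema's field order and retains only the fields present in either input schema. The class and helper names here are hypothetical.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class MergedSchemaOrderSketch {
  // Keep the reader schema's order; retain only fields present in either input schema.
  static List<String> mergedFieldOrder(List<String> readerFields, List<String> oldFields, List<String> newFields) {
    Set<String> present = new HashSet<>(oldFields);
    present.addAll(newFields);
    List<String> merged = new ArrayList<>();
    for (String field : readerFields) {
      if (present.contains(field)) {
        merged.add(field);
      }
    }
    return merged;
  }

  public static void main(String[] args) {
    // Reader schema: id, ts, name, price, tags; old partial: ts, price; new partial: ts, tags
    // Prints: [ts, price, tags]
    System.out.println(mergedFieldOrder(
        Arrays.asList("id", "ts", "name", "price", "tags"),
        Arrays.asList("ts", "price"),
        Arrays.asList("ts", "tags")));
  }
}
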
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
index 221d734dfb7..5360535620f 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
@@ -94,8 +94,7 @@ public abstract class BaseSparkInternalRowReaderContext extends HoodieReaderCont
 
   @Override
   public HoodieRecord<InternalRow> constructHoodieRecord(Option<InternalRow> rowOption,
-                                                         Map<String, Object> metadataMap,
-                                                         Schema schema) {
+                                                         Map<String, Object> metadataMap) {
     if (!rowOption.isPresent()) {
       return new HoodieEmptyRecord<>(
           new HoodieKey((String) metadataMap.get(INTERNAL_META_RECORD_KEY),
@@ -103,6 +102,7 @@ public abstract class BaseSparkInternalRowReaderContext extends HoodieReaderCont
           HoodieRecord.HoodieRecordType.SPARK);
     }
 
+    Schema schema = (Schema) metadataMap.get(INTERNAL_META_SCHEMA);
     InternalRow row = rowOption.get();
     return new HoodieSparkRecord(row, HoodieInternalRowUtils.getCachedSchema(schema));
   }
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
index 25ea9d3332b..af3d3fd239c 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
@@ -20,16 +20,23 @@
 package org.apache.hudi
 
 import org.apache.avro.Schema
+import org.apache.avro.generic.IndexedRecord
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hudi.common.engine.HoodieReaderContext
-import org.apache.hudi.common.util.collection.ClosableIterator
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.util.collection.{ClosableIterator, CloseableMappingIterator}
+import org.apache.hudi.io.storage.{HoodieSparkFileReaderFactory, HoodieSparkParquetReader}
 import org.apache.hudi.util.CloseableInternalRowIterator
+import org.apache.spark.sql.HoodieInternalRowUtils
+import org.apache.spark.sql.avro.HoodieAvroDeserializer
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.execution.datasources.PartitionedFile
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{HoodieInternalRowUtils, SparkSession}
+
+import scala.collection.mutable
 
 /**
  * Implementation of {@link HoodieReaderContext} to read {@link InternalRow}s with
@@ -37,12 +44,14 @@ import org.apache.spark.sql.{HoodieInternalRowUtils, SparkSession}
  *
  * This uses Spark parquet reader to read parquet data files or parquet log blocks.
  *
- * @param baseFileReader    A reader that transforms a {@link PartitionedFile} to an iterator of {@link InternalRow}
- * @param partitionValues   The values for a partition in which the file group lives.
+ * @param baseFileReader  A reader that transforms a {@link PartitionedFile} to an iterator of {@link InternalRow}.
+ * @param partitionValues The values for a partition in which the file group lives.
  */
 class SparkFileFormatInternalRowReaderContext(baseFileReader: PartitionedFile => Iterator[InternalRow],
                                               partitionValues: InternalRow) extends BaseSparkInternalRowReaderContext {
   lazy val sparkAdapter = SparkAdapterSupport.sparkAdapter
+  lazy val sparkFileReaderFactory = new HoodieSparkFileReaderFactory
+  val deserializerMap: mutable.Map[Schema, HoodieAvroDeserializer] = mutable.Map()
 
   override def getFileRecordIterator(filePath: Path,
                                      start: Long,
@@ -50,7 +59,38 @@ class SparkFileFormatInternalRowReaderContext(baseFileReader: PartitionedFile =>
                                      dataSchema: Schema,
                                      requiredSchema: Schema,
                                      conf: Configuration): ClosableIterator[InternalRow] = {
-    val fileInfo = sparkAdapter.getSparkPartitionedFileUtils.createPartitionedFile(partitionValues, filePath, start, length)
-    new CloseableInternalRowIterator(baseFileReader.apply(fileInfo))
+    val fileInfo = sparkAdapter.getSparkPartitionedFileUtils
+      .createPartitionedFile(partitionValues, filePath, start, length)
+    if (FSUtils.isLogFile(filePath)) {
+      val structType: StructType = HoodieInternalRowUtils.getCachedSchema(dataSchema)
+      val projection: UnsafeProjection = HoodieInternalRowUtils.getCachedUnsafeProjection(structType, structType)
+      new CloseableMappingIterator[InternalRow, UnsafeRow](
+        sparkFileReaderFactory.newParquetFileReader(conf, filePath).asInstanceOf[HoodieSparkParquetReader]
+          .getInternalRowIterator(dataSchema, dataSchema),
+        new java.util.function.Function[InternalRow, UnsafeRow] {
+          override def apply(data: InternalRow): UnsafeRow = {
+            // NOTE: We have to do [[UnsafeProjection]] of incoming [[InternalRow]] to convert
+            //       it to [[UnsafeRow]] holding just raw bytes
+            projection.apply(data)
+          }
+        }).asInstanceOf[ClosableIterator[InternalRow]]
+    } else {
+      new CloseableInternalRowIterator(baseFileReader.apply(fileInfo))
+    }
+  }
+
+  /**
+   * Converts an Avro record, e.g., serialized in the log files, to an [[InternalRow]].
+   *
+   * @param avroRecord The Avro record.
+   * @return An [[InternalRow]].
+   */
+  override def convertAvroRecord(avroRecord: IndexedRecord): InternalRow = {
+    val schema = avroRecord.getSchema
+    val structType = HoodieInternalRowUtils.getCachedSchema(schema)
+    val deserializer = deserializerMap.getOrElseUpdate(schema, {
+      sparkAdapter.createAvroDeserializer(schema, structType)
+    })
+    deserializer.deserialize(avroRecord).get.asInstanceOf[InternalRow]
   }
 }
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/CloseableInternalRowIterator.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/CloseableInternalRowIterator.scala
index 2b01b270a7c..30a5e93fb63 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/CloseableInternalRowIterator.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/CloseableInternalRowIterator.scala
@@ -46,6 +46,7 @@ class CloseableInternalRowIterator(iterator: Iterator[_]) extends ClosableIterat
 
   override def next: InternalRow = {
     if (!entryTypeKnown) {
+      entryTypeKnown = true
       // First entry
       val nextVal = iterator.next
       seqInBatch = 0
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
index 9ccdfd863c2..d561b059183 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 
 import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -52,6 +53,7 @@ public abstract class HoodieReaderContext<T> {
   public static final String INTERNAL_META_ORDERING_FIELD = "_2";
   public static final String INTERNAL_META_OPERATION = "_3";
   public static final String INTERNAL_META_INSTANT_TIME = "_4";
+  public static final String INTERNAL_META_SCHEMA = "_5";
 
   /**
    * Gets the file system based on the file path and configuration.
@@ -77,6 +79,14 @@ public abstract class HoodieReaderContext<T> {
   public abstract ClosableIterator<T> getFileRecordIterator(
       Path filePath, long start, long length, Schema dataSchema, Schema requiredSchema, Configuration conf);
 
+  /**
+   * Converts an Avro record, e.g., serialized in the log files, to an engine-specific record.
+   *
+   * @param avroRecord The Avro record.
+   * @return An engine-specific record in Type {@link T}.
+   */
+  public abstract T convertAvroRecord(IndexedRecord avroRecord);
+
   /**
    * @param mergerStrategy Merger strategy UUID.
    * @return {@link HoodieRecordMerger} to use.
@@ -121,12 +131,10 @@ public abstract class HoodieReaderContext<T> {
    *
    * @param recordOption An option of the record in engine-specific type if exists.
    * @param metadataMap  The record metadata.
-   * @param schema       The Avro schema of the record.
    * @return A new instance of {@link HoodieRecord}.
    */
   public abstract HoodieRecord<T> constructHoodieRecord(Option<T> recordOption,
-                                                        Map<String, Object> metadataMap,
-                                                        Schema schema);
+                                                        Map<String, Object> metadataMap);
 
   /**
    * Seals the engine-specific record to make sure the data referenced in memory do not change.
@@ -163,6 +171,7 @@ public abstract class HoodieReaderContext<T> {
   public Map<String, Object> generateMetadataForRecord(T record, Schema schema) {
     Map<String, Object> meta = new HashMap<>();
     meta.put(INTERNAL_META_RECORD_KEY, getRecordKey(record, schema));
+    meta.put(INTERNAL_META_SCHEMA, schema);
     return meta;
   }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index 60d8d09f3d5..5d9896f8f2a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -21,6 +21,7 @@ package org.apache.hudi.common.fs;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.inline.InLineFSUtils;
 import org.apache.hudi.common.fs.inline.InLineFileSystem;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieLogFile;
@@ -467,7 +468,9 @@ public class FSUtils {
   }
 
   public static boolean isLogFile(Path logPath) {
-    return isLogFile(logPath.getName());
+    String scheme = logPath.toUri().getScheme();
+    return isLogFile(InLineFileSystem.SCHEME.equals(scheme)
+        ? InLineFSUtils.getOuterFilePathFromInlinePath(logPath).getName() : logPath.getName());
   }
 
   public static boolean isLogFile(String fileName) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java
index 66a2ad77f2e..7a259d0bd9a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.common.model;
 
-import org.apache.avro.Schema;
 import org.apache.hudi.ApiMaturityLevel;
 import org.apache.hudi.PublicAPIClass;
 import org.apache.hudi.common.config.TypedProperties;
@@ -26,6 +25,8 @@ import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 
+import org.apache.avro.Schema;
+
 import java.io.IOException;
 import java.io.Serializable;
 
@@ -43,9 +44,68 @@ public interface HoodieRecordMerger extends Serializable {
    * This method converges combineAndGetUpdateValue and precombine from HoodiePayload.
    * It'd be associative operation: f(a, f(b, c)) = f(f(a, b), c) (which we can translate as having 3 versions A, B, C
    * of the single record, both orders of operations applications have to yield the same result)
+   * This method takes only full records for merging.
    */
   Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws IOException;
 
+  /**
+   * Merges records which can contain partial updates, i.e., only a subset of fields and values are
+   * present in the record representing the updates, and absent fields are not updated. The fields
+   * that exist in the older and newer records indicate the fields with changed values. When merging,
+   * only the changed fields should be included in the merging results.
+   * <p>
+   * For example, the reader schema is
+   * {[
+   * {"name":"id", "type":"string"},
+   * {"name":"ts", "type":"long"},
+   * {"name":"name", "type":"string"},
+   * {"name":"price", "type":"double"},
+   * {"name":"tags", "type":"string"}
+   * ]}
+   * The older and newer records can be (omitting Hudi meta fields):
+   * <p>
+   * (1) older (complete record update):
+   * id | ts | name  | price | tags
+   *  1 | 10 | apple |  2.3  | fruit
+   * <p>
+   * newer (partial record update):
+   * ts | price
+   * 16 |  2.8
+   * <p>
+   * In this case, in the newer record, only "ts" and "price" fields are updated. With the default
+   * merging strategy, the newer record updates the older record and the merging result is
+   * <p>
+   * id | ts | name  | price | tags
+   *  1 | 16 | apple |  2.8  | fruit
+   * <p>
+   * (2) older (partial record update):
+   * ts | price
+   * 10 | 2.8
+   * <p>
+   * newer (partial record update):
+   * ts | tag
+   * 16 | fruit,juicy
+   * <p>
+   * In this case, in the older record, only "ts" and "price" fields are updated. In the newer
+   * record, only "ts" and "tag" fields are updated. With the default merging strategy, all the
+   * changed fields should be included in the merging results.
+   * <p>
+   * ts | price | tags
+   * 16 |  2.8  | fruit,juicy
+   *
+   * @param older        Older record.
+   * @param oldSchema    Schema of the older record.
+   * @param newer        Newer record.
+   * @param newSchema    Schema of the newer record.
+   * @param readerSchema Reader schema containing all the fields to read. This is used to maintain
+   *                     the ordering of the fields of the merged record.
+   * @param props        Configuration in {@link TypedProperties}.
+   * @return The merged record and schema.
+   * @throws IOException upon merging error.
+   */
+  default Option<Pair<HoodieRecord, Schema>> partialMerge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, Schema readerSchema, TypedProperties props) throws IOException {
+    throw new UnsupportedOperationException("Partial merging logic is not implemented.");
+  }
 
   /**
    * In some cases a business logic does some checks before flushing a merged record to the disk.
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
index ca568c0f38c..156bebb4495 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
@@ -150,7 +150,7 @@ public class HoodieAvroDataBlock extends HoodieDataBlock {
   protected <T> ClosableIterator<T> deserializeRecords(HoodieReaderContext<T> readerContext, byte[] content) throws IOException {
     checkState(this.readerSchema != null, "Reader's schema has to be non-null");
     RecordIterator iterator = RecordIterator.getInstance(this, content);
-    return new CloseableMappingIterator<>(iterator, data -> (T) data);
+    return new CloseableMappingIterator<>(iterator, data -> readerContext.convertAvroRecord(data));
   }
 
   private static class RecordIterator implements ClosableIterator<IndexedRecord> {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
index a0f9016f409..3e4025aa307 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
@@ -122,8 +122,13 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {
     this.shouldWriteRecordPositions = false;
     this.records = Option.empty();
     this.keyFieldName = keyFieldName;
-    // If no reader-schema has been provided assume writer-schema as one
-    this.readerSchema = readerSchema.orElseGet(() -> getWriterSchema(super.getLogBlockHeader()));
+    this.readerSchema = containsPartialUpdates()
+        // When the data block contains partial updates, we need to strictly use the writer schema
+        // from the log block header, as we need to use the partial schema to indicate which
+        // fields are updated during merging.
+        ? getWriterSchema(super.getLogBlockHeader())
+        // If no reader-schema has been provided assume writer-schema as one
+        : readerSchema.orElseGet(() -> getWriterSchema(super.getLogBlockHeader()));
     this.enablePointLookups = enablePointLookups;
   }
 
@@ -145,6 +150,11 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {
     return keyFieldName;
   }
 
+  public boolean containsPartialUpdates() {
+    return getLogBlockHeader().containsKey(HeaderMetadataType.IS_PARTIAL)
+        && Boolean.parseBoolean(getLogBlockHeader().get(HeaderMetadataType.IS_PARTIAL));
+  }
+
   protected static Schema getWriterSchema(Map<HeaderMetadataType, String> logBlockHeader) {
     return new Schema.Parser().parse(logBlockHeader.get(HeaderMetadataType.SCHEMA));
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
index 991428ae87c..4a1bd08e4ef 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
@@ -46,6 +46,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_SCHEMA;
 
 public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGroupRecordBuffer<T> {
   protected final HoodieReaderContext<T> readerContext;
   protected final Schema readerSchema;
@@ -58,6 +60,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
   protected ClosableIterator<T> baseFileIterator;
   protected Iterator<Pair<Option<T>, Map<String, Object>>> logRecordIterator;
   protected T nextRecord;
+  protected boolean enablePartialMerging = false;
 
   public HoodieBaseFileGroupRecordBuffer(HoodieReaderContext<T> readerContext,
                                          Schema readerSchema,
@@ -122,13 +125,25 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
       // Merge and store the combined record
       // Note that the incoming `record` is from an older commit, so it should be put as
       // the `older` in the merge API
-      HoodieRecord<T> combinedRecord = (HoodieRecord<T>) recordMerger.merge(
-          readerContext.constructHoodieRecord(Option.of(record), metadata, readerSchema),
-          readerSchema,
-          readerContext.constructHoodieRecord(
-              existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight(), readerSchema),
-          readerSchema,
-          payloadProps).get().getLeft();
+      HoodieRecord<T> combinedRecord;
+      if (enablePartialMerging) {
+        combinedRecord = (HoodieRecord<T>) recordMerger.partialMerge(
+            readerContext.constructHoodieRecord(Option.of(record), metadata),
+            (Schema) metadata.get(INTERNAL_META_SCHEMA),
+            readerContext.constructHoodieRecord(
+                existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight()),
+            (Schema) existingRecordMetadataPair.getRight().get(INTERNAL_META_SCHEMA),
+            readerSchema,
+            payloadProps).get().getLeft();
+      } else {
+        combinedRecord = (HoodieRecord<T>) recordMerger.merge(
+            readerContext.constructHoodieRecord(Option.of(record), metadata),
+            (Schema) metadata.get(INTERNAL_META_SCHEMA),
+            readerContext.constructHoodieRecord(
+                existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight()),
+            (Schema) existingRecordMetadataPair.getRight().get(INTERNAL_META_SCHEMA),
+            payloadProps).get().getLeft();
+      }
       // If pre-combine returns existing record, no need to update it
       if (combinedRecord.getData() != existingRecordMetadataPair.getLeft().get()) {
         return Option.of(combinedRecord.getData());
@@ -210,9 +225,17 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
       return newer;
     }
 
-    Option<Pair<HoodieRecord, Schema>> mergedRecord = recordMerger.merge(
-        readerContext.constructHoodieRecord(older, olderInfoMap, baseFileSchema), baseFileSchema,
-        readerContext.constructHoodieRecord(newer, newerInfoMap, readerSchema), readerSchema, payloadProps);
+    Option<Pair<HoodieRecord, Schema>> mergedRecord;
+    if (enablePartialMerging) {
+      mergedRecord = recordMerger.partialMerge(
+          readerContext.constructHoodieRecord(older, olderInfoMap), (Schema) olderInfoMap.get(INTERNAL_META_SCHEMA),
+          readerContext.constructHoodieRecord(newer, newerInfoMap), (Schema) newerInfoMap.get(INTERNAL_META_SCHEMA),
+          readerSchema, payloadProps);
+    } else {
+      mergedRecord = recordMerger.merge(
+          readerContext.constructHoodieRecord(older, olderInfoMap), (Schema) olderInfoMap.get(INTERNAL_META_SCHEMA),
+          readerContext.constructHoodieRecord(newer, newerInfoMap), (Schema) newerInfoMap.get(INTERNAL_META_SCHEMA), payloadProps);
+    }
+    }
     if (mergedRecord.isPresent()) {
       return Option.ofNullable((T) mergedRecord.get().getLeft().getData());
     }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
index 479a0c1b339..6e5679adfbf 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
@@ -39,8 +39,6 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.Map;
 
-import static org.apache.hudi.common.util.ValidationUtils.checkState;
-
 /**
  * A buffer that is used to store log records by {@link org.apache.hudi.common.table.log.HoodieMergedLogRecordReader}
  * by calling the {@link #processDataBlock} and {@link #processDeleteBlock} methods into a record key based map.
@@ -66,17 +64,19 @@ public class HoodieKeyBasedFileGroupRecordBuffer<T> extends HoodieBaseFileGroupR
 
   @Override
   public void processDataBlock(HoodieDataBlock dataBlock, Option<KeySpec> keySpecOpt) throws IOException {
-    checkState(partitionNameOverrideOpt.isPresent() || partitionPathFieldOpt.isPresent(),
-        "Either partition-name override or partition-path field had to be present");
-
-
     Pair<ClosableIterator<T>, Schema> recordsIteratorSchemaPair =
         getRecordsIterator(dataBlock, keySpecOpt);
+    if (dataBlock.containsPartialUpdates()) {
+      // When a data block contains partial updates, subsequent record merging must always use
+      // partial merging.
+      enablePartialMerging = true;
+    }
 
     try (ClosableIterator<T> recordIterator = recordsIteratorSchemaPair.getLeft()) {
       while (recordIterator.hasNext()) {
         T nextRecord = recordIterator.next();
-        Map<String, Object> metadata = readerContext.generateMetadataForRecord(nextRecord, readerSchema);
+        Map<String, Object> metadata = readerContext.generateMetadataForRecord(
+            nextRecord, recordsIteratorSchemaPair.getRight());
         String recordKey = (String) metadata.get(HoodieReaderContext.INTERNAL_META_RECORD_KEY);
         processNextDataRecord(nextRecord, metadata, recordKey);
       }
@@ -127,10 +127,12 @@ public class HoodieKeyBasedFileGroupRecordBuffer<T> extends HoodieBaseFileGroupR
 
       String recordKey = readerContext.getRecordKey(baseRecord, baseFileSchema);
       Pair<Option<T>, Map<String, Object>> logRecordInfo = records.remove(recordKey);
+      Map<String, Object> metadata = readerContext.generateMetadataForRecord(
+          baseRecord, baseFileSchema);
 
       Option<T> resultRecord = logRecordInfo != null
-          ? merge(Option.of(baseRecord), Collections.emptyMap(), logRecordInfo.getLeft(), logRecordInfo.getRight())
-          : merge(Option.empty(), Collections.emptyMap(), Option.of(baseRecord), Collections.emptyMap());
+          ? merge(Option.of(baseRecord), metadata, logRecordInfo.getLeft(), logRecordInfo.getRight())
+          : merge(Option.empty(), Collections.emptyMap(), Option.of(baseRecord), metadata);
       if (resultRecord.isPresent()) {
         nextRecord = readerContext.seal(resultRecord.get());
         return true;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
index 6c80af8b56c..352e7c726d7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
@@ -42,8 +42,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import static org.apache.hudi.common.util.ValidationUtils.checkState;
-
 /**
  * A buffer that is used to store log records by {@link org.apache.hudi.common.table.log.HoodieMergedLogRecordReader}
  * by calling the {@link #processDataBlock} and {@link #processDeleteBlock} methods into record position based map.
@@ -71,9 +69,6 @@ public class HoodiePositionBasedFileGroupRecordBuffer<T> extends HoodieBaseFileG
 
   @Override
   public void processDataBlock(HoodieDataBlock dataBlock, Option<KeySpec> keySpecOpt) throws IOException {
-    checkState(partitionNameOverrideOpt.isPresent() || partitionPathFieldOpt.isPresent(),
-        "Either partition-name override or partition-path field had to be present");
-
     // Prepare key filters.
     Set<String> keys = new HashSet<>();
     boolean isFullKey = true;
@@ -84,6 +79,12 @@ public class HoodiePositionBasedFileGroupRecordBuffer<T> extends HoodieBaseFileG
       isFullKey = keySpecOpt.get().isFullKey();
     }
 
+    if (dataBlock.containsPartialUpdates()) {
+      // When a data block contains partial updates, subsequent record merging must always use
+      // partial merging.
+      enablePartialMerging = true;
+    }
+    
     // Extract positions from data block.
     List<Long> recordPositions = extractRecordPositions(dataBlock);
 
@@ -159,9 +160,12 @@ public class HoodiePositionBasedFileGroupRecordBuffer<T> extends HoodieBaseFileG
       T baseRecord = baseFileIterator.next();
       Pair<Option<T>, Map<String, Object>> logRecordInfo = records.remove(nextRecordPosition++);
 
+      Map<String, Object> metadata = readerContext.generateMetadataForRecord(
+          baseRecord, baseFileSchema);
+
       Option<T> resultRecord = logRecordInfo != null
-          ? merge(Option.of(baseRecord), Collections.emptyMap(), logRecordInfo.getLeft(), logRecordInfo.getRight())
-          : merge(Option.empty(), Collections.emptyMap(), Option.of(baseRecord), Collections.emptyMap());
+          ? merge(Option.of(baseRecord), metadata, logRecordInfo.getLeft(), logRecordInfo.getRight())
+          : merge(Option.empty(), Collections.emptyMap(), Option.of(baseRecord), metadata);
       if (resultRecord.isPresent()) {
         nextRecord = readerContext.seal(resultRecord.get());
         return true;
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java
b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java
index 612929bc8a6..350a14d591a 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java
@@ -20,6 +20,7 @@ package org.apache.hudi.common.fs;
 
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.engine.HoodieLocalEngineContext;
+import org.apache.hudi.common.fs.inline.InLineFSUtils;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -244,7 +245,9 @@ public class TestFSUtils extends HoodieCommonTestHarness {
     String logFile = FSUtils.makeLogFileName(fileName, ".log", "100", 2, "1-0-1");
     System.out.println("Log File =" + logFile);
     Path rlPath = new Path(new Path(partitionPath), logFile);
+    Path inlineFsPath = InLineFSUtils.getInlineFilePath(rlPath, "file", 0, 100);
     assertTrue(FSUtils.isLogFile(rlPath));
+    assertTrue(FSUtils.isLogFile(inlineFsPath));
     assertEquals(fileName, FSUtils.getFileIdFromLogPath(rlPath));
     assertEquals("100", FSUtils.getDeltaCommitTimeFromLogPath(rlPath));
     assertEquals(2, FSUtils.getFileVersionFromLog(rlPath));
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
index af1fc120c5b..febc0d32466 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.engine.HoodieLocalEngineContext;
 import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.model.FileSlice;
@@ -35,6 +36,7 @@ import org.apache.hudi.common.table.view.SyncableFileSystemView;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.hudi.metadata.HoodieTableMetadata;
 
 import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
@@ -66,12 +68,13 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
 
   public abstract String getBasePath();
 
-  public abstract HoodieReaderContext<T> getHoodieReaderContext(String[] partitionPath);
+  public abstract HoodieReaderContext<T> getHoodieReaderContext(String tablePath, String[] partitionPath);
 
   public abstract void commitToTable(List<String> recordList, String operation,
                                      Map<String, String> writeConfigs);
 
-  public abstract void validateRecordsInFileGroup(List<T> actualRecordList,
+  public abstract void validateRecordsInFileGroup(String tablePath,
+                                                  List<T> actualRecordList,
                                                   Schema schema,
                                                   String fileGroupId);
 
@@ -90,7 +93,6 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
     writeConfigs.put("hoodie.delete.shuffle.parallelism", "1");
     writeConfigs.put("hoodie.merge.small.file.group.candidates.limit", "0");
     writeConfigs.put("hoodie.compact.inline", "false");
-    writeConfigs.put(HoodieMetadataConfig.ENABLE.key(), "false");
 
     try (HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0xDEEF)) {
       // One commit; reading one file group containing a base file only
@@ -108,16 +110,19 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
   }
 
   private void validateOutputFromFileGroupReader(Configuration hadoopConf,
-                                                 String basePath,
+                                                 String tablePath,
                                                  String[] partitionPaths,
                                                  int expectedLogFileNum) throws Exception {
     HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
-        .setConf(hadoopConf).setBasePath(basePath).build();
+        .setConf(hadoopConf).setBasePath(tablePath).build();
     Schema avroSchema = new TableSchemaResolver(metaClient).getTableAvroSchema();
+    HoodieEngineContext engineContext = new HoodieLocalEngineContext(hadoopConf);
+    HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder().build();
     FileSystemViewManager viewManager = FileSystemViewManager.createViewManager(
-        new HoodieLocalEngineContext(hadoopConf), HoodieMetadataConfig.newBuilder().build(),
-        FileSystemViewStorageConfig.newBuilder().build(),
-        HoodieCommonConfig.newBuilder().build());
+        engineContext, metadataConfig, FileSystemViewStorageConfig.newBuilder().build(),
+        HoodieCommonConfig.newBuilder().build(),
+        mc -> HoodieTableMetadata.create(
+            engineContext, metadataConfig, mc.getBasePathV2().toString()));
     SyncableFileSystemView fsView = viewManager.getFileSystemView(metaClient);
     FileSlice fileSlice = fsView.getAllFileSlices(partitionPaths[0]).findFirst().get();
     List<String> logFilePathList = getLogFileListFromFileSlice(fileSlice);
@@ -129,12 +134,14 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
     props.setProperty("hoodie.datasource.write.precombine.field", "timestamp");
     props.setProperty("hoodie.payload.ordering.field", "timestamp");
     props.setProperty(RECORD_MERGER_STRATEGY.key(), RECORD_MERGER_STRATEGY.defaultValue());
-    props.setProperty(PARTITION_FIELDS.key(), metaClient.getTableConfig().getString(PARTITION_FIELDS.key()));
-    String[] partitionValues = {partitionPaths[0]};
-    HoodieFileGroupReader<T> fileGroupReader = new HoodieFileGroupReader<T>(
-        getHoodieReaderContext(partitionValues),
+    if (metaClient.getTableConfig().contains(PARTITION_FIELDS)) {
+      props.setProperty(PARTITION_FIELDS.key(), metaClient.getTableConfig().getString(PARTITION_FIELDS));
+    }
+    String[] partitionValues = partitionPaths[0].isEmpty() ? new String[] {} : new String[] {partitionPaths[0]};
+    HoodieFileGroupReader<T> fileGroupReader = new HoodieFileGroupReader<>(
+        getHoodieReaderContext(tablePath, partitionValues),
         hadoopConf,
-        basePath,
+        tablePath,
         metaClient.getActiveTimeline().lastInstant().get().getTimestamp(),
         fileSlice.getBaseFile(),
         logFilePathList.isEmpty() ? Option.empty() : Option.of(logFilePathList),
@@ -149,6 +156,6 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
     }
     fileGroupReader.close();
 
-    validateRecordsInFileGroup(actualRecordList, avroSchema, fileSlice.getFileId());
+    validateRecordsInFileGroup(tablePath, actualRecordList, avroSchema, fileSlice.getFileId());
   }
 }
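
Note (not part of the patch): the hunk above switches the test to the createViewManager overload that takes an explicit metadata-table factory. A minimal Scala sketch of that wiring, using only APIs visible in this patch; hadoopConf and metaClient are assumed to be set up elsewhere, as in the tests:

import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig}
import org.apache.hudi.common.engine.HoodieLocalEngineContext
import org.apache.hudi.common.function.SerializableFunctionUnchecked
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.view.{FileSystemViewManager, FileSystemViewStorageConfig}
import org.apache.hudi.metadata.HoodieTableMetadata

// Sketch: build the file-system view with an explicit metadata-table factory.
val engineContext = new HoodieLocalEngineContext(hadoopConf)   // hadoopConf assumed in scope
val metadataConfig = HoodieMetadataConfig.newBuilder.build
val viewManager = FileSystemViewManager.createViewManager(
  engineContext, metadataConfig,
  FileSystemViewStorageConfig.newBuilder.build,
  HoodieCommonConfig.newBuilder.build,
  new SerializableFunctionUnchecked[HoodieTableMetaClient, HoodieTableMetadata] {
    // Factory invoked by the view manager to materialize the metadata table
    override def apply(mc: HoodieTableMetaClient): HoodieTableMetadata =
      HoodieTableMetadata.create(engineContext, metadataConfig, mc.getBasePathV2.toString)
  })
// The resulting view is then used to locate the file slice under test.
val fsView = viewManager.getFileSystemView(metaClient)          // metaClient assumed in scope
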
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
index 50056954a12..a1441be5ddc 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
@@ -74,16 +74,21 @@ class TestHoodieFileGroupReaderOnSpark extends TestHoodieFileGroupReaderBase[Int
     tempDir.toAbsolutePath.toUri.toString
   }
 
-  override def getHoodieReaderContext(partitionValues: Array[String]): HoodieReaderContext[InternalRow] = {
+  override def getHoodieReaderContext(tablePath: String,
+                                      partitionValues: Array[String]): HoodieReaderContext[InternalRow] = {
     val parquetFileFormat = new ParquetFileFormat
-    val metaClient = HoodieTableMetaClient.builder.setConf(getHadoopConf).setBasePath(getBasePath).build
+    val metaClient = HoodieTableMetaClient.builder.setConf(getHadoopConf).setBasePath(tablePath).build
     val avroSchema = new TableSchemaResolver(metaClient).getTableAvroSchema
     val structTypeSchema = AvroConversionUtils.convertAvroSchemaToStructType(avroSchema)
     val partitionFields = metaClient.getTableConfig.getPartitionFields
-    val partitionSchema = new StructType(structTypeSchema.fields.filter(f => partitionFields.get().contains(f.name)))
+    val partitionSchema = if (partitionFields.isPresent) {
+      new StructType(structTypeSchema.fields.filter(f => partitionFields.get().contains(f.name)))
+    } else {
+      new StructType()
+    }
 
     val recordReaderIterator = parquetFileFormat.buildReaderWithPartitionValues(
-    spark, structTypeSchema, partitionSchema, structTypeSchema, Seq.empty, Map.empty, getHadoopConf)
+      spark, structTypeSchema, partitionSchema, structTypeSchema, Seq.empty, Map.empty, getHadoopConf)
     val numPartitionFields = if (partitionFields.isPresent) partitionFields.get().length else 0
     assertEquals(numPartitionFields, partitionValues.length)
 
@@ -98,6 +103,7 @@ class TestHoodieFileGroupReaderOnSpark extends TestHoodieFileGroupReaderBase[Int
 
   override def commitToTable(recordList: util.List[String], operation: String, options: util.Map[String, String]): Unit = {
     val inputDF: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(recordList.toList, 2))
+
     inputDF.write.format("hudi")
       .options(options)
       .option("hoodie.compact.inline", "false") // else fails due to 
compaction & deltacommit instant times being same
@@ -108,11 +114,12 @@ class TestHoodieFileGroupReaderOnSpark extends 
TestHoodieFileGroupReaderBase[Int
       .save(getBasePath)
   }
 
-  override def validateRecordsInFileGroup(actualRecordList: util.List[InternalRow],
+  override def validateRecordsInFileGroup(basePath: String,
+                                          actualRecordList: util.List[InternalRow],
                                           schema: Schema,
                                           fileGroupId: String): Unit = {
     val expectedDf = spark.read.format("hudi")
-      .load(getBasePath)
+      .load(basePath)
       .where(col(HoodieRecord.FILENAME_METADATA_FIELD).contains(fileGroupId))
     assertEquals(expectedDf.count, actualRecordList.size)
     val actualDf = HoodieUnsafeUtils.createDataFrameFromInternalRows(
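
For reference, a small sketch of how the partition values passed to getHoodieReaderContext are derived for partitioned vs. non-partitioned tables, mirroring the ternary added in the base test above (tablePath and partitionPaths are assumed to be the same values used there; this is an illustration, not part of the patch):

// Non-partitioned tables use the empty relative partition path, so no partition
// values are passed; otherwise the single partition under test is passed through,
// keeping assertEquals(numPartitionFields, partitionValues.length) satisfied.
val partitionValues: Array[String] =
  if (partitionPaths(0).isEmpty) Array.empty[String] else Array(partitionPaths(0))
val readerContext = getHoodieReaderContext(tablePath, partitionValues)
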
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala
index 4e6232fe3fe..f1375f0749d 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala
@@ -18,7 +18,12 @@
 package org.apache.spark.sql.hudi
 
 import org.apache.avro.Schema
+import org.apache.hudi.DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT
+import org.apache.hudi.DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES
+import org.apache.hudi.HoodieSparkUtils
 import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.common.config.HoodieReaderConfig.FILE_GROUP_READER_ENABLED
+import org.apache.hudi.common.config.HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT
 import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig}
 import org.apache.hudi.common.engine.HoodieLocalEngineContext
 import org.apache.hudi.common.function.SerializableFunctionUnchecked
@@ -30,9 +35,9 @@ import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.testutils.HoodieTestUtils.{getDefaultHadoopConf, getLogFileListFromFileSlice}
 import org.apache.hudi.config.HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT
 import org.apache.hudi.metadata.HoodieTableMetadata
-import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkUtils}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 
+import java.io.File
 import java.util.{Collections, List}
 import scala.collection.JavaConverters._
 
@@ -40,108 +45,84 @@ class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase {
 
   test("Test Partial Update") {
     withTempDir { tmp =>
-      Seq("cow", "mor").foreach { tableType =>
-        val tableName = generateTableName
-        val basePath = tmp.getCanonicalPath + "/" + tableName
-        spark.sql(s"set ${MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
-        spark.sql(s"set 
${DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key} = true")
-        spark.sql(
-          s"""
-             |create table $tableName (
-             | id int,
-             | name string,
-             | price double,
-             | _ts long
-             |) using hudi
-             |tblproperties(
-             | type ='$tableType',
-             | primaryKey = 'id',
-             | preCombineField = '_ts'
-             |)
-             |location '$basePath'
-          """.stripMargin)
-        spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
-
-        spark.sql(
-          s"""
-             |merge into $tableName t0
-             |using ( select 1 as id, 'a1' as name, 12 as price, 1001 as ts ) s0
-             |on t0.id = s0.id
-             |when matched then update set price = s0.price, _ts = s0.ts
-             |""".stripMargin)
-        if (tableType.equals("cow")) {
-          checkAnswer(s"select id, name, price, _ts from $tableName")(
-            Seq(1, "a1", 12.0, 1001)
-          )
-        } else {
-          // TODO(HUDI-6801): validate data once merging of partial updates is implemented
-          // Validate the log file
-          val hadoopConf = getDefaultHadoopConf
-          val metaClient: HoodieTableMetaClient =
-            HoodieTableMetaClient.builder.setConf(hadoopConf).setBasePath(basePath).build
-          val metadataConfig = HoodieMetadataConfig.newBuilder.build
-          val engineContext = new HoodieLocalEngineContext(hadoopConf)
-          val viewManager: FileSystemViewManager = FileSystemViewManager.createViewManager(
-            engineContext, metadataConfig, FileSystemViewStorageConfig.newBuilder.build,
-            HoodieCommonConfig.newBuilder.build,
-            new SerializableFunctionUnchecked[HoodieTableMetaClient, HoodieTableMetadata] {
-              override def apply(v1: HoodieTableMetaClient): HoodieTableMetadata = {
-                HoodieTableMetadata.create(
-                  engineContext, metadataConfig, metaClient.getBasePathV2.toString)
-              }
-            }
-          )
-          val fsView: SyncableFileSystemView = viewManager.getFileSystemView(metaClient)
-          val fileSlice: FileSlice = fsView.getAllFileSlices("").findFirst.get
-          val logFilePathList: List[String] = getLogFileListFromFileSlice(fileSlice)
-          Collections.sort(logFilePathList)
-          assertEquals(1, logFilePathList.size)
-
-          val avroSchema = new TableSchemaResolver(metaClient).getTableAvroSchema
-          val logReader = new HoodieLogFileReader(
-            metaClient.getFs, new HoodieLogFile(logFilePathList.get(0)),
-            avroSchema, 1024 * 1024, true, false, false,
-            "id", null)
-          assertTrue(logReader.hasNext)
-          val logBlockHeader = logReader.next().getLogBlockHeader
-          assertTrue(logBlockHeader.containsKey(HeaderMetadataType.SCHEMA))
-          assertTrue(logBlockHeader.containsKey(HeaderMetadataType.IS_PARTIAL))
-          val partialSchema = new Schema.Parser().parse(logBlockHeader.get(HeaderMetadataType.SCHEMA))
-          val expectedPartialSchema = HoodieAvroUtils.addMetadataFields(HoodieAvroUtils.generateProjectionSchema(
-            avroSchema, Seq("price", "_ts").asJava), false)
-          assertEquals(expectedPartialSchema, partialSchema)
-          assertTrue(logBlockHeader.get(HeaderMetadataType.IS_PARTIAL).toBoolean)
-        }
+      testPartialUpdate(tmp, "cow", "avro")
+      testPartialUpdate(tmp, "mor", "avro")
+      testPartialUpdate(tmp, "mor", "parquet")
+    }
+  }
 
-        val tableName2 = generateTableName
-        spark.sql(
-          s"""
-             |create table $tableName2 (
-             | id int,
-             | name string,
-             | price double
-             |) using hudi
-             |tblproperties(
-             | type ='$tableType',
-             | primaryKey = 'id'
-             |)
-             |location '${tmp.getCanonicalPath}/$tableName2'
-          """.stripMargin)
-        spark.sql(s"insert into $tableName2 values(1, 'a1', 10)")
-
-        spark.sql(
-          s"""
-             |merge into $tableName2 t0
-             |using ( select 1 as id, 'a1' as name, 12 as price) s0
-             |on t0.id = s0.id
-             |when matched then update set price = s0.price
-             |""".stripMargin)
-        if (tableType.equals("cow")) {
-          checkAnswer(s"select id, name, price from $tableName2")(
-            Seq(1, "a1", 12.0)
-          )
-        }
-      }
+  def testPartialUpdate(tmp: File,
+                        tableType: String,
+                        logDataBlockFormat: String): Unit = {
+    val tableName = generateTableName
+    val basePath = tmp.getCanonicalPath + "/" + tableName
+    spark.sql(s"set ${MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
+    spark.sql(s"set ${ENABLE_MERGE_INTO_PARTIAL_UPDATES.key} = true")
+    spark.sql(s"set ${LOGFILE_DATA_BLOCK_FORMAT.key} = $logDataBlockFormat")
+    spark.sql(s"set ${FILE_GROUP_READER_ENABLED.key} = true")
+    spark.sql(s"set ${USE_NEW_HUDI_PARQUET_FILE_FORMAT.key} = true")
+    spark.sql(
+      s"""
+         |create table $tableName (
+         | id int,
+         | name string,
+         | price double,
+         | _ts long
+         |) using hudi
+         |tblproperties(
+         | type ='$tableType',
+         | primaryKey = 'id',
+         | preCombineField = '_ts'
+         |)
+         |location '$basePath'
+        """.stripMargin)
+    spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+
+    spark.sql(
+      s"""
+         |merge into $tableName t0
+         |using ( select 1 as id, 'a1' as name, 12 as price, 1001 as ts ) s0
+         |on t0.id = s0.id
+         |when matched then update set price = s0.price, _ts = s0.ts
+         |""".stripMargin)
+
+    checkAnswer(s"select id, name, price, _ts from $tableName")(
+      Seq(1, "a1", 12.0, 1001)
+    )
+
+    if (tableType.equals("mor")) {
+      validateLogBlock(basePath, 1, Seq("price", "_ts"))
+    }
+
+    if (tableType.equals("cow")) {
+      // No preCombine field
+      val tableName2 = generateTableName
+      spark.sql(
+        s"""
+           |create table $tableName2 (
+           | id int,
+           | name string,
+           | price double
+           |) using hudi
+           |tblproperties(
+           | type ='$tableType',
+           | primaryKey = 'id'
+           |)
+           |location '${tmp.getCanonicalPath}/$tableName2'
+        """.stripMargin)
+      spark.sql(s"insert into $tableName2 values(1, 'a1', 10)")
+
+      spark.sql(
+        s"""
+           |merge into $tableName2 t0
+           |using ( select 1 as id, 'a1' as name, 12 as price) s0
+           |on t0.id = s0.id
+           |when matched then update set price = s0.price
+           |""".stripMargin)
+
+      checkAnswer(s"select id, name, price from $tableName2")(
+        Seq(1, "a1", 12.0)
+      )
     }
   }
 
@@ -190,4 +171,44 @@ class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase {
          | preCombineField = '_ts'
          |)""".stripMargin)
   }
+
+  def validateLogBlock(basePath: String,
+                       expectedNumLogFile: Int,
+                       changedFields: Seq[String]): Unit = {
+    val hadoopConf = getDefaultHadoopConf
+    val metaClient: HoodieTableMetaClient =
+      HoodieTableMetaClient.builder.setConf(hadoopConf).setBasePath(basePath).build
+    val metadataConfig = HoodieMetadataConfig.newBuilder.build
+    val engineContext = new HoodieLocalEngineContext(hadoopConf)
+    val viewManager: FileSystemViewManager = FileSystemViewManager.createViewManager(
+      engineContext, metadataConfig, FileSystemViewStorageConfig.newBuilder.build,
+      HoodieCommonConfig.newBuilder.build,
+      new SerializableFunctionUnchecked[HoodieTableMetaClient, HoodieTableMetadata] {
+        override def apply(v1: HoodieTableMetaClient): HoodieTableMetadata = {
+          HoodieTableMetadata.create(
+            engineContext, metadataConfig, metaClient.getBasePathV2.toString)
+        }
+      }
+    )
+    val fsView: SyncableFileSystemView = viewManager.getFileSystemView(metaClient)
+    val fileSlice: FileSlice = fsView.getAllFileSlices("").findFirst.get
+    val logFilePathList: List[String] = getLogFileListFromFileSlice(fileSlice)
+    Collections.sort(logFilePathList)
+    assertEquals(expectedNumLogFile, logFilePathList.size)
+
+    val avroSchema = new TableSchemaResolver(metaClient).getTableAvroSchema
+    val logReader = new HoodieLogFileReader(
+      metaClient.getFs, new HoodieLogFile(logFilePathList.get(0)),
+      avroSchema, 1024 * 1024, true, false, false,
+      "id", null)
+    assertTrue(logReader.hasNext)
+    val logBlockHeader = logReader.next().getLogBlockHeader
+    assertTrue(logBlockHeader.containsKey(HeaderMetadataType.SCHEMA))
+    assertTrue(logBlockHeader.containsKey(HeaderMetadataType.IS_PARTIAL))
+    val partialSchema = new Schema.Parser().parse(logBlockHeader.get(HeaderMetadataType.SCHEMA))
+    val expectedPartialSchema = HoodieAvroUtils.addMetadataFields(HoodieAvroUtils.generateProjectionSchema(
+      avroSchema, changedFields.asJava), false)
+    assertEquals(expectedPartialSchema, partialSchema)
+    assertTrue(logBlockHeader.get(HeaderMetadataType.IS_PARTIAL).toBoolean)
+  }
 }
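
As a usage note, the end-to-end path exercised by testPartialUpdate relies on a few session-level settings before the MERGE INTO. A hedged Scala sketch, reusing the config constants imported in the test above; the table name my_mor_table is only illustrative:

// Enable partial-update MERGE INTO, the new file group reader, and the new
// Hudi parquet file format; pick the log data block format under test.
spark.sql(s"set ${MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
spark.sql(s"set ${ENABLE_MERGE_INTO_PARTIAL_UPDATES.key} = true")
spark.sql(s"set ${LOGFILE_DATA_BLOCK_FORMAT.key} = parquet")
spark.sql(s"set ${FILE_GROUP_READER_ENABLED.key} = true")
spark.sql(s"set ${USE_NEW_HUDI_PARQUET_FILE_FORMAT.key} = true")

// On a MOR table, a matched partial update then writes a log block whose schema
// covers only the changed columns plus the metadata fields, with the IS_PARTIAL
// header set to true (as validated by validateLogBlock above).
spark.sql(
  s"""
     |merge into my_mor_table t0
     |using ( select 1 as id, 12 as price, 1001 as ts ) s0
     |on t0.id = s0.id
     |when matched then update set price = s0.price, _ts = s0.ts
     |""".stripMargin)
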
