This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ebc2545245e  [HUDI-9602] Use BufferedRecordMerger for dedup and global index path (#13600)
4ebc2545245e is described below

commit 4ebc2545245e4a82605215229d5e5764f2660b30
Author: Lokesh Jain <[email protected]>
AuthorDate: Tue Aug 5 19:53:05 2025 +0530

     [HUDI-9602] Use BufferedRecordMerger for dedup and global index path (#13600)
    
    * Use the BufferedRecordMerger to deduplicate inputs for the COW and index write paths;
    * Add a new sub-merger for the MIT expression payload.
    
    ---------
    
    Co-authored-by: Lokesh Jain <[email protected]>
    Co-authored-by: Timothy Brown <[email protected]>
    Co-authored-by: danny0405 <[email protected]>
---
 .../org/apache/hudi/index/HoodieIndexUtils.java    | 180 ++++++++++++++-------
 .../hudi/table/action/commit/BaseWriteHelper.java  |  83 +++++++++-
 .../table/action/commit/HoodieWriteHelper.java     |  34 ++--
 .../hudi/utils/HoodieWriterClientTestHarness.java  |  55 +++++--
 .../hudi/table/action/commit/FlinkWriteHelper.java |  37 ++---
 .../hudi/table/action/commit/JavaWriteHelper.java  |  30 ++--
 .../commit/BaseSparkCommitActionExecutor.java      |   6 +-
 .../hudi/BaseSparkInternalRecordContext.java       |   2 +-
 .../org/apache/hudi/avro/AvroRecordContext.java    |  53 ++++--
 .../apache/hudi/avro/HoodieAvroReaderContext.java  |  29 +++-
 .../common/engine/AvroReaderContextFactory.java    |  11 +-
 .../hudi/common/engine/HoodieEngineContext.java    |  17 +-
 .../hudi/common/engine/HoodieReaderContext.java    |  26 ++-
 .../apache/hudi/common/engine/RecordContext.java   |  58 +++++--
 .../apache/hudi/common/model/BaseAvroPayload.java  |  10 ++
 .../apache/hudi/common/model/HoodieAvroRecord.java |   2 +-
 .../hudi/common/model/HoodieAvroRecordMerger.java  |   8 +-
 .../hudi/common/model/HoodieRecordPayload.java     |  23 ++-
 .../hudi/common/table/read/BufferedRecord.java     |  13 +-
 .../table/read/BufferedRecordMergerFactory.java    | 127 +++++++++++----
 .../common/table/read/HoodieFileGroupReader.java   |   7 +-
 .../apache/hudi/common/util/HoodieRecordUtils.java |  13 ++
 .../common/table/read/SchemaHandlerTestBase.java   |   2 +-
 .../read/buffer/BaseTestFileGroupRecordBuffer.java |   6 +
 .../read/buffer/TestFileGroupRecordBuffer.java     |   3 +-
 .../buffer/TestReusableKeyBasedRecordBuffer.java   |   3 +-
 .../TestSortedKeyBasedFileGroupRecordBuffer.java   |   3 +-
 .../common/testutils/HoodieTestDataGenerator.java  |   6 +-
 .../hudi/common/testutils/RawTripTestPayload.java  |   4 +-
 .../hudi/common/util/TestHoodieRecordUtils.java    |  27 ++++
 .../org/apache/hudi/sink/StreamWriteFunction.java  |  33 +++-
 .../hudi/table/format/FlinkRecordContext.java      |   2 +-
 .../org/apache/hudi/hadoop/HiveRecordContext.java  |   2 +-
 .../org/apache/hudi/HoodieMergeOnReadRDDV2.scala   |   3 +-
 .../hudi/command/payload/ExpressionPayload.scala   |  11 ++
 .../hudi/functional/TestBufferedRecordMerger.java  |   2 +-
 .../apache/hudi/functional/TestHoodieIndex.java    |   4 +
 .../dml/others/TestPartialUpdateForMergeInto.scala |   5 +-
 38 files changed, 704 insertions(+), 236 deletions(-)
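
The heart of the change is visible in BaseWriteHelper.reduceRecords below: instead of calling HoodieRecordMerger.merge(...) on pairs of HoodieRecords, the write helpers wrap both sides as BufferedRecords and reduce them through BufferedRecordMerger.deltaMerge, keeping the previous record whenever the merger returns empty. The following is a minimal, self-contained sketch of that reduce-by-key deduplication pattern; SimpleRecord, SimpleMerger and DedupSketch are hypothetical stand-ins for illustration, not Hudi classes, and the ordering-value comparison stands in for whatever merge mode is actually configured.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Optional;

    // Hypothetical stand-in for a record carrying a dedup key and an ordering value.
    final class SimpleRecord {
      final String key;          // record key used for deduplication
      final long orderingValue;  // e.g. the precombine / event-time field
      final String payload;

      SimpleRecord(String key, long orderingValue, String payload) {
        this.key = key;
        this.orderingValue = orderingValue;
        this.payload = payload;
      }
    }

    // Hypothetical stand-in for the delta-merge contract: return the incoming record
    // when it should replace the existing one, otherwise empty to keep the existing one.
    final class SimpleMerger {
      Optional<SimpleRecord> deltaMerge(SimpleRecord incoming, SimpleRecord existing) {
        return incoming.orderingValue >= existing.orderingValue
            ? Optional.of(incoming)
            : Optional.empty();
      }
    }

    public class DedupSketch {
      // Group by key and fold each group through the merger, the same shape as the
      // reduceByKey / stream-reduce calls in the write helpers.
      static List<SimpleRecord> deduplicate(List<SimpleRecord> records, SimpleMerger merger) {
        Map<String, SimpleRecord> byKey = new HashMap<>();
        for (SimpleRecord next : records) {
          byKey.merge(next.key, next,
              (previous, incoming) -> merger.deltaMerge(incoming, previous).orElse(previous));
        }
        return new ArrayList<>(byKey.values());
      }

      public static void main(String[] args) {
        List<SimpleRecord> input = List.of(
            new SimpleRecord("k1", 1L, "old"),
            new SimpleRecord("k1", 2L, "new"),
            new SimpleRecord("k2", 5L, "only"));
        deduplicate(input, new SimpleMerger())
            .forEach(r -> System.out.println(r.key + " -> " + r.payload));
      }
    }

Keeping the previous record when deltaMerge returns empty is also why reduceRecords re-instantiates the result with the chosen key and operation: the previous record's key and operation are retained when the merger returns empty, the next record's otherwise.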

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
index 5f6d6e520861..658eb1c436d6 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
@@ -20,14 +20,17 @@ package org.apache.hudi.index;
 
 import org.apache.hudi.avro.AvroSchemaCache;
 import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.config.SerializableSchema;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.data.HoodiePairData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.engine.ReaderContextFactory;
+import org.apache.hudi.common.engine.RecordContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieIndexDefinition;
@@ -43,7 +46,11 @@ import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.read.BufferedRecord;
+import org.apache.hudi.common.table.read.BufferedRecordMerger;
+import org.apache.hudi.common.table.read.BufferedRecordMergerFactory;
 import org.apache.hudi.common.table.read.HoodieFileGroupReader;
+import org.apache.hudi.common.table.read.MergeResult;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.HoodieTimer;
@@ -69,7 +76,6 @@ import org.apache.hudi.table.HoodieTable;
 
 import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -87,6 +93,7 @@ import static java.util.stream.Collectors.toList;
 import static 
org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldSchemaFromWriteSchema;
 import static 
org.apache.hudi.common.config.HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP;
 import static 
org.apache.hudi.common.util.ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER;
+import static 
org.apache.hudi.common.util.HoodieRecordUtils.getOrderingFieldNames;
 import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
 import static 
org.apache.hudi.index.expression.HoodieExpressionIndex.EXPRESSION_OPTION;
 import static 
org.apache.hudi.index.expression.HoodieExpressionIndex.IDENTITY_TRANSFORM;
@@ -331,7 +338,7 @@ public class HoodieIndexUtils {
    * @return {@link HoodieRecord}s that have the current location being set.
    */
   private static <R> HoodieData<HoodieRecord<R>> getExistingRecords(
-      HoodieData<Pair<String, String>> partitionLocations, HoodieWriteConfig 
config, HoodieTable hoodieTable) {
+      HoodieData<Pair<String, String>> partitionLocations, HoodieWriteConfig 
config, HoodieTable hoodieTable, ReaderContextFactory<R> readerContextFactory, 
Schema dataSchema) {
     HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
     final Option<String> instantTime = metaClient
         .getActiveTimeline() // we need to include all actions and completed
@@ -341,7 +348,6 @@ public class HoodieIndexUtils {
     if (instantTime.isEmpty()) {
       return hoodieTable.getContext().emptyHoodieData();
     }
-    ReaderContextFactory<R> readerContextFactory = 
hoodieTable.getContext().getReaderContextFactory(metaClient);
     return partitionLocations.flatMap(p -> {
       Option<FileSlice> fileSliceOption = Option.fromJavaOptional(hoodieTable
           .getHoodieView()
@@ -351,7 +357,6 @@ public class HoodieIndexUtils {
       if (fileSliceOption.isEmpty()) {
         return Collections.emptyIterator();
       }
-      Schema dataSchema = 
AvroSchemaCache.intern(HoodieAvroUtils.addMetadataFields(new 
Schema.Parser().parse(config.getWriteSchema()), 
config.allowOperationMetadataField()));
       Option<InternalSchema> internalSchemaOption = 
SerDeHelper.fromJson(config.getInternalSchema());
       FileSlice fileSlice = fileSliceOption.get();
       HoodieReaderContext<R> readerContext = readerContextFactory.getContext();
@@ -386,20 +391,20 @@ public class HoodieIndexUtils {
    * We also need the keygenerator so we can figure out the partition path 
after expression payload
    * evaluates the merge.
    */
-  private static Pair<HoodieWriteConfig, Option<BaseKeyGenerator>> 
getKeygenAndUpdatedWriteConfig(HoodieWriteConfig config, HoodieTableConfig 
tableConfig) {
-    if 
(config.getPayloadClass().equals("org.apache.spark.sql.hudi.command.payload.ExpressionPayload"))
 {
+  private static Pair<HoodieWriteConfig, BaseKeyGenerator> 
getKeygenAndUpdatedWriteConfig(HoodieWriteConfig config, HoodieTableConfig 
tableConfig, boolean isExpressionPayload) {
+    HoodieWriteConfig writeConfig = config;
+    if (isExpressionPayload) {
       TypedProperties typedProperties = 
TypedProperties.copy(config.getProps());
       // set the payload class to table's payload class and not expression payload. This will be used to read the existing records
       
typedProperties.setProperty(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key(), 
tableConfig.getPayloadClass());
       typedProperties.setProperty(HoodieTableConfig.PAYLOAD_CLASS_NAME.key(), 
tableConfig.getPayloadClass());
-      HoodieWriteConfig writeConfig = 
HoodieWriteConfig.newBuilder().withProperties(typedProperties).build();
-      try {
-        return Pair.of(writeConfig, Option.of((BaseKeyGenerator) 
HoodieAvroKeyGeneratorFactory.createKeyGenerator(writeConfig.getProps())));
-      } catch (IOException e) {
-        throw new RuntimeException("KeyGenerator must inherit from 
BaseKeyGenerator to update a records partition path using spark sql merge 
into", e);
-      }
+      writeConfig = 
HoodieWriteConfig.newBuilder().withProperties(typedProperties).build();
+    }
+    try {
+      return Pair.of(writeConfig, (BaseKeyGenerator) 
HoodieAvroKeyGeneratorFactory.createKeyGenerator(writeConfig.getProps()));
+    } catch (IOException e) {
+      throw new RuntimeException("KeyGenerator must inherit from BaseKeyGenerator to update a record's partition path using spark sql merge into", e);
     }
-    return Pair.of(config, Option.empty());
   }
 
   /**
@@ -411,30 +416,53 @@ public class HoodieIndexUtils {
       HoodieRecord<R> incoming,
       HoodieRecord<R> existing,
       Schema writeSchema,
-      Schema existingSchema,
       Schema writeSchemaWithMetaFields,
       HoodieWriteConfig config,
-      HoodieRecordMerger recordMerger,
-      BaseKeyGenerator keyGenerator) throws IOException {
-    Option<Pair<HoodieRecord, Schema>> mergeResult = 
recordMerger.merge(existing, existingSchema,
-        incoming, writeSchemaWithMetaFields, config.getProps());
-    if (!mergeResult.isPresent()) {
+      BufferedRecordMerger<R> recordMerger,
+      BaseKeyGenerator keyGenerator,
+      RecordContext<R> incomingRecordContext,
+      RecordContext<R> existingRecordContext,
+      String[] orderingFieldNames) throws IOException {
+    BufferedRecord<R> incomingBufferedRecord = 
BufferedRecord.forRecordWithContext(incoming, writeSchemaWithMetaFields, 
incomingRecordContext, config.getProps(), orderingFieldNames);
+    BufferedRecord<R> existingBufferedRecord = 
BufferedRecord.forRecordWithContext(existing, writeSchemaWithMetaFields, 
existingRecordContext, config.getProps(), orderingFieldNames);
+    MergeResult<R> mergeResult = 
recordMerger.finalMerge(existingBufferedRecord, incomingBufferedRecord);
+    if (mergeResult.isDelete()) {
       //the record was deleted
       return Option.empty();
     }
-    HoodieRecord<R> result = mergeResult.get().getLeft();
-    if (result.getData().equals(HoodieRecord.SENTINEL)) {
+    if (mergeResult.getMergedRecord() == null) {
+      // SENTINEL case: the record did not match any merge case and should not be modified
+      return Option.of((HoodieRecord<R>) new 
HoodieAvroIndexedRecord(HoodieRecord.SENTINEL));
+    }
+
+    BufferedRecord<R> resultingBufferedRecord = 
BufferedRecord.forRecordWithContext(mergeResult.getMergedRecord(), 
writeSchemaWithMetaFields,
+        existingRecordContext, orderingFieldNames, existing.getRecordKey(), 
false);
+
+    if (resultingBufferedRecord.getRecord().equals(HoodieRecord.SENTINEL)) {
       //the record did not match and merge case and should not be modified
-      return Option.of(result);
+      return 
Option.of(existingRecordContext.constructHoodieRecord(resultingBufferedRecord, 
incoming.getPartitionPath()));
     }
 
     //record is inserted or updated
-    String partitionPath = keyGenerator.getPartitionPath((GenericRecord) 
result.getData());
+    String partitionPath = inferPartitionPath(incoming, existing, 
writeSchemaWithMetaFields, keyGenerator, existingRecordContext, 
resultingBufferedRecord);
+    HoodieRecord<R> result = 
existingRecordContext.constructHoodieRecord(resultingBufferedRecord, 
partitionPath);
     HoodieRecord<R> withMeta = result.prependMetaFields(writeSchema, 
writeSchemaWithMetaFields,
-            new 
MetadataValues().setRecordKey(incoming.getRecordKey()).setPartitionPath(partitionPath),
 config.getProps());
+        new 
MetadataValues().setRecordKey(incoming.getRecordKey()).setPartitionPath(partitionPath),
 config.getProps());
     return 
Option.of(withMeta.wrapIntoHoodieRecordPayloadWithParams(writeSchemaWithMetaFields,
 config.getProps(), Option.empty(),
         config.allowOperationMetadataField(), Option.empty(), false, 
Option.of(writeSchema)));
+  }
 
+  private static <R> String inferPartitionPath(HoodieRecord<R> incoming, 
HoodieRecord<R> existing, Schema recordSchema, BaseKeyGenerator keyGenerator,
+                                               RecordContext<R> recordContext, 
BufferedRecord<R> resultingBufferedRecord) {
+    R record = resultingBufferedRecord.getRecord();
+    if (record == incoming.getData()) {
+      return incoming.getPartitionPath();
+    } else if (record == existing.getData()) {
+      return existing.getPartitionPath();
+    } else {
+      // the merged record is not the same as either incoming or existing, so 
we need to compute the partition path
+      return 
keyGenerator.getPartitionPath(recordContext.convertToAvroRecord(record, 
recordSchema));
+    }
   }
 
   /**
@@ -444,32 +472,37 @@ public class HoodieIndexUtils {
       HoodieRecord<R> incoming,
       HoodieRecord<R> existing,
       Schema writeSchema,
+      Schema writeSchemaWithMetaFields,
       HoodieWriteConfig config,
-      HoodieRecordMerger recordMerger,
-      Option<BaseKeyGenerator> expressionPayloadKeygen) throws IOException {
-    Schema existingSchema = HoodieAvroUtils.addMetadataFields(new 
Schema.Parser().parse(config.getSchema()), 
config.allowOperationMetadataField());
-    Schema writeSchemaWithMetaFields = 
HoodieAvroUtils.addMetadataFields(writeSchema, 
config.allowOperationMetadataField());
-    if (expressionPayloadKeygen.isPresent()) {
+      BufferedRecordMerger<R> recordMerger,
+      BaseKeyGenerator keyGenerator,
+      RecordContext<R> incomingRecordContext,
+      RecordContext<R> existingRecordContext,
+      String[] orderingFieldNames,
+      boolean isExpressionPayload) throws IOException {
+    if (isExpressionPayload) {
       return mergeIncomingWithExistingRecordWithExpressionPayload(incoming, 
existing, writeSchema,
-          existingSchema, writeSchemaWithMetaFields, config, recordMerger, 
expressionPayloadKeygen.get());
+          writeSchemaWithMetaFields, config, recordMerger, keyGenerator, 
incomingRecordContext, existingRecordContext, orderingFieldNames);
     } else {
       // prepend the hoodie meta fields as the incoming record does not have 
them
       HoodieRecord incomingPrepended = incoming
           .prependMetaFields(writeSchema, writeSchemaWithMetaFields, new 
MetadataValues().setRecordKey(incoming.getRecordKey()).setPartitionPath(incoming.getPartitionPath()),
 config.getProps());
-      // after prepend the meta fields, convert the record back to the 
original payload
-      HoodieRecord incomingWithMetaFields = incomingPrepended
-          .wrapIntoHoodieRecordPayloadWithParams(writeSchemaWithMetaFields, 
config.getProps(), Option.empty(), config.allowOperationMetadataField(), 
Option.empty(), false, Option.empty());
-      Option<Pair<HoodieRecord, Schema>> mergeResult = recordMerger
-          .merge(existing, existingSchema, incomingWithMetaFields, 
writeSchemaWithMetaFields, config.getProps());
-      if (mergeResult.isPresent()) {
-        // the merged record needs to be converted back to the original payload
-        HoodieRecord<R> merged = 
mergeResult.get().getLeft().wrapIntoHoodieRecordPayloadWithParams(
-            writeSchemaWithMetaFields, config.getProps(), Option.empty(),
-            config.allowOperationMetadataField(), Option.empty(), false, 
Option.of(writeSchema));
-        return Option.of(merged);
-      } else {
+      BufferedRecord<R> incomingBufferedRecord = 
BufferedRecord.forRecordWithContext(incomingPrepended, 
writeSchemaWithMetaFields, incomingRecordContext, config.getProps(), 
orderingFieldNames);
+      BufferedRecord<R> existingBufferedRecord = 
BufferedRecord.forRecordWithContext(existing, writeSchemaWithMetaFields, 
existingRecordContext, config.getProps(), orderingFieldNames);
+      MergeResult<R> mergeResult = 
recordMerger.finalMerge(existingBufferedRecord, incomingBufferedRecord);
+
+      if (mergeResult.isDelete()) {
+        // the record was deleted
         return Option.empty();
       }
+      R mergedRecord = mergeResult.getMergedRecord();
+      BufferedRecord<R> resultingBufferedRecord = 
BufferedRecord.forRecordWithContext(mergedRecord, writeSchemaWithMetaFields, 
existingRecordContext,
+          orderingFieldNames, existing.getRecordKey(), mergedRecord == null);
+      String partitionPath = inferPartitionPath(incoming, existing, 
writeSchemaWithMetaFields, keyGenerator, existingRecordContext, 
resultingBufferedRecord);
+      HoodieRecord<R> result = 
existingRecordContext.constructHoodieRecord(resultingBufferedRecord, 
partitionPath);
+      // the merged record needs to be converted back to the original payload
+      return 
Option.of(result.wrapIntoHoodieRecordPayloadWithParams(writeSchemaWithMetaFields,
 config.getProps(), Option.empty(),
+          config.allowOperationMetadataField(), Option.empty(), false, 
Option.of(writeSchema)));
     }
   }
 
@@ -477,10 +510,14 @@ public class HoodieIndexUtils {
    * Merge tagged incoming records with existing records in case of partition 
path updated.
    */
   public static <R> HoodieData<HoodieRecord<R>> 
mergeForPartitionUpdatesIfNeeded(
-      HoodieData<Pair<HoodieRecord<R>, Option<HoodieRecordGlobalLocation>>> 
incomingRecordsAndLocations, HoodieWriteConfig config, HoodieTable hoodieTable) 
{
-    Pair<HoodieWriteConfig, Option<BaseKeyGenerator>> 
keyGeneratorWriteConfigOpt = getKeygenAndUpdatedWriteConfig(config, 
hoodieTable.getMetaClient().getTableConfig());
+      HoodieData<Pair<HoodieRecord<R>, Option<HoodieRecordGlobalLocation>>> 
incomingRecordsAndLocations,
+      HoodieWriteConfig config,
+      HoodieTable hoodieTable) {
+    boolean isExpressionPayload = 
config.getPayloadClass().equals("org.apache.spark.sql.hudi.command.payload.ExpressionPayload");
+    Pair<HoodieWriteConfig, BaseKeyGenerator> keyGeneratorWriteConfigOpt =
+        getKeygenAndUpdatedWriteConfig(config, 
hoodieTable.getMetaClient().getTableConfig(), isExpressionPayload);
     HoodieWriteConfig updatedConfig = keyGeneratorWriteConfigOpt.getLeft();
-    Option<BaseKeyGenerator> expressionPayloadKeygen = 
keyGeneratorWriteConfigOpt.getRight();
+    BaseKeyGenerator keyGenerator = keyGeneratorWriteConfigOpt.getRight();
     // completely new records
     HoodieData<HoodieRecord<R>> taggedNewRecords = 
incomingRecordsAndLocations.filter(p -> 
!p.getRight().isPresent()).map(Pair::getLeft);
     // the records found in existing base files
@@ -493,10 +530,38 @@ public class HoodieIndexUtils {
         .filter(p -> p.getRight().isPresent())
         .map(p -> Pair.of(p.getRight().get().getPartitionPath(), 
p.getRight().get().getFileId()))
         .distinct(updatedConfig.getGlobalIndexReconcileParallelism());
+    // define the buffered record merger.
+    ReaderContextFactory<R> readerContextFactory = (ReaderContextFactory<R>) 
hoodieTable.getContext()
+        .getReaderContextFactoryForWrite(hoodieTable.getMetaClient(), 
config.getRecordMerger().getRecordType(), config.getProps());
+    HoodieReaderContext<R> readerContext = readerContextFactory.getContext();
+    RecordContext<R> incomingRecordContext = readerContext.getRecordContext();
+    readerContext.initRecordMergerForIngestion(config.getProps());
+    // Create a reader context for the existing records. In the case of 
merge-into commands, the incoming records
+    // can be using an expression payload so here we rely on the table's 
configured payload class if it is required.
+    ReaderContextFactory<R> readerContextFactoryForExistingRecords = 
(ReaderContextFactory<R>) hoodieTable.getContext()
+        .getReaderContextFactoryForWrite(hoodieTable.getMetaClient(), 
config.getRecordMerger().getRecordType(), 
hoodieTable.getMetaClient().getTableConfig().getProps());
+    RecordContext<R> existingRecordContext = 
readerContextFactoryForExistingRecords.getContext().getRecordContext();
     // merged existing records with current locations being set
-    HoodieData<HoodieRecord<R>> existingRecords = 
getExistingRecords(globalLocations, keyGeneratorWriteConfigOpt.getLeft(), 
hoodieTable);
-
-    final HoodieRecordMerger recordMerger = updatedConfig.getRecordMerger();
+    SerializableSchema writerSchema = new 
SerializableSchema(hoodieTable.getConfig().getWriteSchema());
+    SerializableSchema writerSchemaWithMetaFields = new 
SerializableSchema(HoodieAvroUtils.addMetadataFields(writerSchema.get(), 
updatedConfig.allowOperationMetadataField()));
+    AvroSchemaCache.intern(writerSchema.get());
+    AvroSchemaCache.intern(writerSchemaWithMetaFields.get());
+    // Read the existing records with the meta fields and current writer 
schema as the output schema
+    HoodieData<HoodieRecord<R>> existingRecords =
+        getExistingRecords(globalLocations, 
keyGeneratorWriteConfigOpt.getLeft(), hoodieTable, 
readerContextFactoryForExistingRecords, writerSchemaWithMetaFields.get());
+    List<String> orderingFieldNames = getOrderingFieldNames(
+        readerContext.getMergeMode(), hoodieTable.getConfig().getProps(), 
hoodieTable.getMetaClient());
+    BufferedRecordMerger<R> recordMerger = BufferedRecordMergerFactory.create(
+        readerContext,
+        readerContext.getMergeMode(),
+        false,
+        readerContext.getRecordMerger(),
+        orderingFieldNames,
+        writerSchema.get(),
+        
Option.ofNullable(Pair.of(hoodieTable.getMetaClient().getTableConfig().getPayloadClass(),
 hoodieTable.getConfig().getPayloadClass())),
+        hoodieTable.getConfig().getProps(),
+        hoodieTable.getMetaClient().getTableConfig().getPartialUpdateMode());
+    String[] orderingFieldsArray = orderingFieldNames.toArray(new String[0]);
     HoodieData<HoodieRecord<R>> taggedUpdatingRecords = 
untaggedUpdatingRecords.mapToPair(r -> Pair.of(r.getRecordKey(), r))
         .leftOuterJoin(existingRecords.mapToPair(r -> 
Pair.of(r.getRecordKey(), r)))
         .values().flatMap(entry -> {
@@ -507,13 +572,14 @@ public class HoodieIndexUtils {
             return Collections.singletonList(incoming).iterator();
           }
           HoodieRecord<R> existing = existingOpt.get();
-          Schema writeSchema = new 
Schema.Parser().parse(updatedConfig.getWriteSchema());
+          Schema writeSchema = writerSchema.get();
           if (incoming.isDelete(writeSchema, updatedConfig.getProps())) {
             // incoming is a delete: force tag the incoming to the old 
partition
             return 
Collections.singletonList(tagRecord(incoming.newInstance(existing.getKey()), 
existing.getCurrentLocation())).iterator();
           }
-
-          Option<HoodieRecord<R>> mergedOpt = 
mergeIncomingWithExistingRecord(incoming, existing, writeSchema, updatedConfig, 
recordMerger, expressionPayloadKeygen);
+          Option<HoodieRecord<R>> mergedOpt = mergeIncomingWithExistingRecord(
+              incoming, existing, writeSchema, 
writerSchemaWithMetaFields.get(), updatedConfig,
+              recordMerger, keyGenerator, incomingRecordContext, 
existingRecordContext, orderingFieldsArray, isExpressionPayload);
           if (!mergedOpt.isPresent()) {
             // merge resulted in delete: force tag the incoming to the old 
partition
             return 
Collections.singletonList(tagRecord(incoming.newInstance(existing.getKey()), 
existing.getCurrentLocation())).iterator();
@@ -652,7 +718,7 @@ public class HoodieIndexUtils {
     Schema tableSchema = new 
TableSchemaResolver(metaClient).getTableAvroSchema();
     List<String> sourceFields = new ArrayList<>(columns.keySet());
     String columnName = sourceFields.get(0); // We know there's only one 
column from the check above
-    
+
     // First check if the field exists
     try {
       getNestedFieldSchemaFromWriteSchema(tableSchema, columnName);
@@ -663,7 +729,7 @@ public class HoodieIndexUtils {
           indexType.equals(PARTITION_NAME_SECONDARY_INDEX) ? "secondary" : 
"expression",
           userIndexName, columnName));
     }
-    
+
     // Check for complex types (RECORD, ARRAY, MAP) - not supported for any 
index type
     if (!validateDataTypeForSecondaryOrExpressionIndex(sourceFields, 
tableSchema)) {
       Schema fieldSchema = getNestedFieldSchemaFromWriteSchema(tableSchema, 
columnName);
@@ -674,7 +740,7 @@ public class HoodieIndexUtils {
           indexType.equals(PARTITION_NAME_SECONDARY_INDEX) ? "secondary" : 
"expression",
           userIndexName, columnName, fieldSchema.getType()));
     }
-    
+
     // For secondary index, apply stricter data type validation
     if (indexType.equals(PARTITION_NAME_SECONDARY_INDEX)) {
       if (!validateDataTypeForSecondaryIndex(sourceFields, tableSchema)) {
@@ -683,7 +749,7 @@ public class HoodieIndexUtils {
         if (fieldSchema.getLogicalType() != null) {
           actualType += " with logical type " + fieldSchema.getLogicalType();
         }
-        
+
         throw new HoodieMetadataIndexException(String.format(
             "Cannot create secondary index '%s': Column '%s' has unsupported 
data type '%s'. "
             + "Secondary indexes only support: STRING, CHAR, INT, BIGINT/LONG, 
SMALLINT, TINYINT, "
@@ -691,13 +757,13 @@ public class HoodieIndexUtils {
             + "and DATE types. Please choose a column with one of these 
supported types.",
             userIndexName, columnName, actualType));
       }
-      
+
       // Check if record index is enabled for secondary index
       boolean hasRecordIndex = 
metaClient.getTableConfig().getMetadataPartitions().stream()
           .anyMatch(partition -> 
partition.equals(MetadataPartitionType.RECORD_INDEX.getPartitionPath()));
       boolean recordIndexEnabled = Boolean.parseBoolean(
           options.getOrDefault(RECORD_INDEX_ENABLE_PROP.key(), 
RECORD_INDEX_ENABLE_PROP.defaultValue().toString()));
-      
+
       if (!hasRecordIndex && !recordIndexEnabled) {
         throw new HoodieMetadataIndexException(String.format(
             "Cannot create secondary index '%s': Record index is required for 
secondary indexes but is not enabled. "
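
In the global-index merge path above, the partition path of the merged record is now resolved by inferPartitionPath: when the merged payload is the very object carried by the incoming or the existing record, the corresponding partition path is reused, and only otherwise is the key generator evaluated against the merged record converted to Avro. A minimal sketch of that decision, assuming hypothetical stand-in types (TaggedRecord and the keyGenerator function are illustrative, not Hudi classes):

    import java.util.function.Function;

    // Hypothetical stand-in pairing a payload with the partition path it was tagged with.
    final class TaggedRecord<R> {
      final R data;
      final String partitionPath;

      TaggedRecord(R data, String partitionPath) {
        this.data = data;
        this.partitionPath = partitionPath;
      }
    }

    public class PartitionPathSketch {
      // Reuse a known partition path when the merge returned one of the two inputs
      // unchanged (reference equality); otherwise derive it from the merged record.
      static <R> String inferPartitionPath(TaggedRecord<R> incoming,
                                           TaggedRecord<R> existing,
                                           R merged,
                                           Function<R, String> keyGenerator) {
        if (merged == incoming.data) {
          return incoming.partitionPath;
        } else if (merged == existing.data) {
          return existing.partitionPath;
        }
        return keyGenerator.apply(merged);
      }

      public static void main(String[] args) {
        TaggedRecord<String> incoming = new TaggedRecord<>("row-v2", "2025/08/05");
        TaggedRecord<String> existing = new TaggedRecord<>("row-v1", "2025/08/04");
        // A partially updated result is neither input object, so the path is recomputed.
        String merged = "row-v2-partial";
        System.out.println(inferPartitionPath(incoming, existing, merged, r -> "2025/08/05"));
      }
    }

Short-circuiting on the two known partition paths avoids an Avro conversion plus key-generator evaluation in the common case where the merge simply picks one side.
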
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
index 5d6bb6048e5b..ebb119360367 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
@@ -18,19 +18,35 @@
 
 package org.apache.hudi.table.action.commit;
 
+import org.apache.hudi.avro.AvroSchemaCache;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.engine.RecordContext;
 import org.apache.hudi.common.function.SerializableFunctionUnchecked;
-import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.read.BufferedRecord;
+import org.apache.hudi.common.table.read.BufferedRecordMerger;
+import org.apache.hudi.common.table.read.BufferedRecordMergerFactory;
 import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 
+import org.apache.avro.Schema;
+
+import java.io.IOException;
+import java.util.List;
+
 public abstract class BaseWriteHelper<T, I, K, O, R> extends 
ParallelismHelper<I> {
 
   protected BaseWriteHelper(SerializableFunctionUnchecked<I, Integer> 
partitionNumberExtractor) {
@@ -85,9 +101,68 @@ public abstract class BaseWriteHelper<T, I, K, O, R> 
extends ParallelismHelper<I
    * @return Collection of HoodieRecord already be deduplicated
    */
   public I deduplicateRecords(I records, HoodieTable<T, I, K, O> table, int 
parallelism) {
-    HoodieRecordMerger recordMerger = 
HoodieRecordUtils.mergerToPreCombineMode(table.getConfig().getRecordMerger());
-    return deduplicateRecords(records, table.getIndex(), parallelism, 
table.getConfig().getSchema(), table.getConfig().getProps(), recordMerger);
+    HoodieReaderContext<T> readerContext =
+        (HoodieReaderContext<T>) 
table.getContext().<T>getReaderContextFactoryForWrite(table.getMetaClient(), 
table.getConfig().getRecordMerger().getRecordType(), 
table.getConfig().getProps())
+            .getContext();
+    HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
+    readerContext.initRecordMergerForIngestion(table.getConfig().getProps());
+    List<String> orderingFieldNames = 
HoodieRecordUtils.getOrderingFieldNames(readerContext.getMergeMode(), 
table.getConfig().getProps(), table.getMetaClient());
+    Schema recordSchema;
+    if (StringUtils.nonEmpty(table.getConfig().getPartialUpdateSchema())) {
+      recordSchema = new 
Schema.Parser().parse(table.getConfig().getPartialUpdateSchema());
+    } else {
+      recordSchema = new 
Schema.Parser().parse(table.getConfig().getWriteSchema());
+    }
+    recordSchema = AvroSchemaCache.intern(recordSchema);
+    BufferedRecordMerger<T> bufferedRecordMerger = 
BufferedRecordMergerFactory.create(
+        readerContext,
+        readerContext.getMergeMode(),
+        false,
+        
readerContext.getRecordMerger().map(HoodieRecordUtils::mergerToPreCombineMode),
+        orderingFieldNames,
+        Option.ofNullable(table.getConfig().getPayloadClass()),
+        recordSchema,
+        table.getConfig().getProps(),
+        tableConfig.getPartialUpdateMode());
+    return deduplicateRecords(
+        records,
+        table.getIndex(),
+        parallelism,
+        table.getConfig().getSchema(),
+        table.getConfig().getProps(),
+        bufferedRecordMerger,
+        readerContext,
+        orderingFieldNames.toArray(new String[0]));
   }
 
-  public abstract I deduplicateRecords(I records, HoodieIndex<?, ?> index, int 
parallelism, String schema, TypedProperties props, HoodieRecordMerger merger);
+  public abstract I deduplicateRecords(I records,
+                                       HoodieIndex<?, ?> index,
+                                       int parallelism,
+                                       String schema,
+                                       TypedProperties props,
+                                       BufferedRecordMerger<T> merger,
+                                       HoodieReaderContext<T> readerContext,
+                                       String[] orderingFieldNames);
+
+  protected static <T> HoodieRecord<T> reduceRecords(TypedProperties props, 
BufferedRecordMerger<T> recordMerger, String[] orderingFieldNames,
+                                                     HoodieRecord<T> previous, 
HoodieRecord<T> next, Schema schema, RecordContext<T> recordContext) {
+    try {
+      // NOTE: The order of previous and next is uncertain within a batch in 
"reduceByKey".
+      // If the return value is empty, it means the previous should be chosen.
+      BufferedRecord<T> newBufferedRecord = 
BufferedRecord.forRecordWithContext(next, schema, recordContext, props, 
orderingFieldNames);
+      // Construct old buffered record.
+      BufferedRecord<T> oldBufferedRecord = 
BufferedRecord.forRecordWithContext(previous, schema, recordContext, props, 
orderingFieldNames);
+      // Run merge.
+      Option<BufferedRecord<T>> merged = 
recordMerger.deltaMerge(newBufferedRecord, oldBufferedRecord);
+      // NOTE: For merge mode based merging, it returns non-null.
+      //       For mergers / payloads based merging, it may return null.
+      HoodieRecord<T> reducedRecord = merged.map(bufferedRecord -> 
recordContext.constructHoodieRecord(bufferedRecord, 
next.getPartitionPath())).orElse(previous);
+      boolean choosePrevious = merged.isEmpty();
+      HoodieKey reducedKey = choosePrevious ? previous.getKey() : 
next.getKey();
+      HoodieOperation operation = choosePrevious ? previous.getOperation() : 
next.getOperation();
+      return reducedRecord.newInstance(reducedKey, operation);
+    } catch (IOException e) {
+      throw new HoodieException(String.format("Error to merge two records, %s, 
%s", previous, next), e);
+    }
+  }
 }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
index 37bb5b64e3bf..adfedf72db06 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
@@ -23,17 +23,15 @@ import org.apache.hudi.common.config.SerializableSchema;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.engine.RecordContext;
 import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.table.read.BufferedRecordMerger;
 import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.table.HoodieTable;
 
-import java.io.IOException;
-
 public class HoodieWriteHelper<T, R> extends BaseWriteHelper<T, 
HoodieData<HoodieRecord<T>>,
     HoodieData<HoodieKey>, HoodieData<WriteStatus>, R> {
 
@@ -56,10 +54,17 @@ public class HoodieWriteHelper<T, R> extends 
BaseWriteHelper<T, HoodieData<Hoodi
   }
 
   @Override
-  public HoodieData<HoodieRecord<T>> deduplicateRecords(
-      HoodieData<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int 
parallelism, String schemaStr, TypedProperties props, HoodieRecordMerger 
merger) {
+  public HoodieData<HoodieRecord<T>> 
deduplicateRecords(HoodieData<HoodieRecord<T>> records,
+                                                        HoodieIndex<?, ?> 
index,
+                                                        int parallelism,
+                                                        String schemaStr,
+                                                        TypedProperties props,
+                                                        
BufferedRecordMerger<T> recordMerger,
+                                                        HoodieReaderContext<T> 
readerContext,
+                                                        String[] 
orderingFieldNames) {
     boolean isIndexingGlobal = index.isGlobal();
     final SerializableSchema schema = new SerializableSchema(schemaStr);
+    RecordContext<T> recordContext = readerContext.getRecordContext();
     return records.mapToPair(record -> {
       HoodieKey hoodieKey = record.getKey();
       // If index used is global, then records are expected to differ in their 
partitionPath
@@ -68,17 +73,8 @@ public class HoodieWriteHelper<T, R> extends 
BaseWriteHelper<T, HoodieData<Hoodi
       //       Here we have to make a copy of the incoming record, since it 
might be holding
       //       an instance of [[InternalRow]] pointing into shared, mutable 
buffer
       return Pair.of(key, record.copy());
-    }).reduceByKey((rec1, rec2) -> {
-      HoodieRecord<T> reducedRecord;
-      try {
-        reducedRecord = merger.merge(rec1, schema.get(), rec2, schema.get(), 
props).get().getLeft();
-      } catch (IOException e) {
-        throw new HoodieException(String.format("Error to merge two records, 
%s, %s", rec1, rec2), e);
-      }
-      boolean choosePrev = rec1.getData().equals(reducedRecord.getData());
-      HoodieKey reducedKey = choosePrev ? rec1.getKey() : rec2.getKey();
-      HoodieOperation operation = choosePrev ? rec1.getOperation() : 
rec2.getOperation();
-      return reducedRecord.newInstance(reducedKey, operation);
-    }, parallelism).map(Pair::getRight);
+    }).reduceByKey(
+        (previous, next) -> reduceRecords(props, recordMerger, 
orderingFieldNames, previous, next, schema.get(), recordContext),
+        parallelism).map(Pair::getRight);
   }
 }
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/HoodieWriterClientTestHarness.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/HoodieWriterClientTestHarness.java
index c437b9fbf1c0..9268f811a325 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/HoodieWriterClientTestHarness.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/HoodieWriterClientTestHarness.java
@@ -27,10 +27,12 @@ import 
org.apache.hudi.client.transaction.lock.InProcessLockProvider;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.data.HoodieListData;
 import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieAvroRecord;
@@ -39,7 +41,6 @@ import 
org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieOperation;
-import org.apache.hudi.common.model.HoodiePreCombineAvroRecordMerger;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.HoodieWriteStat;
@@ -50,6 +51,8 @@ import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.table.marker.MarkerType;
+import org.apache.hudi.common.table.read.BufferedRecordMerger;
+import org.apache.hudi.common.table.read.BufferedRecordMergerFactory;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -93,6 +96,7 @@ import org.apache.hudi.table.upgrade.SupportsUpgradeDowngrade;
 import org.apache.hudi.table.upgrade.UpgradeDowngrade;
 import org.apache.hudi.testutils.MetadataMergeWriteStatus;
 
+import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.jetbrains.annotations.NotNull;
 
@@ -129,6 +133,7 @@ import static 
org.apache.hudi.common.testutils.HoodieTestUtils.RAW_TRIPS_TEST_NA
 import static 
org.apache.hudi.common.testutils.HoodieTestUtils.TIMELINE_FACTORY;
 import static 
org.apache.hudi.common.testutils.Transformations.randomSelectAsHoodieKeys;
 import static 
org.apache.hudi.common.testutils.Transformations.recordsToRecordKeySet;
+import static 
org.apache.hudi.common.util.HoodieRecordUtils.getOrderingFieldNames;
 import static 
org.apache.hudi.config.HoodieClusteringConfig.SCHEDULE_INLINE_CLUSTERING;
 import static 
org.apache.hudi.testutils.Assertions.assertNoDupesWithinPartition;
 import static 
org.apache.hudi.testutils.Assertions.assertNoDuplicatesInPartition;
@@ -544,8 +549,34 @@ public abstract class HoodieWriterClientTestHarness 
extends HoodieCommonTestHarn
     HoodieIndex index = mock(HoodieIndex.class);
     when(index.isGlobal()).thenReturn(isGlobal);
     int dedupParallelism = records.getNumPartitions() + additionalParallelism;
-    HoodieData<HoodieRecord<RawTripTestPayload>> dedupedRecsRdd = 
(HoodieData<HoodieRecord<RawTripTestPayload>>) HoodieWriteHelper.newInstance()
-            .deduplicateRecords(records, index, dedupParallelism, 
writeConfig.getSchema(), writeConfig.getProps(), 
HoodiePreCombineAvroRecordMerger.INSTANCE);
+    BaseHoodieWriteClient writeClient = getHoodieWriteClient(writeConfig);
+    HoodieReaderContext readerContext = writeClient.getEngineContext()
+        .getReaderContextFactoryForWrite(metaClient, 
HoodieRecord.HoodieRecordType.AVRO, writeConfig.getProps()).getContext();
+    List<String> orderingFieldNames = getOrderingFieldNames(
+        readerContext.getMergeMode(), writeClient.getConfig().getProps(), 
metaClient);
+    RecordMergeMode recordMergeMode = 
HoodieTableConfig.inferCorrectMergingBehavior(null, 
writeConfig.getPayloadClass(), null,
+        String.join(",", orderingFieldNames), 
metaClient.getTableConfig().getTableVersion()).getLeft();
+    BufferedRecordMerger<HoodieRecord> recordMerger = 
BufferedRecordMergerFactory.create(
+        readerContext,
+        recordMergeMode,
+        false,
+        Option.ofNullable(writeClient.getConfig().getRecordMerger()),
+        orderingFieldNames,
+        Option.ofNullable(writeClient.getConfig().getPayloadClass()),
+        new Schema.Parser().parse(writeClient.getConfig().getSchema()),
+        writeClient.getConfig().getProps(),
+        metaClient.getTableConfig().getPartialUpdateMode());
+    HoodieData<HoodieRecord<RawTripTestPayload>> dedupedRecsRdd =
+        (HoodieData<HoodieRecord<RawTripTestPayload>>) 
HoodieWriteHelper.newInstance()
+            .deduplicateRecords(
+                records,
+                index,
+                dedupParallelism,
+                writeConfig.getSchema(),
+                writeConfig.getProps(),
+                recordMerger,
+                readerContext,
+                orderingFieldNames.toArray(new String[0]));
     assertEquals(expectedNumPartitions, dedupedRecsRdd.getNumPartitions());
     List<HoodieRecord<RawTripTestPayload>> dedupedRecs = 
dedupedRecsRdd.collectAsList();
     assertEquals(isGlobal ? 1 : 2, dedupedRecs.size());
@@ -1187,24 +1218,22 @@ public abstract class HoodieWriterClientTestHarness 
extends HoodieCommonTestHarn
    */
   protected void testUpsertsInternal(Function3<Object, BaseHoodieWriteClient, 
Object, String> writeFn, boolean populateMetaFields, boolean isPrepped,
                                      SupportsUpgradeDowngrade 
upgradeDowngrade) throws Exception {
-
     metaClient.getStorage().deleteDirectory(new StoragePath(basePath));
-
-    metaClient = HoodieTableMetaClient.newTableBuilder()
-        .fromMetaClient(metaClient)
-        .setTableVersion(6)
-        .setPopulateMetaFields(populateMetaFields)
-        .initTable(metaClient.getStorageConf().newInstance(), 
metaClient.getBasePath());
-
     HoodieWriteConfig.Builder cfgBuilder = 
getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY).withRollbackUsingMarkers(true)
         
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withMetadataIndexColumnStats(true).withColumnStatsIndexForColumns("driver,rider")
             
.withMetadataIndexColumnStatsFileGroupCount(1).withEngineType(getEngineType()).build())
         .withWriteTableVersion(6);
 
     addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
-    metaClient = HoodieTestUtils.createMetaClient(storageConf, new 
StoragePath(basePath), HoodieTableVersion.SIX);
-
     HoodieWriteConfig config = cfgBuilder.build();
+    metaClient = HoodieTableMetaClient.newTableBuilder()
+        .fromProperties(config.getProps())
+        .setTableVersion(6)
+        .setTableType(metaClient.getTableType())
+        .setPopulateMetaFields(populateMetaFields)
+        .initTable(metaClient.getStorageConf().newInstance(), 
metaClient.getBasePath());
+
+    metaClient = HoodieTestUtils.createMetaClient(storageConf, new 
StoragePath(basePath), HoodieTableVersion.SIX);
     BaseHoodieWriteClient client = getHoodieWriteClient(config);
 
     // Write 1 (only inserts)
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
index 0b0f1d7586df..13a30b228fde 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
@@ -22,13 +22,12 @@ import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.data.HoodieListData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.read.BufferedRecordMerger;
 import org.apache.hudi.common.util.CollectionUtils;
-import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.table.HoodieTable;
@@ -36,7 +35,6 @@ import org.apache.hudi.table.action.HoodieWriteMetadata;
 
 import org.apache.avro.Schema;
 
-import java.io.IOException;
 import java.time.Duration;
 import java.util.Iterator;
 import java.util.List;
@@ -92,31 +90,22 @@ public class FlinkWriteHelper<T, R> extends 
BaseWriteHelper<T, Iterator<HoodieRe
   }
 
   @Override
-  public Iterator<HoodieRecord<T>> 
deduplicateRecords(Iterator<HoodieRecord<T>> records, HoodieIndex<?, ?> index, 
int parallelism, String schemaStr, TypedProperties props, HoodieRecordMerger 
merger) {
+  public Iterator<HoodieRecord<T>> 
deduplicateRecords(Iterator<HoodieRecord<T>> records,
+                                                      HoodieIndex<?, ?> index,
+                                                      int parallelism,
+                                                      String schemaStr,
+                                                      TypedProperties props,
+                                                      BufferedRecordMerger<T> 
recordMerger,
+                                                      HoodieReaderContext<T> 
readerContext,
+                                                      String[] 
orderingFieldNames) {
     // If index used is global, then records are expected to differ in their 
partitionPath
     Map<Object, List<HoodieRecord<T>>> keyedRecords = 
CollectionUtils.toStream(records)
         .collect(Collectors.groupingBy(record -> 
record.getKey().getRecordKey()));
 
     // caution that the avro schema is not serializable
     final Schema schema = new Schema.Parser().parse(schemaStr);
-    return keyedRecords.values().stream().map(x -> x.stream().reduce((rec1, 
rec2) -> {
-      HoodieRecord<T> reducedRecord;
-      try {
-        // Precombine do not need schema and do not return null
-        reducedRecord =  merger.merge(rec1, schema, rec2, schema, 
props).get().getLeft();
-      } catch (IOException e) {
-        throw new HoodieException(String.format("Error to merge two records, 
%s, %s", rec1, rec2), e);
-      }
-      // we cannot allow the user to change the key or partitionPath, since 
that will affect
-      // everything
-      // so pick it from one of the records.
-      boolean choosePrev = rec1.getData() == reducedRecord.getData();
-      HoodieKey reducedKey = choosePrev ? rec1.getKey() : rec2.getKey();
-      HoodieOperation operation = choosePrev ? rec1.getOperation() : 
rec2.getOperation();
-      HoodieRecord<T> hoodieRecord = reducedRecord.newInstance(reducedKey, 
operation);
-      // reuse the location from the first record.
-      hoodieRecord.setCurrentLocation(rec1.getCurrentLocation());
-      return hoodieRecord;
-    }).orElse(null)).filter(Objects::nonNull).iterator();
+    return keyedRecords.values().stream().map(x -> 
x.stream().reduce((previous, next) ->
+      reduceRecords(props, recordMerger, orderingFieldNames, previous, next, 
schema, readerContext.getRecordContext())
+    ).orElse(null)).filter(Objects::nonNull).iterator();
   }
 }
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
index 1ab944b94feb..efed521190c9 100644
--- 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
@@ -22,17 +22,16 @@ import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.data.HoodieListData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.table.read.BufferedRecordMerger;
 import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.table.HoodieTable;
 
 import org.apache.avro.Schema;
 
-import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -59,8 +58,14 @@ public class JavaWriteHelper<T,R> extends BaseWriteHelper<T, 
List<HoodieRecord<T
   }
 
   @Override
-  public List<HoodieRecord<T>> deduplicateRecords(
-      List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, 
String schemaStr, TypedProperties props, HoodieRecordMerger merger) {
+  public List<HoodieRecord<T>> deduplicateRecords(List<HoodieRecord<T>> 
records,
+                                                  HoodieIndex<?, ?> index,
+                                                  int parallelism,
+                                                  String schemaStr,
+                                                  TypedProperties props,
+                                                  BufferedRecordMerger<T> 
recordMerger,
+                                                  HoodieReaderContext<T> 
readerContext,
+                                                  String[] orderingFieldNames) 
{
     boolean isIndexingGlobal = index.isGlobal();
     Map<Object, List<Pair<Object, HoodieRecord<T>>>> keyedRecords = 
records.stream().map(record -> {
       HoodieKey hoodieKey = record.getKey();
@@ -70,17 +75,8 @@ public class JavaWriteHelper<T,R> extends BaseWriteHelper<T, 
List<HoodieRecord<T
     }).collect(Collectors.groupingBy(Pair::getLeft));
 
     final Schema schema = new Schema.Parser().parse(schemaStr);
-    return keyedRecords.values().stream().map(x -> 
x.stream().map(Pair::getRight).reduce((rec1, rec2) -> {
-      HoodieRecord<T> reducedRecord;
-      try {
-        reducedRecord =  merger.merge(rec1, schema, rec2, schema, 
props).get().getLeft();
-      } catch (IOException e) {
-        throw new HoodieException(String.format("Error to merge two records, 
%s, %s", rec1, rec2), e);
-      }
-      // we cannot allow the user to change the key or partitionPath, since 
that will affect
-      // everything
-      // so pick it from one of the records.
-      return reducedRecord.newInstance(rec1.getKey(), rec1.getOperation());
-    }).orElse(null)).filter(Objects::nonNull).collect(Collectors.toList());
+    return keyedRecords.values().stream().map(x -> 
x.stream().map(Pair::getRight).reduce((previous, next) ->
+        reduceRecords(props, recordMerger, orderingFieldNames, previous, next, 
schema, readerContext.getRecordContext())
+    ).orElse(null)).filter(Objects::nonNull).collect(Collectors.toList());
   }
 }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
index b2119ddf7ce2..3ff31990e7fa 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
@@ -18,12 +18,11 @@
 
 package org.apache.hudi.table.action.commit;
 
-import org.apache.hudi.client.utils.SparkPartitionUtils;
-import org.apache.hudi.index.HoodieSparkIndexClient;
 import org.apache.hudi.client.WriteStatus;
 import 
org.apache.hudi.client.clustering.update.strategy.SparkAllowUpdateStrategy;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.client.transaction.TransactionManager;
+import org.apache.hudi.client.utils.SparkPartitionUtils;
 import org.apache.hudi.client.utils.SparkValidatorUtils;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.data.HoodieData.HoodieDataCacheKey;
@@ -47,8 +46,9 @@ import org.apache.hudi.data.HoodieJavaRDD;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.execution.SparkLazyInsertIterable;
 import org.apache.hudi.index.HoodieIndex;
-import org.apache.hudi.io.HoodieMergeHandle;
+import org.apache.hudi.index.HoodieSparkIndexClient;
 import org.apache.hudi.io.CreateHandleFactory;
+import org.apache.hudi.io.HoodieMergeHandle;
 import org.apache.hudi.io.HoodieMergeHandleFactory;
 import org.apache.hudi.io.IOUtils;
 import org.apache.hudi.keygen.BaseKeyGenerator;
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRecordContext.java
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRecordContext.java
index a26df55c583b..7316c164dce4 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRecordContext.java
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRecordContext.java
@@ -70,7 +70,7 @@ public abstract class BaseSparkInternalRecordContext extends 
RecordContext<Inter
   }
 
   @Override
-  public HoodieRecord<InternalRow> 
constructHoodieRecord(BufferedRecord<InternalRow> bufferedRecord) {
+  public HoodieRecord<InternalRow> 
constructHoodieRecord(BufferedRecord<InternalRow> bufferedRecord, String 
partitionPath) {
     HoodieKey hoodieKey = new HoodieKey(bufferedRecord.getRecordKey(), 
partitionPath);
     if (bufferedRecord.isDelete()) {
       return new HoodieEmptyRecord<>(
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordContext.java 
b/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordContext.java
index 9bdfd1be0ee8..178a4c00e55c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordContext.java
@@ -21,27 +21,41 @@ package org.apache.hudi.avro;
 
 import org.apache.hudi.common.engine.RecordContext;
 import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieEmptyRecord;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.read.BufferedRecord;
 import org.apache.hudi.common.util.AvroJavaTypeConverter;
+import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.SpillableMapUtils;
+import org.apache.hudi.exception.HoodieException;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
 
+import java.io.IOException;
 import java.util.Map;
+import java.util.Properties;
 
+/**
+ * Record context for reading and transforming avro indexed records.
+ */
 public class AvroRecordContext extends RecordContext<IndexedRecord> {
 
   private final String payloadClass;
+  // This boolean indicates whether the caller requires payloads in the 
HoodieRecord conversion.
+  // This is temporarily required as we migrate away from payloads.
+  private final boolean requiresPayloadRecords;
 
-  public AvroRecordContext(HoodieTableConfig tableConfig) {
+  public AvroRecordContext(HoodieTableConfig tableConfig, String payloadClass, 
boolean requiresPayloadRecords) {
     super(tableConfig);
-    this.payloadClass = tableConfig.getPayloadClass();
+    this.payloadClass = payloadClass;
     this.typeConverter = new AvroJavaTypeConverter();
+    this.requiresPayloadRecords = requiresPayloadRecords;
   }
 
   public static Object getFieldValueFromIndexedRecord(
@@ -79,18 +93,39 @@ public class AvroRecordContext extends 
RecordContext<IndexedRecord> {
   }
 
   @Override
-  public HoodieRecord<IndexedRecord> 
constructHoodieRecord(BufferedRecord<IndexedRecord> bufferedRecord) {
+  public HoodieRecord constructHoodieRecord(BufferedRecord<IndexedRecord> 
bufferedRecord, String partitionPath) {
+    // Build the HoodieKey from the record key and the caller-supplied partition path.
+    HoodieKey hoodieKey = new HoodieKey(bufferedRecord.getRecordKey(), 
partitionPath);
+
     if (bufferedRecord.isDelete()) {
-      return SpillableMapUtils.generateEmptyPayload(
-          bufferedRecord.getRecordKey(),
-          partitionPath,
-          bufferedRecord.getOrderingValue(),
-          payloadClass);
+      if (payloadClass != null) {
+        return SpillableMapUtils.generateEmptyPayload(
+            bufferedRecord.getRecordKey(),
+            partitionPath,
+            bufferedRecord.getOrderingValue(),
+            payloadClass);
+      } else {
+        return new HoodieEmptyRecord<>(
+            hoodieKey,
+            HoodieRecord.HoodieRecordType.AVRO);
+      }
+    }
+    if (requiresPayloadRecords) {
+      HoodieRecordPayload payload = 
HoodieRecordUtils.loadPayload(payloadClass, (GenericRecord) 
bufferedRecord.getRecord(), bufferedRecord.getOrderingValue());
+      return new HoodieAvroRecord<>(hoodieKey, payload);
     }
-    HoodieKey hoodieKey = new HoodieKey(bufferedRecord.getRecordKey(), 
partitionPath);
     return new HoodieAvroIndexedRecord(hoodieKey, bufferedRecord.getRecord());
   }
 
+  @Override
+  public IndexedRecord extractDataFromRecord(HoodieRecord record, Schema 
schema, Properties properties) {
+    try {
+      return record.toIndexedRecord(schema, 
properties).map(HoodieAvroIndexedRecord::getData).orElse(null);
+    } catch (IOException e) {
+      throw new HoodieException("Failed to extract data from record: " + 
record, e);
+    }
+  }
+
   @Override
   public IndexedRecord mergeWithEngineRecord(Schema schema,
                                              Map<Integer, Object> updateValues,
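
For illustration, a rough sketch of how a writer-side caller might use the reworked AvroRecordContext; the table config, buffered record, and partition path are assumed to come from the surrounding setup, and the payload class below is only an example:

    // Hypothetical usage of the new constructor and conversion method.
    AvroRecordContext recordContext = new AvroRecordContext(
        tableConfig,
        "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload",
        /* requiresPayloadRecords */ true);
    // Deletes become an empty payload (or a HoodieEmptyRecord when no payload class is set);
    // non-deletes are wrapped as HoodieAvroRecord because requiresPayloadRecords is true.
    HoodieRecord record = recordContext.constructHoodieRecord(bufferedRecord, partitionPath);
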
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java 
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java
index d0a57d113c5c..f376b6c1a7c3 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java
@@ -96,7 +96,34 @@ public class HoodieAvroReaderContext extends 
HoodieReaderContext<IndexedRecord>
       Option<InstantRange> instantRangeOpt,
       Option<Predicate> filterOpt,
       Map<StoragePath, HoodieAvroFileReader> reusableFileReaders) {
-    super(storageConfiguration, tableConfig, instantRangeOpt, filterOpt, new 
AvroRecordContext(tableConfig));
+    this(storageConfiguration, tableConfig, instantRangeOpt, filterOpt, 
reusableFileReaders, tableConfig.getPayloadClass(), false);
+  }
+
+  /**
+   * Constructs an instance of the reader context for writer workflows
+   *
+   * @param storageConfiguration   the storage configuration to use for 
reading files
+   * @param tableConfig            the configuration of the Hudi table being 
read
+   * @param payloadClassName       the payload class for the writer
+   * @param requiresPayloadRecords indicates whether the caller expects 
payloads as the data in any HoodieRecord returned by this context
+   */
+  public HoodieAvroReaderContext(
+      StorageConfiguration<?> storageConfiguration,
+      HoodieTableConfig tableConfig,
+      String payloadClassName,
+      boolean requiresPayloadRecords) {
+    this(storageConfiguration, tableConfig, Option.empty(), Option.empty(), 
Collections.emptyMap(), payloadClassName, requiresPayloadRecords);
+  }
+
+  private HoodieAvroReaderContext(
+      StorageConfiguration<?> storageConfiguration,
+      HoodieTableConfig tableConfig,
+      Option<InstantRange> instantRangeOpt,
+      Option<Predicate> filterOpt,
+      Map<StoragePath, HoodieAvroFileReader> reusableFileReaders,
+      String payloadClassName,
+      boolean requiresPayloadRecords) {
+    super(storageConfiguration, tableConfig, instantRangeOpt, filterOpt, new 
AvroRecordContext(tableConfig, payloadClassName, requiresPayloadRecords));
     this.reusableFileReaders = reusableFileReaders;
   }
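
A minimal sketch of the new writer-workflow constructor described above; metaClient and payloadClassName are assumed to be resolved from the write config:

    HoodieAvroReaderContext readerContext = new HoodieAvroReaderContext(
        metaClient.getStorageConf(),
        metaClient.getTableConfig(),
        payloadClassName,
        /* requiresPayloadRecords */ true);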
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/engine/AvroReaderContextFactory.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/engine/AvroReaderContextFactory.java
index d410e7432910..72a6db27be85 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/engine/AvroReaderContextFactory.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/engine/AvroReaderContextFactory.java
@@ -20,7 +20,6 @@ package org.apache.hudi.common.engine;
 
 import org.apache.hudi.avro.HoodieAvroReaderContext;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.util.Option;
 
 import org.apache.avro.generic.IndexedRecord;
 
@@ -29,13 +28,21 @@ import org.apache.avro.generic.IndexedRecord;
  */
 public class AvroReaderContextFactory implements 
ReaderContextFactory<IndexedRecord> {
   private final HoodieTableMetaClient metaClient;
+  private final String payloadClassName;
+  private final boolean requiresPayloadRecords;
 
   public AvroReaderContextFactory(HoodieTableMetaClient metaClient) {
+    this(metaClient, metaClient.getTableConfig().getPayloadClass(), false);
+  }
+
+  public AvroReaderContextFactory(HoodieTableMetaClient metaClient, String 
payloadClassName, boolean requiresPayloadRecords) {
     this.metaClient = metaClient;
+    this.payloadClassName = payloadClassName;
+    this.requiresPayloadRecords = requiresPayloadRecords;
   }
 
   @Override
   public HoodieReaderContext<IndexedRecord> getContext() {
-    return new HoodieAvroReaderContext(metaClient.getStorageConf(), 
metaClient.getTableConfig(), Option.empty(), Option.empty());
+    return new HoodieAvroReaderContext(metaClient.getStorageConf(), 
metaClient.getTableConfig(), payloadClassName, requiresPayloadRecords);
   }
 }
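
A short sketch of the two factory flavors, assuming metaClient and payloadClassName are available from the caller:

    // Read path: table payload class, plain indexed records.
    AvroReaderContextFactory readFactory = new AvroReaderContextFactory(metaClient);
    // Write path: writer payload class, payload-typed HoodieRecords.
    AvroReaderContextFactory writeFactory =
        new AvroReaderContextFactory(metaClient, payloadClassName, /* requiresPayloadRecords */ true);
    HoodieReaderContext<IndexedRecord> ctx = writeFactory.getContext();
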
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
index 0255bbf67123..5199c2e93e68 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
@@ -30,6 +30,7 @@ import 
org.apache.hudi.common.function.SerializablePairFlatMapFunction;
 import org.apache.hudi.common.function.SerializablePairFunction;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.Functions;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableSortingIterator;
@@ -135,16 +136,26 @@ public abstract class HoodieEngineContext {
 
   public abstract <T> ReaderContextFactory<T> 
getReaderContextFactory(HoodieTableMetaClient metaClient);
 
-  public ReaderContextFactory<?> 
getReaderContextFactoryDuringWrite(HoodieTableMetaClient metaClient, 
HoodieRecord.HoodieRecordType recordType) {
+  /**
+   * Returns reader context factory for write operations in the table.
+   *
+   * @param metaClient Table meta client
+   * @param recordType Record type
+   * @param properties Typed properties
+   */
+  public ReaderContextFactory<?> 
getReaderContextFactoryForWrite(HoodieTableMetaClient metaClient, 
HoodieRecord.HoodieRecordType recordType,
+                                                                 
TypedProperties properties) {
     if (recordType == HoodieRecord.HoodieRecordType.AVRO) {
-      return new AvroReaderContextFactory(metaClient);
+      String payloadClass = ConfigUtils.getPayloadClass(properties);
+      return new AvroReaderContextFactory(metaClient, payloadClass, true);
     }
     return getDefaultContextFactory(metaClient);
   }
 
   /**
    * Returns default reader context factory for the engine.
-   * @param metaClient Table metadata client
+   *
+   * @param metaClient          Table metadata client
    */
   public abstract ReaderContextFactory<?> 
getDefaultContextFactory(HoodieTableMetaClient metaClient);
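
To make the new write-path hook concrete, a hedged sketch; engineContext, metaClient, and props are assumed from the caller:

    // For AVRO record types the payload class is read from the write properties and
    // payload-typed records are requested; other record types use the engine default factory.
    ReaderContextFactory<?> factory = engineContext.getReaderContextFactoryForWrite(
        metaClient, HoodieRecord.HoodieRecordType.AVRO, props);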
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
index 56e6b763e13a..05699907bbfe 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.serialization.CustomSerializer;
 import org.apache.hudi.common.serialization.DefaultSerializer;
 import org.apache.hudi.common.table.HoodieTableConfig;
@@ -240,12 +241,33 @@ public abstract class HoodieReaderContext<T> {
    * @param properties the properties for the reader.
    */
   public void initRecordMerger(TypedProperties properties) {
+    initRecordMerger(properties, false);
+  }
+
+  public void initRecordMergerForIngestion(TypedProperties properties) {
+    initRecordMerger(properties, true);
+  }
+
+  /**
+   * Initializes the record merger based on the table configuration and 
properties.
+   * @param properties the properties for the reader.
+   * @param isIngestion indicates if the context is used in ingestion path.
+   */
+  private void initRecordMerger(TypedProperties properties, boolean 
isIngestion) {
+    Option<String> providedPayloadClass = 
HoodieRecordPayload.getPayloadClassNameIfPresent(properties);
     RecordMergeMode recordMergeMode = tableConfig.getRecordMergeMode();
     String mergeStrategyId = tableConfig.getRecordMergeStrategyId();
-    if 
(!tableConfig.getTableVersion().greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
+    HoodieTableVersion tableVersion = tableConfig.getTableVersion();
+    // If the provided payload class differs from the table's payload class, 
we need to infer the correct merging behavior.
+    if (isIngestion && providedPayloadClass.map(className -> 
!className.equals(tableConfig.getPayloadClass())).orElse(false)) {
+      Triple<RecordMergeMode, String, String> triple = 
HoodieTableConfig.inferCorrectMergingBehavior(null, providedPayloadClass.get(), 
null,
+          tableConfig.getPreCombineFieldsStr().orElse(null), tableVersion);
+      recordMergeMode = triple.getLeft();
+      mergeStrategyId = triple.getRight();
+    } else if (!tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
       Triple<RecordMergeMode, String, String> triple = 
HoodieTableConfig.inferCorrectMergingBehavior(
           recordMergeMode, tableConfig.getPayloadClass(),
-          mergeStrategyId, null, tableConfig.getTableVersion());
+          mergeStrategyId, tableConfig.getPreCombineFieldsStr().orElse(null), 
tableVersion);
       recordMergeMode = triple.getLeft();
       mergeStrategyId = triple.getRight();
     }
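
A brief sketch of the two initialization entry points, assuming a concrete readerContext and the writer's TypedProperties:

    // Ingestion/write path: a payload class supplied via props may override the table's
    // payload class when inferring the merge mode and merge strategy.
    readerContext.initRecordMergerForIngestion(props);
    // Read path: behavior stays driven by the table configuration only.
    // readerContext.initRecordMerger(props);
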
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/engine/RecordContext.java 
b/hudi-common/src/main/java/org/apache/hudi/common/engine/RecordContext.java
index 0e274c49c0d7..bcc10e46072f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/RecordContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/RecordContext.java
@@ -42,29 +42,31 @@ import javax.annotation.Nullable;
 import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.function.BiFunction;
 
 import static 
org.apache.hudi.common.model.HoodieRecord.HOODIE_IS_DELETED_FIELD;
 import static 
org.apache.hudi.common.model.HoodieRecord.RECORD_KEY_METADATA_FIELD;
 
+/**
+ * Record context provides the APIs for record related operations. Record 
context is associated with
+ * a corresponding {@link HoodieReaderContext} and is used for getting field 
values from a record,
+ * transforming a record etc.
+ */
 public abstract class RecordContext<T> implements Serializable {
 
   private static final long serialVersionUID = 1L;
 
-  private SerializableBiFunction<T, Schema, String> recordKeyExtractor;
+  private final SerializableBiFunction<T, Schema, String> recordKeyExtractor;
   // for encoding and decoding schemas to the spillable map
   private final LocalAvroSchemaCache localAvroSchemaCache = 
LocalAvroSchemaCache.getInstance();
 
   protected JavaTypeConverter typeConverter;
   protected String partitionPath;
 
-  public RecordContext(HoodieTableConfig tableConfig) {
+  protected RecordContext(HoodieTableConfig tableConfig) {
     this.typeConverter = new DefaultJavaTypeConverter();
-    updateRecordKeyExtractor(tableConfig, tableConfig.populateMetaFields());
-  }
-
-  public void updateRecordKeyExtractor(HoodieTableConfig tableConfig, boolean 
shouldUseMetadataFields) {
-    this.recordKeyExtractor = shouldUseMetadataFields ? metadataKeyExtractor() 
: virtualKeyExtractor(tableConfig.getRecordKeyFields()
+    this.recordKeyExtractor = tableConfig.populateMetaFields() ? 
metadataKeyExtractor() : virtualKeyExtractor(tableConfig.getRecordKeyFields()
         .orElseThrow(() -> new IllegalArgumentException("No record keys 
specified and meta fields are not populated")));
   }
 
@@ -72,6 +74,10 @@ public abstract class RecordContext<T> implements 
Serializable {
     this.partitionPath = partitionPath;
   }
 
+  public T extractDataFromRecord(HoodieRecord record, Schema schema, 
Properties properties) {
+    return (T) record.getData();
+  }
+
   /**
    * Gets the schema encoded in the buffered record {@code BufferedRecord}.
    *
@@ -98,13 +104,25 @@ public abstract class RecordContext<T> implements 
Serializable {
     return this.localAvroSchemaCache.getSchema((Integer) 
versionId).orElse(null);
   }
 
+  /**
+   * Constructs a new {@link HoodieRecord} based on the given buffered record 
{@link BufferedRecord} and the provided partition path.
+   * Use this method when the partition path is not consistent for all usages 
of the RecordContext instance.
+   *
+   * @param bufferedRecord The {@link BufferedRecord} object with 
engine-specific row
+   * @param partitionPath The partition path of the record
+   * @return A new instance of {@link HoodieRecord}.
+   */
+  public abstract HoodieRecord<T> constructHoodieRecord(BufferedRecord<T> 
bufferedRecord, String partitionPath);
+
   /**
    * Constructs a new {@link HoodieRecord} based on the given buffered record 
{@link BufferedRecord}.
    *
-   * @param bufferedRecord  The {@link BufferedRecord} object with 
engine-specific row
+   * @param bufferedRecord The {@link BufferedRecord} object with 
engine-specific row
    * @return A new instance of {@link HoodieRecord}.
    */
-  public abstract HoodieRecord<T> constructHoodieRecord(BufferedRecord<T> 
bufferedRecord);
+  public HoodieRecord<T> constructHoodieRecord(BufferedRecord<T> 
bufferedRecord) {
+    return constructHoodieRecord(bufferedRecord, partitionPath);
+  }
 
   /**
    * Constructs a new Engine based record based on a given schema, base record 
and update values.
@@ -272,6 +290,28 @@ public abstract class RecordContext<T> implements 
Serializable {
     });
   }
 
+  /**
+   * Gets the ordering value in particular type.
+   *
+   * @param record             An option of record.
+   * @param schema             The Avro schema of the record.
+   * @param orderingFieldNames names of the ordering fields
+   * @return The ordering value.
+   */
+  public Comparable getOrderingValue(T record,
+                                     Schema schema,
+                                     String[] orderingFieldNames) {
+    if (orderingFieldNames.length == 0) {
+      return OrderingValues.getDefault();
+    }
+
+    return OrderingValues.create(orderingFieldNames, field -> {
+      Object value = getValue(record, schema, field);
+      // Fall back to the default ordering value when the field value is missing.
+      return value != null ? convertValueToEngineType((Comparable) value) : 
OrderingValues.getDefault();
+    });
+  }
+
   /**
    * Extracts the record position value from the record itself.
    *
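
As a small illustration of the refactored RecordContext API (recordContext, bufferedRecord, schema, the partition value, and the ordering field name "ts" are assumptions):

    // Single-argument overload keeps the old behavior and uses the cached partition path.
    HoodieRecord rec1 = recordContext.constructHoodieRecord(bufferedRecord);
    // Two-argument overload lets each call supply its own partition path.
    HoodieRecord rec2 = recordContext.constructHoodieRecord(bufferedRecord, "2015/03/16");
    // Ordering values can now also be extracted with a String[] of ordering fields.
    Comparable orderingValue =
        recordContext.getOrderingValue(bufferedRecord.getRecord(), schema, new String[] {"ts"});
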
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/BaseAvroPayload.java 
b/hudi-common/src/main/java/org/apache/hudi/common/model/BaseAvroPayload.java
index 4bea348c3b1b..676d8f3ac8f8 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/BaseAvroPayload.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/BaseAvroPayload.java
@@ -19,11 +19,14 @@
 package org.apache.hudi.common.model;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieException;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.Properties;
 
@@ -98,4 +101,11 @@ public abstract class BaseAvroPayload implements 
Serializable {
   public byte[] getRecordBytes() {
     return recordBytes;
   }
+
+  public Option<IndexedRecord> getIndexedRecord(Schema schema, Properties 
properties) throws IOException {
+    if (recordBytes.length == 0) {
+      return Option.empty();
+    }
+    return Option.of(HoodieAvroUtils.bytesToAvro(recordBytes, schema));
+  }
 }
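
For clarity, a hedged sketch of the distinction introduced here (payload, schema, and props are assumed, and the call is expected to run where IOException is handled):

    // getIndexedRecord only deserializes the stored Avro bytes; unlike getInsertValue it applies
    // no payload-specific insert semantics, and returns Option.empty() for an empty byte array.
    Option<IndexedRecord> avro = payload.getIndexedRecord(schema, props);
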
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
index 1f8f1f2fa624..1b6bfa03153d 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
@@ -244,7 +244,7 @@ public class HoodieAvroRecord<T extends 
HoodieRecordPayload> extends HoodieRecor
 
   @Override
   public Option<HoodieAvroIndexedRecord> toIndexedRecord(Schema recordSchema, 
Properties props) throws IOException {
-    Option<IndexedRecord> avroData = getData().getInsertValue(recordSchema, 
props);
+    Option<IndexedRecord> avroData = getData().getIndexedRecord(recordSchema, 
props);
     if (avroData.isPresent()) {
       HoodieAvroIndexedRecord record =
           new HoodieAvroIndexedRecord(key, avroData.get(), operation, 
getData().getMetadata());
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerger.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerger.java
index e9be41e3fe87..673f1a61a0cf 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerger.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerger.java
@@ -44,7 +44,7 @@ public class HoodieAvroRecordMerger implements 
HoodieRecordMerger, OperationMode
 
   @Override
   public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema 
oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws 
IOException {
-    return combineAndGetUpdateValue(older, newer, newSchema, props)
+    return combineAndGetUpdateValue(older, newer, oldSchema, newSchema, props)
         .map(r -> Pair.of(new HoodieAvroIndexedRecord(r), r.getSchema()));
   }
 
@@ -53,13 +53,13 @@ public class HoodieAvroRecordMerger implements 
HoodieRecordMerger, OperationMode
     return HoodieRecordType.AVRO;
   }
 
-  private Option<IndexedRecord> combineAndGetUpdateValue(HoodieRecord older, 
HoodieRecord newer, Schema schema, Properties props) throws IOException {
-    Option<IndexedRecord> previousAvroData = older.toIndexedRecord(schema, 
props).map(HoodieAvroIndexedRecord::getData);
+  private Option<IndexedRecord> combineAndGetUpdateValue(HoodieRecord older, 
HoodieRecord newer, Schema oldSchema, Schema newSchema, Properties props) 
throws IOException {
+    Option<IndexedRecord> previousAvroData = older.toIndexedRecord(oldSchema, 
props).map(HoodieAvroIndexedRecord::getData);
     if (!previousAvroData.isPresent()) {
       return Option.empty();
     }
 
-    return ((HoodieAvroRecord) 
newer).getData().combineAndGetUpdateValue(previousAvroData.get(), schema, 
props);
+    return ((HoodieAvroRecord) 
newer).getData().combineAndGetUpdateValue(previousAvroData.get(), newSchema, 
props);
   }
 
   @Override
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java
index 3cc8bcf3da08..4cc2a493df17 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java
@@ -112,6 +112,19 @@ public interface HoodieRecordPayload<T extends 
HoodieRecordPayload> extends Seri
   @PublicAPIMethod(maturity = ApiMaturityLevel.DEPRECATED)
   Option<IndexedRecord> getInsertValue(Schema schema) throws IOException;
 
+  /**
+   * Deserializes the HoodieRecordPayload into an {@link IndexedRecord}.
+   * Unlike {@link #getInsertValue(Schema, Properties)}, this method is meant 
to solely perform deserialization.
+   *
+   * @param schema     Schema to use for reading the record
+   * @param properties Properties for the current context
+   * @return the {@link IndexedRecord} if one is available, otherwise returns 
an empty Option.
+   * @throws IOException thrown if there is an error during deserialization
+   */
+  default Option<IndexedRecord> getIndexedRecord(Schema schema, Properties 
properties) throws IOException {
+    return getInsertValue(schema, properties);
+  }
+
   /**
    * Generates an avro record out of the given HoodieRecordPayload, to be 
written out to storage. Called when writing a new value for the given
    * HoodieKey, wherein there is no existing record in storage to be combined 
against. (i.e insert) Return EMPTY to skip writing this record.
@@ -169,16 +182,18 @@ public interface HoodieRecordPayload<T extends 
HoodieRecordPayload> extends Seri
   }
 
   static String getPayloadClassName(Properties props) {
-    String payloadClassName;
+    return 
getPayloadClassNameIfPresent(props).orElse(HoodieTableConfig.DEFAULT_PAYLOAD_CLASS_NAME);
+  }
+
+  static Option<String> getPayloadClassNameIfPresent(Properties props) {
+    String payloadClassName = null;
     if (props.containsKey(PAYLOAD_CLASS_NAME.key())) {
       payloadClassName = props.getProperty(PAYLOAD_CLASS_NAME.key());
     } else if (props.containsKey("hoodie.datasource.write.payload.class")) {
       payloadClassName = 
props.getProperty("hoodie.datasource.write.payload.class");
-    } else {
-      return HoodieTableConfig.DEFAULT_PAYLOAD_CLASS_NAME;
     }
     // There could be tables written with payload class from com.uber.hoodie.
     // Need to transparently change to org.apache.hudi.
-    return payloadClassName.replace("com.uber.hoodie", "org.apache.hudi");
+    return Option.ofNullable(payloadClassName).map(className -> 
className.replace("com.uber.hoodie", "org.apache.hudi"));
   }
 }
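
A small sketch of the new payload-class resolution helpers (the property value below is only an example):

    Properties props = new Properties();
    // Nothing configured: the Option is empty instead of silently defaulting.
    Option<String> maybePayload = HoodieRecordPayload.getPayloadClassNameIfPresent(props);
    props.setProperty("hoodie.datasource.write.payload.class",
        "com.uber.hoodie.OverwriteWithLatestAvroPayload");
    // Legacy com.uber.hoodie prefixes are still rewritten to org.apache.hudi.
    String resolved = HoodieRecordPayload.getPayloadClassName(props);
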
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecord.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecord.java
index 5f205431b24f..d8375e8df67d 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecord.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecord.java
@@ -54,9 +54,10 @@ public class BufferedRecord<T> implements Serializable {
     this.isDelete = isDelete;
   }
 
-  public static <T> BufferedRecord<T> forRecordWithContext(HoodieRecord<T> 
record, Schema schema, RecordContext<T> recordContext, Properties props, 
String[] orderingFields) {
+  public static <T> BufferedRecord<T> forRecordWithContext(HoodieRecord 
record, Schema schema, RecordContext<T> recordContext, Properties props, 
String[] orderingFields) {
     HoodieKey hoodieKey = record.getKey();
-    String recordKey = hoodieKey == null ? 
recordContext.getRecordKey(record.getData(), schema) : hoodieKey.getRecordKey();
+    T data = recordContext.extractDataFromRecord(record, schema, props);
+    String recordKey = hoodieKey == null ? recordContext.getRecordKey(data, 
schema) : hoodieKey.getRecordKey();
     Integer schemaId = recordContext.encodeAvroSchema(schema);
     boolean isDelete;
     try {
@@ -64,7 +65,7 @@ public class BufferedRecord<T> implements Serializable {
     } catch (IOException e) {
       throw new HoodieException("Failed to get isDelete from record.", e);
     }
-    return new BufferedRecord<>(recordKey, record.getOrderingValue(schema, 
props, orderingFields), record.getData(), schemaId, isDelete);
+    return new BufferedRecord<>(recordKey, record.getOrderingValue(schema, 
props, orderingFields), data, schemaId, isDelete);
   }
 
   public static <T> BufferedRecord<T> forRecordWithContext(T record, Schema 
schema, RecordContext<T> recordContext, List<String> orderingFieldNames, 
boolean isDelete) {
@@ -74,6 +75,12 @@ public class BufferedRecord<T> implements Serializable {
     return new BufferedRecord<>(recordKey, orderingValue, record, schemaId, 
isDelete);
   }
 
+  public static <T> BufferedRecord<T> forRecordWithContext(T record, Schema 
schema, RecordContext<T> recordContext, String[] orderingFieldNames, String 
recordKey, boolean isDelete) {
+    Integer schemaId = recordContext.encodeAvroSchema(schema);
+    Comparable orderingValue = recordContext.getOrderingValue(record, schema, 
orderingFieldNames);
+    return new BufferedRecord<>(recordKey, orderingValue, record, schemaId, 
isDelete);
+  }
+
   public static <T> BufferedRecord<T> forDeleteRecord(DeleteRecord 
deleteRecord, Comparable orderingValue) {
     return new BufferedRecord<>(deleteRecord.getRecordKey(), orderingValue, 
null, null, true);
   }
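
To show where the new overload helps, a hedged sketch (avroRecord, schema, recordContext, and recordKey are assumptions; "ts" is an example ordering field):

    // The record key is passed in explicitly, so it is not re-extracted from the engine record,
    // which matters for primary-key-less tables where the key comes from the existing record.
    BufferedRecord<GenericRecord> buffered = BufferedRecord.forRecordWithContext(
        avroRecord, schema, recordContext, new String[] {"ts"}, recordKey, /* isDelete */ false);
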
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java
index 46feee0e67e6..0c949806ba16 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java
@@ -57,6 +57,19 @@ public class BufferedRecordMergerFactory {
                                                    Schema readerSchema,
                                                    TypedProperties props,
                                                    PartialUpdateMode 
partialUpdateMode) {
+    return create(readerContext, recordMergeMode, enablePartialMerging, 
recordMerger,
+        orderingFieldNames, readerSchema, payloadClass.map(p -> Pair.of(p, 
p)), props, partialUpdateMode);
+  }
+
+  public static <T> BufferedRecordMerger<T> create(HoodieReaderContext<T> 
readerContext,
+                                                   RecordMergeMode 
recordMergeMode,
+                                                   boolean 
enablePartialMerging,
+                                                   Option<HoodieRecordMerger> 
recordMerger,
+                                                   List<String> 
orderingFieldNames,
+                                                   Schema readerSchema,
+                                                   Option<Pair<String, 
String>> payloadClasses,
+                                                   TypedProperties props,
+                                                   PartialUpdateMode 
partialUpdateMode) {
     /**
      * This part implements KEEP_VALUES partial update mode, which merges two 
records that do not have all columns.
      * Other Partial update modes, like IGNORE_DEFAULTS assume all columns 
exists in the record,
@@ -64,7 +77,7 @@ public class BufferedRecordMergerFactory {
      */
     if (enablePartialMerging) {
       BufferedRecordMerger<T> deleteRecordMerger = create(
-          readerContext, recordMergeMode, false, recordMerger, 
orderingFieldNames, payloadClass, readerSchema, props, partialUpdateMode);
+          readerContext, recordMergeMode, false, recordMerger, 
orderingFieldNames, readerSchema, payloadClasses, props, partialUpdateMode);
       return new 
PartialUpdateBufferedRecordMerger<>(readerContext.getRecordContext(), 
recordMerger, deleteRecordMerger, orderingFieldNames, readerSchema, props);
     }
 
@@ -80,9 +93,13 @@ public class BufferedRecordMergerFactory {
         }
         return new 
EventTimePartiaRecordMerger<>(readerContext.getRecordContext(), 
partialUpdateMode, props);
       default:
-        if (payloadClass.isPresent()) {
-          return new CustomPayloadRecordMerger<>(
-              readerContext.getRecordContext(), recordMerger, 
orderingFieldNames, payloadClass.get(), readerSchema, props);
+        if (payloadClasses.isPresent()) {
+          if 
(payloadClasses.get().getRight().equals("org.apache.spark.sql.hudi.command.payload.ExpressionPayload"))
 {
+            return new 
ExpressionPayloadRecordMerger<>(readerContext.getRecordContext(), recordMerger, 
orderingFieldNames,
+                payloadClasses.get().getLeft(), 
payloadClasses.get().getRight(), readerSchema, props);
+          } else {
+            return new 
CustomPayloadRecordMerger<>(readerContext.getRecordContext(), recordMerger, 
orderingFieldNames, payloadClasses.get().getLeft(), readerSchema, props);
+          }
         } else {
           return new CustomRecordMerger<>(readerContext.getRecordContext(), 
recordMerger, orderingFieldNames, readerSchema, props);
         }
@@ -389,13 +406,35 @@ public class BufferedRecordMergerFactory {
     }
   }
 
+  /**
+   * An implementation of {@link BufferedRecordMerger} which merges {@link 
BufferedRecord}s based on the ExpressionPayload.
+   * The delta merge expects the incoming records to both be Expression 
Payload, whereas the final merge expects the existing
+   * record payload to match the table's configured payload and the new record 
to be an Expression Payload.
+   */
+  private static class ExpressionPayloadRecordMerger<T> extends 
CustomPayloadRecordMerger<T> {
+    private final String basePayloadClass;
+
+    public ExpressionPayloadRecordMerger(RecordContext<T> recordContext, 
Option<HoodieRecordMerger> recordMerger, List<String> orderingFieldNames, 
String basePayloadClass, String incomingPayloadClass,
+                                         Schema readerSchema, TypedProperties 
props) {
+      super(recordContext, recordMerger, orderingFieldNames, 
incomingPayloadClass, readerSchema, props);
+      this.basePayloadClass = basePayloadClass;
+    }
+
+    @Override
+    protected Pair<HoodieRecord, HoodieRecord> 
getFinalMergeRecords(BufferedRecord<T> olderRecord, BufferedRecord<T> 
newerRecord) {
+      HoodieRecord oldHoodieRecord = constructHoodieAvroRecord(recordContext, 
olderRecord, basePayloadClass);
+      HoodieRecord newHoodieRecord = constructHoodieAvroRecord(recordContext, 
newerRecord, payloadClass);
+      return Pair.of(oldHoodieRecord, newHoodieRecord);
+    }
+  }
+
   /**
    * An implementation of {@link BufferedRecordMerger} which merges {@link 
BufferedRecord}s
    * based on {@code CUSTOM} merge mode and a given record payload class.
    */
   private static class CustomPayloadRecordMerger<T> extends 
BaseCustomMerger<T> {
-    private final List<String> orderingFieldNames;
-    private final String payloadClass;
+    private final String[] orderingFieldNames;
+    protected final String payloadClass;
 
     public CustomPayloadRecordMerger(
         RecordContext<T> recordContext,
@@ -405,53 +444,73 @@ public class BufferedRecordMergerFactory {
         Schema readerSchema,
         TypedProperties props) {
       super(recordContext, recordMerger, readerSchema, props);
-      this.orderingFieldNames = orderingFieldNames;
+      this.orderingFieldNames = orderingFieldNames.toArray(new String[0]);
       this.payloadClass = payloadClass;
     }
 
     @Override
     public Option<BufferedRecord<T>> 
deltaMergeNonDeleteRecord(BufferedRecord<T> newRecord, BufferedRecord<T> 
existingRecord) throws IOException {
-      Option<Pair<HoodieRecord, Schema>> combinedRecordAndSchemaOpt = 
getMergedRecord(existingRecord, newRecord);
-      if (combinedRecordAndSchemaOpt.isPresent()) {
-        T combinedRecordData = recordContext.convertAvroRecord((IndexedRecord) 
combinedRecordAndSchemaOpt.get().getLeft().getData());
-        // If pre-combine does not return existing record, update it
-        if (combinedRecordData != existingRecord.getRecord()) {
-          Pair<HoodieRecord, Schema> combinedRecordAndSchema = 
combinedRecordAndSchemaOpt.get();
-          return 
Option.of(BufferedRecord.forRecordWithContext(combinedRecordData, 
combinedRecordAndSchema.getRight(), recordContext, orderingFieldNames, false));
-        }
+      Option<Pair<HoodieRecord, Schema>> mergedRecordAndSchema = 
getMergedRecord(existingRecord, newRecord, false);
+      if (mergedRecordAndSchema.isEmpty()) {
+        // An empty Option indicates that the output represents a delete.
+        return Option.of(new BufferedRecord<>(newRecord.getRecordKey(), 
OrderingValues.getDefault(), null, null, true));
+      }
+      HoodieRecord mergedRecord = mergedRecordAndSchema.get().getLeft();
+      Schema mergeResultSchema = mergedRecordAndSchema.get().getRight();
+      // Special handling for SENTINEL record in Expression Payload. This is 
returned if the condition does not match.
+      if (mergedRecord.getData() == HoodieRecord.SENTINEL) {
         return Option.empty();
       }
-      // An empty Option indicates that the output represents a delete.
-      return Option.of(new BufferedRecord<>(newRecord.getRecordKey(), 
OrderingValues.getDefault(), null, null, true));
+      T combinedRecordData = 
recordContext.convertAvroRecord(mergedRecord.toIndexedRecord(mergeResultSchema, 
props).get().getData());
+      // If pre-combine does not return existing record, update it
+      if (combinedRecordData != existingRecord.getRecord()) {
+        // For pkless we need to use record key from existing record
+        return 
Option.of(BufferedRecord.forRecordWithContext(combinedRecordData, 
mergeResultSchema, recordContext, orderingFieldNames,
+            existingRecord.getRecordKey(), 
mergedRecord.isDelete(mergeResultSchema, props)));
+      }
+      return Option.empty();
     }
 
     @Override
     public MergeResult<T> mergeNonDeleteRecord(BufferedRecord<T> olderRecord, 
BufferedRecord<T> newerRecord) throws IOException {
-      Option<Pair<HoodieRecord, Schema>> mergedRecord =
-          getMergedRecord(olderRecord, newerRecord);
-      if (mergedRecord.isPresent()
-          && 
!mergedRecord.get().getLeft().isDelete(mergedRecord.get().getRight(), props)) {
+      Option<Pair<HoodieRecord, Schema>> mergedRecordAndSchema = 
getMergedRecord(olderRecord, newerRecord, true);
+      if (mergedRecordAndSchema.isEmpty()) {
+        return new MergeResult<>(true, null);
+      }
+      HoodieRecord mergedRecord = mergedRecordAndSchema.get().getLeft();
+      Schema mergeResultSchema = mergedRecordAndSchema.get().getRight();
+      // Special handling for SENTINEL record in Expression Payload
+      if (mergedRecord.getData() == HoodieRecord.SENTINEL) {
+        return new MergeResult<>(false, null);
+      }
+      if (!mergedRecord.isDelete(mergeResultSchema, props)) {
         IndexedRecord indexedRecord;
-        if (!mergedRecord.get().getRight().equals(readerSchema)) {
-          indexedRecord = (IndexedRecord) 
mergedRecord.get().getLeft().rewriteRecordWithNewSchema(mergedRecord.get().getRight(),
 null, readerSchema).getData();
+        if (!mergeResultSchema.equals(readerSchema)) {
+          indexedRecord = (IndexedRecord) 
mergedRecord.rewriteRecordWithNewSchema(mergeResultSchema, null, 
readerSchema).getData();
         } else {
-          indexedRecord = (IndexedRecord) 
mergedRecord.get().getLeft().getData();
+          indexedRecord = (IndexedRecord) mergedRecord.getData();
         }
         return new MergeResult<>(false, 
recordContext.convertAvroRecord(indexedRecord));
       }
       return new MergeResult<>(true, null);
     }
 
-    private Option<Pair<HoodieRecord, Schema>> 
getMergedRecord(BufferedRecord<T> olderRecord, BufferedRecord<T> newerRecord) 
throws IOException {
-      HoodieRecord oldHoodieRecord = constructHoodieAvroRecord(recordContext, 
olderRecord);
-      HoodieRecord newHoodieRecord = constructHoodieAvroRecord(recordContext, 
newerRecord);
-      Option<Pair<HoodieRecord, Schema>> mergedRecord = recordMerger.merge(
-          oldHoodieRecord, getSchemaForAvroPayloadMerge(oldHoodieRecord, 
olderRecord),
-          newHoodieRecord, getSchemaForAvroPayloadMerge(newHoodieRecord, 
newerRecord), props);
-      return mergedRecord;
+    protected Pair<HoodieRecord, HoodieRecord> 
getDeltaMergeRecords(BufferedRecord<T> olderRecord, BufferedRecord<T> 
newerRecord) {
+      HoodieRecord oldHoodieRecord = constructHoodieAvroRecord(recordContext, 
olderRecord, payloadClass);
+      HoodieRecord newHoodieRecord = constructHoodieAvroRecord(recordContext, 
newerRecord, payloadClass);
+      return Pair.of(oldHoodieRecord, newHoodieRecord);
+    }
+
+    protected Pair<HoodieRecord, HoodieRecord> 
getFinalMergeRecords(BufferedRecord<T> olderRecord, BufferedRecord<T> 
newerRecord) {
+      return getDeltaMergeRecords(olderRecord, newerRecord);
+    }
+
+    private Option<Pair<HoodieRecord, Schema>> 
getMergedRecord(BufferedRecord<T> olderRecord, BufferedRecord<T> newerRecord, 
boolean isFinalMerge) throws IOException {
+      Pair<HoodieRecord, HoodieRecord> records = isFinalMerge ? 
getFinalMergeRecords(olderRecord, newerRecord) : 
getDeltaMergeRecords(olderRecord, newerRecord);
+      return recordMerger.merge(records.getLeft(), 
getSchemaForAvroPayloadMerge(olderRecord), records.getRight(), 
getSchemaForAvroPayloadMerge(newerRecord), props);
     }
 
-    private HoodieRecord constructHoodieAvroRecord(RecordContext<T> 
recordContext, BufferedRecord<T> bufferedRecord) {
+    protected HoodieRecord constructHoodieAvroRecord(RecordContext<T> 
recordContext, BufferedRecord<T> bufferedRecord, String payloadClass) {
       GenericRecord record = null;
       if (!bufferedRecord.isDelete()) {
         Schema recordSchema = 
recordContext.getSchemaFromBufferRecord(bufferedRecord);
@@ -462,8 +521,8 @@ public class BufferedRecordMergerFactory {
           HoodieRecordUtils.loadPayload(payloadClass, record, 
bufferedRecord.getOrderingValue()), null);
     }
 
-    private Schema getSchemaForAvroPayloadMerge(HoodieRecord record, 
BufferedRecord<T> bufferedRecord) throws IOException {
-      if (record.isDelete(readerSchema, props)) {
+    private Schema getSchemaForAvroPayloadMerge(BufferedRecord<T> 
bufferedRecord) {
+      if (bufferedRecord.getSchemaId() == null) {
         return readerSchema;
       }
       return recordContext.getSchemaFromBufferRecord(bufferedRecord);
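
To tie the factory changes together, a hedged sketch of how the MERGE INTO sub-merger might be selected; all arguments other than the payload pair are assumed from the surrounding setup:

    // Left: payload class stored in the table; right: payload class of the incoming records.
    // When the right side is ExpressionPayload, the ExpressionPayloadRecordMerger is used.
    Option<Pair<String, String>> payloadClasses = Option.of(Pair.of(
        tableConfig.getPayloadClass(),
        "org.apache.spark.sql.hudi.command.payload.ExpressionPayload"));
    BufferedRecordMerger<IndexedRecord> merger = BufferedRecordMergerFactory.create(
        readerContext, recordMergeMode, /* enablePartialMerging */ false, recordMerger,
        orderingFieldNames, readerSchema, payloadClasses, props, partialUpdateMode);
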
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index 2a57d8b87376..c756c6fc6ba2 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -20,7 +20,6 @@
 package org.apache.hudi.common.table.read;
 
 import org.apache.hudi.common.config.HoodieReaderConfig;
-import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.model.BaseFile;
@@ -35,6 +34,7 @@ import 
org.apache.hudi.common.table.read.buffer.FileGroupRecordBufferLoader;
 import org.apache.hudi.common.table.read.buffer.HoodieFileGroupRecordBuffer;
 import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.Either;
+import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.ClosableIterator;
@@ -52,7 +52,6 @@ import org.apache.avro.Schema;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -119,9 +118,7 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
         ? new ParquetRowIndexBasedSchemaHandler<>(readerContext, dataSchema, 
requestedSchema, internalSchemaOpt, tableConfig, props)
         : new FileGroupReaderSchemaHandler<>(readerContext, dataSchema, 
requestedSchema, internalSchemaOpt, tableConfig, props));
     this.outputConverter = 
readerContext.getSchemaHandler().getOutputConverter();
-    this.orderingFieldNames = readerContext.getMergeMode() == 
RecordMergeMode.COMMIT_TIME_ORDERING
-        ? Collections.emptyList()
-        : 
Option.ofNullable(ConfigUtils.getOrderingFields(props)).map(Arrays::asList).orElse(hoodieTableMetaClient.getTableConfig().getPreCombineFields());
+    this.orderingFieldNames = 
HoodieRecordUtils.getOrderingFieldNames(readerContext.getMergeMode(), props, 
hoodieTableMetaClient);
     this.readStats = new HoodieReadStats();
   }
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java
index 6d3c3b9eadca..dd0eccb32e30 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java
@@ -18,6 +18,8 @@
 
 package org.apache.hudi.common.util;
 
+import org.apache.hudi.common.config.RecordMergeMode;
+import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.model.HoodieAvroRecordMerger;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -25,6 +27,7 @@ import 
org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.OperationModeAwareness;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 
@@ -32,6 +35,8 @@ import org.apache.avro.generic.GenericRecord;
 
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -115,4 +120,12 @@ public class HoodieRecordUtils {
     }
     return null;
   }
+
+  public static List<String> getOrderingFieldNames(RecordMergeMode mergeMode,
+                                                   TypedProperties props,
+                                                   HoodieTableMetaClient 
metaClient) {
+    return mergeMode == RecordMergeMode.COMMIT_TIME_ORDERING
+        ? Collections.emptyList()
+        : 
Option.ofNullable(ConfigUtils.getOrderingFields(props)).map(Arrays::asList).orElseGet(()
 -> metaClient.getTableConfig().getPreCombineFields());
+  }
 }
\ No newline at end of file
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/SchemaHandlerTestBase.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/SchemaHandlerTestBase.java
index e9cae5045a7a..37837bd070d2 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/SchemaHandlerTestBase.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/SchemaHandlerTestBase.java
@@ -337,7 +337,7 @@ public abstract class SchemaHandlerTestBase {
         }
 
         @Override
-        public HoodieRecord<String> 
constructHoodieRecord(BufferedRecord<String> bufferedRecord) {
+        public HoodieRecord<String> 
constructHoodieRecord(BufferedRecord<String> bufferedRecord, String 
partitionPath) {
           return null;
         }
 
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/BaseTestFileGroupRecordBuffer.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/BaseTestFileGroupRecordBuffer.java
index a7ba45b29c37..b5778e57062b 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/BaseTestFileGroupRecordBuffer.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/BaseTestFileGroupRecordBuffer.java
@@ -53,6 +53,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Properties;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
@@ -190,6 +191,11 @@ public class BaseTestFileGroupRecordBuffer {
       return Option.of(payloadRecord);
     }
 
+    @Override
+    public Option<IndexedRecord> getIndexedRecord(Schema schema, Properties 
properties) {
+      return Option.of(payloadRecord);
+    }
+
     @Override
     public Comparable<?> getOrderingValue() {
       return null;
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestFileGroupRecordBuffer.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestFileGroupRecordBuffer.java
index 416a539f1201..1a772005321c 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestFileGroupRecordBuffer.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestFileGroupRecordBuffer.java
@@ -63,6 +63,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -195,7 +196,7 @@ class TestFileGroupRecordBuffer {
     record.put("ts", System.currentTimeMillis());
     record.put("op", "d");
     record.put("_hoodie_is_deleted", false);
-    when(recordContext.getOrderingValue(any(), any(), any())).thenReturn(1);
+    when(recordContext.getOrderingValue(any(), any(), 
anyList())).thenReturn(1);
     when(recordContext.convertOrderingValueToEngineType(any())).thenReturn(1);
     BufferedRecord<GenericRecord> bufferedRecord = 
BufferedRecord.forRecordWithContext(record, schema, 
readerContext.getRecordContext(), Collections.singletonList("ts"), true);
 
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestReusableKeyBasedRecordBuffer.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestReusableKeyBasedRecordBuffer.java
index 8ece244bda31..d35d66ffa423 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestReusableKeyBasedRecordBuffer.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestReusableKeyBasedRecordBuffer.java
@@ -47,6 +47,7 @@ import java.util.Map;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -76,7 +77,7 @@ class TestReusableKeyBasedRecordBuffer {
       return new TestRecord(recordKey, 0);
     });
     when(mockReaderContext.getRecordContext().getRecordKey(any(), 
any())).thenAnswer(invocation -> ((TestRecord) 
invocation.getArgument(0)).getRecordKey());
-    when(mockReaderContext.getRecordContext().getOrderingValue(any(), any(), 
any())).thenAnswer(invocation -> {
+    when(mockReaderContext.getRecordContext().getOrderingValue(any(), any(), 
anyList())).thenAnswer(invocation -> {
       TestRecord record = invocation.getArgument(0);
       if (record.getRecordKey().equals("1")) {
         return 20; // simulate newer record in base file
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestSortedKeyBasedFileGroupRecordBuffer.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestSortedKeyBasedFileGroupRecordBuffer.java
index 0ef3da31cc1e..5ec30d04da7e 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestSortedKeyBasedFileGroupRecordBuffer.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestSortedKeyBasedFileGroupRecordBuffer.java
@@ -56,6 +56,7 @@ import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY
 import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -194,7 +195,7 @@ class TestSortedKeyBasedFileGroupRecordBuffer extends 
BaseTestFileGroupRecordBuf
       return new TestRecord(recordKey, 0);
     });
     when(mockReaderContext.getRecordContext().getRecordKey(any(), 
any())).thenAnswer(invocation -> ((TestRecord) 
invocation.getArgument(0)).getRecordKey());
-    when(mockReaderContext.getRecordContext().getOrderingValue(any(), any(), 
any())).thenReturn(0);
+    when(mockReaderContext.getRecordContext().getOrderingValue(any(), any(), 
anyList())).thenReturn(0);
     when(mockReaderContext.toBinaryRow(any(), any())).thenAnswer(invocation -> 
invocation.getArgument(1));
     when(mockReaderContext.seal(any())).thenAnswer(invocation -> 
invocation.getArgument(0));
     HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class);
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index bb671a8f5ef4..18f84084b034 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -32,6 +32,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.OrderingValues;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.storage.HoodieInstantWriter;
@@ -85,7 +86,6 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
-import static org.apache.hudi.common.model.HoodieRecord.DEFAULT_ORDERING_VALUE;
 import static 
org.apache.hudi.common.testutils.HoodieTestUtils.COMMIT_METADATA_SER_DE;
 import static 
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_FILE_NAME_GENERATOR;
 import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
@@ -402,7 +402,7 @@ public class HoodieTestDataGenerator implements 
AutoCloseable {
   private RawTripTestPayload generateRandomDeleteValue(HoodieKey key, String 
instantTime) throws IOException {
     GenericRecord rec = generateGenericRecord(key.getRecordKey(), 
key.getPartitionPath(), "rider-" + instantTime, "driver-" + instantTime, 0,
         true, false);
-    return new RawTripTestPayload(Option.of(rec.toString()), 
key.getRecordKey(), key.getPartitionPath(), TRIP_EXAMPLE_SCHEMA, true, 0L);
+    return new RawTripTestPayload(Option.of(rec.toString()), 
key.getRecordKey(), key.getPartitionPath(), TRIP_EXAMPLE_SCHEMA, true, 0);
   }
 
   /**
@@ -914,7 +914,7 @@ Generate random record using TRIP_ENCODED_DECIMAL_SCHEMA
 
   public HoodieRecord generateDeleteRecord(HoodieKey key) throws IOException {
     RawTripTestPayload payload =
-        new RawTripTestPayload(Option.empty(), key.getRecordKey(), 
key.getPartitionPath(), null, true, DEFAULT_ORDERING_VALUE);
+        new RawTripTestPayload(Option.empty(), key.getRecordKey(), 
key.getPartitionPath(), null, true, OrderingValues.getDefault());
     return new HoodieAvroRecord(key, payload);
   }
 
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/RawTripTestPayload.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/RawTripTestPayload.java
index 17b88c813683..9d081015cf1e 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/RawTripTestPayload.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/RawTripTestPayload.java
@@ -88,7 +88,7 @@ public class RawTripTestPayload implements 
HoodieRecordPayload<RawTripTestPayloa
   }
 
   public RawTripTestPayload(String jsonData, String rowKey, String 
partitionPath, String schemaStr) throws IOException {
-    this(Option.of(jsonData), rowKey, partitionPath, schemaStr, false, 0L);
+    this(Option.of(jsonData), rowKey, partitionPath, schemaStr, false, 0);
   }
 
   public RawTripTestPayload(String jsonData) throws IOException {
@@ -98,7 +98,7 @@ public class RawTripTestPayload implements 
HoodieRecordPayload<RawTripTestPayloa
     this.rowKey = jsonRecordMap.get("_row_key").toString();
     this.partitionPath = 
extractPartitionFromTimeField(jsonRecordMap.get("time").toString());
     this.isDeleted = false;
-    this.orderingVal = Integer.valueOf(jsonRecordMap.getOrDefault("number", 
0L).toString());
+    this.orderingVal = Integer.valueOf(jsonRecordMap.getOrDefault("number", 
0).toString());
   }
 
   public RawTripTestPayload(GenericRecord record, Comparable orderingVal) {
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestHoodieRecordUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestHoodieRecordUtils.java
index 466486158076..96a299c9e6ab 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestHoodieRecordUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestHoodieRecordUtils.java
@@ -18,16 +18,25 @@
 
 package org.apache.hudi.common.util;
 
+import org.apache.hudi.common.config.RecordMergeMode;
+import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieAvroRecordMerger;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.exception.HoodieException;
 
 import org.junit.jupiter.api.Test;
 
+import java.util.Collections;
+
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 class TestHoodieRecordUtils {
 
@@ -52,4 +61,22 @@ class TestHoodieRecordUtils {
     HoodieRecordPayload payload = HoodieRecordUtils.loadPayload(payloadClassName, null, 0);
     assertEquals(payload.getClass().getName(), payloadClassName);
   }
+
+  @Test
+  void testGetOrderingFields() {
+    HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class);
+    TypedProperties props = new TypedProperties();
+    // Assert empty ordering fields for commit time ordering
+    assertTrue(HoodieRecordUtils.getOrderingFieldNames(RecordMergeMode.COMMIT_TIME_ORDERING, props, metaClient).isEmpty());
+
+    // Assert table config precombine fields are returned when props are not set with event time merge mode
+    HoodieTableConfig tableConfig = new HoodieTableConfig();
+    tableConfig.setValue(HoodieTableConfig.PRECOMBINE_FIELDS, "tbl");
+    when(metaClient.getTableConfig()).thenReturn(tableConfig);
+    assertEquals(Collections.singletonList("tbl"), HoodieRecordUtils.getOrderingFieldNames(RecordMergeMode.EVENT_TIME_ORDERING, props, metaClient));
+
+    // Assert props value is returned for precombine field configuration when it is set with event time merge mode
+    props.setProperty("hoodie.datasource.write.precombine.field", "props");
+    assertEquals(Collections.singletonList("props"), HoodieRecordUtils.getOrderingFieldNames(RecordMergeMode.EVENT_TIME_ORDERING, props, metaClient));
+  }
 }
\ No newline at end of file
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index d989e2ec6338..8058eb19604e 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -20,11 +20,13 @@ package org.apache.hudi.sink;
 
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.model.HoodieFlinkInternalRow;
+import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.hudi.common.util.HoodieRecordUtils;
+import org.apache.hudi.common.table.read.BufferedRecordMerger;
+import org.apache.hudi.common.table.read.BufferedRecordMergerFactory;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.VisibleForTesting;
 import org.apache.hudi.common.util.collection.MappingIterator;
@@ -46,6 +48,7 @@ import org.apache.hudi.table.action.commit.FlinkWriteHelper;
 import org.apache.hudi.util.MutableIteratorWrapperIterator;
 import org.apache.hudi.util.StreamerUtil;
 
+import org.apache.avro.Schema;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
@@ -70,6 +73,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 
+import static org.apache.hudi.common.util.HoodieRecordUtils.getOrderingFieldNames;
+
 /**
  * Sink function to write the data to the underneath filesystem.
  *
@@ -117,7 +122,9 @@ public class StreamWriteFunction extends AbstractStreamWriteFunction<HoodieFlink
 
   protected transient WriteFunction writeFunction;
 
-  private transient HoodieRecordMerger recordMerger;
+  private transient BufferedRecordMerger<RowData> recordMerger;
+  private transient HoodieReaderContext<RowData> readerContext;
+  private transient List<String> orderingFieldNames;
 
   protected final RowType rowType;
 
@@ -226,7 +233,18 @@ public class StreamWriteFunction extends AbstractStreamWriteFunction<HoodieFlink
   }
 
   private void initMergeClass() {
-    recordMerger = HoodieRecordUtils.mergerToPreCombineMode(writeClient.getConfig().getRecordMerger());
+    readerContext = writeClient.getEngineContext().<RowData>getReaderContextFactory(metaClient).getContext();
+    orderingFieldNames = getOrderingFieldNames(readerContext.getMergeMode(), writeClient.getConfig().getProps(), metaClient);
+    recordMerger = BufferedRecordMergerFactory.create(
+        readerContext,
+        writeClient.getConfig().getRecordMergeMode(),
+        false,
+        Option.ofNullable(writeClient.getConfig().getRecordMerger()),
+        orderingFieldNames,
+        Option.ofNullable(writeClient.getConfig().getPayloadClass()),
+        new Schema.Parser().parse(writeClient.getConfig().getSchema()),
+        writeClient.getConfig().getProps(),
+        metaClient.getTableConfig().getPartialUpdateMode());
     LOG.info("init hoodie merge with class [{}]", recordMerger.getClass().getName());
   }
 
@@ -416,7 +434,8 @@ public class StreamWriteFunction extends AbstractStreamWriteFunction<HoodieFlink
     Iterator<HoodieRecord> recordItr = new MappingIterator<>(
         rowItr, rowData -> recordConverter.convert(rowData, rowDataBucket.getBucketInfo()));
 
-    List<WriteStatus> statuses = writeFunction.write(deduplicateRecordsIfNeeded(recordItr), rowDataBucket.getBucketInfo(), instant);
+    List<WriteStatus> statuses = writeFunction.write(
+        deduplicateRecordsIfNeeded(recordItr), rowDataBucket.getBucketInfo(), instant);
     writeMetrics.endFileFlush();
     writeMetrics.increaseNumOfFilesWritten();
     return statuses;
@@ -425,7 +444,9 @@ public class StreamWriteFunction extends AbstractStreamWriteFunction<HoodieFlink
   protected Iterator<HoodieRecord> deduplicateRecordsIfNeeded(Iterator<HoodieRecord> records) {
     if (config.get(FlinkOptions.PRE_COMBINE)) {
       return FlinkWriteHelper.newInstance().deduplicateRecords(
-          records, null, -1, this.writeClient.getConfig().getSchema(), this.writeClient.getConfig().getProps(), recordMerger);
+          records, null, -1, this.writeClient.getConfig().getSchema(),
+          this.writeClient.getConfig().getProps(),
+          recordMerger, readerContext, orderingFieldNames.toArray(new String[0]));
     } else {
       return records;
     }
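
[Editor note] For readers skimming the patch, a consolidated sketch of the new dedup wiring in one place; it only restates the calls added in the hunks above, and the writeClient, metaClient and records handles are assumed to be the ones already available inside StreamWriteFunction rather than new API:

    // Build the BufferedRecordMerger once from the write config, using the engine's
    // reader context and the ordering fields (mirrors initMergeClass() above).
    HoodieReaderContext<RowData> readerContext =
        writeClient.getEngineContext().<RowData>getReaderContextFactory(metaClient).getContext();
    List<String> orderingFieldNames =
        getOrderingFieldNames(readerContext.getMergeMode(), writeClient.getConfig().getProps(), metaClient);
    BufferedRecordMerger<RowData> recordMerger = BufferedRecordMergerFactory.create(
        readerContext,
        writeClient.getConfig().getRecordMergeMode(),
        false,                                                    // same flag value as passed in the patch
        Option.ofNullable(writeClient.getConfig().getRecordMerger()),
        orderingFieldNames,
        Option.ofNullable(writeClient.getConfig().getPayloadClass()),
        new Schema.Parser().parse(writeClient.getConfig().getSchema()),
        writeClient.getConfig().getProps(),
        metaClient.getTableConfig().getPartialUpdateMode());

    // The merger, reader context and ordering fields are then threaded through the
    // pre-combine path (mirrors deduplicateRecordsIfNeeded() above).
    Iterator<HoodieRecord> deduped = FlinkWriteHelper.newInstance().deduplicateRecords(
        records, null, -1, writeClient.getConfig().getSchema(),
        writeClient.getConfig().getProps(),
        recordMerger, readerContext, orderingFieldNames.toArray(new String[0]));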
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRecordContext.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRecordContext.java
index c293d8b62b26..3ea434d2d3aa 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRecordContext.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRecordContext.java
@@ -114,7 +114,7 @@ public class FlinkRecordContext extends RecordContext<RowData> {
   }
 
   @Override
-  public HoodieRecord<RowData> constructHoodieRecord(BufferedRecord<RowData> bufferedRecord) {
+  public HoodieRecord<RowData> constructHoodieRecord(BufferedRecord<RowData> bufferedRecord, String partitionPath) {
     HoodieKey hoodieKey = new HoodieKey(bufferedRecord.getRecordKey(), partitionPath);
     // delete record
     if (bufferedRecord.isDelete()) {
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveRecordContext.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveRecordContext.java
index 54488c136044..b31e62abef5f 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveRecordContext.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveRecordContext.java
@@ -71,7 +71,7 @@ public class HiveRecordContext extends RecordContext<ArrayWritable> {
   }
 
   @Override
-  public HoodieRecord<ArrayWritable> constructHoodieRecord(BufferedRecord<ArrayWritable> bufferedRecord) {
+  public HoodieRecord<ArrayWritable> constructHoodieRecord(BufferedRecord<ArrayWritable> bufferedRecord, String partitionPath) {
     HoodieKey key = new HoodieKey(bufferedRecord.getRecordKey(), partitionPath);
     if (bufferedRecord.isDelete()) {
       return new HoodieEmptyRecord<>(
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDDV2.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDDV2.scala
index 4b5865afff13..0b86ac6e3bf9 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDDV2.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDDV2.scala
@@ -32,6 +32,7 @@ import org.apache.hudi.common.table.log.InstantRange.RangeType
 import org.apache.hudi.common.table.read.HoodieFileGroupReader
 import org.apache.hudi.common.util.{Option => HOption}
 import org.apache.hudi.common.util.collection.ClosableIterator
+import org.apache.hudi.expression.{Predicate => HPredicate}
 import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
 import org.apache.hudi.metadata.HoodieTableMetadata.getDataTableBasePathFromMetadataTable
 import org.apache.hudi.metadata.HoodieTableMetadataUtil
@@ -166,7 +167,7 @@ class HoodieMergeOnReadRDDV2(@transient sc: SparkContext,
         if (metaClient.isMetadataTable) {
           val requestedSchema = new Schema.Parser().parse(requiredSchema.avroSchemaStr)
           val instantRange = InstantRange.builder().rangeType(RangeType.EXACT_MATCH).explicitInstants(validInstants.value).build()
-          val readerContext = new HoodieAvroReaderContext(storageConf, metaClient.getTableConfig, HOption.of(instantRange), HOption.empty())
+          val readerContext = new HoodieAvroReaderContext(storageConf, metaClient.getTableConfig, HOption.of(instantRange), HOption.empty().asInstanceOf[HOption[HPredicate]])
           val fileGroupReader: HoodieFileGroupReader[IndexedRecord] = HoodieFileGroupReader.newBuilder()
             .withReaderContext(readerContext)
             .withHoodieTableMetaClient(metaClient)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
index 3f61827f13f3..94f17bdc5f1d 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
@@ -270,6 +270,17 @@ class ExpressionPayload(@transient record: GenericRecord,
     isDeletedRecord || isDeleteOnCondition
   }
 
+  override def getIndexedRecord(schema: Schema, properties: Properties): HOption[IndexedRecord] = {
+    val recordSchema = getRecordSchema(properties)
+    val indexedRecord = bytesToAvro(recordBytes, recordSchema)
+
+    if (super.isDeleteRecord(indexedRecord)) {
+      HOption.empty[IndexedRecord]()
+    } else {
+      HOption.of(indexedRecord)
+    }
+  }
+
   override def getInsertValue(schema: Schema, properties: Properties): HOption[IndexedRecord] = {
     val recordSchema = getRecordSchema(properties)
     val incomingRecord = ConvertibleRecord(bytesToAvro(recordBytes, recordSchema))
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBufferedRecordMerger.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBufferedRecordMerger.java
index 271f8682d297..0cf6fc1c58e3 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBufferedRecordMerger.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBufferedRecordMerger.java
@@ -815,7 +815,7 @@ class TestBufferedRecordMerger extends SparkClientFunctionalTestHarness {
     }
 
     @Override
-    public HoodieRecord<InternalRow> constructHoodieRecord(BufferedRecord<InternalRow> bufferedRecord) {
+    public HoodieRecord<InternalRow> constructHoodieRecord(BufferedRecord<InternalRow> bufferedRecord, String partitionPath) {
       HoodieKey hoodieKey = new HoodieKey(bufferedRecord.getRecordKey(), partitionPath);
       if (bufferedRecord.isDelete()) {
         return new HoodieEmptyRecord<>(
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieIndex.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieIndex.java
index 8941c05f4425..c0fc5846f4b2 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieIndex.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieIndex.java
@@ -50,6 +50,8 @@ import org.apache.hudi.index.HoodieIndex.IndexType;
 import org.apache.hudi.index.HoodieIndexUtils;
 import org.apache.hudi.keygen.KeyGenerator;
 import org.apache.hudi.keygen.RawTripTestPayloadKeyGenerator;
+import org.apache.hudi.keygen.constant.KeyGeneratorType;
+import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieSparkTable;
@@ -159,6 +161,8 @@ public class TestHoodieIndex extends TestHoodieMetadataBase {
         .withLayoutConfig(HoodieLayoutConfig.newBuilder().fromProperties(indexBuilder.build().getProps())
             .withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build())
         .build();
+    KeyGeneratorType keyGeneratorType = HoodieSparkKeyGeneratorFactory.inferKeyGeneratorTypeFromWriteConfig(config.getProps());
+    config.setValue(HoodieWriteConfig.KEYGENERATOR_TYPE, keyGeneratorType.name());
     writeClient = getHoodieWriteClient(config);
     this.index = writeClient.getIndex();
   }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestPartialUpdateForMergeInto.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestPartialUpdateForMergeInto.scala
index 973055e35a0e..9ca649dab4cd 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestPartialUpdateForMergeInto.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestPartialUpdateForMergeInto.scala
@@ -335,8 +335,9 @@ class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase {
       spark.sql(
         s"""
            |merge into $tableName t0
-           |using ( select 1 as id, 'a1' as name, 12.0 as price, 999 as ts
-           |union select 3 as id, 'a3' as name, 25.0 as price, 1260 as ts) s0
+           |using ( select 1 as id, 1.0 as price, 100 as ts
+           |union select 1 as id, 12.0 as price, 999 as ts
+           |union select 3 as id, 25.0 as price, 1260 as ts) s0
            |on t0.id = s0.id
            |when matched then update set price = s0.price, _ts = s0.ts
            |""".stripMargin)
