This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new c574654aac0 [HUDI-6368] Strengthen avro record merger (#8953)
c574654aac0 is described below

commit c574654aac0f10f5e14673c5112dbd21d1aa2bd5
Author: Danny Chan <[email protected]>
AuthorDate: Wed Jun 14 20:28:24 2023 +0800

    [HUDI-6368] Strengthen avro record merger (#8953)
    
    * move the merging operation decision (preCombining or base/delta merge) to
      the invoker; the decision is deterministic once the record merger is
      initialized
    * expose the operation mode as an interface so that mergers like
      HoodieSparkValidateDuplicateKeyRecordMerger can supply custom logic (see
      the sketch below)
    * pre-register the record merger instances in the object cache to reduce
      the complexity
    * fix the equality check of payloads within FlinkWriteHelper
    * add a default merger type for the merged/unmerged log scanners
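    
    A minimal sketch of how the reworked dispatch fits together (the class and
    method names come from this change; the surrounding call site is
    illustrative only, not code from this commit):
    
        // An invoker that deduplicates incoming records decides the operation
        // mode once, when it resolves the merger, instead of reading a legacy
        // operating-mode property inside every merge() call:
        HoodieRecordMerger merger =
            HoodieRecordUtils.mergerToPreCombineMode(table.getConfig().getRecordMerger());
        // mergerToPreCombineMode() delegates to asPreCombiningMode() when the
        // merger implements OperationModeAwareness: HoodieAvroRecordMerger
        // returns HoodiePreCombineAvroRecordMerger.INSTANCE, while any other
        // merger is passed through unchanged.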
---
 .../hudi/cli/commands/HoodieLogFileCommand.java    | 23 ++++----
 .../cli/commands/TestHoodieLogFileCommand.java     |  5 +-
 .../org/apache/hudi/index/HoodieIndexUtils.java    | 13 ++++-
 .../java/org/apache/hudi/io/HoodieMergeHandle.java |  2 +-
 .../hudi/table/action/commit/BaseWriteHelper.java  | 13 +----
 .../table/action/commit/HoodieWriteHelper.java     |  2 +-
 .../hudi/table/action/commit/FlinkWriteHelper.java |  4 +-
 .../hudi/table/action/commit/JavaWriteHelper.java  |  2 +-
 .../bootstrap/ParquetBootstrapMetadataHandler.java |  7 +--
 .../TestHoodieClientOnCopyOnWriteStorage.java      | 12 ++--
 .../TestHoodieClientOnMergeOnReadStorage.java      |  4 --
 .../hudi/common/model/HoodieAvroRecordMerger.java  | 68 +++++-----------------
 .../model/HoodiePreCombineAvroRecordMerger.java    | 56 ++++++++++++++++++
 .../hudi/common/model/OperationModeAwareness.java  | 34 +++++++++++
 .../table/log/AbstractHoodieLogRecordReader.java   |  4 +-
 .../table/log/HoodieMergedLogRecordScanner.java    |  6 +-
 .../table/log/HoodieUnMergedLogRecordScanner.java  |  6 +-
 .../apache/hudi/common/util/HoodieRecordUtils.java | 20 ++++---
 .../hudi/io/storage/HoodieAvroParquetReader.java   |  2 -
 .../metadata/HoodieMetadataLogRecordReader.java    |  4 +-
 .../common/functional/TestHoodieLogFormat.java     | 20 ++-----
 ...rdUtilsTest.java => TestHoodieRecordUtils.java} |  2 +-
 .../examples/quickstart/TestQuickstartData.java    |  3 -
 .../org/apache/hudi/sink/StreamWriteFunction.java  |  3 +-
 .../apache/hudi/table/format/TestInputFormat.java  | 38 ++++++++++++
 .../test/java/org/apache/hudi/utils/TestData.java  | 14 ++++-
 .../realtime/RealtimeCompactedRecordReader.java    |  9 +--
 .../realtime/RealtimeUnmergedRecordReader.java     |  6 +-
 .../apache/spark/sql/hudi/HoodieOptionConfig.scala |  4 +-
 ...odieSparkValidateDuplicateKeyRecordMerger.scala | 23 +++-----
 .../ShowHoodieLogFileRecordsProcedure.scala        |  5 +-
 .../spark/sql/hudi/HoodieSparkSqlTestBase.scala    |  3 +-
 .../spark/sql/hudi/TestHoodieOptionConfig.scala    |  3 +-
 33 files changed, 237 insertions(+), 183 deletions(-)

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
index 187e24539bb..82f9c1a6468 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
@@ -18,12 +18,6 @@
 
 package org.apache.hudi.cli.commands;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hudi.cli.HoodieCLI;
 import org.apache.hudi.cli.HoodiePrintHelper;
 import org.apache.hudi.cli.HoodieTableHeaderFields;
@@ -31,7 +25,6 @@ import org.apache.hudi.cli.TableHeader;
 import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
-import org.apache.hudi.common.model.HoodieAvroRecordMerger;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
@@ -45,17 +38,21 @@ import org.apache.hudi.common.table.log.block.HoodieDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType;
-import org.apache.hudi.common.util.collection.ClosableIterator;
-import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieMemoryConfig;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.parquet.avro.AvroSchemaConverter;
 import org.springframework.shell.standard.ShellComponent;
 import org.springframework.shell.standard.ShellMethod;
 import org.springframework.shell.standard.ShellOption;
-import scala.Tuple2;
-import scala.Tuple3;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -68,6 +65,9 @@ import java.util.Properties;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
+import scala.Tuple2;
+import scala.Tuple3;
+
 import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
 
 /**
@@ -221,7 +221,6 @@ public class HoodieLogFileCommand {
               .withSpillableMapBasePath(HoodieMemoryConfig.getDefaultSpillableMapBasePath())
               .withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
               .withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
-              .withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
               .withOptimizedLogBlocksScan(Boolean.parseBoolean(HoodieCompactionConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.defaultValue()))
               .build();
       for (HoodieRecord hoodieRecord : scanner) {
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
index d964a127683..25298876c42 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
@@ -28,9 +28,8 @@ import org.apache.hudi.cli.testutils.HoodieTestCommitMetadataGenerator;
 import org.apache.hudi.cli.testutils.ShellEvaluationResultUtil;
 import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
-import org.apache.hudi.common.model.HoodieAvroRecordMerger;
+import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
@@ -40,7 +39,6 @@ import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.testutils.SchemaTestUtil;
-import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieMemoryConfig;
@@ -232,7 +230,6 @@ public class TestHoodieLogFileCommand extends CLIFunctionalTestHarness {
         .withSpillableMapBasePath(HoodieMemoryConfig.getDefaultSpillableMapBasePath())
         .withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
         .withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
-        .withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
         .withOptimizedLogBlocksScan(Boolean.parseBoolean(HoodieCompactionConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.defaultValue()))
         .build();
 
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
index 89cbf8d9a1e..46ad232022d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.MetadataValues;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -230,7 +231,12 @@ public class HoodieIndexUtils {
   /**
   * Merge the incoming record with the matching existing record loaded via {@link HoodieMergedReadHandle}. The existing record is the latest version in the table.
   */
-  private static <R> Option<HoodieRecord<R>> mergeIncomingWithExistingRecord(HoodieRecord<R> incoming, HoodieRecord<R> existing, Schema writeSchema, HoodieWriteConfig config) throws IOException {
+  private static <R> Option<HoodieRecord<R>> mergeIncomingWithExistingRecord(
+      HoodieRecord<R> incoming,
+      HoodieRecord<R> existing,
+      Schema writeSchema,
+      HoodieWriteConfig config,
+      HoodieRecordMerger recordMerger) throws IOException {
     Schema existingSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()), config.allowOperationMetadataField());
     Schema writeSchemaWithMetaFields = HoodieAvroUtils.addMetadataFields(writeSchema, config.allowOperationMetadataField());
     // prepend the hoodie meta fields as the incoming record does not have them
@@ -239,7 +245,7 @@ public class HoodieIndexUtils {
     // after prepend the meta fields, convert the record back to the original payload
     HoodieRecord incomingWithMetaFields = incomingPrepended
         .wrapIntoHoodieRecordPayloadWithParams(writeSchema, config.getProps(), Option.empty(), config.allowOperationMetadataField(), Option.empty(), false, Option.empty());
-    Option<Pair<HoodieRecord, Schema>> mergeResult = config.getRecordMerger()
+    Option<Pair<HoodieRecord, Schema>> mergeResult = recordMerger
         .merge(existing, existingSchema, incomingWithMetaFields, writeSchemaWithMetaFields, config.getProps());
     if (mergeResult.isPresent()) {
       // the merged record needs to be converted back to the original payload
@@ -270,6 +276,7 @@ public class HoodieIndexUtils {
     // merged existing records with current locations being set
     HoodieData<HoodieRecord<R>> existingRecords = getExistingRecords(partitionLocations, config, hoodieTable);
 
+    final HoodieRecordMerger recordMerger = config.getRecordMerger();
     HoodieData<HoodieRecord<R>> taggedUpdatingRecords = updatingRecords.mapToPair(r -> Pair.of(r.getRecordKey(), r))
         .leftOuterJoin(existingRecords.mapToPair(r -> Pair.of(r.getRecordKey(), r)))
         .values().flatMap(entry -> {
@@ -286,7 +293,7 @@ public class HoodieIndexUtils {
             return Collections.singletonList(getTaggedRecord(incoming.newInstance(existing.getKey()), Option.of(existing.getCurrentLocation()))).iterator();
           }
 
-          Option<HoodieRecord<R>> mergedOpt = mergeIncomingWithExistingRecord(incoming, existing, writeSchema, config);
+          Option<HoodieRecord<R>> mergedOpt = mergeIncomingWithExistingRecord(incoming, existing, writeSchema, config, recordMerger);
           if (!mergedOpt.isPresent()) {
             // merge resulted in delete: force tag the incoming to the old partition
             return Collections.singletonList(getTaggedRecord(incoming.newInstance(existing.getKey()), Option.of(existing.getCurrentLocation()))).iterator();
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 416e3ebfe5e..cfe11b1fd8d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -456,7 +456,7 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O>
     }
 
     long oldNumWrites = 0;
-    try (HoodieFileReader reader = HoodieFileReaderFactory.getReaderFactory(this.config.getRecordMerger().getRecordType()).getFileReader(hoodieTable.getHadoopConf(), oldFilePath)) {
+    try (HoodieFileReader reader = HoodieFileReaderFactory.getReaderFactory(this.recordMerger.getRecordType()).getFileReader(hoodieTable.getHadoopConf(), oldFilePath)) {
       oldNumWrites = reader.getTotalRecords();
     } catch (IOException e) {
       throw new HoodieUpsertException("Failed to check for merge data validation", e);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
index 6e1c1f8d9ff..8d8978927f6 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
@@ -21,9 +21,9 @@ package org.apache.hudi.table.action.commit;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.function.SerializableFunctionUnchecked;
-import org.apache.hudi.common.model.HoodieAvroRecordMerger;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.table.HoodieTable;
@@ -32,7 +32,6 @@ import org.apache.hudi.table.action.HoodieWriteMetadata;
 
 import java.time.Duration;
 import java.time.Instant;
-import java.util.Properties;
 
 public abstract class BaseWriteHelper<T, I, K, O, R> extends ParallelismHelper<I> {
 
@@ -92,15 +91,9 @@ public abstract class BaseWriteHelper<T, I, K, O, R> extends ParallelismHelper<I
    * @return Collection of HoodieRecord already be deduplicated
    */
   public I deduplicateRecords(I records, HoodieTable<T, I, K, O> table, int parallelism) {
-    HoodieRecordMerger recordMerger = table.getConfig().getRecordMerger();
+    HoodieRecordMerger recordMerger = HoodieRecordUtils.mergerToPreCombineMode(table.getConfig().getRecordMerger());
     return deduplicateRecords(records, table.getIndex(), parallelism, table.getConfig().getSchema(), table.getConfig().getProps(), recordMerger);
   }
 
-  public I deduplicateRecords(I records, HoodieIndex<?, ?> index, int parallelism, String schema, Properties props, HoodieRecordMerger merger) {
-    TypedProperties updatedProps = HoodieAvroRecordMerger.Config.withLegacyOperatingModePreCombining(props);
-    return doDeduplicateRecords(records, index, parallelism, schema, updatedProps, merger);
-  }
-
-  protected abstract I doDeduplicateRecords(
-      I records, HoodieIndex<?, ?> index, int parallelism, String schema, TypedProperties props, HoodieRecordMerger merger);
+  public abstract I deduplicateRecords(I records, HoodieIndex<?, ?> index, int parallelism, String schema, TypedProperties props, HoodieRecordMerger merger);
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
index 47793d7155e..d7640c28e50 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
@@ -56,7 +56,7 @@ public class HoodieWriteHelper<T, R> extends BaseWriteHelper<T, HoodieData<Hoodi
   }
 
   @Override
-  protected HoodieData<HoodieRecord<T>> doDeduplicateRecords(
+  public HoodieData<HoodieRecord<T>> deduplicateRecords(
       HoodieData<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, String schemaStr, TypedProperties props, HoodieRecordMerger merger) {
     boolean isIndexingGlobal = index.isGlobal();
     final SerializableSchema schema = new SerializableSchema(schemaStr);
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
index 6cdb17715de..b6f8541f50e 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
@@ -91,7 +91,7 @@ public class FlinkWriteHelper<T, R> extends BaseWriteHelper<T, List<HoodieRecord
   }
 
   @Override
-  protected List<HoodieRecord<T>> doDeduplicateRecords(
+  public List<HoodieRecord<T>> deduplicateRecords(
       List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, String schemaStr, TypedProperties props, HoodieRecordMerger merger) {
     // If index used is global, then records are expected to differ in their partitionPath
     Map<Object, List<HoodieRecord<T>>> keyedRecords = records.stream()
@@ -110,7 +110,7 @@ public class FlinkWriteHelper<T, R> extends BaseWriteHelper<T, List<HoodieRecord
       // we cannot allow the user to change the key or partitionPath, since that will affect
       // everything
       // so pick it from one of the records.
-      boolean choosePrev = rec1 == reducedRecord;
+      boolean choosePrev = rec1.getData() == reducedRecord.getData();
       HoodieKey reducedKey = choosePrev ? rec1.getKey() : rec2.getKey();
       HoodieOperation operation = choosePrev ? rec1.getOperation() : rec2.getOperation();
       HoodieRecord<T> hoodieRecord = reducedRecord.newInstance(reducedKey, operation);
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
index 04befe4ea16..beb1d14f329 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
@@ -58,7 +58,7 @@ public class JavaWriteHelper<T,R> extends BaseWriteHelper<T, List<HoodieRecord<T
   }
 
   @Override
-  protected List<HoodieRecord<T>> doDeduplicateRecords(
+  public List<HoodieRecord<T>> deduplicateRecords(
       List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, String schemaStr, TypedProperties props, HoodieRecordMerger merger) {
     boolean isIndexingGlobal = index.isGlobal();
     Map<Object, List<Pair<Object, HoodieRecord<T>>>> keyedRecords = records.stream().map(record -> {
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
index 8ef6ab8f5cf..2c3ddfdcda2 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
@@ -23,7 +23,6 @@ import org.apache.hudi.client.bootstrap.BootstrapRecordPayload;
 import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.HoodieSparkRecord;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
@@ -78,16 +77,16 @@ class ParquetBootstrapMetadataHandler extends BaseBootstrapMetadataHandler {
                                   KeyGeneratorInterface keyGenerator,
                                   String partitionPath,
                                   Schema schema) throws Exception {
-    HoodieRecordMerger recordMerger = table.getConfig().getRecordMerger();
+    HoodieRecord.HoodieRecordType recordType = table.getConfig().getRecordMerger().getRecordType();
 
-    HoodieFileReader reader = HoodieFileReaderFactory.getReaderFactory(recordMerger.getRecordType())
+    HoodieFileReader reader = HoodieFileReaderFactory.getReaderFactory(recordType)
             .getFileReader(table.getHadoopConf(), sourceFilePath);
 
     HoodieExecutor<Void> executor = null;
     try {
       Function<HoodieRecord, HoodieRecord> transformer = record -> {
         String recordKey = record.getRecordKey(schema, Option.of(keyGenerator));
-        return createNewMetadataBootstrapRecord(recordKey, partitionPath, recordMerger.getRecordType())
+        return createNewMetadataBootstrapRecord(recordKey, partitionPath, recordType)
             // NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific
             //       payload pointing into a shared, mutable (underlying) buffer we get a clean copy of
             //       it since these records will be inserted into the queue later.
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index f42b799bff5..8e26fcab77a 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -46,15 +46,14 @@ import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieAvroRecord;
-import org.apache.hudi.common.model.HoodieAvroRecordMerger;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodiePreCombineAvroRecordMerger;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.HoodieWriteStat;
@@ -80,7 +79,6 @@ import org.apache.hudi.common.util.BaseFileUtils;
 import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.FileIOUtils;
-import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.MarkerUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
@@ -487,11 +485,10 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     // Global dedup should be done based on recordKey only
     HoodieIndex index = mock(HoodieIndex.class);
     when(index.isGlobal()).thenReturn(true);
-    HoodieRecordMerger recordMerger = HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName());
     int dedupParallelism = records.getNumPartitions() + 100;
     HoodieData<HoodieRecord<RawTripTestPayload>> dedupedRecsRdd =
         (HoodieData<HoodieRecord<RawTripTestPayload>>) HoodieWriteHelper.newInstance()
-            .deduplicateRecords(records, index, dedupParallelism, writeConfig.getSchema(), writeConfig.getProps(), recordMerger);
+            .deduplicateRecords(records, index, dedupParallelism, writeConfig.getSchema(), writeConfig.getProps(), HoodiePreCombineAvroRecordMerger.INSTANCE);
     List<HoodieRecord<RawTripTestPayload>> dedupedRecs = dedupedRecsRdd.collectAsList();
     assertEquals(records.getNumPartitions(), dedupedRecsRdd.getNumPartitions());
     assertEquals(1, dedupedRecs.size());
@@ -503,7 +500,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     when(index.isGlobal()).thenReturn(false);
     dedupedRecsRdd =
         (HoodieData<HoodieRecord<RawTripTestPayload>>) HoodieWriteHelper.newInstance()
-            .deduplicateRecords(records, index, dedupParallelism, writeConfig.getSchema(), writeConfig.getProps(), recordMerger);
+            .deduplicateRecords(records, index, dedupParallelism, writeConfig.getSchema(), writeConfig.getProps(), HoodiePreCombineAvroRecordMerger.INSTANCE);
     dedupedRecs = dedupedRecsRdd.collectAsList();
     assertEquals(records.getNumPartitions(), dedupedRecsRdd.getNumPartitions());
     assertEquals(2, dedupedRecs.size());
@@ -555,11 +552,10 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     // Global dedup should be done based on recordKey only
     HoodieIndex index = mock(HoodieIndex.class);
     when(index.isGlobal()).thenReturn(true);
-    HoodieRecordMerger recordMerger = HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName());
     int dedupParallelism = records.getNumPartitions() + 100;
     HoodieData<HoodieRecord<RawTripTestPayload>> dedupedRecsRdd =
         (HoodieData<HoodieRecord<RawTripTestPayload>>) HoodieWriteHelper.newInstance()
-            .deduplicateRecords(records, index, dedupParallelism, writeConfig.getSchema(), writeConfig.getProps(), recordMerger);
+            .deduplicateRecords(records, index, dedupParallelism, writeConfig.getSchema(), writeConfig.getProps(), HoodiePreCombineAvroRecordMerger.INSTANCE);
     List<HoodieRecord<RawTripTestPayload>> dedupedRecs = dedupedRecsRdd.collectAsList();
     assertEquals(dedupedRecs.get(0).getOperation(), recordThree.getOperation());
 
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
index 4073e63bd12..90a529e33a2 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
@@ -21,7 +21,6 @@ package org.apache.hudi.client.functional;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
-import org.apache.hudi.common.model.HoodieAvroRecordMerger;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieLogFile;
@@ -35,7 +34,6 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.SyncableFileSystemView;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.HoodieTestTable;
-import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieArchivalConfig;
 import org.apache.hudi.config.HoodieCleanConfig;
@@ -448,7 +446,6 @@ public class TestHoodieClientOnMergeOnReadStorage extends HoodieClientTestBase {
             .withLatestInstantTime(instant)
             .withBufferSize(config.getMaxDFSStreamBufferSize())
             .withOptimizedLogBlocksScan(true)
-            .withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
             .build();
         scanner.scan(true);
         List<String> prevInstants = scanner.getValidBlockInstants();
@@ -462,7 +459,6 @@ public class TestHoodieClientOnMergeOnReadStorage extends HoodieClientTestBase {
             .withLatestInstantTime(currentInstant)
             .withBufferSize(config.getMaxDFSStreamBufferSize())
             .withOptimizedLogBlocksScan(true)
-            .withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
             .build();
         scanner2.scan(true);
         List<String> currentInstants = scanner2.getValidBlockInstants();
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerger.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerger.java
index 252e11135af..a7bba4ebd9c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerger.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerger.java
@@ -21,19 +21,21 @@ package org.apache.hudi.common.model;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
 
-import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
 
 import java.io.IOException;
 import java.util.Properties;
 
-import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
-
-public class HoodieAvroRecordMerger implements HoodieRecordMerger {
+/**
+ * Record merger for Hoodie avro record.
+ *
+ * <p>It should only be used for base record from disk to merge with incoming record.
+ */
+public class HoodieAvroRecordMerger implements HoodieRecordMerger, OperationModeAwareness {
+  public static final HoodieAvroRecordMerger INSTANCE = new HoodieAvroRecordMerger();
 
   @Override
   public String getMergingStrategy() {
@@ -42,27 +44,8 @@ public class HoodieAvroRecordMerger implements HoodieRecordMerger {
 
   @Override
   public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws IOException {
-    ValidationUtils.checkArgument(older.getRecordType() == HoodieRecordType.AVRO);
-    ValidationUtils.checkArgument(newer.getRecordType() == HoodieRecordType.AVRO);
-    Config.LegacyOperationMode legacyOperatingMode = Config.LegacyOperationMode.valueOf(
-            props.getString(Config.LEGACY_OPERATING_MODE.key(), Config.LEGACY_OPERATING_MODE.defaultValue()));
-
-    switch (legacyOperatingMode) {
-      case PRE_COMBINING:
-        HoodieRecord res = preCombine(older, newer, newSchema, props);
-        if (res == older) {
-          return Option.of(Pair.of(res, oldSchema));
-        } else {
-          return Option.of(Pair.of(res, newSchema));
-        }
-
-      case COMBINING:
-        return combineAndGetUpdateValue(older, newer, newSchema, props)
-            .map(r -> Pair.of(r, (((HoodieAvroIndexedRecord) r).getData()).getSchema()));
-
-      default:
-        throw new UnsupportedOperationException(String.format("Unsupported legacy operating mode (%s)", legacyOperatingMode));
-    }
+    return combineAndGetUpdateValue(older, newer, newSchema, props)
+        .map(r -> Pair.of(new HoodieAvroIndexedRecord(r), r.getSchema()));
   }
 
   @Override
@@ -70,40 +53,17 @@ public class HoodieAvroRecordMerger implements HoodieRecordMerger {
     return HoodieRecordType.AVRO;
   }
 
-  private HoodieRecord preCombine(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) {
-    HoodieRecordPayload payload = unsafeCast(((HoodieAvroRecord) newer).getData().preCombine(((HoodieAvroRecord) older).getData(), schema, props));
-    return new HoodieAvroRecord(newer.getKey(), payload, newer.getOperation());
-  }
-
-  private Option<HoodieRecord> combineAndGetUpdateValue(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException {
+  private Option<IndexedRecord> combineAndGetUpdateValue(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException {
     Option<IndexedRecord> previousAvroData = older.toIndexedRecord(schema, props).map(HoodieAvroIndexedRecord::getData);
     if (!previousAvroData.isPresent()) {
       return Option.empty();
     }
 
-    return ((HoodieAvroRecord) newer).getData().combineAndGetUpdateValue(previousAvroData.get(), schema, props)
-        .map(combinedAvroPayload -> new HoodieAvroIndexedRecord((IndexedRecord) combinedAvroPayload));
+    return ((HoodieAvroRecord) newer).getData().combineAndGetUpdateValue(previousAvroData.get(), schema, props);
   }
 
-  public static class Config {
-
-    public enum LegacyOperationMode {
-      PRE_COMBINING,
-      COMBINING
-    }
-
-    public static ConfigProperty<String> LEGACY_OPERATING_MODE =
-        ConfigProperty.key("hoodie.datasource.write.record.merger.legacy.operation")
-            .defaultValue(LegacyOperationMode.COMBINING.name())
-            .withDocumentation("Controls the mode of the merging operation performed by `HoodieAvroRecordMerger`. "
-                + "This is required to maintain backward-compatibility w/ the existing semantic of `HoodieRecordPayload` "
-                + "implementations providing `preCombine` and `combineAndGetUpdateValue` methods.");
-
-    public static TypedProperties withLegacyOperatingModePreCombining(Properties props) {
-      TypedProperties newProps = new TypedProperties();
-      newProps.putAll(props);
-      newProps.setProperty(Config.LEGACY_OPERATING_MODE.key(), Config.LegacyOperationMode.PRE_COMBINING.name());
-      return newProps;
-    }
+  @Override
+  public HoodieRecordMerger asPreCombiningMode() {
+    return HoodiePreCombineAvroRecordMerger.INSTANCE;
   }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePreCombineAvroRecordMerger.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePreCombineAvroRecordMerger.java
new file mode 100644
index 00000000000..904f6847441
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePreCombineAvroRecordMerger.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.avro.Schema;
+
+import java.io.IOException;
+
+/**
+ * Record merger for Hoodie avro record.
+ *
+ * <p>It should only be used for deduplication among incoming records.
+ */
+public class HoodiePreCombineAvroRecordMerger extends HoodieAvroRecordMerger {
+  public static final HoodiePreCombineAvroRecordMerger INSTANCE = new HoodiePreCombineAvroRecordMerger();
+
+  @Override
+  public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws IOException {
+    return Option.of(preCombine(older, oldSchema, newer, newSchema, props));
+  }
+
+  @SuppressWarnings("rawtypes, unchecked")
+  private Pair<HoodieRecord, Schema> preCombine(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) {
+    HoodieRecordPayload newerPayload = ((HoodieAvroRecord) newer).getData();
+    HoodieRecordPayload olderPayload = ((HoodieAvroRecord) older).getData();
+    HoodieRecordPayload payload = newerPayload.preCombine(olderPayload, newSchema, props);
+    if (payload == olderPayload) {
+      return Pair.of(older, oldSchema);
+    } else if (payload == newerPayload) {
+      return Pair.of(newer, newSchema);
+    } else {
+      HoodieRecord mergedRecord = new HoodieAvroRecord(newer.getKey(), payload, newer.getOperation());
+      return Pair.of(mergedRecord, newSchema);
+    }
+  }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/OperationModeAwareness.java b/hudi-common/src/main/java/org/apache/hudi/common/model/OperationModeAwareness.java
new file mode 100644
index 00000000000..95d12dfc149
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/OperationModeAwareness.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+/**
+ * In some cases, the Hudi engine needs to know what operation mode the current merging belongs to.
+ * {@link HoodieRecordMerger} that wants to distinguish the operation mode should implement this interface.
+ */
+public interface OperationModeAwareness {
+  /**
+   * Specifies the legacy operation mode as preCombining.
+   *
+   * <p>The preCombining takes place in two cases:
+   * i). In memory records merging during data ingestion;
+   * ii). Log records merging for MOR reader.
+   */
+  HoodieRecordMerger asPreCombiningMode();
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
index 7304cc3d1fb..1e5bef103f4 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
@@ -20,7 +20,6 @@ package org.apache.hudi.common.table.log;
 
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.DeleteRecord;
-import org.apache.hudi.common.model.HoodieAvroRecordMerger;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodiePayloadProps;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -60,7 +59,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
@@ -167,7 +165,7 @@ public abstract class AbstractHoodieLogRecordReader {
     this.payloadClassFQN = tableConfig.getPayloadClass();
     this.preCombineField = tableConfig.getPreCombineField();
     // Log scanner merge log with precombine
-    TypedProperties props = HoodieAvroRecordMerger.Config.withLegacyOperatingModePreCombining(new Properties());
+    TypedProperties props = new TypedProperties();
     if (this.preCombineField != null) {
       props.setProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, this.preCombineField);
     }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
index ef4eec1ba07..c150ba9a3e6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.model.DeleteRecord;
 import org.apache.hudi.common.model.HoodieEmptyRecord;
 import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodiePreCombineAvroRecordMerger;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.model.HoodieRecordMerger;
@@ -29,6 +30,7 @@ import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.DefaultSizeEstimator;
 import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
+import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
@@ -333,7 +335,7 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
     // By default, we're doing a full-scan
     private boolean forceFullScan = true;
     private boolean enableOptimizedLogBlocksScan = false;
-    private HoodieRecordMerger recordMerger;
+    private HoodieRecordMerger recordMerger = HoodiePreCombineAvroRecordMerger.INSTANCE;
 
     @Override
     public Builder withFileSystem(FileSystem fs) {
@@ -436,7 +438,7 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
 
     @Override
     public Builder withRecordMerger(HoodieRecordMerger recordMerger) {
-      this.recordMerger = recordMerger;
+      this.recordMerger = HoodieRecordUtils.mergerToPreCombineMode(recordMerger);
       return this;
     }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
index 726172e5ee0..51044c0814c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
@@ -19,9 +19,11 @@
 package org.apache.hudi.common.table.log;
 
 import org.apache.hudi.common.model.DeleteRecord;
+import org.apache.hudi.common.model.HoodiePreCombineAvroRecordMerger;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
+import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.internal.schema.InternalSchema;
@@ -106,7 +108,7 @@ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordReade
     // specific configurations
     private LogRecordScannerCallback callback;
     private boolean enableOptimizedLogBlocksScan;
-    private HoodieRecordMerger recordMerger;
+    private HoodieRecordMerger recordMerger = HoodiePreCombineAvroRecordMerger.INSTANCE;
 
     public Builder withFileSystem(FileSystem fs) {
       this.fs = fs;
@@ -174,7 +176,7 @@ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordReade
 
     @Override
     public Builder withRecordMerger(HoodieRecordMerger recordMerger) {
-      this.recordMerger = recordMerger;
+      this.recordMerger = HoodieRecordUtils.mergerToPreCombineMode(recordMerger);
       return this;
     }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java
index f7790837d9c..4bd7ffccbab 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieAvroRecordMerger;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.OperationModeAwareness;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 
@@ -39,9 +40,12 @@ import java.util.Objects;
  * A utility class for HoodieRecord.
  */
 public class HoodieRecordUtils {
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieRecordUtils.class);
 
   private static final Map<String, Object> INSTANCE_CACHE = new HashMap<>();
-  private static final Logger LOG = LoggerFactory.getLogger(HoodieRecordUtils.class);
+  static {
+    INSTANCE_CACHE.put(HoodieAvroRecordMerger.class.getName(), HoodieAvroRecordMerger.INSTANCE);
+  }
 
   /**
    * Instantiate a given class with a record merge.
@@ -71,7 +75,7 @@ public class HoodieRecordUtils {
  public static HoodieRecordMerger createRecordMerger(String basePath, EngineType engineType,
      List<String> mergerClassList, String recordMergerStrategy) {
    if (mergerClassList.isEmpty() || HoodieTableMetadata.isMetadataTable(basePath)) {
-      return HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName());
+      return HoodieAvroRecordMerger.INSTANCE;
    } else {
      return mergerClassList.stream()
          .map(clazz -> {
@@ -86,7 +90,7 @@ public class HoodieRecordUtils {
          .filter(merger -> merger.getMergingStrategy().equals(recordMergerStrategy))
          .filter(merger -> recordTypeCompatibleEngine(merger.getRecordType(), engineType))
          .findFirst()
-          .orElse(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()));
+          .orElse(HoodieAvroRecordMerger.INSTANCE);
     }
   }
 
@@ -105,10 +109,10 @@ public class HoodieRecordUtils {
   }
 
  public static boolean recordTypeCompatibleEngine(HoodieRecordType recordType, EngineType engineType) {
-    if (engineType == EngineType.SPARK && recordType == HoodieRecordType.SPARK) {
-      return true;
-    } else {
-      return false;
-    }
+    return engineType == EngineType.SPARK && recordType == HoodieRecordType.SPARK;
+  }
+
+  public static HoodieRecordMerger mergerToPreCombineMode(HoodieRecordMerger merger) {
+    return merger instanceof OperationModeAwareness ? ((OperationModeAwareness) merger).asPreCombiningMode() : merger;
   }
 }
\ No newline at end of file
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java
index d9eb0c54a84..0b97d052cbb 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java
@@ -47,8 +47,6 @@ import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
 
 /**
  * {@link HoodieFileReader} implementation for parquet format.
- *
- * @param <R> Record implementation that permits field access by integer index.
  */
 public class HoodieAvroParquetReader extends HoodieAvroFileReaderBase {
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java
index befb518faca..18c389a1d1b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java
@@ -20,7 +20,6 @@ package org.apache.hudi.metadata;
 
 import org.apache.avro.Schema;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hudi.common.model.HoodieAvroRecordMerger;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
 import org.apache.hudi.common.table.log.InstantRange;
@@ -136,10 +135,9 @@ public class HoodieMetadataLogRecordReader implements Closeable {
     private final HoodieMergedLogRecordScanner.Builder scannerBuilder =
         new HoodieMergedLogRecordScanner.Builder()
             .withKeyFiledOverride(HoodieMetadataPayload.KEY_FIELD_NAME)
-            // NOTE: Merging of Metadata Table's records is currently handled using {@code HoodieAvroRecordMerger}
+            // NOTE: Merging of Metadata Table's records is currently handled using {@code HoodiePreCombineAvroRecordMerger}
             //       for compatibility purposes; In the future it {@code HoodieMetadataPayload} semantic
             //       will be migrated to its own custom instance of {@code RecordMerger}
-            .withRecordMerger(new HoodieAvroRecordMerger())
             .withReadBlocksLazily(true)
             .withReverseReader(false)
             .withOperationField(false);
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index af4e5d72d85..5c946ec9c5a 100755
--- a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -24,7 +24,6 @@ import org.apache.hudi.common.model.DeleteRecord;
 import org.apache.hudi.common.model.HoodieArchivedLogFile;
 import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieAvroRecord;
-import org.apache.hudi.common.model.HoodieAvroRecordMerger;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -55,7 +54,6 @@ import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.testutils.SchemaTestUtil;
 import org.apache.hudi.common.testutils.minicluster.HdfsTestService;
-import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.ExternalSpillableMap;
@@ -663,7 +661,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
         .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
-        .withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
         .build();
 
     List<IndexedRecord> scannedRecords = new ArrayList<>();
@@ -710,7 +707,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
         .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
-        .withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
         .withForceFullScan(false)
         .build();
 
@@ -801,7 +797,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
         .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
-        .withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
         .withForceFullScan(false)
         .build();
 
@@ -1300,7 +1295,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
         .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
-        .withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
         .build();
 
     assertEquals(200, scanner.getTotalLogRecords(), "We still would read 200 records");
@@ -1348,7 +1342,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
         .withSpillableMapBasePath(spillableBasePath)
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
-        .withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
         .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
         .build();
     scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
@@ -1377,9 +1370,9 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
   @ParameterizedTest
   @MethodSource("testArguments")
   public void testAvroLogRecordReaderWithCommitBeforeAndAfterRollback(ExternalSpillableMap.DiskMapType diskMapType,
-                                                           boolean isCompressionEnabled,
-                                                           boolean readBlocksLazily,
-                                                           boolean enableOptimizedLogBlocksScan)
+                                                                      boolean isCompressionEnabled,
+                                                                      boolean readBlocksLazily,
+                                                                      boolean enableOptimizedLogBlocksScan)
       throws IOException, URISyntaxException, InterruptedException {
     Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
     // Set a small threshold so that every block is a new version
@@ -1458,7 +1451,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
         .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
-        .withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
         .build();
     scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
     final List<Boolean> newEmptyPayloads = new ArrayList<>();
@@ -1573,7 +1565,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
         .withSpillableMapBasePath(spillableBasePath)
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
-        .withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
         .build();
 
     assertEquals(200, scanner.getTotalLogRecords(), "We still would read 200 records");
@@ -2130,7 +2121,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
         .withOptimizedLogBlocksScan(true)
-        .withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
         .build();
     assertEquals(600, scanner.getTotalLogRecords(), "We would read 600 records from scanner");
     final List<String> readKeys = new ArrayList<>();
@@ -2220,7 +2210,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
           .withDiskMapType(diskMapType)
           .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
           .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
-          .withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
           .build();
 
       assertEquals(Math.max(numRecordsInLog1, numRecordsInLog2), scanner.getNumMergedRecordsInLog(),
@@ -2734,8 +2723,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
         .withSpillableMapBasePath(spillableBasePath)
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
-        .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
-        .withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()));
+        .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan);
     try (HoodieMergedLogRecordScanner scanner = builder.build()) {
       assertEquals(expectedTotalRecords, scanner.getTotalLogRecords(), "There should be " + expectedTotalRecords + " records");
       final Set<String> readKeys = new HashSet<>();
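
A pattern worth noting in the removals above: every HoodieMergedLogRecordScanner build site drops its explicit .withRecordMerger(...) call, which suggests the scanner builder now falls back to the Avro record merger on its own. A minimal sketch of the resulting call site, assuming the usual builder options (fs, basePath, logFiles, schema and latestInstant are placeholder names, not values from this patch):

    // Hedged sketch: building a merged log scanner without naming a merger.
    HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
        .withFileSystem(fs)
        .withBasePath(basePath)
        .withLogFilePaths(logFiles)
        .withReaderSchema(schema)
        .withLatestInstantTime(latestInstant)
        .withSpillableMapBasePath("/tmp/")
        // no .withRecordMerger(...): the builder is assumed to default to the Avro merger
        .build();
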
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/HoodieRecordUtilsTest.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestHoodieRecordUtils.java
similarity index 98%
rename from hudi-common/src/test/java/org/apache/hudi/common/util/HoodieRecordUtilsTest.java
rename to hudi-common/src/test/java/org/apache/hudi/common/util/TestHoodieRecordUtils.java
index 4dcabecccb7..7cc1f2b0b8f 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/HoodieRecordUtilsTest.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestHoodieRecordUtils.java
@@ -27,7 +27,7 @@ import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
-class HoodieRecordUtilsTest {
+class TestHoodieRecordUtils {
 
   @Test
   void loadHoodieMerge() {
diff --git a/hudi-examples/hudi-examples-flink/src/test/java/org/apache/hudi/examples/quickstart/TestQuickstartData.java b/hudi-examples/hudi-examples-flink/src/test/java/org/apache/hudi/examples/quickstart/TestQuickstartData.java
index ab929907186..7fc93c776f5 100644
--- a/hudi-examples/hudi-examples-flink/src/test/java/org/apache/hudi/examples/quickstart/TestQuickstartData.java
+++ b/hudi-examples/hudi-examples-flink/src/test/java/org/apache/hudi/examples/quickstart/TestQuickstartData.java
@@ -38,9 +38,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieAvroRecord;
-import org.apache.hudi.common.model.HoodieAvroRecordMerger;
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
-import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.examples.quickstart.utils.QuickstartConfigurations;
 import org.apache.parquet.Strings;
 import org.apache.parquet.avro.AvroParquetReader;
@@ -362,7 +360,6 @@ public class TestQuickstartData {
         .withSpillableMapBasePath("/tmp/")
         
.withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
         
.withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
-        
.withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
         .build();
   }
 
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index 7dc6ea9c0f4..5087c814030 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.ObjectSizeCalculator;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.configuration.FlinkOptions;
@@ -201,7 +202,7 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> {
   }
 
   private void initMergeClass() {
-    recordMerger = writeClient.getConfig().getRecordMerger();
+    recordMerger = HoodieRecordUtils.mergerToPreCombineMode(writeClient.getConfig().getRecordMerger());
     LOG.info("init hoodie merge with class [{}]", recordMerger.getClass().getName());
   }
 
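
The HoodieRecordUtils.mergerToPreCombineMode helper referenced here is the piece that switches a merger into pre-combining mode for the Flink writer. Its body is not shown in this patch; a plausible sketch, consistent with the OperationModeAwareness changes further down (the instanceof check is an assumption):

    // Hedged sketch, not taken verbatim from this patch: return the pre-combining
    // variant of a merger when it opts into operation-mode awareness.
    public static HoodieRecordMerger mergerToPreCombineMode(HoodieRecordMerger merger) {
      return merger instanceof OperationModeAwareness
          ? ((OperationModeAwareness) merger).asPreCombiningMode()
          : merger;
    }
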
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
index 7c076344166..f4ecb3e67d0 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
@@ -56,7 +56,9 @@ import org.apache.hadoop.fs.Path;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.File;
@@ -68,6 +70,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
@@ -962,6 +965,28 @@ public class TestInputFormat {
     assertThat(baseMergeLogFileResult2, is("[]"));
   }
 
+  /**
+   * This test checks 2 cases of record preCombining.
+   * When preCombine is true, the writer does an in-memory combining of incoming records;
+   * when preCombine is false, the merged log reader does the combining while reading.
+   * With disorder deletes, we can check whether the '_hoodie_operation' is correctly set up.
+   */
+  @ParameterizedTest
+  @MethodSource("preCombiningAndChangelogModeParams")
+  void testMergeOnReadDisorderDeleteMerging(boolean preCombine, boolean changelogMode) throws Exception {
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.PRE_COMBINE.key(), preCombine + "");
+    options.put(FlinkOptions.CHANGELOG_ENABLED.key(), changelogMode + "");
+    beforeEach(HoodieTableType.MERGE_ON_READ, options);
+
+    // write log file with disorder deletes
+    TestData.writeData(TestData.DATA_SET_DISORDER_INSERT_DELETE, conf);
+    InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat();
+    final String baseResult = TestData.rowDataToString(readData(inputFormat));
+    String expected = "[+I[id1, Danny, 22, 1970-01-01T00:00:00.004, par1]]";
+    assertThat(baseResult, is(expected));
+  }
+
   @Test
   void testReadArchivedCommitsIncrementally() throws Exception {
     Map<String, String> options = new HashMap<>();
@@ -1085,6 +1110,19 @@ public class TestInputFormat {
   //  Utilities
   // -------------------------------------------------------------------------
 
+  /**
+   * Return test params => (preCombining, changelog mode).
+   */
+  private static Stream<Arguments> preCombiningAndChangelogModeParams() {
+    Object[][] data =
+        new Object[][] {
+            {true, true},
+            {true, false},
+            {false, true},
+            {false, false}};
+    return Stream.of(data).map(Arguments::of);
+  }
+
   private HoodieTableSource getTableSource(Configuration conf) {
     return new HoodieTableSource(
         TestConfigurations.TABLE_SCHEMA,
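
The expected string in testMergeOnReadDisorderDeleteMerging follows from ordering-field semantics rather than arrival order. A short walkthrough, under the assumption that merging keeps the record with the largest ordering (ts) value:

    // DATA_SET_DISORDER_INSERT_DELETE (added in TestData below), in arrival order:
    //   +I id1, age 23, ts=3
    //   +I id1, age 22, ts=4   <- largest ordering value
    //   +I id1, age 23, ts=2
    //   -D id1, age 23, ts=1   <- delete arrives last but carries the smallest ts
    // Assuming the merger resolves on the ordering value, the late delete (ts=1)
    // loses to the insert with ts=4, so the read result is:
    //   [+I[id1, Danny, 22, 1970-01-01T00:00:00.004, par1]]
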
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index 7f9ad108941..8c8ecf71061 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -24,13 +24,11 @@ import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.BaseFile;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieAvroRecord;
-import org.apache.hudi.common.model.HoodieAvroRecordMerger;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
@@ -345,6 +343,17 @@ public class TestData {
           TimestampData.fromEpochMillis(1), StringData.fromString("par1"))
   );
 
+  public static List<RowData> DATA_SET_DISORDER_INSERT_DELETE = Arrays.asList(
+      insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
+          TimestampData.fromEpochMillis(3), StringData.fromString("par1")),
+      insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 22,
+          TimestampData.fromEpochMillis(4), StringData.fromString("par1")),
+      insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
+          TimestampData.fromEpochMillis(2), StringData.fromString("par1")),
+      deleteRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
+          TimestampData.fromEpochMillis(1), StringData.fromString("par1"))
+  );
+
   public static List<RowData> DATA_SET_DISORDER_UPDATE_DELETE = Arrays.asList(
       // DISORDER UPDATE
       updateAfterRow(StringData.fromString("id1"), StringData.fromString("Danny"), 21,
@@ -926,7 +935,6 @@ public class TestData {
         .withSpillableMapBasePath("/tmp/")
         
.withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
         
.withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
-        
.withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
         .build();
   }
 
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
index 8701939ee25..2cf372384e9 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
@@ -25,9 +25,7 @@ import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieAvroRecordMerger;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
-import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
@@ -62,7 +60,6 @@ public class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
 
   private final Set<String> deltaRecordKeys;
   private final HoodieMergedLogRecordScanner mergedLogRecordScanner;
-  private final HoodieRecordMerger recordMerger;
   private final int recordKeyIndex;
   private Iterator<String> deltaItr;
 
@@ -76,7 +73,6 @@ public class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
     this.recordKeyIndex = split.getVirtualKeyInfo()
         .map(HoodieVirtualKeyInfo::getRecordKeyFieldIndex)
         .orElse(HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS);
-    this.recordMerger = HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName());
   }
 
   /**
@@ -103,7 +99,6 @@ public class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
             HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()))
         .withOptimizedLogBlocksScan(jobConf.getBoolean(HoodieRealtimeConfig.USE_LOG_RECORD_READER_SCAN_V2, false))
         .withInternalSchema(schemaEvolutionContext.internalSchemaOption.orElse(InternalSchema.getEmptyInternalSchema()))
-        .withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
         .build();
   }
 
@@ -191,10 +186,10 @@ public class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
     // for hive engine, the hiveSchema will be: col1,col2,par, and the writerSchema will be col1,col2,par
     // for presto engine, the hiveSchema will be: col1,col2, but the writerSchema will be col1,col2,par
     // so to be compatible with hive and presto, we should rewrite oldRecord before we call combineAndGetUpdateValue,
-    // once presto on hudi have it's own mor reader, we can remove the rewrite logical.
+    // once presto on hudi has its own MOR reader, we can remove the rewrite logic.
     GenericRecord genericRecord = HiveAvroSerializer.rewriteRecordIgnoreResultCheck(oldRecord, getLogScannerReaderSchema());
     HoodieRecord record = new HoodieAvroIndexedRecord(genericRecord);
-    Option<Pair<HoodieRecord, Schema>> mergeResult = recordMerger.merge(record,
+    Option<Pair<HoodieRecord, Schema>> mergeResult = HoodieAvroRecordMerger.INSTANCE.merge(record,
         genericRecord.getSchema(), newRecord, getLogScannerReaderSchema(), new TypedProperties(payloadProps));
     return mergeResult.map(p -> (HoodieAvroIndexedRecord) p.getLeft());
   }
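
Switching from a reflectively loaded merger field to HoodieAvroRecordMerger.INSTANCE removes a per-reader lookup: the reader now calls one shared, stateless merger directly. A minimal sketch of the singleton shape this relies on (only the INSTANCE name is taken from this patch; the rest is an assumption):

    // Hedged sketch of the assumed singleton shape.
    public class HoodieAvroRecordMerger implements HoodieRecordMerger {
      public static final HoodieAvroRecordMerger INSTANCE = new HoodieAvroRecordMerger();
      // merge(...) combines the old and new record using the payload semantics.
    }
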
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java
index 427b758db34..a40519df92d 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java
@@ -19,11 +19,9 @@
 package org.apache.hudi.hadoop.realtime;
 
 import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieAvroRecordMerger;
 import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;
 import org.apache.hudi.common.util.DefaultSizeEstimator;
 import org.apache.hudi.common.util.Functions;
-import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
 import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer;
@@ -85,9 +83,7 @@ class RealtimeUnmergedRecordReader extends AbstractRealtimeRecordReader
           .withLatestInstantTime(split.getMaxCommitTime())
           .withReadBlocksLazily(Boolean.parseBoolean(this.jobConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)))
           .withReverseReader(false)
-          .withBufferSize(this.jobConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
-          .withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()));
-
+          .withBufferSize(this.jobConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE));
 
     this.executor = new BoundedInMemoryExecutor<>(
         HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes(jobConf), getParallelProducers(scannerBuilder),
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
index 239ec31eb65..a541bbb8e52 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
@@ -19,9 +19,9 @@ package org.apache.spark.sql.hudi
 
 import org.apache.hudi.DataSourceWriteOptions
 import org.apache.hudi.avro.HoodieAvroUtils.getRootLevelFieldName
-import org.apache.hudi.common.model.{HoodieAvroRecordMerger, HoodieRecordMerger}
+import org.apache.hudi.common.model.HoodieRecordMerger
 import org.apache.hudi.common.table.HoodieTableConfig
-import org.apache.hudi.common.util.{StringUtils, ValidationUtils}
+import org.apache.hudi.common.util.ValidationUtils
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.types.StructType
 
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/HoodieSparkValidateDuplicateKeyRecordMerger.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/HoodieSparkValidateDuplicateKeyRecordMerger.scala
index f59b62d7bec..8127cb73414 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/HoodieSparkValidateDuplicateKeyRecordMerger.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/HoodieSparkValidateDuplicateKeyRecordMerger.scala
@@ -20,9 +20,8 @@ package org.apache.spark.sql.hudi.command
 import org.apache.avro.Schema
 import org.apache.hudi.HoodieSparkRecordMerger
 import org.apache.hudi.common.config.TypedProperties
-import org.apache.hudi.common.model.HoodieAvroRecordMerger.Config
-import org.apache.hudi.common.model.HoodieRecord
-import org.apache.hudi.common.util.{collection, Option => HOption}
+import org.apache.hudi.common.model.{HoodieRecord, HoodieRecordMerger, OperationModeAwareness}
+import org.apache.hudi.common.util.{HoodieRecordUtils, collection, Option => HOption}
 import org.apache.hudi.exception.HoodieDuplicateKeyException
 
 /**
@@ -30,18 +29,14 @@ import org.apache.hudi.exception.HoodieDuplicateKeyException
  * config.
  * @see org.apache.spark.sql.hudi.command.ValidateDuplicateKeyPayload
  */
-class HoodieSparkValidateDuplicateKeyRecordMerger extends HoodieSparkRecordMerger {
+class HoodieSparkValidateDuplicateKeyRecordMerger extends HoodieSparkRecordMerger with OperationModeAwareness {
 
   override def merge(older: HoodieRecord[_], oldSchema: Schema, newer: HoodieRecord[_], newSchema: Schema, props: TypedProperties): HOption[collection.Pair[HoodieRecord[_], Schema]] = {
-    val legacyOperatingMode = Config.LegacyOperationMode.valueOf(props.getString(Config.LEGACY_OPERATING_MODE.key, Config.LEGACY_OPERATING_MODE.defaultValue))
-    legacyOperatingMode match {
-      case Config.LegacyOperationMode.PRE_COMBINING =>
-        super.merge(older, oldSchema, newer, newSchema, props)
-      case Config.LegacyOperationMode.COMBINING =>
-        val key = older.getRecordKey(oldSchema, HoodieRecord.RECORD_KEY_METADATA_FIELD)
-        throw new HoodieDuplicateKeyException(key)
-      case _ =>
-        throw new UnsupportedOperationException(String.format("Unsupported legacy operating mode (%s)", legacyOperatingMode))
-    }
+    val key = older.getRecordKey(oldSchema, HoodieRecord.RECORD_KEY_METADATA_FIELD)
+    throw new HoodieDuplicateKeyException(key)
+  }
+
+  override def asPreCombiningMode(): HoodieRecordMerger = {
+    HoodieRecordUtils.loadRecordMerger(classOf[HoodieSparkRecordMerger].getName)
   }
 }
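
The OperationModeAwareness contract adopted above is new in this change: merge() now only has to implement base/delta combining (here, flagging a duplicate key), while asPreCombiningMode() hands back the merger to use for deduplicating incoming records. A minimal sketch of the interface inferred from that usage (javadoc wording is assumed):

    // Hedged sketch of the OperationModeAwareness contract, inferred from the override above.
    public interface OperationModeAwareness {
      // Returns the merger to use when the operation is pre-combining
      // (deduplicating incoming records) rather than base/delta merging.
      HoodieRecordMerger asPreCombiningMode();
    }
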
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
index 5761a75383a..36a9a882750 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
@@ -21,11 +21,11 @@ import org.apache.avro.generic.IndexedRecord
 import org.apache.hadoop.fs.Path
 import org.apache.hudi.common.config.HoodieCommonConfig
 import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.{HoodieAvroRecordMerger, HoodieLogFile, HoodieRecordPayload}
+import org.apache.hudi.common.model.{HoodieLogFile, HoodieRecordPayload}
 import org.apache.hudi.common.table.log.block.HoodieDataBlock
 import org.apache.hudi.common.table.log.{HoodieLogFormat, HoodieMergedLogRecordScanner}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
-import org.apache.hudi.common.util.{HoodieRecordUtils, ValidationUtils}
+import org.apache.hudi.common.util.ValidationUtils
 import org.apache.hudi.config.{HoodieCompactionConfig, HoodieMemoryConfig}
 import org.apache.parquet.avro.AvroSchemaConverter
 import org.apache.spark.sql.Row
@@ -76,7 +76,6 @@ class ShowHoodieLogFileRecordsProcedure extends BaseProcedure with ProcedureBuil
         .withSpillableMapBasePath(HoodieMemoryConfig.getDefaultSpillableMapBasePath)
         .withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue)
         .withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue)
-        .withRecordMerger(HoodieRecordUtils.loadRecordMerger(classOf[HoodieAvroRecordMerger].getName))
         .build
       scanner.asScala.foreach(hoodieRecord => {
         val record = hoodieRecord.getData.asInstanceOf[HoodieRecordPayload[_]].getInsertValue(schema).get()
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala
index 804a6c5f63b..c05a4055447 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala
@@ -18,8 +18,7 @@
 package org.apache.spark.sql.hudi
 
 import org.apache.hadoop.fs.Path
-import org.apache.hudi.avro.model.HoodieCleanMetadata
-import org.apache.hudi.{HoodieSparkRecordMerger, HoodieSparkUtils}
+import org.apache.hudi.HoodieSparkRecordMerger
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.config.HoodieStorageConfig
 import org.apache.hudi.common.model.HoodieAvroRecordMerger
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala
index 35ebe872e40..43fcb79ecf9 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala
@@ -17,9 +17,8 @@
 
 package org.apache.spark.sql.hudi
 
-import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieAvroRecordMerger, HoodieRecordMerger, OverwriteWithLatestAvroPayload}
+import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecordMerger, OverwriteWithLatestAvroPayload}
 import org.apache.hudi.common.table.HoodieTableConfig
-import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
 import org.apache.spark.sql.types._
 import org.junit.jupiter.api.Assertions.assertTrue
