This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 90fe9b2240a6 [HUDI-9682] Add support to FileGroupRecordBuffer to 
assist w/ cow merge handle migration (#13670)
90fe9b2240a6 is described below

commit 90fe9b2240a6a0aa94cfb14cfa98fe109155d24e
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Mon Aug 4 14:10:09 2025 -0700

    [HUDI-9682] Add support to FileGroupRecordBuffer to assist w/ cow merge 
handle migration (#13670)
    
    Co-authored-by: danny0405 <[email protected]>
---
 .../common/table/read/HoodieFileGroupReader.java   |  15 +-
 .../apache/hudi/common/table/read/InputSplit.java  |  36 ++-
 .../buffer/DefaultFileGroupRecordBufferLoader.java |   5 +-
 .../read/buffer/FileGroupRecordBufferLoader.java   |   4 +
 ...a => StreamingFileGroupRecordBufferLoader.java} |  56 ++--
 .../read/buffer/BaseTestFileGroupRecordBuffer.java | 228 +++++++++++++++
 .../buffer/TestFileGroupRecordBufferLoader.java    | 118 ++++++++
 .../buffer/TestKeyBasedFileGroupRecordBuffer.java  | 309 ++++++++++++---------
 .../TestSortedKeyBasedFileGroupRecordBuffer.java   |  92 +++++-
 ...TestStreamingKeyBasedFileGroupRecordBuffer.java | 221 +++++++++++++++
 10 files changed, 891 insertions(+), 193 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index 86a91a340db3..2a57d8b87376 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -34,6 +34,7 @@ import org.apache.hudi.common.table.PartitionPathParser;
 import org.apache.hudi.common.table.read.buffer.FileGroupRecordBufferLoader;
 import org.apache.hudi.common.table.read.buffer.HoodieFileGroupRecordBuffer;
 import org.apache.hudi.common.util.ConfigUtils;
+import org.apache.hudi.common.util.Either;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.ClosableIterator;
@@ -53,6 +54,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.function.UnaryOperator;
 import java.util.stream.Stream;
@@ -128,7 +130,7 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
    */
   private void initRecordIterators() throws IOException {
     ClosableIterator<T> iter = makeBaseFileIterator();
-    if (inputSplit.getLogFiles().isEmpty()) {
+    if (inputSplit.hasNoRecordsToMerge()) {
       this.baseFileIterator = new CloseableMappingIterator<>(iter, 
readerContext::seal);
     } else {
       this.baseFileIterator = iter;
@@ -361,6 +363,7 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
     private String partitionPath;
     private long start = 0;
     private long length = Long.MAX_VALUE;
+    private Iterator<BufferedRecord> recordIterator;
     private boolean shouldUseRecordPosition = false;
     private boolean allowInflightInstants = false;
     private boolean emitDelete;
@@ -396,6 +399,12 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
       return this;
     }
 
+    public Builder<T> withRecordIterator(Iterator<BufferedRecord> 
recordIterator) {
+      this.recordIterator = recordIterator;
+      this.recordBufferLoader = 
FileGroupRecordBufferLoader.createStreamingRecordsBufferLoader();
+      return this;
+    }
+
     public Builder<T> withPartitionPath(String partitionPath) {
       this.partitionPath = partitionPath;
       return this;
@@ -491,7 +500,6 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
       ValidationUtils.checkArgument(requestedSchema != null, "Requested schema 
is required");
       ValidationUtils.checkArgument(props != null, "Props is required");
       ValidationUtils.checkArgument(baseFileOption != null, "Base file option 
is required");
-      ValidationUtils.checkArgument(logFiles != null, "Log files stream is 
required");
       ValidationUtils.checkArgument(partitionPath != null, "Partition path is 
required");
       if (enableOptimizedLogBlockScan == null) {
         // check to see if props contains this key if not explicitly set
@@ -510,7 +518,8 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
           .allowInflightInstants(allowInflightInstants)
           .enableOptimizedLogBlockScan(enableOptimizedLogBlockScan)
           .build();
-      InputSplit inputSplit = new InputSplit(baseFileOption, logFiles, 
partitionPath, start, length);
+      InputSplit inputSplit = new InputSplit(baseFileOption, recordIterator != 
null ? Either.right(recordIterator) : Either.left(logFiles == null ? 
Stream.empty() : logFiles),
+          partitionPath, start, length);
       return new HoodieFileGroupReader<>(
           readerContext, storage, tablePath, latestCommitTime, dataSchema, 
requestedSchema, internalSchemaOpt, hoodieTableMetaClient,
           props, readerParameters, inputSplit, fileGroupUpdateCallback, 
recordBufferLoader);
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/InputSplit.java 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/InputSplit.java
index 9ee6208354d8..39542d42b2e0 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/InputSplit.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/InputSplit.java
@@ -19,13 +19,16 @@
 
 package org.apache.hudi.common.table.read;
 
-import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
+import org.apache.hudi.common.util.Either;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
 
+import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -37,32 +40,37 @@ import java.util.stream.Stream;
 public class InputSplit {
   private final Option<HoodieBaseFile> baseFileOption;
   private final List<HoodieLogFile> logFiles;
+  private final Option<Iterator<BufferedRecord>> recordIterator;
   private final String partitionPath;
   // Byte offset to start reading from the base file
   private final long start;
   // Length of bytes to read from the base file
   private final long length;
 
-  InputSplit(Option<HoodieBaseFile> baseFileOption, Stream<HoodieLogFile> 
logFiles, String partitionPath, long start, long length) {
+  InputSplit(Option<HoodieBaseFile> baseFileOption,
+             Either<Stream<HoodieLogFile>, Iterator<BufferedRecord>> 
recordsToMerge,
+             String partitionPath, long start, long length) {
     this.baseFileOption = baseFileOption;
-    this.logFiles = logFiles.sorted(HoodieLogFile.getLogFileComparator())
-        .filter(logFile -> 
!logFile.getFileName().endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX))
-        .collect(Collectors.toList());
+    if (recordsToMerge.isLeft()) {
+      this.logFiles = 
recordsToMerge.asLeft().sorted(HoodieLogFile.getLogFileComparator())
+          .filter(logFile -> 
!logFile.getFileName().endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX))
+          .collect(Collectors.toList());
+      this.recordIterator = Option.empty();
+    } else {
+      this.logFiles = Collections.emptyList();
+      this.recordIterator = Option.of(recordsToMerge.asRight());
+    }
     this.partitionPath = partitionPath;
     this.start = start;
     this.length = length;
   }
 
-  static InputSplit fromFileSlice(FileSlice fileSlice, long start, long 
length) {
-    return new InputSplit(fileSlice.getBaseFile(), fileSlice.getLogFiles(), 
fileSlice.getPartitionPath(),
-        start, length);
-  }
-
   public Option<HoodieBaseFile> getBaseFileOption() {
     return baseFileOption;
   }
 
   public List<HoodieLogFile> getLogFiles() {
+    ValidationUtils.checkArgument(recordIterator.isEmpty(), "Log files are not 
initialized");
     return logFiles;
   }
 
@@ -81,4 +89,12 @@ public class InputSplit {
   public boolean isParquetBaseFile() {
     return baseFileOption.map(baseFile -> 
HoodieFileFormat.fromFileExtension(baseFile.getStoragePath().getFileExtension())
 == HoodieFileFormat.PARQUET).orElse(false);
   }
+
+  public boolean hasNoRecordsToMerge() {
+    return this.logFiles.isEmpty() && recordIterator.isEmpty();
+  }
+
+  public Iterator<BufferedRecord> getRecordIterator() {
+    return this.recordIterator.orElseThrow(() -> new 
IllegalStateException("The record iterator has not been setup"));
+  }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/DefaultFileGroupRecordBufferLoader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/DefaultFileGroupRecordBufferLoader.java
index b30dd12d976b..66a5d751529b 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/DefaultFileGroupRecordBufferLoader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/DefaultFileGroupRecordBufferLoader.java
@@ -37,6 +37,7 @@ import java.util.List;
 
 /**
  * Default implementation of {@link FileGroupRecordBufferLoader} that 
initializes a buffer based on the reader parameters.
+ *
  * @param <T> the engine specific record type
  */
 class DefaultFileGroupRecordBufferLoader<T> extends 
LogScanningRecordBufferLoader implements FileGroupRecordBufferLoader<T> {
@@ -59,7 +60,6 @@ class DefaultFileGroupRecordBufferLoader<T> extends 
LogScanningRecordBufferLoade
                                                                             
ReaderParameters readerParameters,
                                                                             
HoodieReadStats readStats,
                                                                             
Option<BaseFileUpdateCallback<T>> fileGroupUpdateCallback) {
-
     boolean isSkipMerge = ConfigUtils.getStringWithAltKeys(props, 
HoodieReaderConfig.MERGE_TYPE, 
true).equalsIgnoreCase(HoodieReaderConfig.REALTIME_SKIP_MERGE);
     PartialUpdateMode partialUpdateMode = 
hoodieTableMetaClient.getTableConfig().getPartialUpdateMode();
     UpdateProcessor<T> updateProcessor = UpdateProcessor.create(readStats, 
readerContext, readerParameters.emitDeletes(), fileGroupUpdateCallback);
@@ -78,6 +78,7 @@ class DefaultFileGroupRecordBufferLoader<T> extends 
LogScanningRecordBufferLoade
       recordBuffer = new KeyBasedFileGroupRecordBuffer<>(
           readerContext, hoodieTableMetaClient, readerContext.getMergeMode(), 
partialUpdateMode, props, orderingFieldNames, updateProcessor);
     }
-    return Pair.of(recordBuffer, scanLogFiles(readerContext, storage, 
inputSplit, hoodieTableMetaClient, props, readerParameters, readStats, 
recordBuffer));
+    return Pair.of(recordBuffer, scanLogFiles(readerContext, storage, 
inputSplit, hoodieTableMetaClient, props,
+        readerParameters, readStats, recordBuffer));
   }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBufferLoader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBufferLoader.java
index c2d4834d9eff..983329dd6288 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBufferLoader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBufferLoader.java
@@ -52,6 +52,10 @@ public interface FileGroupRecordBufferLoader<T> {
     return DefaultFileGroupRecordBufferLoader.getInstance();
   }
 
+  static <T> FileGroupRecordBufferLoader<T> 
createStreamingRecordsBufferLoader() {
+    return StreamingFileGroupRecordBufferLoader.getInstance();
+  }
+
   static <T> ReusableFileGroupRecordBufferLoader<T> 
createReusable(HoodieReaderContext<T> readerContextWithoutFilters) {
     return new 
ReusableFileGroupRecordBufferLoader<>(readerContextWithoutFilters);
   }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/DefaultFileGroupRecordBufferLoader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/StreamingFileGroupRecordBufferLoader.java
similarity index 54%
copy from 
hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/DefaultFileGroupRecordBufferLoader.java
copy to 
hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/StreamingFileGroupRecordBufferLoader.java
index b30dd12d976b..8af9d3e3e5d5 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/DefaultFileGroupRecordBufferLoader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/StreamingFileGroupRecordBufferLoader.java
@@ -18,66 +18,64 @@
 
 package org.apache.hudi.common.table.read.buffer;
 
-import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.PartialUpdateMode;
 import org.apache.hudi.common.table.read.BaseFileUpdateCallback;
+import org.apache.hudi.common.table.read.BufferedRecord;
 import org.apache.hudi.common.table.read.HoodieReadStats;
 import org.apache.hudi.common.table.read.InputSplit;
 import org.apache.hudi.common.table.read.ReaderParameters;
 import org.apache.hudi.common.table.read.UpdateProcessor;
-import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.storage.HoodieStorage;
 
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 
 /**
- * Default implementation of {@link FileGroupRecordBufferLoader} that 
initializes a buffer based on the reader parameters.
- * @param <T> the engine specific record type
+ * A {@link FileGroupRecordBufferLoader} that fills the record buffer from an 
iterator of buffered records to be merged with a base file of interest.
+ * This will be used in write paths for COW merge cases.
+ * @param <T> Engine native representation of the record.
  */
-class DefaultFileGroupRecordBufferLoader<T> extends 
LogScanningRecordBufferLoader implements FileGroupRecordBufferLoader<T> {
-  private static final DefaultFileGroupRecordBufferLoader INSTANCE = new 
DefaultFileGroupRecordBufferLoader<>();
+public class StreamingFileGroupRecordBufferLoader<T> implements 
FileGroupRecordBufferLoader<T> {
+  private static final StreamingFileGroupRecordBufferLoader INSTANCE = new 
StreamingFileGroupRecordBufferLoader<>();
 
-  static <T> DefaultFileGroupRecordBufferLoader<T> getInstance() {
+  static <T> StreamingFileGroupRecordBufferLoader<T> getInstance() {
     return INSTANCE;
   }
 
-  private DefaultFileGroupRecordBufferLoader() {
-  }
-
   @Override
-  public Pair<HoodieFileGroupRecordBuffer<T>, List<String>> 
getRecordBuffer(HoodieReaderContext<T> readerContext,
-                                                                            
HoodieStorage storage,
-                                                                            
InputSplit inputSplit,
-                                                                            
List<String> orderingFieldNames,
-                                                                            
HoodieTableMetaClient hoodieTableMetaClient,
-                                                                            
TypedProperties props,
-                                                                            
ReaderParameters readerParameters,
-                                                                            
HoodieReadStats readStats,
+  public Pair<HoodieFileGroupRecordBuffer<T>, List<String>> 
getRecordBuffer(HoodieReaderContext<T> readerContext, HoodieStorage storage, 
InputSplit inputSplit,
+                                                                            
List<String> orderingFieldNames, HoodieTableMetaClient hoodieTableMetaClient,
+                                                                            
TypedProperties props, ReaderParameters readerParameters, HoodieReadStats 
readStats,
                                                                             
Option<BaseFileUpdateCallback<T>> fileGroupUpdateCallback) {
-
-    boolean isSkipMerge = ConfigUtils.getStringWithAltKeys(props, 
HoodieReaderConfig.MERGE_TYPE, 
true).equalsIgnoreCase(HoodieReaderConfig.REALTIME_SKIP_MERGE);
     PartialUpdateMode partialUpdateMode = 
hoodieTableMetaClient.getTableConfig().getPartialUpdateMode();
     UpdateProcessor<T> updateProcessor = UpdateProcessor.create(readStats, 
readerContext, readerParameters.emitDeletes(), fileGroupUpdateCallback);
     FileGroupRecordBuffer<T> recordBuffer;
-    if (isSkipMerge) {
-      recordBuffer = new UnmergedFileGroupRecordBuffer<>(
-          readerContext, hoodieTableMetaClient, readerContext.getMergeMode(), 
partialUpdateMode, props, readStats);
-    } else if (readerParameters.sortOutputs()) {
+    if (readerParameters.sortOutputs()) {
       recordBuffer = new SortedKeyBasedFileGroupRecordBuffer<>(
           readerContext, hoodieTableMetaClient, readerContext.getMergeMode(), 
partialUpdateMode, props, orderingFieldNames, updateProcessor);
-    } else if (readerParameters.useRecordPosition() && 
inputSplit.getBaseFileOption().isPresent()) {
-      recordBuffer = new PositionBasedFileGroupRecordBuffer<>(
-          readerContext, hoodieTableMetaClient, readerContext.getMergeMode(), 
partialUpdateMode, inputSplit.getBaseFileOption().get().getCommitTime(), props,
-          orderingFieldNames, updateProcessor);
     } else {
       recordBuffer = new KeyBasedFileGroupRecordBuffer<>(
           readerContext, hoodieTableMetaClient, readerContext.getMergeMode(), 
partialUpdateMode, props, orderingFieldNames, updateProcessor);
     }
-    return Pair.of(recordBuffer, scanLogFiles(readerContext, storage, 
inputSplit, hoodieTableMetaClient, props, readerParameters, readStats, 
recordBuffer));
+
+    Iterator<BufferedRecord> recordIterator = inputSplit.getRecordIterator();
+
+    while (recordIterator.hasNext()) {
+      BufferedRecord bufferedRecord = recordIterator.next();
+      try {
+        recordBuffer.processNextDataRecord(bufferedRecord, 
bufferedRecord.getRecordKey());
+      } catch (IOException e) {
+        throw new HoodieIOException("Failed to process next buffered record", 
e);
+      }
+    }
+    return Pair.of(recordBuffer, Collections.emptyList());
   }
 }
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/BaseTestFileGroupRecordBuffer.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/BaseTestFileGroupRecordBuffer.java
new file mode 100644
index 000000000000..a7ba45b29c37
--- /dev/null
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/BaseTestFileGroupRecordBuffer.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read.buffer;
+
+import org.apache.hudi.common.config.RecordMergeMode;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.BaseAvroPayload;
+import org.apache.hudi.common.model.DeleteRecord;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.PartialUpdateMode;
+import org.apache.hudi.common.table.read.BufferedRecord;
+import org.apache.hudi.common.table.read.DeleteContext;
+import org.apache.hudi.common.table.read.FileGroupReaderSchemaHandler;
+import org.apache.hudi.common.table.read.HoodieReadStats;
+import org.apache.hudi.common.table.read.InputSplit;
+import org.apache.hudi.common.table.read.ReaderParameters;
+import org.apache.hudi.common.table.read.UpdateProcessor;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.internal.schema.InternalSchema;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY;
+import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class BaseTestFileGroupRecordBuffer {
+
+  protected static final Schema SCHEMA = Schema.createRecord("test_record", 
null, "namespace", false,
+      Arrays.asList(
+          new Schema.Field("record_key", Schema.create(Schema.Type.STRING)),
+          new Schema.Field("counter", Schema.create(Schema.Type.INT)),
+          new Schema.Field("ts", Schema.create(Schema.Type.LONG))));
+
+  protected static GenericRecord createTestRecord(String recordKey, int 
counter, long ts) {
+    GenericRecord record = new GenericData.Record(SCHEMA);
+    record.put("record_key", recordKey);
+    record.put("counter", counter);
+    record.put("ts", ts);
+    return record;
+  }
+
+  protected static List<BufferedRecord> 
convertToBufferedRecordsList(List<IndexedRecord> indexedRecords,
+                                                                     
HoodieReaderContext<IndexedRecord> readerContext,
+                                                                     
TypedProperties props, String[] orderingFieldNames) {
+    return indexedRecords.stream().map(rec -> {
+      HoodieAvroIndexedRecord indexedRecord = new HoodieAvroIndexedRecord(new 
HoodieKey(rec.get(0).toString(), ""), rec, null);
+      return (BufferedRecord) 
BufferedRecord.forRecordWithContext(indexedRecord, 
readerContext.getSchemaHandler().getRequestedSchema(),
+          readerContext.getRecordContext(), props, orderingFieldNames);
+    }).collect(Collectors.toList());
+  }
+
+  protected static List<BufferedRecord> 
convertToBufferedRecordsListForDeletes(List<IndexedRecord> indexedRecords, 
boolean defaultOrderingValue) {
+    return indexedRecords.stream().map(rec -> {
+      return
+          (BufferedRecord) 
BufferedRecord.forDeleteRecord(DeleteRecord.create(new 
HoodieKey(rec.get(0).toString(), ""), defaultOrderingValue ? 0 : (Comparable) 
rec.get(2)),
+              defaultOrderingValue ? 0 : (Comparable) rec.get(2));
+    }).collect(Collectors.toList());
+  }
+
+  protected static KeyBasedFileGroupRecordBuffer<IndexedRecord> 
buildKeyBasedFileGroupRecordBuffer(HoodieReaderContext<IndexedRecord> 
readerContext,
+                                                                               
                  HoodieTableConfig tableConfig,
+                                                                               
                  HoodieReadStats readStats,
+                                                                               
                  HoodieRecordMerger recordMerger,
+                                                                               
                  RecordMergeMode recordMergeMode,
+                                                                               
                  List<String> orderingFieldNames,
+                                                                               
                  Option<Pair<String, String>> deleteMarkerKeyValue) {
+    TypedProperties props = new TypedProperties();
+    deleteMarkerKeyValue.ifPresent(markerKeyValue -> {
+      props.setProperty(DELETE_KEY, markerKeyValue.getLeft());
+      props.setProperty(DELETE_MARKER, markerKeyValue.getRight());
+    });
+    FileGroupReaderSchemaHandler<IndexedRecord> fileGroupReaderSchemaHandler = 
mock(FileGroupReaderSchemaHandler.class);
+    when(fileGroupReaderSchemaHandler.getRequiredSchema()).thenReturn(SCHEMA);
+    
when(fileGroupReaderSchemaHandler.getInternalSchema()).thenReturn(InternalSchema.getEmptyInternalSchema());
+    when(fileGroupReaderSchemaHandler.getDeleteContext()).thenReturn(new 
DeleteContext(props, SCHEMA));
+    readerContext.setSchemaHandler(fileGroupReaderSchemaHandler);
+    return buildKeyBasedFileGroupRecordBuffer(readerContext, tableConfig, 
readStats, recordMerger, recordMergeMode, orderingFieldNames, props,
+        Option.empty());
+  }
+
+  protected static KeyBasedFileGroupRecordBuffer<IndexedRecord> 
buildKeyBasedFileGroupRecordBuffer(HoodieReaderContext<IndexedRecord> 
readerContext,
+                                                                               
                  HoodieTableConfig tableConfig,
+                                                                               
                  HoodieReadStats readStats,
+                                                                               
                  HoodieRecordMerger recordMerger,
+                                                                               
                  RecordMergeMode recordMergeMode,
+                                                                               
                  List<String> orderingFieldNames,
+                                                                               
                  TypedProperties props,
+                                                                               
                  Option<Iterator<BufferedRecord>> fileGroupRecordBufferItrOpt) 
{
+
+    readerContext.setRecordMerger(Option.ofNullable(recordMerger));
+    HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class, 
RETURNS_DEEP_STUBS);
+    when(mockMetaClient.getTableConfig()).thenReturn(tableConfig);
+    UpdateProcessor<IndexedRecord> updateProcessor = 
UpdateProcessor.create(readStats, readerContext, false, Option.empty());
+
+    if (fileGroupRecordBufferItrOpt.isEmpty()) {
+      return new KeyBasedFileGroupRecordBuffer<>(
+          readerContext, mockMetaClient, recordMergeMode, 
PartialUpdateMode.NONE, props, orderingFieldNames, updateProcessor);
+    } else {
+      FileGroupRecordBufferLoader recordBufferLoader = 
FileGroupRecordBufferLoader.createStreamingRecordsBufferLoader();
+      InputSplit inputSplit = mock(InputSplit.class);
+      when(inputSplit.hasNoRecordsToMerge()).thenReturn(false);
+      
when(inputSplit.getRecordIterator()).thenReturn(fileGroupRecordBufferItrOpt.get());
+      ReaderParameters readerParameters = mock(ReaderParameters.class);
+      when(readerParameters.sortOutputs()).thenReturn(false);
+      return (KeyBasedFileGroupRecordBuffer<IndexedRecord>) 
recordBufferLoader.getRecordBuffer(readerContext, mockMetaClient.getStorage(), 
inputSplit,
+          orderingFieldNames, mockMetaClient, props, readerParameters, 
readStats, Option.empty()).getKey();
+    }
+  }
+
+  protected static List<IndexedRecord> 
getActualRecords(FileGroupRecordBuffer<IndexedRecord> fileGroupRecordBuffer) 
throws IOException {
+    List<IndexedRecord> actualRecords = new ArrayList<>();
+    while (fileGroupRecordBuffer.hasNext()) {
+      actualRecords.add(fileGroupRecordBuffer.next());
+    }
+    return actualRecords;
+  }
+
+  /**
+   * A custom payload implementation for testing purposes that marks records 
as deleted once the counter exceeds 2.
+   * During the merge, it will combine the counter values.
+   */
+  public static class CustomPayload extends BaseAvroPayload
+      implements HoodieRecordPayload<CustomPayload> {
+    private final GenericRecord payloadRecord;
+
+    public CustomPayload(GenericRecord record, Comparable orderingVal) {
+      super(record, orderingVal);
+      this.payloadRecord = record;
+    }
+
+    @Override
+    public TestKeyBasedFileGroupRecordBuffer.CustomPayload 
preCombine(TestKeyBasedFileGroupRecordBuffer.CustomPayload oldValue) {
+      return this;
+    }
+
+    @Override
+    public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord 
currentValue, Schema schema) throws IOException {
+      if (currentValue.get(2).equals(payloadRecord.get(2))) {
+        // If the timestamps are the same, we do not update
+        return Option.of(currentValue);
+      }
+      int result = (int) currentValue.get(1) + (int) payloadRecord.get(1);
+      if (result > 2) {
+        return Option.empty();
+      }
+      return Option.of(createTestRecord(currentValue.get(0).toString(), 
result, (long) payloadRecord.get(2)));
+    }
+
+    @Override
+    public Option<IndexedRecord> getInsertValue(Schema schema) throws 
IOException {
+      return Option.of(payloadRecord);
+    }
+
+    @Override
+    public Comparable<?> getOrderingValue() {
+      return null;
+    }
+  }
+
+  public static class CustomMerger implements HoodieRecordMerger {
+    private final String strategy = UUID.randomUUID().toString();
+
+    @Override
+    public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema 
oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws 
IOException {
+      GenericRecord olderData = (GenericRecord) older.getData();
+      GenericRecord newerData = (GenericRecord) newer.getData();
+      if (olderData.get(2).equals(newerData.get(2))) {
+        // If the timestamps are the same, we do not update
+        return Option.of(Pair.of(older, oldSchema));
+      }
+      int result = (int) olderData.get(1) + (int) newerData.get(1);
+      if (result > 2) {
+        return Option.empty();
+      }
+      HoodieKey hoodieKey = older.getKey();
+      return Option.of(Pair.of(new 
HoodieAvroIndexedRecord(createTestRecord(hoodieKey.getRecordKey(), result, 
(long) newerData.get(2))), SCHEMA));
+    }
+
+    @Override
+    public HoodieRecord.HoodieRecordType getRecordType() {
+      return HoodieRecord.HoodieRecordType.AVRO;
+    }
+
+    @Override
+    public String getMergingStrategy() {
+      return strategy;
+    }
+  }
+}
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestFileGroupRecordBufferLoader.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestFileGroupRecordBufferLoader.java
new file mode 100644
index 000000000000..3289614165af
--- /dev/null
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestFileGroupRecordBufferLoader.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read.buffer;
+
+import org.apache.hudi.avro.HoodieAvroReaderContext;
+import org.apache.hudi.common.config.RecordMergeMode;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.table.read.BaseFileUpdateCallback;
+import org.apache.hudi.common.table.read.DeleteContext;
+import org.apache.hudi.common.table.read.FileGroupReaderSchemaHandler;
+import org.apache.hudi.common.table.read.HoodieReadStats;
+import org.apache.hudi.common.table.read.InputSplit;
+import org.apache.hudi.common.table.read.ReaderParameters;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StorageConfiguration;
+
+import org.apache.avro.generic.IndexedRecord;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+
+import java.util.Collections;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Checks which record buffer implementation the {@link FileGroupRecordBufferLoader} factories hand back
+ * for a given combination of reader parameters.
+ */
+public class TestFileGroupRecordBufferLoader extends BaseTestFileGroupRecordBuffer {
+
+  /**
+   * Builds a record buffer through either the default loader or the streaming-records loader and asserts
+   * that it is an instance of the buffer class named by the first parameter.
+   *
+   * @param fileGroupRecordBufferType simple name of the buffer class expected from the loader
+   * @param testRecordsBased          {@code true} to use the streaming-records loader (the input split then
+   *                                  exposes an empty record iterator), {@code false} to use the default loader
+   */
+  @ParameterizedTest
+  @CsvSource({"KeyBasedFileGroupRecordBuffer,true", "KeyBasedFileGroupRecordBuffer,false", "SortedKeyBasedFileGroupRecordBuffer,true",
+      "SortedKeyBasedFileGroupRecordBuffer,false", "PositionBasedFileGroupRecordBuffer,false"})
+  public void testDefaultFileGroupBufferRecordLoader(String fileGroupRecordBufferType, boolean testRecordsBased) {
+    FileGroupRecordBufferLoader fileGroupRecordBufferLoader = testRecordsBased
+        ? FileGroupRecordBufferLoader.createStreamingRecordsBufferLoader()
+        : FileGroupRecordBufferLoader.createDefault();
+    HoodieReadStats readStats = new HoodieReadStats();
+    HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
+    when(tableConfig.getRecordMergeMode()).thenReturn(RecordMergeMode.COMMIT_TIME_ORDERING);
+    when(tableConfig.getTableVersion()).thenReturn(HoodieTableVersion.NINE);
+    when(tableConfig.getPreCombineFieldsStr()).thenReturn(Option.empty());
+    when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[] {"record_key"}));
+    StorageConfiguration<?> storageConfiguration = mock(StorageConfiguration.class);
+    HoodieReaderContext<IndexedRecord> readerContext = new HoodieAvroReaderContext(storageConfiguration, tableConfig, Option.empty(), Option.empty());
+    readerContext.initRecordMerger(new TypedProperties());
+    FileGroupReaderSchemaHandler<IndexedRecord> fileGroupReaderSchemaHandler = mock(FileGroupReaderSchemaHandler.class);
+    when(fileGroupReaderSchemaHandler.getRequiredSchema()).thenReturn(SCHEMA);
+    when(fileGroupReaderSchemaHandler.getInternalSchema()).thenReturn(InternalSchema.getEmptyInternalSchema());
+    DeleteContext deleteContext = mock(DeleteContext.class);
+    when(deleteContext.getCustomDeleteMarkerKeyValue()).thenReturn(Option.empty());
+    when(deleteContext.getHoodieOperationPos()).thenReturn(-1);
+    when(fileGroupReaderSchemaHandler.getDeleteContext()).thenReturn(deleteContext);
+    readerContext.setSchemaHandler(fileGroupReaderSchemaHandler);
+    // No explicit merger is installed for this test.
+    readerContext.setRecordMerger(Option.empty());
+    HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class, RETURNS_DEEP_STUBS);
+    when(mockMetaClient.getTableConfig()).thenReturn(tableConfig);
+    HoodieStorage storage = mock(HoodieStorage.class);
+    when(mockMetaClient.getStorage()).thenReturn(storage);
+    InputSplit inputSplit = mock(InputSplit.class);
+    if (testRecordsBased) {
+      when(inputSplit.getRecordIterator()).thenReturn(Collections.emptyIterator());
+    }
+    ReaderParameters readerParameters = mock(ReaderParameters.class);
+    if (fileGroupRecordBufferType.contains("Sorted")) {
+      when(readerParameters.sortOutputs()).thenReturn(true);
+    }
+    if (fileGroupRecordBufferType.contains("Position")) {
+      HoodieBaseFile baseFile = mock(HoodieBaseFile.class);
+      when(inputSplit.getBaseFileOption()).thenReturn(Option.of(baseFile));
+      when(readerParameters.useRecordPosition()).thenReturn(true);
+    }
+
+    Option<BaseFileUpdateCallback> fileGroupUpdateCallback = Option.empty();
+
+    HoodieFileGroupRecordBuffer fileGroupRecordBuffer = (HoodieFileGroupRecordBuffer) fileGroupRecordBufferLoader
+        .getRecordBuffer(readerContext, storage, inputSplit, Collections.singletonList("ts"),
+            mockMetaClient, new TypedProperties(), readerParameters, readStats, fileGroupUpdateCallback).getLeft();
+
+    // Resolve the expected class first so that one assertion with a descriptive message covers every case.
+    final Class<?> expectedBufferClass;
+    switch (fileGroupRecordBufferType) {
+      case "KeyBasedFileGroupRecordBuffer":
+        expectedBufferClass = KeyBasedFileGroupRecordBuffer.class;
+        break;
+      case "SortedKeyBasedFileGroupRecordBuffer":
+        expectedBufferClass = SortedKeyBasedFileGroupRecordBuffer.class;
+        break;
+      case "PositionBasedFileGroupRecordBuffer":
+        expectedBufferClass = PositionBasedFileGroupRecordBuffer.class;
+        break;
+      default:
+        throw new HoodieIOException("Undefined type: " + fileGroupRecordBufferType);
+    }
+    // Class#isInstance matches instanceof semantics (false for null, true for subclasses).
+    assertTrue(expectedBufferClass.isInstance(fileGroupRecordBuffer),
+        () -> "Expected an instance of " + expectedBufferClass.getSimpleName() + " but got " + fileGroupRecordBuffer);
+  }
+}
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestKeyBasedFileGroupRecordBuffer.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestKeyBasedFileGroupRecordBuffer.java
index 1cae85aca33c..d93543c691f6 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestKeyBasedFileGroupRecordBuffer.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestKeyBasedFileGroupRecordBuffer.java
@@ -23,72 +23,62 @@ import org.apache.hudi.avro.HoodieAvroReaderContext;
 import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieReaderContext;
-import org.apache.hudi.common.model.BaseAvroPayload;
 import org.apache.hudi.common.model.DeleteRecord;
-import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieAvroRecordMerger;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
-import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableConfig;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.table.PartialUpdateMode;
 import org.apache.hudi.common.table.log.block.HoodieDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
-import org.apache.hudi.common.table.read.DeleteContext;
+import org.apache.hudi.common.table.read.BufferedRecord;
 import org.apache.hudi.common.table.read.FileGroupReaderSchemaHandler;
 import org.apache.hudi.common.table.read.HoodieReadStats;
-import org.apache.hudi.common.table.read.UpdateProcessor;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.internal.schema.InternalSchema;
 import org.apache.hudi.storage.StorageConfiguration;
 
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.UUID;
 
 import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY;
 import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-class TestKeyBasedFileGroupRecordBuffer {
-  private static final Schema SCHEMA = Schema.createRecord("test_record", 
null, "namespace", false,
-      Arrays.asList(
-          new Schema.Field("record_key", Schema.create(Schema.Type.STRING)),
-          new Schema.Field("counter", Schema.create(Schema.Type.INT)),
-          new Schema.Field("ts", Schema.create(Schema.Type.LONG))));
+class TestKeyBasedFileGroupRecordBuffer extends BaseTestFileGroupRecordBuffer {
   private final IndexedRecord testRecord1 = createTestRecord("1", 1, 1L);
   private final IndexedRecord testRecord1UpdateWithSameTime = 
createTestRecord("1", 2, 1L);
   private final IndexedRecord testRecord2 = createTestRecord("2", 1, 1L);
   private final IndexedRecord testRecord2Update = createTestRecord("2", 1, 2L);
   private final IndexedRecord testRecord2EarlierUpdate = createTestRecord("2", 
1, 0L);
   private final IndexedRecord testRecord2Delete = createTestRecord("2", 2, 3L);
+  private final IndexedRecord testRecord2CustomPayloadExpected = 
createTestRecord("2", 2, 2L);
   private final IndexedRecord testRecord3 = createTestRecord("3", 1, 1L);
   private final IndexedRecord testRecord3Update = createTestRecord("3", 1, 2L);
+  private final IndexedRecord testRecord3UpdateCustomPayloadExpected = 
createTestRecord("3", 2, 2L);
   private final IndexedRecord testRecord3DeleteByFieldValue = 
createTestRecord("3", 3, 1L);
   private final IndexedRecord testRecord4 = createTestRecord("4", 2, 1L);
   private final IndexedRecord testRecord4Update = createTestRecord("4", 1, 2L);
+  private final IndexedRecord testRecord4EarlierUpdate = createTestRecord("4", 
1, 0L);
+  private final IndexedRecord testRecord5 = createTestRecord("5", 1, 1L);
+  private final IndexedRecord testRecord5DeleteByCustomMarker = 
createTestRecord("5", 3, 2L);
+  private final IndexedRecord testRecord6 = createTestRecord("6", 1, 5L);
+  private final IndexedRecord testRecord6DeleteByCustomMarker = 
createTestRecord("6", 3, 2L);
+  private final IndexedRecord testRecord7 = createTestRecord("7", 1, 5L);
 
   @Test
   void readWithEventTimeOrdering() throws IOException {
     HoodieReadStats readStats = new HoodieReadStats();
     HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
-    when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new 
String[]{"record_key"}));
+    when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[] 
{"record_key"}));
     StorageConfiguration<?> storageConfiguration = 
mock(StorageConfiguration.class);
     HoodieReaderContext<IndexedRecord> readerContext = new 
HoodieAvroReaderContext(storageConfiguration, tableConfig, Option.empty(), 
Option.empty());
     KeyBasedFileGroupRecordBuffer<IndexedRecord> fileGroupRecordBuffer = 
buildKeyBasedFileGroupRecordBuffer(readerContext, tableConfig, readStats, null,
@@ -115,7 +105,7 @@ class TestKeyBasedFileGroupRecordBuffer {
   void readWithEventTimeOrderingAndDeleteBlock() throws IOException {
     HoodieReadStats readStats = new HoodieReadStats();
     HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
-    when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new 
String[]{"record_key"}));
+    when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[] 
{"record_key"}));
     StorageConfiguration<?> storageConfiguration = 
mock(StorageConfiguration.class);
     HoodieReaderContext<IndexedRecord> readerContext = new 
HoodieAvroReaderContext(storageConfiguration, tableConfig, Option.empty(), 
Option.empty());
     KeyBasedFileGroupRecordBuffer<IndexedRecord> fileGroupRecordBuffer = 
buildKeyBasedFileGroupRecordBuffer(readerContext, tableConfig, readStats, null,
@@ -132,7 +122,7 @@ class TestKeyBasedFileGroupRecordBuffer {
     
when(dataBlock2.getEngineRecordIterator(readerContext)).thenReturn(ClosableIterator.wrap(Arrays.asList(testRecord2EarlierUpdate,
 testRecord3Update).iterator()));
 
     HoodieDeleteBlock deleteBlock = mock(HoodieDeleteBlock.class);
-    when(deleteBlock.getRecordsToDelete()).thenReturn(new 
DeleteRecord[]{DeleteRecord.create("3", ""), DeleteRecord.create("2", "", -1L),
+    when(deleteBlock.getRecordsToDelete()).thenReturn(new DeleteRecord[] 
{DeleteRecord.create("3", ""), DeleteRecord.create("2", "", -1L),
         DeleteRecord.create("1", "", 2L)});
     // process data block, then delete block, then another data block
     fileGroupRecordBuffer.processDataBlock(dataBlock, Option.empty());
@@ -146,11 +136,50 @@ class TestKeyBasedFileGroupRecordBuffer {
     assertEquals(2, readStats.getNumUpdates());
   }
 
+  /**
+   * Event-time-ordering read where the incoming changes are handed to the buffer as an iterator of
+   * {@link BufferedRecord}s (last argument of {@code buildKeyBasedFileGroupRecordBuffer}); no log blocks
+   * are processed in this test.
+   *
+   * <p>The base file holds keys 1-6. The incoming records are updates for keys 1-4, a record for the new
+   * key 7, and delete records for keys 5 and 6. "ts" is the ordering field.
+   */
+  @Test
+  void readWithEventTimeOrderingWithRecords() throws IOException {
+    HoodieReadStats readStats = new HoodieReadStats();
+    TypedProperties properties = new TypedProperties();
+    properties.setProperty(HoodieTableConfig.PRECOMBINE_FIELDS.key(), "ts");
+    properties.setProperty(DELETE_KEY, "counter");
+    properties.setProperty(DELETE_MARKER, "3");
+    HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
+    
when(tableConfig.getRecordMergeMode()).thenReturn(RecordMergeMode.EVENT_TIME_ORDERING);
+    
when(tableConfig.getPartialUpdateMode()).thenReturn(PartialUpdateMode.NONE);
+    
when(tableConfig.getTableVersion()).thenReturn(HoodieTableVersion.current());
+    when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[] 
{"record_key"}));
+    StorageConfiguration<?> storageConfiguration = 
mock(StorageConfiguration.class);
+    HoodieReaderContext<IndexedRecord> readerContext = new 
HoodieAvroReaderContext(storageConfiguration, tableConfig, Option.empty(), 
Option.empty());
+    readerContext.setHasLogFiles(false);
+    readerContext.setHasBootstrapBaseFile(false);
+    readerContext.initRecordMerger(properties);
+    FileGroupReaderSchemaHandler schemaHandler = new 
FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(), 
tableConfig,
+        properties);
+    readerContext.setSchemaHandler(schemaHandler);
+    List<BufferedRecord> inputRecords = 
convertToBufferedRecordsList(Arrays.asList(testRecord1UpdateWithSameTime, 
testRecord2Update, testRecord3Update,
+        testRecord4EarlierUpdate, testRecord7), readerContext, properties, new 
String[] {"ts"});
+    
inputRecords.addAll(convertToBufferedRecordsListForDeletes(Arrays.asList(testRecord5DeleteByCustomMarker,
 testRecord6DeleteByCustomMarker), false));
+    KeyBasedFileGroupRecordBuffer<IndexedRecord> fileGroupRecordBuffer = 
buildKeyBasedFileGroupRecordBuffer(readerContext, tableConfig, readStats, null,
+        RecordMergeMode.EVENT_TIME_ORDERING, Collections.singletonList("ts"), 
properties, Option.of(inputRecords.iterator()));
+
+    
fileGroupRecordBuffer.setBaseFileIterator(ClosableIterator.wrap(Arrays.asList(testRecord1,
 testRecord2, testRecord3, testRecord4,
+        testRecord5, testRecord6).iterator()));
+
+    List<IndexedRecord> actualRecords = 
getActualRecords(fileGroupRecordBuffer);
+    // update for 1 carries the same ts as the base record and is still applied (it is in the expected list).
+    // update for 4 is ignored due to lower ordering value.
+    // record5 is deleted.
+    // delete for 6 is ignored due to lower ordering value.
+    assertEquals(Arrays.asList(testRecord1UpdateWithSameTime, 
testRecord2Update, testRecord3Update, testRecord4, testRecord6, testRecord7), 
actualRecords);
+    // Presumably: key 7 is the single insert, key 5 the single delete, and keys 1, 2 and 3 the three updates.
+    assertEquals(1, readStats.getNumInserts());
+    assertEquals(1, readStats.getNumDeletes());
+    assertEquals(3, readStats.getNumUpdates());
+  }
+
   @Test
   void readWithCommitTimeOrdering() throws IOException {
     HoodieReadStats readStats = new HoodieReadStats();
     HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
-    when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new 
String[]{"record_key"}));
+    when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[] 
{"record_key"}));
     StorageConfiguration<?> storageConfiguration = 
mock(StorageConfiguration.class);
     HoodieReaderContext<IndexedRecord> readerContext = new 
HoodieAvroReaderContext(storageConfiguration, tableConfig, Option.empty(), 
Option.empty());
     KeyBasedFileGroupRecordBuffer<IndexedRecord> fileGroupRecordBuffer = 
buildKeyBasedFileGroupRecordBuffer(readerContext, tableConfig, readStats, null,
@@ -176,12 +205,49 @@ class TestKeyBasedFileGroupRecordBuffer {
     assertEquals(2, readStats.getNumUpdates());
   }
 
+  /**
+   * Commit-time-ordering read where the incoming changes are handed to the buffer as an iterator of
+   * {@link BufferedRecord}s (last argument of {@code buildKeyBasedFileGroupRecordBuffer}); no log blocks
+   * are processed in this test.
+   *
+   * <p>The base file holds keys 1-6. The incoming records are updates for keys 1-4, a record for the new
+   * key 7, and delete records for keys 5 and 6.
+   */
+  @Test
+  void readWithCommitTimeOrderingWithRecords() throws IOException {
+    HoodieReadStats readStats = new HoodieReadStats();
+    TypedProperties properties = new TypedProperties();
+    properties.setProperty(DELETE_KEY, "counter");
+    properties.setProperty(DELETE_MARKER, "3");
+    HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
+    
when(tableConfig.getRecordMergeMode()).thenReturn(RecordMergeMode.COMMIT_TIME_ORDERING);
+    
when(tableConfig.getPartialUpdateMode()).thenReturn(PartialUpdateMode.NONE);
+    
when(tableConfig.getTableVersion()).thenReturn(HoodieTableVersion.current());
+    when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[] 
{"record_key"}));
+    StorageConfiguration<?> storageConfiguration = 
mock(StorageConfiguration.class);
+    HoodieReaderContext<IndexedRecord> readerContext = new 
HoodieAvroReaderContext(storageConfiguration, tableConfig, Option.empty(), 
Option.empty());
+    readerContext.setHasLogFiles(false);
+    readerContext.setHasBootstrapBaseFile(false);
+    readerContext.initRecordMerger(properties);
+    FileGroupReaderSchemaHandler schemaHandler = new 
FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(), 
tableConfig,
+        properties);
+    readerContext.setSchemaHandler(schemaHandler);
+    // NOTE(review): the records are converted with an empty ordering-field array while the buffer below is
+    // built with "ts" as ordering field - presumably irrelevant under commit-time ordering; confirm.
+    List<BufferedRecord> inputRecords = 
convertToBufferedRecordsList(Arrays.asList(testRecord1UpdateWithSameTime, 
testRecord2Update, testRecord3Update,
+        testRecord4EarlierUpdate, testRecord7), readerContext, properties, new 
String[] {});
+    
inputRecords.addAll(convertToBufferedRecordsListForDeletes(Arrays.asList(testRecord5DeleteByCustomMarker,
 testRecord6DeleteByCustomMarker), true));
+    KeyBasedFileGroupRecordBuffer<IndexedRecord> fileGroupRecordBuffer = 
buildKeyBasedFileGroupRecordBuffer(readerContext, tableConfig, readStats, null,
+        RecordMergeMode.COMMIT_TIME_ORDERING, Collections.singletonList("ts"), 
properties, Option.of(inputRecords.iterator()));
+
+    
fileGroupRecordBuffer.setBaseFileIterator(ClosableIterator.wrap(Arrays.asList(testRecord1,
 testRecord2, testRecord3, testRecord4,
+        testRecord5, testRecord6).iterator()));
+
+    List<IndexedRecord> actualRecords = 
getActualRecords(fileGroupRecordBuffer);
+    // Every incoming change is expected to win here: the update for 4 replaces the base record although its
+    // ts is lower, and keys 5 and 6 are both absent from the expected list.
+    assertEquals(Arrays.asList(testRecord1UpdateWithSameTime, 
testRecord2Update, testRecord3Update, testRecord4EarlierUpdate, testRecord7), 
actualRecords);
+    // Presumably: key 7 is the single insert, keys 5 and 6 the two deletes, and keys 1-4 the four updates.
+    assertEquals(1, readStats.getNumInserts());
+    assertEquals(2, readStats.getNumDeletes());
+    assertEquals(4, readStats.getNumUpdates());
+  }
+
   @Test
   void readWithCustomPayload() throws IOException {
     HoodieReadStats readStats = new HoodieReadStats();
     HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
     
when(tableConfig.getPayloadClass()).thenReturn(CustomPayload.class.getName());
-    when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new 
String[]{"record_key"}));
+    when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[] 
{"record_key"}));
+    when(tableConfig.getRecordMergeMode()).thenReturn(RecordMergeMode.CUSTOM);
+    
when(tableConfig.getPartialUpdateMode()).thenReturn(PartialUpdateMode.NONE);
     StorageConfiguration<?> storageConfiguration = 
mock(StorageConfiguration.class);
     HoodieReaderContext<IndexedRecord> readerContext = new 
HoodieAvroReaderContext(storageConfiguration, tableConfig, Option.empty(), 
Option.empty());
     KeyBasedFileGroupRecordBuffer<IndexedRecord> fileGroupRecordBuffer = 
buildKeyBasedFileGroupRecordBuffer(readerContext, tableConfig, readStats, new 
HoodieAvroRecordMerger(),
@@ -200,7 +266,7 @@ class TestKeyBasedFileGroupRecordBuffer {
         
.thenReturn(ClosableIterator.wrap(Arrays.asList(testRecord1UpdateWithSameTime, 
testRecord2Delete, testRecord4Update).iterator()));
 
     HoodieDeleteBlock deleteBlock = mock(HoodieDeleteBlock.class);
-    when(deleteBlock.getRecordsToDelete()).thenReturn(new 
DeleteRecord[]{DeleteRecord.create("3", "")});
+    when(deleteBlock.getRecordsToDelete()).thenReturn(new DeleteRecord[] 
{DeleteRecord.create("3", "")});
     fileGroupRecordBuffer.processDataBlock(dataBlock1, Option.empty());
     fileGroupRecordBuffer.processDataBlock(dataBlock2, Option.empty());
     fileGroupRecordBuffer.processDeleteBlock(deleteBlock);
@@ -213,12 +279,52 @@ class TestKeyBasedFileGroupRecordBuffer {
     assertEquals(0, readStats.getNumUpdates());
   }
 
+  /**
+   * CUSTOM merge mode read using {@code CustomPayload} as payload class and a {@link HoodieAvroRecordMerger},
+   * where the incoming changes are handed to the buffer as an iterator of {@link BufferedRecord}s; no log
+   * blocks are processed in this test.
+   *
+   * <p>The base file holds keys 1-6. The incoming records are updates for keys 1-4 and delete records for
+   * keys 5 and 6.
+   */
+  @Test
+  void readWithCustomPayloadWithRecords() throws IOException {
+    HoodieReadStats readStats = new HoodieReadStats();
+    TypedProperties properties = new TypedProperties();
+    properties.setProperty(DELETE_KEY, "counter");
+    properties.setProperty(DELETE_MARKER, "3");
+    properties.setProperty(HoodieTableConfig.RECORD_MERGE_MODE.key(), 
"CUSTOM");
+    properties.setProperty(HoodieTableConfig.PAYLOAD_CLASS_NAME.key(), 
TestKeyBasedFileGroupRecordBuffer.CustomPayload.class.getName());
+    properties.setProperty(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key(), 
HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID);
+    HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
+    
when(tableConfig.getPayloadClass()).thenReturn(TestKeyBasedFileGroupRecordBuffer.CustomPayload.class.getName());
+    when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[] 
{"record_key"}));
+    when(tableConfig.getRecordMergeMode()).thenReturn(RecordMergeMode.CUSTOM);
+    
when(tableConfig.getPartialUpdateMode()).thenReturn(PartialUpdateMode.NONE);
+    
when(tableConfig.getRecordMergeStrategyId()).thenReturn(HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID);
+    
when(tableConfig.getTableVersion()).thenReturn(HoodieTableVersion.current());
+    StorageConfiguration<?> storageConfiguration = 
mock(StorageConfiguration.class);
+    HoodieReaderContext<IndexedRecord> readerContext = new 
HoodieAvroReaderContext(storageConfiguration, tableConfig, Option.empty(), 
Option.empty());
+    readerContext.setHasLogFiles(false);
+    readerContext.setHasBootstrapBaseFile(false);
+    readerContext.initRecordMerger(properties);
+    FileGroupReaderSchemaHandler schemaHandler = new 
FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(), 
tableConfig,
+        properties);
+    readerContext.setSchemaHandler(schemaHandler);
+    List<BufferedRecord> inputRecords = 
convertToBufferedRecordsList(Arrays.asList(testRecord1UpdateWithSameTime, 
testRecord2Update, testRecord3Update,
+        testRecord4EarlierUpdate), readerContext, properties, new String[] 
{"ts"});
+    
inputRecords.addAll(convertToBufferedRecordsListForDeletes(Arrays.asList(testRecord5DeleteByCustomMarker,
 testRecord6DeleteByCustomMarker), true));
+    KeyBasedFileGroupRecordBuffer<IndexedRecord> fileGroupRecordBuffer = 
buildKeyBasedFileGroupRecordBuffer(readerContext, tableConfig, readStats, new 
HoodieAvroRecordMerger(),
+        RecordMergeMode.CUSTOM, Collections.singletonList("ts"), properties, 
Option.of(inputRecords.iterator()));
+
+    
fileGroupRecordBuffer.setBaseFileIterator(ClosableIterator.wrap(Arrays.asList(testRecord1,
 testRecord2, testRecord3, testRecord4,
+        testRecord5, testRecord6).iterator()));
+
+    List<IndexedRecord> actualRecords = 
getActualRecords(fileGroupRecordBuffer);
+    // Expected: key 1 keeps its base record, keys 2 and 3 come back with counter 2 and ts 2, and keys 4, 5
+    // and 6 are absent. Presumably key 4 is dropped because its combined counter (2 + 1) exceeds 2.
+    assertEquals(Arrays.asList(testRecord1, testRecord2CustomPayloadExpected, 
testRecord3UpdateCustomPayloadExpected), actualRecords);
+    assertEquals(0, readStats.getNumInserts());
+    assertEquals(3, readStats.getNumDeletes());
+    assertEquals(2, readStats.getNumUpdates());
+  }
+
   @Test
   void readWithCustomMerger() throws IOException {
     HoodieReadStats readStats = new HoodieReadStats();
     HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
     
when(tableConfig.getPayloadClass()).thenReturn(CustomPayload.class.getName());
-    when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new 
String[]{"record_key"}));
+    when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[] 
{"record_key"}));
     StorageConfiguration<?> storageConfiguration = 
mock(StorageConfiguration.class);
     HoodieReaderContext<IndexedRecord> readerContext = new 
HoodieAvroReaderContext(storageConfiguration, tableConfig, Option.empty(), 
Option.empty());
     KeyBasedFileGroupRecordBuffer<IndexedRecord> fileGroupRecordBuffer = 
buildKeyBasedFileGroupRecordBuffer(readerContext, tableConfig, readStats, new 
CustomMerger(),
@@ -237,7 +343,7 @@ class TestKeyBasedFileGroupRecordBuffer {
         .thenReturn(ClosableIterator.wrap(Arrays.asList(testRecord2Delete, 
testRecord4Update).iterator()));
 
     HoodieDeleteBlock deleteBlock = mock(HoodieDeleteBlock.class);
-    when(deleteBlock.getRecordsToDelete()).thenReturn(new 
DeleteRecord[]{DeleteRecord.create("3", "")});
+    when(deleteBlock.getRecordsToDelete()).thenReturn(new DeleteRecord[] 
{DeleteRecord.create("3", "")});
     fileGroupRecordBuffer.processDataBlock(dataBlock1, Option.empty());
     fileGroupRecordBuffer.processDataBlock(dataBlock2, Option.empty());
     fileGroupRecordBuffer.processDeleteBlock(deleteBlock);
@@ -250,117 +356,42 @@ class TestKeyBasedFileGroupRecordBuffer {
     assertEquals(0, readStats.getNumUpdates());
   }
 
-  private static GenericRecord createTestRecord(String recordKey, int counter, 
long ts) {
-    GenericRecord record = new GenericData.Record(SCHEMA);
-    record.put("record_key", recordKey);
-    record.put("counter", counter);
-    record.put("ts", ts);
-    return record;
-  }
-
-  private static KeyBasedFileGroupRecordBuffer<IndexedRecord> 
buildKeyBasedFileGroupRecordBuffer(HoodieReaderContext<IndexedRecord> 
readerContext,
-                                                                               
                  HoodieTableConfig tableConfig,
-                                                                               
                  HoodieReadStats readStats,
-                                                                               
                  HoodieRecordMerger recordMerger,
-                                                                               
                  RecordMergeMode recordMergeMode,
-                                                                               
                  List<String> orderingFieldNames,
-                                                                               
                  Option<Pair<String, String>> deleteMarkerKeyValue) {
-
-    readerContext.setRecordMerger(Option.ofNullable(recordMerger));
-    TypedProperties props = new TypedProperties();
-    deleteMarkerKeyValue.ifPresent(markerKeyValue -> {
-      props.setProperty(DELETE_KEY, markerKeyValue.getLeft());
-      props.setProperty(DELETE_MARKER, markerKeyValue.getRight());
-    });
-    FileGroupReaderSchemaHandler<IndexedRecord> fileGroupReaderSchemaHandler = 
mock(FileGroupReaderSchemaHandler.class);
-    when(fileGroupReaderSchemaHandler.getRequiredSchema()).thenReturn(SCHEMA);
-    
when(fileGroupReaderSchemaHandler.getInternalSchema()).thenReturn(InternalSchema.getEmptyInternalSchema());
-    when(fileGroupReaderSchemaHandler.getDeleteContext()).thenReturn(new 
DeleteContext(props, SCHEMA));
-    readerContext.setSchemaHandler(fileGroupReaderSchemaHandler);
-    HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class, 
RETURNS_DEEP_STUBS);
-    when(mockMetaClient.getTableConfig()).thenReturn(tableConfig);
-    UpdateProcessor<IndexedRecord> updateProcessor = 
UpdateProcessor.create(readStats, readerContext, false, Option.empty());
-    return new KeyBasedFileGroupRecordBuffer<>(
-        readerContext, mockMetaClient, recordMergeMode, 
PartialUpdateMode.NONE, props, orderingFieldNames, updateProcessor);
-  }
-
-  private static List<IndexedRecord> 
getActualRecords(FileGroupRecordBuffer<IndexedRecord> fileGroupRecordBuffer) 
throws IOException {
-    List<IndexedRecord> actualRecords = new ArrayList<>();
-    while (fileGroupRecordBuffer.hasNext()) {
-      actualRecords.add(fileGroupRecordBuffer.next());
-    }
-    return actualRecords;
-  }
+  @Test
+  void readWithCustomMergerWithRecords() throws IOException {
+    HoodieReadStats readStats = new HoodieReadStats();
+    TypedProperties properties = new TypedProperties();
+    properties.setProperty(DELETE_KEY, "counter");
+    properties.setProperty(DELETE_MARKER, "3");
+    properties.setProperty(HoodieTableConfig.PAYLOAD_CLASS_NAME.key(), 
CustomPayload.class.getName());
+    HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
+    
when(tableConfig.getPayloadClass()).thenReturn(CustomPayload.class.getName());
+    when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[] 
{"record_key"}));
+    when(tableConfig.getRecordMergeMode()).thenReturn(RecordMergeMode.CUSTOM);
+    
when(tableConfig.getPartialUpdateMode()).thenReturn(PartialUpdateMode.NONE);
+    
when(tableConfig.getRecordMergeStrategyId()).thenReturn(HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID);
+    
when(tableConfig.getTableVersion()).thenReturn(HoodieTableVersion.current());
 
-  /**
-   * A custom payload implementation for testing purposes that marks records 
as deleted once the counter exceeds 2.
-   * During the merge, it will combine the counter values.
-   */
-  public static class CustomPayload extends BaseAvroPayload
-      implements HoodieRecordPayload<CustomPayload> {
-    private final GenericRecord payloadRecord;
-
-    public CustomPayload(GenericRecord record, Comparable orderingVal) {
-      super(record, orderingVal);
-      this.payloadRecord = record;
-    }
-
-    @Override
-    public CustomPayload preCombine(CustomPayload oldValue) {
-      return this;
-    }
-
-    @Override
-    public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord 
currentValue, Schema schema) throws IOException {
-      if (currentValue.get(2).equals(payloadRecord.get(2))) {
-        // If the timestamps are the same, we do not update
-        return Option.of(currentValue);
-      }
-      int result = (int) currentValue.get(1) + (int) payloadRecord.get(1);
-      if (result > 2) {
-        return Option.empty();
-      }
-      return Option.of(createTestRecord(currentValue.get(0).toString(), 
result, (long) payloadRecord.get(2)));
-    }
-
-    @Override
-    public Option<IndexedRecord> getInsertValue(Schema schema) throws 
IOException {
-      return Option.of(payloadRecord);
-    }
-
-    @Override
-    public Comparable<?> getOrderingValue() {
-      return null;
-    }
-  }
+    StorageConfiguration<?> storageConfiguration = 
mock(StorageConfiguration.class);
+    HoodieReaderContext<IndexedRecord> readerContext = new 
HoodieAvroReaderContext(storageConfiguration, tableConfig, Option.empty(), 
Option.empty());
+    readerContext.setHasLogFiles(false);
+    readerContext.setHasBootstrapBaseFile(false);
+    readerContext.initRecordMerger(properties);
+    FileGroupReaderSchemaHandler schemaHandler = new 
FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(), 
tableConfig,
+        properties);
+    readerContext.setSchemaHandler(schemaHandler);
+    List<BufferedRecord> inputRecords = 
convertToBufferedRecordsList(Arrays.asList(testRecord1UpdateWithSameTime, 
testRecord2Update, testRecord3Update,
+        testRecord4EarlierUpdate), readerContext, properties, new String[] 
{"ts"});
+    
inputRecords.addAll(convertToBufferedRecordsListForDeletes(Arrays.asList(testRecord5DeleteByCustomMarker,
 testRecord6DeleteByCustomMarker), true));
+    KeyBasedFileGroupRecordBuffer<IndexedRecord> fileGroupRecordBuffer = 
buildKeyBasedFileGroupRecordBuffer(readerContext, tableConfig, readStats, new 
TestKeyBasedFileGroupRecordBuffer.CustomMerger(),
+        RecordMergeMode.CUSTOM, Collections.singletonList("ts"), properties, 
Option.of(inputRecords.iterator()));
+
+    
fileGroupRecordBuffer.setBaseFileIterator(ClosableIterator.wrap(Arrays.asList(testRecord1,
 testRecord2, testRecord3, testRecord4,
+        testRecord5, testRecord6).iterator()));
 
-  public static class CustomMerger implements HoodieRecordMerger {
-    private final String strategy = UUID.randomUUID().toString();
-
-    @Override
-    public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema 
oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws 
IOException {
-      GenericRecord olderData = (GenericRecord) older.getData();
-      GenericRecord newerData = (GenericRecord) newer.getData();
-      if (olderData.get(2).equals(newerData.get(2))) {
-        // If the timestamps are the same, we do not update
-        return Option.of(Pair.of(older, oldSchema));
-      }
-      int result = (int) olderData.get(1) + (int) newerData.get(1);
-      if (result > 2) {
-        return Option.empty();
-      }
-      HoodieKey hoodieKey = older.getKey();
-      return Option.of(Pair.of(new 
HoodieAvroIndexedRecord(createTestRecord(hoodieKey.getRecordKey(), result, 
(long) newerData.get(2))), SCHEMA));
-    }
-
-    @Override
-    public HoodieRecord.HoodieRecordType getRecordType() {
-      return HoodieRecord.HoodieRecordType.AVRO;
-    }
-
-    @Override
-    public String getMergingStrategy() {
-      return strategy;
-    }
+    List<IndexedRecord> actualRecords = 
getActualRecords(fileGroupRecordBuffer);
+    assertEquals(Arrays.asList(testRecord1, testRecord2CustomPayloadExpected, 
testRecord3UpdateCustomPayloadExpected), actualRecords);
+    assertEquals(0, readStats.getNumInserts());
+    assertEquals(3, readStats.getNumDeletes());
+    assertEquals(2, readStats.getNumUpdates());
   }
 }
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestSortedKeyBasedFileGroupRecordBuffer.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestSortedKeyBasedFileGroupRecordBuffer.java
index 43da3f7521f0..0ef3da31cc1e 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestSortedKeyBasedFileGroupRecordBuffer.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestSortedKeyBasedFileGroupRecordBuffer.java
@@ -19,21 +19,31 @@
 
 package org.apache.hudi.common.table.read.buffer;
 
+import org.apache.hudi.avro.HoodieAvroReaderContext;
 import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
 import org.apache.hudi.common.model.DeleteRecord;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.table.PartialUpdateMode;
 import org.apache.hudi.common.table.log.block.HoodieDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
+import org.apache.hudi.common.table.read.BufferedRecord;
+import org.apache.hudi.common.table.read.FileGroupReaderSchemaHandler;
 import org.apache.hudi.common.table.read.HoodieReadStats;
+import org.apache.hudi.common.table.read.InputSplit;
+import org.apache.hudi.common.table.read.ReaderParameters;
 import org.apache.hudi.common.table.read.UpdateProcessor;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.storage.StorageConfiguration;
 
+import org.apache.avro.generic.IndexedRecord;
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
@@ -42,13 +52,15 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
+import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY;
+import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-class TestSortedKeyBasedFileGroupRecordBuffer {
+class TestSortedKeyBasedFileGroupRecordBuffer extends BaseTestFileGroupRecordBuffer {
   private final TestRecord testRecord1 = new TestRecord("1", 0);
   private final TestRecord testRecord2 = new TestRecord("2", 0);
   private final TestRecord testRecord2Update = new TestRecord("2", 1);
@@ -58,31 +70,92 @@ class TestSortedKeyBasedFileGroupRecordBuffer {
   private final TestRecord testRecord6 = new TestRecord("6", 0);
   private final TestRecord testRecord6Update = new TestRecord("6", 1);
 
+  private final IndexedRecord testIndexedRecord1 = createTestRecord("1", 1, 
1L);
+  private final IndexedRecord testIndexedRecord2 = createTestRecord("2", 1, 
1L);
+  private final IndexedRecord testIndexedRecord2Update = createTestRecord("2", 
1, 2L);
+  private final IndexedRecord testIndexedRecord3 = createTestRecord("3", 1, 
1L);
+  private final IndexedRecord testIndexedRecord4 = createTestRecord("4", 2, 
2L);
+  private final IndexedRecord testIndexedRecord4LowerOrdering = 
createTestRecord("4", 2, 1L);
+  private final IndexedRecord testIndexedRecord5 = createTestRecord("5", 1, 
1L);
+  private final IndexedRecord testRecord5DeleteByCustomMarker = 
createTestRecord("5", 3, 2L);
+  private final IndexedRecord testIndexedRecord6 = createTestRecord("6", 1, 
5L);
+  private final IndexedRecord testIndexedRecord6Update = createTestRecord("6", 
2, 10L);
+
   @Test
   void readBaseFileAndLogFile() throws IOException {
     HoodieReadStats readStats = new HoodieReadStats();
     HoodieReaderContext<TestRecord> mockReaderContext = 
mock(HoodieReaderContext.class, RETURNS_DEEP_STUBS);
+
     SortedKeyBasedFileGroupRecordBuffer<TestRecord> fileGroupRecordBuffer = 
buildSortedKeyBasedFileGroupRecordBuffer(mockReaderContext, readStats);
 
     
fileGroupRecordBuffer.setBaseFileIterator(ClosableIterator.wrap(Arrays.asList(testRecord2,
 testRecord3, testRecord5).iterator()));
 
     HoodieDataBlock dataBlock = mock(HoodieDataBlock.class);
     
when(dataBlock.getSchema()).thenReturn(HoodieTestDataGenerator.AVRO_SCHEMA);
-    when(dataBlock.getEngineRecordIterator(mockReaderContext)).thenReturn(ClosableIterator.wrap(Arrays.asList(testRecord6, testRecord4, testRecord1, testRecord6Update, testRecord2Update).iterator()));
+    when(dataBlock.getEngineRecordIterator(mockReaderContext)).thenReturn(
+        ClosableIterator.wrap(Arrays.asList(testRecord6, testRecord4, 
testRecord1, testRecord6Update, testRecord2Update).iterator()));
 
     HoodieDeleteBlock deleteBlock = mock(HoodieDeleteBlock.class);
-    when(deleteBlock.getRecordsToDelete()).thenReturn(new DeleteRecord[]{DeleteRecord.create("3", "")});
+    when(deleteBlock.getRecordsToDelete()).thenReturn(new DeleteRecord[] 
{DeleteRecord.create("3", "")});
     fileGroupRecordBuffer.processDataBlock(dataBlock, Option.empty());
     fileGroupRecordBuffer.processDeleteBlock(deleteBlock);
 
-
-    List<TestRecord> actualRecords = getActualRecords(fileGroupRecordBuffer);
+    List<TestRecord> actualRecords = 
getActualRecordsForSortedKeyBased(fileGroupRecordBuffer);
     assertEquals(Arrays.asList(testRecord1, testRecord2Update, testRecord4, 
testRecord5, testRecord6Update), actualRecords);
     assertEquals(3, readStats.getNumInserts());
     assertEquals(1, readStats.getNumUpdates());
     assertEquals(1, readStats.getNumDeletes());
   }
 
+  @Test
+  void readWithStreamingRecordBufferLoaderAndEventTimeOrdering() throws 
IOException {
+    HoodieReadStats readStats = new HoodieReadStats();
+    TypedProperties properties = new TypedProperties();
+    properties.setProperty(HoodieTableConfig.PRECOMBINE_FIELDS.key(), "ts");
+    properties.setProperty(DELETE_KEY, "counter");
+    properties.setProperty(DELETE_MARKER, "3");
+    HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
+    when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[] 
{"record_key"}));
+    
when(tableConfig.getTableVersion()).thenReturn(HoodieTableVersion.current());
+    
when(tableConfig.getRecordMergeMode()).thenReturn(RecordMergeMode.EVENT_TIME_ORDERING);
+    StorageConfiguration<?> storageConfiguration = 
mock(StorageConfiguration.class);
+    HoodieReaderContext<IndexedRecord> readerContext = new 
HoodieAvroReaderContext(storageConfiguration, tableConfig, Option.empty(), 
Option.empty());
+    readerContext.setHasLogFiles(false);
+    readerContext.setHasBootstrapBaseFile(false);
+    FileGroupReaderSchemaHandler schemaHandler = new 
FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(), 
tableConfig,
+        properties);
+    readerContext.setSchemaHandler(schemaHandler);
+    readerContext.initRecordMerger(properties);
+    List<BufferedRecord> inputRecords =
+        convertToBufferedRecordsList(Arrays.asList(testIndexedRecord6Update, 
testIndexedRecord4LowerOrdering, testIndexedRecord1, testIndexedRecord2Update), 
readerContext, properties,
+            new String[]{"ts"});
+    
inputRecords.addAll(convertToBufferedRecordsListForDeletes(Arrays.asList(testRecord5DeleteByCustomMarker),
 false));
+    HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class, 
RETURNS_DEEP_STUBS);
+    when(mockMetaClient.getTableConfig()).thenReturn(tableConfig);
+    
when(tableConfig.getPayloadClass()).thenReturn(DefaultHoodieRecordPayload.class.getName());
+    
when(tableConfig.getPartialUpdateMode()).thenReturn(PartialUpdateMode.NONE);
+
+    FileGroupRecordBufferLoader recordBufferLoader = 
FileGroupRecordBufferLoader.createStreamingRecordsBufferLoader();
+    InputSplit inputSplit = mock(InputSplit.class);
+    when(inputSplit.hasNoRecordsToMerge()).thenReturn(false);
+    when(inputSplit.getRecordIterator()).thenReturn(inputRecords.iterator());
+    ReaderParameters readerParameters = mock(ReaderParameters.class);
+    when(readerParameters.sortOutputs()).thenReturn(true);
+    SortedKeyBasedFileGroupRecordBuffer fileGroupRecordBuffer  = 
(SortedKeyBasedFileGroupRecordBuffer<IndexedRecord>) recordBufferLoader
+        .getRecordBuffer(readerContext, mockMetaClient.getStorage(), 
inputSplit, Collections.singletonList("ts"), mockMetaClient, properties,
+            readerParameters, readStats, Option.empty()).getKey();
+    
when(tableConfig.getPayloadClass()).thenReturn(DefaultHoodieRecordPayload.class.getName());
+
+    
fileGroupRecordBuffer.setBaseFileIterator(ClosableIterator.wrap(Arrays.asList(testIndexedRecord2,
 testIndexedRecord3, testIndexedRecord4,
+        testIndexedRecord5, testIndexedRecord6).iterator()));
+
+    List<IndexedRecord> actualRecords = 
getActualRecords(fileGroupRecordBuffer);
+    assertEquals(Arrays.asList(testIndexedRecord1, testIndexedRecord2Update, 
testIndexedRecord3, testIndexedRecord4, testIndexedRecord6Update), 
actualRecords);
+    assertEquals(1, readStats.getNumInserts());
+    assertEquals(1, readStats.getNumDeletes());
+    assertEquals(2, readStats.getNumUpdates());
+  }
+
   @Test
   void readLogFiles() throws IOException {
     HoodieReadStats readStats = new HoodieReadStats();
@@ -100,13 +173,13 @@ class TestSortedKeyBasedFileGroupRecordBuffer {
     
when(dataBlock2.getEngineRecordIterator(mockReaderContext)).thenReturn(ClosableIterator.wrap(Arrays.asList(testRecord2Update,
 testRecord5, testRecord3, testRecord1).iterator()));
 
     HoodieDeleteBlock deleteBlock = mock(HoodieDeleteBlock.class);
-    when(deleteBlock.getRecordsToDelete()).thenReturn(new DeleteRecord[]{DeleteRecord.create("3", "")});
+    when(deleteBlock.getRecordsToDelete()).thenReturn(new DeleteRecord[] 
{DeleteRecord.create("3", "")});
     fileGroupRecordBuffer.processDataBlock(dataBlock1, Option.empty());
     fileGroupRecordBuffer.processDataBlock(dataBlock2, Option.empty());
     fileGroupRecordBuffer.processDeleteBlock(deleteBlock);
 
 
-    List<TestRecord> actualRecords = getActualRecords(fileGroupRecordBuffer);
+    List<TestRecord> actualRecords = 
getActualRecordsForSortedKeyBased(fileGroupRecordBuffer);
     assertEquals(Arrays.asList(testRecord1, testRecord2Update, testRecord4, 
testRecord5, testRecord6Update), actualRecords);
     assertEquals(5, readStats.getNumInserts());
     assertEquals(0, readStats.getNumUpdates());
@@ -133,12 +206,11 @@ class TestSortedKeyBasedFileGroupRecordBuffer {
         mockReaderContext, mockMetaClient, recordMergeMode, partialUpdateMode, 
props, Collections.emptyList(), updateProcessor);
   }
 
-  private static List<TestRecord> getActualRecords(SortedKeyBasedFileGroupRecordBuffer<TestRecord> fileGroupRecordBuffer) throws IOException {
-    List<TestRecord> actualRecords = new ArrayList<>();
+  private static <T> List<T> 
getActualRecordsForSortedKeyBased(SortedKeyBasedFileGroupRecordBuffer<T> 
fileGroupRecordBuffer) throws IOException {
+    List<T> actualRecords = new ArrayList<>();
     while (fileGroupRecordBuffer.hasNext()) {
       actualRecords.add(fileGroupRecordBuffer.next());
     }
     return actualRecords;
   }
-
 }
\ No newline at end of file
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestStreamingKeyBasedFileGroupRecordBuffer.java b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestStreamingKeyBasedFileGroupRecordBuffer.java
new file mode 100644
index 000000000000..4aa7856c57cc
--- /dev/null
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestStreamingKeyBasedFileGroupRecordBuffer.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read.buffer;
+
+import org.apache.hudi.avro.HoodieAvroReaderContext;
+import org.apache.hudi.common.config.RecordMergeMode;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.HoodieAvroRecordMerger;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.table.PartialUpdateMode;
+import org.apache.hudi.common.table.read.BufferedRecord;
+import org.apache.hudi.common.table.read.FileGroupReaderSchemaHandler;
+import org.apache.hudi.common.table.read.HoodieReadStats;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.storage.StorageConfiguration;
+
+import org.apache.avro.generic.IndexedRecord;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY;
+import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class TestStreamingKeyBasedFileGroupRecordBuffer extends 
BaseTestFileGroupRecordBuffer {
+  private final IndexedRecord testRecord1 = createTestRecord("1", 1, 1L);
+  private final IndexedRecord testRecord1UpdateWithSameTime = 
createTestRecord("1", 2, 1L);
+  private final IndexedRecord testRecord2 = createTestRecord("2", 1, 1L);
+  private final IndexedRecord testRecord2Update = createTestRecord("2", 1, 2L);
+  private final IndexedRecord testRecord2CustomPayloadExpected = 
createTestRecord("2", 2, 2L);
+  private final IndexedRecord testRecord3 = createTestRecord("3", 1, 1L);
+  private final IndexedRecord testRecord3Update = createTestRecord("3", 1, 2L);
+  private final IndexedRecord testRecord3UpdateCustomPayloadExpected = 
createTestRecord("3", 2, 2L);
+  private final IndexedRecord testRecord4 = createTestRecord("4", 2, 1L);
+  private final IndexedRecord testRecord4EarlierUpdate = createTestRecord("4", 
1, 0L);
+  private final IndexedRecord testRecord5 = createTestRecord("5", 1, 1L);
+  private final IndexedRecord testRecord5DeleteByCustomMarker = 
createTestRecord("5", 3, 2L);
+  private final IndexedRecord testRecord6 = createTestRecord("6", 1, 5L);
+  private final IndexedRecord testRecord6DeleteByCustomMarker = 
createTestRecord("6", 3, 2L);
+  private final IndexedRecord testRecord7 = createTestRecord("7", 1, 5L);
+
+  @Test
+  void readWithEventTimeOrdering() throws IOException {
+    HoodieReadStats readStats = new HoodieReadStats();
+    TypedProperties properties = new TypedProperties();
+    properties.setProperty(HoodieTableConfig.PRECOMBINE_FIELDS.key(), "ts");
+    properties.setProperty(DELETE_KEY, "counter");
+    properties.setProperty(DELETE_MARKER, "3");
+    HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
+    
when(tableConfig.getRecordMergeMode()).thenReturn(RecordMergeMode.EVENT_TIME_ORDERING);
+    
when(tableConfig.getPartialUpdateMode()).thenReturn(PartialUpdateMode.NONE);
+    
when(tableConfig.getTableVersion()).thenReturn(HoodieTableVersion.current());
+    when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[] 
{"record_key"}));
+    StorageConfiguration<?> storageConfiguration = 
mock(StorageConfiguration.class);
+    HoodieReaderContext<IndexedRecord> readerContext = new 
HoodieAvroReaderContext(storageConfiguration, tableConfig, Option.empty(), 
Option.empty());
+    readerContext.setHasLogFiles(false);
+    readerContext.setHasBootstrapBaseFile(false);
+    readerContext.initRecordMerger(properties);
+    FileGroupReaderSchemaHandler schemaHandler = new 
FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(), 
tableConfig,
+        properties);
+    readerContext.setSchemaHandler(schemaHandler);
+    List<BufferedRecord> inputRecords = 
convertToBufferedRecordsList(Arrays.asList(testRecord1UpdateWithSameTime, 
testRecord2Update, testRecord3Update,
+        testRecord4EarlierUpdate, testRecord7), readerContext, properties, new 
String[] {"ts"});
+    
inputRecords.addAll(convertToBufferedRecordsListForDeletes(Arrays.asList(testRecord5DeleteByCustomMarker,
 testRecord6DeleteByCustomMarker), false));
+    KeyBasedFileGroupRecordBuffer<IndexedRecord> fileGroupRecordBuffer = 
buildKeyBasedFileGroupRecordBuffer(readerContext, tableConfig, readStats, null,
+        RecordMergeMode.EVENT_TIME_ORDERING, Collections.singletonList("ts"), 
properties, Option.of(inputRecords.iterator()));
+
+    
fileGroupRecordBuffer.setBaseFileIterator(ClosableIterator.wrap(Arrays.asList(testRecord1,
 testRecord2, testRecord3, testRecord4,
+        testRecord5, testRecord6).iterator()));
+
+    List<IndexedRecord> actualRecords = 
getActualRecords(fileGroupRecordBuffer);
+    // update for 4 is ignored due to lower ordering value.
+    // record5 is deleted.
+    // delete for 6 is ignored due to lower ordering value.
+    assertEquals(Arrays.asList(testRecord1UpdateWithSameTime, 
testRecord2Update, testRecord3Update, testRecord4, testRecord6, testRecord7), 
actualRecords);
+    assertEquals(1, readStats.getNumInserts());
+    assertEquals(1, readStats.getNumDeletes());
+    assertEquals(3, readStats.getNumUpdates());
+  }
+
+  @Test
+  void readWithCommitTimeOrdering() throws IOException {
+    HoodieReadStats readStats = new HoodieReadStats();
+    TypedProperties properties = new TypedProperties();
+    properties.setProperty(DELETE_KEY, "counter");
+    properties.setProperty(DELETE_MARKER, "3");
+    HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
+    
when(tableConfig.getRecordMergeMode()).thenReturn(RecordMergeMode.COMMIT_TIME_ORDERING);
+    
when(tableConfig.getPartialUpdateMode()).thenReturn(PartialUpdateMode.NONE);
+    
when(tableConfig.getTableVersion()).thenReturn(HoodieTableVersion.current());
+    when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[] 
{"record_key"}));
+    StorageConfiguration<?> storageConfiguration = 
mock(StorageConfiguration.class);
+    HoodieReaderContext<IndexedRecord> readerContext = new 
HoodieAvroReaderContext(storageConfiguration, tableConfig, Option.empty(), 
Option.empty());
+    readerContext.setHasLogFiles(false);
+    readerContext.setHasBootstrapBaseFile(false);
+    readerContext.initRecordMerger(properties);
+    FileGroupReaderSchemaHandler schemaHandler = new 
FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(), 
tableConfig,
+        properties);
+    readerContext.setSchemaHandler(schemaHandler);
+    List<BufferedRecord> inputRecords = 
convertToBufferedRecordsList(Arrays.asList(testRecord1UpdateWithSameTime, 
testRecord2Update, testRecord3Update,
+        testRecord4EarlierUpdate, testRecord7), readerContext, properties, new 
String[] {});
+    
inputRecords.addAll(convertToBufferedRecordsListForDeletes(Arrays.asList(testRecord5DeleteByCustomMarker,
 testRecord6DeleteByCustomMarker), true));
+    KeyBasedFileGroupRecordBuffer<IndexedRecord> fileGroupRecordBuffer = 
buildKeyBasedFileGroupRecordBuffer(readerContext, tableConfig, readStats, null,
+        RecordMergeMode.COMMIT_TIME_ORDERING, Collections.singletonList("ts"), 
properties, Option.of(inputRecords.iterator()));
+
+    
fileGroupRecordBuffer.setBaseFileIterator(ClosableIterator.wrap(Arrays.asList(testRecord1,
 testRecord2, testRecord3, testRecord4,
+        testRecord5, testRecord6).iterator()));
+
+    List<IndexedRecord> actualRecords = 
getActualRecords(fileGroupRecordBuffer);
+    assertEquals(Arrays.asList(testRecord1UpdateWithSameTime, 
testRecord2Update, testRecord3Update, testRecord4EarlierUpdate, testRecord7), 
actualRecords);
+    assertEquals(1, readStats.getNumInserts());
+    assertEquals(2, readStats.getNumDeletes());
+    assertEquals(4, readStats.getNumUpdates());
+  }
+
+  @Test
+  void readWithCustomPayload() throws IOException {
+    HoodieReadStats readStats = new HoodieReadStats();
+    TypedProperties properties = new TypedProperties();
+    properties.setProperty(DELETE_KEY, "counter");
+    properties.setProperty(DELETE_MARKER, "3");
+    properties.setProperty(HoodieTableConfig.RECORD_MERGE_MODE.key(), 
"CUSTOM");
+    properties.setProperty(HoodieTableConfig.PAYLOAD_CLASS_NAME.key(), 
CustomPayload.class.getName());
+    properties.setProperty(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key(), 
HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID);
+    HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
+    
when(tableConfig.getPayloadClass()).thenReturn(CustomPayload.class.getName());
+    when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[] 
{"record_key"}));
+    when(tableConfig.getRecordMergeMode()).thenReturn(RecordMergeMode.CUSTOM);
+    
when(tableConfig.getPartialUpdateMode()).thenReturn(PartialUpdateMode.NONE);
+    
when(tableConfig.getRecordMergeStrategyId()).thenReturn(HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID);
+    
when(tableConfig.getTableVersion()).thenReturn(HoodieTableVersion.current());
+    StorageConfiguration<?> storageConfiguration = 
mock(StorageConfiguration.class);
+    HoodieReaderContext<IndexedRecord> readerContext = new 
HoodieAvroReaderContext(storageConfiguration, tableConfig, Option.empty(), 
Option.empty());
+    readerContext.setHasLogFiles(false);
+    readerContext.setHasBootstrapBaseFile(false);
+    readerContext.initRecordMerger(properties);
+    FileGroupReaderSchemaHandler schemaHandler = new 
FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(), 
tableConfig,
+        properties);
+    readerContext.setSchemaHandler(schemaHandler);
+    List<BufferedRecord> inputRecords = 
convertToBufferedRecordsList(Arrays.asList(testRecord1UpdateWithSameTime, 
testRecord2Update, testRecord3Update,
+        testRecord4EarlierUpdate), readerContext, properties, new String[] 
{"ts"});
+    
inputRecords.addAll(convertToBufferedRecordsListForDeletes(Arrays.asList(testRecord5DeleteByCustomMarker,
 testRecord6DeleteByCustomMarker), true));
+    KeyBasedFileGroupRecordBuffer<IndexedRecord> fileGroupRecordBuffer = 
buildKeyBasedFileGroupRecordBuffer(readerContext, tableConfig, readStats, new 
HoodieAvroRecordMerger(),
+        RecordMergeMode.CUSTOM, Collections.singletonList("ts"), properties, 
Option.of(inputRecords.iterator()));
+
+    
fileGroupRecordBuffer.setBaseFileIterator(ClosableIterator.wrap(Arrays.asList(testRecord1,
 testRecord2, testRecord3, testRecord4,
+        testRecord5, testRecord6).iterator()));
+
+    List<IndexedRecord> actualRecords = 
getActualRecords(fileGroupRecordBuffer);
+    assertEquals(Arrays.asList(testRecord1, testRecord2CustomPayloadExpected, 
testRecord3UpdateCustomPayloadExpected), actualRecords);
+    assertEquals(0, readStats.getNumInserts());
+    assertEquals(3, readStats.getNumDeletes());
+    assertEquals(2, readStats.getNumUpdates());
+  }
+
+  @Test
+  void readWithCustomMergerWithRecords() throws IOException {
+    HoodieReadStats readStats = new HoodieReadStats();
+    TypedProperties properties = new TypedProperties();
+    properties.setProperty(DELETE_KEY, "counter");
+    properties.setProperty(DELETE_MARKER, "3");
+    properties.setProperty(HoodieTableConfig.PAYLOAD_CLASS_NAME.key(), 
CustomPayload.class.getName());
+    HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
+    
when(tableConfig.getPayloadClass()).thenReturn(CustomPayload.class.getName());
+    when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[] 
{"record_key"}));
+    when(tableConfig.getRecordMergeMode()).thenReturn(RecordMergeMode.CUSTOM);
+    
when(tableConfig.getPartialUpdateMode()).thenReturn(PartialUpdateMode.NONE);
+    
when(tableConfig.getRecordMergeStrategyId()).thenReturn(HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID);
+    
when(tableConfig.getTableVersion()).thenReturn(HoodieTableVersion.current());
+
+    StorageConfiguration<?> storageConfiguration = 
mock(StorageConfiguration.class);
+    HoodieReaderContext<IndexedRecord> readerContext = new 
HoodieAvroReaderContext(storageConfiguration, tableConfig, Option.empty(), 
Option.empty());
+    readerContext.setHasLogFiles(false);
+    readerContext.setHasBootstrapBaseFile(false);
+    readerContext.initRecordMerger(properties);
+    FileGroupReaderSchemaHandler schemaHandler = new 
FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(), 
tableConfig,
+        properties);
+    readerContext.setSchemaHandler(schemaHandler);
+    List<BufferedRecord> inputRecords = 
convertToBufferedRecordsList(Arrays.asList(testRecord1UpdateWithSameTime, 
testRecord2Update, testRecord3Update,
+        testRecord4EarlierUpdate), readerContext, properties, new String[] 
{"ts"});
+    
inputRecords.addAll(convertToBufferedRecordsListForDeletes(Arrays.asList(testRecord5DeleteByCustomMarker,
 testRecord6DeleteByCustomMarker), true));
+    KeyBasedFileGroupRecordBuffer<IndexedRecord> fileGroupRecordBuffer = 
buildKeyBasedFileGroupRecordBuffer(readerContext, tableConfig, readStats, new 
TestKeyBasedFileGroupRecordBuffer.CustomMerger(),
+        RecordMergeMode.CUSTOM, Collections.singletonList("ts"), properties, 
Option.of(inputRecords.iterator()));
+
+    
fileGroupRecordBuffer.setBaseFileIterator(ClosableIterator.wrap(Arrays.asList(testRecord1,
 testRecord2, testRecord3, testRecord4,
+        testRecord5, testRecord6).iterator()));
+
+    List<IndexedRecord> actualRecords = 
getActualRecords(fileGroupRecordBuffer);
+    assertEquals(Arrays.asList(testRecord1, testRecord2CustomPayloadExpected, 
testRecord3UpdateCustomPayloadExpected), actualRecords);
+    assertEquals(0, readStats.getNumInserts());
+    assertEquals(3, readStats.getNumDeletes());
+    assertEquals(2, readStats.getNumUpdates());
+  }
+}


Reply via email to