This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new cce019ad0b9 [HUDI-6786] HoodieFileGroupReader integration (#9819)
cce019ad0b9 is described below

commit cce019ad0b92cc6a333839b4a50d123aca4c9c4a
Author: Lin Liu <[email protected]>
AuthorDate: Wed Oct 18 14:01:12 2023 -0700

    [HUDI-6786] HoodieFileGroupReader integration (#9819)
    
    This commit integrates HoodieFileGroupReader with the NewHoodieParquetFileFormat.
---
 .../SparkFileFormatInternalRowReaderContext.scala  |  22 +-
 .../hudi/common/config/HoodieReaderConfig.java     |   9 +-
 .../table/log/BaseHoodieLogRecordReader.java       | 106 +-------
 .../FullKeySpec.java}                              |  33 ++-
 .../table/log/HoodieMergedLogRecordReader.java     | 102 ++------
 .../FileGroupReaderState.java => log/KeySpec.java} |  29 ++-
 .../PrefixKeySpec.java}                            |  33 ++-
 .../common/table/log/block/HoodieDataBlock.java    |   4 +
 .../read/HoodieBaseFileGroupRecordBuffer.java      | 279 ++++++++++++++++++++
 .../common/table/read/HoodieFileGroupReader.java   | 126 +++++----
 ...rState.java => HoodieFileGroupReaderState.java} |   2 +-
 .../table/read/HoodieFileGroupRecordBuffer.java    | 119 +++++++++
 .../read/HoodieKeyBasedFileGroupRecordBuffer.java  | 159 ++++++++++++
 .../HoodiePositionBasedFileGroupRecordBuffer.java  | 191 ++++++++++++++
 .../table/read/TestHoodieFileGroupReaderBase.java  |  31 ++-
 .../hudi/NewHoodieParquetFileFormatUtils.scala     |  23 +-
 ...odieFileGroupReaderBasedParquetFileFormat.scala | 284 +++++++++++++++++++++
 .../hudi/TestHoodieMergeHandleWithSparkMerger.java |  70 ++++-
 .../read/TestHoodieFileGroupReaderOnSpark.scala    |  35 ++-
 19 files changed, 1333 insertions(+), 324 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
index 5bf2c3a0fbc..25ea9d3332b 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
@@ -26,6 +26,7 @@ import org.apache.hudi.common.engine.HoodieReaderContext
 import org.apache.hudi.common.util.collection.ClosableIterator
 import org.apache.hudi.util.CloseableInternalRowIterator
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{HoodieInternalRowUtils, SparkSession}
@@ -36,13 +37,11 @@ import org.apache.spark.sql.{HoodieInternalRowUtils, 
SparkSession}
  *
  * This uses Spark parquet reader to read parquet data files or parquet log 
blocks.
  *
- * @param sparkSession      {@link SparkSession} instance.
- * @param parquetFileFormat {@link ParquetFileFormat} instance for parquet 
file format in Spark.
- * @param hadoopConf        Hadoop configuration.
+ * @param baseFileReader    A reader that transforms a {@link PartitionedFile} 
to an iterator of {@link InternalRow}
+ * @param partitionValues   The values for a partition in which the file group 
lives.
  */
-class SparkFileFormatInternalRowReaderContext(sparkSession: SparkSession,
-                                              parquetFileFormat: 
ParquetFileFormat,
-                                              hadoopConf: Configuration) 
extends BaseSparkInternalRowReaderContext {
+class SparkFileFormatInternalRowReaderContext(baseFileReader: PartitionedFile 
=> Iterator[InternalRow],
+                                              partitionValues: InternalRow) 
extends BaseSparkInternalRowReaderContext {
   lazy val sparkAdapter = SparkAdapterSupport.sparkAdapter
 
   override def getFileRecordIterator(filePath: Path,
@@ -51,14 +50,7 @@ class SparkFileFormatInternalRowReaderContext(sparkSession: 
SparkSession,
                                      dataSchema: Schema,
                                      requiredSchema: Schema,
                                      conf: Configuration): 
ClosableIterator[InternalRow] = {
-    val fileInfo = 
sparkAdapter.getSparkPartitionedFileUtils.createPartitionedFile(
-      InternalRow.empty, filePath, 0, length)
-
-    val dataStructSchema = HoodieInternalRowUtils.getCachedSchema(dataSchema)
-    val requiredStructSchema = 
HoodieInternalRowUtils.getCachedSchema(requiredSchema)
-    new 
CloseableInternalRowIterator(parquetFileFormat.buildReaderWithPartitionValues(
-      sparkSession, dataStructSchema, StructType(Seq.empty), 
requiredStructSchema, Seq.empty,
-      Map(), conf
-    ).apply(fileInfo))
+    val fileInfo = 
sparkAdapter.getSparkPartitionedFileUtils.createPartitionedFile(partitionValues,
 filePath, start, length)
+    new CloseableInternalRowIterator(baseFileReader.apply(fileInfo))
   }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java
index d25d1898ff2..1738f75e9ec 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java
@@ -28,7 +28,7 @@ import javax.annotation.concurrent.Immutable;
 @ConfigClassProperty(name = "Reader Configs",
     groupName = ConfigGroups.Names.READER,
     description = "Configurations that control file group reading.")
-public class HoodieReaderConfig {
+public class HoodieReaderConfig extends HoodieConfig {
   public static final ConfigProperty<String> COMPACTION_LAZY_BLOCK_READ_ENABLE 
= ConfigProperty
       .key("hoodie.compaction.lazy.block.read")
       .defaultValue("true")
@@ -51,4 +51,11 @@ public class HoodieReaderConfig {
       .sinceVersion("0.13.0")
       .withDocumentation("New optimized scan for log blocks that handles all 
multi-writer use-cases while appending to log files. "
           + "It also differentiates original blocks written by ingestion 
writers and compacted blocks written log compaction.");
+
+  public static final ConfigProperty<Boolean> FILE_GROUP_READER_ENABLED = 
ConfigProperty
+      .key("hoodie.file.group.reader.enabled")
+      .defaultValue(false)
+      .markAdvanced()
+      .sinceVersion("1.0.0")
+      .withDocumentation("Use engine agnostic file group reader if enabled");
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
index 9885e19d05e..300b767ab70 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
@@ -21,7 +21,6 @@ package org.apache.hudi.common.table.log;
 
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieReaderContext;
-import org.apache.hudi.common.model.DeleteRecord;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodiePayloadProps;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -32,6 +31,7 @@ import 
org.apache.hudi.common.table.log.block.HoodieCommandBlock;
 import org.apache.hudi.common.table.log.block.HoodieDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
+import org.apache.hudi.common.table.read.HoodieFileGroupRecordBuffer;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
@@ -138,6 +138,7 @@ public abstract class BaseHoodieLogRecordReader<T> {
   private final List<String> validBlockInstants = new ArrayList<>();
   // Use scanV2 method.
   private final boolean enableOptimizedLogBlocksScan;
+  protected HoodieFileGroupRecordBuffer<T> recordBuffer;
 
   protected BaseHoodieLogRecordReader(HoodieReaderContext readerContext,
                                       FileSystem fs, String basePath, 
List<String> logFilePaths,
@@ -148,7 +149,8 @@ public abstract class BaseHoodieLogRecordReader<T> {
                                       InternalSchema internalSchema,
                                       Option<String> keyFieldOverride,
                                       boolean enableOptimizedLogBlocksScan,
-                                      HoodieRecordMerger recordMerger) {
+                                      HoodieRecordMerger recordMerger,
+                                      HoodieFileGroupRecordBuffer<T> 
recordBuffer) {
     this.readerContext = readerContext;
     this.readerSchema = readerSchema;
     this.latestInstantTime = latestInstantTime;
@@ -198,6 +200,7 @@ public abstract class BaseHoodieLogRecordReader<T> {
 
     this.partitionNameOverrideOpt = partitionNameOverride;
     this.recordType = recordMerger.getRecordType();
+    this.recordBuffer = recordBuffer;
   }
 
   /**
@@ -270,6 +273,7 @@ public abstract class BaseHoodieLogRecordReader<T> {
             continue;
           }
         }
+
         switch (logBlock.getBlockType()) {
           case HFILE_DATA_BLOCK:
           case AVRO_DATA_BLOCK:
@@ -731,47 +735,6 @@ public abstract class BaseHoodieLogRecordReader<T> {
         
.contentEquals(currentInstantLogBlocks.peek().getLogBlockHeader().get(INSTANT_TIME));
   }
 
-  /**
-   * Iterate over the GenericRecord in the block, read the hoodie key and 
partition path and call subclass processors to
-   * handle it.
-   */
-  private void processDataBlock(HoodieDataBlock dataBlock, Option<KeySpec> 
keySpecOpt) throws Exception {
-    checkState(partitionNameOverrideOpt.isPresent() || 
partitionPathFieldOpt.isPresent(),
-        "Either partition-name override or partition-path field had to be 
present");
-
-    Option<Pair<String, String>> recordKeyPartitionPathFieldPair = 
populateMetaFields
-        ? Option.empty()
-        : Option.of(Pair.of(recordKeyField, 
partitionPathFieldOpt.orElse(null)));
-
-    Pair<ClosableIterator<T>, Schema> recordsIteratorSchemaPair =
-        getRecordsIterator(dataBlock, keySpecOpt);
-
-    try (ClosableIterator<T> recordIterator = 
recordsIteratorSchemaPair.getLeft()) {
-      while (recordIterator.hasNext()) {
-        T nextRecord = recordIterator.next();
-        processNextRecord(nextRecord,
-            readerContext.generateMetadataForRecord(nextRecord, readerSchema));
-        totalLogRecords.incrementAndGet();
-      }
-    }
-  }
-
-  /**
-   * Process next record.
-   *
-   * @param record   The next record in engine-specific representation.
-   * @param metadata The metadata of the record.
-   * @throws Exception
-   */
-  public abstract void processNextRecord(T record, Map<String, Object> 
metadata) throws Exception;
-
-  /**
-   * Process next deleted record.
-   *
-   * @param deleteRecord Deleted record(hoodie key and ordering value)
-   */
-  protected abstract void processNextDeletedRecord(DeleteRecord deleteRecord);
-
   /**
    * Process the set of log blocks belonging to the last instant which is read 
fully.
    */
@@ -785,10 +748,10 @@ public abstract class BaseHoodieLogRecordReader<T> {
         case AVRO_DATA_BLOCK:
         case HFILE_DATA_BLOCK:
         case PARQUET_DATA_BLOCK:
-          processDataBlock((HoodieDataBlock) lastBlock, keySpecOpt);
+          recordBuffer.processDataBlock((HoodieDataBlock) lastBlock, 
keySpecOpt);
           break;
         case DELETE_BLOCK:
-          Arrays.stream(((HoodieDeleteBlock) 
lastBlock).getRecordsToDelete()).forEach(this::processNextDeletedRecord);
+          recordBuffer.processDeleteBlock((HoodieDeleteBlock) lastBlock);
           break;
         case CORRUPT_BLOCK:
           LOG.warn("Found a corrupt block which was not rolled back");
@@ -850,59 +813,6 @@ public abstract class BaseHoodieLogRecordReader<T> {
     return payloadProps;
   }
 
-  /**
-   * Key specification with a list of column names.
-   */
-  protected interface KeySpec {
-    List<String> getKeys();
-
-    boolean isFullKey();
-
-    static KeySpec fullKeySpec(List<String> keys) {
-      return new FullKeySpec(keys);
-    }
-
-    static KeySpec prefixKeySpec(List<String> keyPrefixes) {
-      return new PrefixKeySpec(keyPrefixes);
-    }
-  }
-
-  private static class FullKeySpec implements KeySpec {
-    private final List<String> keys;
-
-    private FullKeySpec(List<String> keys) {
-      this.keys = keys;
-    }
-
-    @Override
-    public List<String> getKeys() {
-      return keys;
-    }
-
-    @Override
-    public boolean isFullKey() {
-      return true;
-    }
-  }
-
-  private static class PrefixKeySpec implements KeySpec {
-    private final List<String> keysPrefixes;
-
-    private PrefixKeySpec(List<String> keysPrefixes) {
-      this.keysPrefixes = keysPrefixes;
-    }
-
-    @Override
-    public List<String> getKeys() {
-      return keysPrefixes;
-    }
-
-    @Override
-    public boolean isFullKey() {
-      return false;
-    }
-  }
-
   public Deque<HoodieLogBlock> getCurrentInstantLogBlocks() {
     return currentInstantLogBlocks;
   }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderState.java
 b/hudi-common/src/main/java/org/apache/hudi/common/table/log/FullKeySpec.java
similarity index 61%
copy from 
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderState.java
copy to 
hudi-common/src/main/java/org/apache/hudi/common/table/log/FullKeySpec.java
index 2caa146fc92..ede7918649b 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderState.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/FullKeySpec.java
@@ -17,20 +17,29 @@
  * under the License.
  */
 
-package org.apache.hudi.common.table.read;
+package org.apache.hudi.common.table.log;
 
-import org.apache.hudi.common.config.TypedProperties;
-
-import org.apache.avro.Schema;
+import java.util.List;
 
 /**
- * A class holding the state that is needed by {@code HoodieFileGroupReader},
- * e.g., schema, merging strategy, etc.
+ * This class specifies a set of record keys, each of which is a full key.
+ * That is, the comparison between a record key and an element
+ * of the set is {@link String#equals}.
  */
-public class FileGroupReaderState {
-  public String tablePath;
-  public String latestCommitTime;
-  public Schema baseFileAvroSchema;
-  public Schema logRecordAvroSchema;
-  public TypedProperties mergeProps = new TypedProperties();
+public class FullKeySpec implements KeySpec {
+  private final List<String> keys;
+
+  public FullKeySpec(List<String> keys) {
+    this.keys = keys;
+  }
+
+  @Override
+  public List<String> getKeys() {
+    return keys;
+  }
+
+  @Override
+  public boolean isFullKey() {
+    return true;
+  }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
index aedb1a8e992..6f417be072b 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
@@ -20,16 +20,15 @@
 package org.apache.hudi.common.table.log;
 
 import org.apache.hudi.common.engine.HoodieReaderContext;
-import org.apache.hudi.common.model.DeleteRecord;
 import org.apache.hudi.common.model.HoodiePreCombineAvroRecordMerger;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
+import org.apache.hudi.common.table.read.HoodieFileGroupRecordBuffer;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.internal.schema.InternalSchema;
@@ -41,8 +40,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
-import java.io.IOException;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -63,8 +60,6 @@ public class HoodieMergedLogRecordReader<T> extends 
BaseHoodieLogRecordReader<T>
   private static final Logger LOG = 
LoggerFactory.getLogger(HoodieMergedLogRecordReader.class);
   // A timer for calculating elapsed time in millis
   public final HoodieTimer timer = HoodieTimer.create();
-  // Map of compacted/merged records with metadata
-  private final Map<String, Pair<Option<T>, Map<String, Object>>> records;
   // Set of already scanned prefixes allowing us to avoid scanning same 
prefixes again
   private final Set<String> scannedPrefixes;
   // count of merged records in log
@@ -81,11 +76,14 @@ public class HoodieMergedLogRecordReader<T> extends 
BaseHoodieLogRecordReader<T>
                                       Option<String> partitionName,
                                       InternalSchema internalSchema,
                                       Option<String> keyFieldOverride,
-                                      boolean enableOptimizedLogBlocksScan, 
HoodieRecordMerger recordMerger) {
+                                      boolean enableOptimizedLogBlocksScan,
+                                      HoodieRecordMerger recordMerger,
+                                      HoodieFileGroupRecordBuffer<T> 
recordBuffer) {
     super(readerContext, fs, basePath, logFilePaths, readerSchema, 
latestInstantTime, readBlocksLazily, reverseReader, bufferSize,
-        instantRange, withOperationField, forceFullScan, partitionName, 
internalSchema, keyFieldOverride, enableOptimizedLogBlocksScan, recordMerger);
-    this.records = new HashMap<>();
+        instantRange, withOperationField, forceFullScan, partitionName, 
internalSchema, keyFieldOverride, enableOptimizedLogBlocksScan,
+        recordMerger, recordBuffer);
     this.scannedPrefixes = new HashSet<>();
+
     if (forceFullScan) {
       performScan();
     }
@@ -122,7 +120,7 @@ public class HoodieMergedLogRecordReader<T> extends 
BaseHoodieLogRecordReader<T>
     }
 
     List<String> missingKeys = keys.stream()
-        .filter(key -> !records.containsKey(key))
+        .filter(key -> !recordBuffer.containsLogRecord(key))
         .collect(Collectors.toList());
 
     if (missingKeys.isEmpty()) {
@@ -174,19 +172,19 @@ public class HoodieMergedLogRecordReader<T> extends 
BaseHoodieLogRecordReader<T>
     scanInternal(Option.empty(), false);
 
     this.totalTimeTakenToReadAndMergeBlocks = timer.endTimer();
-    this.numMergedRecordsInLog = records.size();
+    this.numMergedRecordsInLog = recordBuffer.size();
 
     LOG.info("Number of log files scanned => " + logFilePaths.size());
-    LOG.info("Number of entries in Map => " + records.size());
+    LOG.info("Number of entries in Map => " + recordBuffer.size());
   }
 
   @Override
   public Iterator<Pair<Option<T>, Map<String, Object>>> iterator() {
-    return records.values().iterator();
+    return recordBuffer.getLogRecordIterator();
   }
 
-  public Map<String, Pair<Option<T>, Map<String, Object>>> getRecords() {
-    return records;
+  public Map<Object, Pair<Option<T>, Map<String, Object>>> getRecords() {
+    return recordBuffer.getLogRecords();
   }
 
   public HoodieRecord.HoodieRecordType getRecordType() {
@@ -204,69 +202,13 @@ public class HoodieMergedLogRecordReader<T> extends 
BaseHoodieLogRecordReader<T>
     return new Builder();
   }
 
-  @Override
-  public void processNextRecord(T record, Map<String, Object> metadata) throws 
IOException {
-    String key = (String) 
metadata.get(HoodieReaderContext.INTERNAL_META_RECORD_KEY);
-    Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair = 
records.get(key);
-
-    if (existingRecordMetadataPair != null) {
-      // Merge and store the combined record
-      // Note that the incoming `record` is from an older commit, so it should 
be put as
-      // the `older` in the merge API
-      HoodieRecord<T> combinedRecord = (HoodieRecord<T>) recordMerger.merge(
-          readerContext.constructHoodieRecord(Option.of(record), metadata, 
readerSchema), readerSchema,
-          
readerContext.constructHoodieRecord(existingRecordMetadataPair.getLeft(), 
existingRecordMetadataPair.getRight(), readerSchema),
-          readerSchema, this.getPayloadProps()).get().getLeft();
-      // If pre-combine returns existing record, no need to update it
-      if (combinedRecord.getData() != 
existingRecordMetadataPair.getLeft().get()) {
-        records.put(key, 
Pair.of(Option.ofNullable(readerContext.seal(combinedRecord.getData())), 
metadata));
-      }
-    } else {
-      // Put the record as is
-      // NOTE: Record have to be cloned here to make sure if it holds 
low-level engine-specific
-      //       payload pointing into a shared, mutable (underlying) buffer we 
get a clean copy of
-      //       it since these records will be put into records(Map).
-      records.put(key, Pair.of(Option.of(readerContext.seal(record)), 
metadata));
-    }
-  }
-
-  @Override
-  protected void processNextDeletedRecord(DeleteRecord deleteRecord) {
-    String key = deleteRecord.getRecordKey();
-    Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair = 
records.get(key);
-    if (existingRecordMetadataPair != null) {
-      // Merge and store the merged record. The ordering val is taken to 
decide whether the same key record
-      // should be deleted or be kept. The old record is kept only if the 
DELETE record has smaller ordering val.
-      // For same ordering values, uses the natural order(arrival time 
semantics).
-
-      Comparable existingOrderingVal = readerContext.getOrderingValue(
-          existingRecordMetadataPair.getLeft(), 
existingRecordMetadataPair.getRight(), readerSchema,
-          this.hoodieTableMetaClient.getTableConfig().getProps());
-      Comparable deleteOrderingVal = deleteRecord.getOrderingValue();
-      // Checks the ordering value does not equal to 0
-      // because we use 0 as the default value which means natural order
-      boolean chooseExisting = !deleteOrderingVal.equals(0)
-          && ReflectionUtils.isSameClass(existingOrderingVal, 
deleteOrderingVal)
-          && existingOrderingVal.compareTo(deleteOrderingVal) > 0;
-      if (chooseExisting) {
-        // The DELETE message is obsolete if the old message has greater 
orderingVal.
-        return;
-      }
-    }
-    // Put the DELETE record
-    records.put(key, Pair.of(Option.empty(),
-        readerContext.generateMetadataForRecord(key, 
deleteRecord.getPartitionPath(), deleteRecord.getOrderingValue())));
-  }
-
   public long getTotalTimeTakenToReadAndMergeBlocks() {
     return totalTimeTakenToReadAndMergeBlocks;
   }
 
   @Override
   public void close() {
-    if (records != null) {
-      records.clear();
-    }
+    // No op.
   }
 
   /**
@@ -296,6 +238,8 @@ public class HoodieMergedLogRecordReader<T> extends 
BaseHoodieLogRecordReader<T>
     private boolean enableOptimizedLogBlocksScan = false;
     private HoodieRecordMerger recordMerger = 
HoodiePreCombineAvroRecordMerger.INSTANCE;
 
+    private HoodieFileGroupRecordBuffer<T> recordBuffer;
+
     @Override
     public Builder<T> withHoodieReaderContext(HoodieReaderContext<T> 
readerContext) {
       this.readerContext = readerContext;
@@ -397,18 +341,28 @@ public class HoodieMergedLogRecordReader<T> extends 
BaseHoodieLogRecordReader<T>
       return this;
     }
 
+    public Builder<T> withRecordBuffer(HoodieFileGroupRecordBuffer<T> 
recordBuffer) {
+      this.recordBuffer = recordBuffer;
+      return this;
+    }
+
     @Override
     public HoodieMergedLogRecordReader<T> build() {
       if (this.partitionName == null && 
CollectionUtils.nonEmpty(this.logFilePaths)) {
         this.partitionName = getRelativePartitionPath(new Path(basePath), new 
Path(this.logFilePaths.get(0)).getParent());
       }
       ValidationUtils.checkArgument(recordMerger != null);
+      ValidationUtils.checkArgument(recordBuffer != null);
 
-      return new HoodieMergedLogRecordReader<>(readerContext, fs, basePath, 
logFilePaths, readerSchema,
+      return new HoodieMergedLogRecordReader<>(
+          readerContext, fs, basePath, logFilePaths, readerSchema,
           latestInstantTime, readBlocksLazily, reverseReader,
           bufferSize, instantRange,
           withOperationField, forceFullScan,
-          Option.ofNullable(partitionName), internalSchema, 
Option.ofNullable(keyFieldOverride), enableOptimizedLogBlocksScan, 
recordMerger);
+          Option.ofNullable(partitionName), internalSchema,
+          Option.ofNullable(keyFieldOverride),
+          enableOptimizedLogBlocksScan, recordMerger,
+          recordBuffer);
     }
   }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderState.java
 b/hudi-common/src/main/java/org/apache/hudi/common/table/log/KeySpec.java
similarity index 60%
copy from 
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderState.java
copy to hudi-common/src/main/java/org/apache/hudi/common/table/log/KeySpec.java
index 2caa146fc92..c7ecfd383ef 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderState.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/KeySpec.java
@@ -17,20 +17,25 @@
  * under the License.
  */
 
-package org.apache.hudi.common.table.read;
+package org.apache.hudi.common.table.log;
 
-import org.apache.hudi.common.config.TypedProperties;
-
-import org.apache.avro.Schema;
+import java.util.List;
 
 /**
- * A class holding the state that is needed by {@code HoodieFileGroupReader},
- * e.g., schema, merging strategy, etc.
+ * An interface to specify a set of keys that are used to filter records
+ * from log files. That is, only the records whose record key matches
+ * any of the specified keys can go to the downstream operations.
  */
-public class FileGroupReaderState {
-  public String tablePath;
-  public String latestCommitTime;
-  public Schema baseFileAvroSchema;
-  public Schema logRecordAvroSchema;
-  public TypedProperties mergeProps = new TypedProperties();
+public interface KeySpec {
+  List<String> getKeys();
+
+  boolean isFullKey();
+
+  static KeySpec fullKeySpec(List<String> keys) {
+    return new FullKeySpec(keys);
+  }
+
+  static KeySpec prefixKeySpec(List<String> keyPrefixes) {
+    return new PrefixKeySpec(keyPrefixes);
+  }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderState.java
 b/hudi-common/src/main/java/org/apache/hudi/common/table/log/PrefixKeySpec.java
similarity index 58%
copy from 
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderState.java
copy to 
hudi-common/src/main/java/org/apache/hudi/common/table/log/PrefixKeySpec.java
index 2caa146fc92..0ab8c4eadb2 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderState.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/PrefixKeySpec.java
@@ -17,20 +17,29 @@
  * under the License.
  */
 
-package org.apache.hudi.common.table.read;
+package org.apache.hudi.common.table.log;
 
-import org.apache.hudi.common.config.TypedProperties;
-
-import org.apache.avro.Schema;
+import java.util.List;
 
 /**
- * A class holding the state that is needed by {@code HoodieFileGroupReader},
- * e.g., schema, merging strategy, etc.
+ * This class specifies a set of record keys, each of which is a prefix.
+ * That is, the comparison between a record key and an element
+ * of the set is {@link String#startsWith(String)}.
  */
-public class FileGroupReaderState {
-  public String tablePath;
-  public String latestCommitTime;
-  public Schema baseFileAvroSchema;
-  public Schema logRecordAvroSchema;
-  public TypedProperties mergeProps = new TypedProperties();
+public class PrefixKeySpec implements KeySpec {
+  private final List<String> keysPrefixes;
+
+  public PrefixKeySpec(List<String> keysPrefixes) {
+    this.keysPrefixes = keysPrefixes;
+  }
+
+  @Override
+  public List<String> getKeys() {
+    return keysPrefixes;
+  }
+
+  @Override
+  public boolean isFullKey() {
+    return false;
+  }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
index 2b3dfaf6a61..a0f9016f409 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
@@ -141,6 +141,10 @@ public abstract class HoodieDataBlock extends 
HoodieLogBlock {
     return serializeRecords(records.get());
   }
 
+  public String getKeyFieldName() {
+    return keyFieldName;
+  }
+
   protected static Schema getWriterSchema(Map<HeaderMetadataType, String> 
logBlockHeader) {
     return new 
Schema.Parser().parse(logBlockHeader.get(HeaderMetadataType.SCHEMA));
   }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
new file mode 100644
index 00000000000..a1685b82a93
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.DeleteRecord;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.log.KeySpec;
+import org.apache.hudi.common.table.log.block.HoodieDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieCorruptedDataException;
+import org.apache.hudi.exception.HoodieKeyException;
+import org.apache.hudi.exception.HoodieValidationException;
+
+import org.apache.avro.Schema;
+import org.roaringbitmap.longlong.Roaring64NavigableMap;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public abstract class HoodieBaseFileGroupRecordBuffer<T> implements 
HoodieFileGroupRecordBuffer<T> {
+  protected final HoodieReaderContext<T> readerContext;
+  protected final Schema readerSchema;
+  protected final Schema baseFileSchema;
+  protected final Option<String> partitionNameOverrideOpt;
+  protected final Option<String[]> partitionPathFieldOpt;
+  protected final HoodieRecordMerger recordMerger;
+  protected final TypedProperties payloadProps;
+  protected final HoodieTableMetaClient hoodieTableMetaClient;
+  protected final Map<Object, Pair<Option<T>, Map<String, Object>>> records;
+  protected ClosableIterator<T> baseFileIterator;
+  protected Iterator<Pair<Option<T>, Map<String, Object>>> logRecordIterator;
+  protected T nextRecord;
+
+  /**
+   * Stores the given collaborators and creates an empty in-memory map for the buffered log records.
+   *
+   * @param readerContext            reader context used to construct records and read values from them.
+   * @param readerSchema             schema handed to the reader context and the merger for log records.
+   * @param baseFileSchema           schema handed to the merger for the older (base file) side in {@code merge}.
+   * @param partitionNameOverrideOpt optional partition name override; only stored by this constructor.
+   * @param partitionPathFieldOpt    optional partition path fields; only stored by this constructor.
+   * @param recordMerger             merger used to combine two records.
+   * @param payloadProps             properties passed to the record merger.
+   * @param hoodieTableMetaClient    meta client whose table config properties are used when
+   *                                 looking up ordering values.
+   */
+  public HoodieBaseFileGroupRecordBuffer(HoodieReaderContext<T> readerContext,
+                                         Schema readerSchema,
+                                         Schema baseFileSchema,
+                                         Option<String> 
partitionNameOverrideOpt,
+                                         Option<String[]> 
partitionPathFieldOpt,
+                                         HoodieRecordMerger recordMerger,
+                                         TypedProperties payloadProps,
+                                         HoodieTableMetaClient 
hoodieTableMetaClient) {
+    this.readerContext = readerContext;
+    this.readerSchema = readerSchema;
+    this.baseFileSchema = baseFileSchema;
+    this.partitionNameOverrideOpt = partitionNameOverrideOpt;
+    this.partitionPathFieldOpt = partitionPathFieldOpt;
+    this.recordMerger = recordMerger;
+    this.payloadProps = payloadProps;
+    this.hoodieTableMetaClient = hoodieTableMetaClient;
+    // Starts empty; entries are added while log blocks are processed.
+    this.records = new HashMap<>();
+  }
+
+  /**
+   * Stores the base file iterator on this buffer.
+   *
+   * NOTE(review): the method name is misspelled ("Iteraotr"). It overrides a method of the
+   * same name, so it cannot be renamed here without changing that declaration and its callers.
+   *
+   * @param baseFileIterator iterator over the base file records.
+   */
+  @Override
+  public void setBaseFileIteraotr(ClosableIterator<T> baseFileIterator) {
+    this.baseFileIterator = baseFileIterator;
+  }
+
+  /**
+   * @return the current value of {@code nextRecord}. The field is not cleared here, so calling
+   *         this method again before {@code nextRecord} is reassigned returns the same value.
+   */
+  @Override
+  public T next() {
+    return nextRecord;
+  }
+
+  /**
+   * @return the map backing this buffer itself (not a copy), so changes made through the
+   *         returned map are visible to the buffer.
+   */
+  @Override
+  public Map<Object, Pair<Option<T>, Map<String, Object>>> getLogRecords() {
+    return records;
+  }
+
+  /**
+   * @return the number of entries currently held in the buffered records map.
+   */
+  @Override
+  public int size() {
+    return records.size();
+  }
+
+  /**
+   * @return a new iterator over the values of the buffered records map. Each call creates a
+   *         fresh iterator; the {@code logRecordIterator} field is neither read nor set here.
+   */
+  @Override
+  public Iterator<Pair<Option<T>, Map<String, Object>>> getLogRecordIterator() 
{
+    return records.values().iterator();
+  }
+
+  /**
+   * Removes all entries from the buffered records map. The base file iterator is not
+   * closed by this method.
+   */
+  @Override
+  public void close() {
+    records.clear();
+  }
+
+  /**
+   * Decides what should be buffered for an incoming log data record.
+   *
+   * <p>When nothing is buffered for the record's key yet, the incoming record is returned as is.
+   * Otherwise the incoming record and the buffered entry are handed to the record merger, and
+   * the merged payload is returned only when it is a different object than the buffered payload.
+   *
+   * @param record                     the incoming log data record.
+   * @param metadata                   metadata of the incoming record.
+   * @param existingRecordMetadataPair the entry already buffered for the same key, or {@code null}
+   *                                   if there is none; its left side may be empty (for example
+   *                                   when the buffered entry was stored without a payload).
+   * @return the record to store in the buffer, or an empty option when the buffered entry
+   *         should be kept unchanged.
+   * @throws IOException if the record merger fails.
+   */
+  protected Option<T> doProcessNextDataRecord(T record,
+                                              Map<String, Object> metadata,
+                                              Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair) throws IOException {
+    if (existingRecordMetadataPair == null) {
+      // Nothing is buffered for this key yet: put the record as is.
+      // NOTE: the record is NOT copied here. A caller that keeps it in the buffer has to detach
+      //       it from any shared, mutable (underlying) engine buffer before storing it.
+      return Option.of(record);
+    }
+
+    Option<T> existingRecordOpt = existingRecordMetadataPair.getLeft();
+    // Merge and store the combined record
+    // Note that the incoming `record` is from an older commit, so it should be put as
+    // the `older` in the merge API
+    HoodieRecord<T> combinedRecord = (HoodieRecord<T>) recordMerger.merge(
+        readerContext.constructHoodieRecord(Option.of(record), metadata, readerSchema),
+        readerSchema,
+        readerContext.constructHoodieRecord(
+            existingRecordOpt, existingRecordMetadataPair.getRight(), readerSchema),
+        readerSchema,
+        payloadProps).get().getLeft();
+    // The buffered entry may carry no payload; compare against null in that case instead of
+    // unwrapping an empty option.
+    T existingData = existingRecordOpt.isPresent() ? existingRecordOpt.get() : null;
+    // If pre-combine returns existing record, no need to update it
+    if (combinedRecord.getData() != existingData) {
+      return Option.of(combinedRecord.getData());
+    }
+    return Option.empty();
+  }
+
+  /**
+   * Decides whether an incoming delete should replace what is buffered for the same key.
+   *
+   * @param deleteRecord               the incoming delete record.
+   * @param existingRecordMetadataPair the entry already buffered for the same key, or {@code null}
+   *                                   if there is none.
+   * @return the delete record when it should be applied, or an empty option when the buffered
+   *         entry wins and the delete is dropped.
+   */
+  protected Option<DeleteRecord> doProcessNextDeletedRecord(DeleteRecord deleteRecord,
+                                                            Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair) {
+    if (existingRecordMetadataPair == null) {
+      // Nothing buffered for this key: do delete.
+      return Option.of(deleteRecord);
+    }
+
+    // The ordering values decide whether the buffered entry survives: it is kept only if the
+    // DELETE record has a smaller ordering value. For equal ordering values the natural order
+    // (arrival time semantics) applies, i.e. the delete wins.
+    Comparable bufferedOrderingVal = readerContext.getOrderingValue(
+        existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight(), readerSchema,
+        this.hoodieTableMetaClient.getTableConfig().getProps());
+    Comparable incomingOrderingVal = deleteRecord.getOrderingValue();
+
+    // An ordering value of 0 is the default and means natural order, so it never makes the
+    // delete obsolete. Values of different classes are not compared either.
+    if (!incomingOrderingVal.equals(0)
+        && ReflectionUtils.isSameClass(bufferedOrderingVal, incomingOrderingVal)
+        && bufferedOrderingVal.compareTo(incomingOrderingVal) > 0) {
+      // The DELETE message is obsolete because the buffered entry has the greater ordering value.
+      return Option.empty();
+    }
+
+    // Do delete.
+    return Option.of(deleteRecord);
+  }
+
+  /**
+   * Opens a record iterator over a data block and pairs it with the block's schema. When a key
+   * spec is given, its keys and full-key flag are passed on to the block so it can filter.
+   *
+   * @param dataBlock  the data block to read.
+   * @param keySpecOpt optional key spec restricting which records are returned.
+   * @return the record iterator together with the schema reported by the data block.
+   * @throws IOException declared for callers; see the data block for the actual failure modes.
+   */
+  protected Pair<ClosableIterator<T>, Schema> getRecordsIterator(
+      HoodieDataBlock dataBlock, Option<KeySpec> keySpecOpt) throws IOException {
+    if (!keySpecOpt.isPresent()) {
+      // No key restriction: iterate over the whole block.
+      ClosableIterator<T> allRecords = dataBlock.getEngineRecordIterator(readerContext);
+      return Pair.of(allRecords, dataBlock.getSchema());
+    }
+
+    KeySpec spec = keySpecOpt.get();
+    ClosableIterator<T> selectedRecords =
+        dataBlock.getEngineRecordIterator(readerContext, spec.getKeys(), spec.isFullKey());
+    return Pair.of(selectedRecords, dataBlock.getSchema());
+  }
+
+  /**
+   * Combines an older and a newer record through the configured record merger.
+   *
+   * @param older        the older record; when absent, {@code newer} is returned untouched.
+   * @param olderInfoMap metadata of the older record.
+   * @param newer        the newer record.
+   * @param newerInfoMap metadata of the newer record.
+   * @return the merged payload, or an empty option when the merger yields no result or a
+   *         {@code null} payload.
+   * @throws IOException if the record merger fails.
+   */
+  protected Option<T> merge(Option<T> older, Map<String, Object> olderInfoMap,
+                          Option<T> newer, Map<String, Object> newerInfoMap) throws IOException {
+    if (older.isPresent()) {
+      // The older side is built with the base file schema, the newer side with the reader schema.
+      Option<Pair<HoodieRecord, Schema>> mergeResult = recordMerger.merge(
+          readerContext.constructHoodieRecord(older, olderInfoMap, baseFileSchema), baseFileSchema,
+          readerContext.constructHoodieRecord(newer, newerInfoMap, readerSchema), readerSchema, payloadProps);
+      if (!mergeResult.isPresent()) {
+        return Option.empty();
+      }
+      T mergedData = (T) mergeResult.get().getLeft().getData();
+      return Option.ofNullable(mergedData);
+    }
+
+    // Nothing to merge against.
+    return newer;
+  }
+
+  /**
+   * Tells whether a record can be left out of downstream processing. A record is skipped when:
+   *  1. A non-empty set of pre-specified keys exists.
+   *  2. The key of the record does not match any element of the set.
+   *
+   * @param record       the record to check.
+   * @param keyFieldName name of the field holding the record key.
+   * @param isFullKey    {@code true} to match keys exactly, {@code false} to treat the elements
+   *                     of {@code keys} as prefixes of the record key.
+   * @param keys         the keys (or key prefixes) of interest; when empty, nothing is skipped.
+   * @return {@code true} if the record is not needed, {@code false} otherwise.
+   * @throws HoodieKeyException if the record key is missing or empty.
+   */
+  protected boolean shouldSkip(T record, String keyFieldName, boolean isFullKey, Set<String> keys) {
+    // Look at the raw value first: calling toString() directly on a missing key field would
+    // fail with a NullPointerException before the check below could report the problem.
+    Object keyValue = readerContext.getValue(record, readerSchema, keyFieldName);
+    String recordKey = keyValue == null ? null : keyValue.toString();
+    // Can not extract the record key, throw.
+    if (recordKey == null || recordKey.isEmpty()) {
+      throw new HoodieKeyException("Can not extract the key for a record");
+    }
+
+    // No keys are specified. Cannot skip at all.
+    if (keys.isEmpty()) {
+      return false;
+    }
+
+    // When the record key matches with one of the keys or key prefixes, can not skip.
+    boolean matchesRequestedKey = isFullKey
+        ? keys.contains(recordKey)
+        : keys.stream().anyMatch(recordKey::startsWith);
+
+    // Otherwise, this record is not needed.
+    return !matchesRequestedKey;
+  }
+
+  /**
+   * Reads the record positions reported by a log block and returns them as a list, in the
+   * order produced by the position set's iterator.
+   *
+   * @param logBlock the log block to take the positions from.
+   * @return the record positions of the block; never empty.
+   * @throws HoodieValidationException    if the block reports no positions.
+   * @throws HoodieCorruptedDataException if no position could be collected.
+   * @throws IOException                  declared for callers; see the log block for the actual
+   *                                      failure modes.
+   */
+  protected static List<Long> extractRecordPositions(HoodieLogBlock logBlock) throws IOException {
+    Roaring64NavigableMap positions = logBlock.getRecordPositions();
+    if (positions == null || positions.isEmpty()) {
+      throw new HoodieValidationException("No record position info is found when attempt to do position based merge.");
+    }
+
+    // Copy every position into a plain list.
+    List<Long> blockPositions = new ArrayList<>();
+    Iterator<Long> positionIterator = positions.iterator();
+    positionIterator.forEachRemaining(blockPositions::add);
+
+    if (blockPositions.isEmpty()) {
+      throw new HoodieCorruptedDataException("No positions are extracted.");
+    }
+
+    return blockPositions;
+  }
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index d72f3ccfcd6..296b8155e99 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -24,7 +24,6 @@ import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.model.HoodieBaseFile;
-import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.HoodieTableQueryType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -33,7 +32,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.EmptyIterator;
-import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieIOException;
 
 import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
@@ -42,11 +41,7 @@ import org.apache.hadoop.fs.Path;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 
 import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath;
 import static 
org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGER_STRATEGY;
@@ -73,16 +68,12 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
   private final long start;
   // Length of bytes to read from the base file
   private final long length;
-  // Key to record and metadata mapping from log files
-  private final Map<String, Pair<Option<T>, Map<String, Object>>> 
logFileRecordMapping = new HashMap<>();
-  private final FileGroupReaderState readerState = new FileGroupReaderState();
+  // Core structure to store and process records.
+  private final HoodieFileGroupRecordBuffer<T> recordBuffer;
+  private final HoodieFileGroupReaderState readerState = new 
HoodieFileGroupReaderState();
   private ClosableIterator<T> baseFileIterator;
-  // This is only initialized and used after all records from the base file 
are iterated
-  private Iterator<Pair<Option<T>, Map<String, Object>>> logRecordIterator;
   private HoodieRecordMerger recordMerger;
 
-  T nextRecord;
-
   public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
                                HoodieTableMetaClient metaClient,
                                String fileGroupId,
@@ -90,7 +81,8 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
                                HoodieTimeline timeline,
                                HoodieTableQueryType queryType,
                                Option<String> instantTime,
-                               Option<String> startInstantTime) {
+                               Option<String> startInstantTime,
+                               HoodieFileGroupRecordBuffer<T> recordBuffer) {
     // This constructor is a placeholder now to allow automatically fetching 
the correct list of
     // base and log files for a file group.
     // Derive base and log files and call the corresponding constructor.
@@ -102,6 +94,7 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
     this.start = 0;
     this.length = Long.MAX_VALUE;
     this.baseFileIterator = new EmptyIterator<>();
+    this.recordBuffer = recordBuffer;
   }
 
   public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
@@ -113,7 +106,8 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
                                Schema avroSchema,
                                TypedProperties props,
                                long start,
-                               long length) {
+                               long length,
+                               HoodieFileGroupRecordBuffer<T> recordBuffer) {
     this.readerContext = readerContext;
     this.hadoopConf = hadoopConf;
     this.baseFilePath = baseFilePath;
@@ -128,6 +122,7 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
     this.readerState.baseFileAvroSchema = avroSchema;
     this.readerState.logRecordAvroSchema = avroSchema;
     this.readerState.mergeProps.putAll(props);
+    this.recordBuffer = recordBuffer;
   }
 
   /**
@@ -136,10 +131,10 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
   public void initRecordIterators() {
     this.baseFileIterator = baseFilePath.isPresent()
         ? readerContext.getFileRecordIterator(
-        baseFilePath.get().getHadoopPath(), 0, baseFilePath.get().getFileLen(),
-        readerState.baseFileAvroSchema, readerState.baseFileAvroSchema, 
hadoopConf)
+            baseFilePath.get().getHadoopPath(), start, length, 
readerState.baseFileAvroSchema, readerState.baseFileAvroSchema, hadoopConf)
         : new EmptyIterator<>();
     scanLogFiles();
+    recordBuffer.setBaseFileIteraotr(baseFileIterator);
   }
 
   /**
@@ -147,49 +142,22 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
    * @throws IOException on reader error.
    */
   public boolean hasNext() throws IOException {
-    while (baseFileIterator.hasNext()) {
-      T baseRecord = baseFileIterator.next();
-      String recordKey = readerContext.getRecordKey(baseRecord, 
readerState.baseFileAvroSchema);
-      Pair<Option<T>, Map<String, Object>> logRecordInfo = 
logFileRecordMapping.remove(recordKey);
-      Option<T> resultRecord = logRecordInfo != null
-          ? merge(Option.of(baseRecord), Collections.emptyMap(), 
logRecordInfo.getLeft(), logRecordInfo.getRight())
-          : merge(Option.empty(), Collections.emptyMap(), 
Option.of(baseRecord), Collections.emptyMap());
-      if (resultRecord.isPresent()) {
-        nextRecord = readerContext.seal(resultRecord.get());
-        return true;
-      }
-    }
-
-    if (logRecordIterator == null) {
-      logRecordIterator = logFileRecordMapping.values().iterator();
-    }
-
-    while (logRecordIterator.hasNext()) {
-      Pair<Option<T>, Map<String, Object>> nextRecordInfo = 
logRecordIterator.next();
-      Option<T> resultRecord = merge(Option.empty(), Collections.emptyMap(),
-          nextRecordInfo.getLeft(), nextRecordInfo.getRight());
-      if (resultRecord.isPresent()) {
-        nextRecord = readerContext.seal(resultRecord.get());
-        return true;
-      }
-    }
-
-    return false;
+    return recordBuffer.hasNext();
   }
 
   /**
    * @return The next record after calling {@link #hasNext}.
    */
   public T next() {
-    T result = nextRecord;
-    nextRecord = null;
-    return result;
+    return recordBuffer.next();
   }
 
   private void scanLogFiles() {
     if (logFilePathList.isPresent()) {
-      FileSystem fs = readerContext.getFs(logFilePathList.get().get(0), 
hadoopConf);
-      HoodieMergedLogRecordReader<T> logRecordReader = 
HoodieMergedLogRecordReader.newBuilder()
+      String path = baseFilePath.isPresent() ? baseFilePath.get().getPath() : 
logFilePathList.get().get(0);
+      FileSystem fs = readerContext.getFs(path, hadoopConf);
+
+      HoodieMergedLogRecordReader logRecordReader = 
HoodieMergedLogRecordReader.newBuilder()
           .withHoodieReaderContext(readerContext)
           .withFileSystem(fs)
           .withBasePath(readerState.tablePath)
@@ -200,34 +168,56 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
           .withReverseReader(false)
           .withBufferSize(getIntWithAltKeys(props, 
HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE))
           .withPartition(getRelativePartitionPath(
-              new Path(readerState.tablePath), new 
Path(logFilePathList.get().get(0)).getParent()
-          ))
+              new Path(readerState.tablePath), new 
Path(logFilePathList.get().get(0)).getParent()))
           .withRecordMerger(recordMerger)
+          .withRecordBuffer(recordBuffer)
           .build();
-      logFileRecordMapping.putAll(logRecordReader.getRecords());
       logRecordReader.close();
     }
   }
 
-  private Option<T> merge(Option<T> older, Map<String, Object> olderInfoMap,
-                          Option<T> newer, Map<String, Object> newerInfoMap) 
throws IOException {
-    if (!older.isPresent()) {
-      return newer;
-    }
-
-    Option<Pair<HoodieRecord, Schema>> mergedRecord = recordMerger.merge(
-        readerContext.constructHoodieRecord(older, olderInfoMap, 
readerState.baseFileAvroSchema), readerState.baseFileAvroSchema,
-        readerContext.constructHoodieRecord(newer, newerInfoMap, 
readerState.logRecordAvroSchema), readerState.logRecordAvroSchema, props);
-    if (mergedRecord.isPresent()) {
-      return Option.ofNullable((T) mergedRecord.get().getLeft().getData());
-    }
-    return Option.empty();
-  }
-
   @Override
   public void close() throws IOException {
     if (baseFileIterator != null) {
       baseFileIterator.close();
     }
+    if (recordBuffer != null) {
+      recordBuffer.close();
+    }
+  }
+
+  /**
+   * @return a new iterator wrapper around this reader; every call creates a new wrapper
+   *         over the same reader instance.
+   */
+  public HoodieFileGroupReaderIterator<T> getClosableIterator() {
+    return new HoodieFileGroupReaderIterator<>(this);
+  }
+
+  /**
+   * Adapter exposing a {@link HoodieFileGroupReader} as a {@link ClosableIterator}. All calls
+   * are forwarded to the wrapped reader; an {@link IOException} thrown by the reader is
+   * rethrown as an unchecked {@link HoodieIOException}.
+   */
+  public static class HoodieFileGroupReaderIterator<T> implements 
ClosableIterator<T> {
+    // The reader all calls are delegated to.
+    private final HoodieFileGroupReader<T> reader;
+
+    /**
+     * @param reader the reader to wrap.
+     */
+    public HoodieFileGroupReaderIterator(HoodieFileGroupReader<T> reader) {
+      this.reader = reader;
+    }
+
+    /**
+     * @return the result of the reader's {@code hasNext()}.
+     * @throws HoodieIOException if the reader throws an {@link IOException}.
+     */
+    @Override
+    public boolean hasNext() {
+      try {
+        return reader.hasNext();
+      } catch (IOException e) {
+        throw new HoodieIOException("Failed to read record", e);
+      }
+    }
+
+    /**
+     * @return the result of the reader's {@code next()}.
+     */
+    @Override
+    public T next() {
+      return reader.next();
+    }
+
+    /**
+     * Closes the wrapped reader.
+     *
+     * @throws HoodieIOException if the reader throws an {@link IOException} while closing.
+     */
+    @Override
+    public void close() {
+      try {
+        reader.close();
+      } catch (IOException e) {
+        throw new HoodieIOException("Failed to close the reader", e);
+      }
+    }
   }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderState.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderState.java
similarity index 96%
rename from 
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderState.java
rename to 
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderState.java
index 2caa146fc92..e50713bb40a 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderState.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderState.java
@@ -27,7 +27,7 @@ import org.apache.avro.Schema;
  * A class holding the state that is needed by {@code HoodieFileGroupReader},
  * e.g., schema, merging strategy, etc.
  */
-public class FileGroupReaderState {
+public class HoodieFileGroupReaderState {
   public String tablePath;
   public String latestCommitTime;
   public Schema baseFileAvroSchema;
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java
new file mode 100644
index 00000000000..680bbf9d705
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.model.DeleteRecord;
+import org.apache.hudi.common.table.log.KeySpec;
+import org.apache.hudi.common.table.log.block.HoodieDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.Pair;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * A buffer that collects records from log blocks and then produces merged records one at a
+ * time through {@link #hasNext()} and {@link #next()}.
+ *
+ * @param <T> the engine-specific record type.
+ */
+public interface HoodieFileGroupRecordBuffer<T> {
+  /**
+   * The kinds of buffer an implementation can report through {@link #getBufferType()}.
+   */
+  enum BufferType {
+    KEY_BASED,
+    POSITION_BASED
+  }
+
+  /**
+   * @return The merge strategy implemented.
+   */
+  BufferType getBufferType();
+
+  /**
+   * Process a log data block, and store the resulting records into the buffer.
+   *
+   * @param dataBlock  the log data block to process.
+   * @param keySpecOpt optional key spec restricting which records of the block are processed.
+   * @throws IOException if the block cannot be processed.
+   */
+  void processDataBlock(HoodieDataBlock dataBlock, Option<KeySpec> keySpecOpt) 
throws IOException;
+
+  /**
+   * Process a next record in a log data block.
+   *
+   * @param record   the log data record.
+   * @param metadata the metadata of the record.
+   * @param index    the key under which the record is tracked in the buffer.
+   *                 NOTE(review): presumably the record key for {@code KEY_BASED} buffers and
+   *                 the record position for {@code POSITION_BASED} buffers; confirm against
+   *                 the implementations.
+   * @throws IOException if the record cannot be processed.
+   */
+  void processNextDataRecord(T record, Map<String, Object> metadata, Object 
index) throws IOException;
+
+  /**
+   * Process a log delete block, and store the resulting records into the buffer.
+   *
+   * @param deleteBlock the log delete block to process.
+   * @throws IOException if the block cannot be processed.
+   */
+  void processDeleteBlock(HoodieDeleteBlock deleteBlock) throws IOException;
+
+  /**
+   * Process next delete record.
+   *
+   * @param deleteRecord the delete record.
+   * @param index        the key under which the delete is tracked in the buffer.
+   *                     NOTE(review): same meaning as the {@code index} argument of
+   *                     {@link #processNextDataRecord}; confirm against the implementations.
+   */
+  void processNextDeletedRecord(DeleteRecord deleteRecord, Object index);
+
+  /**
+   * Check if a record exists in the buffered records.
+   *
+   * @param recordKey the record key to look up.
+   * @return true if the buffer holds an entry for the key, otherwise false.
+   */
+  boolean containsLogRecord(String recordKey);
+
+  /**
+   * @return the number of log records in the buffer.
+   */
+  int size();
+
+  /**
+   * @return An iterator on the log records.
+   */
+  Iterator<Pair<Option<T>, Map<String, Object>>> getLogRecordIterator();
+
+  /**
+   * @return The underlying data stored in the buffer.
+   */
+  Map<Object, Pair<Option<T>, Map<String, Object>>> getLogRecords();
+
+  /**
+   * Link the base file iterator for the subsequent merge.
+   *
+   * NOTE(review): the method name is misspelled ("Iteraotr"); renaming it requires updating
+   * every implementation and caller at the same time.
+   *
+   * @param baseFileIterator iterator over the records of the base file.
+   */
+  void setBaseFileIteraotr(ClosableIterator<T> baseFileIterator);
+
+  /**
+   * Check if next merged record exists.
+   *
+   * @return true if it has, otherwise false.
+   * @throws IOException if producing the next merged record fails.
+   */
+  boolean hasNext() throws IOException;
+
+  /**
+   * @return the next merged record; meant to be called after {@link #hasNext()} returned true.
+   */
+  T next();
+
+  /**
+   * Release what the buffer holds.
+   */
+  void close();
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
new file mode 100644
index 00000000000..d055246cfa8
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.DeleteRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.log.KeySpec;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.table.log.block.HoodieDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.avro.Schema;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+
+/**
+ * A buffer that is used to store log records by {@link 
org.apache.hudi.common.table.log.HoodieMergedLogRecordReader}
+ * by calling the {@link #processDataBlock} and {@link #processDeleteBlock} 
methods into a record key based map.
+ * The records from the base file is accessed from an iterator object. These 
records are merged when the
+ * {@link #hasNext} method is called.
+ */
+public class HoodieKeyBasedFileGroupRecordBuffer<T> extends 
HoodieBaseFileGroupRecordBuffer<T> {
+  /**
+   * Passes all arguments unchanged to the superclass constructor; no additional state is
+   * set up here.
+   */
+  public HoodieKeyBasedFileGroupRecordBuffer(HoodieReaderContext<T> 
readerContext,
+                                             Schema readerSchema,
+                                             Schema baseFileSchema,
+                                             Option<String> 
partitionNameOverrideOpt,
+                                             Option<String[]> 
partitionPathFieldOpt,
+                                             HoodieRecordMerger recordMerger,
+                                             TypedProperties payloadProps,
+                                             HoodieTableMetaClient 
hoodieTableMetaClient) {
+    super(readerContext, readerSchema, baseFileSchema, 
partitionNameOverrideOpt, partitionPathFieldOpt,
+        recordMerger, payloadProps, hoodieTableMetaClient);
+  }
+
+  /**
+   * @return always {@link BufferType#KEY_BASED}.
+   */
+  @Override
+  public BufferType getBufferType() {
+    return BufferType.KEY_BASED;
+  }
+
+  /**
+   * Reads the records of a log data block (optionally restricted by a key spec) and hands each
+   * one, together with its metadata and record key, to {@link #processNextDataRecord}.
+   *
+   * @param dataBlock  the log data block to read.
+   * @param keySpecOpt optional key spec restricting which records are read.
+   * @throws IOException if reading or merging a record fails.
+   */
+  @Override
+  public void processDataBlock(HoodieDataBlock dataBlock, Option<KeySpec> keySpecOpt) throws IOException {
+    boolean partitionInfoAvailable = partitionNameOverrideOpt.isPresent() || partitionPathFieldOpt.isPresent();
+    checkState(partitionInfoAvailable,
+        "Either partition-name override or partition-path field had to be present");
+
+    // Only the iterator half of the pair is needed; try-with-resources closes it in all cases.
+    try (ClosableIterator<T> blockRecords = getRecordsIterator(dataBlock, keySpecOpt).getLeft()) {
+      while (blockRecords.hasNext()) {
+        T logRecord = blockRecords.next();
+        Map<String, Object> recordMetadata = readerContext.generateMetadataForRecord(logRecord, readerSchema);
+        // The record key is taken from the generated metadata.
+        String key = (String) recordMetadata.get(HoodieReaderContext.INTERNAL_META_RECORD_KEY);
+        processNextDataRecord(logRecord, recordMetadata, key);
+      }
+    }
+  }
+
+  /**
+   * Merges an incoming log data record with whatever is buffered under the same key and, if
+   * the merge yields a record to store, puts the sealed record and the incoming metadata into
+   * the buffer.
+   *
+   * @param record    the incoming log data record.
+   * @param metadata  metadata of the incoming record.
+   * @param recordKey the key under which the record is buffered.
+   * @throws IOException if merging fails.
+   */
+  @Override
+  public void processNextDataRecord(T record, Map<String, Object> metadata, Object recordKey) throws IOException {
+    Option<T> recordToStore = doProcessNextDataRecord(record, metadata, records.get(recordKey));
+    if (!recordToStore.isPresent()) {
+      // The buffered entry stays as it is.
+      return;
+    }
+
+    T sealedRecord = readerContext.seal(recordToStore.get());
+    records.put(recordKey, Pair.of(Option.ofNullable(sealedRecord), metadata));
+  }
+
+  /**
+   * Hands every delete record of a log delete block, together with its record key, to
+   * {@link #processNextDeletedRecord}.
+   *
+   * @param deleteBlock the log delete block to process.
+   * @throws IOException declared by the overridden method.
+   */
+  @Override
+  public void processDeleteBlock(HoodieDeleteBlock deleteBlock) throws IOException {
+    for (DeleteRecord deleteRecord : deleteBlock.getRecordsToDelete()) {
+      processNextDeletedRecord(deleteRecord, deleteRecord.getRecordKey());
+    }
+  }
+
+  /**
+   * Applies an incoming delete to the buffer unless the entry buffered under the same key
+   * wins. An applied delete is stored as an entry without payload whose metadata is generated
+   * from the record key, the partition path and the ordering value of the delete.
+   *
+   * @param deleteRecord the incoming delete record.
+   * @param recordKey    the key under which the delete is buffered; cast to {@code String}.
+   */
+  @Override
+  public void processNextDeletedRecord(DeleteRecord deleteRecord, Object recordKey) {
+    Option<DeleteRecord> deleteToApply = doProcessNextDeletedRecord(deleteRecord, records.get(recordKey));
+    if (!deleteToApply.isPresent()) {
+      // The buffered entry wins; the delete is dropped.
+      return;
+    }
+
+    DeleteRecord effectiveDelete = deleteToApply.get();
+    Map<String, Object> deleteMetadata = readerContext.generateMetadataForRecord(
+        (String) recordKey, effectiveDelete.getPartitionPath(), effectiveDelete.getOrderingValue());
+    records.put(recordKey, Pair.of(Option.empty(), deleteMetadata));
+  }
+
+  /**
+   * @param recordKey the record key to look up.
+   * @return {@code true} if the buffered records map has an entry for the given key.
+   */
+  @Override
+  public boolean containsLogRecord(String recordKey) {
+    return records.containsKey(recordKey);
+  }
+
+  /**
+   * Advances to the next merged record and stores it in {@code nextRecord}.
+   *
+   * <p>Base file records are consumed first; each one is merged with the entry buffered under
+   * the same record key (which is removed from the buffer) or passed through when there is
+   * none. Once the base file iterator is exhausted, the entries left in the buffer are
+   * emitted. Candidates that produce no record are skipped in both phases.
+   *
+   * @return {@code true} if a record was found, {@code false} when both sources are exhausted.
+   * @throws IOException if merging fails.
+   */
+  @Override
+  public boolean hasNext() throws IOException {
+    ValidationUtils.checkState(baseFileIterator != null, "Base file iterator has not been set yet");
+
+    // Phase 1: base file records, merged with their buffered log entry if there is one.
+    while (baseFileIterator.hasNext()) {
+      T baseRecord = baseFileIterator.next();
+      String recordKey = readerContext.getRecordKey(baseRecord, baseFileSchema);
+      Pair<Option<T>, Map<String, Object>> bufferedEntry = records.remove(recordKey);
+
+      Option<T> merged;
+      if (bufferedEntry == null) {
+        merged = merge(Option.empty(), Collections.emptyMap(), Option.of(baseRecord), Collections.emptyMap());
+      } else {
+        merged = merge(Option.of(baseRecord), Collections.emptyMap(), bufferedEntry.getLeft(), bufferedEntry.getRight());
+      }
+
+      if (merged.isPresent()) {
+        nextRecord = readerContext.seal(merged.get());
+        return true;
+      }
+    }
+
+    // Phase 2: entries that are still buffered after the base file has been consumed.
+    // The iterator is created once, on the first call that reaches this point.
+    if (logRecordIterator == null) {
+      logRecordIterator = records.values().iterator();
+    }
+
+    while (logRecordIterator.hasNext()) {
+      Pair<Option<T>, Map<String, Object>> remainingEntry = logRecordIterator.next();
+      Option<T> logOnlyRecord = merge(Option.empty(), Collections.emptyMap(),
+          remainingEntry.getLeft(), remainingEntry.getRight());
+      if (logOnlyRecord.isPresent()) {
+        nextRecord = readerContext.seal(logOnlyRecord.get());
+        return true;
+      }
+    }
+
+    return false;
+  }
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
new file mode 100644
index 00000000000..7fc99523e19
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.DeleteRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.log.KeySpec;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.table.log.block.HoodieDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.avro.Schema;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+
+/**
+ * A buffer in which {@link org.apache.hudi.common.table.log.HoodieMergedLogRecordReader}
+ * stores log records, by calling the {@link #processDataBlock} and {@link #processDeleteBlock}
+ * methods, in a map keyed by record position. Here, position means the position of the record
+ * in the base file. The records from the base file are accessed through an iterator object and
+ * are merged with the buffered log records when the {@link #hasNext} method is called.
+ */
+public class HoodiePositionBasedFileGroupRecordBuffer<T> extends 
HoodieBaseFileGroupRecordBuffer<T> {
+  private long nextRecordPosition = 0L;
+
+  /**
+   * Creates a position-based record buffer. Every argument is forwarded unchanged to the
+   * {@link HoodieBaseFileGroupRecordBuffer} constructor; this class adds no construction logic.
+   *
+   * <p>Note that {@link #processDataBlock} requires at least one of
+   * {@code partitionNameOverrideOpt} and {@code partitionPathFieldOpt} to be present.
+   */
+  public HoodiePositionBasedFileGroupRecordBuffer(HoodieReaderContext<T> 
readerContext,
+                                                  Schema readerSchema,
+                                                  Schema baseFileSchema,
+                                                  Option<String> 
partitionNameOverrideOpt,
+                                                  Option<String[]> 
partitionPathFieldOpt,
+                                                  HoodieRecordMerger 
recordMerger,
+                                                  TypedProperties payloadProps,
+                                                  HoodieTableMetaClient 
hoodieTableMetaClient) {
+    super(readerContext, readerSchema, baseFileSchema, 
partitionNameOverrideOpt, partitionPathFieldOpt,
+        recordMerger, payloadProps, hoodieTableMetaClient);
+  }
+
+  /**
+   * Identifies this buffer as the position-based variant.
+   *
+   * @return always {@link BufferType#POSITION_BASED}
+   */
+  @Override
+  public BufferType getBufferType() {
+    return BufferType.POSITION_BASED;
+  }
+
+  /**
+   * Buffers the records of a log data block under the record positions carried by that block.
+   *
+   * <p>Records rejected by {@code shouldSkip} (based on the optional key spec) are not buffered,
+   * but they still consume one entry of the extracted position list, so the i-th record of the
+   * block is always paired with the i-th position.
+   *
+   * @param dataBlock  the data block whose records are buffered
+   * @param keySpecOpt optional key filter; when absent, no keys are supplied to the filter and
+   *                   full-key matching is used
+   * @throws IOException propagated from reading the block or processing a record
+   * @throws IllegalStateException (via {@code checkState}) if neither the partition-name override
+   *                               nor the partition-path field is present
+   */
+  @Override
+  public void processDataBlock(HoodieDataBlock dataBlock, Option<KeySpec> keySpecOpt) throws IOException {
+    checkState(partitionNameOverrideOpt.isPresent() || partitionPathFieldOpt.isPresent(),
+        "Either partition-name override or partition-path field had to be present");
+
+    // Prepare key filters: an absent key spec yields an empty key set and full-key matching.
+    final Set<String> keyFilter = keySpecOpt.isPresent()
+        ? new HashSet<>(keySpecOpt.get().getKeys())
+        : new HashSet<>();
+    final boolean fullKeyMatch = !keySpecOpt.isPresent() || keySpecOpt.get().isFullKey();
+
+    // Extract positions from data block.
+    List<Long> recordPositions = extractRecordPositions(dataBlock);
+
+    // TODO: return an iterator that can generate sequence number with the record.
+    //  Then we can hide this logic into data block.
+    try (ClosableIterator<T> logRecords = dataBlock.getEngineRecordIterator(readerContext)) {
+      for (int recordIndex = 0; logRecords.hasNext(); recordIndex++) {
+        T logRecord = logRecords.next();
+
+        // Skip a record if it is not contained in the specified keys; its index is still consumed.
+        if (shouldSkip(logRecord, dataBlock.getKeyFieldName(), fullKeyMatch, keyFilter)) {
+          continue;
+        }
+
+        long recordPosition = recordPositions.get(recordIndex);
+        processNextDataRecord(
+            logRecord,
+            readerContext.generateMetadataForRecord(logRecord, readerSchema),
+            recordPosition);
+      }
+    }
+  }
+
+  /**
+   * Combines one log data record with whatever is already buffered at {@code recordPosition}
+   * (via {@code doProcessNextDataRecord}) and, if that yields a record, stores the sealed result
+   * together with {@code metadata} under the same position.
+   *
+   * @param record         the incoming log record
+   * @param metadata       metadata stored alongside the record
+   * @param recordPosition key of the buffer entry to update
+   * @throws IOException propagated from {@code doProcessNextDataRecord}
+   */
+  @Override
+  public void processNextDataRecord(T record, Map<String, Object> metadata, Object recordPosition) throws IOException {
+    Option<T> merged = doProcessNextDataRecord(record, metadata, records.get(recordPosition));
+    if (!merged.isPresent()) {
+      // Nothing to store; the existing buffer entry (if any) is left untouched.
+      return;
+    }
+    T sealedRecord = readerContext.seal(merged.get());
+    records.put(recordPosition, Pair.of(Option.ofNullable(sealedRecord), metadata));
+  }
+
+  /**
+   * Buffers every delete record of a delete block, pairing the i-th delete record with the i-th
+   * position extracted from the block.
+   *
+   * @param deleteBlock the delete block to process
+   * @throws IOException declared by the overridden method
+   */
+  @Override
+  public void processDeleteBlock(HoodieDeleteBlock deleteBlock) throws IOException {
+    List<Long> recordPositions = extractRecordPositions(deleteBlock);
+
+    Iterator<DeleteRecord> deleteRecords = Arrays.stream(deleteBlock.getRecordsToDelete()).iterator();
+    for (int recordIndex = 0; deleteRecords.hasNext(); recordIndex++) {
+      DeleteRecord deleteRecord = deleteRecords.next();
+      long recordPosition = recordPositions.get(recordIndex);
+      processNextDeletedRecord(deleteRecord, recordPosition);
+    }
+  }
+
+  /**
+   * Applies one delete record to the buffer entry at {@code recordPosition}. When
+   * {@code doProcessNextDeletedRecord} returns a delete record, the entry is replaced by an empty
+   * record plus metadata generated from that delete record's key, partition path and ordering
+   * value; otherwise the buffer is left unchanged.
+   *
+   * @param deleteRecord   the incoming delete record
+   * @param recordPosition key of the buffer entry to update
+   */
+  @Override
+  public void processNextDeletedRecord(DeleteRecord deleteRecord, Object recordPosition) {
+    Option<DeleteRecord> effectiveDelete = doProcessNextDeletedRecord(deleteRecord, records.get(recordPosition));
+    if (!effectiveDelete.isPresent()) {
+      return;
+    }
+    DeleteRecord toApply = effectiveDelete.get();
+    Map<String, Object> deleteMetadata = readerContext.generateMetadataForRecord(
+        toApply.getRecordKey(), toApply.getPartitionPath(), toApply.getOrderingValue());
+    records.put(recordPosition, Pair.of(Option.empty(), deleteMetadata));
+  }
+
+  /**
+   * Checks whether any buffered entry that holds a record has the given record key. The buffer
+   * is looked up by position in this class, so the check scans all buffered values; entries
+   * whose record is empty are ignored.
+   *
+   * @param recordKey the record key to look for
+   * @return {@code true} if a buffered, non-empty record has that key
+   */
+  @Override
+  public boolean containsLogRecord(String recordKey) {
+    return records.values().stream()
+        .map(Pair::getLeft)
+        .filter(Option::isPresent)
+        .map(Option::get)
+        .map(bufferedRecord -> readerContext.getRecordKey(bufferedRecord, readerSchema))
+        .anyMatch(recordKey::equals);
+  }
+
+  /**
+   * Advances the buffer to the next record to emit and stages it in {@code nextRecord}.
+   *
+   * <p>Base file records are consumed first. The n-th record read from the base file iterator is
+   * merged with the buffered log record stored under position n, which is removed from the
+   * buffer at that point; a base record without a buffered counterpart still goes through
+   * {@code merge}. Once the base file iterator is exhausted, the log records still left in the
+   * buffer are processed. Any candidate whose merge result is empty is skipped.
+   *
+   * @return {@code true} if a record was staged in {@code nextRecord}; {@code false} once both
+   *         the base file iterator and the remaining log records are exhausted
+   * @throws IOException propagated from the calls made while merging
+   */
+  @Override
+  public boolean hasNext() throws IOException {
+    ValidationUtils.checkState(baseFileIterator != null, "Base file iterator has not been set yet");
+
+    // Phase 1: walk the base file; the running counter is the position of the current record.
+    while (baseFileIterator.hasNext()) {
+      T fromBaseFile = baseFileIterator.next();
+      Pair<Option<T>, Map<String, Object>> buffered = records.remove(nextRecordPosition++);
+
+      Option<T> merged;
+      if (buffered == null) {
+        merged = merge(Option.empty(), Collections.emptyMap(), Option.of(fromBaseFile), Collections.emptyMap());
+      } else {
+        merged = merge(Option.of(fromBaseFile), Collections.emptyMap(), buffered.getLeft(), buffered.getRight());
+      }
+
+      if (!merged.isPresent()) {
+        continue;
+      }
+      nextRecord = readerContext.seal(merged.get());
+      return true;
+    }
+
+    // Phase 2: whatever is still buffered was never matched by a base file record.
+    if (logRecordIterator == null) {
+      logRecordIterator = records.values().iterator();
+    }
+    while (logRecordIterator.hasNext()) {
+      Pair<Option<T>, Map<String, Object>> logOnly = logRecordIterator.next();
+      Option<T> merged = merge(Option.empty(), Collections.emptyMap(), logOnly.getLeft(), logOnly.getRight());
+      if (!merged.isPresent()) {
+        continue;
+      }
+      nextRecord = readerContext.seal(merged.get());
+      return true;
+    }
+
+    return false;
+  }
+}
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
index 4f7c4d9595f..f3e142b2f57 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
@@ -38,6 +38,7 @@ import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 
 import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
@@ -47,10 +48,13 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath;
 import static org.apache.hudi.common.model.WriteOperationType.INSERT;
 import static org.apache.hudi.common.model.WriteOperationType.UPSERT;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGER_STRATEGY;
 import static 
org.apache.hudi.common.testutils.HoodieTestUtils.getLogFileListFromFileSlice;
 import static 
org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings;
+import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 /**
@@ -64,7 +68,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
 
   public abstract String getBasePath();
 
-  public abstract HoodieReaderContext<T> getHoodieReaderContext();
+  public abstract HoodieReaderContext<T> getHoodieReaderContext(String[] 
partitionPath);
 
   public abstract void commitToTable(List<String> recordList, String operation,
                                      Map<String, String> writeConfigs);
@@ -80,12 +84,14 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
     writeConfigs.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), 
"_row_key");
     writeConfigs.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), 
"partition_path");
     writeConfigs.put("hoodie.datasource.write.precombine.field", "timestamp");
+    writeConfigs.put("hoodie.payload.ordering.field", "timestamp");
     writeConfigs.put(HoodieTableConfig.HOODIE_TABLE_NAME_KEY, "hoodie_test");
     writeConfigs.put("hoodie.insert.shuffle.parallelism", "4");
     writeConfigs.put("hoodie.upsert.shuffle.parallelism", "4");
     writeConfigs.put("hoodie.bulkinsert.shuffle.parallelism", "2");
     writeConfigs.put("hoodie.delete.shuffle.parallelism", "1");
     writeConfigs.put("hoodie.merge.small.file.group.candidates.limit", "0");
+    writeConfigs.put("hoodie.compact.inline", "false");
     writeConfigs.put(HoodieMetadataConfig.ENABLE.key(), "false");
 
     try (HoodieTestDataGenerator dataGen = new 
HoodieTestDataGenerator(0xDEEF)) {
@@ -123,10 +129,22 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
     List<T> actualRecordList = new ArrayList<>();
     TypedProperties props = new TypedProperties();
     props.setProperty("hoodie.datasource.write.precombine.field", "timestamp");
-    props.setProperty(HoodieTableConfig.RECORD_MERGER_STRATEGY.key(),
-        HoodieTableConfig.RECORD_MERGER_STRATEGY.defaultValue());
+    props.setProperty("hoodie.payload.ordering.field", "timestamp");
+    props.setProperty(RECORD_MERGER_STRATEGY.key(), 
RECORD_MERGER_STRATEGY.defaultValue());
+    String filePath = fileSlice.getBaseFile().isPresent() ? 
fileSlice.getBaseFile().get().getPath() : logFilePathList.get(0);
+    String[] partitionValues = {partitionPaths[0]};
+    HoodieFileGroupRecordBuffer<T> recordBuffer = new 
HoodieKeyBasedFileGroupRecordBuffer<>(
+        getHoodieReaderContext(partitionValues),
+        avroSchema,
+        avroSchema,
+        Option.of(getRelativePartitionPath(new Path(basePath), new 
Path(filePath).getParent())),
+        metaClient.getTableConfig().getPartitionFields(),
+        getHoodieReaderContext(partitionValues).getRecordMerger(
+            getStringWithAltKeys(props, RECORD_MERGER_STRATEGY, 
RECORD_MERGER_STRATEGY.defaultValue())),
+        props,
+        metaClient);
     HoodieFileGroupReader<T> fileGroupReader = new HoodieFileGroupReader<T>(
-        getHoodieReaderContext(),
+        getHoodieReaderContext(partitionValues),
         hadoopConf,
         basePath,
         metaClient.getActiveTimeline().lastInstant().get().getTimestamp(),
@@ -134,8 +152,9 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
         logFilePathList.isEmpty() ? Option.empty() : 
Option.of(logFilePathList),
         avroSchema,
         props,
-        -1,
-        -1);
+        0,
+        fileSlice.getTotalFileSize(),
+        recordBuffer);
     fileGroupReader.initRecordIterators();
     while (fileGroupReader.hasNext()) {
       actualRecordList.add(fileGroupReader.next());
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/NewHoodieParquetFileFormatUtils.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/NewHoodieParquetFileFormatUtils.scala
index 34214be1bd2..a76d4bfc77f 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/NewHoodieParquetFileFormatUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/NewHoodieParquetFileFormatUtils.scala
@@ -23,7 +23,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hudi.HoodieBaseRelation._
 import org.apache.hudi.HoodieConversionUtils.toScalaOption
-import org.apache.hudi.common.config.ConfigProperty
+import org.apache.hudi.common.config.{ConfigProperty, HoodieReaderConfig}
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.HoodieRecord
 import 
org.apache.hudi.common.model.HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX
@@ -36,7 +36,7 @@ import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
 import org.apache.hudi.internal.schema.{HoodieSchemaException, InternalSchema}
 import org.apache.spark.sql.catalyst.analysis.Resolver
-import 
org.apache.spark.sql.execution.datasources.parquet.NewHoodieParquetFileFormat
+import 
org.apache.spark.sql.execution.datasources.parquet.{HoodieFileGroupReaderBasedParquetFileFormat,
 NewHoodieParquetFileFormat}
 import org.apache.spark.sql.execution.datasources.{FileStatusCache, 
HadoopFsRelation}
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
 import org.apache.spark.sql.sources.BaseRelation
@@ -150,6 +150,12 @@ class NewHoodieParquetFileFormatUtils(val sqlContext: 
SQLContext,
       sqlContext.getConf(config.key(), 
defaultValueOption.getOrElse(config.defaultValue())))
   }
 
+  /**
+   * Resolves a boolean configuration. The value is taken from `optParams` when the key is set
+   * there; otherwise it is read from the SQL conf, falling back to `defaultValueOption` or, when
+   * that is empty, to the config's own default value.
+   *
+   * @param config             the boolean config property to resolve
+   * @param defaultValueOption optional override of the config's default value
+   * @return the resolved value parsed with `toBoolean`
+   */
+  private def checkIfAConfigurationEnabled(config: ConfigProperty[java.lang.Boolean],
+                                           defaultValueOption: Option[String] = Option.empty): Boolean = {
+    val key = config.key()
+    optParams.get(key) match {
+      case Some(explicitValue) => explicitValue.toBoolean
+      case None =>
+        // The fallback is computed only when the key is absent from optParams.
+        val fallback = defaultValueOption.getOrElse(String.valueOf(config.defaultValue()))
+        sqlContext.getConf(key, fallback).toBoolean
+    }
+  }
+
   protected val mergeType: String = 
optParams.getOrElse(DataSourceReadOptions.REALTIME_MERGE.key,
     DataSourceReadOptions.REALTIME_MERGE.defaultValue)
 
@@ -180,6 +186,9 @@ class NewHoodieParquetFileFormatUtils(val sqlContext: 
SQLContext,
     val recordMergerStrategy = 
getConfigValue(HoodieWriteConfig.RECORD_MERGER_STRATEGY,
       Option(metaClient.getTableConfig.getRecordMergerStrategy))
 
+    val fileGroupReaderEnabled = 
checkIfAConfigurationEnabled(HoodieReaderConfig.FILE_GROUP_READER_ENABLED)
+    val shouldUseRecordPosition = 
checkIfAConfigurationEnabled(HoodieWriteConfig.WRITE_RECORD_POSITIONS)
+
     val tableState = // Subset of the state of table's configuration as of at 
the time of the query
       HoodieTableState(
         tablePath = basePath.toString,
@@ -199,14 +208,18 @@ class NewHoodieParquetFileFormatUtils(val sqlContext: 
SQLContext,
       Seq.empty
     }
     fileIndex.shouldEmbedFileSlices = true
+    val fileGroupReaderBasedFileFormat = new 
HoodieFileGroupReaderBasedParquetFileFormat(
+      tableState, HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString, internalSchemaOpt),
+      metaClient.getTableConfig.getTableName, mergeType, mandatoryFields, 
isMOR, isBootstrap, shouldUseRecordPosition)
+    val newHoodieParquetFileFormat = new 
NewHoodieParquetFileFormat(sparkSession.sparkContext.broadcast(tableState),
+      sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString, internalSchemaOpt)),
+      metaClient.getTableConfig.getTableName, mergeType, mandatoryFields, 
isMOR, isBootstrap)
     HadoopFsRelation(
       location = fileIndex,
       partitionSchema = fileIndex.partitionSchema,
       dataSchema = fileIndex.dataSchema,
       bucketSpec = None,
-      fileFormat = new 
NewHoodieParquetFileFormat(sparkSession.sparkContext.broadcast(tableState),
-        
sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString, internalSchemaOpt)),
-        metaClient.getTableConfig.getTableName, mergeType, mandatoryFields, 
isMOR, isBootstrap),
+      fileFormat = if (fileGroupReaderEnabled) fileGroupReaderBasedFileFormat 
else newHoodieParquetFileFormat,
       optParams)(sparkSession)
   }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
new file mode 100644
index 00000000000..5bc4e4a80f9
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import kotlin.NotImplementedError
+import org.apache.avro.Schema
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.MergeOnReadSnapshotRelation.createPartitionedFile
+import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.common.engine.HoodieReaderContext
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.{FileSlice, HoodieBaseFile, HoodieLogFile, 
HoodieRecord, HoodieRecordMerger}
+import org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGER_STRATEGY
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.read.{HoodieFileGroupReader, 
HoodieFileGroupRecordBuffer, HoodieKeyBasedFileGroupRecordBuffer, 
HoodiePositionBasedFileGroupRecordBuffer}
+import org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys
+import org.apache.hudi.common.util.{Option => HOption}
+import org.apache.hudi.{HoodieBaseRelation, HoodieSparkUtils, 
HoodieTableSchema, HoodieTableState, MergeOnReadSnapshotRelation, 
PartitionFileSliceMapping, SparkAdapterSupport, 
SparkFileFormatInternalRowReaderContext}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isMetaField
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.util.SerializableConfiguration
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.jdk.CollectionConverters.asScalaIteratorConverter
+
+/**
+ * This class uses [[HoodieFileGroupReader]] and its related classes to support reading
+ * from Parquet-formatted base files and their log files.
+ */
+class HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
+                                                  tableSchema: 
HoodieTableSchema,
+                                                  tableName: String,
+                                                  mergeType: String,
+                                                  mandatoryFields: Seq[String],
+                                                  isMOR: Boolean,
+                                                  isBootstrap: Boolean,
+                                                  shouldUseRecordPosition: 
Boolean
+                                           ) extends ParquetFileFormat with 
SparkAdapterSupport {
+  var isProjected = false
+
+  /**
+   * Support batch needs to remain consistent, even if one side of a bootstrap 
merge can support
+   * while the other side can't
+   */
+  private var supportBatchCalled = false
+  private var supportBatchResult = false
+
+  /**
+   * Reports whether batch reading is supported. The answer is computed on the first call only
+   * (never for MOR tables, otherwise whatever the parent format reports) and the cached result
+   * is returned on every later call, regardless of the schema passed in.
+   */
+  override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
+    if (supportBatchCalled) {
+      supportBatchResult
+    } else {
+      supportBatchCalled = true
+      supportBatchResult = !isMOR && super.supportBatch(sparkSession, schema)
+      supportBatchResult
+    }
+  }
+
+  /** Files read through this format are never split: this always returns `false`. */
+  override def isSplitable(sparkSession: SparkSession,
+                           options: Map[String, String],
+                           path: Path): Boolean = false
+
+  /**
+   * Builds the function that reads a single [[PartitionedFile]].
+   *
+   * The readers and the broadcast Hadoop configuration are prepared once here; the returned
+   * function then dispatches per file:
+   *  - partition values that are not a [[PartitionFileSliceMapping]], or a mapping that has no
+   *    file slice for the file's id: the file is read with the plain base file reader;
+   *  - a log file path: not supported yet, throws `NotImplementedError`;
+   *  - empty required schema: the whole base file is read with the plain base file reader;
+   *  - a base file with a bootstrap base file: not supported yet, throws `NotImplementedError`;
+   *  - a file slice without log files: throws `IllegalStateException`;
+   *  - otherwise: base file and log files are merged through `buildFileGroupIterator`.
+   *
+   * NOTE(review): `NotImplementedError` resolves to the `kotlin.NotImplementedError` imported at
+   * the top of this file, not to `scala.NotImplementedError` — confirm this is intended.
+   */
+  override def buildReaderWithPartitionValues(sparkSession: SparkSession,
+                                              dataSchema: StructType,
+                                              partitionSchema: StructType,
+                                              requiredSchema: StructType,
+                                              filters: Seq[Filter],
+                                              options: Map[String, String],
+                                              hadoopConf: Configuration): 
PartitionedFile => Iterator[InternalRow] = {
+    val requiredSchemaWithMandatory = 
generateRequiredSchemaWithMandatory(requiredSchema, dataSchema)
+    // Split the required fields into Hudi meta columns (_1) and all other columns (_2).
+    val requiredSchemaSplits = requiredSchemaWithMandatory.fields.partition(f 
=> HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.contains(f.name))
+    val requiredMeta = StructType(requiredSchemaSplits._1)
+    val requiredWithoutMeta = StructType(requiredSchemaSplits._2)
+    val augmentedHadoopConf = FSUtils.buildInlineConf(hadoopConf)
+    // Only the first two readers are used here; the skeleton and bootstrap readers are discarded.
+    val (baseFileReader, preMergeBaseFileReader, _, _) = buildFileReaders(
+      sparkSession, dataSchema, partitionSchema, requiredSchema, filters, 
options, augmentedHadoopConf,
+      requiredSchemaWithMandatory, requiredWithoutMeta, requiredMeta)
+    val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new 
SerializableConfiguration(augmentedHadoopConf))
+
+    (file: PartitionedFile) => {
+      file.partitionValues match {
+        case fileSliceMapping: PartitionFileSliceMapping =>
+          val filePath = 
sparkAdapter.getSparkPartitionedFileUtils.getPathFromPartitionedFile(file)
+          if (FSUtils.isLogFile(filePath)) {
+            // TODO: Use FileGroupReader here: HUDI-6942.
+            throw new NotImplementedError("Not support reading with only log 
files")
+          } else {
+            fileSliceMapping.getSlice(FSUtils.getFileId(filePath.getName)) 
match {
+              case Some(fileSlice) =>
+                val hoodieBaseFile = fileSlice.getBaseFile.get()
+                val bootstrapFileOpt = hoodieBaseFile.getBootstrapBaseFile
+                val partitionValues = fileSliceMapping.getInternalRow
+                val logFiles = getLogFilesFromSlice(fileSlice)
+                if (requiredSchemaWithMandatory.isEmpty) {
+                  val baseFile = createPartitionedFile(partitionValues, 
hoodieBaseFile.getHadoopPath, 0, hoodieBaseFile.getFileLen)
+                  // TODO: Use FileGroupReader here: HUDI-6942.
+                  baseFileReader(baseFile)
+                } else if (bootstrapFileOpt.isPresent) {
+                  // TODO: Use FileGroupReader here: HUDI-6942.
+                  throw new NotImplementedError("Not support reading bootstrap 
file")
+                } else {
+                  if (logFiles.isEmpty) {
+                    throw new IllegalStateException(
+                      "should not be here since file slice should not have 
been broadcasted "
+                        + "since it has no log or data files")
+                  }
+                  buildFileGroupIterator(
+                    preMergeBaseFileReader,
+                    partitionValues,
+                    hoodieBaseFile,
+                    logFiles,
+                    requiredSchemaWithMandatory,
+                    broadcastedHadoopConf.value.value,
+                    file.start,
+                    file.length,
+                    shouldUseRecordPosition
+                  )
+                }
+              // TODO: Use FileGroupReader here: HUDI-6942.
+              case _ => baseFileReader(file)
+            }
+          }
+        // TODO: Use FileGroupReader here: HUDI-6942.
+        case _ => baseFileReader(file)
+      }
+    }
+  }
+
+  /**
+   * Builds an iterator over the merged records of one file group by wiring a
+   * [[HoodieFileGroupReader]] with either a position-based or a key-based record buffer.
+   *
+   * @param preMergeBaseFileReader      reader handed to the Spark reader context
+   * @param partitionValues             partition values handed to the Spark reader context
+   * @param baseFile                    base file of the file group; `null` is tolerated, in which
+   *                                    case the partition is derived from the first log file
+   * @param logFiles                    log files of the file group; must be non-empty when
+   *                                    `baseFile` is `null`
+   * @param requiredSchemaWithMandatory schema to read, converted to Avro for the reader
+   * @param hadoopConf                  Hadoop configuration for the meta client and the reader
+   * @param start                       start offset passed through to the reader
+   * @param length                      length passed through to the reader
+   * @param shouldUseRecordPosition     selects the position-based buffer when `true`, the
+   *                                    key-based buffer otherwise
+   * @return a Scala iterator over the reader's closable iterator
+   */
+  protected def buildFileGroupIterator(preMergeBaseFileReader: PartitionedFile => Iterator[InternalRow],
+                                       partitionValues: InternalRow,
+                                       baseFile: HoodieBaseFile,
+                                       logFiles: List[HoodieLogFile],
+                                       requiredSchemaWithMandatory: StructType,
+                                       hadoopConf: Configuration,
+                                       start: Long,
+                                       length: Long,
+                                       shouldUseRecordPosition: Boolean): Iterator[InternalRow] = {
+    val readerContext: HoodieReaderContext[InternalRow] = new SparkFileFormatInternalRowReaderContext(
+      preMergeBaseFileReader, partitionValues)
+    val metaClient: HoodieTableMetaClient = HoodieTableMetaClient
+      .builder().setConf(hadoopConf).setBasePath(tableState.tablePath).build
+    // Converted once and shared by the record buffer and the reader.
+    val avroSchema: Schema = HoodieBaseRelation.convertToAvroSchema(requiredSchemaWithMandatory, tableName)
+    // Without a base file, fall back to the first log file's path. Use getPath here: toString on
+    // the log file object is its debug representation, not a path.
+    val filePath: String = if (baseFile != null) baseFile.getPath else logFiles.head.getPath.toString
+    val partitionNameOpt: HOption[String] = HOption.of(FSUtils.getRelativePartitionPath(
+      new Path(tableState.tablePath), new Path(filePath).getParent))
+    val partitionPathFieldOpt = metaClient.getTableConfig.getPartitionFields
+    val recordMerger: HoodieRecordMerger = readerContext.getRecordMerger(getStringWithAltKeys(
+      new TypedProperties, RECORD_MERGER_STRATEGY, RECORD_MERGER_STRATEGY.defaultValue))
+    val recordBuffer: HoodieFileGroupRecordBuffer[InternalRow] = if (shouldUseRecordPosition) {
+      new HoodiePositionBasedFileGroupRecordBuffer[InternalRow](
+        readerContext, avroSchema, avroSchema, partitionNameOpt, partitionPathFieldOpt,
+        recordMerger, new TypedProperties, metaClient)
+    } else {
+      new HoodieKeyBasedFileGroupRecordBuffer[InternalRow](
+        readerContext, avroSchema, avroSchema, partitionNameOpt, partitionPathFieldOpt,
+        recordMerger, new TypedProperties, metaClient)
+    }
+    val reader = new HoodieFileGroupReader[InternalRow](
+      readerContext,
+      hadoopConf,
+      tableState.tablePath,
+      tableState.latestCommitTimestamp.get,
+      // baseFile may be null (see filePath above), so it must not be wrapped with HOption.of.
+      HOption.ofNullable(baseFile),
+      HOption.of(logFiles.map(f => f.getPath.toString).asJava),
+      avroSchema,
+      new TypedProperties(),
+      start,
+      length,
+      recordBuffer
+    )
+    reader.initRecordIterators()
+    reader.getClosableIterator.asInstanceOf[java.util.Iterator[InternalRow]].asScala
+  }
+
+  /**
+   * Computes the schema that actually has to be read.
+   *
+   * For a MOR table that is not projection compatible (per
+   * `MergeOnReadSnapshotRelation.isProjectionCompatible`) the full `dataSchema` is returned.
+   * Otherwise the result is `requiredSchema` followed by every mandatory field it does not
+   * already contain, taken from `dataSchema` in the order of `mandatoryFields`.
+   *
+   * @param requiredSchema the schema requested by the query
+   * @param dataSchema     the full data schema; must contain every mandatory field that is
+   *                       missing from `requiredSchema`
+   */
+  def generateRequiredSchemaWithMandatory(requiredSchema: StructType, dataSchema: StructType): StructType = {
+    if (isMOR && !MergeOnReadSnapshotRelation.isProjectionCompatible(tableState)) {
+      dataSchema
+    } else {
+      val missingMandatoryFields: Seq[StructField] = mandatoryFields
+        .filter(name => requiredSchema.getFieldIndex(name).isEmpty)
+        .map(name => dataSchema.fields(dataSchema.getFieldIndex(name).get))
+      StructType(requiredSchema.toArray ++ missingMandatoryFields)
+    }
+  }
+
+  /**
+   * Builds the four Parquet readers used by this format, in this order:
+   * the plain base file reader, the pre-merge base file reader, the bootstrap skeleton file
+   * reader and the bootstrap base file reader. A reader that is not applicable for the current
+   * table (not MOR, not bootstrap, or no columns of that kind required) is replaced by a reader
+   * that returns no rows. Every real reader gets its own copy of `hadoopConf`.
+   */
+  protected def buildFileReaders(sparkSession: SparkSession, dataSchema: StructType, partitionSchema: StructType,
+                                 requiredSchema: StructType, filters: Seq[Filter], options: Map[String, String],
+                                 hadoopConf: Configuration, requiredSchemaWithMandatory: StructType,
+                                 requiredWithoutMeta: StructType, requiredMeta: StructType):
+  (PartitionedFile => Iterator[InternalRow],
+    PartitionedFile => Iterator[InternalRow],
+    PartitionedFile => Iterator[InternalRow],
+    PartitionedFile => Iterator[InternalRow]) = {
+
+    type FileReader = PartitionedFile => Iterator[InternalRow]
+
+    // Delegates to the parent Parquet format with a fresh copy of the Hadoop configuration.
+    def newReader(data: StructType, partitions: StructType, required: StructType,
+                  pushedFilters: Seq[Filter]): FileReader = {
+      super.buildReaderWithPartitionValues(sparkSession, data, partitions, required,
+        pushedFilters, options, new Configuration(hadoopConf))
+    }
+
+    // Stand-in for a reader that is not needed.
+    val noRows: FileReader = _ => Iterator.empty
+
+    //file reader when you just read a hudi parquet file and don't do any merging
+    val baseFileReader = newReader(dataSchema, partitionSchema, requiredSchema, filters)
+
+    //file reader for reading a hudi base file that needs to be merged with log files
+    // (adds support for reading files using inline file system)
+    val preMergeBaseFileReader =
+      if (isMOR) newReader(dataSchema, partitionSchema, requiredSchemaWithMandatory, Seq.empty) else noRows
+
+    //Rules for appending partitions and filtering in the bootstrap readers:
+    // 1. if it is mor, we don't want to filter data or append partitions
+    // 2. if we need to merge the bootstrap base and skeleton files then we cannot filter
+    // 3. if we need to merge the bootstrap base and skeleton files then we should never append partitions to the
+    //    skeleton reader
+
+    val needMetaCols = requiredMeta.nonEmpty
+    val needDataCols = requiredWithoutMeta.nonEmpty
+
+    //file reader for bootstrap skeleton files
+    val skeletonReader =
+      if (!(needMetaCols && isBootstrap)) {
+        noRows
+      } else if (needDataCols || isMOR) {
+        // no filter and no append
+        newReader(HoodieSparkUtils.getMetaSchema, StructType(Seq.empty), requiredMeta, Seq.empty)
+      } else {
+        // filter and append
+        newReader(HoodieSparkUtils.getMetaSchema, partitionSchema, requiredMeta, filters)
+      }
+
+    //file reader for bootstrap base files
+    val bootstrapBaseReader =
+      if (!(needDataCols && isBootstrap)) {
+        noRows
+      } else {
+        val dataSchemaWithoutMeta = StructType(dataSchema.fields.filterNot(sf => isMetaField(sf.name)))
+        val (partitionsToAppend, filtersToPush) =
+          if (isMOR) {
+            // no filter and no append
+            (StructType(Seq.empty), Seq.empty[Filter])
+          } else if (needMetaCols) {
+            // no filter but append
+            (partitionSchema, Seq.empty[Filter])
+          } else {
+            // filter and append
+            (partitionSchema, filters)
+          }
+        newReader(dataSchemaWithoutMeta, partitionsToAppend, requiredWithoutMeta, filtersToPush)
+      }
+
+    (baseFileReader, preMergeBaseFileReader, skeletonReader, bootstrapBaseReader)
+  }
+
+  /**
+   * Returns the log files of the given file slice as a Scala list, ordered by
+   * `HoodieLogFile.getLogFileComparator`.
+   */
+  protected def getLogFilesFromSlice(fileSlice: FileSlice): List[HoodieLogFile] = {
+    val sortedLogFiles = fileSlice.getLogFiles.sorted(HoodieLogFile.getLogFileComparator)
+    sortedLogFiles.iterator().asScala.toList
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodieMergeHandleWithSparkMerger.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodieMergeHandleWithSparkMerger.java
index 36f0f3ca351..57243e97125 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodieMergeHandleWithSparkMerger.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodieMergeHandleWithSparkMerger.java
@@ -55,11 +55,18 @@ import org.junit.jupiter.api.Test;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static 
org.apache.hudi.common.config.HoodieReaderConfig.FILE_GROUP_READER_ENABLED;
+import static 
org.apache.hudi.common.config.HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT;
+import static 
org.apache.hudi.common.model.HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY;
+import static org.apache.hudi.config.HoodieWriteConfig.RECORD_MERGER_IMPLS;
+import static org.apache.hudi.config.HoodieWriteConfig.WRITE_RECORD_POSITIONS;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -79,8 +86,8 @@ public class TestHoodieMergeHandleWithSparkMerger extends 
SparkClientFunctionalT
         HoodieTableConfig.BASE_FILE_FORMAT.key(),
         HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().toString());
     properties.setProperty(
-        "hoodie.payload.ordering.field",
-        "_hoodie_record_key");
+        PAYLOAD_ORDERING_FIELD_PROP_KEY,
+        
HoodieRecord.HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName());
     metaClient = getHoodieMetaClient(hadoopConf(), basePath(), 
HoodieTableType.MERGE_ON_READ, properties);
   }
 
@@ -89,6 +96,7 @@ public class TestHoodieMergeHandleWithSparkMerger extends 
SparkClientFunctionalT
     HoodieWriteConfig writeConfig = buildDefaultWriteConfig(SCHEMA);
     HoodieRecordMerger merger = writeConfig.getRecordMerger();
     assertTrue(merger instanceof DefaultMerger);
+    
assertTrue(writeConfig.getBooleanOrDefault(FILE_GROUP_READER_ENABLED.key(), 
false));
     insertAndUpdate(writeConfig, 114);
   }
 
@@ -97,6 +105,7 @@ public class TestHoodieMergeHandleWithSparkMerger extends 
SparkClientFunctionalT
     HoodieWriteConfig writeConfig = buildNoFlushWriteConfig(SCHEMA);
     HoodieRecordMerger merger = writeConfig.getRecordMerger();
     assertTrue(merger instanceof NoFlushMerger);
+    
assertTrue(writeConfig.getBooleanOrDefault(FILE_GROUP_READER_ENABLED.key(), 
false));
     insertAndUpdate(writeConfig, 64);
   }
 
@@ -105,6 +114,7 @@ public class TestHoodieMergeHandleWithSparkMerger extends 
SparkClientFunctionalT
     HoodieWriteConfig writeConfig = buildCustomWriteConfig(SCHEMA);
     HoodieRecordMerger merger = writeConfig.getRecordMerger();
     assertTrue(merger instanceof CustomMerger);
+    
assertTrue(writeConfig.getBooleanOrDefault(FILE_GROUP_READER_ENABLED.key(), 
false));
     insertAndUpdate(writeConfig, 95);
   }
 
@@ -149,14 +159,20 @@ public class TestHoodieMergeHandleWithSparkMerger extends 
SparkClientFunctionalT
   public HoodieWriteConfig getWriteConfig(Schema avroSchema) {
     Properties extraProperties = new Properties();
     extraProperties.setProperty(
-        "hoodie.datasource.write.record.merger.impls",
+        RECORD_MERGER_IMPLS.key(),
         "org.apache.hudi.HoodieSparkRecordMerger");
     extraProperties.setProperty(
-        "hoodie.logfile.data.block.format",
+        LOGFILE_DATA_BLOCK_FORMAT.key(),
         "parquet");
     extraProperties.setProperty(
-        "hoodie.payload.ordering.field",
-        "_hoodie_record_key");
+        PAYLOAD_ORDERING_FIELD_PROP_KEY,
+        
HoodieRecord.HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName());
+    extraProperties.setProperty(
+        FILE_GROUP_READER_ENABLED.key(),
+        "true");
+    extraProperties.setProperty(
+        WRITE_RECORD_POSITIONS.key(),
+        "true");
 
     return getConfigBuilder(true)
         .withPath(basePath())
@@ -210,11 +226,32 @@ public class TestHoodieMergeHandleWithSparkMerger extends 
SparkClientFunctionalT
   }
 
   public void checkDataEquality(int numRecords) {
-    List<Row> rows = spark()
+    // Options handed to the Spark read below. Besides the merger, log block format and
+    // ordering field, they set FILE_GROUP_READER_ENABLED and
+    // USE_NEW_HUDI_PARQUET_FILE_FORMAT to "true".
+    Map<String, String> properties = new HashMap<>();
+    properties.put(
+        RECORD_MERGER_IMPLS.key(),
+        "org.apache.hudi.HoodieSparkRecordMerger");
+    properties.put(
+        LOGFILE_DATA_BLOCK_FORMAT.key(),
+        "parquet");
+    properties.put(
+        PAYLOAD_ORDERING_FIELD_PROP_KEY,
+        
HoodieRecord.HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName());
+    properties.put(
+        FILE_GROUP_READER_ENABLED.key(),
+        "true");
+    properties.put(
+        DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT().key(),
+        "true");
+    properties.put(
+        WRITE_RECORD_POSITIONS.key(),
+        "true");
+    Dataset<Row> rows = spark()
         .read()
+        .options(properties)
         .format("org.apache.hudi")
-        .load(basePath() + "/" + getPartitionPath()).collectAsList();
-    assertEquals(numRecords, rows.size());
+        .load(basePath() + "/" + getPartitionPath());
+    // Materialize the rows and compare their count with the expected number of records.
+    List<Row> result = rows.collectAsList();
+    assertEquals(numRecords, result.size());
   }
 
   public void insertAndUpdate(HoodieWriteConfig writeConfig, int 
expectedRecordNum) throws Exception {
@@ -276,6 +313,15 @@ public class TestHoodieMergeHandleWithSparkMerger extends 
SparkClientFunctionalT
 
       // Check data after
       checkDataEquality(expectedRecordNum);
+
+      // (3) Write: append, generate the log file.
+      instantTime = "003";
+      writeClient.startCommitWithTime(instantTime);
+
+      List<HoodieRecord> records5 = 
generateEmptyRecords(getKeys(records).subList(50, 59)); // 9 deletes only
+      assertEquals(9, records5.size());
+      updateRecordsInMORTable(reloadedMetaClient, records5, writeClient, 
writeConfig, instantTime, false);
+      checkDataEquality(expectedRecordNum - 9);
     }
   }
 
@@ -335,11 +381,7 @@ public class TestHoodieMergeHandleWithSparkMerger extends 
SparkClientFunctionalT
   public static class CustomMerger extends HoodieSparkRecordMerger {
     @Override
     public boolean shouldFlush(HoodieRecord record, Schema schema, 
TypedProperties props) throws IOException {
-      if (((HoodieSparkRecord)record).getData().getString(0).equals("001")) {
-        return false;
-      } else {
-        return true;
-      }
+      // Flush every record except those whose string at ordinal 0 equals "001".
+      return !((HoodieSparkRecord) 
record).getData().getString(0).equals("001");
     }
   }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
index 70c1bcb480e..50056954a12 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
@@ -22,11 +22,17 @@ package org.apache.hudi.common.table.read
 import org.apache.avro.Schema
 import org.apache.hadoop.conf.Configuration
 import org.apache.hudi.common.engine.HoodieReaderContext
+import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.{HoodieRecord, WriteOperationType}
-import org.apache.hudi.{SparkAdapterSupport, 
SparkFileFormatInternalRowReaderContext}
+import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
+import org.apache.hudi.{AvroConversionUtils, 
SparkFileFormatInternalRowReaderContext}
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{Dataset, HoodieInternalRowUtils, 
HoodieUnsafeUtils, Row, SaveMode, SparkSession}
+import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.{HoodieSparkKryoRegistrar, SparkConf}
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.BeforeEach
@@ -55,22 +61,39 @@ class TestHoodieFileGroupReaderOnSpark extends 
TestHoodieFileGroupReaderBase[Int
     sparkConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK")
     sparkConf.set("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer")
     sparkConf.set("spark.kryo.registrator", 
"org.apache.spark.HoodieSparkKryoRegistrar")
+    sparkConf.set("spark.sql.parquet.enableVectorizedReader", "false")
     HoodieSparkKryoRegistrar.register(sparkConf)
     spark = SparkSession.builder.config(sparkConf).getOrCreate
   }
 
   override def getHadoopConf: Configuration = {
-    new Configuration()
+    // Pass a fresh Configuration through FSUtils.buildInlineConf
+    // (presumably adds inline file system settings; confirm against FSUtils).
+    FSUtils.buildInlineConf(new Configuration)
   }
 
   override def getBasePath: String = {
+    // The base path is the absolute temp directory rendered as a URI string.
     tempDir.toAbsolutePath.toUri.toString
   }
 
-  override def getHoodieReaderContext: HoodieReaderContext[InternalRow] = {
-    new SparkFileFormatInternalRowReaderContext(spark,
-      
SparkAdapterSupport.sparkAdapter.createLegacyHoodieParquetFileFormat(false).get,
-      getHadoopConf)
+  /**
+   * Builds a [[SparkFileFormatInternalRowReaderContext]] backed by a plain
+   * `ParquetFileFormat` reader over the table schema.
+   *
+   * The table schema is resolved from the meta client at `getBasePath`; the partition
+   * schema keeps only the schema fields named in the table config's partition fields.
+   *
+   * @param partitionValues one value per partition field; the count is asserted to
+   *                        match the number of partition fields in the table config
+   *                        (zero when the table config has none)
+   * @return reader context holding the parquet reader function and a row with the
+   *         partition values converted to `UTF8String`
+   */
+  override def getHoodieReaderContext(partitionValues: Array[String]): HoodieReaderContext[InternalRow] = {
+    val metaClient = HoodieTableMetaClient.builder
+      .setConf(getHadoopConf)
+      .setBasePath(getBasePath)
+      .build
+    val avroSchema = new TableSchemaResolver(metaClient).getTableAvroSchema
+    val structTypeSchema = AvroConversionUtils.convertAvroSchemaToStructType(avroSchema)
+
+    // Resolve the optional partition fields exactly once. Calling get() on an absent
+    // value throws, so an absent value is treated as "no partition fields" everywhere.
+    val partitionFields = metaClient.getTableConfig.getPartitionFields
+    val partitionFieldNames: Array[String] =
+      if (partitionFields.isPresent) partitionFields.get() else Array.empty[String]
+    assertEquals(partitionFieldNames.length, partitionValues.length)
+
+    val partitionSchema = new StructType(
+      structTypeSchema.fields.filter(f => partitionFieldNames.contains(f.name)))
+
+    val parquetFileFormat = new ParquetFileFormat
+    val recordReaderIterator = parquetFileFormat.buildReaderWithPartitionValues(
+      spark, structTypeSchema, partitionSchema, structTypeSchema, Seq.empty, Map.empty, getHadoopConf)
+
+    // Encode the partition values as UTF8String, the string representation used in InternalRow.
+    val partitionValuesEncoded = partitionValues.map(v => UTF8String.fromString(v)).toArray[Any]
+    val partitionValueRow = new GenericInternalRow(partitionValuesEncoded)
+    new SparkFileFormatInternalRowReaderContext(recordReaderIterator, partitionValueRow)
   }
 
   override def commitToTable(recordList: util.List[String], operation: String, 
options: util.Map[String, String]): Unit = {

Reply via email to