This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 0517519ec08 [HUDI-8289] Unmerged log scanner deprecation (#13383)
0517519ec08 is described below

commit 0517519ec08abc71e15e7be34daa4bd1c240304d
Author: Tim Brown <[email protected]>
AuthorDate: Thu Jun 5 20:11:13 2025 -0500

    [HUDI-8289] Unmerged log scanner deprecation (#13383)
---
 azure-pipelines-20230430.yml                       |  16 +-
 .../org/apache/hudi/io/HoodieAppendHandle.java     |   2 +-
 .../metadata/HoodieBackedTableMetadataWriter.java  |  39 ++--
 .../HoodieLogCompactionPlanGenerator.java          |  41 ++--
 .../FlinkHoodieBackedTableMetadataWriter.java      |   2 +-
 .../JavaHoodieBackedTableMetadataWriter.java       |   2 +-
 .../hudi/client/model/HoodieInternalRow.java       |   2 +-
 .../client/utils/SparkMetadataWriterUtils.java     | 132 +++++-------
 .../SparkHoodieBackedTableMetadataWriter.java      |  10 +-
 ...ieBackedTableMetadataWriterTableVersionSix.java |   3 +-
 .../hudi/BaseSparkInternalRowReaderContext.java    |   6 +-
 .../table/log/BaseHoodieLogRecordReader.java       |  19 +-
 .../table/log/HoodieLogBlockMetadataScanner.java   |  48 +++++
 .../table/log/HoodieMergedLogRecordReader.java     |  13 +-
 .../common/table/read/HoodieFileGroupReader.java   |   3 +
 .../hudi/metadata/HoodieTableMetadataUtil.java     |  96 ++++-----
 .../hudi/metadata/TestHoodieTableMetadataUtil.java |   5 +-
 .../realtime/HoodieMergeOnReadSnapshotReader.java  | 225 ---------------------
 .../TestHoodieMergeOnReadSnapshotReader.java       | 187 -----------------
 .../java/org/apache/hudi/TestDataSourceUtils.java  |  64 ------
 .../hudi/testutils/LogFileColStatsTestUtil.java    |  39 +---
 .../java/org/apache/hudi/TestDataSourceUtils.java  |  35 +++-
 .../TestHoodieClientOnMergeOnReadStorage.java      |  40 ++--
 .../TestMetadataUtilRLIandSIRecordGeneration.java  |  46 ++++-
 .../utilities/HoodieMetadataTableValidator.java    |   2 +-
 25 files changed, 322 insertions(+), 755 deletions(-)

diff --git a/azure-pipelines-20230430.yml b/azure-pipelines-20230430.yml
index 0d5cbe7a4a5..586c49425f8 100644
--- a/azure-pipelines-20230430.yml
+++ b/azure-pipelines-20230430.yml
@@ -128,7 +128,7 @@ stages:
     jobs:
       - job: UT_FT_1
         displayName: UT client/spark-client
-        timeoutInMinutes: '75'
+        timeoutInMinutes: '90'
         steps:
           - task: Maven@4
             displayName: maven install
@@ -242,7 +242,7 @@ stages:
             displayName: Top 100 long-running testcases
       - job: UT_FT_4
         displayName: UT spark-datasource Java Test 2
-        timeoutInMinutes: '75'
+        timeoutInMinutes: '90'
         steps:
           - task: Maven@4
             displayName: maven install
@@ -277,7 +277,7 @@ stages:
             displayName: Top 100 long-running testcases
       - job: UT_FT_5
         displayName: UT spark-datasource DML
-        timeoutInMinutes: '75'
+        timeoutInMinutes: '90'
         steps:
           - task: Maven@4
             displayName: maven install
@@ -312,7 +312,7 @@ stages:
             displayName: Top 100 long-running testcases
       - job: UT_FT_6
         displayName: UT spark-datasource DDL & Others
-        timeoutInMinutes: '75'
+        timeoutInMinutes: '90'
         steps:
           - task: Maven@4
             displayName: maven install
@@ -347,7 +347,7 @@ stages:
             displayName: Top 100 long-running testcases
       - job: UT_FT_7
         displayName: UT Hudi Streamer & FT utilities
-        timeoutInMinutes: '75'
+        timeoutInMinutes: '90'
         steps:
           - task: Docker@2
             displayName: "login to docker hub"
@@ -396,7 +396,7 @@ stages:
             displayName: Top 100 long-running testcases
       - job: UT_FT_8
         displayName: UT hudi-hadoop-common & Hudi Utilities others
-        timeoutInMinutes: '75'
+        timeoutInMinutes: '90'
         steps:
           - task: Docker@2
             displayName: "login to docker hub"
@@ -445,7 +445,7 @@ stages:
             displayName: Top 100 long-running testcases
       - job: UT_FT_9
         displayName: FT spark 2
-        timeoutInMinutes: '75'
+        timeoutInMinutes: '90'
         steps:
           - task: Maven@4
             displayName: maven install
@@ -490,7 +490,7 @@ stages:
             displayName: Top 100 long-running testcases
       - job: UT_FT_10
         displayName: UT FT common & other modules
-        timeoutInMinutes: '75'
+        timeoutInMinutes: '90'
         steps:
           - task: Docker@2
             displayName: "login to docker hub"
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index 8a75f1056fc..764843b7877 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -432,7 +432,7 @@ public class HoodieAppendHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O
           .map(fieldName -> 
HoodieAvroUtils.getSchemaForField(writeSchemaWithMetaFields, 
fieldName)).collect(Collectors.toList());
       try {
         Map<String, HoodieColumnRangeMetadata<Comparable>> 
columnRangeMetadataMap =
-            collectColumnRangeMetadata(recordList, fieldsToIndex, 
stat.getPath(), writeSchemaWithMetaFields, storage.getConf());
+            collectColumnRangeMetadata(recordList.iterator(), fieldsToIndex, 
stat.getPath(), writeSchemaWithMetaFields, storage.getConf());
         stat.putRecordsStats(columnRangeMetadataMap);
       } catch (HoodieException e) {
         throw new HoodieAppendException("Failed to extract append result", e);
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 31c164818d8..f5fd9055145 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -435,6 +435,7 @@ public abstract class HoodieBackedTableMetadataWriter<I, O> 
implements HoodieTab
       String partitionName;
       Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair;
       List<String> columnsToIndex = new ArrayList<>();
+      Lazy<Option<Schema>> tableSchema = Lazy.lazily(() -> 
HoodieTableMetadataUtil.tryResolveSchemaForTable(dataMetaClient));
       try {
         switch (partitionType) {
           case FILES:
@@ -446,7 +447,7 @@ public abstract class HoodieBackedTableMetadataWriter<I, O> 
implements HoodieTab
             partitionName = BLOOM_FILTERS.getPartitionPath();
             break;
           case COLUMN_STATS:
-            Pair<List<String>, Pair<Integer, HoodieData<HoodieRecord>>> 
colStatsColumnsAndRecord = 
initializeColumnStatsPartition(partitionIdToAllFilesMap);
+            Pair<List<String>, Pair<Integer, HoodieData<HoodieRecord>>> 
colStatsColumnsAndRecord = 
initializeColumnStatsPartition(partitionIdToAllFilesMap, tableSchema);
             columnsToIndex = colStatsColumnsAndRecord.getKey();
             fileGroupCountAndRecordsPair = colStatsColumnsAndRecord.getValue();
             partitionName = COLUMN_STATS.getPartitionPath();
@@ -464,7 +465,7 @@ public abstract class HoodieBackedTableMetadataWriter<I, O> 
implements HoodieTab
               continue;
             }
             partitionName = expressionIndexPartitionsToInit.iterator().next();
-            fileGroupCountAndRecordsPair = 
initializeExpressionIndexPartition(partitionName, dataTableInstantTime, 
lazyLatestMergedPartitionFileSliceList);
+            fileGroupCountAndRecordsPair = 
initializeExpressionIndexPartition(partitionName, dataTableInstantTime, 
lazyLatestMergedPartitionFileSliceList, tableSchema);
             break;
           case PARTITION_STATS:
             // For PARTITION_STATS, COLUMN_STATS should also be enabled
@@ -473,7 +474,7 @@ public abstract class HoodieBackedTableMetadataWriter<I, O> 
implements HoodieTab
                   
HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key());
               continue;
             }
-            fileGroupCountAndRecordsPair = 
initializePartitionStatsIndex(lazyLatestMergedPartitionFileSliceList);
+            fileGroupCountAndRecordsPair = 
initializePartitionStatsIndex(lazyLatestMergedPartitionFileSliceList, 
tableSchema);
             partitionName = PARTITION_STATS.getPartitionPath();
             break;
           case SECONDARY_INDEX:
@@ -558,21 +559,22 @@ public abstract class HoodieBackedTableMetadataWriter<I, 
O> implements HoodieTab
   }
 
   private Pair<Integer, HoodieData<HoodieRecord>> 
initializePartitionStatsIndex(
-      Lazy<List<Pair<String, FileSlice>>> 
lazyLatestMergedPartitionFileSliceList) {
+      Lazy<List<Pair<String, FileSlice>>> 
lazyLatestMergedPartitionFileSliceList,
+      Lazy<Option<Schema>> tableSchemaOpt) {
     HoodieData<HoodieRecord> records = 
HoodieTableMetadataUtil.convertFilesToPartitionStatsRecords(
         engineContext, lazyLatestMergedPartitionFileSliceList.get(), 
dataWriteConfig.getMetadataConfig(),
-        dataMetaClient, Option.empty(), 
Option.of(dataWriteConfig.getRecordMerger().getRecordType()));
+        dataMetaClient, tableSchemaOpt, 
Option.of(dataWriteConfig.getRecordMerger().getRecordType()));
     final int fileGroupCount = 
dataWriteConfig.getMetadataConfig().getPartitionStatsIndexFileGroupCount();
     return Pair.of(fileGroupCount, records);
   }
 
-  private Pair<List<String>, Pair<Integer, HoodieData<HoodieRecord>>> 
initializeColumnStatsPartition(Map<String, Map<String, Long>> 
partitionIdToAllFilesMap) {
+  private Pair<List<String>, Pair<Integer, HoodieData<HoodieRecord>>> 
initializeColumnStatsPartition(Map<String, Map<String, Long>> 
partitionIdToAllFilesMap,
+                                                                               
                      Lazy<Option<Schema>> tableSchema) {
     final int fileGroupCount = 
dataWriteConfig.getMetadataConfig().getColumnStatsIndexFileGroupCount();
     if (partitionIdToAllFilesMap.isEmpty()) {
       return Pair.of(Collections.emptyList(), Pair.of(fileGroupCount, 
engineContext.emptyHoodieData()));
     }
     // Find the columns to index
-    Lazy<Option<Schema>> tableSchema = Lazy.lazily(() -> 
HoodieTableMetadataUtil.tryResolveSchemaForTable(dataMetaClient));
     final List<String> columnsToIndex = new 
ArrayList<>(HoodieTableMetadataUtil.getColumnsToIndex(dataMetaClient.getTableConfig(),
         dataWriteConfig.getMetadataConfig(), tableSchema, true,
         
Option.of(dataWriteConfig.getRecordMerger().getRecordType())).keySet());
@@ -610,6 +612,7 @@ public abstract class HoodieBackedTableMetadataWriter<I, O> 
implements HoodieTab
    * @param indexDefinition                 Hoodie Index Definition for the 
expression index for which records need to be generated
    * @param metaClient                      Hoodie Table Meta Client
    * @param parallelism                     Parallelism to use for engine 
operations
+   * @param tableSchema                     Schema of the table
    * @param readerSchema                    Schema of reader
    * @param storageConf                     Storage Config
    * @param instantTime                     Instant time
@@ -618,14 +621,15 @@ public abstract class HoodieBackedTableMetadataWriter<I, 
O> implements HoodieTab
   protected abstract HoodieData<HoodieRecord> 
getExpressionIndexRecords(List<Pair<String, Pair<String, Long>>> 
partitionFilePathAndSizeTriplet,
                                                                         
HoodieIndexDefinition indexDefinition,
                                                                         
HoodieTableMetaClient metaClient,
-                                                                        int 
parallelism, Schema readerSchema,
+                                                                        int 
parallelism, Schema tableSchema, Schema readerSchema,
                                                                         
StorageConfiguration<?> storageConf,
                                                                         String 
instantTime);
 
   protected abstract EngineType getEngineType();
 
   private Pair<Integer, HoodieData<HoodieRecord>> 
initializeExpressionIndexPartition(
-      String indexName, String dataTableInstantTime, Lazy<List<Pair<String, 
FileSlice>>> lazyLatestMergedPartitionFileSliceList) throws Exception {
+      String indexName, String dataTableInstantTime, Lazy<List<Pair<String, 
FileSlice>>> lazyLatestMergedPartitionFileSliceList,
+      Lazy<Option<Schema>> tableSchemaOpt) {
     HoodieIndexDefinition indexDefinition = getIndexDefinition(indexName);
     ValidationUtils.checkState(indexDefinition != null, "Expression Index 
definition is not present for index " + indexName);
     List<Pair<String, FileSlice>> partitionFileSlicePairs = 
lazyLatestMergedPartitionFileSliceList.get();
@@ -634,19 +638,18 @@ public abstract class HoodieBackedTableMetadataWriter<I, 
O> implements HoodieTab
       if (entry.getValue().getBaseFile().isPresent()) {
         partitionFilePathSizeTriplet.add(Pair.of(entry.getKey(), 
Pair.of(entry.getValue().getBaseFile().get().getPath(), 
entry.getValue().getBaseFile().get().getFileLen())));
       }
-      entry.getValue().getLogFiles().forEach(hoodieLogFile -> {
-        if (entry.getValue().getLogFiles().count() > 0) {
-          entry.getValue().getLogFiles().forEach(logfile -> {
-            partitionFilePathSizeTriplet.add(Pair.of(entry.getKey(), 
Pair.of(logfile.getPath().toString(), logfile.getFileSize())));
-          });
-        }
-      });
+      entry.getValue().getLogFiles()
+          .forEach(hoodieLogFile -> 
partitionFilePathSizeTriplet.add(Pair.of(entry.getKey(), 
Pair.of(hoodieLogFile.getPath().toString(), hoodieLogFile.getFileSize()))));
     });
 
     int fileGroupCount = 
dataWriteConfig.getMetadataConfig().getExpressionIndexFileGroupCount();
+    if (partitionFileSlicePairs.isEmpty()) {
+      return Pair.of(fileGroupCount, engineContext.emptyHoodieData());
+    }
     int parallelism = Math.min(partitionFilePathSizeTriplet.size(), 
dataWriteConfig.getMetadataConfig().getExpressionIndexParallelism());
-    Schema readerSchema = 
getProjectedSchemaForExpressionIndex(indexDefinition, dataMetaClient);
-    return Pair.of(fileGroupCount, 
getExpressionIndexRecords(partitionFilePathSizeTriplet, indexDefinition, 
dataMetaClient, parallelism, readerSchema, storageConf, dataTableInstantTime));
+    Schema tableSchema = tableSchemaOpt.get().orElseThrow(() -> new 
HoodieMetadataException("Table schema is not available for expression index 
initialization"));
+    Schema readerSchema = 
getProjectedSchemaForExpressionIndex(indexDefinition, dataMetaClient, 
tableSchema);
+    return Pair.of(fileGroupCount, 
getExpressionIndexRecords(partitionFilePathSizeTriplet, indexDefinition, 
dataMetaClient, parallelism, tableSchema, readerSchema, storageConf, 
dataTableInstantTime));
   }
 
   HoodieIndexDefinition getIndexDefinition(String indexName) {
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java
index 5d9337ac4e7..805d88fc840 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java
@@ -28,7 +28,7 @@ import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.TableServiceType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;
+import org.apache.hudi.common.table.log.HoodieLogBlockMetadataScanner;
 import org.apache.hudi.common.table.log.InstantRange;
 import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.Option;
@@ -78,8 +78,7 @@ public class HoodieLogCompactionPlanGenerator<T extends 
HoodieRecordPayload, I,
   @Override
   protected boolean filterFileSlice(FileSlice fileSlice, String 
lastCompletedInstantTime,
                                     Set<HoodieFileGroupId> 
pendingFileGroupIds, Option<InstantRange> instantRange) {
-    return isFileSliceEligibleForLogCompaction(fileSlice, 
lastCompletedInstantTime, instantRange)
-        && super.filterFileSlice(fileSlice, lastCompletedInstantTime, 
pendingFileGroupIds, instantRange);
+    return super.filterFileSlice(fileSlice, lastCompletedInstantTime, 
pendingFileGroupIds, instantRange) && 
isFileSliceEligibleForLogCompaction(fileSlice, lastCompletedInstantTime, 
instantRange);
   }
 
   @Override
@@ -88,33 +87,29 @@ public class HoodieLogCompactionPlanGenerator<T extends 
HoodieRecordPayload, I,
   }
 
   /**
-   * Can schedule logcompaction if log files count is greater than 4 or total 
log blocks is greater than 4.
+   * Can schedule logcompaction if log files count or total log blocks is 
greater than the configured threshold.
    * @param fileSlice File Slice under consideration.
+   * @param instantRange Range of valid instants.
    * @return Boolean value that determines whether log compaction will be 
scheduled or not.
    */
   private boolean isFileSliceEligibleForLogCompaction(FileSlice fileSlice, 
String maxInstantTime,
                                                       Option<InstantRange> 
instantRange) {
-    LOG.info("Checking if fileId " + fileSlice.getFileId() + " and partition "
-        + fileSlice.getPartitionPath() + " eligible for log compaction.");
+    LOG.info("Checking if fileId {} and partition {} eligible for log 
compaction.", fileSlice.getFileId(), fileSlice.getPartitionPath());
     HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
-    HoodieUnMergedLogRecordScanner scanner = 
HoodieUnMergedLogRecordScanner.newBuilder()
-        .withStorage(metaClient.getStorage())
-        .withBasePath(hoodieTable.getMetaClient().getBasePath())
-        .withLogFilePaths(fileSlice.getLogFiles()
-            .sorted(HoodieLogFile.getLogFileComparator())
-            .map(file -> file.getPath().toString())
-            .collect(Collectors.toList()))
-        .withLatestInstantTime(maxInstantTime)
-        .withInstantRange(instantRange)
-        .withBufferSize(writeConfig.getMaxDFSStreamBufferSize())
-        .withOptimizedLogBlocksScan(true)
-        .withRecordMerger(writeConfig.getRecordMerger())
-        .withTableMetaClient(metaClient)
-        .build();
-    scanner.scan(true);
+    long numLogFiles = fileSlice.getLogFiles().count();
+    if (numLogFiles >= writeConfig.getLogCompactionBlocksThreshold()) {
+      LOG.info("Total logs files ({}) is greater than log blocks threshold is 
{}", numLogFiles, writeConfig.getLogCompactionBlocksThreshold());
+      return true;
+    }
+    HoodieLogBlockMetadataScanner scanner = new 
HoodieLogBlockMetadataScanner(metaClient, fileSlice.getLogFiles()
+        .sorted(HoodieLogFile.getLogFileComparator())
+        .map(file -> file.getPath().toString())
+        .collect(Collectors.toList()),
+        writeConfig.getMaxDFSStreamBufferSize(),
+        maxInstantTime,
+        instantRange);
     int totalBlocks = scanner.getCurrentInstantLogBlocks().size();
-    LOG.info("Total blocks seen are " + totalBlocks + ", log blocks threshold 
is "
-        + writeConfig.getLogCompactionBlocksThreshold());
+    LOG.info("Total blocks seen are {}, log blocks threshold is {}", 
totalBlocks, writeConfig.getLogCompactionBlocksThreshold());
 
     // If total blocks in the file slice is > blocks threshold value(default 
value is 5).
     // Log compaction can be scheduled.
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
index 35428d7801a..17e9ef21020 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
@@ -194,7 +194,7 @@ public class FlinkHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetad
 
   @Override
   protected HoodieData<HoodieRecord> 
getExpressionIndexRecords(List<Pair<String, Pair<String, Long>>> 
partitionFilePathAndSizeTriplet, HoodieIndexDefinition indexDefinition,
-                                                               
HoodieTableMetaClient metaClient, int parallelism, Schema readerSchema, 
StorageConfiguration<?> storageConf,
+                                                               
HoodieTableMetaClient metaClient, int parallelism, Schema tableSchema, Schema 
readerSchema, StorageConfiguration<?> storageConf,
                                                                String 
instantTime) {
     throw new HoodieNotSupportedException("Flink metadata table does not 
support expression index yet.");
   }
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/metadata/JavaHoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/metadata/JavaHoodieBackedTableMetadataWriter.java
index 64b532707e8..0d3da17bdea 100644
--- 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/metadata/JavaHoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/metadata/JavaHoodieBackedTableMetadataWriter.java
@@ -144,7 +144,7 @@ public class JavaHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetada
 
   @Override
   protected HoodieData<HoodieRecord> 
getExpressionIndexRecords(List<Pair<String, Pair<String, Long>>> 
partitionFilePathAndSizeTriplet, HoodieIndexDefinition indexDefinition,
-                                                               
HoodieTableMetaClient metaClient, int parallelism, Schema readerSchema, 
StorageConfiguration<?> storageConf,
+                                                               
HoodieTableMetaClient metaClient, int parallelism, Schema tableSchema, Schema 
readerSchema, StorageConfiguration<?> storageConf,
                                                                String 
instantTime) {
     throw new HoodieNotSupportedException("Expression index not supported for 
Java metadata table writer yet.");
   }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/model/HoodieInternalRow.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/model/HoodieInternalRow.java
index 2ca61d2823c..cac3c251b9d 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/model/HoodieInternalRow.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/model/HoodieInternalRow.java
@@ -234,7 +234,7 @@ public class HoodieInternalRow extends InternalRow {
     for (int i = 0; i < metaFields.length; i++) {
       copyMetaFields[i] = metaFields[i] != null ? metaFields[i].copy() : null;
     }
-    return new HoodieInternalRow(copyMetaFields, sourceRow.copy(), 
sourceContainsMetaFields);
+    return new HoodieInternalRow(copyMetaFields, sourceRow == null ? null : 
sourceRow.copy(), sourceContainsMetaFields);
   }
 
   private int rebaseOrdinal(int ordinal) {
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
index 28ae8622cdf..5a36aacdf56 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
@@ -28,25 +28,27 @@ import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.data.HoodiePairData;
-import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.engine.ReaderContextFactory;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.function.SerializableFunction;
+import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieIndexDefinition;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordMerger;
-import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;
+import org.apache.hudi.common.table.read.HoodieFileGroupReader;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
-import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.CloseableMappingIterator;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.data.HoodieJavaRDD;
@@ -55,23 +57,22 @@ import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.index.expression.HoodieExpressionIndex;
 import org.apache.hudi.index.expression.HoodieSparkExpressionIndex;
 import 
org.apache.hudi.index.expression.HoodieSparkExpressionIndex.ExpressionIndexComputationMetadata;
-import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileWriterFactory;
-import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadataUtil;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.util.JavaScalaConverters;
 
 import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
 import org.apache.spark.api.java.function.FlatMapGroupsFunction;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.Column;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.HoodieCatalystExpressionUtils;
+import org.apache.spark.sql.HoodieInternalRowUtils;
 import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.functions;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.Metadata;
@@ -94,11 +95,9 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import scala.Function1;
+import scala.collection.immutable.Seq;
 
 import static org.apache.hudi.avro.HoodieAvroUtils.addMetadataFields;
-import static 
org.apache.hudi.common.config.HoodieCommonConfig.MAX_DFS_STREAM_BUFFER_SIZE;
-import static org.apache.hudi.common.util.ConfigUtils.getReaderConfigs;
 import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
 import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
 import static org.apache.hudi.common.util.ValidationUtils.checkState;
@@ -137,16 +136,16 @@ public class SparkMetadataWriterUtils {
     };
   }
 
-  public static List<Row> getRowsWithExpressionIndexMetadata(List<Row> 
rowsForFilePath, String partition, String filePath, long fileSize) {
-    return rowsForFilePath.stream().map(row -> {
-      scala.collection.immutable.Seq<Object> indexMetadata = 
JavaScalaConverters.convertJavaListToScalaList(Arrays.asList(partition, 
filePath, fileSize));
+  public static ClosableIterator<Row> 
getRowsWithExpressionIndexMetadata(ClosableIterator<InternalRow> 
rowsForFilePath, SparkRowSerDe sparkRowSerDe, String partition, String 
filePath, long fileSize) {
+    return new CloseableMappingIterator<>(rowsForFilePath, row -> {
+      Seq<Object> indexMetadata = 
JavaScalaConverters.convertJavaListToScalaList(Arrays.asList(partition, 
filePath, fileSize));
       Row expressionIndexRow = Row.fromSeq(indexMetadata);
       List<Row> rows = new ArrayList<>(2);
-      rows.add(row);
+      rows.add(sparkRowSerDe.deserializeRow(row));
       rows.add(expressionIndexRow);
-      scala.collection.immutable.Seq<Row> rowSeq = 
JavaScalaConverters.convertJavaListToScalaList(rows);
+      Seq<Row> rowSeq = JavaScalaConverters.convertJavaListToScalaList(rows);
       return Row.merge(rowSeq);
-    }).collect(Collectors.toList());
+    });
   }
 
   @SuppressWarnings("checkstyle:LineLength")
@@ -241,61 +240,6 @@ public class SparkMetadataWriterUtils {
     });
   }
 
-  public static List<Row> readRecordsAsRows(StoragePath[] paths, SQLContext 
sqlContext,
-                                            HoodieTableMetaClient metaClient, 
Schema schema,
-                                            HoodieWriteConfig dataWriteConfig, 
boolean isBaseFile) {
-    List<HoodieRecord> records = isBaseFile ? getBaseFileRecords(new 
HoodieBaseFile(paths[0].toString()), metaClient, schema)
-        : 
getUnmergedLogFileRecords(Arrays.stream(paths).map(StoragePath::toString).collect(Collectors.toList()),
 metaClient, schema);
-    return toRows(records, schema, dataWriteConfig, sqlContext, 
paths[0].toString());
-  }
-
-  private static List<HoodieRecord> getUnmergedLogFileRecords(List<String> 
logFilePaths, HoodieTableMetaClient metaClient, Schema readerSchema) {
-    List<HoodieRecord> records = new ArrayList<>();
-    HoodieUnMergedLogRecordScanner scanner = 
HoodieUnMergedLogRecordScanner.newBuilder()
-        .withStorage(metaClient.getStorage())
-        .withBasePath(metaClient.getBasePath())
-        .withLogFilePaths(logFilePaths)
-        .withBufferSize(MAX_DFS_STREAM_BUFFER_SIZE.defaultValue())
-        
.withLatestInstantTime(metaClient.getActiveTimeline().getCommitsTimeline().lastInstant().get().requestedTime())
-        .withReaderSchema(readerSchema)
-        .withTableMetaClient(metaClient)
-        .withLogRecordScannerCallback(records::add)
-        .build();
-    scanner.scan(false);
-    return records;
-  }
-
-  private static List<HoodieRecord> getBaseFileRecords(HoodieBaseFile 
baseFile, HoodieTableMetaClient metaClient, Schema readerSchema) {
-    List<HoodieRecord> records = new ArrayList<>();
-    HoodieRecordMerger recordMerger =
-        
HoodieRecordUtils.createRecordMerger(metaClient.getBasePath().toString(), 
EngineType.SPARK, Collections.emptyList(),
-            metaClient.getTableConfig().getRecordMergeStrategyId());
-    try (HoodieFileReader baseFileReader = 
HoodieIOFactory.getIOFactory(metaClient.getStorage()).getReaderFactory(recordMerger.getRecordType())
-        .getFileReader(getReaderConfigs(metaClient.getStorageConf()), 
baseFile.getStoragePath())) {
-      baseFileReader.getRecordIterator(readerSchema).forEachRemaining((record) 
-> records.add((HoodieRecord) record));
-      return records;
-    } catch (IOException e) {
-      throw new HoodieIOException("Error reading base file " + 
baseFile.getFileName(), e);
-    }
-  }
-
-  private static List<Row> toRows(List<HoodieRecord> records, Schema schema, 
HoodieWriteConfig dataWriteConfig, SQLContext sqlContext, String path) {
-    StructType structType = 
AvroConversionUtils.convertAvroSchemaToStructType(schema);
-    Function1<GenericRecord, Row> converterToRow = 
AvroConversionUtils.createConverterToRow(schema, structType);
-    List<Row> avroRecords = records.stream()
-        .map(r -> {
-          try {
-            return (GenericRecord) (r.getData() instanceof GenericRecord ? 
r.getData()
-                : ((HoodieRecordPayload) r.getData()).getInsertValue(schema, 
dataWriteConfig.getProps()).get());
-          } catch (IOException e) {
-            throw new HoodieIOException("Could not fetch record payload");
-          }
-        })
-        .map(converterToRow::apply)
-        .collect(Collectors.toList());
-    return avroRecords;
-  }
-
   /**
    * Generates expression index records
    *
@@ -313,7 +257,7 @@ public class SparkMetadataWriterUtils {
    */
   public static ExpressionIndexComputationMetadata getExprIndexRecords(
       List<Pair<String, Pair<String, Long>>> partitionFilePathAndSizeTriplet, 
HoodieIndexDefinition indexDefinition,
-      HoodieTableMetaClient metaClient, int parallelism, Schema readerSchema, 
String instantTime,
+      HoodieTableMetaClient metaClient, int parallelism, Schema tableSchema, 
Schema readerSchema, String instantTime,
       HoodieEngineContext engineContext, HoodieWriteConfig dataWriteConfig,
       Option<Function<HoodiePairData<String, 
HoodieColumnRangeMetadata<Comparable>>, HoodieData<HoodieRecord>>> 
partitionRecordsFunctionOpt) {
     HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext) 
engineContext;
@@ -326,12 +270,12 @@ public class SparkMetadataWriterUtils {
     //       HUDI-6994 will address this.
     ValidationUtils.checkArgument(indexDefinition.getSourceFields().size() == 
1, "Only one source field is supported for expression index");
     String columnToIndex = indexDefinition.getSourceFields().get(0);
-    SQLContext sqlContext = sparkEngineContext.getSqlContext();
 
+    ReaderContextFactory<InternalRow> readerContextFactory = 
engineContext.getReaderContextFactory(metaClient);
     // Read records and append expression index metadata to every row
     HoodieData<Row> rowData = 
sparkEngineContext.parallelize(partitionFilePathAndSizeTriplet, parallelism)
         .flatMap((SerializableFunction<Pair<String, Pair<String, Long>>, 
Iterator<Row>>) entry ->
-            getExpressionIndexRecordsIterator(metaClient, readerSchema, 
dataWriteConfig, entry, sqlContext));
+            
getExpressionIndexRecordsIterator(readerContextFactory.getContext(), 
metaClient, tableSchema, readerSchema, dataWriteConfig, entry));
 
     // Generate dataset with expression index metadata
     StructType structType = 
AvroConversionUtils.convertAvroSchemaToStructType(readerSchema)
@@ -356,17 +300,41 @@ public class SparkMetadataWriterUtils {
     }
   }
 
-  private static Iterator<Row> 
getExpressionIndexRecordsIterator(HoodieTableMetaClient metaClient, Schema 
readerSchema, HoodieWriteConfig dataWriteConfig,
-                                                                 Pair<String, 
Pair<String, Long>> entry, SQLContext sqlContext) {
+  private static Iterator<Row> 
getExpressionIndexRecordsIterator(HoodieReaderContext<InternalRow> 
readerContext, HoodieTableMetaClient metaClient,
+                                                                 Schema 
tableSchema, Schema readerSchema, HoodieWriteConfig dataWriteConfig, 
Pair<String, Pair<String, Long>> entry) {
     String partition = entry.getKey();
     Pair<String, Long> filePathSizePair = entry.getValue();
     String filePath = filePathSizePair.getKey();
     String relativeFilePath = 
FSUtils.getRelativePartitionPath(metaClient.getBasePath(), new 
StoragePath(filePath));
     long fileSize = filePathSizePair.getValue();
-    List<Row> rowsForFilePath = readRecordsAsRows(new StoragePath[] {new 
StoragePath(filePath)}, sqlContext, metaClient, readerSchema, dataWriteConfig,
-        FSUtils.isBaseFile(new 
StoragePath(filePath.substring(filePath.lastIndexOf("/") + 1))));
-    List<Row> rowsWithIndexMetadata = 
getRowsWithExpressionIndexMetadata(rowsForFilePath, partition, 
relativeFilePath, fileSize);
-    return rowsWithIndexMetadata.iterator();
+    boolean isBaseFile = FSUtils.isBaseFile(new 
StoragePath(filePath.substring(filePath.lastIndexOf("/") + 1)));
+    FileSlice fileSlice;
+    if (isBaseFile) {
+      HoodieBaseFile baseFile = new HoodieBaseFile(filePath);
+      fileSlice = new FileSlice(partition, baseFile.getCommitTime(), 
baseFile.getFileId());
+      fileSlice.setBaseFile(baseFile);
+    } else {
+      HoodieLogFile logFile = new HoodieLogFile(filePath);
+      fileSlice = new FileSlice(partition, logFile.getDeltaCommitTime(), 
logFile.getFileId());
+      fileSlice.addLogFile(logFile);
+    }
+    HoodieFileGroupReader<InternalRow> fileGroupReader = 
HoodieFileGroupReader.<InternalRow>newBuilder()
+        .withReaderContext(readerContext)
+        .withHoodieTableMetaClient(metaClient)
+        .withDataSchema(tableSchema)
+        .withRequestedSchema(readerSchema)
+        .withProps(dataWriteConfig.getProps())
+        
.withLatestCommitTime(metaClient.getActiveTimeline().lastInstant().map(HoodieInstant::requestedTime).orElse(""))
+        .withAllowInflightInstants(true)
+        .withFileSlice(fileSlice)
+        .build();
+    try {
+      ClosableIterator<InternalRow> rowsForFilePath = 
fileGroupReader.getClosableIterator();
+      SparkRowSerDe sparkRowSerDe = 
HoodieCatalystExpressionUtils.sparkAdapter().createSparkRowSerDe(HoodieInternalRowUtils.getCachedSchema(readerSchema));
+      return getRowsWithExpressionIndexMetadata(rowsForFilePath, 
sparkRowSerDe, partition, relativeFilePath, fileSize);
+    } catch (IOException ex) {
+      throw new HoodieIOException("Error reading file slice " + fileSlice, ex);
+    }
   }
 
   /**
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index 7c555c958c6..f3610e2c746 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -38,6 +38,7 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.util.CommitUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
@@ -210,14 +211,15 @@ public class SparkHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetad
     commitMetadata.getPartitionToWriteStats().forEach((dataPartition, 
writeStats) -> writeStats.forEach(writeStat -> partitionFilePathPairs.add(
         Pair.of(writeStat.getPartitionPath(), Pair.of(new 
StoragePath(dataMetaClient.getBasePath(), writeStat.getPath()).toString(), 
writeStat.getFileSizeInBytes())))));
     int parallelism = Math.min(partitionFilePathPairs.size(), 
dataWriteConfig.getMetadataConfig().getExpressionIndexParallelism());
-    Schema readerSchema = 
getProjectedSchemaForExpressionIndex(indexDefinition, dataMetaClient);
+    Schema tableSchema = new 
TableSchemaResolver(dataMetaClient).getTableAvroSchema();
+    Schema readerSchema = 
getProjectedSchemaForExpressionIndex(indexDefinition, dataMetaClient, 
tableSchema);
     // Step 2: Compute the expression index column stat and partition stat 
records for these newly created files
     // partitionRecordsFunctionOpt - Function used to generate partition 
stats. These stats are generated only for expression index created using column 
stats
     //
     // In the partitionRecordsFunctionOpt function we merge the expression 
index records from the new files created in the commit metadata
     // with the expression index records from the unmodified files to get the 
new partition stat records
     HoodieSparkExpressionIndex.ExpressionIndexComputationMetadata 
expressionIndexComputationMetadata =
-        SparkMetadataWriterUtils.getExprIndexRecords(partitionFilePathPairs, 
indexDefinition, dataMetaClient, parallelism, readerSchema, instantTime, 
engineContext, dataWriteConfig,
+        SparkMetadataWriterUtils.getExprIndexRecords(partitionFilePathPairs, 
indexDefinition, dataMetaClient, parallelism, tableSchema, readerSchema, 
instantTime, engineContext, dataWriteConfig,
             partitionRecordsFunctionOpt);
     return 
expressionIndexComputationMetadata.getPartitionStatRecordsOption().isPresent()
         ? 
expressionIndexComputationMetadata.getExpressionIndexRecords().union(expressionIndexComputationMetadata.getPartitionStatRecordsOption().get())
@@ -228,10 +230,10 @@ public class SparkHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetad
   protected HoodieData<HoodieRecord> 
getExpressionIndexRecords(List<Pair<String, Pair<String, Long>>> 
partitionFilePathAndSizeTriplet,
                                                                
HoodieIndexDefinition indexDefinition,
                                                                
HoodieTableMetaClient metaClient, int parallelism,
-                                                               Schema 
readerSchema, StorageConfiguration<?> storageConf,
+                                                               Schema 
tableSchema, Schema readerSchema, StorageConfiguration<?> storageConf,
                                                                String 
instantTime) {
     ExpressionIndexComputationMetadata expressionIndexComputationMetadata = 
SparkMetadataWriterUtils.getExprIndexRecords(partitionFilePathAndSizeTriplet, 
indexDefinition,
-        metaClient, parallelism, readerSchema, instantTime, engineContext, 
dataWriteConfig,
+        metaClient, parallelism, tableSchema, readerSchema, instantTime, 
engineContext, dataWriteConfig,
         Option.of(rangeMetadata ->
             
HoodieTableMetadataUtil.collectAndProcessExprIndexPartitionStatRecords(rangeMetadata,
 true, Option.of(indexDefinition.getIndexName()))));
     HoodieData<HoodieRecord> exprIndexRecords = 
expressionIndexComputationMetadata.getExpressionIndexRecords();
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriterTableVersionSix.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriterTableVersionSix.java
index 7eb9de51441..e49ea638269 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriterTableVersionSix.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriterTableVersionSix.java
@@ -174,7 +174,8 @@ public class 
SparkHoodieBackedTableMetadataWriterTableVersionSix extends HoodieB
 
   @Override
   protected HoodieData<HoodieRecord> 
getExpressionIndexRecords(List<Pair<String, Pair<String, Long>>> 
partitionFilePathAndSizeTriplet, HoodieIndexDefinition indexDefinition,
-                                                               
HoodieTableMetaClient metaClient, int parallelism, Schema readerSchema, 
StorageConfiguration<?> storageConf, String instantTime) {
+                                                               
HoodieTableMetaClient metaClient, int parallelism, Schema tableSchema, Schema 
readerSchema, StorageConfiguration<?> storageConf,
+                                                               String 
instantTime) {
     throw new HoodieNotSupportedException("Expression index not supported for 
Java metadata table writer yet.");
   }
 }
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
index 002f374abe7..adae4c43388 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
@@ -19,6 +19,7 @@
 
 package org.apache.hudi;
 
+import org.apache.hudi.client.model.HoodieInternalRow;
 import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.engine.HoodieReaderContext;
@@ -170,6 +171,9 @@ public abstract class BaseSparkInternalRowReaderContext 
extends HoodieReaderCont
 
   @Override
   public InternalRow getDeleteRow(InternalRow record, String recordKey) {
-    throw new UnsupportedOperationException("Not supported for " + 
this.getClass().getSimpleName());
+    if (record != null) {
+      return record;
+    }
+    return new HoodieInternalRow(null, null, UTF8String.fromString(recordKey), 
UTF8String.fromString(partitionPath), null, null, false);
   }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
index c7117b891fa..a1d750e6823 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
@@ -91,7 +91,6 @@ public abstract class BaseHoodieLogRecordReader<T> {
   private final String payloadClassFQN;
   // Record's key/partition-path fields
   private final String recordKeyField;
-  private final Option<String> partitionPathFieldOpt;
   // Partition name override
   private final Option<String> partitionNameOverrideOpt;
   // Pre-combining field
@@ -128,8 +127,6 @@ public abstract class BaseHoodieLogRecordReader<T> {
   protected final boolean forceFullScan;
   // Progress
   private float progress = 0.0f;
-  // Populate meta fields for the records
-  private final boolean populateMetaFields;
   // Record type read from log block
   // Collect all the block instants after scanning all the log files.
   private final List<String> validBlockInstants = new ArrayList<>();
@@ -139,17 +136,15 @@ public abstract class BaseHoodieLogRecordReader<T> {
   // Allows to consider inflight instants while merging log records
   protected boolean allowInflightInstants;
 
-  protected BaseHoodieLogRecordReader(HoodieReaderContext readerContext, 
HoodieStorage storage, List<String> logFilePaths,
+  protected BaseHoodieLogRecordReader(HoodieReaderContext<T> readerContext, 
HoodieTableMetaClient hoodieTableMetaClient, HoodieStorage storage, 
List<String> logFilePaths,
                                       boolean reverseReader, int bufferSize, 
Option<InstantRange> instantRange,
                                       boolean withOperationField, boolean 
forceFullScan, Option<String> partitionNameOverride,
                                       Option<String> keyFieldOverride, boolean 
enableOptimizedLogBlocksScan, FileGroupRecordBuffer<T> recordBuffer,
                                       boolean allowInflightInstants) {
     this.readerContext = readerContext;
-    this.readerSchema = readerContext.getSchemaHandler().getRequiredSchema();
+    this.readerSchema = readerContext.getSchemaHandler() != null ? 
readerContext.getSchemaHandler().getRequiredSchema() : null;
     this.latestInstantTime = readerContext.getLatestCommitTime();
-    this.hoodieTableMetaClient = HoodieTableMetaClient.builder()
-        .setStorage(storage)
-        .setBasePath(readerContext.getTablePath()).build();
+    this.hoodieTableMetaClient = hoodieTableMetaClient;
     // load class from the payload fully qualified class name
     HoodieTableConfig tableConfig = 
this.hoodieTableMetaClient.getTableConfig();
     this.payloadClassFQN = tableConfig.getPayloadClass();
@@ -168,7 +163,7 @@ public abstract class BaseHoodieLogRecordReader<T> {
     this.instantRange = instantRange;
     this.withOperationField = withOperationField;
     this.forceFullScan = forceFullScan;
-    this.internalSchema = readerContext.getSchemaHandler().getInternalSchema();
+    this.internalSchema = readerContext.getSchemaHandler() != null ? 
readerContext.getSchemaHandler().getInternalSchema() : null;
     this.enableOptimizedLogBlocksScan = enableOptimizedLogBlocksScan;
 
     if (keyFieldOverride.isPresent()) {
@@ -179,17 +174,11 @@ public abstract class BaseHoodieLogRecordReader<T> {
       //         are static, like "files", "col_stats", etc)
       checkState(partitionNameOverride.isPresent());
 
-      this.populateMetaFields = false;
       this.recordKeyField = keyFieldOverride.get();
-      this.partitionPathFieldOpt = Option.empty();
     } else if (tableConfig.populateMetaFields()) {
-      this.populateMetaFields = true;
       this.recordKeyField = HoodieRecord.RECORD_KEY_METADATA_FIELD;
-      this.partitionPathFieldOpt = 
Option.of(HoodieRecord.PARTITION_PATH_METADATA_FIELD);
     } else {
-      this.populateMetaFields = false;
       this.recordKeyField = tableConfig.getRecordKeyFieldProp();
-      this.partitionPathFieldOpt = 
Option.of(tableConfig.getPartitionFieldProp());
     }
 
     this.partitionNameOverrideOpt = partitionNameOverride;
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogBlockMetadataScanner.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogBlockMetadataScanner.java
new file mode 100644
index 00000000000..14d65d1aa6a
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogBlockMetadataScanner.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.log;
+
+import org.apache.hudi.avro.HoodieAvroReaderContext;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+
+import org.apache.avro.generic.IndexedRecord;
+
+import java.util.List;
+
+/**
+ * Scans a set of log files to extract metadata about the log blocks. It does 
not read the actual records.
+ */
+public class HoodieLogBlockMetadataScanner extends 
BaseHoodieLogRecordReader<IndexedRecord> {
+
+  public HoodieLogBlockMetadataScanner(HoodieTableMetaClient metaClient, 
List<String> logFilePaths, int bufferSize, String maxInstantTime, 
Option<InstantRange> instantRange) {
+    super(getReaderContext(metaClient, maxInstantTime), metaClient, 
metaClient.getStorage(), logFilePaths, false, bufferSize, instantRange, false, 
false, Option.empty(), Option.empty(), true,
+        null, false);
+    scanInternal(Option.empty(), true);
+  }
+
+  private static HoodieReaderContext<IndexedRecord> 
getReaderContext(HoodieTableMetaClient metaClient, String maxInstantTime) {
+    HoodieReaderContext<IndexedRecord> readerContext = new 
HoodieAvroReaderContext(metaClient.getStorage().getConf(), 
metaClient.getTableConfig(), Option.empty(), Option.empty());
+    readerContext.setHasLogFiles(true);
+    readerContext.setHasBootstrapBaseFile(false);
+    readerContext.setLatestCommitTime(maxInstantTime);
+    return readerContext;
+  }
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
index 513c2cce497..f6efc50426d 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
@@ -21,6 +21,7 @@ package org.apache.hudi.common.table.log;
 
 import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.read.FileGroupRecordBuffer;
 import org.apache.hudi.common.table.read.BufferedRecord;
 import org.apache.hudi.common.util.CollectionUtils;
@@ -62,11 +63,11 @@ public class HoodieMergedLogRecordReader<T> extends 
BaseHoodieLogRecordReader<T>
   private long totalTimeTakenToReadAndMergeBlocks;
 
   @SuppressWarnings("unchecked")
-  private HoodieMergedLogRecordReader(HoodieReaderContext<T> readerContext, 
HoodieStorage storage, List<String> logFilePaths, boolean reverseReader,
+  private HoodieMergedLogRecordReader(HoodieReaderContext<T> readerContext, 
HoodieTableMetaClient metaClient, HoodieStorage storage, List<String> 
logFilePaths, boolean reverseReader,
                                       int bufferSize, Option<InstantRange> 
instantRange, boolean withOperationField, boolean forceFullScan,
                                       Option<String> partitionName, 
Option<String> keyFieldOverride, boolean enableOptimizedLogBlocksScan,
                                       FileGroupRecordBuffer<T> recordBuffer, 
boolean allowInflightInstants) {
-    super(readerContext, storage, logFilePaths, reverseReader, bufferSize, 
instantRange, withOperationField,
+    super(readerContext, metaClient, storage, logFilePaths, reverseReader, 
bufferSize, instantRange, withOperationField,
         forceFullScan, partitionName, keyFieldOverride, 
enableOptimizedLogBlocksScan, recordBuffer, allowInflightInstants);
 
     if (forceFullScan) {
@@ -175,6 +176,7 @@ public class HoodieMergedLogRecordReader<T> extends 
BaseHoodieLogRecordReader<T>
 
     private FileGroupRecordBuffer<T> recordBuffer;
     private boolean allowInflightInstants = false;
+    private HoodieTableMetaClient metaClient;
 
     @Override
     public Builder<T> withHoodieReaderContext(HoodieReaderContext<T> 
readerContext) {
@@ -252,6 +254,11 @@ public class HoodieMergedLogRecordReader<T> extends 
BaseHoodieLogRecordReader<T>
       return this;
     }
 
+    public Builder<T> withMetaClient(HoodieTableMetaClient metaClient) {
+      this.metaClient = metaClient;
+      return this;
+    }
+
     @Override
     public HoodieMergedLogRecordReader<T> build() {
       ValidationUtils.checkArgument(recordBuffer != null, "Record Buffer is 
null in Merged Log Record Reader");
@@ -262,7 +269,7 @@ public class HoodieMergedLogRecordReader<T> extends 
BaseHoodieLogRecordReader<T>
       }
 
       return new HoodieMergedLogRecordReader<>(
-          readerContext, storage, logFilePaths,
+          readerContext, metaClient, storage, logFilePaths,
           reverseReader, bufferSize, instantRange,
           withOperationField, forceFullScan,
           Option.ofNullable(partitionName),
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index f751f33b100..4c712cf73f1 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -77,6 +77,7 @@ import static 
org.apache.hudi.common.util.ConfigUtils.getIntWithAltKeys;
  */
 public final class HoodieFileGroupReader<T> implements Closeable {
   private final HoodieReaderContext<T> readerContext;
+  private final HoodieTableMetaClient metaClient;
   private final Option<HoodieBaseFile> hoodieBaseFileOption;
   private final List<HoodieLogFile> logFiles;
   private final String partitionPath;
@@ -121,6 +122,7 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
                                 Option<InternalSchema> internalSchemaOpt, 
HoodieTableMetaClient hoodieTableMetaClient, TypedProperties props,
                                 long start, long length, boolean 
shouldUseRecordPosition, boolean allowInflightInstants, boolean emitDelete) {
     this.readerContext = readerContext;
+    this.metaClient = hoodieTableMetaClient;
     this.storage = storage;
     this.hoodieBaseFileOption = fileSlice.getBaseFile();
     this.logFiles = 
fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
@@ -352,6 +354,7 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
             new StoragePath(path), logFiles.get(0).getPath().getParent()))
         .withRecordBuffer(recordBuffer)
         .withAllowInflightInstants(allowInflightInstants)
+        .withMetaClient(metaClient)
         .build()) {
       
readStats.setTotalLogReadTimeMs(logRecordReader.getTotalTimeTakenToReadAndMergeBlocks());
       
readStats.setTotalUpdatedRecordsCompacted(logRecordReader.getNumMergedRecordsInLog());
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 6f8435f7ad3..82f25c76f85 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.metadata;
 
 import org.apache.hudi.avro.ConvertingGenericData;
+import org.apache.hudi.avro.HoodieAvroReaderContext;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.BooleanWrapper;
 import org.apache.hudi.avro.model.DateWrapper;
@@ -39,7 +40,9 @@ import org.apache.hudi.avro.model.TimestampMicrosWrapper;
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.data.HoodieAccumulator;
 import org.apache.hudi.common.data.HoodieAtomicLongAccumulator;
 import org.apache.hudi.common.data.HoodieData;
@@ -47,6 +50,7 @@ import org.apache.hudi.common.data.HoodiePairData;
 import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.engine.HoodieLocalEngineContext;
+import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.function.SerializableBiFunction;
 import org.apache.hudi.common.function.SerializablePairFunction;
@@ -73,7 +77,7 @@ import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
-import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;
+import org.apache.hudi.common.table.read.HoodieFileGroupReader;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
@@ -132,6 +136,7 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -158,7 +163,9 @@ import static 
org.apache.hudi.common.config.HoodieCommonConfig.DEFAULT_MAX_MEMOR
 import static 
org.apache.hudi.common.config.HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED;
 import static 
org.apache.hudi.common.config.HoodieCommonConfig.MAX_MEMORY_FOR_COMPACTION;
 import static 
org.apache.hudi.common.config.HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE;
+import static 
org.apache.hudi.common.config.HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE;
 import static 
org.apache.hudi.common.config.HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN;
+import static 
org.apache.hudi.common.config.HoodieReaderConfig.REALTIME_SKIP_MERGE;
 import static org.apache.hudi.common.fs.FSUtils.getFileNameFromPath;
 import static 
org.apache.hudi.common.model.HoodieRecord.COMMIT_TIME_METADATA_FIELD;
 import static 
org.apache.hudi.common.model.HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION;
@@ -243,7 +250,7 @@ public class HoodieTableMetadataUtil {
    *         the collection of provided records
    */
   public static Map<String, HoodieColumnRangeMetadata<Comparable>> 
collectColumnRangeMetadata(
-      List<HoodieRecord> records,
+      Iterator<HoodieRecord> records,
       List<Pair<String, Schema.Field>> targetFields,
       String filePath,
       Schema recordSchema,
@@ -263,7 +270,7 @@ public class HoodieTableMetadataUtil {
         storageConfig.getString(HoodieStorageConfig.WRITE_UTC_TIMEZONE.key(), 
HoodieStorageConfig.WRITE_UTC_TIMEZONE.defaultValue().toString()));
     // Collect stats for all columns by iterating through records while 
accounting
     // corresponding stats
-    records.forEach((record) -> {
+    records.forEachRemaining((record) -> {
       // For each column (field) we have to index update corresponding column 
stats
       // with the values from this record
       targetFields.forEach(fieldNameFieldPair -> {
@@ -1082,35 +1089,6 @@ public class HoodieTableMetadataUtil {
         }, parallelism).values();
   }
 
-  @VisibleForTesting
-  public static Set<String> getRecordKeys(List<String> logFilePaths, 
HoodieTableMetaClient datasetMetaClient,
-                                          Option<Schema> writerSchemaOpt, int 
maxBufferSize,
-                                          String latestCommitTimestamp, 
boolean includeValidKeys,
-                                          boolean includeDeletedKeys) throws 
IOException {
-    if (writerSchemaOpt.isPresent()) {
-      // read log file records without merging
-      Set<String> allRecordKeys = new HashSet<>();
-      HoodieUnMergedLogRecordScanner.Builder builder = 
HoodieUnMergedLogRecordScanner.newBuilder()
-          .withStorage(datasetMetaClient.getStorage())
-          .withBasePath(datasetMetaClient.getBasePath())
-          .withLogFilePaths(logFilePaths)
-          .withBufferSize(maxBufferSize)
-          .withLatestInstantTime(latestCommitTimestamp)
-          .withReaderSchema(writerSchemaOpt.get())
-          .withTableMetaClient(datasetMetaClient);
-      if (includeValidKeys) {
-        builder.withLogRecordScannerCallback(record -> 
allRecordKeys.add(record.getRecordKey()));
-      }
-      if (includeDeletedKeys) {
-        builder.withRecordDeletionCallback(deletedKey -> 
allRecordKeys.add(deletedKey.getRecordKey()));
-      }
-      HoodieUnMergedLogRecordScanner scanner = builder.build();
-      scanner.scan();
-      return allRecordKeys;
-    }
-    return Collections.emptySet();
-  }
-
   /**
    * Convert rollback action metadata to metadata table records.
    * <p>
@@ -1685,7 +1663,7 @@ public class HoodieTableMetadataUtil {
       } else if (FSUtils.isLogFile(fileName)) {
         Option<Schema> writerSchemaOpt = 
tryResolveSchemaForTable(datasetMetaClient);
         LOG.warn("Reading log file: {}, to build column range metadata.", 
partitionPathFileName);
-        return getLogFileColumnRangeMetadata(fullFilePath.toString(), 
datasetMetaClient, columnsToIndex, writerSchemaOpt, maxBufferSize);
+        return getLogFileColumnRangeMetadata(fullFilePath.toString(), 
partitionPath, datasetMetaClient, columnsToIndex, writerSchemaOpt, 
maxBufferSize);
       }
       LOG.warn("Column range index not supported for: {}", 
partitionPathFileName);
       return Collections.emptyList();
@@ -1701,31 +1679,39 @@ public class HoodieTableMetadataUtil {
    * Read column range metadata from log file.
    */
   @VisibleForTesting
-  public static List<HoodieColumnRangeMetadata<Comparable>> 
getLogFileColumnRangeMetadata(String filePath, HoodieTableMetaClient 
datasetMetaClient,
+  public static List<HoodieColumnRangeMetadata<Comparable>> 
getLogFileColumnRangeMetadata(String filePath, String partitionPath,
+                                                                               
           HoodieTableMetaClient datasetMetaClient,
                                                                                
           List<String> columnsToIndex, Option<Schema> writerSchemaOpt,
                                                                                
           int maxBufferSize) throws IOException {
     if (writerSchemaOpt.isPresent()) {
       List<Pair<String, Schema.Field>> fieldsToIndex = 
columnsToIndex.stream().map(fieldName -> 
HoodieAvroUtils.getSchemaForField(writerSchemaOpt.get(), fieldName))
           .collect(Collectors.toList());
-      // read log file records without merging
-      List<HoodieRecord> records = new ArrayList<>();
-      HoodieUnMergedLogRecordScanner scanner = 
HoodieUnMergedLogRecordScanner.newBuilder()
-          .withStorage(datasetMetaClient.getStorage())
-          .withBasePath(datasetMetaClient.getBasePath())
-          .withLogFilePaths(Collections.singletonList(filePath))
-          .withBufferSize(maxBufferSize)
-          
.withLatestInstantTime(datasetMetaClient.getActiveTimeline().getCommitsTimeline().lastInstant().get().requestedTime())
-          .withReaderSchema(writerSchemaOpt.get())
-          .withTableMetaClient(datasetMetaClient)
-          .withLogRecordScannerCallback(records::add)
+      // read log files without merging for lower overhead, log files may 
contain multiple records for the same key resulting in a wider range of values 
than the merged result
+      HoodieLogFile logFile = new HoodieLogFile(filePath);
+      FileSlice fileSlice = new FileSlice(partitionPath, 
logFile.getDeltaCommitTime(), logFile.getFileId());
+      fileSlice.addLogFile(logFile);
+      TypedProperties properties = new TypedProperties();
+      properties.setProperty(MAX_MEMORY_FOR_MERGE.key(), 
Long.toString(maxBufferSize));
+      properties.setProperty(HoodieReaderConfig.MERGE_TYPE.key(), 
REALTIME_SKIP_MERGE);
+      // Currently only avro is fully supported for extracting column ranges 
(see HUDI-8585)
+      HoodieReaderContext readerContext = new 
HoodieAvroReaderContext(datasetMetaClient.getStorageConf(), 
datasetMetaClient.getTableConfig(), Option.empty(), Option.empty());
+      HoodieFileGroupReader fileGroupReader = 
HoodieFileGroupReader.newBuilder()
+          .withReaderContext(readerContext)
+          .withHoodieTableMetaClient(datasetMetaClient)
+          .withFileSlice(fileSlice)
+          .withDataSchema(writerSchemaOpt.get())
+          .withRequestedSchema(writerSchemaOpt.get())
+          
.withLatestCommitTime(datasetMetaClient.getActiveTimeline().getCommitsTimeline().lastInstant().get().requestedTime())
+          .withProps(properties)
           .build();
-      scanner.scan();
-      if (records.isEmpty()) {
-        return Collections.emptyList();
+      try (ClosableIterator<HoodieRecord> recordIterator = 
(ClosableIterator<HoodieRecord>) 
fileGroupReader.getClosableHoodieRecordIterator()) {
+        if (!recordIterator.hasNext()) {
+          return Collections.emptyList();
+        }
+        Map<String, HoodieColumnRangeMetadata<Comparable>> 
columnRangeMetadataMap =
+            collectColumnRangeMetadata(recordIterator, fieldsToIndex, 
getFileNameFromPath(filePath), writerSchemaOpt.get(), 
datasetMetaClient.getStorage().getConf());
+        return new ArrayList<>(columnRangeMetadataMap.values());
       }
-      Map<String, HoodieColumnRangeMetadata<Comparable>> 
columnRangeMetadataMap =
-          collectColumnRangeMetadata(records, fieldsToIndex, 
getFileNameFromPath(filePath), writerSchemaOpt.get(), 
datasetMetaClient.getStorage().getConf());
-      return new ArrayList<>(columnRangeMetadataMap.values());
     }
     return Collections.emptyList();
   }
@@ -2435,8 +2421,7 @@ public class HoodieTableMetadataUtil {
     });
   }
 
-  public static Schema 
getProjectedSchemaForExpressionIndex(HoodieIndexDefinition indexDefinition, 
HoodieTableMetaClient metaClient) throws Exception {
-    Schema tableSchema = new 
TableSchemaResolver(metaClient).getTableAvroSchema();
+  public static Schema 
getProjectedSchemaForExpressionIndex(HoodieIndexDefinition indexDefinition, 
HoodieTableMetaClient metaClient, Schema tableSchema) {
     List<String> partitionFields = 
metaClient.getTableConfig().getPartitionFields()
         .map(Arrays::asList)
         .orElse(Collections.emptyList());
@@ -2554,12 +2539,11 @@ public class HoodieTableMetadataUtil {
                                                                              
List<Pair<String, FileSlice>> partitionInfoList,
                                                                              
HoodieMetadataConfig metadataConfig,
                                                                              
HoodieTableMetaClient dataTableMetaClient,
-                                                                             
Option<Schema> writerSchemaOpt,
+                                                                             
Lazy<Option<Schema>> lazyWriterSchemaOpt,
                                                                              
Option<HoodieRecordType> recordTypeOpt) {
     if (partitionInfoList.isEmpty()) {
       return engineContext.emptyHoodieData();
     }
-    Lazy<Option<Schema>> lazyWriterSchemaOpt = writerSchemaOpt.isPresent() ? 
Lazy.eagerly(writerSchemaOpt) : Lazy.lazily(() -> 
tryResolveSchemaForTable(dataTableMetaClient));
     final Map<String, Schema> columnsToIndexSchemaMap = 
getColumnsToIndex(dataTableMetaClient.getTableConfig(), metadataConfig, 
lazyWriterSchemaOpt,
         
dataTableMetaClient.getActiveTimeline().getWriteTimeline().filterCompletedInstants().empty(),
 recordTypeOpt);
     if (columnsToIndexSchemaMap.isEmpty()) {
@@ -2766,7 +2750,7 @@ public class HoodieTableMetadataUtil {
                                                                                
            Option<Schema> writerSchemaOpt) {
     if (writeStat instanceof HoodieDeltaWriteStat && ((HoodieDeltaWriteStat) 
writeStat).getColumnStats().isPresent()) {
       Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap = 
((HoodieDeltaWriteStat) writeStat).getColumnStats().get();
-      return columnRangeMap.values().stream().collect(Collectors.toList());
+      return new ArrayList<>(columnRangeMap.values());
     }
 
     String filePath = writeStat.getPath();
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
index bcf67bfb9e3..3e02dae081f 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
@@ -168,7 +168,7 @@ public class TestHoodieTableMetadataUtil extends 
HoodieCommonTestHarness {
             .withPartitionStatsIndexParallelism(1)
             .build(),
         metaClient,
-        Option.of(HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS),
+        
Lazy.eagerly(Option.of(HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS)),
         Option.empty());
     // Validate the result.
     validatePartitionStats(result, instant1, instant2);
@@ -262,6 +262,7 @@ public class TestHoodieTableMetadataUtil extends 
HoodieCommonTestHarness {
         
metaClient.getTableConfig().setValue(HoodieTableConfig.PARTITION_FIELDS.key(), 
"partition_path");
         List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataLogFile 
= HoodieTableMetadataUtil.getLogFileColumnRangeMetadata(
             storagePath2.toString(),
+            p,
             metaClient,
             columnsToIndex,
             
Option.of(HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS),
@@ -283,7 +284,7 @@ public class TestHoodieTableMetadataUtil extends 
HoodieCommonTestHarness {
             .withPartitionStatsIndexParallelism(1)
             .build(),
         metaClient,
-        Option.of(HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS),
+        
Lazy.eagerly(Option.of(HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS)),
         Option.empty());
     // Validate the result.
     validatePartitionStats(result, instant1, instant2, 6);
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadSnapshotReader.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadSnapshotReader.java
deleted file mode 100644
index e38988ff475..00000000000
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadSnapshotReader.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.hadoop.realtime;
-
-import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
-import org.apache.hudi.common.model.HoodieLogFile;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.serialization.DefaultSerializer;
-import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
-import org.apache.hudi.common.util.DefaultSizeEstimator;
-import org.apache.hudi.common.util.FileIOUtils;
-import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
-import org.apache.hudi.common.util.HoodieTimer;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.collection.ClosableIterator;
-import org.apache.hudi.common.util.collection.ExternalSpillableMap;
-import org.apache.hudi.hadoop.fs.HadoopFSUtils;
-import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
-import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.storage.HoodieStorageUtils;
-
-import org.apache.avro.Schema;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobConf;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import static 
org.apache.hudi.common.config.HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED;
-import static 
org.apache.hudi.common.config.HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE;
-import static 
org.apache.hudi.common.config.HoodieMemoryConfig.DEFAULT_MR_MAX_DFS_STREAM_BUFFER_SIZE;
-import static 
org.apache.hudi.common.config.HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE;
-import static 
org.apache.hudi.common.config.HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH;
-import static 
org.apache.hudi.common.config.HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN;
-import static 
org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getBaseFileReader;
-import static 
org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes;
-import static 
org.apache.hudi.internal.schema.InternalSchema.getEmptyInternalSchema;
-
-/**
- * An implementation of {@link AbstractRealtimeRecordReader} that reads from 
base parquet files and log files,
- * and merges the records on the fly. It differs from {@link 
HoodieRealtimeRecordReader} in that it does not
- * implement Hadoop's RecordReader interface, and instead implements Iterator 
interface that returns an iterator
- * of {@link HoodieRecord}s which are {@link HoodieAvroIndexedRecord}s. This 
can be used by query engines like
- * Trino that do not use Hadoop's RecordReader interface. However, the engine 
must support reading from iterators
- * and also support Avro (de)serialization.
- */
-public class HoodieMergeOnReadSnapshotReader extends 
AbstractRealtimeRecordReader implements Iterator<HoodieRecord>, AutoCloseable {
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieMergeOnReadSnapshotReader.class);
-
-  private final String tableBasePath;
-  private final List<HoodieLogFile> logFilePaths;
-  private final String latestInstantTime;
-  private final Schema readerSchema;
-  private final JobConf jobConf;
-  private final HoodieMergedLogRecordScanner logRecordScanner;
-  private final HoodieFileReader baseFileReader;
-  private final Map<String, HoodieRecord> logRecordsByKey;
-  private final Iterator<HoodieRecord> recordsIterator;
-  private final ExternalSpillableMap<String, HoodieRecord> mergedRecordsByKey;
-
-  /**
-   * In order to instantiate this record reader, one needs to provide 
following parameters.
-   * An example usage is demonstrated in TestHoodieMergeOnReadSnapshotReader.
-   *
-   * @param tableBasePath     Base path of the Hudi table
-   * @param baseFilePath      Path of the base file as of the latest instant 
time for the split being processed
-   * @param logFilePaths      Paths of the log files as of the latest file 
slices pertaining to file group id of the base file
-   * @param latestInstantTime Latest instant time
-   * @param readerSchema      Schema of the reader
-   * @param jobConf           Any job configuration
-   * @param start             Start offset
-   * @param length            Length of the split
-   */
-  public HoodieMergeOnReadSnapshotReader(String tableBasePath,
-                                         String baseFilePath,
-                                         List<HoodieLogFile> logFilePaths,
-                                         String latestInstantTime,
-                                         Schema readerSchema,
-                                         JobConf jobConf,
-                                         long start,
-                                         long length) throws IOException {
-    super(getRealtimeSplit(tableBasePath, baseFilePath, logFilePaths, 
latestInstantTime, start, length, new String[0]), jobConf);
-    this.tableBasePath = tableBasePath;
-    this.logFilePaths = logFilePaths;
-    this.latestInstantTime = latestInstantTime;
-    this.readerSchema = readerSchema;
-    this.jobConf = jobConf;
-    HoodieTimer timer = new HoodieTimer().startTimer();
-    this.logRecordScanner = getMergedLogRecordScanner();
-    LOG.debug("Time taken to scan log records: {}", timer.endTimer());
-    this.baseFileReader = getBaseFileReader(new Path(baseFilePath), jobConf);
-    this.logRecordsByKey = logRecordScanner.getRecords();
-    Set<String> logRecordKeys = new HashSet<>(this.logRecordsByKey.keySet());
-    this.mergedRecordsByKey = new ExternalSpillableMap<>(
-        getMaxCompactionMemoryInBytes(jobConf),
-        jobConf.get(SPILLABLE_MAP_BASE_PATH.key(),
-            FileIOUtils.getDefaultSpillableMapBasePath()),
-        new DefaultSizeEstimator(),
-        new HoodieRecordSizeEstimator(readerSchema),
-        jobConf.getEnum(SPILLABLE_DISK_MAP_TYPE.key(), 
SPILLABLE_DISK_MAP_TYPE.defaultValue()),
-        new DefaultSerializer<>(),
-        jobConf.getBoolean(DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), 
DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()),
-        getClass().getSimpleName());
-    try (ClosableIterator<String> baseFileIterator = 
baseFileReader.getRecordKeyIterator()) {
-      timer.startTimer();
-      while (baseFileIterator.hasNext()) {
-        String key = baseFileIterator.next();
-        if (logRecordKeys.contains(key)) {
-          logRecordKeys.remove(key);
-          Option<HoodieAvroIndexedRecord> mergedRecord = 
buildGenericRecordWithCustomPayload(logRecordsByKey.get(key));
-          if (mergedRecord.isPresent()) {
-            HoodieRecord hoodieRecord = mergedRecord.get().copy();
-            mergedRecordsByKey.put(key, hoodieRecord);
-          }
-        }
-      }
-    }
-    LOG.debug("Time taken to merge base file and log file records: {}", 
timer.endTimer());
-    this.recordsIterator = mergedRecordsByKey.values().iterator();
-  }
-
-  @Override
-  public boolean hasNext() {
-    return recordsIterator.hasNext();
-  }
-
-  @Override
-  public HoodieRecord next() {
-    return recordsIterator.next();
-  }
-
-  public Map<String, HoodieRecord> getRecordsByKey() {
-    return mergedRecordsByKey;
-  }
-
-  public Iterator<HoodieRecord> getRecordsIterator() {
-    return recordsIterator;
-  }
-
-  public Map<String, HoodieRecord> getLogRecordsByKey() {
-    return logRecordsByKey;
-  }
-
-  private static HoodieRealtimeFileSplit getRealtimeSplit(String 
tableBasePath, String baseFilePath,
-                                                          List<HoodieLogFile> 
logFilePaths,
-                                                          String 
latestInstantTime,
-                                                          long start, long 
length, String[] hosts) {
-    HoodieRealtimePath realtimePath = new HoodieRealtimePath(
-        new Path(baseFilePath).getParent(),
-        baseFilePath,
-        tableBasePath,
-        logFilePaths,
-        latestInstantTime,
-        false, // TODO: Fix this to support incremental queries
-        Option.empty());
-    return HoodieInputFormatUtils.createRealtimeFileSplit(realtimePath, start, 
length, hosts);
-  }
-
-  private HoodieMergedLogRecordScanner getMergedLogRecordScanner() {
-    return HoodieMergedLogRecordScanner.newBuilder()
-        .withStorage(HoodieStorageUtils.getStorage(
-            split.getPath().toString(), HadoopFSUtils.getStorageConf(jobConf)))
-        .withBasePath(tableBasePath)
-        .withLogFilePaths(logFilePaths.stream().map(logFile -> 
logFile.getPath().toString()).collect(Collectors.toList()))
-        .withReaderSchema(readerSchema)
-        .withLatestInstantTime(latestInstantTime)
-        .withMaxMemorySizeInBytes(getMaxCompactionMemoryInBytes(jobConf))
-        .withReverseReader(false)
-        .withBufferSize(jobConf.getInt(MAX_DFS_STREAM_BUFFER_SIZE.key(),
-            DEFAULT_MR_MAX_DFS_STREAM_BUFFER_SIZE))
-        .withSpillableMapBasePath(jobConf.get(SPILLABLE_MAP_BASE_PATH.key(),
-            FileIOUtils.getDefaultSpillableMapBasePath()))
-        .withDiskMapType(jobConf.getEnum(SPILLABLE_DISK_MAP_TYPE.key(), 
SPILLABLE_DISK_MAP_TYPE.defaultValue()))
-        
.withBitCaskDiskMapCompressionEnabled(jobConf.getBoolean(DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
 DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()))
-        
.withOptimizedLogBlocksScan(jobConf.getBoolean(ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.key(),
-            
Boolean.parseBoolean(ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.defaultValue())))
-        
.withInternalSchema(schemaEvolutionContext.internalSchemaOption.orElse(getEmptyInternalSchema()))
-        .build();
-  }
-
-  private Option<HoodieAvroIndexedRecord> 
buildGenericRecordWithCustomPayload(HoodieRecord record) throws IOException {
-    if (usesCustomPayload) {
-      return record.toIndexedRecord(getWriterSchema(), payloadProps);
-    } else {
-      return record.toIndexedRecord(readerSchema, payloadProps);
-    }
-  }
-
-  @Override
-  public void close() throws Exception {
-    if (baseFileReader != null) {
-      baseFileReader.close();
-    }
-    if (logRecordScanner != null) {
-      logRecordScanner.close();
-    }
-    if (mergedRecordsByKey != null) {
-      mergedRecordsByKey.close();
-    }
-  }
-}
diff --git 
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadSnapshotReader.java
 
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadSnapshotReader.java
deleted file mode 100644
index 555b7cd2731..00000000000
--- 
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadSnapshotReader.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.hadoop.realtime;
-
-import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.common.config.HoodieMemoryConfig;
-import org.apache.hudi.common.config.HoodieReaderConfig;
-import org.apache.hudi.common.model.FileSlice;
-import org.apache.hudi.common.model.HoodieBaseFile;
-import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.model.HoodieFileGroupId;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.hudi.common.table.log.HoodieLogFormat;
-import org.apache.hudi.common.table.log.block.HoodieLogBlock;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.testutils.FileCreateUtilsLegacy;
-import org.apache.hudi.common.testutils.HoodieTestUtils;
-import org.apache.hudi.common.testutils.SchemaTestUtil;
-import org.apache.hudi.common.util.CommitUtils;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.hadoop.fs.HadoopFSUtils;
-import org.apache.hudi.hadoop.testutils.InputFormatTestUtil;
-import org.apache.hudi.storage.HoodieStorage;
-import org.apache.hudi.storage.StoragePath;
-import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
-
-import org.apache.avro.Schema;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-import static 
org.apache.hudi.common.testutils.HoodieTestUtils.COMMIT_METADATA_SER_DE;
-import static org.apache.hudi.hadoop.fs.HadoopFSUtils.getRelativePartitionPath;
-import static 
org.apache.hudi.hadoop.testutils.InputFormatTestUtil.writeDataBlockToLogFile;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-public class TestHoodieMergeOnReadSnapshotReader {
-
-  private static final int TOTAL_RECORDS = 100;
-  private static final String FILE_ID = "fileid0";
-  private static final String COLUMNS =
-      
"_hoodie_commit_time,_hoodie_commit_seqno,_hoodie_record_key,_hoodie_partition_path,_hoodie_file_name,field1,field2,name,favorite_number,favorite_color,favorite_movie";
-  private static final String COLUMN_TYPES = 
"string,string,string,string,string,string,string,string,int,string,string";
-  private JobConf baseJobConf;
-  private HoodieStorage storage;
-  private Configuration hadoopConf;
-
-  @TempDir
-  public java.nio.file.Path basePath;
-
-  @BeforeEach
-  public void setUp() {
-    hadoopConf = HoodieTestUtils.getDefaultStorageConf().unwrap();
-    hadoopConf.set("fs.defaultFS", "file:///");
-    hadoopConf.set("fs.file.impl", 
org.apache.hadoop.fs.LocalFileSystem.class.getName());
-    baseJobConf = new JobConf(hadoopConf);
-    baseJobConf.set(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.key(), 
String.valueOf(1024 * 1024));
-    baseJobConf.set(serdeConstants.LIST_COLUMNS, COLUMNS);
-    baseJobConf.set(serdeConstants.LIST_COLUMN_TYPES, COLUMN_TYPES);
-    baseJobConf.set(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), 
"false");
-    storage = new HoodieHadoopStorage(HadoopFSUtils.getFs(new 
StoragePath(basePath.toUri()), baseJobConf));
-  }
-
-  @AfterEach
-  public void tearDown() throws Exception {
-    if (storage != null) {
-      storage.deleteDirectory(new StoragePath(basePath.toUri()));
-      storage.close();
-    }
-  }
-
-  @Test
-  public void testSnapshotReader() throws Exception {
-    testReaderInternal(false, 
HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK);
-  }
-
-  @Test
-  public void testSnapshotReaderPartitioned() throws Exception {
-    testReaderInternal(true, 
HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK);
-  }
-
-  private void testReaderInternal(boolean partitioned, 
HoodieLogBlock.HoodieLogBlockType logBlockType) throws Exception {
-    // initial commit
-    Schema schema = 
HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema());
-    HoodieTestUtils.init(HadoopFSUtils.getStorageConf(hadoopConf), 
basePath.toString(), HoodieTableType.MERGE_ON_READ);
-    String baseInstant = "100";
-    File partitionDir = partitioned ? 
InputFormatTestUtil.prepareParquetTable(basePath, schema, 1, TOTAL_RECORDS, 
baseInstant,
-        HoodieTableType.MERGE_ON_READ)
-        : InputFormatTestUtil.prepareNonPartitionedParquetTable(basePath, 
schema, 1, TOTAL_RECORDS, baseInstant,
-        HoodieTableType.MERGE_ON_READ);
-
-    HoodieCommitMetadata commitMetadata = 
CommitUtils.buildMetadata(Collections.emptyList(), Collections.emptyMap(), 
Option.empty(), WriteOperationType.UPSERT,
-        schema.toString(), HoodieTimeline.DELTA_COMMIT_ACTION);
-    FileCreateUtilsLegacy.createDeltaCommit(COMMIT_METADATA_SER_DE, 
basePath.toString(), baseInstant, commitMetadata);
-    // Add the paths
-    FileInputFormat.setInputPaths(baseJobConf, partitionDir.getPath());
-
-    List<Pair<String, Integer>> logVersionsWithAction = new ArrayList<>();
-    logVersionsWithAction.add(Pair.of(HoodieTimeline.DELTA_COMMIT_ACTION, 1));
-    logVersionsWithAction.add(Pair.of(HoodieTimeline.DELTA_COMMIT_ACTION, 2));
-    String baseFilePath = partitionDir + "/" + FILE_ID + "_1-0-1_" + 
baseInstant + ".parquet";
-    String partitionPath = partitioned ? getRelativePartitionPath(new 
Path(basePath.toString()), new Path(partitionDir.getAbsolutePath())) : 
"default";
-    FileSlice fileSlice = new FileSlice(
-        new HoodieFileGroupId(partitionPath, FILE_ID),
-        baseInstant,
-        new HoodieBaseFile(storage.getPathInfo(new StoragePath(baseFilePath))),
-        new ArrayList<>());
-    logVersionsWithAction.forEach(logVersionWithAction -> {
-      try {
-        // update files or generate new log file
-        int logVersion = logVersionWithAction.getRight();
-        String action = logVersionWithAction.getKey();
-        int baseInstantTs = Integer.parseInt(baseInstant);
-        String instantTime = String.valueOf(baseInstantTs + logVersion);
-        String latestInstant =
-            action.equals(HoodieTimeline.ROLLBACK_ACTION) ? 
String.valueOf(baseInstantTs + logVersion - 2)
-                : instantTime;
-
-        HoodieLogFormat.Writer writer = writeDataBlockToLogFile(
-            partitionDir,
-            storage,
-            schema,
-            FILE_ID,
-            baseInstant,
-            latestInstant,
-            120,
-            0,
-            logVersion,
-            logBlockType);
-        long size = writer.getCurrentSize();
-        writer.close();
-        assertTrue(size > 0, "block - size should be > 0");
-        FileCreateUtilsLegacy.createDeltaCommit(COMMIT_METADATA_SER_DE, 
basePath.toString(), instantTime, commitMetadata);
-        fileSlice.addLogFile(writer.getLogFile());
-
-        HoodieMergeOnReadSnapshotReader snapshotReader = new 
HoodieMergeOnReadSnapshotReader(
-            basePath.toString(),
-            fileSlice.getBaseFile().get().getPath(),
-            fileSlice.getLogFiles().collect(Collectors.toList()),
-            latestInstant,
-            schema,
-            baseJobConf,
-            0,
-            size);
-        Map<String, HoodieRecord> records = snapshotReader.getRecordsByKey();
-        assertEquals(TOTAL_RECORDS, records.size());
-        snapshotReader.close();
-      } catch (Exception ioe) {
-        throw new HoodieException(ioe.getMessage(), ioe);
-      }
-    });
-  }
-}
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/TestDataSourceUtils.java
 
b/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/TestDataSourceUtils.java
deleted file mode 100644
index 2b13c068112..00000000000
--- 
a/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/TestDataSourceUtils.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi;
-
-import org.apache.hudi.client.SparkRDDWriteClient;
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.testutils.HoodieClientTestBase;
-
-import org.apache.spark.api.java.JavaRDD;
-import org.junit.jupiter.api.Test;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
-import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-class TestDataSourceUtils extends HoodieClientTestBase {
-
-  @Test
-  void testDeduplicationAgainstRecordsAlreadyInTable() {
-    HoodieWriteConfig config = getConfig();
-    config.getProps().setProperty("path", config.getBasePath());
-    try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config)) {
-      String newCommitTime = writeClient.startCommit();
-      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
-      JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 2);
-      List<WriteStatus> statusList = writeClient.bulkInsert(recordsRDD, 
newCommitTime).collect();
-      writeClient.commit(newCommitTime, jsc.parallelize(statusList), 
Option.empty(), COMMIT_ACTION, Collections.emptyMap(), Option.empty());
-      assertNoWriteErrors(statusList);
-
-      Map<String, String> parameters = 
config.getProps().entrySet().stream().collect(Collectors.toMap(entry -> 
entry.getKey().toString(), entry -> entry.getValue().toString()));
-      List<HoodieRecord> newRecords = dataGen.generateInserts(newCommitTime, 
10);
-      List<HoodieRecord> inputRecords = Stream.concat(records.subList(0, 
10).stream(), newRecords.stream()).collect(Collectors.toList());
-      List<HoodieRecord> output = DataSourceUtils.resolveDuplicates(jsc, 
jsc.parallelize(inputRecords, 1), parameters, false).collect();
-      Set<String> expectedRecordKeys = 
newRecords.stream().map(HoodieRecord::getRecordKey).collect(Collectors.toSet());
-      assertEquals(expectedRecordKeys, 
output.stream().map(HoodieRecord::getRecordKey).collect(Collectors.toSet()));
-    }
-  }
-}
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/testutils/LogFileColStatsTestUtil.java
 
b/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/testutils/LogFileColStatsTestUtil.java
index 519a8811eb5..0ed3d0ed20e 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/testutils/LogFileColStatsTestUtil.java
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/testutils/LogFileColStatsTestUtil.java
@@ -18,28 +18,22 @@
 
 package org.apache.hudi.testutils;
 
-import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
-import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
-import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.storage.StoragePath;
 
 import org.apache.avro.Schema;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.expressions.GenericRow;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.collectColumnRangeMetadata;
 
 /**
  * Util methods used in tests to fetch col stats records for a log file.
@@ -47,29 +41,12 @@ import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.collectColumnRang
 public class LogFileColStatsTestUtil {
 
   public static Option<Row> getLogFileColumnRangeMetadata(String filePath, 
HoodieTableMetaClient datasetMetaClient, String latestCommitTime,
-                                                  List<String> columnsToIndex, 
Option<Schema> writerSchemaOpt,
-                                                  int maxBufferSize) throws 
IOException {
+                                                          List<String> 
columnsToIndex, Option<Schema> writerSchemaOpt,
+                                                          int maxBufferSize) 
throws IOException {
     if (writerSchemaOpt.isPresent()) {
-      List<Pair<String, Schema.Field>> fieldsToIndex = columnsToIndex.stream()
-          .map(fieldName -> 
HoodieAvroUtils.getSchemaForField(writerSchemaOpt.get(), fieldName, ""))
-          .collect(Collectors.toList());
-      List<HoodieRecord> records = new ArrayList<>();
-      HoodieUnMergedLogRecordScanner scanner = 
HoodieUnMergedLogRecordScanner.newBuilder()
-          .withStorage(datasetMetaClient.getStorage())
-          .withBasePath(datasetMetaClient.getBasePath())
-          .withLogFilePaths(Collections.singletonList(filePath))
-          .withBufferSize(maxBufferSize)
-          .withLatestInstantTime(latestCommitTime)
-          .withReaderSchema(writerSchemaOpt.get())
-          .withLogRecordScannerCallback(records::add)
-          .build();
-      scanner.scan();
-      if (records.isEmpty()) {
-        return Option.empty();
-      }
-      Map<String, HoodieColumnRangeMetadata<Comparable>> 
columnRangeMetadataMap =
-          collectColumnRangeMetadata(records, fieldsToIndex, filePath, 
writerSchemaOpt.get(), datasetMetaClient.getStorageConf());
-      List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList = 
new ArrayList<>(columnRangeMetadataMap.values());
+      String partitionPath = 
FSUtils.getRelativePartitionPath(datasetMetaClient.getBasePath(), new 
StoragePath(filePath).getParent());
+      List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList =
+          HoodieTableMetadataUtil.getLogFileColumnRangeMetadata(filePath, 
partitionPath, datasetMetaClient, columnsToIndex, writerSchemaOpt, 
maxBufferSize);
       return Option.of(getColStatsEntry(filePath, columnRangeMetadataList));
     } else {
       throw new HoodieException("Writer schema needs to be set");
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
index 3001cde81a6..8d7c9403d44 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
@@ -20,6 +20,7 @@ package org.apache.hudi;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
@@ -35,6 +36,7 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner;
 import org.apache.hudi.metadata.HoodieMetadataPayload;
 import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.hudi.testutils.HoodieClientTestBase;
 
 import org.apache.avro.Conversions;
 import org.apache.avro.LogicalTypes;
@@ -60,7 +62,14 @@ import java.time.LocalDate;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
+import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.instanceOf;
@@ -75,7 +84,7 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 @ExtendWith(MockitoExtension.class)
-public class TestDataSourceUtils {
+public class TestDataSourceUtils extends HoodieClientTestBase {
 
   @Mock
   private SparkRDDWriteClient hoodieWriteClient;
@@ -300,4 +309,26 @@ public class TestDataSourceUtils {
 
     assertEquals(genericRecordHoodieMetadataPayload, 
deserGenericRecordHoodieMetadataPayload);
   }
+
+  @Test
+  void testDeduplicationAgainstRecordsAlreadyInTable() throws IOException {
+    initResources();
+    HoodieWriteConfig config = getConfig();
+    config.getProps().setProperty("path", config.getBasePath());
+    try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config)) {
+      String newCommitTime = writeClient.startCommit();
+      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
+      JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 2);
+      List<WriteStatus> statusList = writeClient.bulkInsert(recordsRDD, 
newCommitTime).collect();
+      writeClient.commit(newCommitTime, jsc.parallelize(statusList), 
Option.empty(), COMMIT_ACTION, Collections.emptyMap(), Option.empty());
+      assertNoWriteErrors(statusList);
+
+      Map<String, String> parameters = 
config.getProps().entrySet().stream().collect(Collectors.toMap(entry -> 
entry.getKey().toString(), entry -> entry.getValue().toString()));
+      List<HoodieRecord> newRecords = dataGen.generateInserts(newCommitTime, 
10);
+      List<HoodieRecord> inputRecords = Stream.concat(records.subList(0, 
10).stream(), newRecords.stream()).collect(Collectors.toList());
+      List<HoodieRecord> output = DataSourceUtils.resolveDuplicates(jsc, 
jsc.parallelize(inputRecords, 1), parameters, false).collect();
+      Set<String> expectedRecordKeys = 
newRecords.stream().map(HoodieRecord::getRecordKey).collect(Collectors.toSet());
+      assertEquals(expectedRecordKeys, 
output.stream().map(HoodieRecord::getRecordKey).collect(Collectors.toSet()));
+    }
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
index b6c9af65b95..1b57a93eb6c 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
@@ -26,7 +26,7 @@ import 
org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteConcurrencyMode;
-import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;
+import org.apache.hudi.common.table.log.HoodieLogBlockMetadataScanner;
 import org.apache.hudi.common.table.marker.MarkerType;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -61,9 +61,9 @@ import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static 
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
 import static 
org.apache.hudi.common.testutils.HoodieTestUtils.TIMELINE_FACTORY;
 import static 
org.apache.hudi.testutils.GenericRecordValidationTestUtils.assertDataInMORTable;
-import static 
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -485,33 +485,25 @@ public class TestHoodieClientOnMergeOnReadStorage extends 
HoodieClientTestBase {
     List<String> partitionPaths = 
Stream.of(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS).collect(Collectors.toList());
     for (String partitionPath: partitionPaths) {
       fileSystemView.getLatestFileSlices(partitionPath).forEach(slice -> {
-        HoodieUnMergedLogRecordScanner scanner = 
HoodieUnMergedLogRecordScanner.newBuilder()
-            .withStorage(metaClient.getStorage())
-            .withBasePath(table.getMetaClient().getBasePath())
-            .withLogFilePaths(slice.getLogFiles()
+        HoodieLogBlockMetadataScanner scanner = new 
HoodieLogBlockMetadataScanner(
+            table.getMetaClient(),
+            slice.getLogFiles()
                 .sorted(HoodieLogFile.getLogFileComparator())
                 .map(file -> file.getPath().toString())
-                .collect(Collectors.toList()))
-            .withLatestInstantTime(instant)
-            .withBufferSize(config.getMaxDFSStreamBufferSize())
-            .withOptimizedLogBlocksScan(true)
-            .withTableMetaClient(metaClient)
-            .build();
-        scanner.scan(true);
+                .collect(Collectors.toList()),
+            config.getMaxDFSStreamBufferSize(),
+            instant,
+            Option.empty());
         List<String> prevInstants = scanner.getValidBlockInstants();
-        HoodieUnMergedLogRecordScanner scanner2 = 
HoodieUnMergedLogRecordScanner.newBuilder()
-            .withStorage(metaClient.getStorage())
-            .withBasePath(table.getMetaClient().getBasePath())
-            .withLogFilePaths(slice.getLogFiles()
+        HoodieLogBlockMetadataScanner scanner2 = new 
HoodieLogBlockMetadataScanner(
+            table.getMetaClient(),
+            slice.getLogFiles()
                 .sorted(HoodieLogFile.getLogFileComparator())
                 .map(file -> file.getPath().toString())
-                .collect(Collectors.toList()))
-            .withLatestInstantTime(currentInstant)
-            .withBufferSize(config.getMaxDFSStreamBufferSize())
-            .withOptimizedLogBlocksScan(true)
-            .withTableMetaClient(table.getMetaClient())
-            .build();
-        scanner2.scan(true);
+                .collect(Collectors.toList()),
+            config.getMaxDFSStreamBufferSize(),
+            currentInstant,
+            Option.empty());
         List<String> currentInstants = scanner2.getValidBlockInstants();
         assertEquals(prevInstants, currentInstants);
       });
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java
index b5c2f749e43..b9ecbd6ec5f 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java
@@ -24,23 +24,28 @@ import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.EngineType;
+import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieIndexDefinition;
+import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.read.HoodieFileGroupReader;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.RawTripTestPayload;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -55,6 +60,7 @@ import org.apache.hudi.testutils.HoodieClientTestBase;
 
 import org.apache.avro.Schema;
 import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.catalyst.InternalRow;
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
@@ -73,7 +79,6 @@ import java.util.stream.Collectors;
 
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
 import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.convertMetadataToRecordIndexRecords;
-import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getRecordKeys;
 import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.getRevivedAndDeletedKeysFromMergedLogs;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.reduceByKeys;
 import static 
org.apache.hudi.metadata.SecondaryIndexKeyUtils.constructSecondaryIndexKey;
@@ -524,14 +529,15 @@ public class TestMetadataUtilRLIandSIRecordGeneration 
extends HoodieClientTestBa
     writeStatuses3.stream().filter(writeStatus -> 
FSUtils.isLogFile(FSUtils.getFileName(writeStatus.getStat().getPath(), 
writeStatus.getPartitionPath())))
         .forEach(writeStatus -> {
           try {
-            StoragePath fullFilePath = new StoragePath(basePath, 
writeStatus.getStat().getPath());
+            HoodieWriteStat writeStat = writeStatus.getStat();
+            StoragePath fullFilePath = new StoragePath(basePath, 
writeStat.getPath());
             // used for RLI
             
finalActualDeletes.addAll(getRevivedAndDeletedKeysFromMergedLogs(metaClient, 
latestCommitTimestamp, EngineType.SPARK, 
Collections.singletonList(fullFilePath.toString()), writerSchemaOpt,
                 
Collections.singletonList(fullFilePath.toString())).getValue());
 
             // used in SI flow
-            
actualUpdatesAndDeletes.addAll(getRecordKeys(Collections.singletonList(fullFilePath.toString()),
 metaClient, writerSchemaOpt,
-                writeConfig.getMetadataConfig().getMaxReaderBufferSize(), 
latestCommitTimestamp, true, true));
+            
actualUpdatesAndDeletes.addAll(getRecordKeys(writeStat.getPartitionPath(), 
writeStat.getPrevCommit(), writeStat.getFileId(),
+                Collections.singletonList(fullFilePath), metaClient, 
writerSchemaOpt, latestCommitTimestamp));
           } catch (IOException e) {
             throw new HoodieIOException("Failed w/ IOException ", e);
           }
@@ -701,4 +707,36 @@ public class TestMetadataUtilRLIandSIRecordGeneration 
extends HoodieClientTestBa
       }
     });
   }
+
+  Set<String> getRecordKeys(String partition, String baseInstantTime, String 
fileId, List<StoragePath> logFilePaths, HoodieTableMetaClient datasetMetaClient,
+                                   Option<Schema> writerSchemaOpt, String 
latestCommitTimestamp) throws IOException {
+    if (writerSchemaOpt.isPresent()) {
+      // read log file records without merging
+      FileSlice fileSlice = new FileSlice(partition, baseInstantTime, fileId);
+      logFilePaths.forEach(logFilePath -> {
+        HoodieLogFile logFile = new HoodieLogFile(logFilePath);
+        fileSlice.addLogFile(logFile);
+      });
+      TypedProperties properties = new TypedProperties();
+      // configure un-merged log file reader
+      HoodieReaderContext<InternalRow> readerContext = 
context.getReaderContextFactory(metaClient).getContext();
+      HoodieFileGroupReader<InternalRow> reader = 
HoodieFileGroupReader.<InternalRow>newBuilder()
+          .withReaderContext(readerContext)
+          .withDataSchema(writerSchemaOpt.get())
+          .withRequestedSchema(writerSchemaOpt.get())
+          .withEmitDelete(true)
+          .withFileSlice(fileSlice)
+          .withLatestCommitTime(latestCommitTimestamp)
+          .withHoodieTableMetaClient(datasetMetaClient)
+          .withProps(properties)
+          .withEmitDelete(true)
+          .build();
+      Set<String> allRecordKeys = new HashSet<>();
+      try (ClosableIterator<String> keysIterator = 
reader.getClosableKeyIterator()) {
+        keysIterator.forEachRemaining(allRecordKeys::add);
+      }
+      return allRecordKeys;
+    }
+    return Collections.emptySet();
+  }
 }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index 17ac8a66070..1308687079a 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -1844,7 +1844,7 @@ public class HoodieMetadataTableValidator implements 
Serializable {
             StoragePath storagePartitionPath = new 
StoragePath(metaClient.getBasePath(), partitionPath);
             String filePath = new StoragePath(storagePartitionPath, 
filename).toString();
             try {
-              return getLogFileColumnRangeMetadata(filePath, metaClient, 
allColumnNameList, Option.of(readerSchema),
+              return getLogFileColumnRangeMetadata(filePath, partitionPath, 
metaClient, allColumnNameList, Option.of(readerSchema),
                   metadataConfig.getMaxReaderBufferSize())
                   .stream()
                   // We need to convert file path and use only the file name 
instead of the complete file path

Reply via email to