This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 588e01114ce [HUDI-8371] Fix column stats index with MDT for a few 
scenarios (#12105)
588e01114ce is described below

commit 588e01114ce3a1cad02c0d229e8919a29ee88770
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Sun Oct 27 19:14:09 2024 -0700

    [HUDI-8371] Fix column stats index with MDT for a few scenarios (#12105)
---
 .../metadata/HoodieBackedTableMetadataWriter.java  |  34 +-
 .../java/org/apache/hudi/common/fs/FSUtils.java    |   8 +
 .../table/log/HoodieUnMergedLogRecordScanner.java  |   2 +-
 .../hudi/metadata/HoodieBackedTableMetadata.java   |  39 +-
 .../hudi/metadata/HoodieMetadataPayload.java       |  14 +-
 .../hudi/metadata/HoodieTableMetadataUtil.java     | 107 ++---
 .../org/apache/hudi/common/fs/TestFSUtils.java     |   4 +
 .../hudi/metadata/TestHoodieMetadataPayload.java   |  28 ++
 .../hudi/metadata/TestHoodieTableMetadataUtil.java |  12 +-
 .../hudi/testutils/LogFileColStatsTestUtil.java    |  96 +++++
 ...otstrap-rollback1-column-stats-index-table.json |   2 +
 .../cow-bootstrap1-column-stats-index-table.json   |   4 +
 .../cow-bootstrap2-column-stats-index-table.json   |   5 +
 .../cow-clean1-column-stats-index-table.json       |   2 +
 ...-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json |   1 +
 ...otstrap-rollback1-column-stats-index-table.json |   2 +
 .../mor-bootstrap1-column-stats-index-table.json   |   3 +
 .../mor-bootstrap2-column-stats-index-table.json   |   5 +
 .../mor-clean1-column-stats-index-table.json       |   2 +
 ...mor-delete-block1-column-stats-index-table.json |   3 +
 ...-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json |  10 +
 ...-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json |   5 +
 ...-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json |   5 +
 .../hudi/functional/ColumnStatIndexTestBase.scala  | 162 ++++++--
 .../hudi/functional/TestColumnStatsIndex.scala     | 451 ++++++++++++++++++++-
 .../functional/TestColumnStatsIndexWithSQL.scala   |  26 +-
 .../functional/TestSecondaryIndexPruning.scala     |  17 +-
 .../hudi/command/index/TestFunctionalIndex.scala   |   3 +-
 .../TestHoodieMetadataTableValidator.java          |   2 +-
 29 files changed, 896 insertions(+), 158 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 5e244c2b121..0e8d5339cd2 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -239,13 +239,13 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
   protected boolean initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
                                        Option<String> 
inflightInstantTimestamp) throws IOException {
     HoodieTimer timer = HoodieTimer.start();
-    List<MetadataPartitionType> partitionsToInit = new 
ArrayList<>(MetadataPartitionType.getValidValues().length);
+    List<MetadataPartitionType> metadataPartitionsToInit = new 
ArrayList<>(MetadataPartitionType.getValidValues().length);
 
     try {
       boolean exists = metadataTableExists(dataMetaClient);
       if (!exists) {
         // FILES partition is always required
-        partitionsToInit.add(FILES);
+        metadataPartitionsToInit.add(FILES);
       }
 
       // check if any of the enabled partition types needs to be initialized
@@ -255,10 +255,10 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
         LOG.info("Async metadata indexing disabled and following partitions 
already initialized: {}", completedPartitions);
         this.enabledPartitionTypes.stream()
             .filter(p -> !completedPartitions.contains(p.getPartitionPath()) 
&& !FILES.equals(p))
-            .forEach(partitionsToInit::add);
+            .forEach(metadataPartitionsToInit::add);
       }
 
-      if (partitionsToInit.isEmpty()) {
+      if (metadataPartitionsToInit.isEmpty()) {
         // No partitions left to initialize, since all the metadata enabled 
partitions are either initialized before
         // or current in the process of initialization.
         initMetadataReader();
@@ -268,13 +268,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
       // If there is no commit on the dataset yet, use the 
SOLO_COMMIT_TIMESTAMP as the instant time for initial commit
       // Otherwise, we use the timestamp of the latest completed action.
       String initializationTime = 
dataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant().map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
-
-      // Initialize partitions for the first time using data from the files on 
the file system
-      if (!initializeFromFilesystem(initializationTime, partitionsToInit, 
inflightInstantTimestamp)) {
-        LOG.error("Failed to initialize MDT from filesystem");
-        return false;
-      }
-
+      initializeFromFilesystem(initializationTime, metadataPartitionsToInit, 
inflightInstantTimestamp);
       metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer()));
       return true;
     } catch (IOException e) {
@@ -344,7 +338,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
    * @param partitionsToInit         - List of MDT partitions to initialize
    * @param inflightInstantTimestamp - Current action instant responsible for 
this initialization
    */
-  private boolean initializeFromFilesystem(String initializationTime, 
List<MetadataPartitionType> partitionsToInit,
+  private void initializeFromFilesystem(String initializationTime, 
List<MetadataPartitionType> partitionsToInit,
                                            Option<String> 
inflightInstantTimestamp) throws IOException {
     Set<String> pendingDataInstants = getPendingDataInstants(dataMetaClient);
 
@@ -461,8 +455,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
       }
 
       if (LOG.isInfoEnabled()) {
-        LOG.info("Initializing {} index with {} mappings and {} file groups.", 
partitionTypeName, fileGroupCountAndRecordsPair.getKey(),
-            fileGroupCountAndRecordsPair.getValue().count());
+        LOG.info("Initializing {} index with {} mappings", partitionTypeName, 
fileGroupCountAndRecordsPair.getKey());
       }
       HoodieTimer partitionInitTimer = HoodieTimer.start();
 
@@ -482,8 +475,6 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
       long totalInitTime = partitionInitTimer.endTimer();
       LOG.info("Initializing {} index in metadata table took {} in ms", 
partitionTypeName, totalInitTime);
     }
-
-    return true;
   }
 
   /**
@@ -520,9 +511,11 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
   }
 
   private Pair<Integer, HoodieData<HoodieRecord>> 
initializeColumnStatsPartition(Map<String, Map<String, Long>> 
partitionToFilesMap) {
+    // during initialization, we need stats for base and log files.
     HoodieData<HoodieRecord> records = 
HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
         engineContext, Collections.emptyMap(), partitionToFilesMap, 
dataMetaClient, dataWriteConfig.isMetadataColumnStatsIndexEnabled(),
-        dataWriteConfig.getColumnStatsIndexParallelism(), 
dataWriteConfig.getColumnsEnabledForColumnStatsIndex());
+        dataWriteConfig.getColumnStatsIndexParallelism(), 
dataWriteConfig.getColumnsEnabledForColumnStatsIndex(),
+        dataWriteConfig.getMetadataConfig().getMaxReaderBufferSize());
 
     final int fileGroupCount = 
dataWriteConfig.getMetadataConfig().getColumnStatsIndexFileGroupCount();
     return Pair.of(fileGroupCount, records);
@@ -863,12 +856,13 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
    * @return List consisting of {@code DirectoryInfo} for each partition found.
    */
   private List<DirectoryInfo> listAllPartitionsFromMDT(String 
initializationTime, Set<String> pendingDataInstants) throws IOException {
-    List<String> allPartitionPaths = metadata.getAllPartitionPaths().stream()
+    List<String> allAbsolutePartitionPaths = 
metadata.getAllPartitionPaths().stream()
         .map(partitionPath -> dataWriteConfig.getBasePath() + 
StoragePath.SEPARATOR_CHAR + partitionPath).collect(Collectors.toList());
-    Map<String, List<StoragePathInfo>> partitionFileMap = 
metadata.getAllFilesInPartitions(allPartitionPaths);
+    Map<String, List<StoragePathInfo>> partitionFileMap = 
metadata.getAllFilesInPartitions(allAbsolutePartitionPaths);
     List<DirectoryInfo> dirinfoList = new ArrayList<>(partitionFileMap.size());
     for (Map.Entry<String, List<StoragePathInfo>> entry : 
partitionFileMap.entrySet()) {
-      dirinfoList.add(new DirectoryInfo(entry.getKey(), entry.getValue(), 
initializationTime, pendingDataInstants));
+      String relativeDirPath = FSUtils.getRelativePartitionPath(new 
StoragePath(dataWriteConfig.getBasePath()), new StoragePath(entry.getKey()));
+      dirinfoList.add(new DirectoryInfo(relativeDirPath, entry.getValue(), 
initializationTime, pendingDataInstants, false));
     }
     return dirinfoList;
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index 65fb5073d7a..1537a758bd8 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -154,6 +154,14 @@ public class FSUtils {
     return fullFileName.split("_", 2)[0];
   }
 
+  /**
+   * @param filePath path to a file; may be an absolute path or just the
+   *                 partition path plus file name.
+   * @return the file name portion of the given path.
+   */
+  public static String getFileNameFromPath(String filePath) {
+    return filePath.substring(filePath.lastIndexOf("/") + 1);
+  }
+
   /**
    * Gets all partition paths assuming date partitioning (year, month, day) 
three levels down.
    * TODO: (Lin) Delete this function after we remove the 
assume.date.partitioning config completely.
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
index 1ce3dea58cb..61330466ac5 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
@@ -83,7 +83,7 @@ public class HoodieUnMergedLogRecordScanner extends 
AbstractHoodieLogRecordReade
 
   @Override
   protected void processNextDeletedRecord(DeleteRecord deleteRecord) {
-    throw new IllegalStateException("Not expected to see delete records in 
this log-scan mode. Check Job Config");
+    // no-op
   }
 
   /**
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index d8fa78935f3..1abd7d05a13 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -840,12 +840,17 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
       }
 
       Set<String> keySet = new TreeSet<>(recordKeys);
+      Set<String> deletedRecordsFromLogs = new HashSet<>();
       Map<String, HoodieRecord<HoodieMetadataPayload>> logRecordsMap = new 
HashMap<>();
       logRecordScanner.getRecords().forEach(record -> {
         HoodieMetadataPayload payload = record.getData();
-        String recordKey = payload.getRecordKeyFromSecondaryIndex();
-        if (keySet.contains(recordKey)) {
-          logRecordsMap.put(recordKey, record);
+        if (!payload.isDeleted()) { // process only valid records.
+          String recordKey = payload.getRecordKeyFromSecondaryIndex();
+          if (keySet.contains(recordKey)) {
+            logRecordsMap.put(recordKey, record);
+          }
+        } else {
+          deletedRecordsFromLogs.add(record.getRecordKey());
         }
       });
 
@@ -856,7 +861,11 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
         Option<HoodieRecord<HoodieMetadataPayload>> mergedRecord = 
HoodieMetadataPayload.combineSecondaryIndexRecord(oldRecord, newRecord);
         return mergedRecord.orElseGet(null);
       }));
-      baseFileRecords.forEach((key, value) -> recordKeyMap.put(key, 
value.getRecordKey()));
+      baseFileRecords.forEach((key, value) -> {
+        if (!deletedRecordsFromLogs.contains(key)) {
+          recordKeyMap.put(key, value.getRecordKey());
+        }
+      });
     } catch (IOException ioe) {
       throw new HoodieIOException("Error merging records from metadata table 
for  " + recordKeys.size() + " key : ", ioe);
     } finally {
@@ -931,17 +940,22 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
       List<String> sortedSecondaryKeys = new ArrayList<>(secondaryKeys);
       secondaryKeySet.addAll(sortedSecondaryKeys);
       Collections.sort(sortedSecondaryKeys);
+      Set<String> deletedRecordKeysFromLogs = new HashSet<>();
 
       logRecordScanner.getRecords().forEach(record -> {
         HoodieMetadataPayload payload = record.getData();
-        String secondaryKey = payload.key;
-        if (secondaryKeySet.contains(secondaryKey)) {
-          String recordKey = payload.getRecordKeyFromSecondaryIndex();
-          logRecordsMap.computeIfAbsent(secondaryKey, k -> new 
HashMap<>()).put(recordKey, record);
+        if (!payload.isDeleted()) {
+          String secondaryKey = payload.key;
+          if (secondaryKeySet.contains(secondaryKey)) {
+            String recordKey = payload.getRecordKeyFromSecondaryIndex();
+            logRecordsMap.computeIfAbsent(secondaryKey, k -> new 
HashMap<>()).put(recordKey, record);
+          }
+        } else {
+          deletedRecordKeysFromLogs.add(record.getRecordKey());
         }
       });
 
-      return readNonUniqueRecordsAndMergeWithLogRecords(baseFileReader, 
sortedSecondaryKeys, logRecordsMap, timings, partitionName);
+      return readNonUniqueRecordsAndMergeWithLogRecords(baseFileReader, 
sortedSecondaryKeys, logRecordsMap, timings, partitionName, 
deletedRecordKeysFromLogs);
     } catch (IOException ioe) {
       throw new HoodieIOException("Error merging records from metadata table 
for  " + secondaryKeys.size() + " key : ", ioe);
     } finally {
@@ -955,7 +969,8 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
                                                                                
                             List<String> sortedKeys,
                                                                                
                             Map<String, HashMap<String, HoodieRecord>> 
logRecordsMap,
                                                                                
                             List<Long> timings,
-                                                                               
                             String partitionName) throws IOException {
+                                                                               
                             String partitionName,
+                                                                               
                             Set<String> deleteRecordKeysFromLogs) throws 
IOException {
     HoodieTimer timer = HoodieTimer.start();
 
     Map<String, List<HoodieRecord<HoodieMetadataPayload>>> resultMap = new 
HashMap<>();
@@ -978,9 +993,13 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
     if (logRecordsMap.isEmpty() && !baseFileRecordsMap.isEmpty()) {
       // file slice has only base file
       timings.add(timer.endTimer());
+      if (!deleteRecordKeysFromLogs.isEmpty()) { // remove deleted records 
from log from base file record list
+        deleteRecordKeysFromLogs.forEach(key -> 
baseFileRecordsMap.remove(key));
+      }
       return baseFileRecordsMap;
     }
 
+    // check why we are not considering records missing from logs, but only 
from base file.
     logRecordsMap.forEach((secondaryKey, logRecords) -> {
       if (!baseFileRecordsMap.containsKey(secondaryKey)) {
         List<HoodieRecord<HoodieMetadataPayload>> recordList = logRecords
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
index 4300811be08..aff3721b745 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
@@ -206,23 +206,23 @@ public class HoodieMetadataPayload implements 
HoodieRecordPayload<HoodieMetadata
   }
 
   protected HoodieMetadataPayload(String key, int type, Map<String, 
HoodieMetadataFileInfo> filesystemMetadata) {
-    this(key, type, filesystemMetadata, null, null, null, null);
+    this(key, type, filesystemMetadata, null, null, null, null, false);
   }
 
   protected HoodieMetadataPayload(String key, HoodieMetadataBloomFilter 
metadataBloomFilter) {
-    this(key, MetadataPartitionType.BLOOM_FILTERS.getRecordType(), null, 
metadataBloomFilter, null, null, null);
+    this(key, MetadataPartitionType.BLOOM_FILTERS.getRecordType(), null, 
metadataBloomFilter, null, null, null, metadataBloomFilter.getIsDeleted());
   }
 
   protected HoodieMetadataPayload(String key, HoodieMetadataColumnStats 
columnStats, int recordType) {
-    this(key, recordType, null, null, columnStats, null, null);
+    this(key, recordType, null, null, columnStats, null, null, 
columnStats.getIsDeleted());
   }
 
   private HoodieMetadataPayload(String key, HoodieRecordIndexInfo 
recordIndexMetadata) {
-    this(key, MetadataPartitionType.RECORD_INDEX.getRecordType(), null, null, 
null, recordIndexMetadata, null);
+    this(key, MetadataPartitionType.RECORD_INDEX.getRecordType(), null, null, 
null, recordIndexMetadata, null, false);
   }
 
   private HoodieMetadataPayload(String key, HoodieSecondaryIndexInfo 
secondaryIndexMetadata) {
-    this(key, MetadataPartitionType.SECONDARY_INDEX.getRecordType(), null, 
null, null, null, secondaryIndexMetadata);
+    this(key, MetadataPartitionType.SECONDARY_INDEX.getRecordType(), null, 
null, null, null, secondaryIndexMetadata, 
secondaryIndexMetadata.getIsDeleted());
   }
 
   protected HoodieMetadataPayload(String key, int type,
@@ -230,7 +230,8 @@ public class HoodieMetadataPayload implements 
HoodieRecordPayload<HoodieMetadata
                                   HoodieMetadataBloomFilter 
metadataBloomFilter,
                                   HoodieMetadataColumnStats columnStats,
                                   HoodieRecordIndexInfo recordIndexMetadata,
-                                  HoodieSecondaryIndexInfo 
secondaryIndexMetadata) {
+                                  HoodieSecondaryIndexInfo 
secondaryIndexMetadata,
+                                  boolean isDeletedRecord) {
     this.key = key;
     this.type = type;
     this.filesystemMetadata = filesystemMetadata;
@@ -238,6 +239,7 @@ public class HoodieMetadataPayload implements 
HoodieRecordPayload<HoodieMetadata
     this.columnStatMetadata = columnStats;
     this.recordIndexMetadata = recordIndexMetadata;
     this.secondaryIndexMetadata = secondaryIndexMetadata;
+    this.isDeletedRecord = isDeletedRecord;
   }
 
   /**
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index cee1e8e9cab..765d819b09a 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -75,7 +75,6 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.util.CollectionUtils;
-import org.apache.hudi.common.util.ExternalFilePathUtil;
 import org.apache.hudi.common.util.FileFormatUtils;
 import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hudi.common.util.HoodieRecordUtils;
@@ -142,10 +141,10 @@ import static 
org.apache.hudi.avro.HoodieAvroUtils.unwrapAvroValueWrapper;
 import static org.apache.hudi.avro.HoodieAvroUtils.wrapValueIntoAvro;
 import static 
org.apache.hudi.common.config.HoodieCommonConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES;
 import static 
org.apache.hudi.common.config.HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED;
-import static 
org.apache.hudi.common.config.HoodieCommonConfig.MAX_DFS_STREAM_BUFFER_SIZE;
 import static 
org.apache.hudi.common.config.HoodieCommonConfig.MAX_MEMORY_FOR_COMPACTION;
 import static 
org.apache.hudi.common.config.HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE;
 import static 
org.apache.hudi.common.config.HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN;
+import static org.apache.hudi.common.fs.FSUtils.getFileNameFromPath;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN_OR_EQUALS;
 import static org.apache.hudi.common.util.ConfigUtils.getReaderConfigs;
 import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
@@ -751,12 +750,8 @@ public class HoodieTableMetadataUtil {
     return engineContext.parallelize(deleteFileList, parallelism)
         .flatMap(deleteFileInfoPair -> {
           String partitionPath = deleteFileInfoPair.getLeft();
-          String filePath = deleteFileInfoPair.getRight();
-
-          if (filePath.endsWith(HoodieFileFormat.PARQUET.getFileExtension()) 
|| ExternalFilePathUtil.isExternallyCreatedFile(filePath)) {
-            return getColumnStatsRecords(partitionPath, filePath, 
dataMetaClient, columnsToIndex, true).iterator();
-          }
-          return Collections.emptyListIterator();
+          String fileName = deleteFileInfoPair.getRight();
+          return getColumnStatsRecords(partitionPath, fileName, 
dataMetaClient, columnsToIndex, true).iterator();
         });
   }
 
@@ -953,7 +948,8 @@ public class HoodieTableMetadataUtil {
                                                                           
HoodieTableMetaClient dataMetaClient,
                                                                           
boolean isColumnStatsIndexEnabled,
                                                                           int 
columnStatsIndexParallelism,
-                                                                          
List<String> targetColumnsForColumnStatsIndex) {
+                                                                          
List<String> targetColumnsForColumnStatsIndex,
+                                                                          int 
maxReaderBufferSize) {
     // Find the columns to index
     final List<String> columnsToIndex =
         getColumnsToIndex(isColumnStatsIndexEnabled, 
targetColumnsForColumnStatsIndex,
@@ -972,16 +968,10 @@ public class HoodieTableMetadataUtil {
     // Create records MDT
     int parallelism = Math.max(Math.min(partitionFileFlagTupleList.size(), 
columnStatsIndexParallelism), 1);
     return engineContext.parallelize(partitionFileFlagTupleList, 
parallelism).flatMap(partitionFileFlagTuple -> {
-      final String partitionName = partitionFileFlagTuple.f0;
+      final String partitionPath = partitionFileFlagTuple.f0;
       final String filename = partitionFileFlagTuple.f1;
       final boolean isDeleted = partitionFileFlagTuple.f2;
-      if (!FSUtils.isBaseFile(new StoragePath(filename)) || 
!filename.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
-        LOG.warn("Ignoring file {} as it is not a PARQUET file", filename);
-        return Stream.<HoodieRecord>empty().iterator();
-      }
-
-      final String filePathWithPartition = partitionName + "/" + filename;
-      return getColumnStatsRecords(partitionName, filePathWithPartition, 
dataMetaClient, columnsToIndex, isDeleted).iterator();
+      return getColumnStatsRecords(partitionPath, filename, dataMetaClient, 
columnsToIndex, isDeleted, maxReaderBufferSize).iterator();
     });
   }
 
@@ -1221,54 +1211,62 @@ public class HoodieTableMetadataUtil {
       return 
HoodieMetadataPayload.createColumnStatsRecords(writeStat.getPartitionPath(), 
columnRangeMetadataList, false);
     }
 
-    return getColumnStatsRecords(writeStat.getPartitionPath(), 
writeStat.getPath(), datasetMetaClient, columnsToIndex, false);
+    String filePath = writeStat.getPath();
+    return getColumnStatsRecords(writeStat.getPartitionPath(), 
getFileNameFromPath(filePath), datasetMetaClient, columnsToIndex, false);
   }
 
   private static Stream<HoodieRecord> getColumnStatsRecords(String 
partitionPath,
-                                                            String filePath,
+                                                            String fileName,
                                                             
HoodieTableMetaClient datasetMetaClient,
                                                             List<String> 
columnsToIndex,
                                                             boolean isDeleted) 
{
-    String filePartitionPath = filePath.startsWith("/") ? 
filePath.substring(1) : filePath;
-    String fileName = 
filePartitionPath.substring(filePartitionPath.lastIndexOf("/") + 1);
+    return getColumnStatsRecords(partitionPath, fileName, datasetMetaClient, 
columnsToIndex, isDeleted, -1);
+  }
+
+  private static Stream<HoodieRecord> getColumnStatsRecords(String 
partitionPath,
+                                                            String fileName,
+                                                            
HoodieTableMetaClient datasetMetaClient,
+                                                            List<String> 
columnsToIndex,
+                                                            boolean isDeleted,
+                                                            int maxBufferSize) 
{
 
     if (isDeleted) {
-      // TODO we should delete records instead of stubbing them
       List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList = 
columnsToIndex.stream()
           .map(entry -> HoodieColumnRangeMetadata.stub(fileName, entry))
           .collect(Collectors.toList());
 
       return HoodieMetadataPayload.createColumnStatsRecords(partitionPath, 
columnRangeMetadataList, true);
     }
-
     List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadata =
-        readColumnRangeMetadataFrom(filePartitionPath, datasetMetaClient, 
columnsToIndex, false, Option.empty());
+        readColumnRangeMetadataFrom(partitionPath, fileName, 
datasetMetaClient, columnsToIndex, maxBufferSize);
 
     return HoodieMetadataPayload.createColumnStatsRecords(partitionPath, 
columnRangeMetadata, false);
   }
 
-  private static List<HoodieColumnRangeMetadata<Comparable>> 
readColumnRangeMetadataFrom(String filePath,
+  private static List<HoodieColumnRangeMetadata<Comparable>> 
readColumnRangeMetadataFrom(String partitionPath,
+                                                                               
          String fileName,
                                                                                
          HoodieTableMetaClient datasetMetaClient,
                                                                                
          List<String> columnsToIndex,
-                                                                               
          boolean shouldReadColumnStatsForLogFiles,
-                                                                               
          Option<Schema> writerSchemaOpt) {
+                                                                               
          int maxBufferSize) {
+    String partitionPathFileName = (partitionPath.equals(EMPTY_PARTITION_NAME) 
|| partitionPath.equals(NON_PARTITIONED_NAME)) ? fileName
+        : partitionPath + "/" + fileName;
     try {
-      StoragePath fullFilePath = new 
StoragePath(datasetMetaClient.getBasePath(), filePath);
-      if (filePath.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+      StoragePath fullFilePath = new 
StoragePath(datasetMetaClient.getBasePath(), partitionPathFileName);
+      if 
(partitionPathFileName.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
         return HoodieIOFactory.getIOFactory(datasetMetaClient.getStorage())
             .getFileFormatUtils(HoodieFileFormat.PARQUET)
             .readColumnStatsFromMetadata(datasetMetaClient.getStorage(), 
fullFilePath, columnsToIndex);
-      } else if (FSUtils.isLogFile(fullFilePath) && 
shouldReadColumnStatsForLogFiles) {
-        LOG.warn("Reading log file: {}, to build column range metadata.", 
fullFilePath);
-        return getLogFileColumnRangeMetadata(fullFilePath.toString(), 
datasetMetaClient, columnsToIndex, writerSchemaOpt);
+      } else if (FSUtils.isLogFile(fileName)) {
+        Option<Schema> writerSchemaOpt = 
tryResolveSchemaForTable(datasetMetaClient);
+        LOG.warn("Reading log file: {}, to build column range metadata.", 
partitionPathFileName);
+        return getLogFileColumnRangeMetadata(fullFilePath.toString(), 
datasetMetaClient, columnsToIndex, writerSchemaOpt, maxBufferSize);
       }
-
-      LOG.warn("Column range index not supported for: {}", filePath);
+      LOG.warn("Column range index not supported for: {}", 
partitionPathFileName);
       return Collections.emptyList();
     } catch (Exception e) {
       // NOTE: In case reading column range metadata from individual file 
failed,
       //       we simply fall back, in lieu of failing the whole task
-      LOG.error("Failed to fetch column range metadata for: {}", filePath);
+      LOG.error("Failed to fetch column range metadata for: {}", 
partitionPathFileName);
       return Collections.emptyList();
     }
   }
@@ -1280,7 +1278,8 @@ public class HoodieTableMetadataUtil {
   protected static List<HoodieColumnRangeMetadata<Comparable>> 
getLogFileColumnRangeMetadata(String filePath,
                                                                                
              HoodieTableMetaClient datasetMetaClient,
                                                                                
              List<String> columnsToIndex,
-                                                                               
              Option<Schema> writerSchemaOpt) {
+                                                                               
              Option<Schema> writerSchemaOpt,
+                                                                               
              int maxBufferSize) throws IOException {
     if (writerSchemaOpt.isPresent()) {
       List<Schema.Field> fieldsToIndex = 
writerSchemaOpt.get().getFields().stream()
           .filter(field -> columnsToIndex.contains(field.name()))
@@ -1291,15 +1290,18 @@ public class HoodieTableMetadataUtil {
           .withStorage(datasetMetaClient.getStorage())
           .withBasePath(datasetMetaClient.getBasePath())
           .withLogFilePaths(Collections.singletonList(filePath))
-          .withBufferSize(MAX_DFS_STREAM_BUFFER_SIZE.defaultValue())
+          .withBufferSize(maxBufferSize)
           
.withLatestInstantTime(datasetMetaClient.getActiveTimeline().getCommitsTimeline().lastInstant().get().getTimestamp())
           .withReaderSchema(writerSchemaOpt.get())
           .withTableMetaClient(datasetMetaClient)
           .withLogRecordScannerCallback(records::add)
           .build();
-      scanner.scan(false);
+      scanner.scan();
+      if (records.isEmpty()) {
+        return Collections.emptyList();
+      }
       Map<String, HoodieColumnRangeMetadata<Comparable>> 
columnRangeMetadataMap =
-          collectColumnRangeMetadata(records, fieldsToIndex, filePath, 
writerSchemaOpt.get());
+          collectColumnRangeMetadata(records, fieldsToIndex, 
getFileNameFromPath(filePath), writerSchemaOpt.get());
       return new ArrayList<>(columnRangeMetadataMap.values());
     }
     return Collections.emptyList();
@@ -2138,12 +2140,12 @@ public class HoodieTableMetadataUtil {
     LOG.debug("Indexing following columns for partition stats index: {}", 
columnsToIndex);
     // Create records for MDT
     int parallelism = Math.max(Math.min(partitionInfoList.size(), 
metadataConfig.getPartitionStatsIndexParallelism()), 1);
-    Option<Schema> writerSchema = lazyWriterSchemaOpt.get();
     return engineContext.parallelize(partitionInfoList, 
parallelism).flatMap(partitionInfo -> {
       final String partitionPath = partitionInfo.getRelativePath();
       // Step 1: Collect Column Metadata for Each File
       List<List<HoodieColumnRangeMetadata<Comparable>>> fileColumnMetadata = 
partitionInfo.getFileNameToSizeMap().keySet().stream()
-          .map(fileName -> getFileStatsRangeMetadata(partitionPath, 
partitionPath + "/" + fileName, dataTableMetaClient, columnsToIndex, false, 
true, writerSchemaOpt))
+          .map(fileName -> getFileStatsRangeMetadata(partitionPath, fileName, 
dataTableMetaClient, columnsToIndex, false,
+              metadataConfig.getMaxReaderBufferSize()))
           .collect(Collectors.toList());
 
       return collectAndProcessColumnMetadata(fileColumnMetadata, 
partitionPath, true).iterator();
@@ -2151,20 +2153,17 @@ public class HoodieTableMetadataUtil {
   }
 
   private static List<HoodieColumnRangeMetadata<Comparable>> 
getFileStatsRangeMetadata(String partitionPath,
-                                                                               
        String filePath,
+                                                                               
        String fileName,
                                                                                
        HoodieTableMetaClient datasetMetaClient,
                                                                                
        List<String> columnsToIndex,
                                                                                
        boolean isDeleted,
-                                                                               
        boolean shouldReadColumnMetadataForLogFiles,
-                                                                               
        Option<Schema> writerSchemaOpt) {
-    String filePartitionPath = filePath.startsWith("/") ? 
filePath.substring(1) : filePath;
-    String fileName = FSUtils.getFileName(filePath, partitionPath);
+                                                                               
        int maxBufferSize) {
     if (isDeleted) {
       return columnsToIndex.stream()
           .map(entry -> HoodieColumnRangeMetadata.stub(fileName, entry))
           .collect(Collectors.toList());
     }
-    return readColumnRangeMetadataFrom(filePartitionPath, datasetMetaClient, 
columnsToIndex, shouldReadColumnMetadataForLogFiles, writerSchemaOpt);
+    return readColumnRangeMetadataFrom(partitionPath, fileName, 
datasetMetaClient, columnsToIndex, maxBufferSize);
   }
 
   public static HoodieData<HoodieRecord> 
convertMetadataToPartitionStatsRecords(HoodieCommitMetadata commitMetadata,
@@ -2296,7 +2295,8 @@ public class HoodieTableMetadataUtil {
       return columnRangeMap.values().stream().collect(Collectors.toList());
     }
 
-    return getFileStatsRangeMetadata(writeStat.getPartitionPath(), 
writeStat.getPath(), datasetMetaClient, columnsToIndex, false, false, 
writerSchemaOpt);
+    String filePath = writeStat.getPath();
+    return getFileStatsRangeMetadata(writeStat.getPartitionPath(), 
getFileNameFromPath(filePath), datasetMetaClient, columnsToIndex, false, -1);
   }
 
   public static String getPartitionStatsIndexKey(String partitionPath, String 
columnName) {
@@ -2434,13 +2434,22 @@ public class HoodieTableMetadataUtil {
     private boolean isHoodiePartition = false;
 
     public DirectoryInfo(String relativePath, List<StoragePathInfo> pathInfos, 
String maxInstantTime, Set<String> pendingDataInstants) {
+      this(relativePath, pathInfos, maxInstantTime, pendingDataInstants, true);
+    }
+
+    /**
+     * When files are directly fetched from Metadata table we do not need to 
validate HoodiePartitions.
+     */
+    public DirectoryInfo(String relativePath, List<StoragePathInfo> pathInfos, 
String maxInstantTime, Set<String> pendingDataInstants,
+                         boolean validateHoodiePartitions) {
       this.relativePath = relativePath;
 
       // Pre-allocate with the maximum length possible
       filenameToSizeMap = new HashMap<>(pathInfos.size());
 
       // Presence of partition meta file implies this is a HUDI partition
-      isHoodiePartition = pathInfos.stream().anyMatch(status -> 
status.getPath().getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX));
+      // if input files are directly fetched from MDT, it may not contain the 
HoodiePartitionMetadata file. So, we can ignore the validation for 
isHoodiePartition.
+      isHoodiePartition = !validateHoodiePartitions || 
pathInfos.stream().anyMatch(status -> 
status.getPath().getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX));
       for (StoragePathInfo pathInfo : pathInfos) {
         // Do not attempt to search for more subdirectories inside directories 
that are partitions
         if (!isHoodiePartition && pathInfo.isDirectory()) {
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java
index 7621aca7c7f..ad5b1230bde 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java
@@ -256,6 +256,10 @@ public class TestFSUtils extends HoodieCommonTestHarness {
     assertEquals(1, FSUtils.getTaskPartitionIdFromLogPath(rlPath));
     assertEquals(0, FSUtils.getStageIdFromLogPath(rlPath));
     assertEquals(1, FSUtils.getTaskAttemptIdFromLogPath(rlPath));
+
+    assertEquals(logFile, FSUtils.getFileNameFromPath("/tmp/path/" + logFile));
+    assertEquals(logFile, FSUtils.getFileNameFromPath("/tmp/abc/def/path/" + 
logFile));
+    assertEquals(logFile, FSUtils.getFileNameFromPath("/tmp/" + logFile));
   }
 
   @Test
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataPayload.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataPayload.java
index 7fcc0d16193..ce2cae78342 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataPayload.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataPayload.java
@@ -29,18 +29,23 @@ import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
 import static org.apache.hudi.common.util.CollectionUtils.createImmutableMap;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
  * Tests {@link HoodieMetadataPayload}.
  */
 public class TestHoodieMetadataPayload extends HoodieCommonTestHarness {
   public static final String PARTITION_NAME = "2022/10/01";
+  public static final String PARTITION_NAME2 = "2023/10/01";
+  public static final String PARTITION_NAME3 = "2024/10/01";
 
   @Test
   public void testFileSystemMetadataPayloadMerging() {
@@ -148,6 +153,27 @@ public class TestHoodieMetadataPayload extends 
HoodieCommonTestHarness {
         ).getData(),
         
deletionRecord2.getData().preCombine(deletionRecord1.getData().preCombine(additionRecord.getData()))
     );
+
+    // lets delete all files
+    List<String> allDeletedFileList = new ArrayList<>();
+    allDeletedFileList.add("file1.parquet");
+    allDeletedFileList.add("file2.parquet");
+    allDeletedFileList.add("file3.parquet");
+    allDeletedFileList.add("file4.parquet");
+    HoodieRecord<HoodieMetadataPayload> allDeletionRecord =
+        HoodieMetadataPayload.createPartitionFilesRecord(PARTITION_NAME, 
Collections.emptyMap(), allDeletedFileList);
+
+    HoodieMetadataPayload combinedPayload = 
allDeletionRecord.getData().preCombine(additionRecord.getData());
+    
assertEquals(HoodieMetadataPayload.createPartitionFilesRecord(PARTITION_NAME, 
Collections.emptyMap(), Collections.emptyList()).getData(), combinedPayload);
+    assertTrue(combinedPayload.filesystemMetadata.isEmpty());
+
+    // test all partition record
+    HoodieRecord<HoodieMetadataPayload> allPartitionsRecord = 
HoodieMetadataPayload.createPartitionListRecord(Arrays.asList(PARTITION_NAME, 
PARTITION_NAME2, PARTITION_NAME3), false);
+    HoodieRecord<HoodieMetadataPayload> partitionDeletedRecord = 
HoodieMetadataPayload.createPartitionListRecord(Collections.singletonList(PARTITION_NAME),
 true);
+    // combine to ensure the deleted partitions is not seen
+    HoodieMetadataPayload payload = 
partitionDeletedRecord.getData().preCombine(allPartitionsRecord.getData());
+    
assertEquals(HoodieMetadataPayload.createPartitionListRecord(Arrays.asList(PARTITION_NAME2,
 PARTITION_NAME3), false).getData(),
+        payload);
   }
 
   @Test
@@ -211,6 +237,8 @@ public class TestHoodieMetadataPayload extends 
HoodieCommonTestHarness {
         
deletedColumnStatsRecord.getData().preCombine(columnStatsRecord.getData());
 
     assertEquals(deletedColumnStatsRecord.getData(), 
deletedCombinedMetadataPayload);
+    
assertFalse(deletedCombinedMetadataPayload.getInsertValue(null).isPresent());
+    assertTrue(deletedCombinedMetadataPayload.isDeleted());
 
     // NOTE: In this case, proper incoming record will be overwriting 
previously deleted
     //       record
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
index 4ccc48b519d..9586171d97a 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
@@ -26,8 +26,10 @@ import 
org.apache.hudi.common.engine.HoodieLocalEngineContext;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.testutils.FileCreateUtils;
@@ -206,7 +208,12 @@ public class TestHoodieTableMetadataUtil extends 
HoodieCommonTestHarness {
   public void testGetLogFileColumnRangeMetadata() throws Exception {
     HoodieLocalEngineContext engineContext = new 
HoodieLocalEngineContext(metaClient.getStorageConf());
     String instant1 = "20230918120000000";
-    hoodieTestTable = hoodieTestTable.addCommit(instant1);
+
+    HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+    commitMetadata.addMetadata("test", "test");
+    commitMetadata.setOperationType(WriteOperationType.INSERT);
+    commitMetadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, 
HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS.toString());
+    hoodieTestTable = hoodieTestTable.addCommit(instant1, 
Option.of(commitMetadata));
     String instant2 = "20230918121110000";
     hoodieTestTable = hoodieTestTable.addCommit(instant2);
     List<HoodieTableMetadataUtil.DirectoryInfo> partitionInfoList = new 
ArrayList<>();
@@ -243,7 +250,8 @@ public class TestHoodieTableMetadataUtil extends 
HoodieCommonTestHarness {
             storagePath2.toString(),
             metaClient,
             columnsToIndex,
-            
Option.of(HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS));
+            
Option.of(HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS),
+            HoodieMetadataConfig.MAX_READER_BUFFER_SIZE_PROP.defaultValue());
         // there must be two ranges for rider and driver
         assertEquals(2, columnRangeMetadataLogFile.size());
       } catch (Exception e) {
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/testutils/LogFileColStatsTestUtil.java
 
b/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/testutils/LogFileColStatsTestUtil.java
new file mode 100644
index 00000000000..464ad5ddca1
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/testutils/LogFileColStatsTestUtil.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.testutils;
+
+import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.avro.Schema;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.expressions.GenericRow;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.collectColumnRangeMetadata;
+
+/**
+ * Util methods used in tests to fetch col stats records for a log file.
+ */
+public class LogFileColStatsTestUtil {
+
+  public static Option<Row> getLogFileColumnRangeMetadata(String filePath, 
HoodieTableMetaClient datasetMetaClient, String latestCommitTime,
+                                                  List<String> columnsToIndex, 
Option<Schema> writerSchemaOpt,
+                                                  int maxBufferSize) throws 
IOException {
+    if (writerSchemaOpt.isPresent()) {
+      List<Schema.Field> fieldsToIndex = 
writerSchemaOpt.get().getFields().stream()
+          .filter(field -> columnsToIndex.contains(field.name()))
+          .collect(Collectors.toList());
+      List<HoodieRecord> records = new ArrayList<>();
+      HoodieUnMergedLogRecordScanner scanner = 
HoodieUnMergedLogRecordScanner.newBuilder()
+          .withStorage(datasetMetaClient.getStorage())
+          .withBasePath(datasetMetaClient.getBasePath())
+          .withLogFilePaths(Collections.singletonList(filePath))
+          .withBufferSize(maxBufferSize)
+          .withLatestInstantTime(latestCommitTime)
+          .withReaderSchema(writerSchemaOpt.get())
+          .withLogRecordScannerCallback(records::add)
+          .build();
+      scanner.scan();
+      if (records.isEmpty()) {
+        return Option.empty();
+      }
+      Map<String, HoodieColumnRangeMetadata<Comparable>> 
columnRangeMetadataMap =
+          collectColumnRangeMetadata(records, fieldsToIndex, filePath, 
writerSchemaOpt.get());
+      List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList = 
new ArrayList<>(columnRangeMetadataMap.values());
+      return Option.of(getColStatsEntry(filePath, columnRangeMetadataList));
+    } else {
+      throw new HoodieException("Writer schema needs to be set");
+    }
+  }
+
+  private static Row getColStatsEntry(String logFilePath, 
List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList) {
+    Collections.sort(columnRangeMetadataList, (o1, o2) -> 
o1.getColumnName().compareTo(o2.getColumnName()));
+    Object[] values = new Object[(columnRangeMetadataList.size() * 3) + 2];
+    values[0] = logFilePath.substring(logFilePath.lastIndexOf("/") + 1);
+    values[1] = columnRangeMetadataList.get(0).getValueCount();
+    int counter = 2;
+    for (HoodieColumnRangeMetadata columnRangeMetadata: 
columnRangeMetadataList) {
+      values[counter++] = columnRangeMetadata.getValueCount();
+      values[counter++] = columnRangeMetadata.getMinValue();
+      values[counter++] = columnRangeMetadata.getMaxValue();
+    }
+    return new GenericRow(values);
+  }
+
+  public static Option<Schema> getSchemaForTable(HoodieTableMetaClient 
metaClient) throws Exception {
+    TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
+    return Option.of(schemaResolver.getTableAvroSchema());
+  }
+}
+
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/cow-bootstrap-rollback1-column-stats-index-table.json
 
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/cow-bootstrap-rollback1-column-stats-index-table.json
new file mode 100644
index 00000000000..83790766db2
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/cow-bootstrap-rollback1-column-stats-index-table.json
@@ -0,0 +1,2 @@
+{"c1_maxValue":959,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":" 
959sdc","c2_minValue":" 
0sdc","c2_nullCount":0,"c3_maxValue":994.355,"c3_minValue":19.000,"c3_nullCount":0,"c4_maxValue":"2021-11-19T20:40:55.550-08:00","c4_minValue":"2021-11-19T20:40:55.339-08:00","c4_nullCount":0,"c5_maxValue":97,"c5_minValue":1,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue
 [...]
+{"c1_maxValue":959,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":" 
984sdh","c2_minValue":" 
0sdc","c2_nullCount":0,"c3_maxValue":10000.768,"c3_minValue":0.001,"c3_nullCount":0,"c4_maxValue":"2021-11-19T23:34:44.201-08:00","c4_minValue":"2021-11-19T20:40:55.339-08:00","c4_nullCount":0,"c5_maxValue":97,"c5_minValue":-100,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_minV
 [...]
\ No newline at end of file
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/cow-bootstrap1-column-stats-index-table.json
 
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/cow-bootstrap1-column-stats-index-table.json
new file mode 100644
index 00000000000..75aa7ada3ad
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/cow-bootstrap1-column-stats-index-table.json
@@ -0,0 +1,4 @@
+{"c1_maxValue":959,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":" 
959sdc","c2_minValue":" 
0sdc","c2_nullCount":0,"c3_maxValue":994.355,"c3_minValue":19.000,"c3_nullCount":0,"c4_maxValue":"2021-11-19T20:40:55.550-08:00","c4_minValue":"2021-11-19T20:40:55.339-08:00","c4_nullCount":0,"c5_maxValue":97,"c5_minValue":1,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue
 [...]
+{"c1_maxValue":959,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":" 
989sda","c2_minValue":" 
0sdc","c2_nullCount":0,"c3_maxValue":994.355,"c3_minValue":0.300,"c3_nullCount":1,"c4_maxValue":"2021-11-19T23:34:44.201-08:00","c4_minValue":"2021-11-18T23:34:44.179-08:00","c4_nullCount":0,"c5_maxValue":1000,"c5_minValue":-1000,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_min
 [...]
+{"c1_maxValue":959,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":" 
989sda","c2_minValue":" 
0sdc","c2_nullCount":0,"c3_maxValue":994.355,"c3_minValue":0.300,"c3_nullCount":1,"c4_maxValue":"2021-11-19T23:34:44.201-08:00","c4_minValue":"2021-11-18T23:34:44.179-08:00","c4_nullCount":0,"c5_maxValue":1000,"c5_minValue":-100,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_minV
 [...]
+{"c1_maxValue":959,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":" 
989sda","c2_minValue":" 
0sdc","c2_nullCount":0,"c3_maxValue":10000.768,"c3_minValue":0.001,"c3_nullCount":0,"c4_maxValue":"2021-11-19T23:34:44.201-08:00","c4_minValue":"2021-11-18T23:34:44.179-08:00","c4_nullCount":0,"c5_maxValue":1000,"c5_minValue":-100,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_mi
 [...]
\ No newline at end of file
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/cow-bootstrap2-column-stats-index-table.json
 
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/cow-bootstrap2-column-stats-index-table.json
new file mode 100644
index 00000000000..9c52707a27d
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/cow-bootstrap2-column-stats-index-table.json
@@ -0,0 +1,5 @@
+{"c1_maxValue":959,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":" 
959sdc","c2_minValue":" 
0sdc","c2_nullCount":0,"c3_maxValue":994.355,"c3_minValue":19.000,"c3_nullCount":0,"c4_maxValue":"2021-11-19T20:40:55.550-08:00","c4_minValue":"2021-11-19T20:40:55.339-08:00","c4_nullCount":0,"c5_maxValue":97,"c5_minValue":1,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue
 [...]
+{"c1_maxValue":959,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":" 
989sda","c2_minValue":" 
0sdc","c2_nullCount":0,"c3_maxValue":994.355,"c3_minValue":0.300,"c3_nullCount":1,"c4_maxValue":"2021-11-19T23:34:44.201-08:00","c4_minValue":"2021-11-18T23:34:44.179-08:00","c4_nullCount":0,"c5_maxValue":1000,"c5_minValue":-1000,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_min
 [...]
+{"c1_maxValue":959,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":" 
989sda","c2_minValue":" 
0sdc","c2_nullCount":0,"c3_maxValue":994.355,"c3_minValue":0.300,"c3_nullCount":1,"c4_maxValue":"2021-11-19T23:34:44.201-08:00","c4_minValue":"2021-11-18T23:34:44.179-08:00","c4_nullCount":0,"c5_maxValue":1000,"c5_minValue":-100,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_minV
 [...]
+{"c1_maxValue":959,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":" 
989sda","c2_minValue":" 
0sdc","c2_nullCount":0,"c3_maxValue":10000.768,"c3_minValue":0.001,"c3_nullCount":0,"c4_maxValue":"2021-11-19T23:34:44.201-08:00","c4_minValue":"2021-11-18T23:34:44.179-08:00","c4_nullCount":0,"c5_maxValue":1000,"c5_minValue":-100,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_mi
 [...]
+{"c1_maxValue":959,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":" 
989sda","c2_minValue":" 
0sdc","c2_nullCount":0,"c3_maxValue":200000.000,"c3_minValue":0.100,"c3_nullCount":0,"c4_maxValue":"2021-11-19T23:34:44.201-08:00","c4_minValue":"2021-11-18T23:34:44.179-08:00","c4_nullCount":0,"c5_maxValue":1000,"c5_minValue":-100,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_m
 [...]
\ No newline at end of file
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/cow-clean1-column-stats-index-table.json
 
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/cow-clean1-column-stats-index-table.json
new file mode 100644
index 00000000000..a08dea39c05
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/cow-clean1-column-stats-index-table.json
@@ -0,0 +1,2 @@
+{"c1_maxValue":959,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":" 
989sda","c2_minValue":" 
0sdc","c2_nullCount":0,"c3_maxValue":994.355,"c3_minValue":0.300,"c3_nullCount":1,"c4_maxValue":"2021-11-19T23:34:44.201-08:00","c4_minValue":"2021-11-18T23:34:44.179-08:00","c4_nullCount":0,"c5_maxValue":1000,"c5_minValue":-1000,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_min
 [...]
+{"c1_maxValue":959,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":" 
989sda","c2_minValue":" 
0sdc","c2_nullCount":0,"c3_maxValue":10000.768,"c3_minValue":0.001,"c3_nullCount":0,"c4_maxValue":"2021-11-19T23:34:44.201-08:00","c4_minValue":"2021-11-18T23:34:44.179-08:00","c4_nullCount":0,"c5_maxValue":1000,"c5_minValue":-1000,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_m
 [...]
\ No newline at end of file
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/delete-input-table-json/part-00000-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json
 
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/delete-input-table-json/part-00000-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json
new file mode 100644
index 00000000000..17e8f877c50
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/delete-input-table-json/part-00000-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json
@@ -0,0 +1 @@
+{"c1":633,"c2":" 
987sdk","c3":375.308,"c4":"2021-11-18T23:34:44.180-08:00","c5":0,"c6":"2020-01-01","c7":"NA==","c8":9}
\ No newline at end of file
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/mor-bootstrap-rollback1-column-stats-index-table.json
 
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/mor-bootstrap-rollback1-column-stats-index-table.json
new file mode 100644
index 00000000000..dcbf49b141f
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/mor-bootstrap-rollback1-column-stats-index-table.json
@@ -0,0 +1,2 @@
+{"c1_maxValue":562,"c1_minValue":323,"c1_nullCount":0,"c2_maxValue":" 
984sdh","c2_minValue":" 
980sdd","c2_nullCount":0,"c3_maxValue":10000.768,"c3_minValue":0.001,"c3_nullCount":0,"c4_maxValue":"2021-11-19T23:34:44.201-08:00","c4_minValue":"2021-11-19T23:34:44.181-08:00","c4_nullCount":0,"c5_maxValue":80,"c5_minValue":-100,"c5_nullCount":0,"c6_maxValue":"2020-10-21","c6_minValue":"2020-01-15","c6_nullCount":0,"c7_maxValue":"SA==","c7_minValue":"qw==","c7_nullCount":0,"c8_maxValue":9,"c8_
 [...]
+{"c1_maxValue":959,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":" 
959sdc","c2_minValue":" 
0sdc","c2_nullCount":0,"c3_maxValue":994.355,"c3_minValue":19.000,"c3_nullCount":0,"c4_maxValue":"2021-11-19T20:40:55.550-08:00","c4_minValue":"2021-11-19T20:40:55.339-08:00","c4_nullCount":0,"c5_maxValue":97,"c5_minValue":1,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue
 [...]
\ No newline at end of file
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/mor-bootstrap1-column-stats-index-table.json
 
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/mor-bootstrap1-column-stats-index-table.json
new file mode 100644
index 00000000000..146097347e0
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/mor-bootstrap1-column-stats-index-table.json
@@ -0,0 +1,3 @@
+{"c1_maxValue":562,"c1_minValue":323,"c1_nullCount":0,"c2_maxValue":" 
984sdh","c2_minValue":" 
980sdd","c2_nullCount":0,"c3_maxValue":10000.768,"c3_minValue":0.001,"c3_nullCount":0,"c4_maxValue":"2021-11-19T23:34:44.201-08:00","c4_minValue":"2021-11-19T23:34:44.181-08:00","c4_nullCount":0,"c5_maxValue":80,"c5_minValue":-100,"c5_nullCount":0,"c6_maxValue":"2020-10-21","c6_minValue":"2020-01-15","c6_nullCount":0,"c7_maxValue":"SA==","c7_minValue":"qw==","c7_nullCount":0,"c8_maxValue":9,"c8_
 [...]
+{"c1_maxValue":639,"c1_minValue":323,"c1_nullCount":0,"c2_maxValue":" 
989sda","c2_minValue":" 
980sdd","c2_nullCount":0,"c3_maxValue":977.328,"c3_minValue":0.300,"c3_nullCount":1,"c4_maxValue":"2021-11-19T23:34:44.201-08:00","c4_minValue":"2021-11-18T23:34:44.179-08:00","c4_nullCount":0,"c5_maxValue":1000,"c5_minValue":-1000,"c5_nullCount":0,"c6_maxValue":"2020-10-21","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"aQ==","c7_minValue":"qw==","c7_nullCount":0,"c8_maxValue":9,"c8
 [...]
+{"c1_maxValue":959,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":" 
959sdc","c2_minValue":" 
0sdc","c2_nullCount":0,"c3_maxValue":994.355,"c3_minValue":19.000,"c3_nullCount":0,"c4_maxValue":"2021-11-19T20:40:55.550-08:00","c4_minValue":"2021-11-19T20:40:55.339-08:00","c4_nullCount":0,"c5_maxValue":97,"c5_minValue":1,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue
 [...]
\ No newline at end of file
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/mor-bootstrap2-column-stats-index-table.json
 
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/mor-bootstrap2-column-stats-index-table.json
new file mode 100644
index 00000000000..6256be16c1d
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/mor-bootstrap2-column-stats-index-table.json
@@ -0,0 +1,5 @@
+{"c1_maxValue":562,"c1_minValue":323,"c1_nullCount":0,"c2_maxValue":" 
984sdh","c2_minValue":" 
980sdd","c2_nullCount":0,"c3_maxValue":10000.768,"c3_minValue":0.001,"c3_nullCount":0,"c4_maxValue":"2021-11-19T23:34:44.201-08:00","c4_minValue":"2021-11-19T23:34:44.181-08:00","c4_nullCount":0,"c5_maxValue":80,"c5_minValue":-100,"c5_nullCount":0,"c6_maxValue":"2020-10-21","c6_minValue":"2020-01-15","c6_nullCount":0,"c7_maxValue":"SA==","c7_minValue":"qw==","c7_nullCount":0,"c8_maxValue":9,"c8_
 [...]
+{"c1_maxValue":562,"c1_minValue":323,"c1_nullCount":0,"c2_maxValue":" 
984sdh","c2_minValue":" 
980sdd","c2_nullCount":0,"c3_maxValue":200000.000,"c3_minValue":0.100,"c3_nullCount":0,"c4_maxValue":"2021-11-19T23:34:44.201-08:00","c4_minValue":"2021-11-19T23:34:44.181-08:00","c4_nullCount":0,"c5_maxValue":80,"c5_minValue":-100,"c5_nullCount":0,"c6_maxValue":"2020-10-21","c6_minValue":"2020-01-15","c6_nullCount":0,"c7_maxValue":"SA==","c7_minValue":"qQ==","c7_nullCount":0,"c8_maxValue":9,"c8
 [...]
+{"c1_maxValue":639,"c1_minValue":323,"c1_nullCount":0,"c2_maxValue":" 
989sda","c2_minValue":" 
980sdd","c2_nullCount":0,"c3_maxValue":977.328,"c3_minValue":0.300,"c3_nullCount":1,"c4_maxValue":"2021-11-19T23:34:44.201-08:00","c4_minValue":"2021-11-18T23:34:44.179-08:00","c4_nullCount":0,"c5_maxValue":1000,"c5_minValue":-1000,"c5_nullCount":0,"c6_maxValue":"2020-10-21","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"aQ==","c7_minValue":"qw==","c7_nullCount":0,"c8_maxValue":9,"c8
 [...]
+{"c1_maxValue":959,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":" 
959sdc","c2_minValue":" 
0sdc","c2_nullCount":0,"c3_maxValue":994.355,"c3_minValue":19.000,"c3_nullCount":0,"c4_maxValue":"2021-11-19T20:40:55.550-08:00","c4_minValue":"2021-11-19T20:40:55.339-08:00","c4_nullCount":0,"c5_maxValue":97,"c5_minValue":1,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue
 [...]
+{"c1_maxValue":959,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":" 
989sda","c2_minValue":" 
0sdc","c2_nullCount":0,"c3_maxValue":200000.000,"c3_minValue":0.100,"c3_nullCount":0,"c4_maxValue":"2021-11-19T23:34:44.201-08:00","c4_minValue":"2021-11-18T23:34:44.179-08:00","c4_nullCount":0,"c5_maxValue":1000,"c5_minValue":-100,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_m
 [...]
\ No newline at end of file
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/mor-clean1-column-stats-index-table.json
 
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/mor-clean1-column-stats-index-table.json
new file mode 100644
index 00000000000..8c7b1125314
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/mor-clean1-column-stats-index-table.json
@@ -0,0 +1,2 @@
+{"c1_maxValue":562,"c1_minValue":323,"c1_nullCount":0,"c2_maxValue":" 
984sdh","c2_minValue":" 
980sdd","c2_nullCount":0,"c3_maxValue":10000.768,"c3_minValue":0.001,"c3_nullCount":0,"c4_maxValue":"2021-11-19T23:34:44.201-08:00","c4_minValue":"2021-11-19T23:34:44.181-08:00","c4_nullCount":0,"c5_maxValue":80,"c5_minValue":-100,"c5_nullCount":0,"c6_maxValue":"2020-10-21","c6_minValue":"2020-01-15","c6_nullCount":0,"c7_maxValue":"SA==","c7_minValue":"qw==","c7_nullCount":0,"c8_maxValue":9,"c8_
 [...]
+{"c1_maxValue":959,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":" 
989sda","c2_minValue":" 
0sdc","c2_nullCount":0,"c3_maxValue":994.355,"c3_minValue":0.300,"c3_nullCount":1,"c4_maxValue":"2021-11-19T23:34:44.201-08:00","c4_minValue":"2021-11-18T23:34:44.179-08:00","c4_nullCount":0,"c5_maxValue":1000,"c5_minValue":-1000,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_min
 [...]
\ No newline at end of file
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/mor-delete-block1-column-stats-index-table.json
 
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/mor-delete-block1-column-stats-index-table.json
new file mode 100644
index 00000000000..fc6c936c787
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/mor-delete-block1-column-stats-index-table.json
@@ -0,0 +1,3 @@
+{"c1_nullCount":0,"c2_nullCount":0,"c3_nullCount":0,"c4_nullCount":0,"c5_nullCount":0,"c6_nullCount":0,"c7_nullCount":0,"c8_nullCount":0,"valueCount":0}
+{"c1_maxValue":639,"c1_minValue":323,"c1_nullCount":0,"c2_maxValue":" 
989sda","c2_minValue":" 
980sdd","c2_nullCount":0,"c3_maxValue":977.328,"c3_minValue":0.300,"c3_nullCount":1,"c4_maxValue":"2021-11-19T23:34:44.201-08:00","c4_minValue":"2021-11-18T23:34:44.179-08:00","c4_nullCount":0,"c5_maxValue":1000,"c5_minValue":-1000,"c5_nullCount":0,"c6_maxValue":"2020-10-21","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"aQ==","c7_minValue":"qw==","c7_nullCount":0,"c8_maxValue":9,"c8
 [...]
+{"c1_maxValue":959,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":" 
959sdc","c2_minValue":" 
0sdc","c2_nullCount":0,"c3_maxValue":994.355,"c3_minValue":19.000,"c3_nullCount":0,"c4_maxValue":"2021-11-19T20:40:55.550-08:00","c4_minValue":"2021-11-19T20:40:55.339-08:00","c4_nullCount":0,"c5_maxValue":97,"c5_minValue":1,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue
 [...]
\ No newline at end of file
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/update2-input-table-json/part-00000-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json
 
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/update2-input-table-json/part-00000-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json
new file mode 100644
index 00000000000..35ae749ddc3
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/update2-input-table-json/part-00000-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json
@@ -0,0 +1,10 @@
+{"c1":323,"c2":" 
980sdd","c3":null,"c4":"2021-11-19T23:34:44.201-08:00","c5":70,"c6":"2020-01-15","c7":"Ag==","c8":9}
+{"c1":326,"c2":" 
981sde","c3":64.768,"c4":"2021-11-19T23:34:44.201-08:00","c5":80,"c6":"2020-10-13","c7":"AA==","c8":9}
+{"c1":555,"c2":" 
982sdf","c3":153.431,"c4":"2021-11-19T23:34:44.186-08:00","c5":10,"c6":"2020-03-12","c7":"rw==","c8":9}
+{"c1":556,"c2":" 
983sdg","c3":246.427,"c4":"2021-11-19T23:34:44.186-08:00","c5":45,"c6":"2020-10-08","c7":"qw==","c8":9}
+{"c1":562,"c2":" 
984sdh","c3":977.328,"c4":"2021-11-19T23:34:44.181-08:00","c5":-100,"c6":"2020-10-21","c7":"SA==","c8":9}
+{"c1":619,"c2":" 
985sdi","c3":230.320,"c4":"2021-11-19T23:34:44.180-08:00","c5":1000,"c6":"2020-02-13","c7":"QA==","c8":9}
+{"c1":624,"c2":" 
986sdj","c3":580.317,"c4":"2021-11-18T23:34:44.180-08:00","c5":-1,"c6":"2020-10-10","c7":"PQ==","c8":9}
+{"c1":633,"c2":" 
987sdk","c3":375.308,"c4":"2021-11-18T23:34:44.180-08:00","c5":-1000,"c6":"2020-01-01","c7":"NA==","c8":9}
+{"c1":638,"c2":" 
988sdl","c3":904.304,"c4":"2021-11-18T23:34:44.179-08:00","c5":20,"c6":"2020-08-25","c7":"MA==","c8":9}
+{"c1":639,"c2":" 
989sda","c3":0.300,"c4":"2021-11-18T23:34:44.179-08:00","c5":90,"c6":"2020-04-21","c7":"aa==","c8":9}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/update3-input-table-json/part-00000-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json
 
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/update3-input-table-json/part-00000-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json
new file mode 100644
index 00000000000..5e04406cf21
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/update3-input-table-json/part-00000-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json
@@ -0,0 +1,5 @@
+{"c1":323,"c2":" 
980sdd","c3":10.00,"c4":"2021-11-19T23:34:44.201-08:00","c5":70,"c6":"2020-01-15","c7":"Ag==","c8":9}
+{"c1":326,"c2":" 
981sde","c3":10000.768,"c4":"2021-11-19T23:34:44.201-08:00","c5":80,"c6":"2020-10-13","c7":"AA==","c8":9}
+{"c1":555,"c2":" 
982sdf","c3":2.431,"c4":"2021-11-19T23:34:44.186-08:00","c5":10,"c6":"2020-03-12","c7":"rw==","c8":9}
+{"c1":556,"c2":" 
983sdg","c3":0.001,"c4":"2021-11-19T23:34:44.186-08:00","c5":45,"c6":"2020-10-08","c7":"qw==","c8":9}
+{"c1":562,"c2":" 
984sdh","c3":5.328,"c4":"2021-11-19T23:34:44.181-08:00","c5":-100,"c6":"2020-10-21","c7":"SA==","c8":9}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/update4-input-table-json/part-00000-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json
 
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/update4-input-table-json/part-00000-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json
new file mode 100644
index 00000000000..a83a82d8b8b
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/update4-input-table-json/part-00000-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json
@@ -0,0 +1,5 @@
+{"c1":323,"c2":" 
980sdd","c3":200000.00,"c4":"2021-11-19T23:34:44.201-08:00","c5":70,"c6":"2020-01-15","c7":"Aj==","c8":9}
+{"c1":326,"c2":" 
981sde","c3":100.768,"c4":"2021-11-19T23:34:44.201-08:00","c5":80,"c6":"2020-10-13","c7":"AB==","c8":9}
+{"c1":555,"c2":" 
982sdf","c3":20.431,"c4":"2021-11-19T23:34:44.186-08:00","c5":10,"c6":"2020-03-12","c7":"rx==","c8":9}
+{"c1":556,"c2":" 
983sdg","c3":0.1,"c4":"2021-11-19T23:34:44.186-08:00","c5":45,"c6":"2020-10-08","c7":"qf==","c8":9}
+{"c1":562,"c2":" 
984sdh","c3":4.328,"c4":"2021-11-19T23:34:44.181-08:00","c5":-100,"c6":"2020-10-21","c7":"SL==","c8":9}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala
index 779abafb2da..8839f6d55d2 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala
@@ -18,26 +18,37 @@
 
 package org.apache.hudi.functional
 
+
+import org.apache.avro.Schema
 import org.apache.hudi.ColumnStatsIndexSupport.composeIndexSchema
 import org.apache.hudi.HoodieConversionUtils.toProperties
+import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.common.config.{HoodieMetadataConfig, 
HoodieStorageConfig}
-import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.model.{HoodieBaseFile, HoodieFileGroup, 
HoodieLogFile, HoodieTableType}
 import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.view.FileSystemViewManager
+import org.apache.hudi.config.HoodieCompactionConfig
 import org.apache.hudi.functional.ColumnStatIndexTestBase.ColumnStatsTestCase
 import org.apache.hudi.storage.StoragePath
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration
 import org.apache.hudi.testutils.HoodieSparkClientTestBase
 import org.apache.hudi.{ColumnStatsIndexSupport, DataSourceWriteOptions}
-
 import org.apache.spark.sql._
+import org.apache.hudi.functional.ColumnStatIndexTestBase.ColumnStatsTestParams
+import org.apache.hudi.testutils.{HoodieSparkClientTestBase, 
LogFileColStatsTestUtil}
+import org.apache.hudi.{ColumnStatsIndexSupport, DataSourceWriteOptions}
 import org.apache.spark.sql.functions.typedLit
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.DataFrame
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api._
 import org.junit.jupiter.params.provider.Arguments
 
 import java.math.BigInteger
 import java.sql.{Date, Timestamp}
-
+import java.util
+import java.util.List
+import java.util.stream.Collectors
 import scala.collection.JavaConverters._
 import scala.util.Random
 
@@ -76,42 +87,39 @@ class ColumnStatIndexTestBase extends 
HoodieSparkClientTestBase {
     cleanupSparkContexts()
   }
 
-  protected def doWriteAndValidateColumnStats(testCase: ColumnStatsTestCase,
-                                            metadataOpts: Map[String, String],
-                                            hudiOpts: Map[String, String],
-                                            dataSourcePath: String,
-                                            expectedColStatsSourcePath: String,
-                                            operation: String,
-                                            saveMode: SaveMode,
-                                            shouldValidate: Boolean = true): 
Unit = {
-    val sourceJSONTablePath = 
getClass.getClassLoader.getResource(dataSourcePath).toString
+  protected def doWriteAndValidateColumnStats(params: ColumnStatsTestParams): 
Unit = {
+
+    val sourceJSONTablePath = 
getClass.getClassLoader.getResource(params.dataSourcePath).toString
 
     // NOTE: Schema here is provided for validation that the input date is in 
the appropriate format
     val inputDF = 
spark.read.schema(sourceTableSchema).json(sourceJSONTablePath)
 
+    val writeOptions: Map[String, String] = params.hudiOpts ++ 
params.metadataOpts
+
     inputDF
       .sort("c1")
-      .repartition(4, new Column("c1"))
+      .repartition(params.numPartitions, new Column("c1"))
       .write
       .format("hudi")
-      .options(hudiOpts)
-      .option(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key, 10 * 1024)
-      .option(DataSourceWriteOptions.OPERATION.key, operation)
-      .mode(saveMode)
+      .options(writeOptions)
+      .option(DataSourceWriteOptions.OPERATION.key, params.operation)
+      .option(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key(), 
String.valueOf(params.parquetMaxFileSize))
+      .option(HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), 
String.valueOf(params.smallFileLimit))
+      .mode(params.saveMode)
       .save(basePath)
     dfList = dfList :+ inputDF
 
     metaClient = HoodieTableMetaClient.reload(metaClient)
 
-    if (shouldValidate) {
+    if (params.shouldValidate) {
       // Currently, routine manually validating the column stats (by actually 
reading every column of every file)
       // only supports parquet files. Therefore we skip such validation when 
delta-log files are present, and only
       // validate in following cases: (1) COW: all operations; (2) MOR: insert 
only.
-      val shouldValidateColumnStatsManually = testCase.tableType == 
HoodieTableType.COPY_ON_WRITE ||
-        operation.equals(DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      val shouldValidateColumnStatsManually = params.testCase.tableType == 
HoodieTableType.COPY_ON_WRITE ||
+        
params.operation.equals(DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
 
       validateColumnStatsIndex(
-        testCase, metadataOpts, expectedColStatsSourcePath, 
shouldValidateColumnStatsManually)
+        params.testCase, params.metadataOpts, 
params.expectedColStatsSourcePath, shouldValidateColumnStatsManually, 
params.latestCompletedCommit)
     }
   }
 
@@ -119,16 +127,19 @@ class ColumnStatIndexTestBase extends 
HoodieSparkClientTestBase {
                                             includedCols: Seq[String],
                                             indexedCols: Seq[String],
                                             indexSchema: StructType): 
DataFrame = {
-    val files = {
-      val pathInfoList = storage.listFiles(new StoragePath(tablePath))
-      pathInfoList.asScala.filter(fs => 
fs.getPath.getName.endsWith(".parquet"))
-    }
-
-    spark.createDataFrame(
-      files.flatMap(file => {
-        val df = 
spark.read.schema(sourceTableSchema).parquet(file.getPath.toString)
+    val metaClient = HoodieTableMetaClient.builder().setConf(new 
HadoopStorageConfiguration(jsc.hadoopConfiguration())).setBasePath(tablePath).build()
+    val fsv = FileSystemViewManager.createInMemoryFileSystemView(new 
HoodieSparkEngineContext(jsc), metaClient, 
HoodieMetadataConfig.newBuilder().enable(false).build())
+    fsv.loadAllPartitions()
+    val filegroupList = 
fsv.getAllFileGroups.collect(Collectors.toList[HoodieFileGroup])
+    val baseFilesList = filegroupList.stream().flatMap(fileGroup => 
fileGroup.getAllBaseFiles).collect(Collectors.toList[HoodieBaseFile])
+    val baseFiles = baseFilesList.stream()
+      .map[StoragePath](baseFile => 
baseFile.getStoragePath).collect(Collectors.toList[StoragePath]).asScala
+
+    val baseFilesDf = spark.createDataFrame(
+      baseFiles.flatMap(file => {
+        val df = spark.read.schema(sourceTableSchema).parquet(file.toString)
         val exprs: Seq[String] =
-          s"'${typedLit(file.getPath.getName)}' AS file" +:
+          s"'${typedLit(file.getName)}' AS file" +:
             s"sum(1) AS valueCount" +:
             df.columns
               .filter(col => includedCols.contains(col))
@@ -156,12 +167,61 @@ class ColumnStatIndexTestBase extends 
HoodieSparkClientTestBase {
       }).asJava,
       indexSchema
     )
+
+    if (metaClient.getTableConfig.getTableType == 
HoodieTableType.COPY_ON_WRITE) {
+      baseFilesDf // COW table
+    } else {
+      val allLogFiles = filegroupList.stream().flatMap(fileGroup => 
fileGroup.getAllFileSlices)
+        .flatMap(fileSlice => fileSlice.getLogFiles)
+        .collect(Collectors.toList[HoodieLogFile])
+      if (allLogFiles.isEmpty) {
+        baseFilesDf // MOR table, but no log files.
+      } else {
+        val colsToGenerateStats = indexedCols // check for included cols
+        val writerSchemaOpt = 
LogFileColStatsTestUtil.getSchemaForTable(metaClient)
+        val latestCompletedCommit = 
metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
+        baseFilesDf.union(getColStatsFromLogFiles(allLogFiles, 
latestCompletedCommit,
+          scala.collection.JavaConverters.seqAsJavaList(colsToGenerateStats),
+          metaClient,
+          writerSchemaOpt: org.apache.hudi.common.util.Option[Schema],
+          HoodieMetadataConfig.MAX_READER_BUFFER_SIZE_PROP.defaultValue(),
+          indexSchema))
+      }
+    }
+  }
+
+  protected def getColStatsFromLogFiles(logFiles: List[HoodieLogFile], 
latestCommit: String, columnsToIndex: util.List[String],
+                                        datasetMetaClient: 
HoodieTableMetaClient,
+                                        writerSchemaOpt: 
org.apache.hudi.common.util.Option[Schema],
+                                        maxBufferSize: Integer,
+                                        indexSchema: StructType): DataFrame = {
+    val colStatsEntries = 
logFiles.stream().map[org.apache.hudi.common.util.Option[Row]](logFile => {
+      try {
+        getColStatsFromLogFile(logFile.getPath.toString, latestCommit, 
columnsToIndex, datasetMetaClient, writerSchemaOpt, maxBufferSize)
+      } catch {
+        case e: Exception =>
+          throw e
+      }
+    }).filter(rowOpt => rowOpt.isPresent).map[Row](rowOpt => 
rowOpt.get()).collect(Collectors.toList[Row])
+    spark.createDataFrame(colStatsEntries, indexSchema)
+  }
+
+  protected def getColStatsFromLogFile(logFilePath: String,
+                                       latestCommit: String,
+                                       columnsToIndex: util.List[String],
+                                       datasetMetaClient: 
HoodieTableMetaClient,
+                                       writerSchemaOpt: 
org.apache.hudi.common.util.Option[Schema],
+                                       maxBufferSize: Integer
+                                      ): 
org.apache.hudi.common.util.Option[Row] = {
+    LogFileColStatsTestUtil.getLogFileColumnRangeMetadata(logFilePath, 
datasetMetaClient, latestCommit,
+      columnsToIndex, writerSchemaOpt, maxBufferSize)
   }
 
   protected def validateColumnStatsIndex(testCase: ColumnStatsTestCase,
-                                       metadataOpts: Map[String, String],
-                                       expectedColStatsSourcePath: String,
-                                       validateColumnStatsManually: Boolean): 
Unit = {
+                                         metadataOpts: Map[String, String],
+                                         expectedColStatsSourcePath: String,
+                                         validateColumnStatsManually: Boolean,
+                                         latestCompletedCommit: String): Unit 
= {
     val metadataConfig = HoodieMetadataConfig.newBuilder()
       .fromProperties(toProperties(metadataOpts))
       .build()
@@ -177,7 +237,8 @@ class ColumnStatIndexTestBase extends 
HoodieSparkClientTestBase {
       }
     }
     val (expectedColStatsSchema, _) = 
composeIndexSchema(sourceTableSchema.fieldNames, indexedColumns, 
sourceTableSchema)
-    val validationSortColumns = Seq("c1_maxValue", "c1_minValue", 
"c2_maxValue", "c2_minValue")
+    val validationSortColumns = Seq("c1_maxValue", "c1_minValue", 
"c2_maxValue", "c2_minValue", "c3_maxValue",
+      "c3_minValue", "c5_maxValue", "c5_minValue")
 
     columnStatsIndex.loadTransposed(sourceTableSchema.fieldNames, 
testCase.shouldReadInMemory) { transposedColStatsDF =>
       // Match against expected column stats table
@@ -269,14 +330,41 @@ object ColumnStatIndexTestBase {
   def testMetadataColumnStatsIndexParams: java.util.stream.Stream[Arguments] = 
{
     
java.util.stream.Stream.of(HoodieTableType.values().toStream.flatMap(tableType 
=>
       Seq(Arguments.arguments(ColumnStatsTestCase(tableType, 
shouldReadInMemory = true)),
-        Arguments.arguments(ColumnStatsTestCase(tableType, shouldReadInMemory 
= false)))
+        Arguments.arguments(ColumnStatsTestCase(tableType, shouldReadInMemory 
= false))
+      )
     ): _*)
   }
 
   def testMetadataColumnStatsIndexParamsForMOR: 
java.util.stream.Stream[Arguments] = {
     java.util.stream.Stream.of(
       
Seq(Arguments.arguments(ColumnStatsTestCase(HoodieTableType.MERGE_ON_READ, 
shouldReadInMemory = true)),
-        Arguments.arguments(ColumnStatsTestCase(HoodieTableType.MERGE_ON_READ, 
shouldReadInMemory = false)))
-    : _*)
+        Arguments.arguments(ColumnStatsTestCase(HoodieTableType.MERGE_ON_READ, 
shouldReadInMemory = false))
+      )
+        : _*)
   }
+
+  def testTableTypePartitionTypeParams: java.util.stream.Stream[Arguments] = {
+    java.util.stream.Stream.of(
+      Seq(
+        Arguments.arguments(HoodieTableType.COPY_ON_WRITE, "c8"),
+        // empty partition col represents non-partitioned table.
+        Arguments.arguments(HoodieTableType.COPY_ON_WRITE, ""),
+        Arguments.arguments(HoodieTableType.MERGE_ON_READ, "c8"),
+        Arguments.arguments(HoodieTableType.MERGE_ON_READ, "")
+      )
+        : _*)
+  }
+
+  case class ColumnStatsTestParams(testCase: ColumnStatsTestCase,
+                                   metadataOpts: Map[String, String],
+                                   hudiOpts: Map[String, String],
+                                   dataSourcePath: String,
+                                   expectedColStatsSourcePath: String,
+                                   operation: String,
+                                   saveMode: SaveMode,
+                                   shouldValidate: Boolean = true,
+                                   latestCompletedCommit: String = null,
+                                   numPartitions: Integer = 4,
+                                   parquetMaxFileSize: Integer = 10 * 1024,
+                                   smallFileLimit: Integer = 100 * 1024 * 1024)
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
index d4190957913..9323b9ce2ed 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
@@ -18,6 +18,8 @@
 
 package org.apache.hudi.functional
 
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hudi.ColumnStatsIndexSupport.composeIndexSchema
 import org.apache.hudi.DataSourceWriteOptions.{PARTITIONPATH_FIELD, 
PRECOMBINE_FIELD, RECORDKEY_FIELD}
 import org.apache.hudi.HoodieConversionUtils.toProperties
@@ -26,14 +28,21 @@ import org.apache.hudi.common.model.{HoodieBaseFile, 
HoodieFileGroup, HoodieTabl
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.testutils.HoodieTestUtils
 import org.apache.hudi.common.util.ParquetUtils
-import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.config.{HoodieCleanConfig, HoodieCompactionConfig, 
HoodieWriteConfig}
 import org.apache.hudi.functional.ColumnStatIndexTestBase.ColumnStatsTestCase
 import org.apache.hudi.storage.StoragePath
 import org.apache.hudi.{ColumnStatsIndexSupport, DataSourceWriteOptions}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.table.view.FileSystemViewManager
+import org.apache.hudi.common.util.{ParquetUtils, StringUtils}
+import org.apache.hudi.config.{HoodieCompactionConfig, HoodieWriteConfig}
+import org.apache.hudi.functional.ColumnStatIndexTestBase.ColumnStatsTestCase
+import org.apache.hudi.functional.ColumnStatIndexTestBase.ColumnStatsTestParams
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration
+import org.apache.hudi.{ColumnStatsIndexSupport, DataSourceWriteOptions, 
config}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, 
GreaterThan, Literal, Or}
@@ -70,17 +79,17 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
       HoodieTableConfig.POPULATE_META_FIELDS.key -> "true"
     ) ++ metadataOpts
 
-    doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts,
+    doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, 
metadataOpts, commonOpts,
       dataSourcePath = "index/colstats/input-table-json",
       expectedColStatsSourcePath = 
"index/colstats/column-stats-index-table.json",
       operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
-      saveMode = SaveMode.Overwrite)
+      saveMode = SaveMode.Overwrite))
 
-    doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts,
+    doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, 
metadataOpts, commonOpts,
       dataSourcePath = "index/colstats/another-input-table-json",
       expectedColStatsSourcePath = 
"index/colstats/updated-column-stats-index-table.json",
       operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
-      saveMode = SaveMode.Append)
+      saveMode = SaveMode.Append))
 
     // NOTE: MOR and COW have different fixtures since MOR is bearing 
delta-log files (holding
     //       deferred updates), diverging from COW
@@ -90,13 +99,441 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase 
{
       "index/colstats/mor-updated2-column-stats-index-table.json"
     }
 
-    doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts,
+    doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, 
metadataOpts, commonOpts,
       dataSourcePath = "index/colstats/update-input-table-json",
       expectedColStatsSourcePath = expectedColStatsSourcePath,
       operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
-      saveMode = SaveMode.Append)
+      saveMode = SaveMode.Append))
   }
 
+  @ParameterizedTest
+  @MethodSource(Array("testTableTypePartitionTypeParams"))
+  def testMetadataColumnStatsIndexInitializationWithUpserts(tableType: 
HoodieTableType, partitionCol : String): Unit = {
+    val testCase = ColumnStatsTestCase(tableType, shouldReadInMemory = true)
+    val metadataOpts = Map(
+      HoodieMetadataConfig.ENABLE.key -> "true"
+    )
+
+    val commonOpts = Map(
+      "hoodie.insert.shuffle.parallelism" -> "1",
+      "hoodie.upsert.shuffle.parallelism" -> "1",
+      HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+      DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString,
+      RECORDKEY_FIELD.key -> "c1",
+      PRECOMBINE_FIELD.key -> "c1",
+      PARTITIONPATH_FIELD.key() -> partitionCol,
+      HoodieTableConfig.POPULATE_META_FIELDS.key -> "true",
+      HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key() -> "5"
+    ) ++ metadataOpts
+
+    // inserts
+    doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, 
metadataOpts, commonOpts,
+      dataSourcePath = "index/colstats/input-table-json",
+      expectedColStatsSourcePath = null,
+      operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Overwrite,
+      false,
+      numPartitions =  1,
+      parquetMaxFileSize = 100 * 1024 * 1024,
+      smallFileLimit = 0))
+
+    // updates
+    doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, 
metadataOpts, commonOpts,
+      dataSourcePath = "index/colstats/update2-input-table-json/",
+      expectedColStatsSourcePath = null,
+      operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Append,
+      false,
+        numPartitions =  1,
+      parquetMaxFileSize = 100 * 1024 * 1024,
+      smallFileLimit = 0))
+
+    // Delete a subset of records. This will add a delete log block for the MOR table.
+    doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, 
metadataOpts, commonOpts,
+      dataSourcePath = "index/colstats/delete-input-table-json/",
+      expectedColStatsSourcePath = null,
+      operation = DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Append,
+      false,
+      numPartitions =  1,
+      parquetMaxFileSize = 100 * 1024 * 1024,
+      smallFileLimit = 0))
+
+    val metadataOpts1 = Map(
+      HoodieMetadataConfig.ENABLE.key -> "true",
+      HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true"
+    )
+
+    // NOTE: MOR and COW have different fixtures since MOR is bearing 
delta-log files (holding
+    //       deferred updates), diverging from COW
+
+    val expectedColStatsSourcePath = if (testCase.tableType == 
HoodieTableType.COPY_ON_WRITE) {
+      "index/colstats/cow-bootstrap1-column-stats-index-table.json"
+    } else {
+      "index/colstats/mor-bootstrap1-column-stats-index-table.json"
+    }
+
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+    val latestCompletedCommit = 
metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
+
+    // lets validate that we have log files generated in case of MOR table
+    if (tableType == HoodieTableType.MERGE_ON_READ) {
+      val metaClient = HoodieTableMetaClient.builder().setConf(new 
HadoopStorageConfiguration(jsc.hadoopConfiguration())).setBasePath(basePath).build()
+      val fsv = FileSystemViewManager.createInMemoryFileSystemView(new 
HoodieSparkEngineContext(jsc), metaClient, 
HoodieMetadataConfig.newBuilder().enable(false).build())
+      fsv.loadAllPartitions()
+      val baseStoragePath = new StoragePath(basePath)
+      val allPartitionPaths = fsv.getPartitionPaths
+      allPartitionPaths.forEach(partitionPath => {
+        val pPath = FSUtils.getRelativePartitionPath(baseStoragePath, 
partitionPath)
+        assertTrue (fsv.getLatestFileSlices(pPath).filter(fileSlice => 
fileSlice.hasLogFiles).count() > 0)
+      })
+      fsv.close()
+    }
+
+    // Update a subset of records which are not deleted, enable col stats, 
and validate the bootstrap.
+    doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, 
metadataOpts1, commonOpts,
+      dataSourcePath = "index/colstats/update3-input-table-json",
+      expectedColStatsSourcePath = expectedColStatsSourcePath,
+      operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Append,
+      true,
+      latestCompletedCommit,
+      numPartitions =  1,
+      parquetMaxFileSize = 100 * 1024 * 1024,
+      smallFileLimit = 0))
+
+    // trigger one more upsert and compaction (w/ MOR table) and validate.
+    val expectedColStatsSourcePath1 = if (testCase.tableType == 
HoodieTableType.COPY_ON_WRITE) {
+      "index/colstats/cow-bootstrap2-column-stats-index-table.json"
+    } else {
+      "index/colstats/mor-bootstrap2-column-stats-index-table.json"
+    }
+
+    doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, 
metadataOpts1, commonOpts,
+      dataSourcePath = "index/colstats/update4-input-table-json",
+      expectedColStatsSourcePath = expectedColStatsSourcePath1,
+      operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Append,
+      true,
+      latestCompletedCommit,
+      numPartitions =  1,
+      parquetMaxFileSize = 100 * 1024 * 1024,
+      smallFileLimit = 0))
+  }
+
+  @ParameterizedTest
+  @MethodSource(Array("testTableTypePartitionTypeParams"))
+  def testMetadataColumnStatsIndexInitializationWithRollbacks(tableType: 
HoodieTableType, partitionCol : String): Unit = {
+    val testCase = ColumnStatsTestCase(tableType, shouldReadInMemory = true)
+    val metadataOpts = Map(
+      HoodieMetadataConfig.ENABLE.key -> "true"
+    )
+
+    val commonOpts = Map(
+      "hoodie.insert.shuffle.parallelism" -> "1",
+      "hoodie.upsert.shuffle.parallelism" -> "1",
+      HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+      DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString,
+      RECORDKEY_FIELD.key -> "c1",
+      PRECOMBINE_FIELD.key -> "c1",
+      PARTITIONPATH_FIELD.key() -> partitionCol,
+      "hoodie.write.markers.type" -> "DIRECT",
+      HoodieTableConfig.POPULATE_META_FIELDS.key -> "true"
+    ) ++ metadataOpts
+
+    // inserts
+    doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, 
metadataOpts, commonOpts,
+      dataSourcePath = "index/colstats/input-table-json",
+      expectedColStatsSourcePath = null,
+      operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Overwrite,
+      false,
+      numPartitions =  1,
+      parquetMaxFileSize = 100 * 1024 * 1024,
+      smallFileLimit = 0))
+
+    // updates
+    doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, 
metadataOpts, commonOpts,
+      dataSourcePath = "index/colstats/update2-input-table-json/",
+      expectedColStatsSourcePath = null,
+      operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Append,
+      false,
+      numPartitions =  1,
+      parquetMaxFileSize = 100 * 1024 * 1024,
+      smallFileLimit = 0))
+
+    simulateFailureForLatestCommit(tableType, partitionCol)
+
+    val metadataOpts1 = Map(
+      HoodieMetadataConfig.ENABLE.key -> "true",
+      HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true"
+    )
+
+    // NOTE: MOR and COW have different fixtures since MOR is bearing 
delta-log files (holding
+    //       deferred updates), diverging from COW
+
+    val expectedColStatsSourcePath = if (testCase.tableType == 
HoodieTableType.COPY_ON_WRITE) {
+      "index/colstats/cow-bootstrap-rollback1-column-stats-index-table.json"
+    } else {
+      "index/colstats/mor-bootstrap-rollback1-column-stats-index-table.json"
+    }
+
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+    val latestCompletedCommit = 
metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
+
+    // Update a subset of records which are not deleted, enable col stats, 
and validate the bootstrap.
+    doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, 
metadataOpts1, commonOpts,
+      dataSourcePath = "index/colstats/update3-input-table-json",
+      expectedColStatsSourcePath = expectedColStatsSourcePath,
+      operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Append,
+      true,
+      latestCompletedCommit,
+      numPartitions =  1,
+      parquetMaxFileSize = 100 * 1024 * 1024,
+      smallFileLimit = 0))
+
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+    
assertTrue(metaClient.getActiveTimeline.getRollbackTimeline.countInstants() > 0)
+  }
+
+  def simulateFailureForLatestCommit(tableType: HoodieTableType, partitionCol: 
String) : Unit = {
+    // simulate failure for latest commit.
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+    var baseFileName : String = null
+    var logFileName : String = null
+    val lastCompletedCommit = 
metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants().lastInstant().get()
+    if (tableType == HoodieTableType.MERGE_ON_READ) {
+      val dataFiles = if (StringUtils.isNullOrEmpty(partitionCol)) {
+        metaClient.getStorage.listFiles(new 
StoragePath(metaClient.getBasePath, "/"))
+      } else {
+        metaClient.getStorage.listFiles(new 
StoragePath(metaClient.getBasePath, "9"))
+      }
+      val logFileFileStatus = dataFiles.stream().filter(fileStatus => 
fileStatus.getPath.getName.contains(".log")).findFirst().get()
+      logFileName = logFileFileStatus.getPath.getName
+    } else {
+      val dataFiles = if (StringUtils.isNullOrEmpty(partitionCol)) {
+        metaClient.getStorage.listFiles(new 
StoragePath(metaClient.getBasePath.toString))
+      } else {
+        metaClient.getStorage.listFiles(new 
StoragePath(metaClient.getBasePath,  "9"))
+      }
+      val baseFileFileStatus = dataFiles.stream().filter(fileStatus => 
fileStatus.getPath.getName.contains(lastCompletedCommit.getTimestamp)).findFirst().get()
+      baseFileName = baseFileFileStatus.getPath.getName
+    }
+
+    val latestCompletedFileName = lastCompletedCommit.getFileName
+    metaClient.getStorage.deleteFile(new 
StoragePath(metaClient.getBasePath.toString + "/.hoodie/" + 
latestCompletedFileName))
+
+    // re-create marker for the deleted file.
+    if (tableType == HoodieTableType.MERGE_ON_READ) {
+      if (StringUtils.isNullOrEmpty(partitionCol)) {
+        metaClient.getStorage.create(new 
StoragePath(metaClient.getBasePath.toString + "/.hoodie/.temp/" + 
lastCompletedCommit.getTimestamp + "/" + logFileName + ".marker.APPEND"))
+      } else {
+        metaClient.getStorage.create(new 
StoragePath(metaClient.getBasePath.toString + "/.hoodie/.temp/" + 
lastCompletedCommit.getTimestamp + "/9/" + logFileName + ".marker.APPEND"))
+      }
+    } else {
+      if (StringUtils.isNullOrEmpty(partitionCol)) {
+        metaClient.getStorage.create(new 
StoragePath(metaClient.getBasePath.toString + "/.hoodie/.temp/" + 
lastCompletedCommit.getTimestamp + "/" + baseFileName + ".marker.MERGE"))
+      } else {
+        metaClient.getStorage.create(new 
StoragePath(metaClient.getBasePath.toString + "/.hoodie/.temp/" + 
lastCompletedCommit.getTimestamp + "/9/" + baseFileName + ".marker.MERGE"))
+      }
+    }
+  }
+
+  @Test
+  def testMORDeleteBlocks(): Unit = {
+    val tableType: HoodieTableType = HoodieTableType.MERGE_ON_READ
+    val partitionCol = "c8"
+    val testCase = ColumnStatsTestCase(tableType, shouldReadInMemory = true)
+    val metadataOpts = Map(
+      HoodieMetadataConfig.ENABLE.key -> "true",
+      HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true"
+    )
+
+    val commonOpts = Map(
+      "hoodie.insert.shuffle.parallelism" -> "1",
+      "hoodie.upsert.shuffle.parallelism" -> "1",
+      HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+      DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString,
+      RECORDKEY_FIELD.key -> "c1",
+      PRECOMBINE_FIELD.key -> "c1",
+      PARTITIONPATH_FIELD.key() -> partitionCol,
+      HoodieTableConfig.POPULATE_META_FIELDS.key -> "true",
+      HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key() -> "5"
+    ) ++ metadataOpts
+
+    // inserts
+    doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, 
metadataOpts, commonOpts,
+      dataSourcePath = "index/colstats/input-table-json",
+      expectedColStatsSourcePath = null,
+      operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Overwrite,
+      false,
+      numPartitions = 1,
+      parquetMaxFileSize = 100 * 1024 * 1024,
+      smallFileLimit = 0))
+
+    // updates
+    doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, 
metadataOpts, commonOpts,
+      dataSourcePath = "index/colstats/update2-input-table-json/",
+      expectedColStatsSourcePath = null,
+      operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Append,
+      false,
+      numPartitions = 1,
+      parquetMaxFileSize = 100 * 1024 * 1024,
+      smallFileLimit = 0))
+
+    val expectedColStatsSourcePath = 
"index/colstats/mor-delete-block1-column-stats-index-table.json"
+
+    // Delete a subset of records. This will add a delete log block for the MOR table.
+    doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, 
metadataOpts, commonOpts,
+      dataSourcePath = "index/colstats/delete-input-table-json/",
+      expectedColStatsSourcePath = expectedColStatsSourcePath,
+      operation = DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Append,
+      true,
+      numPartitions = 1,
+      parquetMaxFileSize = 100 * 1024 * 1024,
+      smallFileLimit = 0))
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("", "c8"))
+  def testColStatsWithCleanCOW(partitionCol: String): Unit = {
+    val tableType: HoodieTableType = HoodieTableType.COPY_ON_WRITE
+    val testCase = ColumnStatsTestCase(tableType, shouldReadInMemory = true)
+    val metadataOpts = Map(
+      HoodieMetadataConfig.ENABLE.key -> "true"
+    )
+
+    val commonOpts = Map(
+      "hoodie.insert.shuffle.parallelism" -> "1",
+      "hoodie.upsert.shuffle.parallelism" -> "1",
+      HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+      DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString,
+      RECORDKEY_FIELD.key -> "c1",
+      PRECOMBINE_FIELD.key -> "c1",
+      PARTITIONPATH_FIELD.key() -> partitionCol,
+      HoodieTableConfig.POPULATE_META_FIELDS.key -> "true",
+      HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key() -> "1"
+    ) ++ metadataOpts
+
+    // inserts
+    doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, 
metadataOpts, commonOpts,
+      dataSourcePath = "index/colstats/input-table-json",
+      expectedColStatsSourcePath = null,
+      operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Overwrite,
+      false,
+      numPartitions = 1,
+      parquetMaxFileSize = 100 * 1024 * 1024,
+      smallFileLimit = 0))
+
+    val metadataOpts1 = Map(
+      HoodieMetadataConfig.ENABLE.key -> "true",
+      HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true"
+    )
+
+    // updates 1
+    doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, 
metadataOpts1, commonOpts,
+      dataSourcePath = "index/colstats/update2-input-table-json/",
+      expectedColStatsSourcePath = null,
+      operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Append,
+      false,
+      numPartitions = 1,
+      parquetMaxFileSize = 100 * 1024 * 1024,
+      smallFileLimit = 0))
+
+    val expectedColStatsSourcePath = if (testCase.tableType == 
HoodieTableType.COPY_ON_WRITE) {
+      "index/colstats/cow-clean1-column-stats-index-table.json"
+    } else {
+      "index/colstats/mor-bootstrap-rollback1-column-stats-index-table.json"
+    }
+
+    // updates 2
+    doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, 
metadataOpts1, commonOpts,
+      dataSourcePath = "index/colstats/update3-input-table-json/",
+      expectedColStatsSourcePath = expectedColStatsSourcePath,
+      operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Append,
+      true,
+      numPartitions = 1,
+      parquetMaxFileSize = 100 * 1024 * 1024,
+      smallFileLimit = 0))
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("", "c8"))
+  def testColStatsWithCleanMOR(partitionCol: String): Unit = {
+    val tableType: HoodieTableType = HoodieTableType.MERGE_ON_READ
+    val testCase = ColumnStatsTestCase(tableType, shouldReadInMemory = true)
+    val metadataOpts = Map(
+      HoodieMetadataConfig.ENABLE.key -> "true"
+    )
+
+    val commonOpts = Map(
+      "hoodie.insert.shuffle.parallelism" -> "1",
+      "hoodie.upsert.shuffle.parallelism" -> "1",
+      HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+      DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString,
+      RECORDKEY_FIELD.key -> "c1",
+      PRECOMBINE_FIELD.key -> "c1",
+      PARTITIONPATH_FIELD.key() -> partitionCol,
+      HoodieTableConfig.POPULATE_META_FIELDS.key -> "true",
+      HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key() -> "1",
+      HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key() -> "2"
+    ) ++ metadataOpts
+
+    // inserts
+    doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, 
metadataOpts, commonOpts,
+      dataSourcePath = "index/colstats/input-table-json",
+      expectedColStatsSourcePath = null,
+      operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Overwrite,
+      false,
+      numPartitions = 1,
+      parquetMaxFileSize = 100 * 1024 * 1024,
+      smallFileLimit = 0))
+
+    val metadataOpts1 = Map(
+      HoodieMetadataConfig.ENABLE.key -> "true",
+      HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true"
+    )
+
+    // updates 1
+    doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, 
metadataOpts1, commonOpts,
+      dataSourcePath = "index/colstats/update2-input-table-json/",
+      expectedColStatsSourcePath = null,
+      operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Append,
+      false,
+      numPartitions = 1,
+      parquetMaxFileSize = 100 * 1024 * 1024,
+      smallFileLimit = 0))
+
+    val expectedColStatsSourcePath = if (testCase.tableType == 
HoodieTableType.COPY_ON_WRITE) {
+      "index/colstats/cow-clean1-column-stats-index-table.json"
+    } else {
+      "index/colstats/mor-clean1-column-stats-index-table.json"
+    }
+
+    // updates 2
+    doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, 
metadataOpts1, commonOpts,
+      dataSourcePath = "index/colstats/update3-input-table-json/",
+      expectedColStatsSourcePath = expectedColStatsSourcePath,
+      operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Append,
+      true,
+      numPartitions = 1,
+      parquetMaxFileSize = 100 * 1024 * 1024,
+      smallFileLimit = 0))
+
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+    assertTrue(metaClient.getActiveTimeline.getCleanerTimeline.countInstants() 
> 0)
+  }
 
   @ParameterizedTest
   @EnumSource(classOf[HoodieTableType])
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala
index 1c731a6d0a6..effda5af016 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala
@@ -28,7 +28,7 @@ import org.apache.hudi.common.model.{HoodieCommitMetadata, 
HoodieTableType, Writ
 import org.apache.hudi.common.table.HoodieTableConfig
 import org.apache.hudi.common.table.timeline.{HoodieInstant, 
MetadataConversionUtils}
 import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig, 
HoodieWriteConfig}
-import org.apache.hudi.functional.ColumnStatIndexTestBase.ColumnStatsTestCase
+import 
org.apache.hudi.functional.ColumnStatIndexTestBase.{ColumnStatsTestCase, 
ColumnStatsTestParams}
 import org.apache.hudi.index.HoodieIndex.IndexType.INMEMORY
 import org.apache.hudi.metadata.HoodieMetadataFileSystemView
 import org.apache.hudi.util.JavaConversions
@@ -88,12 +88,12 @@ class TestColumnStatsIndexWithSQL extends 
ColumnStatIndexTestBase {
       HoodieIndexConfig.INDEX_TYPE.key() -> INMEMORY.name()
     ) ++ metadataOpts
 
-    doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts,
+    doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, 
metadataOpts, commonOpts,
       dataSourcePath = "index/colstats/input-table-json",
       expectedColStatsSourcePath = 
"index/colstats/column-stats-index-table.json",
       operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
       saveMode = SaveMode.Overwrite,
-      shouldValidate = false)
+      shouldValidate = false))
 
     assertEquals(4, getLatestDataFilesCount(commonOpts))
     assertEquals(0, getLatestDataFilesCount(commonOpts, includeLogFiles = 
false))
@@ -133,12 +133,12 @@ class TestColumnStatsIndexWithSQL extends 
ColumnStatIndexTestBase {
     verifyFileIndexAndSQLQueries(commonOpts, 
isTableDataSameAsAfterSecondInstant = true)
 
     // Add the last df back and verify the queries
-    doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts,
+    doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, 
metadataOpts, commonOpts,
       dataSourcePath = "index/colstats/update-input-table-json",
       expectedColStatsSourcePath = "",
       operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
       saveMode = SaveMode.Append,
-      shouldValidate = false)
+      shouldValidate = false))
     verifyFileIndexAndSQLQueries(commonOpts, verifyFileCount = false)
   }
 
@@ -195,27 +195,27 @@ class TestColumnStatsIndexWithSQL extends 
ColumnStatIndexTestBase {
     writeClient.scheduleCompaction(org.apache.hudi.common.util.Option.empty())
     writeClient.close()
 
-    doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts,
+    doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, 
metadataOpts, commonOpts,
       dataSourcePath = "index/colstats/update-input-table-json",
       expectedColStatsSourcePath = "",
       operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
       saveMode = SaveMode.Append,
-      shouldValidate = false)
+      shouldValidate = false))
     verifyFileIndexAndSQLQueries(commonOpts)
   }
 
   private def setupTable(testCase: ColumnStatsTestCase, metadataOpts: 
Map[String, String], commonOpts: Map[String, String], shouldValidate: Boolean): 
Unit = {
-    doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts,
+    doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, 
metadataOpts, commonOpts,
       dataSourcePath = "index/colstats/input-table-json",
       expectedColStatsSourcePath = 
"index/colstats/column-stats-index-table.json",
       operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
-      saveMode = SaveMode.Overwrite)
+      saveMode = SaveMode.Overwrite))
 
-    doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts,
+    doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, 
metadataOpts, commonOpts,
       dataSourcePath = "index/colstats/another-input-table-json",
       expectedColStatsSourcePath = 
"index/colstats/updated-column-stats-index-table.json",
       operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
-      saveMode = SaveMode.Append)
+      saveMode = SaveMode.Append))
 
     // NOTE: MOR and COW have different fixtures since MOR is bearing 
delta-log files (holding
     //       deferred updates), diverging from COW
@@ -225,12 +225,12 @@ class TestColumnStatsIndexWithSQL extends 
ColumnStatIndexTestBase {
       "index/colstats/mor-updated2-column-stats-index-table.json"
     }
 
-    doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts,
+    doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, 
metadataOpts, commonOpts,
       dataSourcePath = "index/colstats/update-input-table-json",
       expectedColStatsSourcePath = expectedColStatsSourcePath,
       operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
       saveMode = SaveMode.Append,
-      shouldValidate)
+      shouldValidate))
   }
 
   def verifyFileIndexAndSQLQueries(opts: Map[String, String], 
isTableDataSameAsAfterSecondInstant: Boolean = false, verifyFileCount: Boolean 
= true): Unit = {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
index c04dc09a69c..216d2b71d32 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
@@ -319,7 +319,6 @@ class TestSecondaryIndexPruning extends 
SparkClientFunctionalTestHarness {
       spark.sql(s"update $tableName set not_record_key_col = 'xyz' where 
record_key_col = 'row1'")
       // validate the secondary index records themselves
       checkAnswer(s"select key, SecondaryIndexMetadata.recordKey, 
SecondaryIndexMetadata.isDeleted from hudi_metadata('$basePath') where type=7")(
-        Seq("abc", "row1", true),
         Seq("cde", "row2", false),
         Seq("def", "row3", false),
         Seq("xyz", "row1", false)
@@ -328,7 +327,7 @@ class TestSecondaryIndexPruning extends 
SparkClientFunctionalTestHarness {
       checkAnswer(s"select ts, record_key_col, not_record_key_col, 
partition_key_col from $tableName where record_key_col = 'row1'")(
         Seq(1, "row1", "xyz", "p1")
       )
-      verifyQueryPredicate(hudiOpts, "not_record_key_col")
+      verifyQueryPredicate(hudiOpts, "not_record_key_col", "abc")
     }
   }
 
@@ -516,8 +515,6 @@ class TestSecondaryIndexPruning extends 
SparkClientFunctionalTestHarness {
            |FROM hudi_metadata('$basePath')
            |WHERE type=7
        """.stripMargin)(
-        Seq("abc", "row1", true),
-        Seq("cde", "row2", true),
         Seq("value1_1", "row1", false),
         Seq("value2_2", "row2", false)
       )
@@ -605,7 +602,6 @@ class TestSecondaryIndexPruning extends 
SparkClientFunctionalTestHarness {
       spark.sql(s"update $tableName set not_record_key_col = 'xyz' where 
record_key_col = 'row1'")
       // validate the secondary index records themselves
       checkAnswer(s"select key, SecondaryIndexMetadata.recordKey, 
SecondaryIndexMetadata.isDeleted from hudi_metadata('$basePath') where type=7")(
-        Seq("abc", "row1", true),
         Seq("cde", "row2", false),
         Seq("def", "row3", false),
         Seq("xyz", "row1", false)
@@ -614,7 +610,7 @@ class TestSecondaryIndexPruning extends 
SparkClientFunctionalTestHarness {
       checkAnswer(s"select ts, record_key_col, not_record_key_col, 
partition_key_col from $tableName where record_key_col = 'row1'")(
         Seq(1, "row1", "xyz", "p1")
       )
-      verifyQueryPredicate(hudiOpts, "not_record_key_col")
+      verifyQueryPredicate(hudiOpts, "not_record_key_col", "abc")
     }
   }
 
@@ -770,11 +766,11 @@ class TestSecondaryIndexPruning extends 
SparkClientFunctionalTestHarness {
       
assertFalse(metaClient.getTableConfig.getMetadataPartitions.contains(MetadataPartitionType.PARTITION_STATS.getPartitionPath))
       // however index definition should still be present
       assertTrue(metaClient.getIndexMetadata.isPresent && 
metaClient.getIndexMetadata.get.getIndexDefinitions.get(secondaryIndexPartition).getIndexType.equals("secondary_index"))
+
       // update the secondary key column
       spark.sql(s"update $tableName set not_record_key_col = 'xyz' where 
record_key_col = 'row1'")
       // validate the secondary index records themselves
       checkAnswer(s"select key, SecondaryIndexMetadata.recordKey, 
SecondaryIndexMetadata.isDeleted from hudi_metadata('$basePath') where type=7")(
-        Seq("abc", "row1", true),
         Seq("xyz", "row1", false)
       )
     }
@@ -784,10 +780,11 @@ class TestSecondaryIndexPruning extends 
SparkClientFunctionalTestHarness {
     assertResult(expects.map(row => Row(row: 
_*)).toArray.sortBy(_.toString()))(spark.sql(query).collect().sortBy(_.toString()))
   }
 
-  private def verifyQueryPredicate(hudiOpts: Map[String, String], columnName: 
String): Unit = {
+  private def verifyQueryPredicate(hudiOpts: Map[String, String], columnName: 
String, nonExistantKey: String = "abcdefghi"): Unit = {
     mergedDfList = mergedDfList :+ 
spark.read.format("hudi").options(hudiOpts).load(basePath).repartition(1).cache()
-    val secondaryKey = mergedDfList.last.limit(1).collect().map(row => 
row.getAs(columnName).toString)
-    val dataFilter = EqualTo(attribute(columnName), Literal(secondaryKey(0)))
+    val secondaryKey = mergedDfList.last.limit(2).collect().filter(row => 
!row.getAs(columnName).toString.equals(nonExistantKey))
+      .map(row => row.getAs(columnName).toString).head
+    val dataFilter = EqualTo(attribute(columnName), Literal(secondaryKey))
     verifyFilePruning(hudiOpts, dataFilter)
   }
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala
index 5a302304277..c4b76a6de88 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala
@@ -634,8 +634,7 @@ class TestFunctionalIndex extends HoodieSparkSqlTestBase {
 
             // verify there are new updates to functional index with isDeleted 
true for cleaned file
             checkAnswer(s"select ColumnStatsMetadata.minValue.member6.value, 
ColumnStatsMetadata.maxValue.member6.value, ColumnStatsMetadata.isDeleted from 
hudi_metadata('$tableName') where type=3 and 
ColumnStatsMetadata.fileName='$fileName'")(
-              Seq("2022-09-26", "2022-09-26", false),
-              Seq(null, null, true) // for the cleaned file
+              Seq("2022-09-26", "2022-09-26", false) // for the cleaned file, 
no stats will be produced.
             )
           }
         }
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
index 455fd4b38f2..2763077a887 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
@@ -217,7 +217,7 @@ public class TestHoodieMetadataTableValidator extends 
HoodieSparkClientTestBase
 
     Dataset<Row> rows = getRowDataset(1, "row1", "abc", "p1");
     rows.write().format("hudi").mode(SaveMode.Append).save(basePath);
-    rows = getRowDataset(2, "row2", "abc", "p2");
+    rows = getRowDataset(2, "row2", "ghi", "p2");
     rows.write().format("hudi").mode(SaveMode.Append).save(basePath);
     rows = getRowDataset(3, "row3", "def", "p2");
     rows.write().format("hudi").mode(SaveMode.Append).save(basePath);

Reply via email to