This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new b4e1e5eba1c [HUDI-8208] Fix partition stats bound when compacting or 
clustering (#12050)
b4e1e5eba1c is described below

commit b4e1e5eba1c7a5b03dff7d4bad5878901b79e3b0
Author: Sagar Sumit <[email protected]>
AuthorDate: Tue Oct 15 02:43:19 2024 +0530

    [HUDI-8208] Fix partition stats bound when compacting or clustering (#12050)
    
    The [min, max] range in column stats or partition stats can keep widening 
with updates or deletes, because we simply take the min of all mins and the max 
of all maxes while merging the stats. This can lead to a degenerate case where all 
partitions qualify for a predicate based on stats, even though actually very 
few partitions meet the predicate based on actual data. It defeats the purpose 
of pruning/skipping using stats. To fix this problem, we need to bring the 
range to a tighter bound. In o [...]
    
    - Adds a flag in the column stats metadata payload - isTightBound - to indicate 
whether the min/max range is a tighter bound based on the latest snapshot or not. It is 
false by default and set to true during compaction or clustering.
    - Adds a config to disable calculating tight bounds. Enabled by default for 
compaction and clustering.
    - To calculate the tight bound, we look at the colstats partition for the 
uncompacted or unclustered files and then merge the colstats with that of the 
compacted or clustered files. Most of the changes are in 
HoodieTableMetadataUtil.
    
    ---------
    
    Co-authored-by: sivabalan <[email protected]>
---
 .../metadata/HoodieBackedTableMetadataWriter.java  |  18 ++-
 hudi-common/src/main/avro/HoodieMetadata.avsc      |  11 ++
 .../hudi/common/config/HoodieMetadataConfig.java   |  18 +++
 .../common/model/HoodieColumnRangeMetadata.java    |  18 +++
 .../hudi/common/model/WriteOperationType.java      |   4 +
 .../hudi/metadata/HoodieBackedTableMetadata.java   |   2 +-
 .../hudi/metadata/HoodieMetadataPayload.java       |   3 +-
 .../hudi/metadata/HoodieTableMetadataUtil.java     | 162 ++++++++++++++++-----
 .../apache/hudi/utils/TestAvroSchemaConverter.java |   3 +-
 .../common/functional/TestHoodieLogFormat.java     |  86 +----------
 .../common/testutils/HoodieCommonTestHarness.java  | 126 ++++++++++++++++
 .../hudi/metadata/TestHoodieMetadataPayload.java   |  12 +-
 .../hudi/metadata/TestHoodieTableMetadataUtil.java | 110 +++++++++++---
 .../TestPartitionStatsIndexWithSql.scala           |  79 ++++++----
 .../utilities/HoodieMetadataTableValidator.java    |   5 +-
 15 files changed, 468 insertions(+), 189 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index f72e47ad98f..cc101ed6ade 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -490,7 +490,8 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
   }
 
   private Pair<Integer, HoodieData<HoodieRecord>> 
initializePartitionStatsIndex(List<DirectoryInfo> partitionInfoList) {
-    HoodieData<HoodieRecord> records = 
HoodieTableMetadataUtil.convertFilesToPartitionStatsRecords(engineContext, 
partitionInfoList, dataWriteConfig.getMetadataConfig(), dataMetaClient);
+    HoodieData<HoodieRecord> records = 
HoodieTableMetadataUtil.convertFilesToPartitionStatsRecords(engineContext, 
partitionInfoList, dataWriteConfig.getMetadataConfig(), dataMetaClient,
+        Option.of(new 
Schema.Parser().parse(dataWriteConfig.getWriteSchema())));
     final int fileGroupCount = 
dataWriteConfig.getMetadataConfig().getPartitionStatsIndexFileGroupCount();
     return Pair.of(fileGroupCount, records);
   }
@@ -590,13 +591,14 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
   }
 
   private List<Pair<String, FileSlice>> getPartitionFileSlicePairs() throws 
IOException {
-    HoodieMetadataFileSystemView fsView = getMetadataView();
-    // Collect the list of latest file slices present in each partition
-    List<String> partitions = metadata.getAllPartitionPaths();
-    fsView.loadAllPartitions();
-    List<Pair<String, FileSlice>> partitionFileSlicePairs = new ArrayList<>();
-    partitions.forEach(partition -> 
fsView.getLatestFileSlices(partition).forEach(fs -> 
partitionFileSlicePairs.add(Pair.of(partition, fs))));
-    return partitionFileSlicePairs;
+    try (HoodieMetadataFileSystemView fsView = getMetadataView()) {
+      // Collect the list of latest file slices present in each partition
+      List<String> partitions = metadata.getAllPartitionPaths();
+      fsView.loadAllPartitions();
+      List<Pair<String, FileSlice>> partitionFileSlicePairs = new 
ArrayList<>();
+      partitions.forEach(partition -> 
fsView.getLatestFileSlices(partition).forEach(fs -> 
partitionFileSlicePairs.add(Pair.of(partition, fs))));
+      return partitionFileSlicePairs;
+    }
   }
 
   private Pair<Integer, HoodieData<HoodieRecord>> 
initializeRecordIndexPartition() throws IOException {
diff --git a/hudi-common/src/main/avro/HoodieMetadata.avsc 
b/hudi-common/src/main/avro/HoodieMetadata.avsc
index e7a5a1e145d..f092c3b2a63 100644
--- a/hudi-common/src/main/avro/HoodieMetadata.avsc
+++ b/hudi-common/src/main/avro/HoodieMetadata.avsc
@@ -353,6 +353,17 @@
                             "doc": "Column range entry valid/deleted flag",
                             "name": "isDeleted",
                             "type": "boolean"
+                        },
+                        // NOTE: This is a new field added in 1.0.0.
+                        //       Typically, the min/max range for each column 
can become wider i.e. the minValue <= all
+                        //       valid values and the maxValue >= all valid 
values in the file with updates and deletes.
+                        //       For effective pruning, the min/max range can 
be updated to a tighter bound during
+                        //       compaction or clustering.
+                        {
+                            "doc": "Whether the min/max range of a column is 
tight bound or not",
+                            "name": "isTightBound",
+                            "type": "boolean",
+                            "default": false
                         }
                     ]
                 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
index 736b21e847a..c84723db8ed 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
@@ -358,6 +358,15 @@ public final class HoodieMetadataConfig extends 
HoodieConfig {
       .sinceVersion("1.0.0")
       .withDocumentation("Parallelism to use, when generating partition stats 
index.");
 
+  public static final ConfigProperty<Boolean> 
ENABLE_PARTITION_STATS_INDEX_TIGHT_BOUND = ConfigProperty
+      .key(METADATA_PREFIX + ".index.partition.stats.tightBound.enable")
+      .defaultValue(true)
+      .sinceVersion("1.0.0")
+      .withDocumentation("Enable tight bound for the min/max value for each 
column at the storage partition level. "
+          + "Typically, the min/max range for each column can become wider 
(i.e. the minValue is <= all valid values "
+          + "in the file, and the maxValue >= all valid values in the file) 
with updates and deletes. If this config is enabled, "
+          + "the min/max range will be updated to the tight bound of the valid 
values in the latest snapshot of the partition.");
+
   public static final ConfigProperty<Boolean> SECONDARY_INDEX_ENABLE_PROP = 
ConfigProperty
       .key(METADATA_PREFIX + ".index.secondary.enable")
       .defaultValue(false)
@@ -525,6 +534,10 @@ public final class HoodieMetadataConfig extends 
HoodieConfig {
     return getInt(PARTITION_STATS_INDEX_PARALLELISM);
   }
 
+  public boolean isPartitionStatsIndexTightBoundEnabled() {
+    return getBooleanOrDefault(ENABLE_PARTITION_STATS_INDEX_TIGHT_BOUND);
+  }
+
   public boolean isSecondaryIndexEnabled() {
     // Secondary index is enabled only iff record index (primary key index) is 
also enabled
     return isRecordIndexEnabled() && getBoolean(SECONDARY_INDEX_ENABLE_PROP);
@@ -736,6 +749,11 @@ public final class HoodieMetadataConfig extends 
HoodieConfig {
       return this;
     }
 
+    public Builder withPartitionStatsIndexTightBound(boolean enable) {
+      metadataConfig.setValue(ENABLE_PARTITION_STATS_INDEX_TIGHT_BOUND, 
String.valueOf(enable));
+      return this;
+    }
+
     public HoodieMetadataConfig build() {
       metadataConfig.setDefaultValue(ENABLE, 
getDefaultMetadataEnable(engineType));
       metadataConfig.setDefaults(HoodieMetadataConfig.class.getName());
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java
index 1c39acb3c8c..50d2a03c025 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.common.model;
 
+import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
 import org.apache.hudi.common.util.ValidationUtils;
 
 import javax.annotation.Nullable;
@@ -25,6 +26,8 @@ import javax.annotation.Nullable;
 import java.io.Serializable;
 import java.util.Objects;
 
+import static org.apache.hudi.avro.HoodieAvroUtils.unwrapAvroValueWrapper;
+
 /**
  * Hoodie metadata for the column range of data stored in columnar format 
(like Parquet)
  *
@@ -146,6 +149,21 @@ public class HoodieColumnRangeMetadata<T extends 
Comparable> implements Serializ
     return new HoodieColumnRangeMetadata<>(filePath, columnName, minValue, 
maxValue, nullCount, valueCount, totalSize, totalUncompressedSize);
   }
 
+  /**
+   * Converts instance of {@link HoodieMetadataColumnStats} to {@link 
HoodieColumnRangeMetadata}
+   */
+  public static HoodieColumnRangeMetadata<Comparable> 
fromColumnStats(HoodieMetadataColumnStats columnStats) {
+    return HoodieColumnRangeMetadata.<Comparable>create(
+        columnStats.getFileName(),
+        columnStats.getColumnName(),
+        unwrapAvroValueWrapper(columnStats.getMinValue()),
+        unwrapAvroValueWrapper(columnStats.getMaxValue()),
+        columnStats.getNullCount(),
+        columnStats.getValueCount(),
+        columnStats.getTotalSize(),
+        columnStats.getTotalUncompressedSize());
+  }
+
   @SuppressWarnings("rawtype")
   public static HoodieColumnRangeMetadata<Comparable> stub(String filePath,
                                                            String columnName) {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
index e69b4036a1a..d41e2d2aee8 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
@@ -168,4 +168,8 @@ public enum WriteOperationType {
   public static boolean isPreppedWriteOperation(WriteOperationType 
operationType) {
     return operationType == BULK_INSERT_PREPPED || operationType == 
INSERT_PREPPED | operationType == UPSERT_PREPPED || operationType == 
DELETE_PREPPED;
   }
+
+  public static boolean isPartitionStatsTightBoundRequired(WriteOperationType 
operationType) {
+    return operationType == COMPACT || operationType == CLUSTER;
+  }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 51466d12474..1f1c7f170d1 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -214,7 +214,7 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
     checkState(!partitionFileSlices.isEmpty(), "Number of file slices for 
partition " + partitionName + " should be > 0");
 
     return (shouldLoadInMemory ? HoodieListData.lazy(partitionFileSlices) :
-        engineContext.parallelize(partitionFileSlices))
+        getEngineContext().parallelize(partitionFileSlices))
         .flatMap(
             (SerializableFunction<FileSlice, 
Iterator<HoodieRecord<HoodieMetadataPayload>>>) fileSlice -> {
               // NOTE: Since this will be executed by executors, we can't 
access previously cached
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
index 7e32acd0038..6035dbcbd49 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
@@ -519,7 +519,7 @@ public class HoodieMetadataPayload implements 
HoodieRecordPayload<HoodieMetadata
 
   public static Stream<HoodieRecord> createPartitionStatsRecords(String 
partitionPath,
                                                                  
Collection<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList,
-                                                                 boolean 
isDeleted) {
+                                                                 boolean 
isDeleted, boolean isTightBound) {
     return columnRangeMetadataList.stream().map(columnRangeMetadata -> {
       HoodieKey key = new HoodieKey(getPartitionStatsIndexKey(partitionPath, 
columnRangeMetadata.getColumnName()),
           MetadataPartitionType.PARTITION_STATS.getPartitionPath());
@@ -535,6 +535,7 @@ public class HoodieMetadataPayload implements 
HoodieRecordPayload<HoodieMetadata
               .setTotalSize(columnRangeMetadata.getTotalSize())
               
.setTotalUncompressedSize(columnRangeMetadata.getTotalUncompressedSize())
               .setIsDeleted(isDeleted)
+              .setIsTightBound(isTightBound)
               .build(),
           MetadataPartitionType.PARTITION_STATS.getRecordType());
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 562504e56bb..286187a38ed 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -27,6 +27,7 @@ import org.apache.hudi.avro.model.FloatWrapper;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
 import org.apache.hudi.avro.model.HoodieMetadataFileInfo;
+import org.apache.hudi.avro.model.HoodieMetadataRecord;
 import org.apache.hudi.avro.model.HoodieRecordIndexInfo;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
@@ -59,11 +60,13 @@ import 
org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.log.HoodieFileSliceReader;
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -77,6 +80,7 @@ import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.VisibleForTesting;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.common.util.collection.Tuple3;
@@ -129,7 +133,6 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static java.util.stream.Collectors.toList;
 import static org.apache.hudi.avro.AvroSchemaUtils.resolveNullableSchema;
 import static org.apache.hudi.avro.HoodieAvroUtils.addMetadataFields;
 import static 
org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldSchemaFromWriteSchema;
@@ -138,6 +141,7 @@ import static 
org.apache.hudi.avro.HoodieAvroUtils.unwrapAvroValueWrapper;
 import static org.apache.hudi.avro.HoodieAvroUtils.wrapValueIntoAvro;
 import static 
org.apache.hudi.common.config.HoodieCommonConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES;
 import static 
org.apache.hudi.common.config.HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED;
+import static 
org.apache.hudi.common.config.HoodieCommonConfig.MAX_DFS_STREAM_BUFFER_SIZE;
 import static 
org.apache.hudi.common.config.HoodieCommonConfig.MAX_MEMORY_FOR_COMPACTION;
 import static 
org.apache.hudi.common.config.HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE;
 import static 
org.apache.hudi.common.config.HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN;
@@ -271,21 +275,6 @@ public class HoodieTableMetadataUtil {
         Collectors.toMap(HoodieColumnRangeMetadata::getColumnName, 
Function.identity()));
   }
 
-  /**
-   * Converts instance of {@link HoodieMetadataColumnStats} to {@link 
HoodieColumnRangeMetadata}
-   */
-  public static HoodieColumnRangeMetadata<Comparable> 
convertColumnStatsRecordToColumnRangeMetadata(HoodieMetadataColumnStats 
columnStats) {
-    return HoodieColumnRangeMetadata.<Comparable>create(
-        columnStats.getFileName(),
-        columnStats.getColumnName(),
-        unwrapAvroValueWrapper(columnStats.getMinValue()),
-        unwrapAvroValueWrapper(columnStats.getMaxValue()),
-        columnStats.getNullCount(),
-        columnStats.getValueCount(),
-        columnStats.getTotalSize(),
-        columnStats.getTotalUncompressedSize());
-  }
-
   public static Option<String> getColumnStatsValueAsString(Object statsValue) {
     if (statsValue == null) {
       LOG.info("Invalid column stats value: {}", statsValue);
@@ -1215,20 +1204,25 @@ public class HoodieTableMetadataUtil {
     }
 
     List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadata =
-        readColumnRangeMetadataFrom(filePartitionPath, datasetMetaClient, 
columnsToIndex);
+        readColumnRangeMetadataFrom(filePartitionPath, datasetMetaClient, 
columnsToIndex, false, Option.empty());
 
     return HoodieMetadataPayload.createColumnStatsRecords(partitionPath, 
columnRangeMetadata, false);
   }
 
   private static List<HoodieColumnRangeMetadata<Comparable>> 
readColumnRangeMetadataFrom(String filePath,
                                                                                
          HoodieTableMetaClient datasetMetaClient,
-                                                                               
          List<String> columnsToIndex) {
+                                                                               
          List<String> columnsToIndex,
+                                                                               
          boolean shouldReadColumnStatsForLogFiles,
+                                                                               
          Option<Schema> writerSchemaOpt) {
     try {
+      StoragePath fullFilePath = new 
StoragePath(datasetMetaClient.getBasePath(), filePath);
       if (filePath.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
-        StoragePath fullFilePath = new 
StoragePath(datasetMetaClient.getBasePath(), filePath);
         return HoodieIOFactory.getIOFactory(datasetMetaClient.getStorage())
             .getFileFormatUtils(HoodieFileFormat.PARQUET)
             .readColumnStatsFromMetadata(datasetMetaClient.getStorage(), 
fullFilePath, columnsToIndex);
+      } else if (FSUtils.isLogFile(fullFilePath) && 
shouldReadColumnStatsForLogFiles) {
+        LOG.warn("Reading log file: {}, to build column range metadata.", 
fullFilePath);
+        return getLogFileColumnRangeMetadata(fullFilePath.toString(), 
datasetMetaClient, columnsToIndex, writerSchemaOpt);
       }
 
       LOG.warn("Column range index not supported for: {}", filePath);
@@ -1241,9 +1235,41 @@ public class HoodieTableMetadataUtil {
     }
   }
 
+  /**
+   * Read column range metadata from log file.
+   */
+  @VisibleForTesting
+  protected static List<HoodieColumnRangeMetadata<Comparable>> 
getLogFileColumnRangeMetadata(String filePath,
+                                                                               
              HoodieTableMetaClient datasetMetaClient,
+                                                                               
              List<String> columnsToIndex,
+                                                                               
              Option<Schema> writerSchemaOpt) {
+    if (writerSchemaOpt.isPresent()) {
+      List<Schema.Field> fieldsToIndex = 
writerSchemaOpt.get().getFields().stream()
+          .filter(field -> columnsToIndex.contains(field.name()))
+          .collect(Collectors.toList());
+      // read log file records without merging
+      List<HoodieRecord> records = new ArrayList<>();
+      HoodieUnMergedLogRecordScanner scanner = 
HoodieUnMergedLogRecordScanner.newBuilder()
+          .withStorage(datasetMetaClient.getStorage())
+          .withBasePath(datasetMetaClient.getBasePath())
+          .withLogFilePaths(Collections.singletonList(filePath))
+          .withBufferSize(MAX_DFS_STREAM_BUFFER_SIZE.defaultValue())
+          
.withLatestInstantTime(datasetMetaClient.getActiveTimeline().getCommitsTimeline().lastInstant().get().getTimestamp())
+          .withReaderSchema(writerSchemaOpt.get())
+          .withTableMetaClient(datasetMetaClient)
+          .withLogRecordScannerCallback(records::add)
+          .build();
+      scanner.scan(false);
+      Map<String, HoodieColumnRangeMetadata<Comparable>> 
columnRangeMetadataMap =
+          collectColumnRangeMetadata(records, fieldsToIndex, filePath, 
writerSchemaOpt.get());
+      return new ArrayList<>(columnRangeMetadataMap.values());
+    }
+    return Collections.emptyList();
+  }
+
   /**
    * Does an upcast for {@link BigDecimal} instance to align it with 
scale/precision expected by
-   * the {@link org.apache.avro.LogicalTypes.Decimal} Avro logical type
+   * the {@link LogicalTypes.Decimal} Avro logical type
    */
   public static BigDecimal tryUpcastDecimal(BigDecimal value, final 
LogicalTypes.Decimal decimal) {
     final int scale = decimal.getScale();
@@ -1810,7 +1836,7 @@ public class HoodieTableMetadataUtil {
       final FileSlice fileSlice = partitionAndBaseFile.getValue();
       if (!fileSlice.getBaseFile().isPresent()) {
         List<String> logFilePaths = 
fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator())
-            .map(l -> l.getPath().toString()).collect(toList());
+            .map(l -> l.getPath().toString()).collect(Collectors.toList());
         HoodieMergedLogRecordScanner mergedLogRecordScanner = 
HoodieMergedLogRecordScanner.newBuilder()
             .withStorage(metaClient.getStorage())
             .withBasePath(basePath)
@@ -1921,7 +1947,7 @@ public class HoodieTableMetadataUtil {
     return engineContext.parallelize(partitionFileSlicePairs, 
parallelism).flatMap(partitionAndBaseFile -> {
       final String partition = partitionAndBaseFile.getKey();
       final FileSlice fileSlice = partitionAndBaseFile.getValue();
-      List<String> logFilePaths = 
fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).map(l -> 
l.getPath().toString()).collect(toList());
+      List<String> logFilePaths = 
fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).map(l -> 
l.getPath().toString()).collect(Collectors.toList());
       Option<StoragePath> dataFilePath = 
Option.ofNullable(fileSlice.getBaseFile().map(baseFile -> filePath(basePath, 
partition, baseFile.getFileName())).orElseGet(null));
       Schema readerSchema;
       if (dataFilePath.isPresent()) {
@@ -2042,7 +2068,7 @@ public class HoodieTableMetadataUtil {
 
   private static Stream<HoodieRecord> collectAndProcessColumnMetadata(
           List<List<HoodieColumnRangeMetadata<Comparable>>> fileColumnMetadata,
-          String partitionPath) {
+          String partitionPath, boolean isTightBound) {
 
     // Step 1: Flatten and Group by Column Name
     Map<String, List<HoodieColumnRangeMetadata<Comparable>>> columnMetadataMap 
= fileColumnMetadata.stream()
@@ -2054,17 +2080,19 @@ public class HoodieTableMetadataUtil {
             .map(entry -> 
FileFormatUtils.getColumnRangeInPartition(partitionPath, entry.getValue()));
 
     // Create Partition Stats Records
-    return HoodieMetadataPayload.createPartitionStatsRecords(partitionPath, 
partitionStatsRangeMetadata.collect(Collectors.toList()), false);
+    return HoodieMetadataPayload.createPartitionStatsRecords(partitionPath, 
partitionStatsRangeMetadata.collect(Collectors.toList()), false, isTightBound);
   }
 
   public static HoodieData<HoodieRecord> 
convertFilesToPartitionStatsRecords(HoodieEngineContext engineContext,
                                                                              
List<DirectoryInfo> partitionInfoList,
                                                                              
HoodieMetadataConfig metadataConfig,
-                                                                             
HoodieTableMetaClient dataTableMetaClient) {
+                                                                             
HoodieTableMetaClient dataTableMetaClient,
+                                                                             
Option<Schema> writerSchemaOpt) {
+    Lazy<Option<Schema>> lazyWriterSchemaOpt = writerSchemaOpt.isPresent() ? 
Lazy.eagerly(writerSchemaOpt) : Lazy.lazily(() -> 
tryResolveSchemaForTable(dataTableMetaClient));
     final List<String> columnsToIndex = getColumnsToIndex(
         metadataConfig.isPartitionStatsIndexEnabled(),
         metadataConfig.getColumnsEnabledForColumnStatsIndex(),
-        Lazy.lazily(() -> tryResolveSchemaForTable(dataTableMetaClient)));
+        lazyWriterSchemaOpt);
     if (columnsToIndex.isEmpty()) {
       LOG.warn("No columns to index for partition stats index");
       return engineContext.emptyHoodieData();
@@ -2072,14 +2100,15 @@ public class HoodieTableMetadataUtil {
     LOG.debug("Indexing following columns for partition stats index: {}", 
columnsToIndex);
     // Create records for MDT
     int parallelism = Math.max(Math.min(partitionInfoList.size(), 
metadataConfig.getPartitionStatsIndexParallelism()), 1);
+    Option<Schema> writerSchema = lazyWriterSchemaOpt.get();
     return engineContext.parallelize(partitionInfoList, 
parallelism).flatMap(partitionInfo -> {
       final String partitionPath = partitionInfo.getRelativePath();
       // Step 1: Collect Column Metadata for Each File
       List<List<HoodieColumnRangeMetadata<Comparable>>> fileColumnMetadata = 
partitionInfo.getFileNameToSizeMap().keySet().stream()
-              .map(fileName -> getFileStatsRangeMetadata(partitionPath, 
partitionPath + "/" + fileName, dataTableMetaClient, columnsToIndex, false))
-              .collect(Collectors.toList());
+          .map(fileName -> getFileStatsRangeMetadata(partitionPath, 
partitionPath + "/" + fileName, dataTableMetaClient, columnsToIndex, false, 
true, writerSchemaOpt))
+          .collect(Collectors.toList());
 
-      return collectAndProcessColumnMetadata(fileColumnMetadata, 
partitionPath).iterator();
+      return collectAndProcessColumnMetadata(fileColumnMetadata, 
partitionPath, true).iterator();
     });
   }
 
@@ -2087,7 +2116,9 @@ public class HoodieTableMetadataUtil {
                                                                                
        String filePath,
                                                                                
        HoodieTableMetaClient datasetMetaClient,
                                                                                
        List<String> columnsToIndex,
-                                                                               
        boolean isDeleted) {
+                                                                               
        boolean isDeleted,
+                                                                               
        boolean shouldReadColumnMetadataForLogFiles,
+                                                                               
        Option<Schema> writerSchemaOpt) {
     String filePartitionPath = filePath.startsWith("/") ? 
filePath.substring(1) : filePath;
     String fileName = FSUtils.getFileName(filePath, partitionPath);
     if (isDeleted) {
@@ -2095,7 +2126,7 @@ public class HoodieTableMetadataUtil {
           .map(entry -> HoodieColumnRangeMetadata.stub(fileName, entry))
           .collect(Collectors.toList());
     }
-    return readColumnRangeMetadataFrom(filePartitionPath, datasetMetaClient, 
columnsToIndex);
+    return readColumnRangeMetadataFrom(filePartitionPath, datasetMetaClient, 
columnsToIndex, shouldReadColumnMetadataForLogFiles, writerSchemaOpt);
   }
 
   public static HoodieData<HoodieRecord> 
convertMetadataToPartitionStatsRecords(HoodieCommitMetadata commitMetadata,
@@ -2117,7 +2148,8 @@ public class HoodieTableMetadataUtil {
                       : Option.of(new Schema.Parser().parse(writerSchemaStr)));
       HoodieTableConfig tableConfig = dataMetaClient.getTableConfig();
       Option<Schema> tableSchema = writerSchema.map(schema -> 
tableConfig.populateMetaFields() ? addMetadataFields(schema) : schema);
-      List<String> columnsToIndex = 
getColumnsToIndex(metadataConfig.isPartitionStatsIndexEnabled(), 
metadataConfig.getColumnsEnabledForColumnStatsIndex(), 
Lazy.eagerly(tableSchema));
+      Lazy<Option<Schema>> writerSchemaOpt = Lazy.eagerly(tableSchema);
+      List<String> columnsToIndex = 
getColumnsToIndex(metadataConfig.isPartitionStatsIndexEnabled(), 
metadataConfig.getColumnsEnabledForColumnStatsIndex(), writerSchemaOpt);
       if (columnsToIndex.isEmpty()) {
         return engineContext.emptyHoodieData();
       }
@@ -2130,29 +2162,78 @@ public class HoodieTableMetadataUtil {
           .collect(Collectors.toList());
 
       int parallelism = Math.max(Math.min(partitionedWriteStats.size(), 
metadataConfig.getPartitionStatsIndexParallelism()), 1);
+      boolean shouldScanColStatsForTightBound = 
MetadataPartitionType.COLUMN_STATS.isMetadataPartitionAvailable(dataMetaClient)
+          && metadataConfig.isPartitionStatsIndexTightBoundEnabled() && 
WriteOperationType.isPartitionStatsTightBoundRequired(commitMetadata.getOperationType());
+      HoodieTableMetadata tableMetadata;
+      if (shouldScanColStatsForTightBound) {
+        tableMetadata = HoodieTableMetadata.create(engineContext, 
dataMetaClient.getStorage(), metadataConfig, 
dataMetaClient.getBasePath().toString());
+      } else {
+        tableMetadata = null;
+      }
       return engineContext.parallelize(partitionedWriteStats, 
parallelism).flatMap(partitionedWriteStat -> {
         final String partitionName = 
partitionedWriteStat.get(0).getPartitionPath();
-        // Step 1: Collect Column Metadata for Each File
+        // Step 1: Collect column metadata for each file that is part of the current commit metadata
         List<List<HoodieColumnRangeMetadata<Comparable>>> fileColumnMetadata = 
partitionedWriteStat.stream()
-                .map(writeStat -> translateWriteStatToFileStats(writeStat, 
dataMetaClient, columnsToIndex))
-                .collect(Collectors.toList());
+            .map(writeStat -> translateWriteStatToFileStats(writeStat, 
dataMetaClient, columnsToIndex, tableSchema))
+            .collect(Collectors.toList());
+        if (shouldScanColStatsForTightBound) {
+          checkState(tableMetadata != null, "tableMetadata should not be null 
when scanning metadata table");
+          // Collect column metadata for each file that is part of the active file system view of the latest snapshot
+          // Get all file names, including log files, in a set from the file slices
+          Set<String> fileNames = getPartitionFileSlices(dataMetaClient, 
Option.empty(), partitionName, true).stream()
+              .flatMap(fileSlice -> Stream.concat(
+                  
Stream.of(fileSlice.getBaseFile().map(HoodieBaseFile::getFileName).orElse(null)),
+                  fileSlice.getLogFiles().map(HoodieLogFile::getFileName)))
+              .filter(Objects::nonNull)
+              .collect(Collectors.toSet());
+          // Fetch metadata table COLUMN_STATS partition records for above 
files
+          List<HoodieColumnRangeMetadata<Comparable>> partitionColumnMetadata =
+              
tableMetadata.getRecordsByKeyPrefixes(generateKeyPrefixes(columnsToIndex, 
partitionName), MetadataPartitionType.COLUMN_STATS.getPartitionPath(), false)
+                  // schema and properties are ignored in getInsertValue, so 
simply pass as null
+                  .map(record -> record.getData().getInsertValue(null, null))
+                  .filter(Option::isPresent)
+                  .map(data -> ((HoodieMetadataRecord) 
data.get()).getColumnStatsMetadata())
+                  .filter(stats -> fileNames.contains(stats.getFileName()))
+                  .map(HoodieColumnRangeMetadata::fromColumnStats)
+                  .collectAsList();
+          // In case of shouldScanColStatsForTightBound = true, we compute stats for the partition of interest
+          // for all files from getLatestFileSlice(), excluding the current commit, here.
+          // fileColumnMetadata already contains stats for files from the current inflight commit,
+          // so we add both together and send the combined list to collectAndProcessColumnMetadata.
+          fileColumnMetadata.add(partitionColumnMetadata);
+        }
 
-        return collectAndProcessColumnMetadata(fileColumnMetadata, 
partitionName).iterator();
+        return collectAndProcessColumnMetadata(fileColumnMetadata, 
partitionName, shouldScanColStatsForTightBound).iterator();
       });
     } catch (Exception e) {
       throw new HoodieException("Failed to generate column stats records for 
metadata table", e);
     }
   }
 
+  /**
+   * Generate key prefixes for each combination of column name in {@param 
columnsToIndex} and {@param partitionName}.
+   */
+  private static List<String> generateKeyPrefixes(List<String> columnsToIndex, 
String partitionName) {
+    List<String> keyPrefixes = new ArrayList<>();
+    PartitionIndexID partitionIndexId = new 
PartitionIndexID(getColumnStatsIndexPartitionIdentifier(partitionName));
+    for (String columnName : columnsToIndex) {
+      ColumnIndexID columnIndexID = new ColumnIndexID(columnName);
+      String keyPrefix = columnIndexID.asBase64EncodedString()
+          .concat(partitionIndexId.asBase64EncodedString());
+      keyPrefixes.add(keyPrefix);
+    }
+
+    return keyPrefixes;
+  }
+
   private static List<HoodieColumnRangeMetadata<Comparable>> 
translateWriteStatToFileStats(HoodieWriteStat writeStat,
                                                                                
            HoodieTableMetaClient datasetMetaClient,
-                                                                               
            List<String> columnsToIndex) {
+                                                                               
            List<String> columnsToIndex,
+                                                                               
            Option<Schema> writerSchemaOpt) {
     if (writeStat instanceof HoodieDeltaWriteStat && ((HoodieDeltaWriteStat) 
writeStat).getColumnStats().isPresent()) {
       Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap = 
((HoodieDeltaWriteStat) writeStat).getColumnStats().get();
       return columnRangeMap.values().stream().collect(Collectors.toList());
     }
 
-    return getFileStatsRangeMetadata(writeStat.getPartitionPath(), 
writeStat.getPath(), datasetMetaClient, columnsToIndex, false);
+    return getFileStatsRangeMetadata(writeStat.getPartitionPath(), 
writeStat.getPath(), datasetMetaClient, columnsToIndex, false, false, 
writerSchemaOpt);
   }
 
   public static String getPartitionStatsIndexKey(String partitionPath, String 
columnName) {
@@ -2165,7 +2246,10 @@ public class HoodieTableMetadataUtil {
   public static HoodieMetadataColumnStats 
mergeColumnStatsRecords(HoodieMetadataColumnStats prevColumnStats,
                                                                   
HoodieMetadataColumnStats newColumnStats) {
     checkArgument(Objects.equals(prevColumnStats.getColumnName(), 
newColumnStats.getColumnName()));
-
+    // If new column stats is tight bound, then discard the previous column 
stats
+    if (newColumnStats.getIsTightBound()) {
+      return newColumnStats;
+    }
     // We're handling 2 cases in here
     //  - New record is a tombstone: in this case it simply overwrites 
previous state
     //  - Previous record is a tombstone: in that case new proper record would 
also
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestAvroSchemaConverter.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestAvroSchemaConverter.java
index b297b627ba3..2eaf5ef2e7e 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestAvroSchemaConverter.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestAvroSchemaConverter.java
@@ -48,7 +48,8 @@ public class TestAvroSchemaConverter {
         + "`nullCount` BIGINT, "
         + "`totalSize` BIGINT, "
         + "`totalUncompressedSize` BIGINT, "
-        + "`isDeleted` BOOLEAN NOT NULL>";
+        + "`isDeleted` BOOLEAN NOT NULL, "
+        + "`isTightBound` BOOLEAN NOT NULL>";
     assertThat(dataType.getChildren().get(pos).toString(), is(expected));
   }
 
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index 12068b25708..044a31ad1fa 100755
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -19,7 +19,6 @@
 package org.apache.hudi.common.functional;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.DeleteRecord;
 import org.apache.hudi.common.model.HoodieArchivedLogFile;
@@ -43,15 +42,12 @@ import 
org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
 import org.apache.hudi.common.table.log.LogReaderUtils;
 import org.apache.hudi.common.table.log.TestLogReaderUtils;
 import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
-import org.apache.hudi.common.table.log.block.HoodieCDCDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
 import org.apache.hudi.common.table.log.block.HoodieDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
-import org.apache.hudi.common.table.log.block.HoodieHFileDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import 
org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
 import 
org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType;
-import org.apache.hudi.common.table.log.block.HoodieParquetDataBlock;
 import org.apache.hudi.common.testutils.FileCreateUtils;
 import org.apache.hudi.common.testutils.HadoopMapRedUtils;
 import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
@@ -112,8 +108,6 @@ import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static 
org.apache.hudi.common.config.HoodieStorageConfig.HFILE_COMPRESSION_ALGORITHM_NAME;
-import static 
org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME;
 import static org.apache.hudi.common.testutils.HoodieTestUtils.getJavaVersion;
 import static 
org.apache.hudi.common.testutils.HoodieTestUtils.shouldUseExternalHdfs;
 import static org.apache.hudi.common.testutils.HoodieTestUtils.useExternalHdfs;
@@ -136,7 +130,6 @@ import static org.mockito.Mockito.when;
 @SuppressWarnings("Duplicates")
 public class TestHoodieLogFormat extends HoodieCommonTestHarness {
 
-  private static final HoodieLogBlockType DEFAULT_DATA_BLOCK_TYPE = 
HoodieLogBlockType.AVRO_DATA_BLOCK;
   private static final int BUFFER_SIZE = 4096;
 
   private static HdfsTestService hdfsTestService;
@@ -692,7 +685,7 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
     SchemaTestUtil testUtil = new SchemaTestUtil();
     List<IndexedRecord> genRecords = testUtil.generateHoodieTestRecords(0, 
400);
 
-    Set<HoodieLogFile> logFiles = writeLogFiles(partitionPath, schema, 
genRecords, 4);
+    Set<HoodieLogFile> logFiles = writeLogFiles(partitionPath, schema, 
genRecords, 4, storage);
 
     FileCreateUtils.createDeltaCommit(basePath, "100", storage);
     // scan all log blocks (across multiple log files)
@@ -735,7 +728,7 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
     SchemaTestUtil testUtil = new SchemaTestUtil();
     List<IndexedRecord> genRecords = testUtil.generateHoodieTestRecords(0, 
300);
 
-    Set<HoodieLogFile> logFiles = writeLogFiles(partitionPath, schema, 
genRecords, 3);
+    Set<HoodieLogFile> logFiles = writeLogFiles(partitionPath, schema, 
genRecords, 3, storage);
 
     FileCreateUtils.createDeltaCommit(basePath, "100", storage);
 
@@ -823,7 +816,7 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
     SchemaTestUtil testUtil = new SchemaTestUtil();
     List<IndexedRecord> genRecords = testUtil.generateHoodieTestRecords(0, 
300);
 
-    Set<HoodieLogFile> logFiles = writeLogFiles(partitionPath, schema, 
genRecords, 3);
+    Set<HoodieLogFile> logFiles = writeLogFiles(partitionPath, schema, 
genRecords, 3, storage);
 
     FileCreateUtils.createDeltaCommit(basePath, "100", storage);
 
@@ -2780,27 +2773,6 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
     }
   }
 
-  public static HoodieDataBlock getDataBlock(HoodieLogBlockType dataBlockType, 
List<IndexedRecord> records,
-                                              Map<HeaderMetadataType, String> 
header) {
-    return getDataBlock(dataBlockType, 
records.stream().map(HoodieAvroIndexedRecord::new).collect(Collectors.toList()),
 header, new StoragePath("dummy_path"));
-  }
-
-  private static HoodieDataBlock getDataBlock(HoodieLogBlockType 
dataBlockType, List<HoodieRecord> records,
-                                              Map<HeaderMetadataType, String> 
header, StoragePath pathForReader) {
-    switch (dataBlockType) {
-      case CDC_DATA_BLOCK:
-        return new HoodieCDCDataBlock(records, header, 
HoodieRecord.RECORD_KEY_METADATA_FIELD);
-      case AVRO_DATA_BLOCK:
-        return new HoodieAvroDataBlock(records, false, header, 
HoodieRecord.RECORD_KEY_METADATA_FIELD);
-      case HFILE_DATA_BLOCK:
-        return new HoodieHFileDataBlock(records, header, 
HFILE_COMPRESSION_ALGORITHM_NAME.defaultValue(), pathForReader, 
HoodieReaderConfig.USE_NATIVE_HFILE_READER.defaultValue());
-      case PARQUET_DATA_BLOCK:
-        return new HoodieParquetDataBlock(records, false, header, 
HoodieRecord.RECORD_KEY_METADATA_FIELD, 
PARQUET_COMPRESSION_CODEC_NAME.defaultValue(), 0.1, true);
-      default:
-        throw new RuntimeException("Unknown data block type " + dataBlockType);
-    }
-  }
-
   private static Stream<Arguments> testArguments() {
     // Arg1: ExternalSpillableMap Type, Arg2: isDiskMapCompressionEnabled, 
Arg3: enableOptimizedLogBlocksScan
     return Stream.of(
@@ -2825,58 +2797,6 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
     );
   }
 
-  private static Set<HoodieLogFile> writeLogFiles(StoragePath partitionPath,
-                                                  Schema schema,
-                                                  List<IndexedRecord> records,
-                                                  int numFiles)
-      throws IOException, InterruptedException {
-    return writeLogFiles(partitionPath, schema, records, numFiles, false);
-  }
-
-  private static Set<HoodieLogFile> writeLogFiles(StoragePath partitionPath,
-                                                  Schema schema,
-                                                  List<IndexedRecord> records,
-                                                  int numFiles,
-                                                  boolean 
enableBlockSequenceNumbers)
-      throws IOException, InterruptedException {
-    int blockSeqNo = 0;
-    Writer writer =
-        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
-            .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-            
.withSizeThreshold(1024).withFileId("test-fileid1").withDeltaCommit("100")
-            .withStorage(storage).build();
-    if (storage.exists(writer.getLogFile().getPath())) {
-      // enable append for reader test.
-      ((HoodieLogFormatWriter) writer).withOutputStream(
-          (FSDataOutputStream) storage.append(writer.getLogFile().getPath()));
-    }
-    Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
-    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
-    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
-
-    Set<HoodieLogFile> logFiles = new HashSet<>();
-
-    // Create log files
-    int recordsPerFile = records.size() / numFiles;
-    int filesWritten = 0;
-
-    while (filesWritten < numFiles) {
-      int targetRecordsCount = filesWritten == numFiles - 1
-          ? recordsPerFile + (records.size() % recordsPerFile)
-          : recordsPerFile;
-      int offset = filesWritten * recordsPerFile;
-      List<IndexedRecord> targetRecords = records.subList(offset, offset + 
targetRecordsCount);
-
-      logFiles.add(writer.getLogFile());
-      writer.appendBlock(getDataBlock(DEFAULT_DATA_BLOCK_TYPE, targetRecords, 
header));
-      filesWritten++;
-    }
-
-    writer.close();
-
-    return logFiles;
-  }
-
   /**
    * Utility to convert the given iterator to a List.
    */
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
index be25eb4bfb2..c4727688546 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
@@ -18,26 +18,47 @@
 
 package org.apache.hudi.common.testutils;
 
+import org.apache.hudi.common.config.HoodieReaderConfig;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.HoodieLogFormatWriter;
+import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieCDCDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieHFileDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
+import org.apache.hudi.common.table.log.block.HoodieParquetDataBlock;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.table.view.SyncableFileSystemView;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.storage.StoragePath;
 
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.junit.jupiter.api.io.TempDir;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -46,6 +67,9 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
+import static 
org.apache.hudi.common.config.HoodieStorageConfig.HFILE_COMPRESSION_ALGORITHM_NAME;
+import static 
org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME;
+
 /**
  * The common hoodie test harness to provide the basic infrastructure.
  */
@@ -53,6 +77,7 @@ public class HoodieCommonTestHarness {
   private static final Logger LOG = 
LoggerFactory.getLogger(HoodieCommonTestHarness.class);
   protected static final String BASE_FILE_EXTENSION = 
HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().getFileExtension();
   protected static ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = 
null;
+  protected static final HoodieLogBlock.HoodieLogBlockType 
DEFAULT_DATA_BLOCK_TYPE = HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK;
 
   protected String tableName;
   protected String basePath;
@@ -242,4 +267,105 @@ public class HoodieCommonTestHarness {
         .collect(Collectors.toList());
     return !pendingInstants.isEmpty();
   }
+
+  protected static Set<HoodieLogFile> writeLogFiles(StoragePath partitionPath,
+                                                    Schema schema,
+                                                    List<HoodieRecord> records,
+                                                    int numFiles,
+                                                    HoodieStorage storage,
+                                                    Properties props,
+                                                    String fileId,
+                                                    String commitTime)
+      throws IOException, InterruptedException {
+    List<IndexedRecord> indexedRecords = records.stream()
+        .map(record -> {
+          try {
+            return record.toIndexedRecord(schema, props);
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+        })
+        .filter(Option::isPresent)
+        .map(Option::get)
+        .map(HoodieRecord::getData)
+        .collect(Collectors.toList());
+    return writeLogFiles(partitionPath, schema, indexedRecords, numFiles, 
false, storage, fileId, commitTime);
+  }
+
  /**
   * Convenience overload: writes {@code records} into {@code numFiles} log files using the
   * default file id ("test-fileid1") and commit time ("100"), with block sequence numbers
   * disabled.
   */
  protected static Set<HoodieLogFile> writeLogFiles(StoragePath partitionPath,
                                                    Schema schema,
                                                    List<IndexedRecord> records,
                                                    int numFiles,
                                                    HoodieStorage storage)
      throws IOException, InterruptedException {
    return writeLogFiles(partitionPath, schema, records, numFiles, false, storage, "test-fileid1", "100");
  }
+
+  protected static Set<HoodieLogFile> writeLogFiles(StoragePath partitionPath,
+                                                    Schema schema,
+                                                    List<IndexedRecord> 
records,
+                                                    int numFiles,
+                                                    boolean 
enableBlockSequenceNumbers,
+                                                    HoodieStorage storage,
+                                                    String fileId,
+                                                    String commitTime)
+      throws IOException, InterruptedException {
+    int blockSeqNo = 0;
+    HoodieLogFormat.Writer writer =
+        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+            .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+            
.withSizeThreshold(1024).withFileId(fileId).withDeltaCommit(commitTime)
+            .withStorage(storage).build();
+    if (storage.exists(writer.getLogFile().getPath())) {
+      // enable append for reader test.
+      ((HoodieLogFormatWriter) writer).withOutputStream(
+          (FSDataOutputStream) storage.append(writer.getLogFile().getPath()));
+    }
+    Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
+    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
+
+    Set<HoodieLogFile> logFiles = new HashSet<>();
+
+    // Create log files
+    int recordsPerFile = records.size() / numFiles;
+    int filesWritten = 0;
+
+    while (filesWritten < numFiles) {
+      int targetRecordsCount = filesWritten == numFiles - 1
+          ? recordsPerFile + (records.size() % recordsPerFile)
+          : recordsPerFile;
+      int offset = filesWritten * recordsPerFile;
+      List<IndexedRecord> targetRecords = records.subList(offset, offset + 
targetRecordsCount);
+
+      logFiles.add(writer.getLogFile());
+      writer.appendBlock(getDataBlock(DEFAULT_DATA_BLOCK_TYPE, targetRecords, 
header));
+      filesWritten++;
+    }
+
+    writer.close();
+
+    return logFiles;
+  }
+
  /**
   * Wraps the given Avro records as {@link HoodieAvroIndexedRecord}s and builds a data block of
   * the requested type. A dummy path is passed since the callers of this overload do not read
   * the block back through a path-aware reader.
   */
  public static HoodieDataBlock getDataBlock(HoodieLogBlock.HoodieLogBlockType dataBlockType, List<IndexedRecord> records,
                                             Map<HoodieLogBlock.HeaderMetadataType, String> header) {
    return getDataBlock(dataBlockType, records.stream().map(HoodieAvroIndexedRecord::new).collect(Collectors.toList()), header, new StoragePath("dummy_path"));
  }

  /**
   * Builds a log data block of the requested type from the given records and header.
   *
   * @param dataBlockType type of block to create (CDC, Avro, HFile, or Parquet)
   * @param records       records to embed in the block
   * @param header        block header metadata (instant time, schema, ...)
   * @param pathForReader path handed to block types (HFile) that need it when reading back
   * @throws RuntimeException if the block type is not one of the supported cases
   */
  private static HoodieDataBlock getDataBlock(HoodieLogBlock.HoodieLogBlockType dataBlockType, List<HoodieRecord> records,
                                              Map<HoodieLogBlock.HeaderMetadataType, String> header, StoragePath pathForReader) {
    switch (dataBlockType) {
      case CDC_DATA_BLOCK:
        return new HoodieCDCDataBlock(records, header, HoodieRecord.RECORD_KEY_METADATA_FIELD);
      case AVRO_DATA_BLOCK:
        return new HoodieAvroDataBlock(records, false, header, HoodieRecord.RECORD_KEY_METADATA_FIELD);
      case HFILE_DATA_BLOCK:
        return new HoodieHFileDataBlock(records, header, HFILE_COMPRESSION_ALGORITHM_NAME.defaultValue(), pathForReader, HoodieReaderConfig.USE_NATIVE_HFILE_READER.defaultValue());
      case PARQUET_DATA_BLOCK:
        return new HoodieParquetDataBlock(records, false, header, HoodieRecord.RECORD_KEY_METADATA_FIELD, PARQUET_COMPRESSION_CODEC_NAME.defaultValue(), 0.1, true);
      default:
        throw new RuntimeException("Unknown data block type " + dataBlockType);
    }
  }
 }
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataPayload.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataPayload.java
index 5e86a6b6de4..7fcc0d16193 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataPayload.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataPayload.java
@@ -225,17 +225,17 @@ public class TestHoodieMetadataPayload extends 
HoodieCommonTestHarness {
     HoodieColumnRangeMetadata<Comparable> fileColumnRange1 = 
HoodieColumnRangeMetadata.<Comparable>create(
         "path/to/file", "columnName", 1, 5, 0, 10, 100, 200);
     HoodieRecord<HoodieMetadataPayload> firstPartitionStatsRecord =
-        HoodieMetadataPayload.createPartitionStatsRecords(PARTITION_NAME, 
Collections.singletonList(fileColumnRange1), false).findFirst().get();
+        HoodieMetadataPayload.createPartitionStatsRecords(PARTITION_NAME, 
Collections.singletonList(fileColumnRange1), false, false).findFirst().get();
     HoodieColumnRangeMetadata<Comparable> fileColumnRange2 = 
HoodieColumnRangeMetadata.<Comparable>create(
         "path/to/file", "columnName", 3, 8, 1, 15, 120, 250);
     HoodieRecord<HoodieMetadataPayload> updatedPartitionStatsRecord =
-        HoodieMetadataPayload.createPartitionStatsRecords(PARTITION_NAME, 
Collections.singletonList(fileColumnRange2), false).findFirst().get();
+        HoodieMetadataPayload.createPartitionStatsRecords(PARTITION_NAME, 
Collections.singletonList(fileColumnRange2), false, false).findFirst().get();
     HoodieMetadataPayload combinedPartitionStatsRecordPayload =
         
updatedPartitionStatsRecord.getData().preCombine(firstPartitionStatsRecord.getData());
     HoodieColumnRangeMetadata<Comparable> expectedColumnRange = 
HoodieColumnRangeMetadata.<Comparable>create(
         "path/to/file", "columnName", 1, 8, 1, 25, 220, 450);
     HoodieMetadataPayload expectedColumnRangeMetadata = 
(HoodieMetadataPayload) HoodieMetadataPayload.createPartitionStatsRecords(
-        PARTITION_NAME, Collections.singletonList(expectedColumnRange), 
false).findFirst().get().getData();
+        PARTITION_NAME, Collections.singletonList(expectedColumnRange), false, 
false).findFirst().get().getData();
     assertEquals(expectedColumnRangeMetadata, 
combinedPartitionStatsRecordPayload);
   }
 
@@ -244,19 +244,19 @@ public class TestHoodieMetadataPayload extends 
HoodieCommonTestHarness {
     HoodieColumnRangeMetadata<Comparable> fileColumnRange1 = 
HoodieColumnRangeMetadata.<Comparable>create(
         "path/to/file", "columnName", 1, 5, 0, 10, 100, 200);
     HoodieRecord<HoodieMetadataPayload> firstPartitionStatsRecord =
-        HoodieMetadataPayload.createPartitionStatsRecords(PARTITION_NAME, 
Collections.singletonList(fileColumnRange1), false).findFirst().get();
+        HoodieMetadataPayload.createPartitionStatsRecords(PARTITION_NAME, 
Collections.singletonList(fileColumnRange1), false, false).findFirst().get();
     HoodieColumnRangeMetadata<Comparable> fileColumnRange2 = 
HoodieColumnRangeMetadata.<Comparable>create(
         "path/to/file", "columnName", 3, 8, 1, 15, 120, 250);
     // create delete payload
     HoodieRecord<HoodieMetadataPayload> deletedPartitionStatsRecord =
-        HoodieMetadataPayload.createPartitionStatsRecords(PARTITION_NAME, 
Collections.singletonList(fileColumnRange2), true).findFirst().get();
+        HoodieMetadataPayload.createPartitionStatsRecords(PARTITION_NAME, 
Collections.singletonList(fileColumnRange2), true, false).findFirst().get();
     // deleted (or tombstone) record will be therefore deleting previous state 
of the record
     HoodieMetadataPayload combinedPartitionStatsRecordPayload =
         
deletedPartitionStatsRecord.getData().preCombine(firstPartitionStatsRecord.getData());
     HoodieColumnRangeMetadata<Comparable> expectedColumnRange = 
HoodieColumnRangeMetadata.<Comparable>create(
         "path/to/file", "columnName", 3, 8, 1, 15, 120, 250);
     HoodieMetadataPayload expectedColumnRangeMetadata = 
(HoodieMetadataPayload) HoodieMetadataPayload.createPartitionStatsRecords(
-        PARTITION_NAME, Collections.singletonList(expectedColumnRange), 
true).findFirst().get().getData();
+        PARTITION_NAME, Collections.singletonList(expectedColumnRange), true, 
false).findFirst().get().getData();
     assertEquals(expectedColumnRangeMetadata, 
combinedPartitionStatsRecordPayload);
 
     // another update for the same key should overwrite the delete record
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
index 83b6abe12e5..4ccc48b519d 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
@@ -25,12 +25,16 @@ import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.engine.HoodieLocalEngineContext;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
+import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.testutils.FileCreateUtils;
 import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.HoodieTestTable;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.io.storage.HoodieFileWriter;
 import org.apache.hudi.io.storage.HoodieFileWriterFactory;
@@ -47,6 +51,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
 import java.util.stream.Collectors;
@@ -144,25 +149,10 @@ public class TestHoodieTableMetadataUtil extends 
HoodieCommonTestHarness {
             .withColumnStatsIndexForColumns("rider,driver")
             .withPartitionStatsIndexParallelism(1)
             .build(),
-        metaClient);
+        metaClient,
+        Option.of(HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS));
     // Validate the result.
-    List<HoodieRecord> records = result.collectAsList();
-    // 3 partitions * 2 columns = 6 partition stats records
-    assertEquals(6, records.size());
-    assertEquals(MetadataPartitionType.PARTITION_STATS.getPartitionPath(), 
records.get(0).getPartitionPath());
-    ((HoodieMetadataPayload) 
result.collectAsList().get(0).getData()).getColumnStatMetadata().get().getColumnName();
-    records.forEach(r -> {
-      HoodieMetadataPayload payload = (HoodieMetadataPayload) r.getData();
-      assertTrue(payload.getColumnStatMetadata().isPresent());
-      // instant1 < instant2 so instant1 should be in the min value and 
instant2 should be in the max value.
-      if 
(payload.getColumnStatMetadata().get().getColumnName().equals("rider")) {
-        assertEquals(String.format("{\"value\": \"rider-%s\"}", instant1), 
String.valueOf(payload.getColumnStatMetadata().get().getMinValue()));
-        assertEquals(String.format("{\"value\": \"rider-%s\"}", instant2), 
String.valueOf(payload.getColumnStatMetadata().get().getMaxValue()));
-      } else if 
(payload.getColumnStatMetadata().get().getColumnName().equals("driver")) {
-        assertEquals(String.format("{\"value\": \"driver-%s\"}", instant1), 
String.valueOf(payload.getColumnStatMetadata().get().getMinValue()));
-        assertEquals(String.format("{\"value\": \"driver-%s\"}", instant2), 
String.valueOf(payload.getColumnStatMetadata().get().getMaxValue()));
-      }
-    });
+    validatePartitionStats(result, instant1, instant2);
   }
 
   @Test
@@ -212,6 +202,90 @@ public class TestHoodieTableMetadataUtil extends 
HoodieCommonTestHarness {
     }
   }
 
+  @Test
+  public void testGetLogFileColumnRangeMetadata() throws Exception {
+    HoodieLocalEngineContext engineContext = new 
HoodieLocalEngineContext(metaClient.getStorageConf());
+    String instant1 = "20230918120000000";
+    hoodieTestTable = hoodieTestTable.addCommit(instant1);
+    String instant2 = "20230918121110000";
+    hoodieTestTable = hoodieTestTable.addCommit(instant2);
+    List<HoodieTableMetadataUtil.DirectoryInfo> partitionInfoList = new 
ArrayList<>();
+    List<String> columnsToIndex = Arrays.asList("rider", "driver");
+    // Generate 10 inserts for each partition and populate partitionInfoList.
+    DATE_PARTITIONS.forEach(p -> {
+      try {
+        URI partitionMetaFile = 
FileCreateUtils.createPartitionMetaFile(basePath, p);
+        StoragePath partitionMetadataPath = new StoragePath(partitionMetaFile);
+        String fileId1 = UUID.randomUUID().toString();
+        // add only one parquet file in first file slice
+        FileSlice fileSlice1 = new FileSlice(p, instant1, fileId1);
+        StoragePath storagePath1 = new 
StoragePath(hoodieTestTable.getBaseFilePath(p, fileId1).toUri());
+        writeParquetFile(instant1, storagePath1, 
dataGen.generateInsertsForPartition(instant1, 10, p), metaClient, 
engineContext);
+        HoodieBaseFile baseFile1 = new 
HoodieBaseFile(hoodieTestTable.getBaseFilePath(p, fileId1).toString());
+        fileSlice1.setBaseFile(baseFile1);
+        // add log file in second file slice with higher rider and driver 
values (which are concatenated with instant)
+        FileSlice fileSlice2 = new FileSlice(p, instant2, fileId1);
+        fileSlice2.setBaseFile(baseFile1);
+        StoragePath storagePath2 = new 
StoragePath(partitionMetadataPath.getParent(), 
hoodieTestTable.getLogFileNameById(fileId1, 1));
+        writeLogFiles(new StoragePath(metaClient.getBasePath(), p), 
HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS, 
dataGen.generateInsertsForPartition(instant2, 10, p), 1,
+            metaClient.getStorage(), new Properties(), fileId1, instant2);
+        fileSlice2.addLogFile(new 
HoodieLogFile(storagePath2.toUri().toString()));
+        partitionInfoList.add(new HoodieTableMetadataUtil.DirectoryInfo(
+            p,
+            
metaClient.getStorage().listDirectEntries(Arrays.asList(partitionMetadataPath, 
storagePath1, storagePath2)),
+            instant2,
+            Collections.emptySet()));
+        // NOTE: we need to set table config as we are not using write client 
explicitly and these configs are needed for log record reader
+        
metaClient.getTableConfig().setValue(HoodieTableConfig.POPULATE_META_FIELDS.key(),
 "false");
+        
metaClient.getTableConfig().setValue(HoodieTableConfig.RECORDKEY_FIELDS.key(), 
"_row_key");
+        
metaClient.getTableConfig().setValue(HoodieTableConfig.PARTITION_FIELDS.key(), 
"partition_path");
+        List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataLogFile 
= HoodieTableMetadataUtil.getLogFileColumnRangeMetadata(
+            storagePath2.toString(),
+            metaClient,
+            columnsToIndex,
+            
Option.of(HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS));
+        // there must be two ranges for rider and driver
+        assertEquals(2, columnRangeMetadataLogFile.size());
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    });
+    // collect partition stats; this will collect stats for log files as well
+    HoodieData<HoodieRecord> result = 
HoodieTableMetadataUtil.convertFilesToPartitionStatsRecords(
+        engineContext,
+        partitionInfoList,
+        HoodieMetadataConfig.newBuilder().enable(true)
+            .withMetadataIndexColumnStats(true)
+            .withMetadataIndexPartitionStats(true)
+            .withColumnStatsIndexForColumns("rider,driver")
+            .withPartitionStatsIndexParallelism(1)
+            .build(),
+        metaClient,
+        Option.of(HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS));
+    // Validate the result.
+    validatePartitionStats(result, instant1, instant2);
+  }
+
+  private static void validatePartitionStats(HoodieData<HoodieRecord> result, 
String instant1, String instant2) {
+    List<HoodieRecord> records = result.collectAsList();
+    // 3 partitions * 2 columns = 6 partition stats records
+    assertEquals(6, records.size());
+    assertEquals(MetadataPartitionType.PARTITION_STATS.getPartitionPath(), 
records.get(0).getPartitionPath());
+    ((HoodieMetadataPayload) 
result.collectAsList().get(0).getData()).getColumnStatMetadata().get().getColumnName();
+    records.forEach(r -> {
+      HoodieMetadataPayload payload = (HoodieMetadataPayload) r.getData();
+      assertTrue(payload.getColumnStatMetadata().isPresent());
+      // instant1 < instant2 so instant1 should be in the min value and 
instant2 should be in the max value.
+      if 
(payload.getColumnStatMetadata().get().getColumnName().equals("rider")) {
+        assertEquals(String.format("{\"value\": \"rider-%s\"}", instant1), 
String.valueOf(payload.getColumnStatMetadata().get().getMinValue()));
+        assertEquals(String.format("{\"value\": \"rider-%s\"}", instant2), 
String.valueOf(payload.getColumnStatMetadata().get().getMaxValue()));
+      } else if 
(payload.getColumnStatMetadata().get().getColumnName().equals("driver")) {
+        assertEquals(String.format("{\"value\": \"driver-%s\"}", instant1), 
String.valueOf(payload.getColumnStatMetadata().get().getMinValue()));
+        assertEquals(String.format("{\"value\": \"driver-%s\"}", instant2), 
String.valueOf(payload.getColumnStatMetadata().get().getMaxValue()));
+      }
+    });
+  }
+
   private static void writeParquetFile(String instant,
                                        StoragePath path,
                                        List<HoodieRecord> records,
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
index 72cc75f6799..8609c87d128 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
@@ -236,36 +236,6 @@ class TestPartitionStatsIndexWithSql extends 
HoodieSparkSqlTestBase {
     }
   }
 
-  test(s"Test partition stats index on int type field with update and file 
pruning") {
-    Seq("cow", "mor").foreach { tableType =>
-      withTempDir { tmp =>
-        val tableName = generateTableName
-        val tablePath = s"${tmp.getCanonicalPath}/$tableName"
-        spark.sql(
-          s"""
-             |create table $tableName (
-             |  id int,
-             |  name string,
-             |  price int,
-             |  ts long
-             |) using hudi
-             |partitioned by (ts)
-             |tblproperties (
-             |  type = '$tableType',
-             |  primaryKey = 'id',
-             |  preCombineField = 'price',
-             |  hoodie.metadata.index.partition.stats.enable = 'true',
-             |  hoodie.metadata.index.column.stats.column.list = 'price'
-             |)
-             |location '$tablePath'
-             |""".stripMargin
-        )
-
-        writeAndValidatePartitionStats(tableName, tablePath)
-      }
-    }
-  }
-
   test(s"Test partition stats index without configuring columns to index") {
     Seq("cow", "mor").foreach { tableType =>
       withTempDir { tmp =>
@@ -308,6 +278,55 @@ class TestPartitionStatsIndexWithSql extends 
HoodieSparkSqlTestBase {
     }
   }
 
+  test(s"Test partition stats index on int type field with update and file 
pruning") {
+    Seq("cow", "mor").foreach { tableType =>
+      Seq(true, false).foreach { shouldCompact =>
+        withTempDir { tmp =>
+          val tableName = generateTableName
+          val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+          spark.sql(
+            s"""
+               |create table $tableName (
+               |  id int,
+               |  name string,
+               |  price int,
+               |  ts long
+               |) using hudi
+               |partitioned by (ts)
+               |tblproperties (
+               |  type = '$tableType',
+               |  primaryKey = 'id',
+               |  preCombineField = 'price',
+               |  hoodie.metadata.index.partition.stats.enable = 'true',
+               |  hoodie.metadata.index.column.stats.enable = 'true',
+               |  hoodie.metadata.index.column.stats.column.list = 'price'
+               |)
+               |location '$tablePath'
+               |""".stripMargin
+          )
+
+          // trigger compaction after update and validate stats
+          if (tableType == "mor" && shouldCompact) {
+            spark.sql("set hoodie.compact.inline=true")
+            spark.sql("set hoodie.compact.inline.max.delta.commits=2")
+          }
+          spark.sql("set hoodie.metadata.enable=true")
+          spark.sql("set hoodie.enable.data.skipping=true")
+          spark.sql("set hoodie.fileIndex.dataSkippingFailureMode=strict")
+          writeAndValidatePartitionStats(tableName, tablePath)
+          if (tableType == "mor" && shouldCompact) {
+            // check partition stats records with tightBound
+            checkAnswer(s"select key, 
ColumnStatsMetadata.minValue.member1.value, 
ColumnStatsMetadata.maxValue.member1.value, ColumnStatsMetadata.isTightBound 
from hudi_metadata('$tableName') where 
type=${MetadataPartitionType.PARTITION_STATS.getRecordType} and 
ColumnStatsMetadata.columnName='price'")(
+              Seq(getPartitionStatsIndexKey("ts=10", "price"), 1000, 2000, 
false),
+              Seq(getPartitionStatsIndexKey("ts=20", "price"), 2000, 3000, 
false),
+              Seq(getPartitionStatsIndexKey("ts=30", "price"), 3000, 4001, 
false)
+            )
+          }
+        }
+      }
+    }
+  }
+
   private def writeAndValidatePartitionStats(tableName: String, tablePath: 
String): Unit = {
     spark.sql(
       s"""
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index 9cd840df5a3..73ea18b6139 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -1009,7 +1009,8 @@ public class HoodieMetadataTableValidator implements 
Serializable {
           .getSortedColumnStatsList(partitionPath, latestFileNames);
 
       TreeSet<HoodieColumnRangeMetadata<Comparable>> aggregatedColumnStats = 
aggregateColumnStats(partitionPath, colStats);
-      List<HoodieRecord> partitionStatRecords = 
HoodieMetadataPayload.createPartitionStatsRecords(partitionPath, new 
ArrayList<>(aggregatedColumnStats), false)
+      // TODO: fix `isTightBound` flag when stats based on log files are 
available
+      List<HoodieRecord> partitionStatRecords = 
HoodieMetadataPayload.createPartitionStatsRecords(partitionPath, new 
ArrayList<>(aggregatedColumnStats), false, false)
           .collect(Collectors.toList());
       return partitionStatRecords.stream()
           .map(record -> {
@@ -1719,7 +1720,7 @@ public class HoodieMetadataTableValidator implements 
Serializable {
         return allColumnNameList.stream()
             .flatMap(columnName ->
                 tableMetadata.getColumnStats(partitionFileNameList, 
columnName).values().stream()
-                    
.map(HoodieTableMetadataUtil::convertColumnStatsRecordToColumnRangeMetadata)
+                    .map(HoodieColumnRangeMetadata::fromColumnStats)
                     .collect(Collectors.toList())
                     .stream())
             .sorted(new HoodieColumnRangeMetadataComparator())

Reply via email to