This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new c9f4a0fc8bb [HUDI-8587] Supporting schema evolution with partition stats index (#12714)
c9f4a0fc8bb is described below

commit c9f4a0fc8bb20f823cf7029f46188416fd3d104f
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Tue Jan 28 06:54:52 2025 -0800

    [HUDI-8587] Supporting schema evolution with partition stats index (#12714)
---
 .../hudi/client/HoodieColumnStatsIndexUtils.java   |   5 +-
 .../org/apache/hudi/io/HoodieAppendHandle.java     |   2 +-
 .../metadata/HoodieBackedTableMetadataWriter.java  |   4 +-
 .../client/utils/SparkMetadataWriterUtils.java     |  15 +-
 .../SparkHoodieBackedTableMetadataWriter.java      |   3 +-
 .../apache/hudi/common/util/FileFormatUtils.java   |  45 +++-
 .../hudi/metadata/HoodieTableMetadataUtil.java     | 242 ++++++++++++++-------
 .../apache/hudi/common/util/TestBaseFileUtils.java |   8 +-
 .../apache/hudi/table/ITTestSchemaEvolution.java   |   2 +-
 .../hudi/metadata/TestHoodieTableMetadataUtil.java |  82 ++++---
 .../org/apache/hudi/ColumnStatsIndexSupport.scala  |   6 +-
 .../TestColStatsRecordWithMetadataRecord.java      | 152 ++++++++++++-
 .../hudi/functional/ColumnStatIndexTestBase.scala  |  11 +-
 .../hudi/functional/TestBasicSchemaEvolution.scala |   2 +-
 .../apache/hudi/functional/TestMORDataSource.scala |  12 +-
 ...TestHoodieDeltaStreamerSchemaEvolutionBase.java |   2 -
 16 files changed, 442 insertions(+), 151 deletions(-)
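In short, `HoodieTableMetadataUtil.getColumnsToIndex` now returns the columns to index together with their Avro field schemas (a `Map<String, Schema>`) rather than a bare `List<String>`, so that stats merging can coerce min/max values for columns whose type has evolved. A minimal sketch of the changed call pattern (hypothetical stand-in method, not the exact Hudi signature):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import org.apache.avro.Schema;

public class ColumnsToIndexSketch {
  // Hypothetical stand-in for the new HoodieTableMetadataUtil.getColumnsToIndex:
  // each indexed column is paired with its (possibly evolved) field schema.
  static Map<String, Schema> getColumnsToIndex() {
    Map<String, Schema> cols = new LinkedHashMap<>();
    cols.put("_hoodie_commit_time", Schema.create(Schema.Type.STRING));
    cols.put("price", Schema.create(Schema.Type.LONG)); // e.g. evolved INT -> LONG
    return cols;
  }

  public static void main(String[] args) {
    Map<String, Schema> colsToIndexSchemaMap = getColumnsToIndex();
    // Callers that only need names adapt by taking the key set, which is
    // the repeated `new ArrayList<>(...keySet())` pattern throughout this commit.
    List<String> columnsToIndex = new ArrayList<>(colsToIndexSchemaMap.keySet());
    System.out.println(columnsToIndex); // [_hoodie_commit_time, price]
  }
}
```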

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieColumnStatsIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieColumnStatsIndexUtils.java
index adcbd3c2e4a..238e64bac1a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieColumnStatsIndexUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieColumnStatsIndexUtils.java
@@ -32,6 +32,7 @@ import org.apache.hudi.metadata.HoodieTableMetadataUtil;
 import org.apache.hudi.metadata.MetadataPartitionType;
 import org.apache.hudi.table.HoodieTable;
 
+import java.util.ArrayList;
 import java.util.List;
 
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;
@@ -66,8 +67,8 @@ public class HoodieColumnStatsIndexUtils {
               HoodieCommitMetadata.class);
           if (mdtCommitMetadata.getPartitionToWriteStats().containsKey(MetadataPartitionType.COLUMN_STATS.getPartitionPath())) {
             // update data table's table config for list of columns indexed.
-            List<String> columnsToIndex = HoodieTableMetadataUtil.getColumnsToIndex(commitMetadata, dataTable.getMetaClient(), config.getMetadataConfig(),
-                Option.of(config.getRecordMerger().getRecordType()));
+            List<String> columnsToIndex = new ArrayList(HoodieTableMetadataUtil.getColumnsToIndex(commitMetadata, dataTable.getMetaClient(), config.getMetadataConfig(),
+                Option.of(config.getRecordMerger().getRecordType())).keySet());
             // if col stats is getting updated, lets also update list of columns indexed if changed.
             updateColSatsFunc.apply(dataTable.getMetaClient(), columnsToIndex);
           }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index 3c3295be486..e1103a8bd4f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -438,7 +438,7 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
       Set<String> columnsToIndexSet = new HashSet<>(HoodieTableMetadataUtil
           .getColumnsToIndex(hoodieTable.getMetaClient().getTableConfig(),
               config.getMetadataConfig(), Lazy.eagerly(Option.of(writeSchemaWithMetaFields)),
-              Option.of(this.recordMerger.getRecordType())));
+              Option.of(this.recordMerger.getRecordType())).keySet());
       final List<Pair<String, Schema.Field>> fieldsToIndex = columnsToIndexSet.stream()
           .map(fieldName -> HoodieAvroUtils.getSchemaForField(writeSchemaWithMetaFields, fieldName)).collect(Collectors.toList());
       try {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index cd48b293d0d..9dafe3bad09 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -542,9 +542,9 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
   private Pair<List<String>, Pair<Integer, HoodieData<HoodieRecord>>> initializeColumnStatsPartition(Map<String, Map<String, Long>> partitionToFilesMap) {
     // Find the columns to index
     Lazy<Option<Schema>> tableSchema = Lazy.lazily(() -> HoodieTableMetadataUtil.tryResolveSchemaForTable(dataMetaClient));
-    final List<String> columnsToIndex = HoodieTableMetadataUtil.getColumnsToIndex(dataMetaClient.getTableConfig(),
+    final List<String> columnsToIndex = new ArrayList<>(HoodieTableMetadataUtil.getColumnsToIndex(dataMetaClient.getTableConfig(),
         dataWriteConfig.getMetadataConfig(), tableSchema, true,
-        Option.of(dataWriteConfig.getRecordMerger().getRecordType()));
+        Option.of(dataWriteConfig.getRecordMerger().getRecordType())).keySet());
 
     final int fileGroupCount = dataWriteConfig.getMetadataConfig().getColumnStatsIndexFileGroupCount();
     if (columnsToIndex.isEmpty()) {
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
index 8b9d51c252c..15069004f56 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
@@ -20,6 +20,7 @@
 package org.apache.hudi.client.utils;
 
 import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieMetadataRecord;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.bloom.BloomFilter;
@@ -368,7 +369,8 @@ public class SparkMetadataWriterUtils {
    */
   public static HoodiePairData<String, List<HoodieColumnRangeMetadata<Comparable>>> getExpressionIndexPartitionStatUpdates(HoodieCommitMetadata commitMetadata, String indexPartition,
                                                                                             HoodieEngineContext engineContext, HoodieTableMetadata tableMetadata,
-                                                                                            HoodieTableMetaClient dataMetaClient, HoodieMetadataConfig metadataConfig) {
+                                                                                            HoodieTableMetaClient dataMetaClient, HoodieMetadataConfig metadataConfig,
+                                                                                            Option<HoodieRecord.HoodieRecordType> recordTypeOpt) {
     // In this function we iterate over all the partitions modified by the commit and fetch the latest files in those partitions
     // We fetch stored Expression index records for these latest files and return HoodiePairData of partition name and list of column range metadata of these files
 
@@ -385,9 +387,14 @@ public class SparkMetadataWriterUtils {
       HoodieTableConfig tableConfig = dataMetaClient.getTableConfig();
       Schema tableSchema = writerSchema.map(schema -> tableConfig.populateMetaFields() ? addMetadataFields(schema) : schema)
           .orElseThrow(() -> new IllegalStateException(String.format("Expected writer schema in commit metadata %s", commitMetadata)));
-      // filter columns with only supported types
-      final List<String> validColumnsToIndex = columnsToIndex.stream()
-          .filter(col -> HoodieTableMetadataUtil.SUPPORTED_META_FIELDS_PARTITION_STATS.contains(col) || HoodieTableMetadataUtil.validateDataTypeForPartitionStats(col, tableSchema))
+      List<Pair<String,Schema>> columnsToIndexSchemaMap = columnsToIndex.stream()
+          .map(columnToIndex -> Pair.of(columnToIndex, HoodieAvroUtils.getSchemaForField(tableSchema, columnToIndex).getValue().schema())).collect(
+          Collectors.toList());
+      // filter for supported types
+      final List<String> validColumnsToIndex = columnsToIndexSchemaMap.stream()
+          .filter(colSchemaPair -> HoodieTableMetadataUtil.SUPPORTED_META_FIELDS_PARTITION_STATS.contains(colSchemaPair.getKey())
+              || HoodieTableMetadataUtil.isColumnTypeSupported(colSchemaPair.getValue(), recordTypeOpt))
+          .map(entry -> entry.getKey())
           .collect(Collectors.toList());
       if (validColumnsToIndex.isEmpty()) {
         return engineContext.emptyHoodieData().mapToPair(o -> Pair.of("", new ArrayList<>()));
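The refactor above resolves each candidate column's field schema up front and reuses the shared `isColumnTypeSupported` check in place of the removed `validateDataTypeForPartitionStats`. A self-contained sketch of that filter shape (the support check below is a simplified stand-in, not Hudi's full rule set):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class SupportedColumnFilterSketch {
  // Simplified stand-in for HoodieTableMetadataUtil.isColumnTypeSupported:
  // complex Avro types are not eligible for column/partition stats.
  static boolean isColumnTypeSupported(Schema fieldSchema) {
    switch (fieldSchema.getType()) {
      case RECORD: case ARRAY: case MAP: case ENUM:
        return false;
      default:
        return true;
    }
  }

  public static void main(String[] args) {
    Schema tableSchema = SchemaBuilder.record("rec").fields()
        .requiredString("name")
        .requiredLong("ts")
        .name("tags").type().array().items().stringType().noDefault()
        .endRecord();
    // Pair each column with its field schema, then keep only supported types.
    List<String> valid = Arrays.asList("name", "ts", "tags").stream()
        .filter(col -> isColumnTypeSupported(tableSchema.getField(col).schema()))
        .collect(Collectors.toList());
    System.out.println(valid); // [name, ts] -- "tags" (ARRAY) is filtered out
  }
}
```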
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index a0b1d5a0b0a..863a4995f6e 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -177,7 +177,8 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
     if (isExprIndexUsingColumnStats) {
       // Fetch column range metadata for affected partitions in the commit
       HoodiePairData<String, HoodieColumnRangeMetadata<Comparable>> exprIndexPartitionStatUpdates =
-          SparkMetadataWriterUtils.getExpressionIndexPartitionStatUpdates(commitMetadata, indexPartition, engineContext, getTableMetadata(), dataMetaClient, dataWriteConfig.getMetadataConfig())
+          SparkMetadataWriterUtils.getExpressionIndexPartitionStatUpdates(commitMetadata, indexPartition, engineContext, getTableMetadata(), dataMetaClient, dataWriteConfig.getMetadataConfig(),
+                  Option.of(dataWriteConfig.getRecordMerger().getRecordType()))
               .flatMapValues(List::iterator);
       // The function below merges the column range metadata from the updated data with latest column range metadata of affected partition computed above
       partitionRecordsFunctionOpt = Option.of(rangeMetadata ->
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/FileFormatUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/FileFormatUtils.java
index 72fe9f0f4d0..3e295f34d3f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/FileFormatUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/FileFormatUtils.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.keygen.BaseKeyGenerator;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
 
@@ -39,6 +40,7 @@ import org.apache.avro.generic.GenericRecord;
 import javax.annotation.Nonnull;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -59,14 +61,53 @@ public abstract class FileFormatUtils {
    * @param fileColumnRanges List of column range statistics for each file in a partition
    */
   public static <T extends Comparable<T>> HoodieColumnRangeMetadata<T> getColumnRangeInPartition(String relativePartitionPath,
-                                                                                 @Nonnull List<HoodieColumnRangeMetadata<T>> fileColumnRanges) {
+                                                                                 @Nonnull List<HoodieColumnRangeMetadata<T>> fileColumnRanges,
+                                                                                 Map<String, Schema> colsToIndexSchemaMap) {
+
     ValidationUtils.checkArgument(!fileColumnRanges.isEmpty(), "fileColumnRanges should not be empty.");
+    // Let's do one pass and deduce all columns that needs to go through schema evolution.
+    Map<String, Set<Class<?>>> schemaSeenForColsToIndex = new HashMap<>();
+    Set<String> colsWithSchemaEvolved = new HashSet<>();
+    fileColumnRanges.stream().forEach(entry -> {
+      String colToIndex = entry.getColumnName();
+      Class<?> minValueClass = entry.getMinValue() != null ? entry.getMinValue().getClass() : null;
+      Class<?> maxValueClass = entry.getMaxValue() != null ? entry.getMaxValue().getClass() : null;
+      schemaSeenForColsToIndex.computeIfAbsent(colToIndex, s -> new HashSet());
+      if (minValueClass != null) {
+        schemaSeenForColsToIndex.get(colToIndex).add(minValueClass);
+      }
+      if (maxValueClass != null) {
+        schemaSeenForColsToIndex.get(colToIndex).add(maxValueClass);
+      }
+      if (!colsToIndexSchemaMap.isEmpty() && schemaSeenForColsToIndex.get(colToIndex).size() > 1) {
+        colsWithSchemaEvolved.add(colToIndex);
+      }
+    });
+
     // There are multiple files. Compute min(file_mins) and max(file_maxs)
     return fileColumnRanges.stream()
         .map(e -> HoodieColumnRangeMetadata.create(
             relativePartitionPath, e.getColumnName(), e.getMinValue(), e.getMaxValue(),
             e.getNullCount(), e.getValueCount(), e.getTotalSize(), e.getTotalUncompressedSize()))
-        .reduce(HoodieColumnRangeMetadata::merge).orElseThrow(() -> new HoodieException("MergingColumnRanges failed."));
+        .reduce((a,b) -> {
+          if (colsWithSchemaEvolved.isEmpty() || colsToIndexSchemaMap.isEmpty()
+              || a.getMinValue() == null || a.getMaxValue() == null || b.getMinValue() == null || b.getMaxValue() == null
+              || !colsWithSchemaEvolved.contains(a.getColumnName())) {
+            return HoodieColumnRangeMetadata.merge(a, b);
+          } else {
+            // schema is evolving for the column of interest.
+            Schema schema = colsToIndexSchemaMap.get(a.getColumnName());
+            HoodieColumnRangeMetadata<T> left = HoodieColumnRangeMetadata.create(a.getFilePath(), a.getColumnName(),
+                (T) HoodieTableMetadataUtil.coerceToComparable(schema, a.getMinValue()),
+                (T) HoodieTableMetadataUtil.coerceToComparable(schema, a.getMaxValue()), a.getNullCount(),
+                a.getValueCount(), a.getTotalSize(), a.getTotalUncompressedSize());
+            HoodieColumnRangeMetadata<T> right = HoodieColumnRangeMetadata.create(b.getFilePath(), b.getColumnName(),
+                (T) HoodieTableMetadataUtil.coerceToComparable(schema, b.getMinValue()),
+                (T) HoodieTableMetadataUtil.coerceToComparable(schema, b.getMaxValue()), b.getNullCount(),
+                b.getValueCount(), b.getTotalSize(), b.getTotalUncompressedSize());
+            return HoodieColumnRangeMetadata.merge(left, right);
+          }
+        }).orElseThrow(() -> new HoodieException("MergingColumnRanges failed."));
   }
 
   /**
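The new merge path detects columns whose min/max values arrive with different runtime classes across files (the signature of a type having evolved between commits) and coerces both sides via `coerceToComparable` before merging. A toy, standalone sketch of that detect-then-coerce idea (hypothetical `Range` type; `toLong` stands in for the real coercion to the latest schema):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class EvolvedRangeMergeSketch {
  // Hypothetical, simplified column range: just a name plus min/max values.
  record Range(String col, Comparable<?> min, Comparable<?> max) {}

  // Coerce a value to Long, the "latest" schema in this toy example
  // (stand-in for HoodieTableMetadataUtil.coerceToComparable).
  static Long toLong(Comparable<?> v) {
    return v instanceof Number ? ((Number) v).longValue() : Long.parseLong(v.toString());
  }

  public static void main(String[] args) {
    // file1 wrote the column as INT, file2 as LONG after schema evolution.
    List<Range> ranges = List.of(new Range("price", 3, 90), new Range("price", 5L, 120L));

    // One pass: a column whose values show more than one runtime class has evolved.
    Map<String, Set<Class<?>>> seen = new HashMap<>();
    ranges.forEach(r -> {
      Set<Class<?>> s = seen.computeIfAbsent(r.col(), k -> new HashSet<>());
      s.add(r.min().getClass());
      s.add(r.max().getClass());
    });
    boolean evolved = seen.get("price").size() > 1;

    // Coerce both sides to the latest type before taking min/max.
    long min = ranges.stream().map(r -> toLong(r.min())).min(Long::compare).get();
    long max = ranges.stream().map(r -> toLong(r.max())).max(Long::compare).get();
    System.out.println(evolved + " -> [" + min + ", " + max + "]"); // true -> [3, 120]
  }
}
```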
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 194c6250950..91c525eb488 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -129,11 +129,13 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.UUID;
 import java.util.function.BiFunction;
 import java.util.function.Function;
@@ -770,8 +772,8 @@ public class HoodieTableMetadataUtil {
       deletedFiles.forEach(entry -> deleteFileList.add(Pair.of(partition, entry)));
     });
 
-    List<String> columnsToIndex = getColumnsToIndex(dataMetaClient.getTableConfig(), metadataConfig,
-        Lazy.lazily(() -> tryResolveSchemaForTable(dataMetaClient)), false, recordTypeOpt);
+    List<String> columnsToIndex = new ArrayList<>(getColumnsToIndex(dataMetaClient.getTableConfig(), metadataConfig,
+        Lazy.lazily(() -> tryResolveSchemaForTable(dataMetaClient)), false, recordTypeOpt).keySet());
 
     if (columnsToIndex.isEmpty()) {
       // In case there are no columns to index, bail
@@ -1474,12 +1476,12 @@ public class HoodieTableMetadataUtil {
     }
 
     try {
-      List<String> columnsToIndex = getColumnsToIndex(commitMetadata, dataMetaClient, metadataConfig, recordTypeOpt);
-      if (columnsToIndex.isEmpty()) {
+      Map<String, Schema> columnsToIndexSchemaMap = getColumnsToIndex(commitMetadata, dataMetaClient, metadataConfig, recordTypeOpt);
+      if (columnsToIndexSchemaMap.isEmpty()) {
         // In case there are no columns to index, bail
         return engineContext.emptyHoodieData();
       }
-
+      List<String> columnsToIndex = new ArrayList<>(columnsToIndexSchemaMap.keySet());
       int parallelism = Math.max(Math.min(allWriteStats.size(), metadataConfig.getColumnStatsIndexParallelism()), 1);
       return engineContext.parallelize(allWriteStats, parallelism)
           .flatMap(writeStat ->
@@ -1489,7 +1491,7 @@ public class HoodieTableMetadataUtil {
     }
   }
 
-  public static List<String> getColumnsToIndex(HoodieCommitMetadata commitMetadata, HoodieTableMetaClient dataMetaClient,
+  public static Map<String, Schema> getColumnsToIndex(HoodieCommitMetadata commitMetadata, HoodieTableMetaClient dataMetaClient,
                                                HoodieMetadataConfig metadataConfig, Option<HoodieRecordType> recordTypeOpt) {
     Option<Schema> writerSchema =
         Option.ofNullable(commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY))
@@ -1512,9 +1514,15 @@ public class HoodieTableMetadataUtil {
   static final String[] META_COLS_TO_ALWAYS_INDEX = {COMMIT_TIME_METADATA_FIELD, RECORD_KEY_METADATA_FIELD, PARTITION_PATH_METADATA_FIELD};
   @VisibleForTesting
   public static final Set<String> META_COL_SET_TO_INDEX = new HashSet<>(Arrays.asList(META_COLS_TO_ALWAYS_INDEX));
+  @VisibleForTesting
+  static final Map<String, Schema> META_COLS_TO_ALWAYS_INDEX_SCHEMA_MAP = new TreeMap() {{
+      put(COMMIT_TIME_METADATA_FIELD, Schema.create(Schema.Type.STRING));
+      put(RECORD_KEY_METADATA_FIELD, Schema.create(Schema.Type.STRING));
+      put(PARTITION_PATH_METADATA_FIELD, Schema.create(Schema.Type.STRING));
+    }};
 
   @VisibleForTesting
-  public static List<String> getColumnsToIndex(HoodieTableConfig tableConfig,
+  public static Map<String, Schema> getColumnsToIndex(HoodieTableConfig tableConfig,
                                                HoodieMetadataConfig metadataConfig,
                                                Lazy<Option<Schema>> tableSchemaLazyOpt,
                                                Option<HoodieRecordType> recordType) {
@@ -1522,7 +1530,7 @@ public class HoodieTableMetadataUtil {
   }
 
   @VisibleForTesting
-  public static List<String> getColumnsToIndex(HoodieTableConfig tableConfig,
+  public static Map<String, Schema> getColumnsToIndex(HoodieTableConfig tableConfig,
                                                HoodieMetadataConfig metadataConfig,
                                                Lazy<Option<Schema>> tableSchemaLazyOpt,
                                                boolean isTableInitializing) {
@@ -1530,17 +1538,20 @@ public class HoodieTableMetadataUtil {
   }
 
   @VisibleForTesting
-  public static List<String> getColumnsToIndex(HoodieTableConfig tableConfig,
+  public static Map<String, Schema> getColumnsToIndex(HoodieTableConfig tableConfig,
                                                HoodieMetadataConfig metadataConfig,
                                                Lazy<Option<Schema>> tableSchemaLazyOpt,
                                                boolean isTableInitializing,
                                                Option<HoodieRecordType> recordType) {
-    Stream<String> columnsToIndexWithoutRequiredMetas = getColumnsToIndexWithoutRequiredMetaFields(metadataConfig, tableSchemaLazyOpt, isTableInitializing, recordType);
+    Map<String, Schema> columnsToIndexWithoutRequiredMetas = getColumnsToIndexWithoutRequiredMetaFields(metadataConfig, tableSchemaLazyOpt, isTableInitializing, recordType);
     if (!tableConfig.populateMetaFields()) {
-      return columnsToIndexWithoutRequiredMetas.collect(Collectors.toList());
+      return columnsToIndexWithoutRequiredMetas;
     }
 
-    return Stream.concat(Arrays.stream(META_COLS_TO_ALWAYS_INDEX), columnsToIndexWithoutRequiredMetas).collect(Collectors.toList());
+    Map<String, Schema> colsToIndexSchemaMap = new LinkedHashMap<>();
+    colsToIndexSchemaMap.putAll(META_COLS_TO_ALWAYS_INDEX_SCHEMA_MAP);
+    colsToIndexSchemaMap.putAll(columnsToIndexWithoutRequiredMetas);
+    return colsToIndexSchemaMap;
   }
 
   /**
@@ -1555,7 +1566,7 @@ public class HoodieTableMetadataUtil {
    * @param recordType           Option of record type. Used to determine which types are valid to index
    * @return list of columns that should be indexed
    */
-  private static Stream<String> getColumnsToIndexWithoutRequiredMetaFields(HoodieMetadataConfig metadataConfig,
+  private static Map<String, Schema> getColumnsToIndexWithoutRequiredMetaFields(HoodieMetadataConfig metadataConfig,
                                                                            Lazy<Option<Schema>> tableSchemaLazyOpt,
                                                                            boolean isTableInitializing,
                                                                            Option<HoodieRecordType> recordType) {
@@ -1563,41 +1574,41 @@ public class HoodieTableMetadataUtil {
     if (!columnsToIndex.isEmpty()) {
       // if explicitly overriden
       if (isTableInitializing) {
-        return columnsToIndex.stream();
+        Map<String, Schema> toReturn = new LinkedHashMap<>();
+        columnsToIndex.stream().forEach(colName -> toReturn.put(colName, null));
+        return toReturn;
       }
       ValidationUtils.checkArgument(tableSchemaLazyOpt.get().isPresent(), "Table schema not found for the table while computing col stats");
       // filter for eligible fields
       Option<Schema> tableSchema = tableSchemaLazyOpt.get();
-      return columnsToIndex.stream().filter(fieldName -> !META_COL_SET_TO_INDEX.contains(fieldName))
-          .filter(fieldName -> {
-            if (tableSchema.isPresent()) {
-              return isColumnTypeSupported(HoodieAvroUtils.getSchemaForField(tableSchema.get(), fieldName).getValue().schema(), recordType);
-            } else {
-              if (isTableInitializing) {
-                return true;
-              } else {
-                throw new HoodieIOException("Table schema not found to find eligible cols to index");
-              }
-            }
-          });
+      Map<String, Schema> colsToIndexSchemaMap = new LinkedHashMap<>();
+      columnsToIndex.stream().filter(fieldName -> !META_COL_SET_TO_INDEX.contains(fieldName))
+          .map(colName -> Pair.of(colName, HoodieAvroUtils.getSchemaForField(tableSchema.get(), colName).getRight().schema()))
+          .filter(fieldNameSchemaPair -> {
+            return isColumnTypeSupported(fieldNameSchemaPair.getValue(), recordType);
+          }).forEach(entry -> colsToIndexSchemaMap.put(entry.getKey(), entry.getValue()));
+      return colsToIndexSchemaMap;
     }
     // if not overridden
     if (tableSchemaLazyOpt.get().isPresent()) {
-      return tableSchemaLazyOpt.get().map(schema -> getFirstNSupportedFieldNames(schema, metadataConfig.maxColumnsToIndexForColStats(), recordType)).orElse(Stream.empty());
+      Map<String, Schema> colsToIndexSchemaMap = new LinkedHashMap<>();
+      tableSchemaLazyOpt.get().map(schema -> getFirstNSupportedFields(schema, metadataConfig.maxColumnsToIndexForColStats(), recordType)).orElse(Stream.empty())
+          .forEach(entry -> colsToIndexSchemaMap.put(entry.getKey(), entry.getValue()));
+      return colsToIndexSchemaMap;
     } else if (isTableInitializing) {
-      return Stream.empty();
+      return Collections.emptyMap();
     } else {
       throw new HoodieMetadataException("Cannot initialize col stats with empty list of cols");
     }
   }
 
-  private static Stream<String> getFirstNSupportedFieldNames(Schema tableSchema, int n, Option<HoodieRecordType> recordType) {
-    return getFirstNFieldNames(tableSchema.getFields().stream()
-        .filter(field -> isColumnTypeSupported(field.schema(), recordType)).map(Schema.Field::name), n);
+  private static Stream<Pair<String, Schema>> getFirstNSupportedFields(Schema tableSchema, int n, Option<HoodieRecordType> recordType) {
+    return getFirstNFields(tableSchema.getFields().stream()
+        .filter(field -> isColumnTypeSupported(field.schema(), recordType)).map(field -> Pair.of(field.name(), field.schema())), n);
   }
 
-  private static Stream<String> getFirstNFieldNames(Stream<String> fieldNames, int n) {
-    return fieldNames.filter(fieldName -> !HOODIE_META_COLUMNS_WITH_OPERATION.contains(fieldName)).limit(n);
+  private static Stream<Pair<String, Schema>> getFirstNFields(Stream<Pair<String, Schema>> fieldSchemaPairStream, int n) {
+    return fieldSchemaPairStream.filter(fieldSchemaPair -> !HOODIE_META_COLUMNS_WITH_OPERATION.contains(fieldSchemaPair.getKey())).limit(n);
   }
 
   private static Stream<HoodieRecord> translateWriteStatToColumnStats(HoodieWriteStat writeStat,
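Taken together, the hunks above assemble the result as an ordered map: the three always-indexed meta columns (all STRING) first, then the eligible data columns with their schemas. A compact sketch of that composition (meta-field names are the standard Hudi ones; the method shape is simplified):

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

import org.apache.avro.Schema;

public class ColumnsToIndexCompositionSketch {
  // Meta columns that are always indexed, all STRING typed
  // (mirrors META_COLS_TO_ALWAYS_INDEX_SCHEMA_MAP in this commit).
  static final Map<String, Schema> META_COLS = new TreeMap<>();
  static {
    META_COLS.put("_hoodie_commit_time", Schema.create(Schema.Type.STRING));
    META_COLS.put("_hoodie_record_key", Schema.create(Schema.Type.STRING));
    META_COLS.put("_hoodie_partition_path", Schema.create(Schema.Type.STRING));
  }

  static Map<String, Schema> columnsToIndex(Map<String, Schema> dataCols, boolean populateMetaFields) {
    if (!populateMetaFields) {
      return dataCols; // tables without meta fields index only data columns
    }
    // LinkedHashMap keeps meta columns first, then data columns in schema order.
    Map<String, Schema> result = new LinkedHashMap<>(META_COLS);
    result.putAll(dataCols);
    return result;
  }

  public static void main(String[] args) {
    Map<String, Schema> dataCols = new LinkedHashMap<>();
    dataCols.put("price", Schema.create(Schema.Type.DOUBLE));
    System.out.println(columnsToIndex(dataCols, true).keySet());
    // [_hoodie_commit_time, _hoodie_partition_path, _hoodie_record_key, price]
  }
}
```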
@@ -1755,7 +1766,7 @@ public class HoodieTableMetadataUtil {
    * NOTE: This method has to stay compatible with the semantic of
    *      {@link FileFormatUtils#readColumnStatsFromMetadata} as they are used in tandem
    */
-  private static Comparable<?> coerceToComparable(Schema schema, Object val) {
+  public static Comparable<?> coerceToComparable(Schema schema, Object val) {
     if (val == null) {
       return null;
     }
@@ -1780,7 +1791,7 @@ public class HoodieTableMetadataUtil {
           //       depending on the Avro version. Hence, we simply cast it to {@code Comparable<?>}
           return (Comparable<?>) val;
         }
-        return (Integer) val;
+        return castToInteger(val);
 
       case LONG:
         if (schema.getLogicalType() == LogicalTypes.timeMicros()
@@ -1790,17 +1801,18 @@ public class HoodieTableMetadataUtil {
           //       depending on the Avro version. Hence, we simply cast it to {@code Comparable<?>}
           return (Comparable<?>) val;
         }
-        return (Long) val;
+        return castToLong(val);
 
       case STRING:
         // unpack the avro Utf8 if possible
         return val.toString();
       case FLOAT:
+        return castToFloat(val);
       case DOUBLE:
+        return castToDouble((val));
       case BOOLEAN:
         return (Comparable<?>) val;
 
-
       // TODO add support for those types
       case ENUM:
       case MAP:
@@ -1814,16 +1826,97 @@ public class HoodieTableMetadataUtil {
     }
   }
 
-  private static boolean isColumnTypeSupported(Schema schema, Option<HoodieRecordType> recordType) {
+  private static Integer castToInteger(Object val) {
+    if (val == null) {
+      return null;
+    }
+    if (val instanceof Integer) {
+      return (Integer) val;
+    } else if (val instanceof Long) {
+      return ((Long) val).intValue();
+    } else if (val instanceof Float) {
+      return ((Float)val).intValue();
+    } else if (val instanceof Double) {
+      return ((Double)val).intValue();
+    } else if (val instanceof Boolean) {
+      return ((Boolean) val) ? 1 : 0;
+    }  else {
+      // best effort casting
+      return Integer.parseInt(val.toString());
+    }
+  }
+
+  private static Long castToLong(Object val) {
+    if (val == null) {
+      return null;
+    }
+    if (val instanceof Integer) {
+      return ((Integer) val).longValue();
+    } else if (val instanceof Long) {
+      return ((Long) val);
+    } else if (val instanceof Float) {
+      return ((Float)val).longValue();
+    } else if (val instanceof Double) {
+      return ((Double)val).longValue();
+    } else if (val instanceof Boolean) {
+      return ((Boolean) val) ? 1L : 0L;
+    }  else {
+      // best effort casting
+      return Long.parseLong(val.toString());
+    }
+  }
+
+  private static Float castToFloat(Object val) {
+    if (val == null) {
+      return null;
+    }
+    if (val instanceof Integer) {
+      return ((Integer) val).floatValue();
+    } else if (val instanceof Long) {
+      return ((Long) val).floatValue();
+    } else if (val instanceof Float) {
+      return ((Float)val).floatValue();
+    } else if (val instanceof Double) {
+      return ((Double)val).floatValue();
+    } else if (val instanceof Boolean) {
+      return (Boolean) val ? 1.0f : 0.0f;
+    }  else {
+      // best effort casting
+      return Float.parseFloat(val.toString());
+    }
+  }
+
+  private static Double castToDouble(Object val) {
+    if (val == null) {
+      return null;
+    }
+    if (val instanceof Integer) {
+      return ((Integer) val).doubleValue();
+    } else if (val instanceof Long) {
+      return ((Long) val).doubleValue();
+    } else if (val instanceof Float) {
+      return ((Float)val).doubleValue();
+    } else if (val instanceof Double) {
+      return ((Double)val).doubleValue();
+    } else if (val instanceof Boolean) {
+      return (Boolean) val ? 1.0d : 0.0d;
+    }  else {
+      // best effort casting
+      return Double.parseDouble(val.toString());
+    }
+  }
+
+  public static boolean isColumnTypeSupported(Schema schema, Option<HoodieRecordType> recordType) {
+    Schema schemaToCheck = resolveNullableSchema(schema);
     // if record type is set and if its AVRO, MAP, ARRAY, RECORD and ENUM types are unsupported.
     if (recordType.isPresent() && recordType.get() == HoodieRecordType.AVRO) {
-      return (schema.getType() != Schema.Type.RECORD && schema.getType() != Schema.Type.ARRAY && schema.getType() != Schema.Type.MAP
-          && schema.getType() != Schema.Type.ENUM);
+      return (schemaToCheck.getType() != Schema.Type.RECORD && schemaToCheck.getType() != Schema.Type.ARRAY && schemaToCheck.getType() != Schema.Type.MAP
+          && schemaToCheck.getType() != Schema.Type.ENUM);
     }
     // if record Type is not set or if recordType is SPARK then we cannot support AVRO, MAP, ARRAY, RECORD, ENUM and FIXED and BYTES type as well.
     // HUDI-8585 will add support for BYTES and FIXED
-    return schema.getType() != Schema.Type.RECORD && schema.getType() != Schema.Type.ARRAY && schema.getType() != Schema.Type.MAP
-        && schema.getType() != Schema.Type.ENUM && schema.getType() != Schema.Type.BYTES && schema.getType() != Schema.Type.FIXED;
+    return schemaToCheck.getType() != Schema.Type.RECORD && schemaToCheck.getType() != Schema.Type.ARRAY && schemaToCheck.getType() != Schema.Type.MAP
+        && schemaToCheck.getType() != Schema.Type.ENUM && schemaToCheck.getType() != Schema.Type.BYTES && schemaToCheck.getType() != Schema.Type.FIXED;
   }
 
   public static Set<String> getInflightMetadataPartitions(HoodieTableConfig tableConfig) {
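The `castToInteger`/`castToLong`/`castToFloat`/`castToDouble` helpers added above make `coerceToComparable` tolerant of stats written under an earlier numeric type, widening through the usual numeric conversions and falling back to string parsing as the commit's "best effort casting". A condensed sketch of the same idea for the Long case:

```java
public class NumericCoercionSketch {
  // Best-effort widening to Long, mirroring the shape of castToLong above.
  static Long castToLong(Object val) {
    if (val == null) {
      return null;
    }
    if (val instanceof Number) {
      return ((Number) val).longValue(); // Integer, Long, Float, Double all widen here
    }
    if (val instanceof Boolean) {
      return ((Boolean) val) ? 1L : 0L;
    }
    return Long.parseLong(val.toString()); // last-resort parse
  }

  public static void main(String[] args) {
    // Min/max written when the column was INT still compare cleanly as LONG.
    System.out.println(castToLong(42));    // 42
    System.out.println(castToLong(42.9d)); // 42 (truncates, same as Double.longValue())
    System.out.println(castToLong(true));  // 1
  }
}
```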
@@ -2379,28 +2472,34 @@ public class HoodieTableMetadataUtil {
 
   private static Stream<HoodieRecord> collectAndProcessColumnMetadata(
           List<List<HoodieColumnRangeMetadata<Comparable>>> fileColumnMetadata,
-          String partitionPath, boolean isTightBound) {
-    return collectAndProcessColumnMetadata(partitionPath, isTightBound, Option.empty(), fileColumnMetadata.stream().flatMap(List::stream));
+          String partitionPath, boolean isTightBound,
+          Map<String, Schema> colsToIndexSchemaMap
+  ) {
+    return collectAndProcessColumnMetadata(partitionPath, isTightBound, Option.empty(), fileColumnMetadata.stream().flatMap(List::stream), colsToIndexSchemaMap);
   }
 
   private static Stream<HoodieRecord> collectAndProcessColumnMetadata(Iterable<HoodieColumnRangeMetadata<Comparable>> fileColumnMetadataIterable, String partitionPath,
-                                                                      boolean isTightBound, Option<String> indexPartitionOpt) {
+                                                                      boolean isTightBound, Option<String> indexPartitionOpt,
+                                                                      Map<String, Schema> colsToIndexSchemaMap
+  ) {
 
     List<HoodieColumnRangeMetadata<Comparable>> fileColumnMetadata = new ArrayList<>();
     fileColumnMetadataIterable.forEach(fileColumnMetadata::add);
     // Group by Column Name
-    return collectAndProcessColumnMetadata(partitionPath, isTightBound, indexPartitionOpt, fileColumnMetadata.stream());
+    return collectAndProcessColumnMetadata(partitionPath, isTightBound, indexPartitionOpt, fileColumnMetadata.stream(), colsToIndexSchemaMap);
   }
 
   private static Stream<HoodieRecord> collectAndProcessColumnMetadata(String partitionPath, boolean isTightBound, Option<String> indexPartitionOpt,
-                                                                      Stream<HoodieColumnRangeMetadata<Comparable>> fileColumnMetadata) {
+                                                                      Stream<HoodieColumnRangeMetadata<Comparable>> fileColumnMetadata,
+                                                                      Map<String, Schema> colsToIndexSchemaMap
+  ) {
     // Group by Column Name
     Map<String, List<HoodieColumnRangeMetadata<Comparable>>> columnMetadataMap =
         fileColumnMetadata.collect(Collectors.groupingBy(HoodieColumnRangeMetadata::getColumnName, Collectors.toList()));
 
     // Aggregate Column Ranges
     Stream<HoodieColumnRangeMetadata<Comparable>> partitionStatsRangeMetadata = columnMetadataMap.entrySet().stream()
-        .map(entry -> FileFormatUtils.getColumnRangeInPartition(partitionPath, entry.getValue()));
+        .map(entry -> FileFormatUtils.getColumnRangeInPartition(partitionPath, entry.getValue(), colsToIndexSchemaMap));
 
     // Create Partition Stats Records
     return HoodieMetadataPayload.createPartitionStatsRecords(partitionPath, partitionStatsRangeMetadata.collect(Collectors.toList()), false, isTightBound, indexPartitionOpt);
@@ -2436,17 +2535,13 @@ public class HoodieTableMetadataUtil {
       return engineContext.emptyHoodieData();
     }
     Lazy<Option<Schema>> lazyWriterSchemaOpt = writerSchemaOpt.isPresent() ? Lazy.eagerly(writerSchemaOpt) : Lazy.lazily(() -> tryResolveSchemaForTable(dataTableMetaClient));
-    final List<String> columnsToIndex = getColumnsToIndex(dataTableMetaClient.getTableConfig(), metadataConfig, lazyWriterSchemaOpt,
+    final Map<String, Schema> columnsToIndexSchemaMap = getColumnsToIndex(dataTableMetaClient.getTableConfig(), metadataConfig, lazyWriterSchemaOpt,
         dataTableMetaClient.getActiveTimeline().getWriteTimeline().filterCompletedInstants().empty(), recordTypeOpt);
-    if (columnsToIndex.isEmpty()) {
+    if (columnsToIndexSchemaMap.isEmpty()) {
       LOG.warn("No columns to index for partition stats index");
       return engineContext.emptyHoodieData();
     }
-    // filter columns with only supported types
-    final List<String> validColumnsToIndex = columnsToIndex.stream()
-        .filter(col -> SUPPORTED_META_FIELDS_PARTITION_STATS.contains(col) || validateDataTypeForPartitionStats(col, lazyWriterSchemaOpt.get().get()))
-        .collect(Collectors.toList());
-    LOG.debug("Indexing following columns for partition stats index: {}", validColumnsToIndex);
+    LOG.debug("Indexing following columns for partition stats index: {}", columnsToIndexSchemaMap);
 
     // Group by partition path and collect file names (BaseFile and LogFiles)
     List<Pair<String, Set<String>>> partitionToFileNames = partitionInfoList.stream()
@@ -2463,11 +2558,11 @@ public class HoodieTableMetadataUtil {
       final String partitionPath = partitionInfo.getKey();
       // Step 1: Collect Column Metadata for Each File
       List<List<HoodieColumnRangeMetadata<Comparable>>> fileColumnMetadata = partitionInfo.getValue().stream()
-          .map(fileName -> getFileStatsRangeMetadata(partitionPath, fileName, dataTableMetaClient, validColumnsToIndex, false,
+          .map(fileName -> getFileStatsRangeMetadata(partitionPath, fileName, dataTableMetaClient, new ArrayList<>(columnsToIndexSchemaMap.keySet()), false,
               metadataConfig.getMaxReaderBufferSize()))
           .collect(Collectors.toList());
 
-      return collectAndProcessColumnMetadata(fileColumnMetadata, partitionPath, true).iterator();
+      return collectAndProcessColumnMetadata(fileColumnMetadata, partitionPath, true, columnsToIndexSchemaMap).iterator();
     });
   }
 
@@ -2494,14 +2589,16 @@ public class HoodieTableMetadataUtil {
   }
 
   private static HoodieData<HoodieRecord> convertMetadataToPartitionStatsRecords(HoodiePairData<String, List<HoodieColumnRangeMetadata<Comparable>>> columnRangeMetadataPartitionPair,
-                                                                                 HoodieTableMetaClient dataMetaClient) {
+                                                                                 HoodieTableMetaClient dataMetaClient,
+                                                                                 Map<String, Schema> colsToIndexSchemaMap
+  ) {
     try {
       return columnRangeMetadataPartitionPair
           .flatMapValues(List::iterator)
           .groupByKey()
           .map(pair -> {
            final String partitionName = pair.getLeft();
-            return collectAndProcessColumnMetadata(pair.getRight(), partitionName, isShouldScanColStatsForTightBound(dataMetaClient), Option.empty());
+            return collectAndProcessColumnMetadata(pair.getRight(), partitionName, isShouldScanColStatsForTightBound(dataMetaClient), Option.empty(), colsToIndexSchemaMap);
           })
           .flatMap(recordStream -> recordStream.iterator());
     } catch (Exception e) {
@@ -2531,11 +2628,12 @@ public class HoodieTableMetadataUtil {
       HoodieTableConfig tableConfig = dataMetaClient.getTableConfig();
       Option<Schema> tableSchema = writerSchema.map(schema -> tableConfig.populateMetaFields() ? addMetadataFields(schema) : schema);
       Lazy<Option<Schema>> writerSchemaOpt = Lazy.eagerly(tableSchema);
-      List<String> columnsToIndex = getColumnsToIndex(dataMetaClient.getTableConfig(), metadataConfig, writerSchemaOpt, false, recordTypeOpt);
-      if (columnsToIndex.isEmpty()) {
+      Map<String, Schema> columnsToIndexSchemaMap = getColumnsToIndex(dataMetaClient.getTableConfig(), metadataConfig, writerSchemaOpt, false, recordTypeOpt);
+      if (columnsToIndexSchemaMap.isEmpty()) {
         return engineContext.emptyHoodieData();
       }
-      LOG.debug("Indexing following columns for partition stats index: {}", columnsToIndex);
+      List<String> colsToIndex = new ArrayList<>(columnsToIndexSchemaMap.keySet());
+      LOG.debug("Indexing following columns for partition stats index: {}", columnsToIndexSchemaMap.keySet());
       // Group by partitionPath and then gather write stats lists,
       // where each inner list contains HoodieWriteStat objects that have the same partitionPath.
       List<List<HoodieWriteStat>> partitionedWriteStats = new ArrayList<>(allWriteStats.stream()
@@ -2549,7 +2647,7 @@ public class HoodieTableMetadataUtil {
         final String partitionName = partitionedWriteStat.get(0).getPartitionPath();
         // Step 1: Collect Column Metadata for Each File part of current commit metadata
         List<HoodieColumnRangeMetadata<Comparable>> fileColumnMetadata = partitionedWriteStat.stream()
-            .flatMap(writeStat -> translateWriteStatToFileStats(writeStat, dataMetaClient, columnsToIndex, tableSchema).stream()).collect(toList());
+            .flatMap(writeStat -> translateWriteStatToFileStats(writeStat, dataMetaClient, colsToIndex, tableSchema).stream()).collect(toList());
 
         if (shouldScanColStatsForTightBound) {
           checkState(tableMetadata != null, "tableMetadata should not be null when scanning metadata table");
@@ -2563,7 +2661,7 @@ public class HoodieTableMetadataUtil {
               .collect(Collectors.toSet());
           // Fetch metadata table COLUMN_STATS partition records for above files
           List<HoodieColumnRangeMetadata<Comparable>> partitionColumnMetadata = tableMetadata
-              .getRecordsByKeyPrefixes(generateKeyPrefixes(columnsToIndex, partitionName), MetadataPartitionType.COLUMN_STATS.getPartitionPath(), false)
+              .getRecordsByKeyPrefixes(generateKeyPrefixes(colsToIndex, partitionName), MetadataPartitionType.COLUMN_STATS.getPartitionPath(), false)
               // schema and properties are ignored in getInsertValue, so simply pass as null
               .map(record -> ((HoodieMetadataPayload)record.getData()).getColumnStatMetadata())
               .filter(Option::isPresent)
@@ -2580,7 +2678,7 @@ public class HoodieTableMetadataUtil {
         return Pair.of(partitionName, fileColumnMetadata);
       });
 
-      return convertMetadataToPartitionStatsRecords(columnRangeMetadata, dataMetaClient);
+      return convertMetadataToPartitionStatsRecords(columnRangeMetadata, dataMetaClient, columnsToIndexSchemaMap);
     } catch (Exception e) {
       throw new HoodieException("Failed to generate column stats records for metadata table", e);
     }
@@ -2599,20 +2697,6 @@ public class HoodieTableMetadataUtil {
     }
   }
 
-  /**
-   * Given table schema and field to index, checks if field's data type are supported.
-   *
-   * @param columnToIndex column to index
-   * @param tableSchema   table schema
-   * @return true if field's data type is supported, false otherwise
-   */
-  @VisibleForTesting
-  public static boolean validateDataTypeForPartitionStats(String columnToIndex, Schema tableSchema) {
-    Schema fieldSchema = getNestedFieldSchemaFromWriteSchema(tableSchema, columnToIndex);
-    // to be fixed HUDI-8680.
-    return isColumnTypeSupported(fieldSchema, Option.of(HoodieRecordType.SPARK));
-  }
-
   /**
    * Generate key prefixes for each combination of column name in {@param columnsToIndex} and {@param partitionName}.
   */
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestBaseFileUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestBaseFileUtils.java
index e73aaebc4ed..57cd85f3570 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestBaseFileUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestBaseFileUtils.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -42,7 +43,7 @@ public class TestBaseFileUtils {
         "path/to/file2", COLUMN_NAME, 3, 8, 1, 15, 120, 250);
     List<HoodieColumnRangeMetadata<Comparable>> fileColumnRanges = Arrays.asList(fileColumnRange1, fileColumnRange2);
     // Step 2: Call the Method
-    HoodieColumnRangeMetadata<Comparable> result = FileFormatUtils.getColumnRangeInPartition(PARTITION_PATH, fileColumnRanges);
+    HoodieColumnRangeMetadata<Comparable> result = FileFormatUtils.getColumnRangeInPartition(PARTITION_PATH, fileColumnRanges, Collections.emptyMap());
     // Step 3: Assertions
     assertEquals(PARTITION_PATH, result.getFilePath());
     assertEquals(COLUMN_NAME, result.getColumnName());
@@ -64,7 +65,7 @@ public class TestBaseFileUtils {
 
     List<HoodieColumnRangeMetadata<Comparable>> fileColumnRanges = Arrays.asList(fileColumnRange1, fileColumnRange2);
     // Step 2: Call the Method
-    HoodieColumnRangeMetadata<Comparable> result = FileFormatUtils.getColumnRangeInPartition(PARTITION_PATH, fileColumnRanges);
+    HoodieColumnRangeMetadata<Comparable> result = FileFormatUtils.getColumnRangeInPartition(PARTITION_PATH, fileColumnRanges, Collections.emptyMap());
     // Step 3: Assertions
     assertEquals(PARTITION_PATH, result.getFilePath());
     assertEquals(COLUMN_NAME, result.getColumnName());
@@ -85,6 +86,7 @@ public class TestBaseFileUtils {
         "path/to/file2", "columnName2", null, 8, 1, 15, 120, 250);
     List<HoodieColumnRangeMetadata<Comparable>> fileColumnRanges = Arrays.asList(fileColumnRange1, fileColumnRange2);
     // Step 2: Call the Method
-    assertThrows(IllegalArgumentException.class, () -> FileFormatUtils.getColumnRangeInPartition(PARTITION_PATH, fileColumnRanges));
+    assertThrows(IllegalArgumentException.class, () -> FileFormatUtils.getColumnRangeInPartition(PARTITION_PATH, fileColumnRanges,
+        Collections.emptyMap()));
   }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java
index f7eba5e4d85..5026af91a96 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java
@@ -365,7 +365,7 @@ public class ITTestSchemaEvolution {
         FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key(), false,
         HoodieWriteConfig.EMBEDDED_TIMELINE_SERVER_REUSE_ENABLED.key(), false,
         HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(), true,
-        HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key(), "false", // HUDI-8587
+        HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key(), "true",
        HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key(), "false");
   }
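With this fix the Flink schema-evolution IT keeps column stats enabled; only partition stats stays off there. For reference, the equivalent writer options would look roughly like this (key strings assumed from `HoodieCommonConfig`/`HoodieMetadataConfig`; verify against your Hudi version):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SchemaEvolutionOptionsSketch {
  public static void main(String[] args) {
    // Options mirroring the test configuration above.
    Map<String, String> options = new LinkedHashMap<>();
    options.put("hoodie.schema.on.read.enable", "true");                // schema evolution on read
    options.put("hoodie.metadata.index.column.stats.enable", "true");   // now safe with evolution
    options.put("hoodie.metadata.index.partition.stats.enable", "false");
    options.forEach((k, v) -> System.out.println(k + "=" + v));
  }
}
```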
 
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
index 377b0121e71..4be1b26183d 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
@@ -69,7 +69,6 @@ import static 
org.apache.hudi.avro.TestHoodieAvroUtils.SCHEMA_WITH_AVRO_TYPES_ST
 import static 
org.apache.hudi.avro.TestHoodieAvroUtils.SCHEMA_WITH_NESTED_FIELD_STR;
 import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.computeRevivedAndDeletedKeys;
 import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.getFileIDForFileGroup;
-import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.validateDataTypeForPartitionStats;
 import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.validateDataTypeForSecondaryOrExpressionIndex;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -389,8 +388,8 @@ public class TestHoodieTableMetadataUtil extends 
HoodieCommonTestHarness {
     addNColumns(colNames, 
HoodieMetadataConfig.COLUMN_STATS_INDEX_MAX_COLUMNS.defaultValue() + 10);
     List<String> expected = new 
ArrayList<>(Arrays.asList(HoodieTableMetadataUtil.META_COLS_TO_ALWAYS_INDEX));
     addNColumns(expected, 
HoodieMetadataConfig.COLUMN_STATS_INDEX_MAX_COLUMNS.defaultValue());
-    assertEquals(expected, 
HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig,
-        Lazy.eagerly(Option.of(getTableSchema(expected))), false));
+    assertListEquality(expected, new 
ArrayList<>(HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, 
metadataConfig,
+        Lazy.eagerly(Option.of(getTableSchema(expected))), false).keySet()));
 
     //test with table schema < default
     int tableSchemaSize = 
HoodieMetadataConfig.COLUMN_STATS_INDEX_MAX_COLUMNS.defaultValue() - 10;
@@ -399,7 +398,8 @@ public class TestHoodieTableMetadataUtil extends 
HoodieCommonTestHarness {
     addNColumns(colNames, tableSchemaSize);
     expected = new 
ArrayList<>(Arrays.asList(HoodieTableMetadataUtil.META_COLS_TO_ALWAYS_INDEX));
     addNColumns(expected, tableSchemaSize);
-    assertEquals(expected, 
HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig, 
Lazy.eagerly(Option.of(getTableSchema(expected))), false));
+    assertListEquality(expected, new 
ArrayList<>(HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, 
metadataConfig,
+        Lazy.eagerly(Option.of(getTableSchema(expected))), false).keySet()));
 
     //test with max val < tableSchema
     metadataConfig = HoodieMetadataConfig.newBuilder()
@@ -410,7 +410,8 @@ public class TestHoodieTableMetadataUtil extends 
HoodieCommonTestHarness {
     addNColumns(colNames, 
HoodieMetadataConfig.COLUMN_STATS_INDEX_MAX_COLUMNS.defaultValue() + 10);
     expected = new 
ArrayList<>(Arrays.asList(HoodieTableMetadataUtil.META_COLS_TO_ALWAYS_INDEX));
     addNColumns(expected, 3);
-    assertEquals(expected, 
HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig, 
Lazy.eagerly(Option.of(getTableSchema(expected))), false));
+    assertListEquality(expected, new 
ArrayList<>(HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, 
metadataConfig,
+        Lazy.eagerly(Option.of(getTableSchema(expected))), false).keySet()));
 
     //test with max val > tableSchema
     metadataConfig = HoodieMetadataConfig.newBuilder()
@@ -421,7 +422,8 @@ public class TestHoodieTableMetadataUtil extends 
HoodieCommonTestHarness {
     addNColumns(colNames, tableSchemaSize);
     expected = new 
ArrayList<>(Arrays.asList(HoodieTableMetadataUtil.META_COLS_TO_ALWAYS_INDEX));
     addNColumns(expected, tableSchemaSize);
-    assertEquals(expected, 
HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig, 
Lazy.eagerly(Option.of(getTableSchema(expected))), false));
+    assertListEquality(expected, new 
ArrayList<>(HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, 
metadataConfig,
+        Lazy.eagerly(Option.of(getTableSchema(expected))), false).keySet()));
 
     //test with list of cols and a nested field as well.
     metadataConfig = HoodieMetadataConfig.newBuilder()
@@ -448,7 +450,8 @@ public class TestHoodieTableMetadataUtil extends 
HoodieCommonTestHarness {
         .noDefault()
         .endRecord();
 
-    assertEquals(expected, 
HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig, 
Lazy.eagerly(Option.of(schema)), false));
+    assertListEquality(expected, new 
ArrayList<>(HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, 
metadataConfig,
+        Lazy.eagerly(Option.of(schema)), false).keySet()));
 
     //test with list of cols longer than config
     metadataConfig = HoodieMetadataConfig.newBuilder()
@@ -462,7 +465,8 @@ public class TestHoodieTableMetadataUtil extends 
HoodieCommonTestHarness {
     expected.add("col_1");
     expected.add("col_7");
     expected.add("col_11");
-    assertEquals(expected, 
HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig, 
Lazy.eagerly(Option.of(getTableSchema(expected))), false));
+    assertListEquality(expected, new 
ArrayList<>(HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, 
metadataConfig,
+        Lazy.eagerly(Option.of(getTableSchema(expected))), false).keySet()));
 
     //test with list of cols including meta cols than config
     metadataConfig = HoodieMetadataConfig.newBuilder()
@@ -476,7 +480,8 @@ public class TestHoodieTableMetadataUtil extends 
HoodieCommonTestHarness {
     expected.add("col_7");
     expected.add("col_11");
     expected.add("_hoodie_commit_seqno");
-    assertEquals(expected, 
HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig, 
Lazy.eagerly(Option.of(getTableSchema(expected))), false));
+    assertListEquality(expected, new 
ArrayList<>(HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, 
metadataConfig,
+        Lazy.eagerly(Option.of(getTableSchema(expected))), false).keySet()));
 
     //test with avro schema
     schema = new Schema.Parser().parse(SCHEMA_WITH_AVRO_TYPES_STR);
@@ -488,7 +493,8 @@ public class TestHoodieTableMetadataUtil extends 
HoodieCommonTestHarness {
     expected.add("booleanField");
     expected.add("decimalField");
     expected.add("localTimestampMillisField");
-    assertEquals(expected, 
HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig, 
Lazy.eagerly(Option.of(schema)), true));
+    assertListEquality(expected, new 
ArrayList<>(HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, 
metadataConfig,
+        Lazy.eagerly(Option.of(schema)), true).keySet()));
 
     //test with avro schema and nested fields and unsupported types
     schema = new Schema.Parser().parse(SCHEMA_WITH_NESTED_FIELD_STR);
@@ -499,7 +505,8 @@ public class TestHoodieTableMetadataUtil extends 
HoodieCommonTestHarness {
     expected = new 
ArrayList<>(Arrays.asList(HoodieTableMetadataUtil.META_COLS_TO_ALWAYS_INDEX));
     expected.add("firstname");
     expected.add("student.lastname");
-    assertEquals(expected, 
HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig, 
Lazy.eagerly(Option.of(schema)), false));
+    assertListEquality(expected, new 
ArrayList<>(HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, 
metadataConfig,
+        Lazy.eagerly(Option.of(schema)), false).keySet()));
 
     //test with avro schema with max cols set
     schema = new Schema.Parser().parse(SCHEMA_WITH_AVRO_TYPES_STR);
@@ -510,9 +517,11 @@ public class TestHoodieTableMetadataUtil extends 
HoodieCommonTestHarness {
     expected = new 
ArrayList<>(Arrays.asList(HoodieTableMetadataUtil.META_COLS_TO_ALWAYS_INDEX));
     expected.add("booleanField");
     expected.add("intField");
-    assertEquals(expected, 
HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig, 
Lazy.eagerly(Option.of(schema)), false));
+    assertListEquality(expected, new 
ArrayList<>(HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, 
metadataConfig,
+        Lazy.eagerly(Option.of(schema)), false).keySet()));
     //test with avro schema with meta cols
-    assertEquals(expected, 
HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig, 
Lazy.eagerly(Option.of(HoodieAvroUtils.addMetadataFields(schema))), false));
+    assertListEquality(expected, new 
ArrayList<>(HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, 
metadataConfig,
+        Lazy.eagerly(Option.of(HoodieAvroUtils.addMetadataFields(schema))), 
false).keySet()));
 
     //test with avro schema with type filter
     metadataConfig = HoodieMetadataConfig.newBuilder()
@@ -535,10 +544,12 @@ public class TestHoodieTableMetadataUtil extends HoodieCommonTestHarness {
     expected.add("current_date");
     expected.add("current_ts");
     expected.add("_hoodie_is_deleted");
-    assertEquals(expected, HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig, Lazy.eagerly(Option.of(HoodieTestDataGenerator.AVRO_SCHEMA)), false));
+    assertListEquality(expected, new ArrayList<>(HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig,
+        Lazy.eagerly(Option.of(HoodieTestDataGenerator.AVRO_SCHEMA)), false).keySet()));
     //test with avro schema with meta cols
-    assertEquals(expected,
-        HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig, Lazy.eagerly(Option.of(HoodieAvroUtils.addMetadataFields(HoodieTestDataGenerator.AVRO_SCHEMA))), false));
+    assertListEquality(expected,
+        new ArrayList<>(HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig,
+            Lazy.eagerly(Option.of(HoodieAvroUtils.addMetadataFields(HoodieTestDataGenerator.AVRO_SCHEMA))), false).keySet()));
 
     //test with meta cols disabled
     tableConfig.setValue(HoodieTableConfig.POPULATE_META_FIELDS.key(), "false");
@@ -549,7 +560,8 @@ public class TestHoodieTableMetadataUtil extends HoodieCommonTestHarness {
     addNColumns(colNames, tableSchemaSize);
     expected = new ArrayList<>();
     addNColumns(expected, tableSchemaSize);
-    assertEquals(expected, HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig, Lazy.eagerly(Option.of(getTableSchema(expected))), false));
+    assertListEquality(expected, new ArrayList<>(HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig,
+        Lazy.eagerly(Option.of(getTableSchema(expected))), false).keySet()));
 
     //test with meta cols disabled with col list
     metadataConfig = HoodieMetadataConfig.newBuilder()
@@ -562,7 +574,8 @@ public class TestHoodieTableMetadataUtil extends HoodieCommonTestHarness {
     expected.add("col_1");
     expected.add("col_7");
     expected.add("col_11");
-    assertEquals(expected, HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig, Lazy.eagerly(Option.of(getTableSchema(expected))), false));
+    assertListEquality(expected, new ArrayList<>(HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig,
+        Lazy.eagerly(Option.of(getTableSchema(expected))), false).keySet()));
 
     //test with meta cols disabled with avro schema
     metadataConfig = HoodieMetadataConfig.newBuilder()
@@ -573,7 +586,14 @@ public class TestHoodieTableMetadataUtil extends HoodieCommonTestHarness {
     expected.add("booleanField");
     expected.add("decimalField");
     expected.add("localTimestampMillisField");
-    assertEquals(expected, HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig, Lazy.eagerly(Option.of(schema)), true));
+    assertListEquality(expected, new ArrayList<>(HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig,
+        Lazy.eagerly(Option.of(schema)), true).keySet()));
+  }
+
+  private void assertListEquality(List<String> expected, List<String> actual) {
+    Collections.sort(expected);
+    Collections.sort(actual);
+    assertEquals(expected, actual);
   }
 
   private void addNColumns(List<String> list, int n) {
@@ -611,19 +631,19 @@ public class TestHoodieTableMetadataUtil extends HoodieCommonTestHarness {
         .endRecord();
 
     // Test for primitive fields
-    assertTrue(validateDataTypeForPartitionStats("stringField", schema));
-    assertTrue(validateDataTypeForPartitionStats("intField", schema));
-    assertTrue(validateDataTypeForPartitionStats("booleanField", schema));
-    assertTrue(validateDataTypeForPartitionStats("floatField", schema));
-    assertTrue(validateDataTypeForPartitionStats("doubleField", schema));
-    assertTrue(validateDataTypeForPartitionStats("longField", schema));
-    assertTrue(validateDataTypeForPartitionStats("unionIntField", schema));
+    assertTrue(HoodieTableMetadataUtil.isColumnTypeSupported(schema.getField("stringField").schema(), Option.empty()));
+    assertTrue(HoodieTableMetadataUtil.isColumnTypeSupported(schema.getField("intField").schema(), Option.empty()));
+    assertTrue(HoodieTableMetadataUtil.isColumnTypeSupported(schema.getField("booleanField").schema(), Option.empty()));
+    assertTrue(HoodieTableMetadataUtil.isColumnTypeSupported(schema.getField("floatField").schema(), Option.empty()));
+    assertTrue(HoodieTableMetadataUtil.isColumnTypeSupported(schema.getField("doubleField").schema(), Option.empty()));
+    assertTrue(HoodieTableMetadataUtil.isColumnTypeSupported(schema.getField("longField").schema(), Option.empty()));
+    assertTrue(HoodieTableMetadataUtil.isColumnTypeSupported(schema.getField("unionIntField").schema(), Option.empty()));
 
     // Test for unsupported fields
-    assertFalse(validateDataTypeForPartitionStats("arrayField", schema));
-    assertFalse(validateDataTypeForPartitionStats("mapField", schema));
-    assertFalse(validateDataTypeForPartitionStats("structField", schema));
-    assertFalse(validateDataTypeForPartitionStats("bytesField", schema));
+    assertFalse(HoodieTableMetadataUtil.isColumnTypeSupported(schema.getField("arrayField").schema(), Option.empty()));
+    assertFalse(HoodieTableMetadataUtil.isColumnTypeSupported(schema.getField("mapField").schema(), Option.empty()));
+    assertFalse(HoodieTableMetadataUtil.isColumnTypeSupported(schema.getField("structField").schema(), Option.empty()));
+    assertFalse(HoodieTableMetadataUtil.isColumnTypeSupported(schema.getField("bytesField").schema(), Option.of(HoodieRecord.HoodieRecordType.SPARK)));
 
     // Test for logical types
     Schema dateFieldSchema = LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT));
@@ -631,7 +651,7 @@ public class TestHoodieTableMetadataUtil extends HoodieCommonTestHarness {
         .fields()
         .name("dateField").type(dateFieldSchema).noDefault()
         .endRecord();
-    assertTrue(validateDataTypeForPartitionStats("dateField", schema));
+    assertTrue(HoodieTableMetadataUtil.isColumnTypeSupported(schema.getField("dateField").schema(), Option.empty()));
   }
 
   @Test
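Note on the test changes above: getColumnsToIndex now returns a Map keyed by column name, with the column's Avro schema as the value, instead of an ordered List, which is why the assertions wrap keySet() in an ArrayList and compare sorted via the new assertListEquality helper. A minimal sketch of consuming the new return shape (the wrapper class and method names here are illustrative, not part of the patch):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import org.apache.avro.Schema;

    class ColumnsToIndexSketch {
      // colsToIndex would come from HoodieTableMetadataUtil.getColumnsToIndex(
      //     tableConfig, metadataConfig, Lazy.eagerly(Option.of(schema)), false)
      static List<String> indexedColumnNames(Map<String, Schema> colsToIndex) {
        // keySet() ordering is unspecified, so sort before comparing,
        // exactly what assertListEquality does in the tests above.
        List<String> names = new ArrayList<>(colsToIndex.keySet());
        names.sort(String::compareTo);
        return names;
      }
    }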
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
index a00a976c551..1523a838139 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
@@ -124,10 +124,10 @@ class ColumnStatsIndexSupport(spark: SparkSession,
                         shouldReadInMemory: Boolean,
                         prunedPartitions: Option[Set[String]] = None,
                        prunedFileNamesOpt: Option[Set[String]] = None)(block: DataFrame => T): T = {
-    /*cachedColumnStatsIndexViews.get(targetColumns) match {
+    cachedColumnStatsIndexViews.get(targetColumns) match {
       case Some(cachedDF) =>
         block(cachedDF)
-      case None =>*/
+      case None =>
         val colStatsRecords: HoodieData[HoodieMetadataColumnStats] = prunedFileNamesOpt match {
           case Some(prunedFileNames) =>
             val filterFunction = new SerializableFunction[HoodieMetadataColumnStats, java.lang.Boolean] {
@@ -166,7 +166,7 @@ class ColumnStatsIndexSupport(spark: SparkSession,
               block(df)
             }
           }
-        //}
+        }
     }
   }
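This hunk restores the previously commented-out cache lookup: loadTransposed again checks cachedColumnStatsIndexViews for the requested target columns and only builds (and caches) the transposed DataFrame on a miss. The control flow is plain memoization; a self-contained Java sketch of the same pattern, with String/Integer standing in for the column-set key and the cached DataFrame:

    import java.util.HashMap;
    import java.util.Map;

    class MemoizeSketch {
      public static void main(String[] args) {
        Map<String, Integer> cache = new HashMap<>();
        // First call misses, builds, and caches; second call returns the cached value.
        Integer first = cache.computeIfAbsent("fare,rider", MemoizeSketch::expensiveBuild);
        Integer second = cache.computeIfAbsent("fare,rider", MemoizeSketch::expensiveBuild);
        System.out.println(first.equals(second)); // true
      }

      // Stands in for composing the transposed column-stats view.
      static Integer expensiveBuild(String key) {
        return key.length();
      }
    }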
 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestColStatsRecordWithMetadataRecord.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestColStatsRecordWithMetadataRecord.java
index fc4e05f0eaa..0d2c5fc24bb 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestColStatsRecordWithMetadataRecord.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestColStatsRecordWithMetadataRecord.java
@@ -31,6 +31,7 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.testutils.InProcessTimeGenerator;
+import org.apache.hudi.common.util.FileFormatUtils;
 import org.apache.hudi.common.util.Functions;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.SpillableMapUtils;
@@ -52,7 +53,6 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.util.Utf8;
 import org.apache.spark.api.java.JavaRDD;
 import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
@@ -67,8 +67,10 @@ import java.time.LocalDate;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
@@ -78,6 +80,7 @@ import static org.apache.hudi.common.util.ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_RE
 import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
 import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class TestColStatsRecordWithMetadataRecord extends HoodieSparkClientTestHarness {
 
@@ -303,6 +306,151 @@ public class TestColStatsRecordWithMetadataRecord extends HoodieSparkClientTestH
     generateNColStatsEntriesAndValidateMerge((Functions.Function1<Random, Comparable>) random -> new BigDecimal(String.format(Locale.ENGLISH, "%5f", random.nextFloat())));
   }
 
+  @Test
+  public void testGetColumnRangeInPartition() {
+    String relativePartitionPath = "relativePartitionPath";
+    String fileName = "file1";
+    String colName = "colA";
+    long nullCount = 10;
+    long valueCount = 100;
+    long totalSize = 10000;
+    long totalUncompressedSize = 1000;
+
+    // Integer vals
+    HoodieColumnRangeMetadata aIntegerVal = HoodieColumnRangeMetadata.create(fileName, colName, (Integer)1, (Integer)1000, nullCount, valueCount, totalSize, totalUncompressedSize);
+    HoodieColumnRangeMetadata bIntegerVal = HoodieColumnRangeMetadata.create(fileName, colName, (Integer)(-1), (Integer)10000, nullCount, valueCount, totalSize, totalUncompressedSize);
+
+    // Long vals
+    HoodieColumnRangeMetadata aLongVal = HoodieColumnRangeMetadata.create(fileName, colName, (Long)1L, (Long)1000L, nullCount, valueCount, totalSize, totalUncompressedSize);
+    HoodieColumnRangeMetadata bLongVal = HoodieColumnRangeMetadata.create(fileName, colName, (Long)(-1L), (Long)10000L, nullCount, valueCount, totalSize, totalUncompressedSize);
+
+    // Float vals
+    HoodieColumnRangeMetadata aFloatVal = HoodieColumnRangeMetadata.create(fileName, colName, new Float(1), new Float(1000.0), nullCount, valueCount, totalSize, totalUncompressedSize);
+    HoodieColumnRangeMetadata bFloatVal = HoodieColumnRangeMetadata.create(fileName, colName, new Float(-1.0), new Float(10000.0), nullCount, valueCount, totalSize, totalUncompressedSize);
+
+    // Double vals
+    HoodieColumnRangeMetadata aDoubleVal = HoodieColumnRangeMetadata.create(fileName, colName, new Double(0.1), new Double(1000.0), nullCount, valueCount, totalSize, totalUncompressedSize);
+    HoodieColumnRangeMetadata bDoubleVal = HoodieColumnRangeMetadata.create(fileName, colName, new Double(-1.0), new Double(10000.0), nullCount, valueCount, totalSize, totalUncompressedSize);
+
+    // String vals
+    HoodieColumnRangeMetadata aStringVal = HoodieColumnRangeMetadata.create(fileName, colName, new String("1"), new String("1000"), nullCount, valueCount, totalSize, totalUncompressedSize);
+    HoodieColumnRangeMetadata bStringVal = HoodieColumnRangeMetadata.create(fileName, colName, new String("-1"), new String("10000"), nullCount, valueCount, totalSize, totalUncompressedSize);
+
+    // Merging Integer and Integer.
+    HoodieColumnRangeMetadata actualColumnRange = mergeAndAssert(aIntegerVal, bIntegerVal, relativePartitionPath, colName, nullCount, totalSize, totalUncompressedSize,
+        Schema.Type.INT);
+    assertEquals(actualColumnRange.getMinValue(), (Integer)(-1));
+    assertEquals(actualColumnRange.getMaxValue(), (Integer)(10000));
+
+    // Merging Integer and Long.
+    actualColumnRange = mergeAndAssert(aIntegerVal, bLongVal, relativePartitionPath, colName, nullCount, totalSize, totalUncompressedSize, Schema.Type.INT);
+    assertEquals(actualColumnRange.getMinValue(), (Integer)(-1));
+    assertEquals(actualColumnRange.getMaxValue(), (Integer)(10000));
+
+    // Merging Integer and Float
+    actualColumnRange = mergeAndAssert(aIntegerVal, bFloatVal, relativePartitionPath, colName, nullCount, totalSize, totalUncompressedSize, Schema.Type.INT);
+    assertEquals(actualColumnRange.getMinValue(), (Integer)(-1));
+    assertEquals(actualColumnRange.getMaxValue(), (Integer)(10000));
+
+    // Merging Integer and Double
+    actualColumnRange = mergeAndAssert(aIntegerVal, bDoubleVal, relativePartitionPath, colName, nullCount, totalSize, totalUncompressedSize, Schema.Type.INT);
+    assertEquals(actualColumnRange.getMinValue(), (Integer)(-1));
+    assertEquals(actualColumnRange.getMaxValue(), (Integer)(10000));
+
+    // Merging Integer and String
+    actualColumnRange = mergeAndAssert(aIntegerVal, bStringVal, relativePartitionPath, colName, nullCount, totalSize, totalUncompressedSize, Schema.Type.INT);
+    assertEquals(actualColumnRange.getMinValue(), (Integer)(-1));
+    assertEquals(actualColumnRange.getMaxValue(), (Integer)(10000));
+
+    // Long and Long
+    actualColumnRange = mergeAndAssert(aLongVal, bLongVal, relativePartitionPath, colName, nullCount, totalSize, totalUncompressedSize, Schema.Type.LONG);
+    assertEquals(actualColumnRange.getMinValue(), (Long)(-1L));
+    assertEquals(actualColumnRange.getMaxValue(), (Long)(10000L));
+
+    // Merging Long and Integer
+    actualColumnRange = mergeAndAssert(aLongVal, bIntegerVal, relativePartitionPath, colName, nullCount, totalSize, totalUncompressedSize, Schema.Type.LONG);
+    assertEquals(actualColumnRange.getMinValue(), (Long)(-1L));
+    assertEquals(actualColumnRange.getMaxValue(), (Long)(10000L));
+
+    // Merging Long and Float
+    actualColumnRange = mergeAndAssert(aLongVal, bFloatVal, relativePartitionPath, colName, nullCount, totalSize, totalUncompressedSize, Schema.Type.LONG);
+    assertEquals(actualColumnRange.getMinValue(), (Long)(-1L));
+    assertEquals(actualColumnRange.getMaxValue(), (Long)(10000L));
+
+    // Merging Long and Double
+    actualColumnRange = mergeAndAssert(aLongVal, bDoubleVal, relativePartitionPath, colName, nullCount, totalSize, totalUncompressedSize, Schema.Type.LONG);
+    assertEquals(actualColumnRange.getMinValue(), (Long)(-1L));
+    assertEquals(actualColumnRange.getMaxValue(), (Long)(10000L));
+
+    // Merging Long and String
+    actualColumnRange = mergeAndAssert(aLongVal, bStringVal, relativePartitionPath, colName, nullCount, totalSize, totalUncompressedSize, Schema.Type.LONG);
+    assertEquals(actualColumnRange.getMinValue(), (Long)(-1L));
+    assertEquals(actualColumnRange.getMaxValue(), (Long)(10000L));
+
+    // Float and Float
+    actualColumnRange = mergeAndAssert(aFloatVal, bFloatVal, relativePartitionPath, colName, nullCount, totalSize, totalUncompressedSize, Schema.Type.FLOAT);
+    assertEquals(actualColumnRange.getMinValue(), new Float(-1));
+    assertEquals(actualColumnRange.getMaxValue(), new Float(10000));
+
+    // Merging Float and Integer
+    actualColumnRange = mergeAndAssert(aFloatVal, bIntegerVal, relativePartitionPath, colName, nullCount, totalSize, totalUncompressedSize, Schema.Type.FLOAT);
+    assertEquals(actualColumnRange.getMinValue(), new Float(-1));
+    assertEquals(actualColumnRange.getMaxValue(), new Float(10000));
+
+    // Merging Float and Long.
+    actualColumnRange = mergeAndAssert(aFloatVal, bLongVal, relativePartitionPath, colName, nullCount, totalSize, totalUncompressedSize, Schema.Type.FLOAT);
+    assertEquals(actualColumnRange.getMinValue(), new Float(-1));
+    assertEquals(actualColumnRange.getMaxValue(), new Float(10000));
+
+    // Merging Float and String
+    actualColumnRange = mergeAndAssert(aFloatVal, bStringVal, relativePartitionPath, colName, nullCount, totalSize, totalUncompressedSize, Schema.Type.FLOAT);
+    assertEquals(actualColumnRange.getMinValue(), new Float(-1));
+    assertEquals(actualColumnRange.getMaxValue(), new Float(10000));
+
+    // Double and Double
+    actualColumnRange = mergeAndAssert(aDoubleVal, bDoubleVal, relativePartitionPath, colName, nullCount, totalSize, totalUncompressedSize, Schema.Type.DOUBLE);
+    assertEquals(actualColumnRange.getMinValue(), new Double(-1));
+    assertEquals(actualColumnRange.getMaxValue(), new Double(10000));
+
+    // Merging Double and Integer
+    actualColumnRange = mergeAndAssert(aDoubleVal, bIntegerVal, relativePartitionPath, colName, nullCount, totalSize, totalUncompressedSize, Schema.Type.DOUBLE);
+    assertEquals(actualColumnRange.getMinValue(), new Double(-1));
+    assertEquals(actualColumnRange.getMaxValue(), new Double(10000));
+
+    // Merging Double and Long.
+    actualColumnRange = mergeAndAssert(aDoubleVal, bLongVal, relativePartitionPath, colName, nullCount, totalSize, totalUncompressedSize, Schema.Type.DOUBLE);
+    assertEquals(actualColumnRange.getMinValue(), new Double(-1));
+    assertEquals(actualColumnRange.getMaxValue(), new Double(10000));
+
+    // Merging Double and String
+    actualColumnRange = mergeAndAssert(aDoubleVal, bStringVal, relativePartitionPath, colName, nullCount, totalSize, totalUncompressedSize, Schema.Type.DOUBLE);
+    assertTrue(actualColumnRange.getMinValue().compareTo(new Double(-1)) == 0);
+    assertTrue(actualColumnRange.getMaxValue().compareTo(new Double(10000)) == 0);
+  }
+
+  private HoodieColumnRangeMetadata mergeAndAssert(HoodieColumnRangeMetadata<Comparable> aVal, HoodieColumnRangeMetadata<Comparable> bVal, String relativePartitionPath, String colName, long nullCount,
+                              long totalSize, long totalUncompressedSize, Schema.Type schemaType) {
+    List<HoodieColumnRangeMetadata<Comparable>> fileColumnRanges = new ArrayList<>();
+    fileColumnRanges.add(aVal);
+    fileColumnRanges.add(bVal);
+    Map<String, Schema> colsToIndexSchemaMap = new HashMap<>();
+    colsToIndexSchemaMap.put(colName, Schema.create(schemaType));
+
+    HoodieColumnRangeMetadata actualColumnRange = FileFormatUtils.getColumnRangeInPartition(relativePartitionPath, fileColumnRanges, colsToIndexSchemaMap);
+
+    validateColumnRangeMetadata(actualColumnRange, relativePartitionPath, colName, nullCount, totalSize, totalUncompressedSize);
+    return actualColumnRange;
+  }
+
+  private void validateColumnRangeMetadata(HoodieColumnRangeMetadata actualColumnRange, String relativePartitionPath, String colName, long nullCount, long totalSize,
+                                           long totalUncompressedSize) {
+    assertEquals(actualColumnRange.getFilePath(), relativePartitionPath);
+    assertEquals(actualColumnRange.getColumnName(), colName);
+    assertEquals(actualColumnRange.getNullCount(), nullCount * 2);
+    assertEquals(actualColumnRange.getTotalSize(), totalSize * 2);
+    assertEquals(actualColumnRange.getTotalUncompressedSize(), totalUncompressedSize * 2);
+  }
+
   private void generateNColStatsEntriesAndValidateMerge(Functions.Function1<Random, Comparable> randomValueGenFunc) {
     String fileName = "abc.parquet";
     String colName = "colName";
@@ -375,7 +523,7 @@ public class TestColStatsRecordWithMetadataRecord extends HoodieSparkClientTestH
     if (doCommit) {
       List<HoodieWriteStat> writeStats = writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList());
       boolean committed = client.commitStats(instant, writeStats, Option.empty(), metaClient.getCommitActionType());
-      Assertions.assertTrue(committed);
+      assertTrue(committed);
     }
     metaClient = HoodieTableMetaClient.reload(metaClient);
     return writeStatuses;
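The new testGetColumnRangeInPartition above feeds FileFormatUtils.getColumnRangeInPartition per-file ranges whose min/max values carry mixed runtime types (Integer vs Long, Float, Double, String) and verifies that the merged partition-level range is coerced to the Avro schema type of the indexed column, with null counts and sizes summed. Ignoring the coercion, the heart of such a merge is a min/max fold over the per-file ranges; a self-contained sketch with plain ints standing in for HoodieColumnRangeMetadata:

    import java.util.Arrays;
    import java.util.List;

    class RangeMergeSketch {
      public static void main(String[] args) {
        // Each int[] is {min, max} for one file, mirroring aIntegerVal/bIntegerVal above.
        List<int[]> fileRanges = Arrays.asList(new int[]{1, 1000}, new int[]{-1, 10000});
        int mergedMin = fileRanges.stream().mapToInt(r -> r[0]).min().getAsInt();
        int mergedMax = fileRanges.stream().mapToInt(r -> r[1]).max().getAsInt();
        System.out.println("[" + mergedMin + ", " + mergedMax + "]"); // [-1, 10000]
      }
    }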
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala
index 21fc5e621ff..85160db1009 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala
@@ -280,7 +280,7 @@ class ColumnStatIndexTestBase extends HoodieSparkClientTestBase {
 
   protected def validateColumnsToIndex(metaClient: HoodieTableMetaClient, expectedColsToIndex: Seq[String]): Unit = {
     val indexDefn = metaClient.getIndexMetadata.get().getIndexDefinitions.get(PARTITION_NAME_COLUMN_STATS)
-    assertEquals(expectedColsToIndex, indexDefn.getSourceFields.asScala.toSeq)
+    assertEquals(expectedColsToIndex.sorted, indexDefn.getSourceFields.asScala.toSeq.sorted)
   }
 
   protected def validateNonExistantColumnsToIndexDefn(metaClient: HoodieTableMetaClient): Unit = {
@@ -301,9 +301,7 @@ class ColumnStatIndexTestBase extends HoodieSparkClientTestBase {
     val localSourceTableSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
 
     val columnStatsIndex = new ColumnStatsIndexSupport(spark, localSourceTableSchema, metadataConfig, metaClient)
-    val lazyOptTableSchema : Lazy[org.apache.hudi.common.util.Option[Schema]] = Lazy.eagerly(org.apache.hudi.common.util.Option.of(tableSchema))
-    val indexedColumnswithMeta: Set[String] = HoodieTableMetadataUtil
-      .getColumnsToIndex(metaClient.getTableConfig, metadataConfig, lazyOptTableSchema, false).asScala.toSet
+    val indexedColumnswithMeta: Set[String] = metaClient.getIndexMetadata.get().getIndexDefinitions.get(PARTITION_NAME_COLUMN_STATS).getSourceFields.asScala.toSet
     val indexedColumns = indexedColumnswithMeta.filter(colName => !HoodieTableMetadataUtil.META_COL_SET_TO_INDEX.contains(colName))
     val sortedIndexedColumns : Set[String] = TreeSet(indexedColumns.toSeq:_*)
     val (expectedColStatsSchema, _) = composeIndexSchema(sortedIndexedColumns.toSeq, indexedColumns.toSeq, localSourceTableSchema)
@@ -345,10 +343,7 @@ class ColumnStatIndexTestBase extends HoodieSparkClientTestBase {
 
     val schemaUtil = new TableSchemaResolver(metaClient)
     val tableSchema = schemaUtil.getTableAvroSchema(false)
-    val localSourceTableSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
-    val lazyOptTableSchema : Lazy[org.apache.hudi.common.util.Option[Schema]] = Lazy.eagerly(org.apache.hudi.common.util.Option.of(tableSchema))
-    val indexedColumnswithMeta: Set[String] = HoodieTableMetadataUtil
-      .getColumnsToIndex(metaClient.getTableConfig, metadataConfig, lazyOptTableSchema, false).asScala.toSet
+    val indexedColumnswithMeta: Set[String] = metaClient.getIndexMetadata.get().getIndexDefinitions.get(PARTITION_NAME_COLUMN_STATS).getSourceFields.asScala.toSet
     val pIndexedColumns = indexedColumnswithMeta.filter(colName => !HoodieTableMetadataUtil.META_COL_SET_TO_INDEX.contains(colName))
       .toSeq.sorted
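Both helpers above now read the indexed columns straight from the stored column-stats index definition instead of recomputing them via getColumnsToIndex, so the tests validate what was actually persisted. The equivalent lookup in Java, using the same accessors the Scala code calls (the wrapper class is illustrative):

    import java.util.List;
    import org.apache.hudi.common.table.HoodieTableMetaClient;

    import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;

    class IndexedColumnsReader {
      // Returns the source fields recorded for the column_stats index definition.
      static List<String> indexedColumns(HoodieTableMetaClient metaClient) {
        return metaClient.getIndexMetadata().get()
            .getIndexDefinitions()
            .get(PARTITION_NAME_COLUMN_STATS)
            .getSourceFields();
      }
    }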
 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala
index fc746646de3..179adc37165 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala
@@ -54,7 +54,7 @@ class TestBasicSchemaEvolution extends HoodieSparkClientTestBase with ScalaAsser
     DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
     DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
     HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
-    HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key -> "false" // HUDI-8587
+    HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key -> "true"
   )
 
   val verificationCol: String = "driver"
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index 09bd1238f75..bb75cb168c5 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -942,15 +942,9 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin
       metaClient = HoodieTableMetaClient.reload(metaClient)
       val metadataConfig = HoodieMetadataConfig.newBuilder().enable(true).withMetadataIndexColumnStats(true).build()
       val columnStatsIndex = new ColumnStatsIndexSupport(spark, inputDF1.schema, metadataConfig, metaClient)
-      columnStatsIndex.loadTransposed(Seq("fare", "city_to_state", "rider"), shouldReadInMemory = true) { emptyTransposedColStatsDF =>
-        // fare is a nested column, so it should not have any min/max value as it is not comparable, but still have nullCount
-        assertEquals(0, emptyTransposedColStatsDF.filter("fare_minValue IS NOT NULL").count())
-        assertEquals(0, emptyTransposedColStatsDF.filter("fare_maxValue IS NOT NULL").count())
-        assertTrue(emptyTransposedColStatsDF.filter("fare_nullCount IS NOT NULL").count() > 0)
-        // city_to_state is a map column, so it should not have any min/max value as it is not comparable, but still have nullCount
-        assertEquals(0, emptyTransposedColStatsDF.filter("city_to_state_minValue IS NOT NULL").count())
-        assertEquals(0, emptyTransposedColStatsDF.filter("city_to_state_maxValue IS NOT NULL").count())
-        assertTrue(emptyTransposedColStatsDF.filter("city_to_state_nullCount IS NOT NULL").count() > 0)
+      columnStatsIndex.loadTransposed(Seq("fare","city_to_state", "rider"), shouldReadInMemory = true) { emptyTransposedColStatsDF =>
+        assertTrue(!emptyTransposedColStatsDF.columns.contains("fare"))
+        assertTrue(!emptyTransposedColStatsDF.columns.contains("city_to_state"))
         // rider is a simple string field, so it should have a min/max value as well as nullCount
         assertTrue(emptyTransposedColStatsDF.filter("rider_minValue IS NOT NULL").count() > 0)
        assertTrue(emptyTransposedColStatsDF.filter("rider_maxValue IS NOT NULL").count() > 0)
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
index 6947611e921..d8d94ec34aa 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
@@ -25,7 +25,6 @@ import org.apache.hudi.HoodieSparkUtils;
 import org.apache.hudi.TestHoodieSparkUtils;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieAvroRecord;
@@ -156,7 +155,6 @@ public class TestHoodieDeltaStreamerSchemaEvolutionBase extends HoodieDeltaStrea
 
   protected HoodieDeltaStreamer.Config getDeltaStreamerConfig(String[] transformerClasses, boolean nullForDeletedCols,
                                                               TypedProperties extraProps) throws IOException {
-    extraProps.setProperty(HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key(),"false"); // HUDI-8587
 
     extraProps.setProperty("hoodie.datasource.write.table.type", tableType);
     extraProps.setProperty("hoodie.datasource.write.row.writer.enable", rowWriterEnable.toString());
