This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 898eb1c98db [HUDI-8556] Add config for max number of colstats columns (#12310)
898eb1c98db is described below

commit 898eb1c98db6740931de94c06c8377ad449dd69c
Author: Jon Vexler <[email protected]>
AuthorDate: Fri Nov 29 02:49:44 2024 -0500

    [HUDI-8556] Add config for max number of colstats columns (#12310)
    
    - Adding config to control the number of columns to index with col stats.
    ---------
    
    Co-authored-by: Jonathan Vexler <=>
    Co-authored-by: sivabalan <[email protected]>
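    
    For reference, a minimal sketch of how a writer might wire up the new cap
    (illustrative only: the enable keys shown are assumptions, and the full key
    name assumes METADATA_PREFIX resolves to "hoodie.metadata", so that the new
    property becomes "hoodie.metadata.index.column.stats.max.columns.to.index"):
    
        import java.util.HashMap;
        import java.util.Map;
        
        public class ColStatsMaxColumnsOptionsSketch {
          public static void main(String[] args) {
            Map<String, String> writeOptions = new HashMap<>();
            // enable the metadata table and (assumed key) its column stats index
            writeOptions.put("hoodie.metadata.enable", "true");
            writeOptions.put("hoodie.metadata.index.column.stats.enable", "true");
            // new in this patch: cap column stats generation at the first 3 data columns
            writeOptions.put("hoodie.metadata.index.column.stats.max.columns.to.index", "3");
            writeOptions.forEach((k, v) -> System.out.println(k + " = " + v));
          }
        }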
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  |   4 -
 .../org/apache/hudi/io/HoodieAppendHandle.java     |  22 +--
 .../metadata/HoodieBackedTableMetadataWriter.java  |  21 +--
 .../hudi/common/config/HoodieMetadataConfig.java   |  17 ++
 .../hudi/metadata/HoodieTableMetadataUtil.java     | 168 +++++++++++-------
 .../org/apache/hudi/avro/TestHoodieAvroUtils.java  |   2 +-
 .../hudi/metadata/TestHoodieTableMetadataUtil.java | 193 ++++++++++++++++++++-
 .../org/apache/hudi/ColumnStatsIndexSupport.scala  |  17 +-
 .../TestDataSkippingWithMORColstats.java           |   4 +
 .../column-stats-index-table-short-schema.json     |   4 +
 ...ted2-column-stats-index-table-short-schema.json |  13 ++
 ...ted2-column-stats-index-table-short-schema.json |  13 ++
 ...ated-column-stats-index-table-short-schema.json |   8 +
 .../hudi/functional/ColumnStatIndexTestBase.scala  |  27 +--
 .../hudi/functional/TestColumnStatsIndex.scala     |  26 ++-
 .../functional/TestColumnStatsIndexWithSQL.scala   |  44 ++++-
 .../apache/hudi/functional/TestMORDataSource.scala |   3 +-
 .../hudi/dml/TestHoodieTableValuedFunction.scala   |   6 +-
 .../utilities/HoodieMetadataTableValidator.java    |   3 +-
 19 files changed, 448 insertions(+), 147 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 0dfcc66833b..8af086c560d 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -2046,10 +2046,6 @@ public class HoodieWriteConfig extends HoodieConfig {
     return isMetadataTableEnabled() && 
getMetadataConfig().isColumnStatsIndexEnabled();
   }
 
-  public List<String> getColumnsEnabledForColumnStatsIndex() {
-    return getMetadataConfig().getColumnsEnabledForColumnStatsIndex();
-  }
-
   public boolean isPartitionStatsIndexEnabled() {
     return isMetadataTableEnabled() && 
getMetadataConfig().isPartitionStatsIndexEnabled();
   }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index 1891ec237a8..46dc9ad55bc 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -57,8 +57,10 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieAppendException;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieUpsertException;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.util.Lazy;
 
 import org.apache.avro.Schema;
 import org.slf4j.Logger;
@@ -432,19 +434,13 @@ public class HoodieAppendHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O
     }
 
     if (config.isMetadataColumnStatsIndexEnabled()) {
-      final List<Schema.Field> fieldsToIndex;
-      // If column stats index is enabled but columns not configured then we assume that
-      // all columns should be indexed
-      if (config.getColumnsEnabledForColumnStatsIndex().isEmpty()) {
-        fieldsToIndex = writeSchemaWithMetaFields.getFields();
-      } else {
-        Set<String> columnsToIndexSet = new HashSet<>(config.getColumnsEnabledForColumnStatsIndex());
-
-        fieldsToIndex = writeSchemaWithMetaFields.getFields().stream()
-            .filter(field -> columnsToIndexSet.contains(field.name()))
-            .collect(Collectors.toList());
-      }
-
+      Set<String> columnsToIndexSet = new HashSet<>(HoodieTableMetadataUtil
+          .getColumnsToIndex(hoodieTable.getMetaClient().getTableConfig(),
+              config.getMetadataConfig(), Lazy.eagerly(Option.of(writeSchemaWithMetaFields)),
+              Option.of(this.recordMerger.getRecordType())));
+      final List<Schema.Field> fieldsToIndex = writeSchemaWithMetaFields.getFields().stream()
+          .filter(field -> columnsToIndexSet.contains(field.name()))
+          .collect(Collectors.toList());
       try {
         Map<String, HoodieColumnRangeMetadata<Comparable>> 
columnRangeMetadataMap =
             collectColumnRangeMetadata(recordList, fieldsToIndex, 
stat.getPath(), writeSchemaWithMetaFields);
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 71730c676d8..34ebdea906c 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -512,8 +512,8 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
   private Pair<Integer, HoodieData<HoodieRecord>> 
initializeColumnStatsPartition(Map<String, Map<String, Long>> 
partitionToFilesMap) {
     // during initialization, we need stats for base and log files.
     HoodieData<HoodieRecord> records = 
HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
-        engineContext, Collections.emptyMap(), partitionToFilesMap, 
dataMetaClient, dataWriteConfig.isMetadataColumnStatsIndexEnabled(),
-        dataWriteConfig.getColumnStatsIndexParallelism(), 
dataWriteConfig.getColumnsEnabledForColumnStatsIndex(),
+        engineContext, Collections.emptyMap(), partitionToFilesMap, 
dataMetaClient, dataWriteConfig.getMetadataConfig(),
+        dataWriteConfig.getColumnStatsIndexParallelism(),
         dataWriteConfig.getMetadataConfig().getMaxReaderBufferSize());
 
     final int fileGroupCount = 
dataWriteConfig.getMetadataConfig().getColumnStatsIndexFileGroupCount();
@@ -1050,10 +1050,9 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
       Map<String, HoodieData<HoodieRecord>> partitionToRecordMap =
           HoodieTableMetadataUtil.convertMetadataToRecords(
               engineContext, dataWriteConfig, commitMetadata, instantTime, 
dataMetaClient,
+              dataWriteConfig.getMetadataConfig(),
               enabledPartitionTypes, dataWriteConfig.getBloomFilterType(),
-              dataWriteConfig.getBloomIndexParallelism(), 
dataWriteConfig.isMetadataColumnStatsIndexEnabled(),
-              dataWriteConfig.getColumnStatsIndexParallelism(), 
dataWriteConfig.getColumnsEnabledForColumnStatsIndex(), 
dataWriteConfig.getWritesFileIdEncoding(),
-              dataWriteConfig.getMetadataConfig());
+              dataWriteConfig.getBloomIndexParallelism(), 
dataWriteConfig.getWritesFileIdEncoding());
 
       // Updates for record index are created by parsing the WriteStatus which 
is a hudi-client object. Hence, we cannot yet move this code
       // to the HoodieTableMetadataUtil class in hudi-common.
@@ -1073,11 +1072,8 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
     processAndCommit(instantTime, () -> {
       Map<String, HoodieData<HoodieRecord>> partitionToRecordMap =
           HoodieTableMetadataUtil.convertMetadataToRecords(
-              engineContext, dataWriteConfig, commitMetadata, instantTime, 
dataMetaClient,
-              enabledPartitionTypes, dataWriteConfig.getBloomFilterType(),
-              dataWriteConfig.getBloomIndexParallelism(), 
dataWriteConfig.isMetadataColumnStatsIndexEnabled(),
-              dataWriteConfig.getColumnStatsIndexParallelism(), 
dataWriteConfig.getColumnsEnabledForColumnStatsIndex(),
-              dataWriteConfig.getWritesFileIdEncoding(), 
dataWriteConfig.getMetadataConfig());
+              engineContext, dataWriteConfig, commitMetadata, instantTime, 
dataMetaClient, dataWriteConfig.getMetadataConfig(),
+              enabledPartitionTypes, dataWriteConfig.getBloomFilterType(), 
dataWriteConfig.getBloomIndexParallelism(), 
dataWriteConfig.getWritesFileIdEncoding());
       HoodieData<HoodieRecord> additionalUpdates = 
getRecordIndexAdditionalUpserts(records, commitMetadata);
       partitionToRecordMap.put(RECORD_INDEX.getPartitionPath(), 
records.union(additionalUpdates));
       updateExpressionIndexIfPresent(commitMetadata, instantTime, 
partitionToRecordMap);
@@ -1213,9 +1209,8 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
   @Override
   public void update(HoodieCleanMetadata cleanMetadata, String instantTime) {
     processAndCommit(instantTime, () -> 
HoodieTableMetadataUtil.convertMetadataToRecords(engineContext,
-        cleanMetadata, instantTime, dataMetaClient, enabledPartitionTypes,
-        dataWriteConfig.getBloomIndexParallelism(), 
dataWriteConfig.isMetadataColumnStatsIndexEnabled(),
-        dataWriteConfig.getColumnStatsIndexParallelism(), 
dataWriteConfig.getColumnsEnabledForColumnStatsIndex()));
+        cleanMetadata, instantTime, dataMetaClient, 
dataWriteConfig.getMetadataConfig(), enabledPartitionTypes,
+        dataWriteConfig.getBloomIndexParallelism()));
     closeInternal();
   }
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
index ebe96d4d597..603b0837994 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
@@ -173,6 +173,14 @@ public final class HoodieMetadataConfig extends 
HoodieConfig {
       .sinceVersion("0.11.0")
       .withDocumentation("Comma-separated list of columns for which column 
stats index will be built. If not set, all columns will be indexed");
 
+  public static final ConfigProperty<Integer> COLUMN_STATS_INDEX_MAX_COLUMNS = ConfigProperty
+      .key(METADATA_PREFIX + ".index.column.stats.max.columns.to.index")
+      .defaultValue(32)
+      .markAdvanced()
+      .sinceVersion("1.0.0")
+      .withDocumentation("Maximum number of columns to generate column stats for. If the config `" + COLUMN_STATS_INDEX_FOR_COLUMNS.key() + "` is set then this config will be ignored. "
+          + "If unset, column stats will be generated for the first n columns in the table schema");
+
   public static final String COLUMN_STATS_INDEX_PROCESSING_MODE_IN_MEMORY = 
"in-memory";
   public static final String COLUMN_STATS_INDEX_PROCESSING_MODE_ENGINE = 
"engine";
 
@@ -413,6 +421,10 @@ public final class HoodieMetadataConfig extends 
HoodieConfig {
     return StringUtils.split(getString(COLUMN_STATS_INDEX_FOR_COLUMNS), 
CONFIG_VALUES_DELIMITER);
   }
 
+  public Integer maxColumnsToIndexForColStats() {
+    return getIntOrDefault(COLUMN_STATS_INDEX_MAX_COLUMNS);
+  }
+
   public String getColumnStatsIndexProcessingModeOverride() {
     return getString(COLUMN_STATS_INDEX_PROCESSING_MODE_OVERRIDE);
   }
@@ -595,6 +607,11 @@ public final class HoodieMetadataConfig extends 
HoodieConfig {
       return this;
     }
 
+    public Builder withMaxColumnsToIndexForColStats(int maxCols) {
+      metadataConfig.setValue(COLUMN_STATS_INDEX_MAX_COLUMNS, 
String.valueOf(maxCols));
+      return this;
+    }
+
     public Builder withBloomFilterIndexForColumns(String columns) {
       metadataConfig.setValue(BLOOM_FILTER_INDEX_FOR_COLUMNS, columns);
       return this;
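
A minimal sketch of how the two settings above interact via the builder (class
and builder methods are taken from this patch; the interplay follows the
documentation of COLUMN_STATS_INDEX_MAX_COLUMNS):

    import org.apache.hudi.common.config.HoodieMetadataConfig;

    public class MetadataConfigBuilderSketch {
      public static void main(String[] args) {
        // cap applies: only the first 16 data columns get column stats
        HoodieMetadataConfig capped = HoodieMetadataConfig.newBuilder()
            .enable(true)
            .withMetadataIndexColumnStats(true)
            .withMaxColumnsToIndexForColStats(16)
            .build();

        // explicit column list wins; the max-columns cap is ignored
        HoodieMetadataConfig explicit = HoodieMetadataConfig.newBuilder()
            .enable(true)
            .withMetadataIndexColumnStats(true)
            .withMaxColumnsToIndexForColStats(16)
            .withColumnStatsIndexForColumns("c1,c2,c3")
            .build();

        System.out.println(capped.maxColumnsToIndexForColStats());
        System.out.println(explicit.getColumnsEnabledForColumnStatsIndex());
      }
    }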
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 4899a68ee7f..12eaa6c5732 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -81,6 +81,7 @@ import org.apache.hudi.common.table.timeline.TimelineFactory;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.Either;
 import org.apache.hudi.common.util.FileFormatUtils;
 import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hudi.common.util.HoodieRecordUtils;
@@ -153,6 +154,10 @@ import static 
org.apache.hudi.common.config.HoodieCommonConfig.MAX_MEMORY_FOR_CO
 import static 
org.apache.hudi.common.config.HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE;
 import static 
org.apache.hudi.common.config.HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN;
 import static org.apache.hudi.common.fs.FSUtils.getFileNameFromPath;
+import static 
org.apache.hudi.common.model.HoodieRecord.COMMIT_TIME_METADATA_FIELD;
+import static 
org.apache.hudi.common.model.HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION;
+import static 
org.apache.hudi.common.model.HoodieRecord.PARTITION_PATH_METADATA_FIELD;
+import static 
org.apache.hudi.common.model.HoodieRecord.RECORD_KEY_METADATA_FIELD;
 import static 
org.apache.hudi.common.table.timeline.InstantComparison.GREATER_THAN;
 import static 
org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN_OR_EQUALS;
 import static 
org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps;
@@ -245,7 +250,7 @@ public class HoodieTableMetadataUtil {
         }
 
         colStats.valueCount++;
-        if (fieldValue != null && canCompare(fieldSchema, 
record.getRecordType())) {
+        if (fieldValue != null && isColumnTypeSupported(fieldSchema, 
Option.of(record.getRecordType()))) {
           // Set the min value of the field
           if (colStats.minValue == null
               || ConvertingGenericData.INSTANCE.compare(fieldValue, 
colStats.minValue, fieldSchema) < 0) {
@@ -351,12 +356,10 @@ public class HoodieTableMetadataUtil {
    * @param commitMetadata                   - Commit action metadata
    * @param instantTime                      - Action instant time
    * @param dataMetaClient                   - HoodieTableMetaClient for data
+   * @param metadataConfig                   - HoodieMetadataConfig
    * @param enabledPartitionTypes            - List of enabled MDT partitions
    * @param bloomFilterType                  - Type of generated bloom filter 
records
    * @param bloomIndexParallelism            - Parallelism for bloom filter 
record generation
-   * @param isColumnStatsIndexEnabled        - Is column stats index enabled
-   * @param columnStatsIndexParallelism      - Parallelism for column stats 
index records generation
-   * @param targetColumnsForColumnStatsIndex - List of columns for column 
stats index
    * @return Map of partition to metadata records for the commit action
    */
   public static Map<String, HoodieData<HoodieRecord>> 
convertMetadataToRecords(HoodieEngineContext context,
@@ -364,14 +367,10 @@ public class HoodieTableMetadataUtil {
                                                                                
HoodieCommitMetadata commitMetadata,
                                                                                
String instantTime,
                                                                                
HoodieTableMetaClient dataMetaClient,
+                                                                               
HoodieMetadataConfig metadataConfig,
                                                                                
List<MetadataPartitionType> enabledPartitionTypes,
                                                                                
String bloomFilterType,
-                                                                               
int bloomIndexParallelism,
-                                                                               
boolean isColumnStatsIndexEnabled,
-                                                                               
int columnStatsIndexParallelism,
-                                                                               
List<String> targetColumnsForColumnStatsIndex,
-                                                                               
Integer writesFileIdEncoding,
-                                                                               
HoodieMetadataConfig metadataConfig) {
+                                                                               
int bloomIndexParallelism, Integer writesFileIdEncoding) {
     final Map<String, HoodieData<HoodieRecord>> partitionToRecordsMap = new 
HashMap<>();
     final HoodieData<HoodieRecord> filesPartitionRecordsRDD = 
context.parallelize(
         convertMetadataToFilesPartitionRecords(commitMetadata, instantTime), 
1);
@@ -385,7 +384,7 @@ public class HoodieTableMetadataUtil {
 
     if (enabledPartitionTypes.contains(MetadataPartitionType.COLUMN_STATS)) {
       final HoodieData<HoodieRecord> metadataColumnStatsRDD = 
convertMetadataToColumnStatsRecords(commitMetadata, context,
-          dataMetaClient, isColumnStatsIndexEnabled, 
columnStatsIndexParallelism, targetColumnsForColumnStatsIndex);
+          dataMetaClient, metadataConfig);
       
partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS.getPartitionPath(),
 metadataColumnStatsRDD);
     }
     if (enabledPartitionTypes.contains(MetadataPartitionType.PARTITION_STATS)) 
{
@@ -562,16 +561,13 @@ public class HoodieTableMetadataUtil {
                                                                                
HoodieCleanMetadata cleanMetadata,
                                                                                
String instantTime,
                                                                                
HoodieTableMetaClient dataMetaClient,
+                                                                               
HoodieMetadataConfig metadataConfig,
                                                                                
List<MetadataPartitionType> enabledPartitionTypes,
-                                                                               
int bloomIndexParallelism,
-                                                                               
boolean isColumnStatsIndexEnabled,
-                                                                               
int columnStatsIndexParallelism,
-                                                                               
List<String> targetColumnsForColumnStatsIndex) {
+                                                                               
int bloomIndexParallelism) {
     final Map<String, HoodieData<HoodieRecord>> partitionToRecordsMap = new 
HashMap<>();
     final HoodieData<HoodieRecord> filesPartitionRecordsRDD = 
engineContext.parallelize(
         convertMetadataToFilesPartitionRecords(cleanMetadata, instantTime), 1);
     partitionToRecordsMap.put(MetadataPartitionType.FILES.getPartitionPath(), 
filesPartitionRecordsRDD);
-
     if (enabledPartitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS)) {
       final HoodieData<HoodieRecord> metadataBloomFilterRecordsRDD =
           convertMetadataToBloomFilterRecords(cleanMetadata, engineContext, 
instantTime, bloomIndexParallelism);
@@ -581,11 +577,11 @@ public class HoodieTableMetadataUtil {
     if (enabledPartitionTypes.contains(MetadataPartitionType.COLUMN_STATS)) {
       final HoodieData<HoodieRecord> metadataColumnStatsRDD =
           convertMetadataToColumnStatsRecords(cleanMetadata, engineContext,
-              dataMetaClient, isColumnStatsIndexEnabled, 
columnStatsIndexParallelism, targetColumnsForColumnStatsIndex);
+              dataMetaClient, metadataConfig);
       
partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS.getPartitionPath(),
 metadataColumnStatsRDD);
     }
     if 
(enabledPartitionTypes.contains(MetadataPartitionType.EXPRESSION_INDEX)) {
-      convertMetadataToExpressionIndexRecords(engineContext, cleanMetadata, 
instantTime, dataMetaClient, bloomIndexParallelism, 
columnStatsIndexParallelism, partitionToRecordsMap);
+      convertMetadataToExpressionIndexRecords(engineContext, cleanMetadata, 
instantTime, dataMetaClient, metadataConfig, bloomIndexParallelism, 
partitionToRecordsMap);
     }
 
     return partitionToRecordsMap;
@@ -593,7 +589,7 @@ public class HoodieTableMetadataUtil {
 
   private static void 
convertMetadataToExpressionIndexRecords(HoodieEngineContext engineContext, 
HoodieCleanMetadata cleanMetadata,
                                                               String 
instantTime, HoodieTableMetaClient dataMetaClient,
-                                                              int 
bloomIndexParallelism, int columnStatsIndexParallelism,
+                                                              
HoodieMetadataConfig metadataConfig, int bloomIndexParallelism,
                                                               Map<String, 
HoodieData<HoodieRecord>> partitionToRecordsMap) {
     Option<HoodieIndexMetadata> indexMetadata = 
dataMetaClient.getIndexMetadata();
     if (indexMetadata.isPresent()) {
@@ -613,8 +609,12 @@ public class HoodieTableMetadataUtil {
           if 
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_BLOOM_FILTERS)) 
{
             partitionToRecordsMap.put(indexName, 
convertMetadataToBloomFilterRecords(cleanMetadata, engineContext, instantTime, 
bloomIndexParallelism));
           } else if 
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_COLUMN_STATS)) {
+            HoodieMetadataConfig modifiedMetadataConfig = 
HoodieMetadataConfig.newBuilder()
+                .withProperties(metadataConfig.getProps())
+                .withColumnStatsIndexForColumns(String.join(",", 
indexDefinition.getSourceFields()))
+                .build();
             partitionToRecordsMap.put(indexName,
-                convertMetadataToColumnStatsRecords(cleanMetadata, 
engineContext, dataMetaClient, true, columnStatsIndexParallelism, 
indexDefinition.getSourceFields()));
+                convertMetadataToColumnStatsRecords(cleanMetadata, 
engineContext, dataMetaClient, modifiedMetadataConfig));
           } else {
             throw new HoodieMetadataException("Unsupported expression index 
type");
           }
@@ -731,17 +731,13 @@ public class HoodieTableMetadataUtil {
    * @param cleanMetadata                    - Clean action metadata
    * @param engineContext                    - Engine context
    * @param dataMetaClient                   - HoodieTableMetaClient for data
-   * @param isColumnStatsIndexEnabled        - Is column stats index enabled
-   * @param columnStatsIndexParallelism      - Parallelism for column stats 
index records generation
-   * @param targetColumnsForColumnStatsIndex - List of columns for column 
stats index
+   * @param metadataConfig                   - HoodieMetadataConfig
    * @return List of column stats index records for the clean metadata
    */
   public static HoodieData<HoodieRecord> 
convertMetadataToColumnStatsRecords(HoodieCleanMetadata cleanMetadata,
                                                                              
HoodieEngineContext engineContext,
                                                                              
HoodieTableMetaClient dataMetaClient,
-                                                                             
boolean isColumnStatsIndexEnabled,
-                                                                             
int columnStatsIndexParallelism,
-                                                                             
List<String> targetColumnsForColumnStatsIndex) {
+                                                                             
HoodieMetadataConfig metadataConfig) {
     List<Pair<String, String>> deleteFileList = new ArrayList<>();
     cleanMetadata.getPartitionMetadata().forEach((partition, 
partitionMetadata) -> {
       // Files deleted from a partition
@@ -749,9 +745,8 @@ public class HoodieTableMetadataUtil {
       deletedFiles.forEach(entry -> deleteFileList.add(Pair.of(partition, 
entry)));
     });
 
-    List<String> columnsToIndex =
-        getColumnsToIndex(isColumnStatsIndexEnabled, 
targetColumnsForColumnStatsIndex,
-            Lazy.lazily(() -> tryResolveSchemaForTable(dataMetaClient)));
+    List<String> columnsToIndex = 
getColumnsToIndex(dataMetaClient.getTableConfig(), metadataConfig,
+        Lazy.lazily(() -> tryResolveSchemaForTable(dataMetaClient)));
 
     if (columnsToIndex.isEmpty()) {
       // In case there are no columns to index, bail
@@ -759,7 +754,7 @@ public class HoodieTableMetadataUtil {
       return engineContext.emptyHoodieData();
     }
 
-    int parallelism = Math.max(Math.min(deleteFileList.size(), 
columnStatsIndexParallelism), 1);
+    int parallelism = Math.max(Math.min(deleteFileList.size(), 
metadataConfig.getColumnStatsIndexParallelism()), 1);
     return engineContext.parallelize(deleteFileList, parallelism)
         .flatMap(deleteFileInfoPair -> {
           String partitionPath = deleteFileInfoPair.getLeft();
@@ -1117,14 +1112,12 @@ public class HoodieTableMetadataUtil {
                                                                           
Map<String, List<String>> partitionToDeletedFiles,
                                                                           
Map<String, Map<String, Long>> partitionToAppendedFiles,
                                                                           
HoodieTableMetaClient dataMetaClient,
-                                                                          
boolean isColumnStatsIndexEnabled,
+                                                                          
HoodieMetadataConfig metadataConfig,
                                                                           int 
columnStatsIndexParallelism,
-                                                                          
List<String> targetColumnsForColumnStatsIndex,
                                                                           int 
maxReaderBufferSize) {
     // Find the columns to index
-    final List<String> columnsToIndex =
-        getColumnsToIndex(isColumnStatsIndexEnabled, 
targetColumnsForColumnStatsIndex,
-            Lazy.lazily(() -> tryResolveSchemaForTable(dataMetaClient)));
+    final List<String> columnsToIndex = 
getColumnsToIndex(dataMetaClient.getTableConfig(),
+        metadataConfig, Lazy.lazily(() -> 
tryResolveSchemaForTable(dataMetaClient)));
     if (columnsToIndex.isEmpty()) {
       // In case there are no columns to index, bail
       LOG.warn("No columns to index for column stats index.");
@@ -1312,9 +1305,7 @@ public class HoodieTableMetadataUtil {
   public static HoodieData<HoodieRecord> 
convertMetadataToColumnStatsRecords(HoodieCommitMetadata commitMetadata,
                                                                              
HoodieEngineContext engineContext,
                                                                              
HoodieTableMetaClient dataMetaClient,
-                                                                             
boolean isColumnStatsIndexEnabled,
-                                                                             
int columnStatsIndexParallelism,
-                                                                             
List<String> targetColumnsForColumnStatsIndex) {
+                                                                             
HoodieMetadataConfig metadataConfig) {
     List<HoodieWriteStat> allWriteStats = 
commitMetadata.getPartitionToWriteStats().values().stream()
         .flatMap(Collection::stream).collect(Collectors.toList());
 
@@ -1336,7 +1327,7 @@ public class HoodieTableMetadataUtil {
       Option<Schema> tableSchema = writerSchema.map(schema ->
           tableConfig.populateMetaFields() ? addMetadataFields(schema) : 
schema);
 
-      List<String> columnsToIndex = 
getColumnsToIndex(isColumnStatsIndexEnabled, targetColumnsForColumnStatsIndex,
+      List<String> columnsToIndex = 
getColumnsToIndex(dataMetaClient.getTableConfig(), metadataConfig,
           Lazy.eagerly(tableSchema));
 
       if (columnsToIndex.isEmpty()) {
@@ -1344,7 +1335,7 @@ public class HoodieTableMetadataUtil {
         return engineContext.emptyHoodieData();
       }
 
-      int parallelism = Math.max(Math.min(allWriteStats.size(), 
columnStatsIndexParallelism), 1);
+      int parallelism = Math.max(Math.min(allWriteStats.size(), 
metadataConfig.getColumnStatsIndexParallelism()), 1);
       return engineContext.parallelize(allWriteStats, parallelism)
           .flatMap(writeStat ->
               translateWriteStatToColumnStats(writeStat, dataMetaClient, 
columnsToIndex).iterator());
@@ -1353,25 +1344,74 @@ public class HoodieTableMetadataUtil {
     }
   }
 
+  @VisibleForTesting
+  static final String[] META_COLS_TO_ALWAYS_INDEX = {COMMIT_TIME_METADATA_FIELD, RECORD_KEY_METADATA_FIELD, PARTITION_PATH_METADATA_FIELD};
+  @VisibleForTesting
+  public static final Set<String> META_COL_SET_TO_INDEX = new HashSet<>(Arrays.asList(META_COLS_TO_ALWAYS_INDEX));
+
+  public static List<String> getColumnsToIndex(HoodieTableConfig tableConfig,
+                                               HoodieMetadataConfig metadataConfig,
+                                               List<String> columnNames) {
+    return getColumnsToIndex(tableConfig, metadataConfig, Either.left(columnNames), Option.empty());
+  }
+
+  public static List<String> getColumnsToIndex(HoodieTableConfig tableConfig,
+                                               HoodieMetadataConfig metadataConfig,
+                                               Lazy<Option<Schema>> tableSchema,
+                                               Option<HoodieRecordType> recordType) {
+    return getColumnsToIndex(tableConfig, metadataConfig, Either.right(tableSchema), recordType);
+  }
+
+  public static List<String> getColumnsToIndex(HoodieTableConfig tableConfig,
+                                               HoodieMetadataConfig metadataConfig,
+                                               Lazy<Option<Schema>> tableSchema) {
+    return getColumnsToIndex(tableConfig, metadataConfig, Either.right(tableSchema), Option.empty());
+  }
+
+  private static List<String> getColumnsToIndex(HoodieTableConfig tableConfig,
+                                                HoodieMetadataConfig metadataConfig,
+                                                Either<List<String>, Lazy<Option<Schema>>> tableSchema,
+                                                Option<HoodieRecordType> recordType) {
+    Stream<String> columnsToIndexWithoutRequiredMetas = getColumnsToIndexWithoutRequiredMetaFields(metadataConfig, tableSchema, recordType);
+    if (!tableConfig.populateMetaFields()) {
+      return columnsToIndexWithoutRequiredMetas.collect(Collectors.toList());
+    }
+
+    return Stream.concat(Arrays.stream(META_COLS_TO_ALWAYS_INDEX), columnsToIndexWithoutRequiredMetas).collect(Collectors.toList());
+  }
+
   /**
-   * Get the list of columns for the table for column stats indexing
+   * Get the list of columns that should be indexed for col stats or partition stats.
+   * We always index META_COLS_TO_ALWAYS_INDEX. If metadataConfig.getColumnsEnabledForColumnStatsIndex()
+   * is empty, we use metadataConfig.maxColumnsToIndexForColStats() and index the first n columns
+   * in the table schema in addition to the required meta cols.
+   *
+   * @param metadataConfig       metadata config
+   * @param tableSchema          either a list of the columns in the table, or a lazy option of the table schema
+   * @param recordType           Option of record type, used to determine which types are valid to index
+   * @return list of columns that should be indexed
    */
-  private static List<String> getColumnsToIndex(boolean isColumnStatsIndexEnabled,
-                                                List<String> targetColumnsForColumnStatsIndex,
-                                                Lazy<Option<Schema>> lazyWriterSchemaOpt) {
-    checkState(isColumnStatsIndexEnabled);
-
-    if (!targetColumnsForColumnStatsIndex.isEmpty()) {
-      return targetColumnsForColumnStatsIndex;
+  private static Stream<String> getColumnsToIndexWithoutRequiredMetaFields(HoodieMetadataConfig metadataConfig,
+                                                                           Either<List<String>, Lazy<Option<Schema>>> tableSchema,
+                                                                           Option<HoodieRecordType> recordType) {
+    List<String> columnsToIndex = metadataConfig.getColumnsEnabledForColumnStatsIndex();
+    if (!columnsToIndex.isEmpty()) {
+      return columnsToIndex.stream().filter(fieldName -> !META_COL_SET_TO_INDEX.contains(fieldName));
+    }
+    if (tableSchema.isLeft()) {
+      return getFirstNFieldNames(tableSchema.asLeft().stream(), metadataConfig.maxColumnsToIndexForColStats());
+    } else {
+      return tableSchema.asRight().get().map(schema -> getFirstNFieldNames(schema, metadataConfig.maxColumnsToIndexForColStats(), recordType)).orElse(Stream.empty());
     }
+  }
 
-    Option<Schema> writerSchemaOpt = lazyWriterSchemaOpt.get();
-    return writerSchemaOpt
-        .map(writerSchema ->
-            writerSchema.getFields().stream()
-                .map(Schema.Field::name)
-                .collect(Collectors.toList()))
-        .orElse(Collections.emptyList());
+  private static Stream<String> getFirstNFieldNames(Schema tableSchema, int n, Option<HoodieRecordType> recordType) {
+    return getFirstNFieldNames(tableSchema.getFields().stream()
+        .filter(field -> isColumnTypeSupported(field.schema(), recordType)).map(Schema.Field::name), n);
+  }
+
+  private static Stream<String> getFirstNFieldNames(Stream<String> fieldNames, int n) {
+    return fieldNames.filter(fieldName -> !HOODIE_META_COLUMNS_WITH_OPERATION.contains(fieldName)).limit(n);
   }
 
   private static Stream<HoodieRecord> 
translateWriteStatToColumnStats(HoodieWriteStat writeStat,
@@ -1619,12 +1659,13 @@ public class HoodieTableMetadataUtil {
     }
   }
 
-  private static boolean canCompare(Schema schema, HoodieRecordType recordType) {
-    // if recordType is SPARK then we cannot compare RECORD and ARRAY types in addition to MAP type
-    if (recordType == HoodieRecordType.SPARK) {
-      return schema.getType() != Schema.Type.RECORD && schema.getType() != Schema.Type.ARRAY && schema.getType() != Schema.Type.MAP;
+  private static boolean isColumnTypeSupported(Schema schema, Option<HoodieRecordType> recordType) {
+    // if the record type is set and it is AVRO, only MAP is unsupported.
+    if (recordType.isPresent() && recordType.get() == HoodieRecordType.AVRO) {
+      return schema.getType() != Schema.Type.MAP;
     }
-    return schema.getType() != Schema.Type.MAP;
+    // if the record type is not set, or if it is SPARK, we cannot compare RECORD and ARRAY types in addition to MAP
+    return schema.getType() != Schema.Type.RECORD && schema.getType() != Schema.Type.ARRAY && schema.getType() != Schema.Type.MAP;
   }
 
   public static Set<String> getInflightMetadataPartitions(HoodieTableConfig 
tableConfig) {
@@ -2381,10 +2422,7 @@ public class HoodieTableMetadataUtil {
                                                                              
HoodieTableMetaClient dataTableMetaClient,
                                                                              
Option<Schema> writerSchemaOpt) {
     Lazy<Option<Schema>> lazyWriterSchemaOpt = writerSchemaOpt.isPresent() ? 
Lazy.eagerly(writerSchemaOpt) : Lazy.lazily(() -> 
tryResolveSchemaForTable(dataTableMetaClient));
-    final List<String> columnsToIndex = getColumnsToIndex(
-        metadataConfig.isPartitionStatsIndexEnabled(),
-        metadataConfig.getColumnsEnabledForColumnStatsIndex(),
-        lazyWriterSchemaOpt);
+    final List<String> columnsToIndex = 
getColumnsToIndex(dataTableMetaClient.getTableConfig(), metadataConfig, 
lazyWriterSchemaOpt);
     if (columnsToIndex.isEmpty()) {
       LOG.warn("No columns to index for partition stats index");
       return engineContext.emptyHoodieData();
@@ -2456,7 +2494,7 @@ public class HoodieTableMetadataUtil {
       HoodieTableConfig tableConfig = dataMetaClient.getTableConfig();
       Option<Schema> tableSchema = writerSchema.map(schema -> 
tableConfig.populateMetaFields() ? addMetadataFields(schema) : schema);
       Lazy<Option<Schema>> writerSchemaOpt = Lazy.eagerly(tableSchema);
-      List<String> columnsToIndex = 
getColumnsToIndex(metadataConfig.isPartitionStatsIndexEnabled(), 
metadataConfig.getColumnsEnabledForColumnStatsIndex(), writerSchemaOpt);
+      List<String> columnsToIndex = 
getColumnsToIndex(dataMetaClient.getTableConfig(), metadataConfig, 
writerSchemaOpt);
       if (columnsToIndex.isEmpty()) {
         return engineContext.emptyHoodieData();
       }
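
A sketch of the resulting selection rules, using the new getColumnsToIndex
overload added above (assumes an existing table's meta client with
populateMetaFields() = true; the column names c1..c4 are hypothetical):

    import java.util.Arrays;
    import java.util.List;

    import org.apache.hudi.common.config.HoodieMetadataConfig;
    import org.apache.hudi.common.table.HoodieTableConfig;
    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.metadata.HoodieTableMetadataUtil;

    public class ColumnsToIndexSketch {
      // meta fields are always prepended; the cap only limits the data columns that follow
      static List<String> columnsToIndex(HoodieTableMetaClient metaClient) {
        HoodieTableConfig tableConfig = metaClient.getTableConfig();
        HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
            .enable(true)
            .withMetadataIndexColumnStats(true)
            .withMaxColumnsToIndexForColStats(2) // keep only the first 2 data columns
            .build();
        // expected: [_hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, c1, c2]
        return HoodieTableMetadataUtil.getColumnsToIndex(
            tableConfig, metadataConfig, Arrays.asList("c1", "c2", "c3", "c4"));
      }
    }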
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java 
b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
index 776fd6bbf0b..662600f64e1 100644
--- a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
@@ -155,7 +155,7 @@ public class TestHoodieAvroUtils {
       + 
"{\"name\":\"ss\",\"type\":{\"name\":\"ss\",\"type\":\"record\",\"fields\":["
       + "{\"name\":\"fn\",\"type\":[\"null\" ,\"string\"],\"default\": 
null},{\"name\":\"ln\",\"type\":[\"null\" ,\"string\"],\"default\": null}]}}]}";
 
-  private static String SCHEMA_WITH_AVRO_TYPES = 
"{\"name\":\"TestRecordAvroTypes\",\"type\":\"record\",\"fields\":["
+  public static final String SCHEMA_WITH_AVRO_TYPES = 
"{\"name\":\"TestRecordAvroTypes\",\"type\":\"record\",\"fields\":["
       // Primitive types
       + "{\"name\":\"booleanField\",\"type\":\"boolean\"},"
       + "{\"name\":\"intField\",\"type\":\"int\"},"
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
index 30c427ae28e..daf89e47c72 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
@@ -19,6 +19,7 @@
 
 package org.apache.hudi.metadata;
 
+import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.EngineType;
@@ -41,6 +42,7 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.io.storage.HoodieFileWriter;
 import org.apache.hudi.io.storage.HoodieFileWriterFactory;
 import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.util.Lazy;
 
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaBuilder;
@@ -60,6 +62,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.avro.TestHoodieAvroUtils.SCHEMA_WITH_AVRO_TYPES;
 import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.getFileIDForFileGroup;
 import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.validateDataTypeForSecondaryIndex;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -270,13 +273,17 @@ public class TestHoodieTableMetadataUtil extends 
HoodieCommonTestHarness {
         metaClient,
         Option.of(HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS));
     // Validate the result.
-    validatePartitionStats(result, instant1, instant2);
+    validatePartitionStats(result, instant1, instant2, 6);
   }
 
   private static void validatePartitionStats(HoodieData<HoodieRecord> result, 
String instant1, String instant2) {
+    validatePartitionStats(result, instant1, instant2, 15);
+  }
+
+  private static void validatePartitionStats(HoodieData<HoodieRecord> result, 
String instant1, String instant2, int expectedTotalRecords) {
     List<HoodieRecord> records = result.collectAsList();
-    // 3 partitions * 2 columns = 6 partition stats records
-    assertEquals(6, records.size());
+    // 3 partitions * (2 + 3) columns = 15 partition stats records; 3 meta fields are indexed by default.
+    assertEquals(expectedTotalRecords, records.size());
     assertEquals(MetadataPartitionType.PARTITION_STATS.getPartitionPath(), 
records.get(0).getPartitionPath());
     ((HoodieMetadataPayload) 
result.collectAsList().get(0).getData()).getColumnStatMetadata().get().getColumnName();
     records.forEach(r -> {
@@ -355,4 +362,184 @@ public class TestHoodieTableMetadataUtil extends 
HoodieCommonTestHarness {
     // Test for complex fields
     assertFalse(validateDataTypeForSecondaryIndex(Arrays.asList("arrayField", 
"mapField", "structField"), schema));
   }
+
+  @Test
+  public void testGetColumnsToIndex() {
+    HoodieTableConfig tableConfig = metaClient.getTableConfig();
+
+    //test default
+    HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
+        .enable(true).withMetadataIndexColumnStats(true)
+        .build();
+    List<String> colNames = new ArrayList<>();
+    addNColumns(colNames, 
HoodieMetadataConfig.COLUMN_STATS_INDEX_MAX_COLUMNS.defaultValue() + 10);
+    List<String> expected = new 
ArrayList<>(Arrays.asList(HoodieTableMetadataUtil.META_COLS_TO_ALWAYS_INDEX));
+    addNColumns(expected, 
HoodieMetadataConfig.COLUMN_STATS_INDEX_MAX_COLUMNS.defaultValue());
+    assertEquals(expected, 
HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig, 
colNames));
+
+    //test with table schema < default
+    int tableSchemaSize = 
HoodieMetadataConfig.COLUMN_STATS_INDEX_MAX_COLUMNS.defaultValue() - 10;
+    assertTrue(tableSchemaSize > 0);
+    colNames = new ArrayList<>();
+    addNColumns(colNames, tableSchemaSize);
+    expected = new 
ArrayList<>(Arrays.asList(HoodieTableMetadataUtil.META_COLS_TO_ALWAYS_INDEX));
+    addNColumns(expected, tableSchemaSize);
+    assertEquals(expected, 
HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig, 
colNames));
+
+    //test with max val < tableSchema
+    metadataConfig = HoodieMetadataConfig.newBuilder()
+        .enable(true).withMetadataIndexColumnStats(true)
+        .withMaxColumnsToIndexForColStats(3)
+        .build();
+    colNames = new ArrayList<>();
+    addNColumns(colNames, 
HoodieMetadataConfig.COLUMN_STATS_INDEX_MAX_COLUMNS.defaultValue() + 10);
+    expected = new 
ArrayList<>(Arrays.asList(HoodieTableMetadataUtil.META_COLS_TO_ALWAYS_INDEX));
+    addNColumns(expected, 3);
+    assertEquals(expected, 
HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig, 
colNames));
+
+    //test with max val > tableSchema
+    metadataConfig = HoodieMetadataConfig.newBuilder()
+        .enable(true).withMetadataIndexColumnStats(true)
+        
.withMaxColumnsToIndexForColStats(HoodieMetadataConfig.COLUMN_STATS_INDEX_MAX_COLUMNS.defaultValue()
 + 10)
+        .build();
+    colNames = new ArrayList<>();
+    addNColumns(colNames, tableSchemaSize);
+    expected = new 
ArrayList<>(Arrays.asList(HoodieTableMetadataUtil.META_COLS_TO_ALWAYS_INDEX));
+    addNColumns(expected, tableSchemaSize);
+    assertEquals(expected, 
HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig, 
colNames));
+
+    //test with list of cols
+    metadataConfig = HoodieMetadataConfig.newBuilder()
+        .enable(true).withMetadataIndexColumnStats(true)
+        .withColumnStatsIndexForColumns("col_1,col_7,col_11")
+        .build();
+    colNames = new ArrayList<>();
+    addNColumns(colNames, 15);
+    expected = new 
ArrayList<>(Arrays.asList(HoodieTableMetadataUtil.META_COLS_TO_ALWAYS_INDEX));
+    expected.add("col_1");
+    expected.add("col_7");
+    expected.add("col_11");
+    assertEquals(expected, 
HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig, 
colNames));
+
+    //test with list of cols longer than config
+    metadataConfig = HoodieMetadataConfig.newBuilder()
+        .enable(true).withMetadataIndexColumnStats(true)
+        .withMaxColumnsToIndexForColStats(1)
+        .withColumnStatsIndexForColumns("col_1,col_7,col_11")
+        .build();
+    colNames = new ArrayList<>();
+    addNColumns(colNames, 15);
+    expected = new 
ArrayList<>(Arrays.asList(HoodieTableMetadataUtil.META_COLS_TO_ALWAYS_INDEX));
+    expected.add("col_1");
+    expected.add("col_7");
+    expected.add("col_11");
+    assertEquals(expected, 
HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig, 
colNames));
+
+    //test with list of cols including meta cols
+    metadataConfig = HoodieMetadataConfig.newBuilder()
+        .enable(true).withMetadataIndexColumnStats(true)
+        
.withColumnStatsIndexForColumns("col_1,col_7,_hoodie_commit_time,col_11,_hoodie_commit_seqno")
+        .build();
+    colNames = new ArrayList<>();
+    addNColumns(colNames, 15);
+    expected = new 
ArrayList<>(Arrays.asList(HoodieTableMetadataUtil.META_COLS_TO_ALWAYS_INDEX));
+    expected.add("col_1");
+    expected.add("col_7");
+    expected.add("col_11");
+    expected.add("_hoodie_commit_seqno");
+    assertEquals(expected, 
HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig, 
colNames));
+
+    //test with avro schema
+    Schema schema = new Schema.Parser().parse(SCHEMA_WITH_AVRO_TYPES);
+    metadataConfig = HoodieMetadataConfig.newBuilder()
+        .enable(true).withMetadataIndexColumnStats(true)
+        
.withColumnStatsIndexForColumns("booleanField,decimalField,localTimestampMillisField")
+        .build();
+    expected = new 
ArrayList<>(Arrays.asList(HoodieTableMetadataUtil.META_COLS_TO_ALWAYS_INDEX));
+    expected.add("booleanField");
+    expected.add("decimalField");
+    expected.add("localTimestampMillisField");
+    assertEquals(expected, 
HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig, 
Lazy.eagerly(Option.of(schema))));
+
+    //test with avro schema with max cols set
+    metadataConfig = HoodieMetadataConfig.newBuilder()
+        .enable(true).withMetadataIndexColumnStats(true)
+        .withMaxColumnsToIndexForColStats(2)
+        .build();
+    expected = new 
ArrayList<>(Arrays.asList(HoodieTableMetadataUtil.META_COLS_TO_ALWAYS_INDEX));
+    expected.add("booleanField");
+    expected.add("intField");
+    assertEquals(expected, 
HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig, 
Lazy.eagerly(Option.of(schema))));
+    //test with avro schema with meta cols
+    assertEquals(expected, 
HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig, 
Lazy.eagerly(Option.of(HoodieAvroUtils.addMetadataFields(schema)))));
+
+    //test with avro schema with type filter
+    metadataConfig = HoodieMetadataConfig.newBuilder()
+        .enable(true).withMetadataIndexColumnStats(true)
+        .withMaxColumnsToIndexForColStats(100)
+        .build();
+    expected = new 
ArrayList<>(Arrays.asList(HoodieTableMetadataUtil.META_COLS_TO_ALWAYS_INDEX));
+    expected.add("timestamp");
+    expected.add("_row_key");
+    expected.add("partition_path");
+    expected.add("trip_type");
+    expected.add("rider");
+    expected.add("driver");
+    expected.add("begin_lat");
+    expected.add("begin_lon");
+    expected.add("end_lat");
+    expected.add("end_lon");
+    expected.add("distance_in_meters");
+    expected.add("seconds_since_epoch");
+    expected.add("weight");
+    expected.add("nation");
+    expected.add("current_date");
+    expected.add("current_ts");
+    expected.add("height");
+    expected.add("_hoodie_is_deleted");
+    assertEquals(expected, 
HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig, 
Lazy.eagerly(Option.of(HoodieTestDataGenerator.AVRO_SCHEMA))));
+    //test with avro schema with meta cols
+    assertEquals(expected, 
HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig, 
Lazy.eagerly(Option.of(HoodieAvroUtils.addMetadataFields(HoodieTestDataGenerator.AVRO_SCHEMA)))));
+
+    //test with meta cols disabled
+    tableConfig.setValue(HoodieTableConfig.POPULATE_META_FIELDS.key(), 
"false");
+    metadataConfig = HoodieMetadataConfig.newBuilder()
+        .enable(true).withMetadataIndexColumnStats(true)
+        .build();
+    colNames = new ArrayList<>();
+    addNColumns(colNames, tableSchemaSize);
+    expected = new ArrayList<>();
+    addNColumns(expected, tableSchemaSize);
+    assertEquals(expected, 
HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig, 
colNames));
+
+    //test with meta cols disabled with col list
+    metadataConfig = HoodieMetadataConfig.newBuilder()
+        .enable(true).withMetadataIndexColumnStats(true)
+        .withColumnStatsIndexForColumns("col_1,col_7,col_11")
+        .build();
+    colNames = new ArrayList<>();
+    addNColumns(colNames, 15);
+    expected = new ArrayList<>();
+    expected.add("col_1");
+    expected.add("col_7");
+    expected.add("col_11");
+    assertEquals(expected, 
HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig, 
colNames));
+
+    //test with meta cols disabled with avro schema
+    metadataConfig = HoodieMetadataConfig.newBuilder()
+        .enable(true).withMetadataIndexColumnStats(true)
+        
.withColumnStatsIndexForColumns("booleanField,decimalField,localTimestampMillisField")
+        .build();
+    expected = new ArrayList<>();
+    expected.add("booleanField");
+    expected.add("decimalField");
+    expected.add("localTimestampMillisField");
+    assertEquals(expected, 
HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig, 
Lazy.eagerly(Option.of(schema))));
+  }
+
+  private void addNColumns(List<String> list, int n) {
+    for (int i = 0; i < n; i++) {
+      list.add("col_" + i);
+    }
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
index 7c822ef2593..3eaa59d6b80 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
@@ -33,6 +33,8 @@ import org.apache.hudi.common.util.hash.{ColumnIndexID, 
PartitionIndexID}
 import org.apache.hudi.data.HoodieJavaRDD
 import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadata, 
HoodieTableMetadataUtil, MetadataPartitionType}
 import org.apache.hudi.util.JFunction
+import org.apache.hudi.util.JavaScalaConverters.convertScalaListToJavaList
+
 import org.apache.avro.Conversions.DecimalConversion
 import org.apache.avro.generic.GenericData
 import 
org.apache.spark.sql.HoodieUnsafeUtils.{createDataFrameFromInternalRows, 
createDataFrameFromRDD, createDataFrameFromRows}
@@ -45,6 +47,7 @@ import org.apache.spark.sql.{DataFrame, Row, SparkSession}
 import org.apache.spark.storage.StorageLevel
 
 import java.nio.ByteBuffer
+
 import scala.collection.JavaConverters._
 import scala.collection.immutable.TreeSet
 import scala.collection.mutable.ListBuffer
@@ -63,17 +66,9 @@ class ColumnStatsIndexSupport(spark: SparkSession,
   //       on to the executor
   protected val inMemoryProjectionThreshold = 
metadataConfig.getColumnStatsIndexInMemoryProjectionThreshold
 
-  private lazy val indexedColumns: Set[String] = {
-    val customIndexedColumns = 
metadataConfig.getColumnsEnabledForColumnStatsIndex
-    // Column Stats Index could index either
-    //    - The whole table
-    //    - Only configured columns
-    if (customIndexedColumns.isEmpty) {
-      tableSchema.fieldNames.toSet
-    } else {
-      customIndexedColumns.asScala.toSet
-    }
-  }
+  private lazy val indexedColumns: Set[String] = HoodieTableMetadataUtil
+    .getColumnsToIndex(metaClient.getTableConfig, metadataConfig, convertScalaListToJavaList(tableSchema.fieldNames)).asScala.toSet
+
 
   override def getIndexName: String = ColumnStatsIndexSupport.INDEX_NAME
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestDataSkippingWithMORColstats.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestDataSkippingWithMORColstats.java
index e22a91c525f..e5fe0a3ad00 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestDataSkippingWithMORColstats.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestDataSkippingWithMORColstats.java
@@ -114,6 +114,10 @@ public class TestDataSkippingWithMORColstats extends 
HoodieSparkClientTestBase {
    */
   @Test
   public void testBaseFileOnly() {
+    // note that this config is here just to test that it does nothing since
+    // we have specified which columns to index
+    options.put(HoodieMetadataConfig.COLUMN_STATS_INDEX_MAX_COLUMNS.key(), 
"1");
+
     
options.put(HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key(), 
"false");
     Dataset<Row> inserts = makeInsertDf("000", 100);
     Dataset<Row> batch1 = inserts.where(matchCond);
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/column-stats-index-table-short-schema.json
 
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/column-stats-index-table-short-schema.json
new file mode 100644
index 00000000000..446c9df217e
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/column-stats-index-table-short-schema.json
@@ -0,0 +1,4 @@
+{"c1_maxValue":769,"c1_minValue":309,"c1_nullCount":0,"c2_maxValue":" 
769sdc","c2_minValue":" 
309sdc","c2_nullCount":0,"c3_maxValue":919.769,"c3_minValue":76.430,"c3_nullCount":0,"valueCount":9}
+{"c1_maxValue":932,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":" 
932sdc","c2_minValue":" 
0sdc","c2_nullCount":0,"c3_maxValue":994.355,"c3_minValue":19.000,"c3_nullCount":0,
 "valueCount":8}
+{"c1_maxValue":943,"c1_minValue":89,"c1_nullCount":0,"c2_maxValue":" 
943sdc","c2_minValue":" 
200sdc","c2_nullCount":0,"c3_maxValue":854.690,"c3_minValue":100.556,"c3_nullCount":0,
 "valueCount":10}
+{"c1_maxValue":959,"c1_minValue":74,"c1_nullCount":0,"c2_maxValue":" 
959sdc","c2_minValue":" 
181sdc","c2_nullCount":0,"c3_maxValue":980.213,"c3_minValue":38.740,"c3_nullCount":0,
 "valueCount":13}
\ No newline at end of file
diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/cow-updated2-column-stats-index-table-short-schema.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/cow-updated2-column-stats-index-table-short-schema.json
new file mode 100644
index 00000000000..a421adfb5fc
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/cow-updated2-column-stats-index-table-short-schema.json
@@ -0,0 +1,13 @@
+{"c1_maxValue":101,"c1_minValue":101,"c1_nullCount":0,"c2_maxValue":" 999sdc","c2_minValue":" 999sdc","c2_nullCount":0,"c3_maxValue":10.329,"c3_minValue":10.329,"c3_nullCount":0,"valueCount":1}
+{"c1_maxValue":568,"c1_minValue":8,"c1_nullCount":0,"c2_maxValue":" 8sdc","c2_minValue":" 111sdc","c2_nullCount":0,"c3_maxValue":979.272,"c3_minValue":82.111,"c3_nullCount":0,"valueCount":15}
+{"c1_maxValue":715,"c1_minValue":76,"c1_nullCount":0,"c2_maxValue":" 76sdc","c2_minValue":" 224sdc","c2_nullCount":0,"c3_maxValue":958.579,"c3_minValue":246.427,"c3_nullCount":0,"valueCount":12}
+{"c1_maxValue":768,"c1_minValue":59,"c1_nullCount":0,"c2_maxValue":" 768sdc","c2_minValue":" 118sdc","c2_nullCount":0,"c3_maxValue":959.131,"c3_minValue":64.768,"c3_nullCount":0,"valueCount":7}
+{"c1_maxValue":769,"c1_minValue":309,"c1_nullCount":0,"c2_maxValue":" 769sdc","c2_minValue":" 309sdc","c2_nullCount":0,"c3_maxValue":919.769,"c3_minValue":76.430,"c3_nullCount":0,"valueCount":9}
+{"c1_maxValue":769,"c1_minValue":309,"c1_nullCount":0,"c2_maxValue":" 985sdc","c2_minValue":" 309sdc","c2_nullCount":0,"c3_maxValue":919.769,"c3_minValue":76.430,"c3_nullCount":0,"valueCount":9}
+{"c1_maxValue":770,"c1_minValue":129,"c1_nullCount":0,"c2_maxValue":" 770sdc","c2_minValue":" 129sdc","c2_nullCount":0,"c3_maxValue":977.328,"c3_minValue":153.431,"c3_nullCount":0,"valueCount":6}
+{"c1_maxValue":932,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":" 932sdc","c2_minValue":" 0sdc","c2_nullCount":0,"c3_maxValue":994.355,"c3_minValue":19.000,"c3_nullCount":0,"valueCount":8}
+{"c1_maxValue":932,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":" 987sdc","c2_minValue":" 0sdc","c2_nullCount":0,"c3_maxValue":994.355,"c3_minValue":19.000,"c3_nullCount":0,"valueCount":8}
+{"c1_maxValue":943,"c1_minValue":89,"c1_nullCount":0,"c2_maxValue":" 943sdc","c2_minValue":" 200sdc","c2_nullCount":0,"c3_maxValue":854.690,"c3_minValue":100.556,"c3_nullCount":0,"valueCount":10}
+{"c1_maxValue":943,"c1_minValue":89,"c1_nullCount":0,"c2_maxValue":" 984sdc","c2_minValue":" 200sdc","c2_nullCount":0,"c3_maxValue":977.328,"c3_minValue":64.768,"c3_nullCount":1,"valueCount":10}
+{"c1_maxValue":959,"c1_minValue":74,"c1_nullCount":0,"c2_maxValue":" 959sdc","c2_minValue":" 181sdc","c2_nullCount":0,"c3_maxValue":980.213,"c3_minValue":38.740,"c3_nullCount":0,"valueCount":13}
+{"c1_maxValue":959,"c1_minValue":74,"c1_nullCount":0,"c2_maxValue":" 989sdc","c2_minValue":" 181sdc","c2_nullCount":0,"c3_maxValue":980.213,"c3_minValue":38.740,"c3_nullCount":0,"valueCount":13}
\ No newline at end of file
diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/mor-updated2-column-stats-index-table-short-schema.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/mor-updated2-column-stats-index-table-short-schema.json
new file mode 100644
index 00000000000..382ba58abf0
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/mor-updated2-column-stats-index-table-short-schema.json
@@ -0,0 +1,13 @@
+{"c1_maxValue":101,"c1_minValue":101,"c1_nullCount":0,"c2_maxValue":" 999sdc","c2_minValue":" 999sdc","c2_nullCount":0,"c3_maxValue":10.329,"c3_minValue":10.329,"c3_nullCount":0,"valueCount":1}
+{"c1_maxValue":562,"c1_minValue":323,"c1_nullCount":0,"c2_maxValue":" 984sdc","c2_minValue":" 980sdc","c2_nullCount":0,"c3_maxValue":977.328,"c3_minValue":64.768,"c3_nullCount":1,"valueCount":4}
+{"c1_maxValue":568,"c1_minValue":8,"c1_nullCount":0,"c2_maxValue":" 8sdc","c2_minValue":" 111sdc","c2_nullCount":0,"c3_maxValue":979.272,"c3_minValue":82.111,"c3_nullCount":0,"valueCount":15}
+{"c1_maxValue":619,"c1_minValue":619,"c1_nullCount":0,"c2_maxValue":" 985sdc","c2_minValue":" 985sdc","c2_nullCount":0,"c3_maxValue":230.320,"c3_minValue":230.320,"c3_nullCount":0,"valueCount":1}
+{"c1_maxValue":633,"c1_minValue":624,"c1_nullCount":0,"c2_maxValue":" 987sdc","c2_minValue":" 986sdc","c2_nullCount":0,"c3_maxValue":580.317,"c3_minValue":375.308,"c3_nullCount":0,"valueCount":2}
+{"c1_maxValue":639,"c1_minValue":555,"c1_nullCount":0,"c2_maxValue":" 989sdc","c2_minValue":" 982sdc","c2_nullCount":0,"c3_maxValue":904.304,"c3_minValue":153.431,"c3_nullCount":0,"valueCount":3}
+{"c1_maxValue":715,"c1_minValue":76,"c1_nullCount":0,"c2_maxValue":" 76sdc","c2_minValue":" 224sdc","c2_nullCount":0,"c3_maxValue":958.579,"c3_minValue":246.427,"c3_nullCount":0,"valueCount":12}
+{"c1_maxValue":768,"c1_minValue":59,"c1_nullCount":0,"c2_maxValue":" 768sdc","c2_minValue":" 118sdc","c2_nullCount":0,"c3_maxValue":959.131,"c3_minValue":64.768,"c3_nullCount":0,"valueCount":7}
+{"c1_maxValue":769,"c1_minValue":309,"c1_nullCount":0,"c2_maxValue":" 769sdc","c2_minValue":" 309sdc","c2_nullCount":0,"c3_maxValue":919.769,"c3_minValue":76.430,"c3_nullCount":0,"valueCount":9}
+{"c1_maxValue":770,"c1_minValue":129,"c1_nullCount":0,"c2_maxValue":" 770sdc","c2_minValue":" 129sdc","c2_nullCount":0,"c3_maxValue":977.328,"c3_minValue":153.431,"c3_nullCount":0,"valueCount":6}
+{"c1_maxValue":932,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":" 932sdc","c2_minValue":" 0sdc","c2_nullCount":0,"c3_maxValue":994.355,"c3_minValue":19.000,"c3_nullCount":0,"valueCount":8}
+{"c1_maxValue":943,"c1_minValue":89,"c1_nullCount":0,"c2_maxValue":" 943sdc","c2_minValue":" 200sdc","c2_nullCount":0,"c3_maxValue":854.690,"c3_minValue":100.556,"c3_nullCount":0,"valueCount":10}
+{"c1_maxValue":959,"c1_minValue":74,"c1_nullCount":0,"c2_maxValue":" 959sdc","c2_minValue":" 181sdc","c2_nullCount":0,"c3_maxValue":980.213,"c3_minValue":38.740,"c3_nullCount":0,"valueCount":13}
\ No newline at end of file
diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/updated-column-stats-index-table-short-schema.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/updated-column-stats-index-table-short-schema.json
new file mode 100644
index 00000000000..bae2a86a04a
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/updated-column-stats-index-table-short-schema.json
@@ -0,0 +1,8 @@
+{"c1_maxValue":568,"c1_minValue":8,"c1_nullCount":0,"c2_maxValue":" 8sdc","c2_minValue":" 111sdc","c2_nullCount":0,"c3_maxValue":979.272,"c3_minValue":82.111,"c3_nullCount":0,"valueCount":15}
+{"c1_maxValue":715,"c1_minValue":76,"c1_nullCount":0,"c2_maxValue":" 76sdc","c2_minValue":" 224sdc","c2_nullCount":0,"c3_maxValue":958.579,"c3_minValue":246.427,"c3_nullCount":0,"valueCount":12}
+{"c1_maxValue":768,"c1_minValue":59,"c1_nullCount":0,"c2_maxValue":" 768sdc","c2_minValue":" 118sdc","c2_nullCount":0,"c3_maxValue":959.131,"c3_minValue":64.768,"c3_nullCount":0,"valueCount":7}
+{"c1_maxValue":769,"c1_minValue":309,"c1_nullCount":0,"c2_maxValue":" 769sdc","c2_minValue":" 309sdc","c2_nullCount":0,"c3_maxValue":919.769,"c3_minValue":76.430,"c3_nullCount":0,"valueCount":9}
+{"c1_maxValue":770,"c1_minValue":129,"c1_nullCount":0,"c2_maxValue":" 770sdc","c2_minValue":" 129sdc","c2_nullCount":0,"c3_maxValue":977.328,"c3_minValue":153.431,"c3_nullCount":0,"valueCount":6}
+{"c1_maxValue":932,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":" 932sdc","c2_minValue":" 0sdc","c2_nullCount":0,"c3_maxValue":994.355,"c3_minValue":19.000,"c3_nullCount":0,"valueCount":8}
+{"c1_maxValue":943,"c1_minValue":89,"c1_nullCount":0,"c2_maxValue":" 943sdc","c2_minValue":" 200sdc","c2_nullCount":0,"c3_maxValue":854.690,"c3_minValue":100.556,"c3_nullCount":0,"valueCount":10}
+{"c1_maxValue":959,"c1_minValue":74,"c1_nullCount":0,"c2_maxValue":" 959sdc","c2_minValue":" 181sdc","c2_nullCount":0,"c3_maxValue":980.213,"c3_minValue":38.740,"c3_nullCount":0,"valueCount":13}
\ No newline at end of file
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala
index b0e23ac1902..582054cb3e6 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala
@@ -33,10 +33,14 @@ import org.apache.hudi.storage.StoragePath
 import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration
 import org.apache.hudi.testutils.HoodieSparkClientTestBase
 import org.apache.hudi.{ColumnStatsIndexSupport, DataSourceWriteOptions}
+
 import org.apache.spark.sql._
 import org.apache.hudi.functional.ColumnStatIndexTestBase.ColumnStatsTestParams
+import org.apache.hudi.metadata.HoodieTableMetadataUtil
 import org.apache.hudi.testutils.{HoodieSparkClientTestBase, LogFileColStatsTestUtil}
+import org.apache.hudi.util.JavaScalaConverters.convertScalaListToJavaList
 import org.apache.hudi.{ColumnStatsIndexSupport, DataSourceWriteOptions}
+
 import org.apache.spark.sql.functions.typedLit
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.DataFrame
@@ -49,6 +53,7 @@ import java.sql.{Date, Timestamp}
 import java.util
 import java.util.List
 import java.util.stream.Collectors
+
 import scala.collection.JavaConverters._
 import scala.util.Random
 
@@ -115,8 +120,8 @@ class ColumnStatIndexTestBase extends HoodieSparkClientTestBase {
       // Currently, routine manually validating the column stats (by actually reading every column of every file)
       // only supports parquet files. Therefore we skip such validation when delta-log files are present, and only
       // validate in following cases: (1) COW: all operations; (2) MOR: insert only.
-      val shouldValidateColumnStatsManually = params.testCase.tableType == HoodieTableType.COPY_ON_WRITE ||
-        params.operation.equals(DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      val shouldValidateColumnStatsManually = (params.testCase.tableType == HoodieTableType.COPY_ON_WRITE ||
+        params.operation.equals(DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)) && params.shouldValidateManually
 
       validateColumnStatsIndex(
         params.testCase, params.metadataOpts, params.expectedColStatsSourcePath, shouldValidateColumnStatsManually, params.latestCompletedCommit)
@@ -228,17 +233,16 @@ class ColumnStatIndexTestBase extends HoodieSparkClientTestBase {
 
     val columnStatsIndex = new ColumnStatsIndexSupport(spark, sourceTableSchema, metadataConfig, metaClient)
 
-    val indexedColumns: Set[String] = {
-      val customIndexedColumns = metadataConfig.getColumnsEnabledForColumnStatsIndex
-      if (customIndexedColumns.isEmpty) {
-        sourceTableSchema.fieldNames.toSet
-      } else {
-        customIndexedColumns.asScala.toSet
-      }
-    }
+    val indexedColumns: Set[String] = HoodieTableMetadataUtil
+      .getColumnsToIndex(metaClient.getTableConfig, metadataConfig, convertScalaListToJavaList(sourceTableSchema.fieldNames)).asScala.toSet
+
     val (expectedColStatsSchema, _) = composeIndexSchema(sourceTableSchema.fieldNames, indexedColumns, sourceTableSchema)
-    val validationSortColumns = Seq("c1_maxValue", "c1_minValue", "c2_maxValue", "c2_minValue", "c3_maxValue",
+    val validationSortColumns = if (indexedColumns.contains("c5")) {
+      Seq("c1_maxValue", "c1_minValue", "c2_maxValue", "c2_minValue", "c3_maxValue",
       "c3_minValue", "c5_maxValue", "c5_minValue")
+    } else {
+      Seq("c1_maxValue", "c1_minValue", "c2_maxValue", "c2_minValue", "c3_maxValue", "c3_minValue")
+    }
 
     columnStatsIndex.loadTransposed(sourceTableSchema.fieldNames, testCase.shouldReadInMemory) { transposedColStatsDF =>
       // Match against expected column stats table
@@ -363,6 +367,7 @@ object ColumnStatIndexTestBase {
                                    operation: String,
                                    saveMode: SaveMode,
                                    shouldValidate: Boolean = true,
+                                   shouldValidateManually: Boolean = true,
                                    latestCompletedCommit: String = null,
                                    numPartitions: Integer = 4,
                                    parquetMaxFileSize: Integer = 10 * 1024,
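
Worth noting on the ColumnStatIndexTestBase change above: the test no longer decides locally which columns are indexed; it defers to HoodieTableMetadataUtil.getColumnsToIndex so the explicit column list and the new max-columns cap are resolved by the shared utility. A hedged Java sketch of consuming that helper, assuming (as the Scala call above suggests) it takes the table config, the metadata config, and a Java list of field names, and returns a Java collection of column names; the wrapper method and logging are placeholders:

    import java.util.List;

    import org.apache.hudi.common.config.HoodieMetadataConfig;
    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.metadata.HoodieTableMetadataUtil;

    public class IndexedColumnsSketch {
      // Resolves the effective set of columns that col stats will cover,
      // honoring both the explicit column list and the new max-columns cap.
      static void printIndexedColumns(HoodieTableMetaClient metaClient,
                                      HoodieMetadataConfig metadataConfig,
                                      List<String> fieldNames) {
        for (String col : HoodieTableMetadataUtil.getColumnsToIndex(
            metaClient.getTableConfig(), metadataConfig, fieldNames)) {
          System.out.println("col stats will index: " + col);
        }
      }
    }
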
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
index a78e31a08d2..63d41b60a06 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
@@ -197,8 +197,7 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
       expectedColStatsSourcePath = expectedColStatsSourcePath,
       operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
       saveMode = SaveMode.Append,
-      true,
-      latestCompletedCommit,
+      latestCompletedCommit = latestCompletedCommit,
       numPartitions =  1,
       parquetMaxFileSize = 100 * 1024 * 1024,
       smallFileLimit = 0))
@@ -215,8 +214,7 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
       expectedColStatsSourcePath = expectedColStatsSourcePath1,
       operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
       saveMode = SaveMode.Append,
-      true,
-      latestCompletedCommit,
+      latestCompletedCommit = latestCompletedCommit,
       numPartitions =  1,
       parquetMaxFileSize = 100 * 1024 * 1024,
       smallFileLimit = 0))
@@ -248,7 +246,7 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
       expectedColStatsSourcePath = null,
       operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
       saveMode = SaveMode.Overwrite,
-      false,
+      shouldValidate = false,
       numPartitions =  1,
       parquetMaxFileSize = 100 * 1024 * 1024,
       smallFileLimit = 0))
@@ -259,7 +257,7 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
       expectedColStatsSourcePath = null,
       operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
       saveMode = SaveMode.Append,
-      false,
+      shouldValidate = false,
       numPartitions =  1,
       parquetMaxFileSize = 100 * 1024 * 1024,
       smallFileLimit = 0))
@@ -289,8 +287,7 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
       expectedColStatsSourcePath = expectedColStatsSourcePath,
       operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
       saveMode = SaveMode.Append,
-      true,
-      latestCompletedCommit,
+      latestCompletedCommit = latestCompletedCommit,
       numPartitions =  1,
       parquetMaxFileSize = 100 * 1024 * 1024,
       smallFileLimit = 0))
@@ -381,7 +378,7 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
       expectedColStatsSourcePath = null,
       operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
       saveMode = SaveMode.Append,
-      false,
+      shouldValidate = false,
       numPartitions = 1,
       parquetMaxFileSize = 100 * 1024 * 1024,
       smallFileLimit = 0))
@@ -394,7 +391,6 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
       expectedColStatsSourcePath = expectedColStatsSourcePath,
       operation = DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL,
       saveMode = SaveMode.Append,
-      true,
       numPartitions = 1,
       parquetMaxFileSize = 100 * 1024 * 1024,
       smallFileLimit = 0))
@@ -427,7 +423,7 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
       expectedColStatsSourcePath = null,
       operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
       saveMode = SaveMode.Overwrite,
-      false,
+      shouldValidate = false,
       numPartitions = 1,
       parquetMaxFileSize = 100 * 1024 * 1024,
       smallFileLimit = 0))
@@ -443,7 +439,7 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
       expectedColStatsSourcePath = null,
       operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
       saveMode = SaveMode.Append,
-      false,
+      shouldValidate = false,
       numPartitions = 1,
       parquetMaxFileSize = 100 * 1024 * 1024,
       smallFileLimit = 0))
@@ -460,7 +456,6 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
       expectedColStatsSourcePath = expectedColStatsSourcePath,
       operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
       saveMode = SaveMode.Append,
-      true,
       numPartitions = 1,
       parquetMaxFileSize = 100 * 1024 * 1024,
       smallFileLimit = 0))
@@ -494,7 +489,7 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
       expectedColStatsSourcePath = null,
       operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
       saveMode = SaveMode.Overwrite,
-      false,
+      shouldValidate = false,
       numPartitions = 1,
       parquetMaxFileSize = 100 * 1024 * 1024,
       smallFileLimit = 0))
@@ -510,7 +505,7 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
       expectedColStatsSourcePath = null,
       operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
       saveMode = SaveMode.Append,
-      false,
+      shouldValidate = false,
       numPartitions = 1,
       parquetMaxFileSize = 100 * 1024 * 1024,
       smallFileLimit = 0))
@@ -527,7 +522,6 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
       expectedColStatsSourcePath = expectedColStatsSourcePath,
       operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
       saveMode = SaveMode.Append,
-      true,
       numPartitions = 1,
       parquetMaxFileSize = 100 * 1024 * 1024,
       smallFileLimit = 0))
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala
index 78e04b2f417..59558796888 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala
@@ -67,6 +67,29 @@ class TestColumnStatsIndexWithSQL extends ColumnStatIndexTestBase {
     verifyFileIndexAndSQLQueries(commonOpts)
   }
 
+  @ParameterizedTest
+  @MethodSource(Array("testMetadataColumnStatsIndexParams"))
+  def testMetadataColumnStatsIndexWithSQLWithLimitedIndexes(testCase: ColumnStatsTestCase): Unit = {
+    val metadataOpts = Map(
+      HoodieMetadataConfig.ENABLE.key -> "true",
+      HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true",
+      HoodieMetadataConfig.COLUMN_STATS_INDEX_MAX_COLUMNS.key() -> "3"
+    )
+
+    val commonOpts = Map(
+      "hoodie.insert.shuffle.parallelism" -> "4",
+      "hoodie.upsert.shuffle.parallelism" -> "4",
+      HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+      DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString,
+      RECORDKEY_FIELD.key -> "c1",
+      PRECOMBINE_FIELD.key -> "c1",
+      HoodieTableConfig.POPULATE_META_FIELDS.key -> "true",
+      DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true",
+      DataSourceReadOptions.QUERY_TYPE.key -> DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL
+    ) ++ metadataOpts
+    setupTable(testCase, metadataOpts, commonOpts, shouldValidate = true, useShortSchema = true)
+  }
+
   @ParameterizedTest
   @MethodSource(Array("testMetadataColumnStatsIndexParamsForMOR"))
   def testMetadataColumnStatsIndexSQLWithInMemoryIndex(testCase: ColumnStatsTestCase): Unit = {
@@ -204,25 +227,33 @@ class TestColumnStatsIndexWithSQL extends ColumnStatIndexTestBase {
     verifyFileIndexAndSQLQueries(commonOpts)
   }
 
-  private def setupTable(testCase: ColumnStatsTestCase, metadataOpts: Map[String, String], commonOpts: Map[String, String], shouldValidate: Boolean): Unit = {
+  private def setupTable(testCase: ColumnStatsTestCase, metadataOpts: Map[String, String], commonOpts: Map[String, String],
+                         shouldValidate: Boolean, useShortSchema: Boolean = false): Unit = {
+    val filePostfix = if (useShortSchema) {
+      "-short-schema"
+    } else {
+      ""
+    }
     doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, metadataOpts, commonOpts,
       dataSourcePath = "index/colstats/input-table-json",
-      expectedColStatsSourcePath = "index/colstats/column-stats-index-table.json",
+      expectedColStatsSourcePath = s"index/colstats/column-stats-index-table${filePostfix}.json",
       operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+      shouldValidateManually = !useShortSchema,
       saveMode = SaveMode.Overwrite))
 
     doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, metadataOpts, commonOpts,
       dataSourcePath = "index/colstats/another-input-table-json",
-      expectedColStatsSourcePath = "index/colstats/updated-column-stats-index-table.json",
+      expectedColStatsSourcePath = s"index/colstats/updated-column-stats-index-table${filePostfix}.json",
       operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+      shouldValidateManually = !useShortSchema,
       saveMode = SaveMode.Append))
 
     // NOTE: MOR and COW have different fixtures since MOR is bearing delta-log files (holding
     //       deferred updates), diverging from COW
     val expectedColStatsSourcePath = if (testCase.tableType == HoodieTableType.COPY_ON_WRITE) {
-      "index/colstats/cow-updated2-column-stats-index-table.json"
+      s"index/colstats/cow-updated2-column-stats-index-table${filePostfix}.json"
     } else {
-      "index/colstats/mor-updated2-column-stats-index-table.json"
+      s"index/colstats/mor-updated2-column-stats-index-table${filePostfix}.json"
     }
 
     doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, metadataOpts, commonOpts,
@@ -230,7 +261,8 @@ class TestColumnStatsIndexWithSQL extends ColumnStatIndexTestBase {
       expectedColStatsSourcePath = expectedColStatsSourcePath,
       operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
       saveMode = SaveMode.Append,
-      shouldValidate))
+      shouldValidate,
+      shouldValidateManually = !useShortSchema))
   }
 
   def verifyFileIndexAndSQLQueries(opts: Map[String, String], isTableDataSameAsAfterSecondInstant: Boolean = false, verifyFileCount: Boolean = true): Unit = {
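
The new testMetadataColumnStatsIndexWithSQLWithLimitedIndexes run above caps the index at 3 columns and therefore compares against the new "-short-schema" fixtures, which only carry stats for c1, c2 and c3. For anyone wiring the same thing up outside the Scala test harness, a hypothetical plain-Java rendering of those three metadata options (the key names come from this patch; the Properties and printing scaffolding is illustrative only):

    import java.util.Properties;

    import org.apache.hudi.common.config.HoodieMetadataConfig;

    public class LimitedColStatsOptions {
      public static void main(String[] args) {
        Properties props = new Properties();
        // Same three metadata options as the new SQL test above.
        props.setProperty(HoodieMetadataConfig.ENABLE.key(), "true");
        props.setProperty(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key(), "true");
        // Cap the column stats index; in this test the capped index ends up
        // covering c1..c3, which is what the short-schema fixtures encode.
        props.setProperty(HoodieMetadataConfig.COLUMN_STATS_INDEX_MAX_COLUMNS.key(), "3");
        props.forEach((k, v) -> System.out.println(k + "=" + v));
      }
    }
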
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index 9e3c09edb76..1fc1b7e7777 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -893,7 +893,8 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin
   def testReadPathsForOnlyLogFiles(recordType: HoodieRecordType): Unit = {
     val (writeOpts, readOpts) = getWriterReaderOpts(recordType)
     // enable column stats
-    val hudiOpts = writeOpts ++ Map(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true")
+    val hudiOpts = writeOpts ++ Map(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true",
+      HoodieMetadataConfig.COLUMN_STATS_INDEX_FOR_COLUMNS.key() -> "fare,city_to_state,rider")
 
     initMetaClient(HoodieTableType.MERGE_ON_READ)
     val records1 = dataGen.generateInsertsContainsAllPartitions("000", 20)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestHoodieTableValuedFunction.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestHoodieTableValuedFunction.scala
index f6873c8bd3d..6e214e3fe35 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestHoodieTableValuedFunction.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestHoodieTableValuedFunction.scala
@@ -616,7 +616,8 @@ class TestHoodieTableValuedFunction extends HoodieSparkSqlTestBase {
           val result4DF = spark.sql(
             s"select type, key, ColumnStatsMetadata from hudi_metadata('$identifier') where type=${MetadataPartitionType.COLUMN_STATS.getRecordType}"
           )
-          assert(result4DF.count() == 3)
+          // 3 meta columns are always indexed so 3 stats per column * (3 meta cols + 1 data col) = 12
+          assert(result4DF.count() == 12)
 
           val result5DF = spark.sql(
             s"select type, key, recordIndexMetadata from hudi_metadata('$identifier') where type=${MetadataPartitionType.RECORD_INDEX.getRecordType}"
@@ -661,7 +662,8 @@ class TestHoodieTableValuedFunction extends HoodieSparkSqlTestBase {
                |  preCombineField = 'ts',
                |  hoodie.datasource.write.recordkey.field = 'id',
                |  hoodie.metadata.index.partition.stats.enable = 'true',
-               |  hoodie.metadata.index.column.stats.column.list = 'price'
+               |  hoodie.metadata.index.column.stats.column.list = 'price',
+               |  hoodie.populate.meta.fields = 'false'
                |)
                |location '${tmp.getCanonicalPath}/$tableName'
                |""".stripMargin
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index 63079544886..8a8bf31a8ee 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -135,6 +135,7 @@ import static org.apache.hudi.common.table.timeline.InstantComparison.compareTim
 import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
 import static org.apache.hudi.io.storage.HoodieSparkIOFactory.getHoodieSparkIOFactory;
 import static org.apache.hudi.metadata.HoodieTableMetadata.getMetadataTableBasePath;
+import static org.apache.hudi.metadata.HoodieTableMetadataUtil.META_COL_SET_TO_INDEX;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getLocationFromRecordIndexInfo;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getLogFileColumnRangeMetadata;
 
@@ -1823,7 +1824,7 @@ public class HoodieMetadataTableValidator implements Serializable {
 
     private List<String> getAllColumnNames() {
       try {
-        return schema.getFields().stream()
+        return schema.getFields().stream().filter(field -> META_COL_SET_TO_INDEX.contains(field.name()))
             .map(Schema.Field::name).collect(Collectors.toList());
       } catch (Exception e) {
         throw new HoodieException("Failed to get all column names for " + metaClient.getBasePath());
