nsivabalan commented on code in PR #17796:
URL: https://github.com/apache/hudi/pull/17796#discussion_r2670548296


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMDTStats.java:
##########
@@ -0,0 +1,1010 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteClientTestUtils;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.EngineType;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.ConsistencyGuardConfig;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.testutils.HoodieTestTable;
+import org.apache.hudi.common.testutils.InProcessTimeGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieCleanConfig;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.metadata.HoodieMetadataPayload;
+import org.apache.hudi.metadata.HoodieMetadataWriteUtils;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.metadata.MetadataWriterTestUtils;
+import org.apache.hudi.metadata.SparkMetadataWriterFactory;
+import org.apache.hudi.stats.HoodieColumnRangeMetadata;
+import org.apache.hudi.stats.ValueMetadata;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.testutils.MetadataMergeWriteStatus;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.expressions.Expression;
+import org.apache.spark.sql.execution.datasources.NoopCache$;
+import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.Serializable;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import scala.collection.JavaConverters;
+
+public class HoodieMDTStats implements Closeable {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieMDTStats.class);
+
+  private static final int INITIAL_ROW_COUNT = 50; // Rows to insert in STEP 1
+
+  private final Config cfg;
+  // Properties with source, hoodie client, key generator etc.
+  private TypedProperties props;
+
+  private final SparkSession spark;
+
+  private final JavaSparkContext jsc;
+
+  private final HoodieEngineContext engineContext;
+
+  private static final String AVRO_SCHEMA =
+      "{\n"
+          + "  \"type\": \"record\",\n"
+          + "  \"name\": \"Employee\",\n"
+          + "  \"namespace\": \"com.example.avro\",\n"
+          + "  \"fields\": [\n"
+          + "    { \"name\": \"id\", \"type\": \"string\" },\n"
+          + "    { \"name\": \"name\", \"type\": \"string\" },\n"
+          + "    { \"name\": \"city\", \"type\": \"string\" },\n"
+          + "    { \"name\": \"age\", \"type\": \"int\" },\n"
+          + "    { \"name\": \"salary\", \"type\": \"double\" },\n"
+          + "    { \"name\": \"dt\", \"type\": \"string\" }\n"
+          + "  ]\n"
+          + "}\n";
+
+  public HoodieMDTStats(SparkSession spark, Config cfg) {
+    this.spark = spark;
+    this.jsc = new JavaSparkContext(spark.sparkContext());
+    this.engineContext = new HoodieSparkEngineContext(jsc);
+    this.cfg = cfg;
+    this.props = StringUtils.isNullOrEmpty(cfg.propsFilePath)
+        ? UtilHelpers.buildProperties(cfg.configs)
+        : readConfigFromFileSystem(jsc, cfg);
+  }
+
+  private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, 
Config cfg) {
+    return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new 
Path(cfg.propsFilePath), cfg.configs)
+        .getProps(true);
+  }
+
+  public static class Config implements Serializable {
+    @Parameter(names = {"--table-base-path", "-tbp"}, description = "Number of 
columns to index", required = true)
+    public String tableBasePath = null;
+
+    @Parameter(names = {"--cols-to-index", "-num-cols"}, description = "Number 
of columns to index", required = true)
+    public String colsToIndex = "age,salary";
+
+    @Parameter(names = {"--col-stats-file-group-count", "-col-fg-count"}, 
description = "Target Base path for the table", required = true)
+    public Integer colStatsFileGroupCount = 10;
+
+    @Parameter(names = {"--num-files", "-nf"}, description = "Target Base path 
for the table", required = true)
+    public Integer numFiles = 1000;
+
+    @Parameter(names = {"--num-partitions", "-np"}, description = "Target Base 
path for the table", required = true)
+    public Integer numPartitions = 1;
+
+    @Parameter(names = {"--files-per-commit", "-fpc"}, description = "Number 
of files to create per commit. If not specified or >= num-files, all files will 
be in one commit", required = false)

Review Comment:
   lets try to get this working:
   
   initial setup:
   just create data table w/o any metadata table. 
   
   from tests: 
      generate data 
      ingest into mdt which will also initialize. that way, we get hfiles in 
mdt directly. 
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java:
##########
@@ -797,64 +795,8 @@ void reconcileAgainstMarkers(HoodieEngineContext context,
                                boolean consistencyCheckEnabled,
                                boolean shouldFailOnDuplicateDataFileDetection,
                                WriteMarkers markers) throws HoodieIOException {
-    try {
-      // Reconcile marker and data files with WriteStats so that partially 
written data-files due to failed
-      // (but succeeded on retry) tasks are removed.
-      String basePath = getMetaClient().getBasePath().toString();
-
-      if (!markers.doesMarkerDirExist()) {
-        // can happen if it was an empty write say.
-        return;
-      }
-
-      // Ignores log file appended for update, since they are already 
fail-safe.
-      // but new created log files should be included.
-      Set<String> invalidDataPaths = getInvalidDataPaths(markers);
-      Set<String> validDataPaths = stats.stream()
-          .map(HoodieWriteStat::getPath)
-          .collect(Collectors.toSet());
-      Set<String> validCdcDataPaths = stats.stream()
-          .map(HoodieWriteStat::getCdcStats)
-          .filter(Objects::nonNull)
-          .flatMap(cdcStat -> cdcStat.keySet().stream())
-          .collect(Collectors.toSet());
-
-      // Contains list of partially created files. These needs to be cleaned 
up.
-      invalidDataPaths.removeAll(validDataPaths);
-      invalidDataPaths.removeAll(validCdcDataPaths);
-
-      if (!invalidDataPaths.isEmpty()) {
-        if (shouldFailOnDuplicateDataFileDetection) {
-          throw new HoodieDuplicateDataFileDetectedException("Duplicate data 
files detected " + invalidDataPaths);
-        }
-
-        log.info("Removing duplicate files created due to task retries before 
committing. Paths=" + invalidDataPaths);
-        Map<String, List<Pair<String, String>>> invalidPathsByPartition = 
invalidDataPaths.stream()
-            .map(dp ->
-                Pair.of(new StoragePath(basePath, dp).getParent().toString(),
-                    new StoragePath(basePath, dp).toString()))
-            .collect(Collectors.groupingBy(Pair::getKey));
-
-        // Ensure all files in delete list is actually present. This is 
mandatory for an eventually consistent FS.
-        // Otherwise, we may miss deleting such files. If files are not found 
even after retries, fail the commit
-        if (consistencyCheckEnabled) {
-          // This will either ensure all files to be deleted are present.
-          waitForAllFiles(context, invalidPathsByPartition, 
FileVisibility.APPEAR);
-        }
-
-        // Now delete partially written files
-        context.setJobStatus(this.getClass().getSimpleName(), "Delete all 
partially written files: " + config.getTableName());
-        deleteInvalidFilesByPartitions(context, invalidPathsByPartition);
-
-        // Now ensure the deleted files disappear
-        if (consistencyCheckEnabled) {
-          // This will either ensure all files to be deleted are absent.
-          waitForAllFiles(context, invalidPathsByPartition, 
FileVisibility.DISAPPEAR);
-        }
-      }
-    } catch (IOException ioe) {
-      throw new HoodieIOException(ioe.getMessage(), ioe);
-    }
+    log.warn("Skipping reconcile markers for instant: {}", instantTs);

Review Comment:
   this is P1. and on me (siva)



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMDTStats.java:
##########
@@ -0,0 +1,1010 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteClientTestUtils;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.EngineType;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.ConsistencyGuardConfig;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.testutils.HoodieTestTable;
+import org.apache.hudi.common.testutils.InProcessTimeGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieCleanConfig;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.metadata.HoodieMetadataPayload;
+import org.apache.hudi.metadata.HoodieMetadataWriteUtils;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.metadata.MetadataWriterTestUtils;
+import org.apache.hudi.metadata.SparkMetadataWriterFactory;
+import org.apache.hudi.stats.HoodieColumnRangeMetadata;
+import org.apache.hudi.stats.ValueMetadata;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.testutils.MetadataMergeWriteStatus;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.expressions.Expression;
+import org.apache.spark.sql.execution.datasources.NoopCache$;
+import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.Serializable;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import scala.collection.JavaConverters;
+
+public class HoodieMDTStats implements Closeable {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieMDTStats.class);
+
+  private static final int INITIAL_ROW_COUNT = 50; // Rows to insert in STEP 1
+
+  private final Config cfg;
+  // Properties with source, hoodie client, key generator etc.
+  private TypedProperties props;
+
+  private final SparkSession spark;
+
+  private final JavaSparkContext jsc;
+
+  private final HoodieEngineContext engineContext;
+
+  private static final String AVRO_SCHEMA =
+      "{\n"
+          + "  \"type\": \"record\",\n"
+          + "  \"name\": \"Employee\",\n"
+          + "  \"namespace\": \"com.example.avro\",\n"
+          + "  \"fields\": [\n"
+          + "    { \"name\": \"id\", \"type\": \"string\" },\n"
+          + "    { \"name\": \"name\", \"type\": \"string\" },\n"
+          + "    { \"name\": \"city\", \"type\": \"string\" },\n"
+          + "    { \"name\": \"age\", \"type\": \"int\" },\n"
+          + "    { \"name\": \"salary\", \"type\": \"double\" },\n"
+          + "    { \"name\": \"dt\", \"type\": \"string\" }\n"
+          + "  ]\n"
+          + "}\n";
+
+  public HoodieMDTStats(SparkSession spark, Config cfg) {
+    this.spark = spark;
+    this.jsc = new JavaSparkContext(spark.sparkContext());
+    this.engineContext = new HoodieSparkEngineContext(jsc);
+    this.cfg = cfg;
+    this.props = StringUtils.isNullOrEmpty(cfg.propsFilePath)
+        ? UtilHelpers.buildProperties(cfg.configs)
+        : readConfigFromFileSystem(jsc, cfg);
+  }
+
+  private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, 
Config cfg) {
+    return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new 
Path(cfg.propsFilePath), cfg.configs)
+        .getProps(true);
+  }
+
+  public static class Config implements Serializable {
+    @Parameter(names = {"--table-base-path", "-tbp"}, description = "Number of 
columns to index", required = true)
+    public String tableBasePath = null;
+
+    @Parameter(names = {"--cols-to-index", "-num-cols"}, description = "Number 
of columns to index", required = true)
+    public String colsToIndex = "age,salary";
+
+    @Parameter(names = {"--col-stats-file-group-count", "-col-fg-count"}, 
description = "Target Base path for the table", required = true)
+    public Integer colStatsFileGroupCount = 10;
+
+    @Parameter(names = {"--num-files", "-nf"}, description = "Target Base path 
for the table", required = true)
+    public Integer numFiles = 1000;
+
+    @Parameter(names = {"--num-partitions", "-np"}, description = "Target Base 
path for the table", required = true)
+    public Integer numPartitions = 1;
+
+    @Parameter(names = {"--files-per-commit", "-fpc"}, description = "Number 
of files to create per commit. If not specified or >= num-files, all files will 
be in one commit", required = false)
+    public Integer filesPerCommit = 1000;
+
+    @Parameter(names = {"--hoodie-conf"}, description = "Any configuration 
that can be set in the properties file "
+        + "(using the CLI parameter \"--props\") can also be passed command 
line using this parameter. This can be repeated",
+        splitter = IdentitySplitter.class)
+    public List<String> configs = new ArrayList<>();
+
+    @Parameter(names = {"--props"}, description = "path to properties file on 
localfs or dfs, with configurations for "
+        + "hoodie client for clustering")
+    public String propsFilePath = null;
+
+    @Parameter(names = {"--help", "-h"}, help = true)
+    public Boolean help = false;
+
+    @Override
+    public String toString() {
+      return "TableSizeStats {\n"
+          + "   --col-to-index " + colsToIndex + ", \n"
+          + "   --col-stats-file-group-count " + colStatsFileGroupCount + ", 
\n"
+          + "\n}";
+    }
+  }
+
+  public static void main(String[] args) {
+    final Config cfg = new Config();
+    JCommander cmd = new JCommander(cfg, null, args);
+
+    if (cfg.help || args.length == 0) {
+      cmd.usage();
+      System.exit(1);
+    }
+
+    final LocalDateTime now = LocalDateTime.now();
+    final String currentHour = 
now.format(DateTimeFormatter.ofPattern("yyyy-MM-dd-HH"));
+    String jobName = "metadata-table-stats-analyzer";
+    String sparkAppName = jobName + "-" + currentHour;
+    SparkSession spark = SparkSession.builder()
+        .appName(sparkAppName)
+        .config("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer")
+        .getOrCreate();
+
+
+    try (HoodieMDTStats hoodieMDTStats = new HoodieMDTStats(spark, cfg)) {
+      hoodieMDTStats.run();
+    } catch (Throwable throwable) {
+      LOG.error("Failed to get table size stats for " + cfg, throwable);
+    } finally {
+      spark.stop();
+    }
+  }
+
+  public void run() throws Exception {
+    int numFiles = cfg.numFiles;
+    int numPartitions = cfg.numPartitions;
+
+    LOG.info("Starting MDT stats test with {} files, {} partitions, {} 
columns, {} file groups",
+        numFiles, numPartitions, cfg.colsToIndex, cfg.colStatsFileGroupCount);
+    LOG.info("Data table base path: {}", cfg.tableBasePath);
+    String metadataTableBasePath = 
HoodieTableMetadata.getMetadataTableBasePath(cfg.tableBasePath);
+    LOG.info("Metadata table base path: {}", metadataTableBasePath);
+
+    String tableName = "test_mdt_stats_tbl";
+    initializeTableWithSampleData(tableName);
+    // Create data table config with metadata enabled
+    HoodieWriteConfig dataWriteConfig = getWriteConfig(tableName, AVRO_SCHEMA, 
cfg.tableBasePath, HoodieFailedWritesCleaningPolicy.EAGER);
+    HoodieMetadataConfig metadataConfig = dataWriteConfig.getMetadataConfig();
+    HoodieWriteConfig dataConfig = HoodieWriteConfig.newBuilder()
+        .withProperties(dataWriteConfig.getProps())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            .fromProperties(metadataConfig.getProps())
+            .enable(true)
+            .withMetadataIndexColumnStats(true)
+            
.withMetadataIndexColumnStatsFileGroupCount(cfg.colStatsFileGroupCount)
+            .build())
+        .build();
+
+    HoodieTableMetaClient dataMetaClient = HoodieTableMetaClient.builder()
+        .setBasePath(dataConfig.getBasePath())
+        .setConf(engineContext.getStorageConf().newInstance())
+        .build();
+
+    // STEP 1: Insert 50 rows with age and salary columns to initialize table 
schema
+    // and metadata table
+    dataMetaClient = HoodieTableMetaClient.reload(dataMetaClient);
+
+    Integer filesPerCommit = cfg.filesPerCommit;
+
+    int effectiveFilesPerCommit = filesPerCommit >= numFiles ? numFiles
+        : filesPerCommit;
+
+    // Calculate number of commits needed
+    int numCommits = (int) Math.ceil((double) numFiles / 
effectiveFilesPerCommit);
+
+    LOG.info("Creating {} commits with {} files per commit", numCommits, 
effectiveFilesPerCommit);
+
+    List<String> partitions = generatePartitions(cfg.numPartitions);
+    HoodieTestTable testTable = HoodieTestTable.of(dataMetaClient);
+
+    HoodieWriteConfig mdtConfig = 
HoodieMetadataWriteUtils.createMetadataWriteConfig(
+        dataConfig,
+        HoodieFailedWritesCleaningPolicy.EAGER,
+        HoodieTableVersion.NINE);
+
+    // Track all expected stats across all commits for final verification
+    @SuppressWarnings("rawtypes")
+    Map<String, Map<String, HoodieColumnRangeMetadata<Comparable>>> 
allExpectedStats = new HashMap<>();
+
+    int remainingFiles = numFiles;
+    int totalFilesCreated = 0;
+
+    // STEP 2 & 3: Create multiple commits with files and metadata
+    for (int commitIdx = 0; commitIdx < numCommits; commitIdx++) {
+      LOG.info("=== Processing commit {}/{} ===", commitIdx + 1, numCommits);
+
+      // Calculate files for this commit
+      int filesInThisCommit = Math.min(effectiveFilesPerCommit, 
remainingFiles);
+      int filesPerPartitionThisCommit = filesInThisCommit / numPartitions;
+
+      LOG.info("Creating {} files in this commit ({} per partition)",
+          filesInThisCommit, filesPerPartitionThisCommit);
+
+      // Generate unique commit time
+      String dataCommitTime = InProcessTimeGenerator.createNewInstantTime();
+
+      // Create commit metadata
+      HoodieCommitMetadata commitMetadata = testTable.createCommitMetadata(
+          dataCommitTime,
+          WriteOperationType.INSERT,
+          partitions,
+          filesPerPartitionThisCommit,
+          false); // bootstrap
+
+      // Add commit to timeline
+      testTable.addCommit(dataCommitTime, Option.of(commitMetadata));
+      LOG.info("Created commit metadata at instant {}", dataCommitTime);
+
+      // Create actual empty parquet files on disk
+      createEmptyParquetFilesDistributed(dataMetaClient, commitMetadata);
+      totalFilesCreated += filesInThisCommit;
+      LOG.info("Created {} empty parquet files on disk (total so far: {})",
+          filesInThisCommit, totalFilesCreated);
+
+      dataMetaClient = HoodieTableMetaClient.reload(dataMetaClient);
+
+      // Write both /files and /column_stats partitions to metadata table
+      @SuppressWarnings("rawtypes")
+      Map<String, Map<String, HoodieColumnRangeMetadata<Comparable>>> 
commitExpectedStats =
+          writeFilesAndColumnStatsToMetadataTable(dataConfig, dataMetaClient, 
commitMetadata, dataCommitTime, mdtConfig);
+
+      // Accumulate expected stats from this commit
+      allExpectedStats.putAll(commitExpectedStats);
+
+      remainingFiles -= filesInThisCommit;
+      LOG.info("Commit {}/{} completed. Remaining files: {}", commitIdx + 1, 
numCommits, remainingFiles);
+    }
+
+    LOG.info("=== All {} commits completed. Total files created: {} ===", 
numCommits, totalFilesCreated);
+
+    // STEP 4: Verification
+    LOG.info("Total unique files with stats: {}", allExpectedStats.size());
+
+    // STEP 5: Use HoodieFileIndex.filterFileSlices to query and verify
+    queryAndVerifyColumnStats(dataConfig, dataMetaClient, allExpectedStats, 
totalFilesCreated);
+  }
+
+  /**
+   * Generates a list of date-based partition paths incrementing by day.
+   * Starting from 2020-01-01, creates partitions for consecutive days based 
on numPartitions.
+   * <p>
+   * Example:
+   * numPartitions = 1  -> ["2020-01-01"]
+   * numPartitions = 3  -> ["2020-01-01", "2020-01-02", "2020-01-03"]
+   * numPartitions = 10 -> ["2020-01-01", "2020-01-02", ..., "2020-01-10"]
+   *
+   * @param numPartitions Number of partitions to generate
+   * @return List of partition paths in yyyy-MM-dd format
+   */
+  private List<String> generatePartitions(int numPartitions) {
+    if (numPartitions <= 0) {
+      throw new IllegalArgumentException("numPartitions must be greater than 
0, got: " + numPartitions);
+    }
+
+    List<String> partitions = new ArrayList<>();
+    LocalDate startDate = LocalDate.of(2020, 1, 1);
+    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
+
+    for (int i = 0; i < numPartitions; i++) {
+      LocalDate partitionDate = startDate.plusDays(i);
+      String partitionPath = partitionDate.format(formatter);
+      partitions.add(partitionPath);
+    }
+
+    LOG.info("Generated {} partitions from {} to {}",
+        numPartitions,
+        partitions.get(0),
+        partitions.get(partitions.size() - 1));
+
+    return partitions;
+  }
+
+  /**
+   * Print column stats for verification - shows min/max values for up to 10 
files per partition.
+   * This helps verify that column stats were constructed properly before 
querying.
+   *
+   * @param commitMetadata The commit metadata containing partition and file 
information
+   * @param expectedStats  The expected column stats map (file name -> column 
name -> stats)
+   */
+  @SuppressWarnings("rawtypes")
+  private void printColumnStatsForVerification(
+      HoodieCommitMetadata commitMetadata,
+      Map<String, Map<String, HoodieColumnRangeMetadata<Comparable>>> 
expectedStats) {
+
+    LOG.info("=== STEP 4: Verifying column stats construction (max 10 files 
per partition) ===");
+
+    Map<String, List<HoodieWriteStat>> partitionToWriteStats = 
commitMetadata.getPartitionToWriteStats();
+
+    for (Map.Entry<String, List<HoodieWriteStat>> entry : 
partitionToWriteStats.entrySet()) {
+      String partitionPath = entry.getKey();
+      List<HoodieWriteStat> writeStats = entry.getValue();
+
+      LOG.info("Partition: {} ({} files total)", partitionPath, 
writeStats.size());
+      LOG.info(String.format("%-50s %-15s %-15s %-15s %-15s",
+          "FileName", "age_min", "age_max", "salary_min", "salary_max"));
+      LOG.info(String.join("", Collections.nCopies(110, "-")));
+
+      int filesDisplayed = 0;
+      for (HoodieWriteStat writeStat : writeStats) {
+        if (filesDisplayed >= 10) {
+          LOG.info("... and {} more files", writeStats.size() - 10);
+          break;
+        }
+
+        String filePath = writeStat.getPath();
+        String fileName = new StoragePath(filePath).getName();
+
+        Map<String, HoodieColumnRangeMetadata<Comparable>> fileStats = 
expectedStats.get(fileName);
+        if (fileStats != null) {
+          HoodieColumnRangeMetadata<Comparable> ageStats = 
fileStats.get("age");
+          HoodieColumnRangeMetadata<Comparable> salaryStats = 
fileStats.get("salary");
+
+          String ageMin = (ageStats != null) ? 
String.valueOf(ageStats.getMinValue()) : "N/A";
+          String ageMax = (ageStats != null) ? 
String.valueOf(ageStats.getMaxValue()) : "N/A";
+          String salaryMin = (salaryStats != null) ? 
String.valueOf(salaryStats.getMinValue()) : "N/A";
+          String salaryMax = (salaryStats != null) ? 
String.valueOf(salaryStats.getMaxValue()) : "N/A";
+
+          LOG.info(String.format("%-50s %-15s %-15s %-15s %-15s",
+              fileName.length() > 48 ? fileName.substring(0, 48) + ".." : 
fileName,
+              ageMin, ageMax, salaryMin, salaryMax));
+        } else {
+          LOG.info(String.format("%-50s %-15s", fileName, "NO STATS FOUND"));
+        }
+
+        filesDisplayed++;
+      }
+    }
+
+    LOG.info("");
+    LOG.info("Total files with stats: {}", expectedStats.size());
+  }
+
+  /**
+   * Query the column stats index using HoodieFileIndex.filterFileSlices and 
verify results.
+   *
+   * @param dataConfig     The write config for the data table
+   * @param dataMetaClient The meta client for the data table
+   * @param expectedStats  The expected column stats for verification
+   * @param numFiles       The total number of files in the commit
+   */
+  @SuppressWarnings("rawtypes")
+  private void queryAndVerifyColumnStats(
+      HoodieWriteConfig dataConfig,
+      HoodieTableMetaClient dataMetaClient,
+      Map<String, Map<String, HoodieColumnRangeMetadata<Comparable>>> 
expectedStats,
+      int numFiles) throws Exception {
+
+    LOG.info("=== STEP 5: Querying column stats index using HoodieFileIndex 
===");
+    dataMetaClient = HoodieTableMetaClient.reload(dataMetaClient);
+
+    // Create HoodieFileIndex
+    Map<String, String> options = new HashMap<>();
+    options.put("path", dataConfig.getBasePath());
+    options.put("hoodie.datasource.read.data.skipping.enable", "true");
+    options.put("hoodie.metadata.enable", "true");
+    options.put("hoodie.metadata.index.column.stats.enable", "true");
+    // Also ensure the columns are specified for column stats
+    options.put("hoodie.metadata.index.column.stats.column.list", 
"age,salary");
+    
spark.sqlContext().conf().setConfString("hoodie.fileIndex.dataSkippingFailureMode",
 "strict");
+
+    // Create schema with the columns used for data skipping
+    StructType dataSchema = new StructType()
+        .add("id", "string")
+        .add("name", "string")
+        .add("city", "string")
+        .add("age", "int")
+        .add("salary", "long");
+    scala.Option<StructType> schemaOption = scala.Option.apply(dataSchema);
+
+    @SuppressWarnings("deprecation")
+    scala.collection.immutable.Map<String, String> scalaOptions = 
JavaConverters.mapAsScalaMap(options)
+        .toMap(scala.Predef$.MODULE$.<scala.Tuple2<String, String>>conforms());
+
+    org.apache.hudi.HoodieFileIndex fileIndex = new 
org.apache.hudi.HoodieFileIndex(
+        spark,
+        dataMetaClient,
+        schemaOption,
+        scalaOptions,
+        NoopCache$.MODULE$,
+        false,
+        false);
+
+    // Create data filters for age and salary columns
+    // Unresolved expressions cause translateIntoColumnStatsIndexFilterExpr to 
return TrueLiteral (no filtering).
+    List<Expression> dataFilters = new ArrayList<>();
+    String filterString = "age > 90";
+    Expression filter1 = 
org.apache.spark.sql.HoodieCatalystExpressionUtils$.MODULE$
+        .resolveExpr(spark, filterString, dataSchema);
+    LOG.info("DEBUG: Resolved filter expression: {}", filter1);
+    LOG.info("DEBUG: Resolved filter resolved: {}", filter1.resolved());
+    LOG.info("DEBUG: Resolved filter tree:\n{}", filter1.treeString());
+
+    dataFilters.add(filter1);
+    // Expression filter2 = 
org.apache.spark.sql.HoodieCatalystExpressionUtils.resolveExpr(
+    //     sparkSession, "salary > 100000", dataSchema);
+    // dataFilters.add(filter2);
+
+    List<Expression> partitionFilters = new ArrayList<>(); // Empty partition 
filters
+
+    // Convert to Scala Seq
+    scala.collection.immutable.List<Expression> dataFiltersList = 
JavaConverters.asScalaBuffer(dataFilters)
+        .toList();
+    scala.collection.Seq<Expression> dataFiltersSeq = dataFiltersList;
+    scala.collection.immutable.List<Expression> partitionFiltersList = 
JavaConverters
+        .asScalaBuffer(partitionFilters).toList();
+    scala.collection.Seq<Expression> partitionFiltersSeq = 
partitionFiltersList;
+
+    // Call filterFileSlices
+    
scala.collection.Seq<scala.Tuple2<scala.Option<org.apache.hudi.BaseHoodieTableFileIndex.PartitionPath>,
+        scala.collection.Seq<FileSlice>>> filteredSlices = fileIndex
+        .filterFileSlices(
+            dataFiltersSeq,
+            partitionFiltersSeq,
+            false);
+
+    // Print results
+    LOG.info("");
+    LOG.info("Filtered File Slices Min/Max Values:");
+    LOG.info(String.format("%-30s %-20s %-20s %-20s %-20s",
+        "FileName", "age_min", "age_max", "salary_min", "salary_max"));
+    LOG.info(String.join("", Collections.nCopies(100, "-")));
+
+    int totalFileSlices = 0;
+    for (int j = 0; j < filteredSlices.size(); j++) {
+      
scala.Tuple2<scala.Option<org.apache.hudi.BaseHoodieTableFileIndex.PartitionPath>,
+          scala.collection.Seq<FileSlice>> tuple = filteredSlices.apply(j);
+      scala.collection.Seq<FileSlice> fileSliceSeq = tuple._2();
+      totalFileSlices += fileSliceSeq.size();
+
+      for (int k = 0; k < fileSliceSeq.size(); k++) {
+        FileSlice fileSlice = fileSliceSeq.apply(k);
+        String fileName = fileSlice.getBaseFile().get().getFileName();
+
+        Map<String, HoodieColumnRangeMetadata<Comparable>> fileExpectedStats = 
expectedStats.get(fileName);
+        if (fileExpectedStats != null) {
+          HoodieColumnRangeMetadata<Comparable> ageStats = 
fileExpectedStats.get("age");
+          HoodieColumnRangeMetadata<Comparable> salaryStats = 
fileExpectedStats.get("salary");
+
+          Object ageMin = (ageStats != null) ? ageStats.getMinValue() : "null";
+          Object ageMax = (ageStats != null) ? ageStats.getMaxValue() : "null";
+          Object salaryMin = (salaryStats != null) ? salaryStats.getMinValue() 
: "null";
+          Object salaryMax = (salaryStats != null) ? salaryStats.getMaxValue() 
: "null";
+
+          LOG.info(String.format("%-30s %-20s %-20s %-20s %-20s",
+              fileName, ageMin.toString(), ageMax.toString(), 
salaryMin.toString(),
+              salaryMax.toString()));
+        }
+      }
+    }
+
+    LOG.info(String.join("", Collections.nCopies(100, "-")));
+    LOG.info("Total file slices returned: {}", totalFileSlices);
+    LOG.info("Total files in commit: {}", numFiles);
+
+    if (numFiles > 0) {
+      double skippingRatio = ((double) (numFiles - totalFileSlices) / 
numFiles) * 100.0;
+      LOG.info(String.format("Data skipping ratio: %.2f%%", skippingRatio));
+    }
+  }
+
+  /**
+   * Write both /files and /column_stats partitions to metadata table in a 
single commit.
+   * This method handles initialization of partitions if needed, tags records 
with location,
+   * and writes them together to simulate how actual code writes metadata.
+   *
+   * @param dataConfig     The write config for the data table
+   * @param dataMetaClient The meta client for the data table
+   * @param commitMetadata The commit metadata containing file information
+   * @param dataCommitTime The commit time for the data table commit
+   * @param mdtWriteConfig The write config for the metadata table
+   * @return Map of file names to their column stats metadata for verification
+   * @throws Exception if there's an error writing to the metadata table
+   */
+  @SuppressWarnings("rawtypes")
+  private Map<String, Map<String, HoodieColumnRangeMetadata<Comparable>>> 
writeFilesAndColumnStatsToMetadataTable(
+      HoodieWriteConfig dataConfig,
+      HoodieTableMetaClient dataMetaClient,
+      HoodieCommitMetadata commitMetadata,
+      String dataCommitTime,
+      HoodieWriteConfig mdtWriteConfig) throws Exception {
+
+    try (HoodieTableMetadataWriter<?, ?> metadataWriter = 
SparkMetadataWriterFactory.create(
+        engineContext.getStorageConf(),
+        dataConfig,
+        engineContext,
+        Option.empty(),
+        dataMetaClient.getTableConfig())) {
+
+      // STEP 3a: Check if /files partition exists and initialize if needed
+      String metadataBasePath = 
HoodieTableMetadata.getMetadataTableBasePath(cfg.tableBasePath);
+      HoodieTableMetaClient metadataMetaClient = 
HoodieTableMetaClient.builder()
+          .setBasePath(metadataBasePath)
+          .setConf(engineContext.getStorageConf().newInstance())
+          .build();
+
+      boolean filesPartitionExists = dataMetaClient.getTableConfig()
+          .isMetadataPartitionAvailable(MetadataPartitionType.FILES);
+
+      LOG.info("BEFORE initialization - Metadata table exists: {}, partitions: 
{}",
+          filesPartitionExists,
+          metadataMetaClient.getTableConfig().getMetadataPartitions());
+
+      if (!filesPartitionExists) {
+        // Mark partition as inflight in table config - this is required for 
tagRecordsWithLocation
+        // to work with isInitializing=true
+        dataMetaClient.getTableConfig().setMetadataPartitionsInflight(
+            dataMetaClient, MetadataPartitionType.FILES);
+        LOG.info("Marked /files partition as inflight for initialization");
+      }
+
+      // Also mark column_stats partition as inflight for initialization
+      boolean colStatsPartitionExists = dataMetaClient.getTableConfig()
+          .isMetadataPartitionAvailable(MetadataPartitionType.COLUMN_STATS);
+      if (!colStatsPartitionExists) {
+        dataMetaClient.getTableConfig().setMetadataPartitionsInflight(
+            dataMetaClient, MetadataPartitionType.COLUMN_STATS);
+        LOG.info("Marked /column_stats partition as inflight for 
initialization");
+
+        // Create index definition for column stats - this tells data skipping 
which columns are indexed
+        org.apache.hudi.common.model.HoodieIndexDefinition colStatsIndexDef =
+            new org.apache.hudi.common.model.HoodieIndexDefinition.Builder()
+                
.withIndexName(MetadataPartitionType.COLUMN_STATS.getPartitionPath())
+                .withIndexType("column_stats")
+                .withSourceFields(java.util.Arrays.asList("age", "salary"))
+                .build();
+        dataMetaClient.buildIndexDefinition(colStatsIndexDef);
+        LOG.info("Created column stats index definition for columns: age, 
salary");
+      }
+
+      // Convert commit metadata to files partition records
+      @SuppressWarnings("unchecked")
+      List<HoodieRecord<HoodieMetadataPayload>> filesRecords = 
(List<HoodieRecord<HoodieMetadataPayload>>) (List<?>)
+          
HoodieTableMetadataUtil.convertMetadataToFilesPartitionRecords(commitMetadata, 
dataCommitTime);
+
+      // Generate column stats records
+      @SuppressWarnings("rawtypes")
+      Map<String, Map<String, HoodieColumnRangeMetadata<Comparable>>> 
expectedStats = new HashMap<>();
+      List<HoodieRecord<HoodieMetadataPayload>> columnStatsRecords = 
generateColumnStatsRecordsForCommitMetadata(
+          commitMetadata,
+          expectedStats,
+          dataCommitTime);
+
+      LOG.info("Generated {} files records and {} column stats records", 
filesRecords.size(), columnStatsRecords.size());
+
+      // STEP 3b: Tag records with location for both partitions and write them 
together
+      // IMPORTANT: Use dataCommitTime for the metadata table commit to ensure 
timeline sync.
+      // HoodieFileIndex expects metadata table commits to match data table 
commit times.
+      String mdtCommitTime = dataCommitTime;
+      try (SparkRDDWriteClient<HoodieMetadataPayload> mdtWriteClient = new 
SparkRDDWriteClient<>(engineContext, mdtWriteConfig)) {
+
+        WriteClientTestUtils.startCommitWithTime(mdtWriteClient, 
mdtCommitTime);
+        JavaRDD<HoodieRecord<HoodieMetadataPayload>> filesRDD = 
jsc.parallelize(filesRecords, 1);
+        JavaRDD<HoodieRecord<HoodieMetadataPayload>> colStatsRDD = 
jsc.parallelize(columnStatsRecords, 1);
+
+        @SuppressWarnings({"unchecked", "rawtypes"})
+        
org.apache.hudi.metadata.HoodieBackedTableMetadataWriter<JavaRDD<HoodieRecord>, 
JavaRDD<WriteStatus>>
+            sparkMetadataWriter
+            = (org.apache.hudi.metadata.HoodieBackedTableMetadataWriter) 
metadataWriter;
+
+        @SuppressWarnings({"rawtypes", "unchecked"})
+        Map<String, org.apache.hudi.common.data.HoodieData<HoodieRecord>> 
partitionRecordsMap = new HashMap<>();
+        partitionRecordsMap.put(
+            HoodieTableMetadataUtil.PARTITION_NAME_FILES,
+            (org.apache.hudi.common.data.HoodieData<HoodieRecord>) 
(org.apache.hudi.common.data.HoodieData) HoodieJavaRDD
+                .of(filesRDD));
+        partitionRecordsMap.put(
+            HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS,
+            (org.apache.hudi.common.data.HoodieData<HoodieRecord>) 
(org.apache.hudi.common.data.HoodieData) HoodieJavaRDD
+                .of(colStatsRDD));
+
+        // Tag records for all partitions together - use isInitializing=true 
if /files partition was just marked as inflight
+        @SuppressWarnings("rawtypes")
+        Pair<HoodieData<HoodieRecord>, List<HoodieFileGroupId>> taggedResult =
+            MetadataWriterTestUtils.tagRecordsWithLocation(
+                sparkMetadataWriter,
+                partitionRecordsMap,
+                !filesPartitionExists // isInitializing = true if we just 
marked /files as inflight
+            );
+
+        // Check metadata table state after tagging
+        metadataMetaClient = HoodieTableMetaClient.reload(metadataMetaClient);
+        LOG.info("AFTER tagging - Metadata table exists: {}, partitions: {}",
+            
metadataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.FILES),
+            metadataMetaClient.getTableConfig().getMetadataPartitions());
+
+        // Convert back to JavaRDD - taggedResult contains all records from 
all partitions
+        @SuppressWarnings("unchecked")
+        JavaRDD<HoodieRecord<HoodieMetadataPayload>> allTaggedRecords =
+            (JavaRDD<HoodieRecord<HoodieMetadataPayload>>) (JavaRDD) 
HoodieJavaRDD
+                .getJavaRDD(taggedResult.getKey());
+
+        allTaggedRecords.take(3).forEach(r ->
+            LOG.info("DEBUG: Record key={}, location={}", r.getRecordKey(), 
r.getCurrentLocation()));
+
+        JavaRDD<WriteStatus> writeStatuses = 
mdtWriteClient.upsertPreppedRecords(allTaggedRecords, mdtCommitTime);
+        List<WriteStatus> statusList = writeStatuses.collect();
+        mdtWriteClient.commit(mdtCommitTime, jsc.parallelize(statusList, 1), 
Option.empty(), HoodieTimeline.DELTA_COMMIT_ACTION, Collections.emptyMap());
+
+        // Mark partition as completed if we initialized it
+        if (!filesPartitionExists) {
+          dataMetaClient.getTableConfig().setMetadataPartitionState(
+              dataMetaClient, MetadataPartitionType.FILES.getPartitionPath(), 
true);
+          LOG.info("Marked /files partition as completed");
+        }
+
+        if (!colStatsPartitionExists) {
+          dataMetaClient.getTableConfig().setMetadataPartitionState(
+              dataMetaClient, 
MetadataPartitionType.COLUMN_STATS.getPartitionPath(), true);
+          LOG.info("Marked /column_stats partition as completed");
+        }
+
+        // Verify final state
+        dataMetaClient = HoodieTableMetaClient.reload(dataMetaClient);
+        LOG.info("AFTER commit - Metadata table exists: {}, partitions: {}",
+            
dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.FILES),
+            dataMetaClient.getTableConfig().getMetadataPartitions());
+
+        LOG.info("Wrote {} files partition records and {} column stats records 
to metadata table in a single commit",
+            filesRecords.size(), columnStatsRecords.size());
+      }
+      return expectedStats;
+    }
+  }
+
+  /**
+   * Initialize Hudi table with sample data to set up the table schema and
+   * metadata table.
+   *
+   * @return the name of the created table
+   */
+  private String initializeTableWithSampleData(String tableName) {
+    // Define a schema with 'id', 'age', 'salary' columns
+    StructType schema = new StructType()
+        .add("id", "string")
+        .add("name", "string")
+        .add("city", "string")
+        .add("age", "int")
+        .add("salary", "double")
+        .add("dt", "string");
+
+    // Generate 50 rows of sample data
+    List<Row> rows = new ArrayList<>();
+    for (int i = 0; i < INITIAL_ROW_COUNT; i++) {
+      rows.add(org.apache.spark.sql.RowFactory.create(
+          UUID.randomUUID().toString(),
+          "user_" + i,
+          "frisco",
+          20 + (i % 30), // age: 20..49
+          50000.0 + (i * 1000), //
+          "2020-01-01"// salary varies
+      ));
+    }
+    Dataset<Row> df = spark.createDataFrame(rows, schema);
+
+    // Write the data to the Hudi table using spark sql
+    df.write()
+        .format("hudi")
+        .option("hoodie.table.name", tableName)
+        .option("hoodie.datasource.write.recordkey.field", "id")
+        .option("hoodie.datasource.write.partitionpath.field", "dt")
+        .option("hoodie.datasource.write.table.name", tableName)
+        .option("hoodie.datasource.write.operation", "insert")
+        .option("hoodie.datasource.write.precombine.field", "id")
+        .option("hoodie.metadata.enabled", "true")
+        .option("path", cfg.tableBasePath)
+        .mode("overwrite")
+        .save();
+
+    // Refresh table metadata in Spark
+    spark.catalog().clearCache();
+    
spark.read().format("hudi").load(cfg.tableBasePath).createOrReplaceTempView(tableName);
+
+    // print total rows in table
+    long totalRows = 
spark.read().format("hudi").load(cfg.tableBasePath).count();
+    LOG.info("Total rows in table: {}", totalRows);
+    // print the table first few rows
+    Dataset<Row> tableDF = spark.read().format("hudi").load(cfg.tableBasePath);
+    tableDF.show(10, false);
+
+    return tableName;
+  }
+
+  /**
+   * Creates empty parquet files on disk for all files in the commit metadata.
+   * This ensures that filesystem listing will find these files even if 
metadata table
+   * lookup falls back to filesystem.
+   *
+   * @param metaClient     The meta client for the data table
+   * @param commitMetadata The commit metadata containing file information
+   */
+  private void createEmptyParquetFiles(HoodieTableMetaClient metaClient,
+                                       HoodieCommitMetadata commitMetadata) 
throws Exception {
+    org.apache.hudi.storage.HoodieStorage storage = metaClient.getStorage();
+    StoragePath basePath = metaClient.getBasePath();
+
+    for (Map.Entry<String, List<HoodieWriteStat>> entry :
+        commitMetadata.getPartitionToWriteStats().entrySet()) {
+      String partitionPath = entry.getKey();
+      StoragePath partitionDir = new StoragePath(basePath, partitionPath);
+      if (!storage.exists(partitionDir)) {
+        storage.createDirectory(partitionDir);
+      }
+      for (HoodieWriteStat stat : entry.getValue()) {
+        String relativePath = stat.getPath();
+        StoragePath filePath = new StoragePath(basePath, relativePath);
+        if (!storage.exists(filePath)) {
+          storage.create(filePath).close();
+        }
+      }
+    }
+  }
+
+  /**
+   * Creates empty parquet files on disk in parallel using engineContext.
+   * This method distributes file creation by partition, where each executor 
task
+   * handles all files within a single partition for better locality and 
efficiency.
+   *
+   * @param metaClient     The meta client for the data table
+   * @param commitMetadata The commit metadata containing file information
+   */
+  private void createEmptyParquetFilesDistributed(HoodieTableMetaClient 
metaClient,
+                                                  HoodieCommitMetadata 
commitMetadata) throws Exception {
+    StoragePath basePath = metaClient.getBasePath();
+    Map<String, List<HoodieWriteStat>> partitionToWriteStats = 
commitMetadata.getPartitionToWriteStats();
+
+    // Calculate total files for logging
+    int totalFiles = partitionToWriteStats.values().stream()
+        .mapToInt(List::size)
+        .sum();
+
+    LOG.info("Creating {} empty parquet files across {} partitions in parallel 
using engineContext",
+        totalFiles, partitionToWriteStats.size());
+
+    // Create a list of partition entries to parallelize over
+    List<Map.Entry<String, List<HoodieWriteStat>>> partitionEntries =
+        new ArrayList<>(partitionToWriteStats.entrySet());
+
+    // Parallelize by partition - each executor task handles all files in one 
partition
+    engineContext.map(partitionEntries, partitionEntry -> {
+      String partitionPath = partitionEntry.getKey();
+      List<HoodieWriteStat> writeStats = partitionEntry.getValue();
+      int filesCreated = 0;
+
+      try {
+        org.apache.hudi.storage.HoodieStorage storageInstance = 
metaClient.getStorage();
+
+        // Create partition directory if it doesn't exist
+        StoragePath partitionDir = new StoragePath(basePath, partitionPath);
+        if (!storageInstance.exists(partitionDir)) {
+          storageInstance.createDirectory(partitionDir);
+        }
+
+        // Create all files in this partition
+        for (HoodieWriteStat stat : writeStats) {
+          String relativePath = stat.getPath();
+          StoragePath filePath = new StoragePath(basePath, relativePath);
+          if (!storageInstance.exists(filePath)) {
+            storageInstance.create(filePath).close();
+            filesCreated++;
+          }
+        }
+
+        LOG.debug("Created {} files in partition: {}", filesCreated, 
partitionPath);
+        return Pair.of(partitionPath, filesCreated);
+      } catch (Exception e) {
+        LOG.error("Failed to create files in partition: {}", partitionPath, e);
+        throw new RuntimeException("Failed to create files in partition: " + 
partitionPath, e);
+      }
+    }, partitionEntries.size()); // Parallelism equals number of partitions
+
+    LOG.info("Successfully created {} empty parquet files across {} 
partitions",
+        totalFiles, partitionToWriteStats.size());
+  }
+
+  /**
+   * Generates column stats records based on commit metadata file structure.
+   * Ensures file names match those in the commit metadata.
+   */
+  @SuppressWarnings("rawtypes")
+  private List<HoodieRecord<HoodieMetadataPayload>> 
generateColumnStatsRecordsForCommitMetadata(
+      HoodieCommitMetadata commitMetadata,
+      Map<String, Map<String, HoodieColumnRangeMetadata<Comparable>>> 
expectedStatsMap,
+      String commitTime) {
+
+    Random random = new Random(42);
+    List<HoodieRecord<HoodieMetadataPayload>> allRecords = new ArrayList<>();
+
+    // Extract file information from commit metadata
+    Map<String, List<HoodieWriteStat>> partitionToWriteStats = 
commitMetadata.getPartitionToWriteStats();
+
+    for (Map.Entry<String, List<HoodieWriteStat>> entry : 
partitionToWriteStats.entrySet()) {
+      String partitionPath = entry.getKey();
+      List<HoodieWriteStat> writeStats = entry.getValue();
+
+      LOG.info("Processing partition {} with {} write stats", partitionPath, 
writeStats.size());
+
+      for (HoodieWriteStat writeStat : writeStats) {
+        String fileId = writeStat.getFileId();
+        String filePath = writeStat.getPath();
+        String fileName = new StoragePath(filePath).getName();
+
+        if (allRecords.size() < 5) {
+          LOG.debug("Processing file: {} (fileId: {}, path: {})", fileName, 
fileId, filePath);
+        }
+
+        List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadata = new 
ArrayList<>();
+
+        // Generate stats for age (int) and salary (long) columns
+        String[] columnNames = getColumnsToIndex();
+        int numColumns = columnNames.length;
+        for (int colIdx = 0; colIdx < numColumns; colIdx++) {
+          String colName = columnNames[colIdx];
+
+          Comparable minValue;
+          Comparable maxValue;
+
+          if (colIdx == 0) {

Review Comment:
   cardinality of tenantId is 25 to 30k. 
   so, lets rename salary -> tenantId. 
   generate random long within 30k values. 
   
   
   
   and from top level config, 
   lets accept 1 or 2 as numColumnsToIndex. 
   if 1 -> tenantId
   if 2 -> tenantId & (either of salary or age) 
   



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMDTStats.java:
##########
@@ -0,0 +1,1010 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteClientTestUtils;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.EngineType;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.ConsistencyGuardConfig;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.testutils.HoodieTestTable;
+import org.apache.hudi.common.testutils.InProcessTimeGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieCleanConfig;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.metadata.HoodieMetadataPayload;
+import org.apache.hudi.metadata.HoodieMetadataWriteUtils;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.metadata.MetadataWriterTestUtils;
+import org.apache.hudi.metadata.SparkMetadataWriterFactory;
+import org.apache.hudi.stats.HoodieColumnRangeMetadata;
+import org.apache.hudi.stats.ValueMetadata;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.testutils.MetadataMergeWriteStatus;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.expressions.Expression;
+import org.apache.spark.sql.execution.datasources.NoopCache$;
+import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.Serializable;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import scala.collection.JavaConverters;
+
+public class HoodieMDTStats implements Closeable {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieMDTStats.class);
+
+  private static final int INITIAL_ROW_COUNT = 50; // Rows to insert in STEP 1
+
+  private final Config cfg;
+  // Properties with source, hoodie client, key generator etc.
+  private TypedProperties props;
+
+  private final SparkSession spark;
+
+  private final JavaSparkContext jsc;
+
+  private final HoodieEngineContext engineContext;
+
+  private static final String AVRO_SCHEMA =
+      "{\n"
+          + "  \"type\": \"record\",\n"
+          + "  \"name\": \"Employee\",\n"
+          + "  \"namespace\": \"com.example.avro\",\n"
+          + "  \"fields\": [\n"
+          + "    { \"name\": \"id\", \"type\": \"string\" },\n"
+          + "    { \"name\": \"name\", \"type\": \"string\" },\n"
+          + "    { \"name\": \"city\", \"type\": \"string\" },\n"
+          + "    { \"name\": \"age\", \"type\": \"int\" },\n"
+          + "    { \"name\": \"salary\", \"type\": \"double\" },\n"
+          + "    { \"name\": \"dt\", \"type\": \"string\" }\n"
+          + "  ]\n"
+          + "}\n";
+
+  public HoodieMDTStats(SparkSession spark, Config cfg) {
+    this.spark = spark;
+    this.jsc = new JavaSparkContext(spark.sparkContext());
+    this.engineContext = new HoodieSparkEngineContext(jsc);
+    this.cfg = cfg;
+    this.props = StringUtils.isNullOrEmpty(cfg.propsFilePath)
+        ? UtilHelpers.buildProperties(cfg.configs)
+        : readConfigFromFileSystem(jsc, cfg);
+  }
+
+  private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, 
Config cfg) {
+    return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new 
Path(cfg.propsFilePath), cfg.configs)
+        .getProps(true);
+  }
+
+  public static class Config implements Serializable {
+    @Parameter(names = {"--table-base-path", "-tbp"}, description = "Number of 
columns to index", required = true)
+    public String tableBasePath = null;
+
+    @Parameter(names = {"--cols-to-index", "-num-cols"}, description = "Number 
of columns to index", required = true)
+    public String colsToIndex = "age,salary";
+
+    @Parameter(names = {"--col-stats-file-group-count", "-col-fg-count"}, 
description = "Target Base path for the table", required = true)
+    public Integer colStatsFileGroupCount = 10;
+
+    @Parameter(names = {"--num-files", "-nf"}, description = "Target Base path 
for the table", required = true)
+    public Integer numFiles = 1000;
+
+    @Parameter(names = {"--num-partitions", "-np"}, description = "Target Base 
path for the table", required = true)
+    public Integer numPartitions = 1;
+
+    @Parameter(names = {"--files-per-commit", "-fpc"}, description = "Number 
of files to create per commit. If not specified or >= num-files, all files will 
be in one commit", required = false)
+    public Integer filesPerCommit = 1000;
+
+    @Parameter(names = {"--hoodie-conf"}, description = "Any configuration 
that can be set in the properties file "
+        + "(using the CLI parameter \"--props\") can also be passed command 
line using this parameter. This can be repeated",
+        splitter = IdentitySplitter.class)
+    public List<String> configs = new ArrayList<>();
+
+    @Parameter(names = {"--props"}, description = "path to properties file on 
localfs or dfs, with configurations for "
+        + "hoodie client for clustering")
+    public String propsFilePath = null;
+
+    @Parameter(names = {"--help", "-h"}, help = true)
+    public Boolean help = false;
+
+    @Override
+    public String toString() {
+      return "TableSizeStats {\n"
+          + "   --col-to-index " + colsToIndex + ", \n"
+          + "   --col-stats-file-group-count " + colStatsFileGroupCount + ", 
\n"
+          + "\n}";
+    }
+  }
+
+  public static void main(String[] args) {
+    final Config cfg = new Config();
+    JCommander cmd = new JCommander(cfg, null, args);
+
+    if (cfg.help || args.length == 0) {
+      cmd.usage();
+      System.exit(1);
+    }
+
+    final LocalDateTime now = LocalDateTime.now();
+    final String currentHour = 
now.format(DateTimeFormatter.ofPattern("yyyy-MM-dd-HH"));
+    String jobName = "metadata-table-stats-analyzer";
+    String sparkAppName = jobName + "-" + currentHour;
+    SparkSession spark = SparkSession.builder()
+        .appName(sparkAppName)
+        .config("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer")
+        .getOrCreate();
+
+
+    try (HoodieMDTStats hoodieMDTStats = new HoodieMDTStats(spark, cfg)) {
+      hoodieMDTStats.run();
+    } catch (Throwable throwable) {
+      LOG.error("Failed to get table size stats for " + cfg, throwable);
+    } finally {
+      spark.stop();
+    }
+  }
+
+  public void run() throws Exception {
+    int numFiles = cfg.numFiles;
+    int numPartitions = cfg.numPartitions;
+
+    LOG.info("Starting MDT stats test with {} files, {} partitions, {} 
columns, {} file groups",
+        numFiles, numPartitions, cfg.colsToIndex, cfg.colStatsFileGroupCount);
+    LOG.info("Data table base path: {}", cfg.tableBasePath);
+    String metadataTableBasePath = 
HoodieTableMetadata.getMetadataTableBasePath(cfg.tableBasePath);
+    LOG.info("Metadata table base path: {}", metadataTableBasePath);
+
+    String tableName = "test_mdt_stats_tbl";
+    initializeTableWithSampleData(tableName);
+    // Create data table config with metadata enabled
+    HoodieWriteConfig dataWriteConfig = getWriteConfig(tableName, AVRO_SCHEMA, 
cfg.tableBasePath, HoodieFailedWritesCleaningPolicy.EAGER);
+    HoodieMetadataConfig metadataConfig = dataWriteConfig.getMetadataConfig();
+    HoodieWriteConfig dataConfig = HoodieWriteConfig.newBuilder()
+        .withProperties(dataWriteConfig.getProps())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            .fromProperties(metadataConfig.getProps())
+            .enable(true)
+            .withMetadataIndexColumnStats(true)
+            
.withMetadataIndexColumnStatsFileGroupCount(cfg.colStatsFileGroupCount)
+            .build())
+        .build();
+
+    HoodieTableMetaClient dataMetaClient = HoodieTableMetaClient.builder()
+        .setBasePath(dataConfig.getBasePath())
+        .setConf(engineContext.getStorageConf().newInstance())
+        .build();
+
+    // STEP 1: Insert 50 rows with age and salary columns to initialize table 
schema
+    // and metadata table
+    dataMetaClient = HoodieTableMetaClient.reload(dataMetaClient);
+
+    Integer filesPerCommit = cfg.filesPerCommit;
+
+    int effectiveFilesPerCommit = filesPerCommit >= numFiles ? numFiles
+        : filesPerCommit;
+
+    // Calculate number of commits needed
+    int numCommits = (int) Math.ceil((double) numFiles / 
effectiveFilesPerCommit);
+
+    LOG.info("Creating {} commits with {} files per commit", numCommits, 
effectiveFilesPerCommit);
+
+    List<String> partitions = generatePartitions(cfg.numPartitions);
+    HoodieTestTable testTable = HoodieTestTable.of(dataMetaClient);
+
+    HoodieWriteConfig mdtConfig = 
HoodieMetadataWriteUtils.createMetadataWriteConfig(
+        dataConfig,
+        HoodieFailedWritesCleaningPolicy.EAGER,
+        HoodieTableVersion.NINE);
+
+    // Track all expected stats across all commits for final verification
+    @SuppressWarnings("rawtypes")
+    Map<String, Map<String, HoodieColumnRangeMetadata<Comparable>>> 
allExpectedStats = new HashMap<>();
+
+    int remainingFiles = numFiles;
+    int totalFilesCreated = 0;
+
+    // STEP 2 & 3: Create multiple commits with files and metadata
+    for (int commitIdx = 0; commitIdx < numCommits; commitIdx++) {
+      LOG.info("=== Processing commit {}/{} ===", commitIdx + 1, numCommits);
+
+      // Calculate files for this commit
+      int filesInThisCommit = Math.min(effectiveFilesPerCommit, 
remainingFiles);
+      int filesPerPartitionThisCommit = filesInThisCommit / numPartitions;
+
+      LOG.info("Creating {} files in this commit ({} per partition)",
+          filesInThisCommit, filesPerPartitionThisCommit);
+
+      // Generate unique commit time
+      String dataCommitTime = InProcessTimeGenerator.createNewInstantTime();
+
+      // Create commit metadata
+      HoodieCommitMetadata commitMetadata = testTable.createCommitMetadata(
+          dataCommitTime,
+          WriteOperationType.INSERT,
+          partitions,
+          filesPerPartitionThisCommit,
+          false); // bootstrap
+
+      // Add commit to timeline
+      testTable.addCommit(dataCommitTime, Option.of(commitMetadata));
+      LOG.info("Created commit metadata at instant {}", dataCommitTime);
+
+      // Create actual empty parquet files on disk
+      createEmptyParquetFilesDistributed(dataMetaClient, commitMetadata);
+      totalFilesCreated += filesInThisCommit;
+      LOG.info("Created {} empty parquet files on disk (total so far: {})",
+          filesInThisCommit, totalFilesCreated);
+
+      dataMetaClient = HoodieTableMetaClient.reload(dataMetaClient);
+
+      // Write both /files and /column_stats partitions to metadata table
+      @SuppressWarnings("rawtypes")
+      Map<String, Map<String, HoodieColumnRangeMetadata<Comparable>>> 
commitExpectedStats =
+          writeFilesAndColumnStatsToMetadataTable(dataConfig, dataMetaClient, 
commitMetadata, dataCommitTime, mdtConfig);
+
+      // Accumulate expected stats from this commit
+      allExpectedStats.putAll(commitExpectedStats);
+
+      remainingFiles -= filesInThisCommit;
+      LOG.info("Commit {}/{} completed. Remaining files: {}", commitIdx + 1, 
numCommits, remainingFiles);
+    }
+
+    LOG.info("=== All {} commits completed. Total files created: {} ===", 
numCommits, totalFilesCreated);
+
+    // STEP 4: Verification
+    LOG.info("Total unique files with stats: {}", allExpectedStats.size());
+
+    // STEP 5: Use HoodieFileIndex.filterFileSlices to query and verify
+    queryAndVerifyColumnStats(dataConfig, dataMetaClient, allExpectedStats, 
totalFilesCreated);
+  }
+
+  /**
+   * Generates a list of date-based partition paths incrementing by day.
+   * Starting from 2020-01-01, creates partitions for consecutive days based 
on numPartitions.
+   * <p>
+   * Example:
+   * numPartitions = 1  -> ["2020-01-01"]
+   * numPartitions = 3  -> ["2020-01-01", "2020-01-02", "2020-01-03"]
+   * numPartitions = 10 -> ["2020-01-01", "2020-01-02", ..., "2020-01-10"]
+   *
+   * @param numPartitions Number of partitions to generate
+   * @return List of partition paths in yyyy-MM-dd format
+   */
+  private List<String> generatePartitions(int numPartitions) {
+    if (numPartitions <= 0) {
+      throw new IllegalArgumentException("numPartitions must be greater than 
0, got: " + numPartitions);
+    }
+
+    List<String> partitions = new ArrayList<>();
+    LocalDate startDate = LocalDate.of(2020, 1, 1);
+    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
+
+    for (int i = 0; i < numPartitions; i++) {
+      LocalDate partitionDate = startDate.plusDays(i);
+      String partitionPath = partitionDate.format(formatter);
+      partitions.add(partitionPath);
+    }
+
+    LOG.info("Generated {} partitions from {} to {}",
+        numPartitions,
+        partitions.get(0),
+        partitions.get(partitions.size() - 1));
+
+    return partitions;
+  }
+
+  /**
+   * Print column stats for verification - shows min/max values for up to 10 
files per partition.
+   * This helps verify that column stats were constructed properly before 
querying.
+   *
+   * @param commitMetadata The commit metadata containing partition and file 
information
+   * @param expectedStats  The expected column stats map (file name -> column 
name -> stats)
+   */
+  @SuppressWarnings("rawtypes")
+  private void printColumnStatsForVerification(
+      HoodieCommitMetadata commitMetadata,
+      Map<String, Map<String, HoodieColumnRangeMetadata<Comparable>>> 
expectedStats) {
+
+    LOG.info("=== STEP 4: Verifying column stats construction (max 10 files 
per partition) ===");
+
+    Map<String, List<HoodieWriteStat>> partitionToWriteStats = 
commitMetadata.getPartitionToWriteStats();
+
+    for (Map.Entry<String, List<HoodieWriteStat>> entry : 
partitionToWriteStats.entrySet()) {
+      String partitionPath = entry.getKey();
+      List<HoodieWriteStat> writeStats = entry.getValue();
+
+      LOG.info("Partition: {} ({} files total)", partitionPath, 
writeStats.size());
+      LOG.info(String.format("%-50s %-15s %-15s %-15s %-15s",
+          "FileName", "age_min", "age_max", "salary_min", "salary_max"));
+      LOG.info(String.join("", Collections.nCopies(110, "-")));
+
+      int filesDisplayed = 0;
+      for (HoodieWriteStat writeStat : writeStats) {
+        if (filesDisplayed >= 10) {
+          LOG.info("... and {} more files", writeStats.size() - 10);
+          break;
+        }
+
+        String filePath = writeStat.getPath();
+        String fileName = new StoragePath(filePath).getName();
+
+        Map<String, HoodieColumnRangeMetadata<Comparable>> fileStats = 
expectedStats.get(fileName);
+        if (fileStats != null) {
+          HoodieColumnRangeMetadata<Comparable> ageStats = 
fileStats.get("age");
+          HoodieColumnRangeMetadata<Comparable> salaryStats = 
fileStats.get("salary");
+
+          String ageMin = (ageStats != null) ? 
String.valueOf(ageStats.getMinValue()) : "N/A";
+          String ageMax = (ageStats != null) ? 
String.valueOf(ageStats.getMaxValue()) : "N/A";
+          String salaryMin = (salaryStats != null) ? 
String.valueOf(salaryStats.getMinValue()) : "N/A";
+          String salaryMax = (salaryStats != null) ? 
String.valueOf(salaryStats.getMaxValue()) : "N/A";
+
+          LOG.info(String.format("%-50s %-15s %-15s %-15s %-15s",
+              fileName.length() > 48 ? fileName.substring(0, 48) + ".." : 
fileName,
+              ageMin, ageMax, salaryMin, salaryMax));
+        } else {
+          LOG.info(String.format("%-50s %-15s", fileName, "NO STATS FOUND"));
+        }
+
+        filesDisplayed++;
+      }
+    }
+
+    LOG.info("");
+    LOG.info("Total files with stats: {}", expectedStats.size());
+  }
+
+  /**
+   * Query the column stats index using HoodieFileIndex.filterFileSlices and 
verify results.
+   *
+   * @param dataConfig     The write config for the data table
+   * @param dataMetaClient The meta client for the data table
+   * @param expectedStats  The expected column stats for verification
+   * @param numFiles       The total number of files in the commit
+   */
+  @SuppressWarnings("rawtypes")
+  private void queryAndVerifyColumnStats(
+      HoodieWriteConfig dataConfig,
+      HoodieTableMetaClient dataMetaClient,
+      Map<String, Map<String, HoodieColumnRangeMetadata<Comparable>>> 
expectedStats,
+      int numFiles) throws Exception {
+
+    LOG.info("=== STEP 5: Querying column stats index using HoodieFileIndex 
===");
+    dataMetaClient = HoodieTableMetaClient.reload(dataMetaClient);
+
+    // Create HoodieFileIndex
+    Map<String, String> options = new HashMap<>();
+    options.put("path", dataConfig.getBasePath());
+    options.put("hoodie.datasource.read.data.skipping.enable", "true");
+    options.put("hoodie.metadata.enable", "true");
+    options.put("hoodie.metadata.index.column.stats.enable", "true");
+    // Also ensure the columns are specified for column stats
+    options.put("hoodie.metadata.index.column.stats.column.list", 
"age,salary");
+    
spark.sqlContext().conf().setConfString("hoodie.fileIndex.dataSkippingFailureMode",
 "strict");
+
+    // Create schema with the columns used for data skipping
+    StructType dataSchema = new StructType()
+        .add("id", "string")
+        .add("name", "string")
+        .add("city", "string")
+        .add("age", "int")
+        .add("salary", "long");
+    scala.Option<StructType> schemaOption = scala.Option.apply(dataSchema);
+
+    @SuppressWarnings("deprecation")
+    scala.collection.immutable.Map<String, String> scalaOptions = 
JavaConverters.mapAsScalaMap(options)
+        .toMap(scala.Predef$.MODULE$.<scala.Tuple2<String, String>>conforms());
+
+    org.apache.hudi.HoodieFileIndex fileIndex = new 
org.apache.hudi.HoodieFileIndex(
+        spark,
+        dataMetaClient,
+        schemaOption,
+        scalaOptions,
+        NoopCache$.MODULE$,
+        false,
+        false);
+
+    // Create data filters for age and salary columns
+    // Unresolved expressions cause translateIntoColumnStatsIndexFilterExpr to 
return TrueLiteral (no filtering).
+    List<Expression> dataFilters = new ArrayList<>();
+    String filterString = "age > 90";
+    Expression filter1 = 
org.apache.spark.sql.HoodieCatalystExpressionUtils$.MODULE$
+        .resolveExpr(spark, filterString, dataSchema);
+    LOG.info("DEBUG: Resolved filter expression: {}", filter1);
+    LOG.info("DEBUG: Resolved filter resolved: {}", filter1.resolved());
+    LOG.info("DEBUG: Resolved filter tree:\n{}", filter1.treeString());
+
+    dataFilters.add(filter1);
+    // Expression filter2 = 
org.apache.spark.sql.HoodieCatalystExpressionUtils.resolveExpr(
+    //     sparkSession, "salary > 100000", dataSchema);
+    // dataFilters.add(filter2);
+
+    List<Expression> partitionFilters = new ArrayList<>(); // Empty partition 
filters
+
+    // Convert to Scala Seq
+    scala.collection.immutable.List<Expression> dataFiltersList = 
JavaConverters.asScalaBuffer(dataFilters)
+        .toList();
+    scala.collection.Seq<Expression> dataFiltersSeq = dataFiltersList;
+    scala.collection.immutable.List<Expression> partitionFiltersList = 
JavaConverters
+        .asScalaBuffer(partitionFilters).toList();
+    scala.collection.Seq<Expression> partitionFiltersSeq = 
partitionFiltersList;
+
+    // Call filterFileSlices
+    
scala.collection.Seq<scala.Tuple2<scala.Option<org.apache.hudi.BaseHoodieTableFileIndex.PartitionPath>,
+        scala.collection.Seq<FileSlice>>> filteredSlices = fileIndex
+        .filterFileSlices(
+            dataFiltersSeq,
+            partitionFiltersSeq,
+            false);
+
+    // Print results
+    LOG.info("");
+    LOG.info("Filtered File Slices Min/Max Values:");
+    LOG.info(String.format("%-30s %-20s %-20s %-20s %-20s",
+        "FileName", "age_min", "age_max", "salary_min", "salary_max"));
+    LOG.info(String.join("", Collections.nCopies(100, "-")));
+
+    int totalFileSlices = 0;
+    for (int j = 0; j < filteredSlices.size(); j++) {
+      
scala.Tuple2<scala.Option<org.apache.hudi.BaseHoodieTableFileIndex.PartitionPath>,
+          scala.collection.Seq<FileSlice>> tuple = filteredSlices.apply(j);
+      scala.collection.Seq<FileSlice> fileSliceSeq = tuple._2();
+      totalFileSlices += fileSliceSeq.size();
+
+      for (int k = 0; k < fileSliceSeq.size(); k++) {
+        FileSlice fileSlice = fileSliceSeq.apply(k);
+        String fileName = fileSlice.getBaseFile().get().getFileName();
+
+        Map<String, HoodieColumnRangeMetadata<Comparable>> fileExpectedStats = 
expectedStats.get(fileName);
+        if (fileExpectedStats != null) {
+          HoodieColumnRangeMetadata<Comparable> ageStats = 
fileExpectedStats.get("age");
+          HoodieColumnRangeMetadata<Comparable> salaryStats = 
fileExpectedStats.get("salary");
+
+          Object ageMin = (ageStats != null) ? ageStats.getMinValue() : "null";
+          Object ageMax = (ageStats != null) ? ageStats.getMaxValue() : "null";
+          Object salaryMin = (salaryStats != null) ? salaryStats.getMinValue() 
: "null";
+          Object salaryMax = (salaryStats != null) ? salaryStats.getMaxValue() 
: "null";
+
+          LOG.info(String.format("%-30s %-20s %-20s %-20s %-20s",
+              fileName, ageMin.toString(), ageMax.toString(), 
salaryMin.toString(),
+              salaryMax.toString()));
+        }
+      }
+    }
+
+    LOG.info(String.join("", Collections.nCopies(100, "-")));
+    LOG.info("Total file slices returned: {}", totalFileSlices);
+    LOG.info("Total files in commit: {}", numFiles);
+
+    if (numFiles > 0) {
+      double skippingRatio = ((double) (numFiles - totalFileSlices) / 
numFiles) * 100.0;
+      LOG.info(String.format("Data skipping ratio: %.2f%%", skippingRatio));
+    }
+  }
+
+  /**
+   * Write both /files and /column_stats partitions to metadata table in a 
single commit.
+   * This method handles initialization of partitions if needed, tags records 
with location,
+   * and writes them together to simulate how actual code writes metadata.
+   *
+   * @param dataConfig     The write config for the data table
+   * @param dataMetaClient The meta client for the data table
+   * @param commitMetadata The commit metadata containing file information
+   * @param dataCommitTime The commit time for the data table commit
+   * @param mdtWriteConfig The write config for the metadata table
+   * @return Map of file names to their column stats metadata for verification
+   * @throws Exception if there's an error writing to the metadata table
+   */
+  @SuppressWarnings("rawtypes")
+  private Map<String, Map<String, HoodieColumnRangeMetadata<Comparable>>> 
writeFilesAndColumnStatsToMetadataTable(
+      HoodieWriteConfig dataConfig,
+      HoodieTableMetaClient dataMetaClient,
+      HoodieCommitMetadata commitMetadata,
+      String dataCommitTime,
+      HoodieWriteConfig mdtWriteConfig) throws Exception {
+
+    try (HoodieTableMetadataWriter<?, ?> metadataWriter = 
SparkMetadataWriterFactory.create(
+        engineContext.getStorageConf(),
+        dataConfig,
+        engineContext,
+        Option.empty(),
+        dataMetaClient.getTableConfig())) {
+
+      // STEP 3a: Check if /files partition exists and initialize if needed
+      String metadataBasePath = 
HoodieTableMetadata.getMetadataTableBasePath(cfg.tableBasePath);
+      HoodieTableMetaClient metadataMetaClient = 
HoodieTableMetaClient.builder()
+          .setBasePath(metadataBasePath)
+          .setConf(engineContext.getStorageConf().newInstance())
+          .build();
+
+      boolean filesPartitionExists = dataMetaClient.getTableConfig()
+          .isMetadataPartitionAvailable(MetadataPartitionType.FILES);
+
+      LOG.info("BEFORE initialization - Metadata table exists: {}, partitions: 
{}",
+          filesPartitionExists,
+          metadataMetaClient.getTableConfig().getMetadataPartitions());
+
+      if (!filesPartitionExists) {

Review Comment:
   ok. if this helps w/initializing FILES directly w/ first commit from 
benchmarking tool, we can leave it as is.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMDTStats.java:
##########
@@ -0,0 +1,1010 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteClientTestUtils;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.EngineType;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.ConsistencyGuardConfig;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.testutils.HoodieTestTable;
+import org.apache.hudi.common.testutils.InProcessTimeGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieCleanConfig;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.metadata.HoodieMetadataPayload;
+import org.apache.hudi.metadata.HoodieMetadataWriteUtils;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.metadata.MetadataWriterTestUtils;
+import org.apache.hudi.metadata.SparkMetadataWriterFactory;
+import org.apache.hudi.stats.HoodieColumnRangeMetadata;
+import org.apache.hudi.stats.ValueMetadata;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.testutils.MetadataMergeWriteStatus;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.expressions.Expression;
+import org.apache.spark.sql.execution.datasources.NoopCache$;
+import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.Serializable;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import scala.collection.JavaConverters;
+
+public class HoodieMDTStats implements Closeable {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieMDTStats.class);
+
+  private static final int INITIAL_ROW_COUNT = 50; // Rows to insert in STEP 1
+
+  private final Config cfg;
+  // Properties with source, hoodie client, key generator etc.
+  private TypedProperties props;
+
+  private final SparkSession spark;
+
+  private final JavaSparkContext jsc;
+
+  private final HoodieEngineContext engineContext;
+
+  private static final String AVRO_SCHEMA =
+      "{\n"
+          + "  \"type\": \"record\",\n"
+          + "  \"name\": \"Employee\",\n"
+          + "  \"namespace\": \"com.example.avro\",\n"
+          + "  \"fields\": [\n"
+          + "    { \"name\": \"id\", \"type\": \"string\" },\n"
+          + "    { \"name\": \"name\", \"type\": \"string\" },\n"
+          + "    { \"name\": \"city\", \"type\": \"string\" },\n"
+          + "    { \"name\": \"age\", \"type\": \"int\" },\n"
+          + "    { \"name\": \"salary\", \"type\": \"double\" },\n"
+          + "    { \"name\": \"dt\", \"type\": \"string\" }\n"
+          + "  ]\n"
+          + "}\n";
+
+  public HoodieMDTStats(SparkSession spark, Config cfg) {
+    this.spark = spark;
+    this.jsc = new JavaSparkContext(spark.sparkContext());
+    this.engineContext = new HoodieSparkEngineContext(jsc);
+    this.cfg = cfg;
+    this.props = StringUtils.isNullOrEmpty(cfg.propsFilePath)
+        ? UtilHelpers.buildProperties(cfg.configs)
+        : readConfigFromFileSystem(jsc, cfg);
+  }
+
+  private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, 
Config cfg) {
+    return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new 
Path(cfg.propsFilePath), cfg.configs)
+        .getProps(true);
+  }
+
+  public static class Config implements Serializable {
+    @Parameter(names = {"--table-base-path", "-tbp"}, description = "Number of 
columns to index", required = true)
+    public String tableBasePath = null;
+
+    @Parameter(names = {"--cols-to-index", "-num-cols"}, description = "Number 
of columns to index", required = true)
+    public String colsToIndex = "age,salary";
+
+    @Parameter(names = {"--col-stats-file-group-count", "-col-fg-count"}, 
description = "Target Base path for the table", required = true)
+    public Integer colStatsFileGroupCount = 10;
+
+    @Parameter(names = {"--num-files", "-nf"}, description = "Target Base path 
for the table", required = true)
+    public Integer numFiles = 1000;
+
+    @Parameter(names = {"--num-partitions", "-np"}, description = "Target Base 
path for the table", required = true)
+    public Integer numPartitions = 1;
+
+    @Parameter(names = {"--files-per-commit", "-fpc"}, description = "Number 
of files to create per commit. If not specified or >= num-files, all files will 
be in one commit", required = false)

Review Comment:
   lets keep this as P1. 



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMDTStats.java:
##########
@@ -0,0 +1,1010 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteClientTestUtils;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.EngineType;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.ConsistencyGuardConfig;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.testutils.HoodieTestTable;
+import org.apache.hudi.common.testutils.InProcessTimeGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieCleanConfig;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.metadata.HoodieMetadataPayload;
+import org.apache.hudi.metadata.HoodieMetadataWriteUtils;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.metadata.MetadataWriterTestUtils;
+import org.apache.hudi.metadata.SparkMetadataWriterFactory;
+import org.apache.hudi.stats.HoodieColumnRangeMetadata;
+import org.apache.hudi.stats.ValueMetadata;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.testutils.MetadataMergeWriteStatus;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.expressions.Expression;
+import org.apache.spark.sql.execution.datasources.NoopCache$;
+import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.Serializable;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import scala.collection.JavaConverters;
+
+public class HoodieMDTStats implements Closeable {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieMDTStats.class);
+
+  private static final int INITIAL_ROW_COUNT = 50; // Rows to insert in STEP 1
+
+  private final Config cfg;
+  // Properties with source, hoodie client, key generator etc.
+  private TypedProperties props;
+
+  private final SparkSession spark;
+
+  private final JavaSparkContext jsc;
+
+  private final HoodieEngineContext engineContext;
+
+  private static final String AVRO_SCHEMA =
+      "{\n"
+          + "  \"type\": \"record\",\n"
+          + "  \"name\": \"Employee\",\n"
+          + "  \"namespace\": \"com.example.avro\",\n"
+          + "  \"fields\": [\n"
+          + "    { \"name\": \"id\", \"type\": \"string\" },\n"
+          + "    { \"name\": \"name\", \"type\": \"string\" },\n"
+          + "    { \"name\": \"city\", \"type\": \"string\" },\n"
+          + "    { \"name\": \"age\", \"type\": \"int\" },\n"
+          + "    { \"name\": \"salary\", \"type\": \"double\" },\n"
+          + "    { \"name\": \"dt\", \"type\": \"string\" }\n"
+          + "  ]\n"
+          + "}\n";
+
+  public HoodieMDTStats(SparkSession spark, Config cfg) {
+    this.spark = spark;
+    this.jsc = new JavaSparkContext(spark.sparkContext());
+    this.engineContext = new HoodieSparkEngineContext(jsc);
+    this.cfg = cfg;
+    this.props = StringUtils.isNullOrEmpty(cfg.propsFilePath)
+        ? UtilHelpers.buildProperties(cfg.configs)
+        : readConfigFromFileSystem(jsc, cfg);
+  }
+
+  private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, 
Config cfg) {
+    return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new 
Path(cfg.propsFilePath), cfg.configs)
+        .getProps(true);
+  }
+
+  public static class Config implements Serializable {
+    @Parameter(names = {"--table-base-path", "-tbp"}, description = "Number of 
columns to index", required = true)
+    public String tableBasePath = null;
+
+    @Parameter(names = {"--cols-to-index", "-num-cols"}, description = "Number 
of columns to index", required = true)
+    public String colsToIndex = "age,salary";
+
+    @Parameter(names = {"--col-stats-file-group-count", "-col-fg-count"}, 
description = "Target Base path for the table", required = true)
+    public Integer colStatsFileGroupCount = 10;
+
+    @Parameter(names = {"--num-files", "-nf"}, description = "Target Base path 
for the table", required = true)
+    public Integer numFiles = 1000;
+
+    @Parameter(names = {"--num-partitions", "-np"}, description = "Target Base 
path for the table", required = true)
+    public Integer numPartitions = 1;
+
+    @Parameter(names = {"--files-per-commit", "-fpc"}, description = "Number 
of files to create per commit. If not specified or >= num-files, all files will 
be in one commit", required = false)
+    public Integer filesPerCommit = 1000;
+
+    @Parameter(names = {"--hoodie-conf"}, description = "Any configuration 
that can be set in the properties file "
+        + "(using the CLI parameter \"--props\") can also be passed command 
line using this parameter. This can be repeated",
+        splitter = IdentitySplitter.class)
+    public List<String> configs = new ArrayList<>();
+
+    @Parameter(names = {"--props"}, description = "path to properties file on 
localfs or dfs, with configurations for "
+        + "hoodie client for clustering")
+    public String propsFilePath = null;
+
+    @Parameter(names = {"--help", "-h"}, help = true)
+    public Boolean help = false;
+
+    @Override
+    public String toString() {
+      return "TableSizeStats {\n"
+          + "   --col-to-index " + colsToIndex + ", \n"
+          + "   --col-stats-file-group-count " + colStatsFileGroupCount + ", 
\n"
+          + "\n}";
+    }
+  }
+
+  public static void main(String[] args) {
+    final Config cfg = new Config();
+    JCommander cmd = new JCommander(cfg, null, args);
+
+    if (cfg.help || args.length == 0) {
+      cmd.usage();
+      System.exit(1);
+    }
+
+    final LocalDateTime now = LocalDateTime.now();
+    final String currentHour = 
now.format(DateTimeFormatter.ofPattern("yyyy-MM-dd-HH"));
+    String jobName = "metadata-table-stats-analyzer";
+    String sparkAppName = jobName + "-" + currentHour;
+    SparkSession spark = SparkSession.builder()
+        .appName(sparkAppName)
+        .config("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer")
+        .getOrCreate();
+
+
+    try (HoodieMDTStats hoodieMDTStats = new HoodieMDTStats(spark, cfg)) {
+      hoodieMDTStats.run();
+    } catch (Throwable throwable) {
+      LOG.error("Failed to get table size stats for " + cfg, throwable);
+    } finally {
+      spark.stop();
+    }
+  }
+
+  public void run() throws Exception {
+    int numFiles = cfg.numFiles;
+    int numPartitions = cfg.numPartitions;
+
+    LOG.info("Starting MDT stats test with {} files, {} partitions, {} 
columns, {} file groups",
+        numFiles, numPartitions, cfg.colsToIndex, cfg.colStatsFileGroupCount);
+    LOG.info("Data table base path: {}", cfg.tableBasePath);
+    String metadataTableBasePath = 
HoodieTableMetadata.getMetadataTableBasePath(cfg.tableBasePath);
+    LOG.info("Metadata table base path: {}", metadataTableBasePath);
+
+    String tableName = "test_mdt_stats_tbl";
+    initializeTableWithSampleData(tableName);
+    // Create data table config with metadata enabled
+    HoodieWriteConfig dataWriteConfig = getWriteConfig(tableName, AVRO_SCHEMA, 
cfg.tableBasePath, HoodieFailedWritesCleaningPolicy.EAGER);
+    HoodieMetadataConfig metadataConfig = dataWriteConfig.getMetadataConfig();
+    HoodieWriteConfig dataConfig = HoodieWriteConfig.newBuilder()
+        .withProperties(dataWriteConfig.getProps())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            .fromProperties(metadataConfig.getProps())
+            .enable(true)
+            .withMetadataIndexColumnStats(true)
+            
.withMetadataIndexColumnStatsFileGroupCount(cfg.colStatsFileGroupCount)
+            .build())
+        .build();
+
+    HoodieTableMetaClient dataMetaClient = HoodieTableMetaClient.builder()
+        .setBasePath(dataConfig.getBasePath())
+        .setConf(engineContext.getStorageConf().newInstance())
+        .build();
+
+    // STEP 1: Insert 50 rows with age and salary columns to initialize table 
schema
+    // and metadata table
+    dataMetaClient = HoodieTableMetaClient.reload(dataMetaClient);
+
+    Integer filesPerCommit = cfg.filesPerCommit;
+
+    int effectiveFilesPerCommit = filesPerCommit >= numFiles ? numFiles
+        : filesPerCommit;
+
+    // Calculate number of commits needed
+    int numCommits = (int) Math.ceil((double) numFiles / 
effectiveFilesPerCommit);
+
+    LOG.info("Creating {} commits with {} files per commit", numCommits, 
effectiveFilesPerCommit);
+
+    List<String> partitions = generatePartitions(cfg.numPartitions);
+    HoodieTestTable testTable = HoodieTestTable.of(dataMetaClient);
+
+    HoodieWriteConfig mdtConfig = 
HoodieMetadataWriteUtils.createMetadataWriteConfig(
+        dataConfig,
+        HoodieFailedWritesCleaningPolicy.EAGER,
+        HoodieTableVersion.NINE);
+
+    // Track all expected stats across all commits for final verification
+    @SuppressWarnings("rawtypes")
+    Map<String, Map<String, HoodieColumnRangeMetadata<Comparable>>> 
allExpectedStats = new HashMap<>();
+
+    int remainingFiles = numFiles;
+    int totalFilesCreated = 0;
+
+    // STEP 2 & 3: Create multiple commits with files and metadata
+    for (int commitIdx = 0; commitIdx < numCommits; commitIdx++) {
+      LOG.info("=== Processing commit {}/{} ===", commitIdx + 1, numCommits);
+
+      // Calculate files for this commit
+      int filesInThisCommit = Math.min(effectiveFilesPerCommit, 
remainingFiles);
+      int filesPerPartitionThisCommit = filesInThisCommit / numPartitions;
+
+      LOG.info("Creating {} files in this commit ({} per partition)",
+          filesInThisCommit, filesPerPartitionThisCommit);
+
+      // Generate unique commit time
+      String dataCommitTime = InProcessTimeGenerator.createNewInstantTime();
+
+      // Create commit metadata
+      HoodieCommitMetadata commitMetadata = testTable.createCommitMetadata(
+          dataCommitTime,
+          WriteOperationType.INSERT,
+          partitions,
+          filesPerPartitionThisCommit,
+          false); // bootstrap
+
+      // Add commit to timeline
+      testTable.addCommit(dataCommitTime, Option.of(commitMetadata));
+      LOG.info("Created commit metadata at instant {}", dataCommitTime);
+
+      // Create actual empty parquet files on disk
+      createEmptyParquetFilesDistributed(dataMetaClient, commitMetadata);
+      totalFilesCreated += filesInThisCommit;
+      LOG.info("Created {} empty parquet files on disk (total so far: {})",
+          filesInThisCommit, totalFilesCreated);
+
+      dataMetaClient = HoodieTableMetaClient.reload(dataMetaClient);
+
+      // Write both /files and /column_stats partitions to metadata table
+      @SuppressWarnings("rawtypes")
+      Map<String, Map<String, HoodieColumnRangeMetadata<Comparable>>> 
commitExpectedStats =
+          writeFilesAndColumnStatsToMetadataTable(dataConfig, dataMetaClient, 
commitMetadata, dataCommitTime, mdtConfig);
+
+      // Accumulate expected stats from this commit
+      allExpectedStats.putAll(commitExpectedStats);
+
+      remainingFiles -= filesInThisCommit;
+      LOG.info("Commit {}/{} completed. Remaining files: {}", commitIdx + 1, 
numCommits, remainingFiles);
+    }
+
+    LOG.info("=== All {} commits completed. Total files created: {} ===", 
numCommits, totalFilesCreated);
+
+    // STEP 4: Verification
+    LOG.info("Total unique files with stats: {}", allExpectedStats.size());
+
+    // STEP 5: Use HoodieFileIndex.filterFileSlices to query and verify
+    queryAndVerifyColumnStats(dataConfig, dataMetaClient, allExpectedStats, 
totalFilesCreated);
+  }
+
+  /**
+   * Generates a list of date-based partition paths incrementing by day.
+   * Starting from 2020-01-01, creates partitions for consecutive days based 
on numPartitions.
+   * <p>
+   * Example:
+   * numPartitions = 1  -> ["2020-01-01"]
+   * numPartitions = 3  -> ["2020-01-01", "2020-01-02", "2020-01-03"]
+   * numPartitions = 10 -> ["2020-01-01", "2020-01-02", ..., "2020-01-10"]
+   *
+   * @param numPartitions Number of partitions to generate
+   * @return List of partition paths in yyyy-MM-dd format
+   */
+  private List<String> generatePartitions(int numPartitions) {
+    if (numPartitions <= 0) {
+      throw new IllegalArgumentException("numPartitions must be greater than 
0, got: " + numPartitions);
+    }
+
+    List<String> partitions = new ArrayList<>();
+    LocalDate startDate = LocalDate.of(2020, 1, 1);
+    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
+
+    for (int i = 0; i < numPartitions; i++) {
+      LocalDate partitionDate = startDate.plusDays(i);
+      String partitionPath = partitionDate.format(formatter);
+      partitions.add(partitionPath);
+    }
+
+    LOG.info("Generated {} partitions from {} to {}",
+        numPartitions,
+        partitions.get(0),
+        partitions.get(partitions.size() - 1));
+
+    return partitions;
+  }
+
+  /**
+   * Print column stats for verification - shows min/max values for up to 10 
files per partition.
+   * This helps verify that column stats were constructed properly before 
querying.
+   *
+   * @param commitMetadata The commit metadata containing partition and file 
information
+   * @param expectedStats  The expected column stats map (file name -> column 
name -> stats)
+   */
+  @SuppressWarnings("rawtypes")
+  private void printColumnStatsForVerification(
+      HoodieCommitMetadata commitMetadata,
+      Map<String, Map<String, HoodieColumnRangeMetadata<Comparable>>> 
expectedStats) {
+
+    LOG.info("=== STEP 4: Verifying column stats construction (max 10 files 
per partition) ===");
+
+    Map<String, List<HoodieWriteStat>> partitionToWriteStats = 
commitMetadata.getPartitionToWriteStats();
+
+    for (Map.Entry<String, List<HoodieWriteStat>> entry : 
partitionToWriteStats.entrySet()) {
+      String partitionPath = entry.getKey();
+      List<HoodieWriteStat> writeStats = entry.getValue();
+
+      LOG.info("Partition: {} ({} files total)", partitionPath, 
writeStats.size());
+      LOG.info(String.format("%-50s %-15s %-15s %-15s %-15s",
+          "FileName", "age_min", "age_max", "salary_min", "salary_max"));
+      LOG.info(String.join("", Collections.nCopies(110, "-")));
+
+      int filesDisplayed = 0;
+      for (HoodieWriteStat writeStat : writeStats) {
+        if (filesDisplayed >= 10) {
+          LOG.info("... and {} more files", writeStats.size() - 10);
+          break;
+        }
+
+        String filePath = writeStat.getPath();
+        String fileName = new StoragePath(filePath).getName();
+
+        Map<String, HoodieColumnRangeMetadata<Comparable>> fileStats = 
expectedStats.get(fileName);
+        if (fileStats != null) {
+          HoodieColumnRangeMetadata<Comparable> ageStats = 
fileStats.get("age");
+          HoodieColumnRangeMetadata<Comparable> salaryStats = 
fileStats.get("salary");
+
+          String ageMin = (ageStats != null) ? 
String.valueOf(ageStats.getMinValue()) : "N/A";
+          String ageMax = (ageStats != null) ? 
String.valueOf(ageStats.getMaxValue()) : "N/A";
+          String salaryMin = (salaryStats != null) ? 
String.valueOf(salaryStats.getMinValue()) : "N/A";
+          String salaryMax = (salaryStats != null) ? 
String.valueOf(salaryStats.getMaxValue()) : "N/A";
+
+          LOG.info(String.format("%-50s %-15s %-15s %-15s %-15s",
+              fileName.length() > 48 ? fileName.substring(0, 48) + ".." : 
fileName,
+              ageMin, ageMax, salaryMin, salaryMax));
+        } else {
+          LOG.info(String.format("%-50s %-15s", fileName, "NO STATS FOUND"));
+        }
+
+        filesDisplayed++;
+      }
+    }
+
+    LOG.info("");
+    LOG.info("Total files with stats: {}", expectedStats.size());
+  }
+
+  /**
+   * Query the column stats index using HoodieFileIndex.filterFileSlices and 
verify results.
+   *
+   * @param dataConfig     The write config for the data table
+   * @param dataMetaClient The meta client for the data table
+   * @param expectedStats  The expected column stats for verification
+   * @param numFiles       The total number of files in the commit
+   */
+  @SuppressWarnings("rawtypes")
+  private void queryAndVerifyColumnStats(
+      HoodieWriteConfig dataConfig,
+      HoodieTableMetaClient dataMetaClient,
+      Map<String, Map<String, HoodieColumnRangeMetadata<Comparable>>> 
expectedStats,
+      int numFiles) throws Exception {
+
+    LOG.info("=== STEP 5: Querying column stats index using HoodieFileIndex 
===");
+    dataMetaClient = HoodieTableMetaClient.reload(dataMetaClient);
+
+    // Create HoodieFileIndex
+    Map<String, String> options = new HashMap<>();
+    options.put("path", dataConfig.getBasePath());
+    options.put("hoodie.datasource.read.data.skipping.enable", "true");
+    options.put("hoodie.metadata.enable", "true");
+    options.put("hoodie.metadata.index.column.stats.enable", "true");
+    // Also ensure the columns are specified for column stats
+    options.put("hoodie.metadata.index.column.stats.column.list", 
"age,salary");
+    
spark.sqlContext().conf().setConfString("hoodie.fileIndex.dataSkippingFailureMode",
 "strict");
+
+    // Create schema with the columns used for data skipping
+    StructType dataSchema = new StructType()
+        .add("id", "string")
+        .add("name", "string")
+        .add("city", "string")
+        .add("age", "int")
+        .add("salary", "long");
+    scala.Option<StructType> schemaOption = scala.Option.apply(dataSchema);
+
+    @SuppressWarnings("deprecation")
+    scala.collection.immutable.Map<String, String> scalaOptions = 
JavaConverters.mapAsScalaMap(options)
+        .toMap(scala.Predef$.MODULE$.<scala.Tuple2<String, String>>conforms());
+
+    org.apache.hudi.HoodieFileIndex fileIndex = new 
org.apache.hudi.HoodieFileIndex(
+        spark,
+        dataMetaClient,
+        schemaOption,
+        scalaOptions,
+        NoopCache$.MODULE$,
+        false,
+        false);
+
+    // Create data filters for age and salary columns
+    // Unresolved expressions cause translateIntoColumnStatsIndexFilterExpr to 
return TrueLiteral (no filtering).
+    List<Expression> dataFilters = new ArrayList<>();
+    String filterString = "age > 90";
+    Expression filter1 = 
org.apache.spark.sql.HoodieCatalystExpressionUtils$.MODULE$
+        .resolveExpr(spark, filterString, dataSchema);
+    LOG.info("DEBUG: Resolved filter expression: {}", filter1);
+    LOG.info("DEBUG: Resolved filter resolved: {}", filter1.resolved());
+    LOG.info("DEBUG: Resolved filter tree:\n{}", filter1.treeString());
+
+    dataFilters.add(filter1);
+    // Expression filter2 = 
org.apache.spark.sql.HoodieCatalystExpressionUtils.resolveExpr(
+    //     sparkSession, "salary > 100000", dataSchema);
+    // dataFilters.add(filter2);
+
+    List<Expression> partitionFilters = new ArrayList<>(); // Empty partition 
filters
+
+    // Convert to Scala Seq
+    scala.collection.immutable.List<Expression> dataFiltersList = 
JavaConverters.asScalaBuffer(dataFilters)
+        .toList();
+    scala.collection.Seq<Expression> dataFiltersSeq = dataFiltersList;
+    scala.collection.immutable.List<Expression> partitionFiltersList = 
JavaConverters
+        .asScalaBuffer(partitionFilters).toList();
+    scala.collection.Seq<Expression> partitionFiltersSeq = 
partitionFiltersList;
+
+    // Call filterFileSlices
+    
scala.collection.Seq<scala.Tuple2<scala.Option<org.apache.hudi.BaseHoodieTableFileIndex.PartitionPath>,
+        scala.collection.Seq<FileSlice>>> filteredSlices = fileIndex
+        .filterFileSlices(
+            dataFiltersSeq,
+            partitionFiltersSeq,
+            false);
+
+    // Print results
+    LOG.info("");
+    LOG.info("Filtered File Slices Min/Max Values:");
+    LOG.info(String.format("%-30s %-20s %-20s %-20s %-20s",
+        "FileName", "age_min", "age_max", "salary_min", "salary_max"));
+    LOG.info(String.join("", Collections.nCopies(100, "-")));
+
+    int totalFileSlices = 0;
+    for (int j = 0; j < filteredSlices.size(); j++) {
+      
scala.Tuple2<scala.Option<org.apache.hudi.BaseHoodieTableFileIndex.PartitionPath>,
+          scala.collection.Seq<FileSlice>> tuple = filteredSlices.apply(j);
+      scala.collection.Seq<FileSlice> fileSliceSeq = tuple._2();
+      totalFileSlices += fileSliceSeq.size();
+
+      for (int k = 0; k < fileSliceSeq.size(); k++) {
+        FileSlice fileSlice = fileSliceSeq.apply(k);
+        String fileName = fileSlice.getBaseFile().get().getFileName();
+
+        Map<String, HoodieColumnRangeMetadata<Comparable>> fileExpectedStats = 
expectedStats.get(fileName);
+        if (fileExpectedStats != null) {
+          HoodieColumnRangeMetadata<Comparable> ageStats = 
fileExpectedStats.get("age");
+          HoodieColumnRangeMetadata<Comparable> salaryStats = 
fileExpectedStats.get("salary");
+
+          Object ageMin = (ageStats != null) ? ageStats.getMinValue() : "null";
+          Object ageMax = (ageStats != null) ? ageStats.getMaxValue() : "null";
+          Object salaryMin = (salaryStats != null) ? salaryStats.getMinValue() 
: "null";
+          Object salaryMax = (salaryStats != null) ? salaryStats.getMaxValue() 
: "null";
+
+          LOG.info(String.format("%-30s %-20s %-20s %-20s %-20s",
+              fileName, ageMin.toString(), ageMax.toString(), 
salaryMin.toString(),
+              salaryMax.toString()));
+        }
+      }
+    }
+
+    LOG.info(String.join("", Collections.nCopies(100, "-")));
+    LOG.info("Total file slices returned: {}", totalFileSlices);
+    LOG.info("Total files in commit: {}", numFiles);
+
+    if (numFiles > 0) {
+      double skippingRatio = ((double) (numFiles - totalFileSlices) / 
numFiles) * 100.0;
+      LOG.info(String.format("Data skipping ratio: %.2f%%", skippingRatio));
+    }
+  }
+
+  /**
+   * Write both /files and /column_stats partitions to metadata table in a 
single commit.
+   * This method handles initialization of partitions if needed, tags records 
with location,
+   * and writes them together to simulate how actual code writes metadata.
+   *
+   * @param dataConfig     The write config for the data table
+   * @param dataMetaClient The meta client for the data table
+   * @param commitMetadata The commit metadata containing file information
+   * @param dataCommitTime The commit time for the data table commit
+   * @param mdtWriteConfig The write config for the metadata table
+   * @return Map of file names to their column stats metadata for verification
+   * @throws Exception if there's an error writing to the metadata table
+   */
+  @SuppressWarnings("rawtypes")
+  private Map<String, Map<String, HoodieColumnRangeMetadata<Comparable>>> 
writeFilesAndColumnStatsToMetadataTable(
+      HoodieWriteConfig dataConfig,
+      HoodieTableMetaClient dataMetaClient,
+      HoodieCommitMetadata commitMetadata,
+      String dataCommitTime,
+      HoodieWriteConfig mdtWriteConfig) throws Exception {
+
+    try (HoodieTableMetadataWriter<?, ?> metadataWriter = 
SparkMetadataWriterFactory.create(
+        engineContext.getStorageConf(),
+        dataConfig,
+        engineContext,
+        Option.empty(),
+        dataMetaClient.getTableConfig())) {
+
+      // STEP 3a: Check if /files partition exists and initialize if needed
+      String metadataBasePath = 
HoodieTableMetadata.getMetadataTableBasePath(cfg.tableBasePath);
+      HoodieTableMetaClient metadataMetaClient = 
HoodieTableMetaClient.builder()
+          .setBasePath(metadataBasePath)
+          .setConf(engineContext.getStorageConf().newInstance())
+          .build();
+
+      boolean filesPartitionExists = dataMetaClient.getTableConfig()
+          .isMetadataPartitionAvailable(MetadataPartitionType.FILES);
+
+      LOG.info("BEFORE initialization - Metadata table exists: {}, partitions: 
{}",
+          filesPartitionExists,
+          metadataMetaClient.getTableConfig().getMetadataPartitions());
+
+      if (!filesPartitionExists) {
+        // Mark partition as inflight in table config - this is required for 
tagRecordsWithLocation
+        // to work with isInitializing=true
+        dataMetaClient.getTableConfig().setMetadataPartitionsInflight(
+            dataMetaClient, MetadataPartitionType.FILES);
+        LOG.info("Marked /files partition as inflight for initialization");
+      }
+
+      // Also mark column_stats partition as inflight for initialization
+      boolean colStatsPartitionExists = dataMetaClient.getTableConfig()
+          .isMetadataPartitionAvailable(MetadataPartitionType.COLUMN_STATS);
+      if (!colStatsPartitionExists) {
+        dataMetaClient.getTableConfig().setMetadataPartitionsInflight(
+            dataMetaClient, MetadataPartitionType.COLUMN_STATS);
+        LOG.info("Marked /column_stats partition as inflight for 
initialization");
+
+        // Create index definition for column stats - this tells data skipping 
which columns are indexed
+        org.apache.hudi.common.model.HoodieIndexDefinition colStatsIndexDef =
+            new org.apache.hudi.common.model.HoodieIndexDefinition.Builder()
+                
.withIndexName(MetadataPartitionType.COLUMN_STATS.getPartitionPath())
+                .withIndexType("column_stats")
+                .withSourceFields(java.util.Arrays.asList("age", "salary"))
+                .build();
+        dataMetaClient.buildIndexDefinition(colStatsIndexDef);
+        LOG.info("Created column stats index definition for columns: age, 
salary");
+      }
+
+      // Convert commit metadata to files partition records
+      @SuppressWarnings("unchecked")
+      List<HoodieRecord<HoodieMetadataPayload>> filesRecords = 
(List<HoodieRecord<HoodieMetadataPayload>>) (List<?>)
+          
HoodieTableMetadataUtil.convertMetadataToFilesPartitionRecords(commitMetadata, 
dataCommitTime);
+
+      // Generate column stats records
+      @SuppressWarnings("rawtypes")
+      Map<String, Map<String, HoodieColumnRangeMetadata<Comparable>>> 
expectedStats = new HashMap<>();
+      List<HoodieRecord<HoodieMetadataPayload>> columnStatsRecords = 
generateColumnStatsRecordsForCommitMetadata(

Review Comment:
   make this P1. lets focus on other feedback and come to this later



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to