Copilot commented on code in PR #516:
URL: https://github.com/apache/hudi-rs/pull/516#discussion_r2680955769


##########
crates/core/src/table/fs_view.rs:
##########
@@ -58,64 +61,150 @@ impl FileSystemView {
         })
     }
 
-    /// Load file groups by listing from the storage.
+    /// Load file groups from the appropriate source (storage or metadata 
table records)
+    /// and apply stats-based pruning.
+    ///
+    /// # File Listing Source
+    /// - If `files_partition_records` is Some: Uses pre-fetched metadata 
table records
+    /// - If `files_partition_records` is None: Uses storage listing via 
FileLister
+    ///
+    /// # Stats Pruning Source (for non-empty file_pruner)
+    /// - Currently: Always extracts stats from Parquet file footers
+    /// - TODO: Use metadata table partitions when available:
+    ///   - partition_stats: Enhance PartitionPruner to prune partitions 
before file listing
+    ///   - column_stats: Prune files without reading Parquet footers
     ///
     /// # Arguments
     /// * `partition_pruner` - Filters which partitions to include
-    /// * `completion_time_view` - View to look up completion timestamps
-    async fn load_file_groups_from_storage<V: CompletionTimeView + Sync>(
+    /// * `file_pruner` - Filters files based on column statistics
+    /// * `table_schema` - Table schema for statistics extraction
+    /// * `timeline_view` - The timeline view providing query timestamp and 
completion time lookups
+    /// * `files_partition_records` - Optional pre-fetched metadata table 
records
+    async fn load_file_groups(
         &self,
         partition_pruner: &PartitionPruner,
-        completion_time_view: &V,
+        file_pruner: &FilePruner,
+        table_schema: &Schema,
+        timeline_view: &TimelineView,
+        files_partition_records: Option<&HashMap<String, 
FilesPartitionRecord>>,
     ) -> Result<()> {
-        let lister = FileLister::new(
-            self.hudi_configs.clone(),
-            self.storage.clone(),
-            partition_pruner.to_owned(),
-        );
-        let file_groups_map = lister
-            .list_file_groups_for_relevant_partitions(completion_time_view)
-            .await?;
+        // TODO: Enhance PartitionPruner with partition_stats support
+        // - Load partition_stats from metadata table into PartitionPruner
+        // - PartitionPruner.should_include() will use both partition column 
values AND partition_stats
+        // - For non-partitioned tables: check 
partition_pruner.can_any_partition_match() for early return
+
+        // Step 1: Get file groups from appropriate source
+        let file_groups_map = if let Some(records) = files_partition_records {
+            // Use pre-fetched metadata table records
+            let base_file_format: String = 
self.hudi_configs.get_or_default(BaseFileFormat).into();
+            file_groups_from_files_partition_records(records, 
&base_file_format, timeline_view)?
+        } else {
+            // Use storage listing
+            let lister = FileLister::new(
+                self.hudi_configs.clone(),
+                self.storage.clone(),
+                partition_pruner.to_owned(),
+            );
+            lister
+                .list_file_groups_for_relevant_partitions(timeline_view)
+                .await?
+        };
+
+        // Step 2: Apply partition pruning (for metadata table path) and stats 
pruning
+        // Note: Storage listing path already applies partition pruning via 
FileLister
+        // TODO: Check if metadata table column_stats partition is available
+        // and use that instead of Parquet footers for better performance
         for (partition_path, file_groups) in file_groups_map {
+            // Skip partitions that don't match the pruner (for metadata table 
path)
+            if files_partition_records.is_some()
+                && !partition_pruner.is_empty()
+                && !partition_pruner.should_include(&partition_path)
+            {
+                continue;
+            }
+
+            let retained = self
+                .apply_stats_pruning_from_footers(
+                    file_groups,
+                    file_pruner,
+                    table_schema,
+                    timeline_view.as_of_timestamp(),
+                )
+                .await;
             self.partition_to_file_groups
-                .insert(partition_path, file_groups);
+                .insert(partition_path, retained);
         }
+
         Ok(())
     }
 
-    /// Load file groups from metadata table records.
-    ///
-    /// This is an alternative to `load_file_groups_from_file_system` that uses
-    /// file listing records fetched from the metadata table. Only partitions 
that
-    /// pass the partition pruner will be loaded.
+    /// Apply file-level stats pruning using Parquet file footers.
     ///
-    /// This method is not async because it operates on pre-fetched records.
-    ///
-    /// # Arguments
-    /// * `records` - Metadata table files partition records
-    /// * `partition_pruner` - Filters which partitions to include
-    /// * `completion_time_view` - View to look up completion timestamps
-    fn load_file_groups_from_metadata_table_records<V: CompletionTimeView>(
+    /// Returns the filtered list of file groups that pass the pruning check.
+    /// Files are included (not pruned) if:
+    /// - The pruner has no filters
+    /// - The file is not a Parquet file
+    /// - Column stats cannot be loaded (conservative behavior)
+    /// - The file's stats indicate it might contain matching rows
+    async fn apply_stats_pruning_from_footers(
         &self,
-        records: &HashMap<String, 
crate::metadata::table_record::FilesPartitionRecord>,
-        partition_pruner: &PartitionPruner,
-        completion_time_view: &V,
-    ) -> Result<()> {
-        let base_file_format: String = 
self.hudi_configs.get_or_default(BaseFileFormat).into();
-        let file_groups_map = file_groups_from_files_partition_records(
-            records,
-            &base_file_format,
-            completion_time_view,
-        )?;
-
-        for entry in file_groups_map.iter() {
-            let partition_path = entry.key();
-            if partition_pruner.is_empty() || 
partition_pruner.should_include(partition_path) {
-                self.partition_to_file_groups
-                    .insert(partition_path.clone(), entry.value().clone());
+        file_groups: Vec<FileGroup>,
+        file_pruner: &FilePruner,
+        table_schema: &Schema,
+        as_of_timestamp: &str,
+    ) -> Vec<FileGroup> {
+        if file_pruner.is_empty() {
+            return file_groups;
+        }
+
+        let mut retained = Vec::with_capacity(file_groups.len());
+
+        for mut fg in file_groups {
+            if let Some(fsl) = fg.get_file_slice_mut_as_of(as_of_timestamp) {
+                let relative_path = match fsl.base_file_relative_path() {
+                    Ok(path) => path,
+                    Err(_) => {
+                        // Cannot get path, include the file group
+                        retained.push(fg);
+                        continue;
+                    }

Review Comment:
   The `Err(_)` pattern swallows the error information. Consider logging the 
error with `log::warn!` or `log::debug!` before including the file group, as 
this provides better visibility during debugging and troubleshooting.



##########
crates/core/src/table/fs_view.rs:
##########
@@ -58,64 +61,150 @@ impl FileSystemView {
         })
     }
 
-    /// Load file groups by listing from the storage.
+    /// Load file groups from the appropriate source (storage or metadata 
table records)
+    /// and apply stats-based pruning.
+    ///
+    /// # File Listing Source
+    /// - If `files_partition_records` is Some: Uses pre-fetched metadata 
table records
+    /// - If `files_partition_records` is None: Uses storage listing via 
FileLister
+    ///
+    /// # Stats Pruning Source (for non-empty file_pruner)
+    /// - Currently: Always extracts stats from Parquet file footers
+    /// - TODO: Use metadata table partitions when available:
+    ///   - partition_stats: Enhance PartitionPruner to prune partitions 
before file listing
+    ///   - column_stats: Prune files without reading Parquet footers
     ///
     /// # Arguments
     /// * `partition_pruner` - Filters which partitions to include
-    /// * `completion_time_view` - View to look up completion timestamps
-    async fn load_file_groups_from_storage<V: CompletionTimeView + Sync>(
+    /// * `file_pruner` - Filters files based on column statistics
+    /// * `table_schema` - Table schema for statistics extraction
+    /// * `timeline_view` - The timeline view providing query timestamp and 
completion time lookups
+    /// * `files_partition_records` - Optional pre-fetched metadata table 
records
+    async fn load_file_groups(
         &self,
         partition_pruner: &PartitionPruner,
-        completion_time_view: &V,
+        file_pruner: &FilePruner,
+        table_schema: &Schema,
+        timeline_view: &TimelineView,
+        files_partition_records: Option<&HashMap<String, 
FilesPartitionRecord>>,
     ) -> Result<()> {
-        let lister = FileLister::new(
-            self.hudi_configs.clone(),
-            self.storage.clone(),
-            partition_pruner.to_owned(),
-        );
-        let file_groups_map = lister
-            .list_file_groups_for_relevant_partitions(completion_time_view)
-            .await?;
+        // TODO: Enhance PartitionPruner with partition_stats support
+        // - Load partition_stats from metadata table into PartitionPruner
+        // - PartitionPruner.should_include() will use both partition column 
values AND partition_stats
+        // - For non-partitioned tables: check 
partition_pruner.can_any_partition_match() for early return
+
+        // Step 1: Get file groups from appropriate source
+        let file_groups_map = if let Some(records) = files_partition_records {
+            // Use pre-fetched metadata table records
+            let base_file_format: String = 
self.hudi_configs.get_or_default(BaseFileFormat).into();
+            file_groups_from_files_partition_records(records, 
&base_file_format, timeline_view)?
+        } else {
+            // Use storage listing
+            let lister = FileLister::new(
+                self.hudi_configs.clone(),
+                self.storage.clone(),
+                partition_pruner.to_owned(),
+            );
+            lister
+                .list_file_groups_for_relevant_partitions(timeline_view)
+                .await?
+        };
+
+        // Step 2: Apply partition pruning (for metadata table path) and stats 
pruning
+        // Note: Storage listing path already applies partition pruning via 
FileLister
+        // TODO: Check if metadata table column_stats partition is available
+        // and use that instead of Parquet footers for better performance
         for (partition_path, file_groups) in file_groups_map {
+            // Skip partitions that don't match the pruner (for metadata table 
path)
+            if files_partition_records.is_some()
+                && !partition_pruner.is_empty()
+                && !partition_pruner.should_include(&partition_path)
+            {
+                continue;
+            }
+
+            let retained = self
+                .apply_stats_pruning_from_footers(
+                    file_groups,
+                    file_pruner,
+                    table_schema,
+                    timeline_view.as_of_timestamp(),
+                )
+                .await;
             self.partition_to_file_groups
-                .insert(partition_path, file_groups);
+                .insert(partition_path, retained);
         }
+
         Ok(())
     }
 
-    /// Load file groups from metadata table records.
-    ///
-    /// This is an alternative to `load_file_groups_from_file_system` that uses
-    /// file listing records fetched from the metadata table. Only partitions 
that
-    /// pass the partition pruner will be loaded.
+    /// Apply file-level stats pruning using Parquet file footers.
     ///
-    /// This method is not async because it operates on pre-fetched records.
-    ///
-    /// # Arguments
-    /// * `records` - Metadata table files partition records
-    /// * `partition_pruner` - Filters which partitions to include
-    /// * `completion_time_view` - View to look up completion timestamps
-    fn load_file_groups_from_metadata_table_records<V: CompletionTimeView>(
+    /// Returns the filtered list of file groups that pass the pruning check.
+    /// Files are included (not pruned) if:
+    /// - The pruner has no filters
+    /// - The file is not a Parquet file
+    /// - Column stats cannot be loaded (conservative behavior)
+    /// - The file's stats indicate it might contain matching rows
+    async fn apply_stats_pruning_from_footers(
         &self,
-        records: &HashMap<String, 
crate::metadata::table_record::FilesPartitionRecord>,
-        partition_pruner: &PartitionPruner,
-        completion_time_view: &V,
-    ) -> Result<()> {
-        let base_file_format: String = 
self.hudi_configs.get_or_default(BaseFileFormat).into();
-        let file_groups_map = file_groups_from_files_partition_records(
-            records,
-            &base_file_format,
-            completion_time_view,
-        )?;
-
-        for entry in file_groups_map.iter() {
-            let partition_path = entry.key();
-            if partition_pruner.is_empty() || 
partition_pruner.should_include(partition_path) {
-                self.partition_to_file_groups
-                    .insert(partition_path.clone(), entry.value().clone());
+        file_groups: Vec<FileGroup>,
+        file_pruner: &FilePruner,
+        table_schema: &Schema,
+        as_of_timestamp: &str,
+    ) -> Vec<FileGroup> {
+        if file_pruner.is_empty() {
+            return file_groups;
+        }
+
+        let mut retained = Vec::with_capacity(file_groups.len());
+
+        for mut fg in file_groups {
+            if let Some(fsl) = fg.get_file_slice_mut_as_of(as_of_timestamp) {
+                let relative_path = match fsl.base_file_relative_path() {
+                    Ok(path) => path,
+                    Err(_) => {
+                        // Cannot get path, include the file group
+                        retained.push(fg);
+                        continue;
+                    }
+                };
+
+                // Case-insensitive check for .parquet extension
+                if !relative_path.to_lowercase().ends_with(".parquet") {

Review Comment:
   Calling `to_lowercase()` allocates a new `String` for every file path that is 
checked, which is wasteful inside this loop. Consider a case-insensitive suffix 
check that does not allocate, for example comparing the last 8 bytes of the 
path (the length of `.parquet`) against the extension using an ASCII 
case-insensitive comparison.
   ```suggestion
                   // Case-insensitive check for .parquet extension without 
allocating
                   const PARQUET_EXT: &[u8] = b".parquet";
                   let path_bytes = relative_path.as_bytes();
                   let is_parquet = path_bytes.len() >= PARQUET_EXT.len()
                       && path_bytes[path_bytes.len() - PARQUET_EXT.len()..]
                           .iter()
                           .zip(PARQUET_EXT.iter())
                           .all(|(a, b)| a.to_ascii_lowercase() == *b);
   
                   if !is_parquet {
   ```



##########
crates/core/src/table/fs_view.rs:
##########
@@ -58,64 +61,150 @@ impl FileSystemView {
         })
     }
 
-    /// Load file groups by listing from the storage.
+    /// Load file groups from the appropriate source (storage or metadata 
table records)
+    /// and apply stats-based pruning.
+    ///
+    /// # File Listing Source
+    /// - If `files_partition_records` is Some: Uses pre-fetched metadata 
table records
+    /// - If `files_partition_records` is None: Uses storage listing via 
FileLister
+    ///
+    /// # Stats Pruning Source (for non-empty file_pruner)
+    /// - Currently: Always extracts stats from Parquet file footers
+    /// - TODO: Use metadata table partitions when available:
+    ///   - partition_stats: Enhance PartitionPruner to prune partitions 
before file listing
+    ///   - column_stats: Prune files without reading Parquet footers
     ///
     /// # Arguments
     /// * `partition_pruner` - Filters which partitions to include
-    /// * `completion_time_view` - View to look up completion timestamps
-    async fn load_file_groups_from_storage<V: CompletionTimeView + Sync>(
+    /// * `file_pruner` - Filters files based on column statistics
+    /// * `table_schema` - Table schema for statistics extraction
+    /// * `timeline_view` - The timeline view providing query timestamp and 
completion time lookups
+    /// * `files_partition_records` - Optional pre-fetched metadata table 
records
+    async fn load_file_groups(
         &self,
         partition_pruner: &PartitionPruner,
-        completion_time_view: &V,
+        file_pruner: &FilePruner,
+        table_schema: &Schema,
+        timeline_view: &TimelineView,
+        files_partition_records: Option<&HashMap<String, 
FilesPartitionRecord>>,
     ) -> Result<()> {
-        let lister = FileLister::new(
-            self.hudi_configs.clone(),
-            self.storage.clone(),
-            partition_pruner.to_owned(),
-        );
-        let file_groups_map = lister
-            .list_file_groups_for_relevant_partitions(completion_time_view)
-            .await?;
+        // TODO: Enhance PartitionPruner with partition_stats support
+        // - Load partition_stats from metadata table into PartitionPruner
+        // - PartitionPruner.should_include() will use both partition column 
values AND partition_stats
+        // - For non-partitioned tables: check 
partition_pruner.can_any_partition_match() for early return
+
+        // Step 1: Get file groups from appropriate source
+        let file_groups_map = if let Some(records) = files_partition_records {
+            // Use pre-fetched metadata table records
+            let base_file_format: String = 
self.hudi_configs.get_or_default(BaseFileFormat).into();
+            file_groups_from_files_partition_records(records, 
&base_file_format, timeline_view)?
+        } else {
+            // Use storage listing
+            let lister = FileLister::new(
+                self.hudi_configs.clone(),
+                self.storage.clone(),
+                partition_pruner.to_owned(),
+            );
+            lister
+                .list_file_groups_for_relevant_partitions(timeline_view)
+                .await?
+        };
+
+        // Step 2: Apply partition pruning (for metadata table path) and stats 
pruning
+        // Note: Storage listing path already applies partition pruning via 
FileLister
+        // TODO: Check if metadata table column_stats partition is available
+        // and use that instead of Parquet footers for better performance
         for (partition_path, file_groups) in file_groups_map {
+            // Skip partitions that don't match the pruner (for metadata table 
path)
+            if files_partition_records.is_some()
+                && !partition_pruner.is_empty()
+                && !partition_pruner.should_include(&partition_path)
+            {
+                continue;
+            }
+
+            let retained = self
+                .apply_stats_pruning_from_footers(
+                    file_groups,
+                    file_pruner,
+                    table_schema,
+                    timeline_view.as_of_timestamp(),
+                )
+                .await;
             self.partition_to_file_groups
-                .insert(partition_path, file_groups);
+                .insert(partition_path, retained);
         }
+
         Ok(())
     }
 
-    /// Load file groups from metadata table records.
-    ///
-    /// This is an alternative to `load_file_groups_from_file_system` that uses
-    /// file listing records fetched from the metadata table. Only partitions 
that
-    /// pass the partition pruner will be loaded.
+    /// Apply file-level stats pruning using Parquet file footers.
     ///
-    /// This method is not async because it operates on pre-fetched records.
-    ///
-    /// # Arguments
-    /// * `records` - Metadata table files partition records
-    /// * `partition_pruner` - Filters which partitions to include
-    /// * `completion_time_view` - View to look up completion timestamps
-    fn load_file_groups_from_metadata_table_records<V: CompletionTimeView>(
+    /// Returns the filtered list of file groups that pass the pruning check.
+    /// Files are included (not pruned) if:
+    /// - The pruner has no filters
+    /// - The file is not a Parquet file
+    /// - Column stats cannot be loaded (conservative behavior)
+    /// - The file's stats indicate it might contain matching rows
+    async fn apply_stats_pruning_from_footers(
         &self,
-        records: &HashMap<String, 
crate::metadata::table_record::FilesPartitionRecord>,
-        partition_pruner: &PartitionPruner,
-        completion_time_view: &V,
-    ) -> Result<()> {
-        let base_file_format: String = 
self.hudi_configs.get_or_default(BaseFileFormat).into();
-        let file_groups_map = file_groups_from_files_partition_records(
-            records,
-            &base_file_format,
-            completion_time_view,
-        )?;
-
-        for entry in file_groups_map.iter() {
-            let partition_path = entry.key();
-            if partition_pruner.is_empty() || 
partition_pruner.should_include(partition_path) {
-                self.partition_to_file_groups
-                    .insert(partition_path.clone(), entry.value().clone());
+        file_groups: Vec<FileGroup>,
+        file_pruner: &FilePruner,
+        table_schema: &Schema,
+        as_of_timestamp: &str,
+    ) -> Vec<FileGroup> {
+        if file_pruner.is_empty() {
+            return file_groups;
+        }
+
+        let mut retained = Vec::with_capacity(file_groups.len());
+
+        for mut fg in file_groups {

Review Comment:
   `fg` is declared `mut` and is later moved into `retained` (it is pushed, not 
reassigned). Whether the `mut` can be removed depends on the receiver of 
`get_file_slice_mut_as_of`: if it takes `&self`, the binding is never mutated 
and the `mut` keyword is unnecessary; if it takes `&mut self` (as the `_mut` 
name suggests), the `mut` binding is required and removing it will not compile. 
Please confirm the method signature before removing the keyword.



##########
crates/core/tests/statistics_tests.rs:
##########
@@ -0,0 +1,958 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+//! Integration tests for the statistics module.
+//!
+//! These tests generate Parquet files with known data, then verify that
+//! the statistics extraction correctly reads min/max values.
+
+use std::fs::File;
+use std::sync::Arc;
+
+use arrow_array::{
+    ArrayRef, BooleanArray, Date32Array, Float32Array, Float64Array, 
Int8Array, Int16Array,
+    Int32Array, Int64Array, RecordBatch, StringArray, 
TimestampMicrosecondArray,
+    TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, 
UInt32Array,
+};
+use arrow_schema::{DataType, Field, Schema, TimeUnit};
+use parquet::arrow::ArrowWriter;
+use parquet::basic::Compression;
+use parquet::file::properties::WriterProperties;
+use parquet::file::reader::FileReader;
+use parquet::file::serialized_reader::SerializedFileReader;
+use tempfile::tempdir;
+
+use hudi_core::statistics::{StatisticsContainer, StatsGranularity};
+
+/// Helper to write a RecordBatch to a Parquet file and return the path.
+fn write_parquet_file(batch: &RecordBatch, path: &std::path::Path) {
+    let file = File::create(path).unwrap();
+    let props = WriterProperties::builder()
+        .set_compression(Compression::SNAPPY)
+        
.set_statistics_enabled(parquet::file::properties::EnabledStatistics::Page)
+        .build();
+    let mut writer = ArrowWriter::try_new(file, batch.schema(), 
Some(props)).unwrap();
+    writer.write(batch).unwrap();
+    writer.close().unwrap();
+}
+
+/// Helper to write a RecordBatch to a Parquet file without statistics.
+fn write_parquet_file_no_stats(batch: &RecordBatch, path: &std::path::Path) {
+    let file = File::create(path).unwrap();
+    let props = WriterProperties::builder()
+        .set_compression(Compression::SNAPPY)
+        
.set_statistics_enabled(parquet::file::properties::EnabledStatistics::None)
+        .build();
+    let mut writer = ArrowWriter::try_new(file, batch.schema(), 
Some(props)).unwrap();
+    writer.write(batch).unwrap();
+    writer.close().unwrap();
+}
+
+/// Helper to write multiple RecordBatches to a single Parquet file (multiple 
row groups).
+fn write_parquet_file_multiple_row_groups(batches: &[RecordBatch], path: 
&std::path::Path) {
+    let file = File::create(path).unwrap();
+    let props = WriterProperties::builder()
+        .set_compression(Compression::SNAPPY)
+        
.set_statistics_enabled(parquet::file::properties::EnabledStatistics::Page)
+        .set_max_row_group_size(3) // Force smaller row groups
+        .build();
+    let mut writer = ArrowWriter::try_new(file, batches[0].schema(), 
Some(props)).unwrap();
+    for batch in batches {
+        writer.write(batch).unwrap();
+    }
+    writer.close().unwrap();
+}

Review Comment:
   The test helpers `write_parquet_file`, `write_parquet_file_no_stats`, and 
`write_parquet_file_multiple_row_groups` all use `.unwrap()` liberally. While 
panicking in test setup is acceptable, these could benefit from better error 
messages using `.expect()` to clarify which operation failed during test setup.



##########
crates/core/src/table/file_pruner.rs:
##########
@@ -0,0 +1,587 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//! File-level pruner for filtering files based on column statistics.
+
+use crate::Result;
+use crate::expr::ExprOperator;
+use crate::expr::filter::{Filter, SchemableFilter};
+use crate::statistics::{ColumnStatistics, StatisticsContainer};
+
+use arrow_array::{ArrayRef, Datum};
+use arrow_ord::cmp;
+use arrow_schema::Schema;
+use std::collections::HashSet;
+
+/// A file-level pruner that filters files based on column statistics.
+///
+/// This pruner uses min/max statistics from Parquet files to determine if a 
file
+/// can be skipped (pruned) based on query predicates. A file is pruned if its
+/// statistics prove that no rows in the file can match the predicate.
+#[derive(Debug, Clone)]
+pub struct FilePruner {
+    /// Filters that apply to non-partition columns
+    and_filters: Vec<SchemableFilter>,
+}
+
+impl FilePruner {
+    /// Creates a new file pruner with filters on non-partition columns.
+    ///
+    /// Filters on partition columns are excluded since they are handled by 
PartitionPruner.
+    ///
+    /// # Arguments
+    /// * `and_filters` - List of filters to apply
+    /// * `table_schema` - The table's data schema
+    /// * `partition_schema` - The partition schema (filters on these columns 
are excluded)
+    pub fn new(
+        and_filters: &[Filter],
+        table_schema: &Schema,
+        partition_schema: &Schema,
+    ) -> Result<Self> {
+        // Get partition column names to exclude
+        let partition_columns: HashSet<&str> = partition_schema
+            .fields()
+            .iter()
+            .map(|f| f.name().as_str())
+            .collect();
+
+        // Only keep filters on non-partition columns that exist in the table 
schema
+        let and_filters: Vec<SchemableFilter> = and_filters
+            .iter()
+            .filter(|filter| 
!partition_columns.contains(filter.field_name.as_str()))
+            .filter_map(|filter| SchemableFilter::try_from((filter.clone(), 
table_schema)).ok())
+            .collect();
+
+        Ok(FilePruner { and_filters })
+    }
+
+    /// Creates an empty file pruner that does not filter any files.
+    pub fn empty() -> Self {
+        FilePruner {
+            and_filters: Vec::new(),
+        }
+    }
+
+    /// Returns `true` if the pruner does not have any filters.
+    pub fn is_empty(&self) -> bool {
+        self.and_filters.is_empty()
+    }
+
+    /// Returns `true` if the file should be included based on its statistics.
+    ///
+    /// A file is included if ANY of its rows MIGHT match all the filters.
+    /// A file is excluded (pruned) only if we can prove that NO rows can 
match.
+    ///
+    /// If statistics are missing or incomplete, the file is included (safe 
default).
+    pub fn should_include(&self, stats: &StatisticsContainer) -> bool {
+        // If no filters, include everything
+        if self.and_filters.is_empty() {
+            return true;
+        }
+
+        // All filters must pass (AND semantics)
+        // If any filter definitively excludes the file, return false
+        for filter in &self.and_filters {
+            let col_name = filter.field.name();
+
+            // Get column statistics. When using 
StatisticsContainer::from_parquet_metadata(),
+            // all schema columns will have an entry. However, stats may come 
from other sources
+            // (e.g., manually constructed), so we handle missing columns 
defensively.
+            let Some(col_stats) = stats.columns.get(col_name) else {
+                // No stats for this column, cannot prune - include the file
+                continue;
+            };
+
+            // Check if this filter can prune the file
+            if self.can_prune_by_filter(filter, col_stats) {
+                return false; // File can be pruned
+            }
+        }
+
+        true // File should be included
+    }
+
+    /// Determines if a file can be pruned based on a single filter and column 
stats.
+    ///
+    /// Returns `true` if the file can definitely be pruned (no rows can 
match).
+    fn can_prune_by_filter(&self, filter: &SchemableFilter, col_stats: 
&ColumnStatistics) -> bool {
+        // Get the filter value as an ArrayRef
+        let filter_array = self.extract_filter_array(filter);
+        let Some(filter_value) = filter_array else {
+            return false; // Cannot extract value, don't prune
+        };
+
+        let min = &col_stats.min_value;
+        let max = &col_stats.max_value;
+
+        match filter.operator {
+            ExprOperator::Eq => {
+                // Prune if: value < min OR value > max
+                self.can_prune_eq(&filter_value, min, max)
+            }
+            ExprOperator::Ne => {
+                // Prune if: min = max = value (all rows equal the excluded 
value)
+                self.can_prune_ne(&filter_value, min, max)
+            }
+            ExprOperator::Lt => {
+                // Prune if: min >= value (all values are >= the threshold)
+                self.can_prune_lt(&filter_value, min)
+            }
+            ExprOperator::Lte => {
+                // Prune if: min > value
+                self.can_prune_lte(&filter_value, min)
+            }
+            ExprOperator::Gt => {
+                // Prune if: max <= value (all values are <= the threshold)
+                self.can_prune_gt(&filter_value, max)
+            }
+            ExprOperator::Gte => {
+                // Prune if: max < value
+                self.can_prune_gte(&filter_value, max)
+            }
+        }
+    }
+
+    /// Prune for `col = value`: prune if value < min OR value > max
+    fn can_prune_eq(
+        &self,
+        value: &ArrayRef,
+        min: &Option<ArrayRef>,
+        max: &Option<ArrayRef>,
+    ) -> bool {
+        // Need both min and max to make this decision
+        let Some(min_val) = min else {
+            return false;
+        };
+        let Some(max_val) = max else {
+            return false;
+        };
+
+        // Prune if value < min OR value > max
+        let value_lt_min = cmp::lt(value, min_val).map(|r| 
r.value(0)).unwrap_or(false);
+        let value_gt_max = cmp::gt(value, max_val).map(|r| 
r.value(0)).unwrap_or(false);

Review Comment:
   The `.unwrap_or(false)` on the comparison results means that a failed 
comparison (an `Err` result) silently leaves the file unpruned. While this is 
conservative and safe, it would be helpful to log when comparisons fail 
unexpectedly, as this could indicate data type mismatches or other issues worth 
investigating.



##########
crates/core/src/statistics/mod.rs:
##########
@@ -0,0 +1,543 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+//! Statistics module for column statistics pruning.
+//!
+//! This module provides abstractions for extracting, aggregating, and using
+//! column statistics from Parquet files for query pruning at different 
granularity levels.
+//!
+//! Core types:
+//! - [`ColumnStatistics`]: Per-column statistics (min, max) for range-based 
pruning
+//! - [`StatisticsContainer`]: Container for all column statistics at a given 
granularity
+//!
+//! Min/max values are stored as single-element Arrow arrays (`ArrayRef`), 
enabling
+//! direct comparison using `arrow_ord::cmp` functions without custom enum 
types.
+
+use std::collections::HashMap;
+use std::str::FromStr;
+use std::sync::Arc;
+
+use arrow_array::{
+    ArrayRef, BinaryArray, BooleanArray, Date32Array, Float32Array, 
Float64Array, Int8Array,
+    Int16Array, Int32Array, Int64Array, StringArray, TimestampMicrosecondArray,
+    TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, 
UInt8Array,
+    UInt16Array, UInt32Array, UInt64Array,
+};
+use arrow_ord::cmp;
+use arrow_schema::{DataType, Schema, TimeUnit};
+use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
+use parquet::file::statistics::Statistics as ParquetStatistics;
+
+/// Column statistics pruning granularity level.
+///
+/// Controls how fine-grained the statistics-based pruning is.
+/// Each level offers different trade-offs between memory overhead and pruning 
effectiveness.
+///
+/// String parsing is case-insensitive and accepts "row_group" or "rowgroup" 
for RowGroup.
+#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
+pub enum StatsGranularity {
+    /// File-level stats (aggregated from all row groups).
+    ///
+    /// Coarsest granularity, lowest memory overhead.
+    /// Can skip entire files that don't match predicates.
+    /// Stats are computed by aggregating row group stats:
+    /// - `file_min = min(row_group_mins)`
+    /// - `file_max = max(row_group_maxs)`
+    #[default]
+    File,
+
+    /// Row group level stats (directly from Parquet footer).
+    ///
+    /// Balanced granularity, moderate memory.
+    /// Can skip row groups within files.
+    /// Stats are read directly from Parquet footer's ColumnChunkMetaData.
+    RowGroup,
+}
+
+impl FromStr for StatsGranularity {
+    type Err = String;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s.to_lowercase().as_str() {
+            "file" => Ok(Self::File),
+            "row_group" | "rowgroup" => Ok(Self::RowGroup),
+            _ => Err(format!(
+                "Invalid stats granularity: '{s}'. Valid options: file, 
row_group"
+            )),
+        }
+    }
+}
+
+impl std::fmt::Display for StatsGranularity {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Self::File => write!(f, "file"),
+            Self::RowGroup => write!(f, "row_group"),
+        }
+    }
+}
+
+/// Statistics for a single column at a given granularity.
+///
+/// Tracks min and max values from Parquet footer statistics for range-based 
pruning.
+/// Values are stored as single-element Arrow arrays for direct comparison 
using `arrow_ord::cmp`.
+#[derive(Clone, Debug)]
+pub struct ColumnStatistics {
+    /// Column name
+    pub column_name: String,
+    /// Arrow data type
+    pub data_type: DataType,
+    /// Minimum value (stored as a single-element Arrow array)
+    pub min_value: Option<ArrayRef>,
+    /// Maximum value (stored as a single-element Arrow array)
+    pub max_value: Option<ArrayRef>,
+}
+
+impl ColumnStatistics {
+    /// Create a new ColumnStatistics with the given column name and data type.
+    pub fn new(column_name: String, data_type: DataType) -> Self {
+        Self {
+            column_name,
+            data_type,
+            min_value: None,
+            max_value: None,
+        }
+    }
+
+    /// Create from Parquet row group statistics.
+    ///
+    /// Extracts min/max as single-element Arrow arrays using the Arrow 
data_type
+    /// for correct logical type representation.
+    pub fn from_parquet_statistics(
+        column_name: &str,
+        data_type: &DataType,
+        stats: &ParquetStatistics,
+    ) -> Self {
+        let (min_value, max_value) = parquet_stats_to_min_max_arrays(stats, 
data_type);
+
+        Self {
+            column_name: column_name.to_string(),
+            data_type: data_type.clone(),
+            min_value,
+            max_value,
+        }
+    }
+
+    /// Merge with another ColumnStatistics (for aggregation).
+    ///
+    /// Takes min of mins, max of maxs.
+    /// Used when aggregating row group stats to file-level stats.
+    pub fn merge(&mut self, other: &ColumnStatistics) {
+        // Merge min values (take the smaller one)
+        self.min_value = match (self.min_value.take(), &other.min_value) {
+            (Some(a), Some(b)) => {
+                if is_less_than(&a, b) {
+                    Some(a)
+                } else {
+                    Some(Arc::clone(b))
+                }
+            }
+            (Some(a), None) => Some(a),
+            (None, Some(b)) => Some(Arc::clone(b)),
+            (None, None) => None,
+        };
+
+        // Merge max values (take the larger one)
+        self.max_value = match (self.max_value.take(), &other.max_value) {
+            (Some(a), Some(b)) => {
+                if is_greater_than(&a, b) {
+                    Some(a)
+                } else {
+                    Some(Arc::clone(b))
+                }
+            }
+            (Some(a), None) => Some(a),
+            (None, Some(b)) => Some(Arc::clone(b)),
+            (None, None) => None,
+        };
+    }
+}
+
+/// Returns true if `a < b` using arrow-ord comparison.
+fn is_less_than(a: &ArrayRef, b: &ArrayRef) -> bool {
+    cmp::lt(a, b).map(|result| result.value(0)).unwrap_or(false)
+}
+
+/// Returns true if `a > b` using arrow-ord comparison.
+fn is_greater_than(a: &ArrayRef, b: &ArrayRef) -> bool {
+    cmp::gt(a, b).map(|result| result.value(0)).unwrap_or(false)
+}
+
+/// Container for statistics at a specific granularity level.
+#[derive(Clone, Debug)]
+pub struct StatisticsContainer {
+    /// Granularity of these statistics
+    pub granularity: StatsGranularity,
+    /// Number of rows covered by these statistics
+    pub num_rows: Option<i64>,
+    /// Column statistics by column name
+    pub columns: HashMap<String, ColumnStatistics>,
+}
+
+impl StatisticsContainer {
+    /// Create an empty statistics container.
+    pub fn new(granularity: StatsGranularity) -> Self {
+        Self {
+            granularity,
+            num_rows: None,
+            columns: HashMap::new(),
+        }
+    }
+
+    /// Create file-level stats by aggregating row group stats from Parquet 
metadata.
+    ///
+    /// This iterates through all row groups, extracts stats for each column,
+    /// and aggregates them to file-level statistics.
+    pub fn from_parquet_metadata(metadata: &ParquetMetaData, schema: &Schema) 
-> Self {
+        let mut container = Self::new(StatsGranularity::File);
+
+        // Get total rows directly from file metadata
+        container.num_rows = Some(metadata.file_metadata().num_rows());
+
+        // Iterate through row groups and aggregate stats
+        for row_group in metadata.row_groups() {
+            let rg_stats = Self::from_row_group(row_group, schema);
+
+            // Merge row group stats into file-level stats
+            for (col_name, col_stats) in rg_stats.columns {
+                container
+                    .columns
+                    .entry(col_name)
+                    .and_modify(|existing| existing.merge(&col_stats))
+                    .or_insert(col_stats);
+            }
+        }
+
+        // Ensure all schema columns have an entry (even if no stats)
+        for field in schema.fields() {
+            let col_name = field.name();
+            if !container.columns.contains_key(col_name) {
+                container.columns.insert(
+                    col_name.clone(),
+                    ColumnStatistics::new(col_name.clone(), 
field.data_type().clone()),
+                );
+            }
+        }
+
+        container
+    }
+
+    /// Create row-group-level stats from a single row group.
+    pub fn from_row_group(row_group: &RowGroupMetaData, schema: &Schema) -> 
Self {
+        let mut container = Self::new(StatsGranularity::RowGroup);
+        container.num_rows = Some(row_group.num_rows());
+
+        // Build a map of column name to Arrow data type
+        let column_types: HashMap<&str, &DataType> = schema
+            .fields()
+            .iter()
+            .map(|f| (f.name().as_str(), f.data_type()))
+            .collect();
+
+        // Extract stats for each column in the row group
+        for col_chunk in row_group.columns() {
+            let col_path = col_chunk.column_descr().path();
+
+            // Skip nested columns (multi-part paths like "struct.field")
+            if col_path.parts().len() > 1 {
+                continue;
+            }
+
+            let Some(col_name) = col_path.parts().first().map(|s| s.as_str()) 
else {
+                continue;
+            };
+
+            // Skip if we don't have type info for this column
+            let Some(&data_type) = column_types.get(col_name) else {
+                continue;
+            };
+
+            // Extract statistics if available
+            if let Some(stats) = col_chunk.statistics() {
+                let col_stats =
+                    ColumnStatistics::from_parquet_statistics(col_name, 
data_type, stats);
+                container.columns.insert(col_name.to_string(), col_stats);
+            } else {
+                // No stats available, create empty entry
+                container.columns.insert(
+                    col_name.to_string(),
+                    ColumnStatistics::new(col_name.to_string(), 
data_type.clone()),
+                );
+            }
+        }
+
+        container
+    }
+}
+
+/// Convert Parquet statistics to Arrow single-element arrays.
+///
+/// Uses the Arrow DataType to create appropriately typed arrays from Parquet
+/// physical type statistics.
+fn parquet_stats_to_min_max_arrays(
+    stats: &ParquetStatistics,
+    data_type: &DataType,
+) -> (Option<ArrayRef>, Option<ArrayRef>) {
+    match stats {
+        ParquetStatistics::Boolean(s) => {
+            let min = s
+                .min_opt()
+                .map(|v| Arc::new(BooleanArray::from(vec![*v])) as ArrayRef);
+            let max = s
+                .max_opt()
+                .map(|v| Arc::new(BooleanArray::from(vec![*v])) as ArrayRef);
+            (min, max)
+        }
+        ParquetStatistics::Int32(s) => {
+            // Create arrays based on the Arrow logical type
+            let min = s.min_opt().map(|v| int32_to_array(*v, data_type));
+            let max = s.max_opt().map(|v| int32_to_array(*v, data_type));
+            (min, max)
+        }
+        ParquetStatistics::Int64(s) => {
+            let min = s.min_opt().map(|v| int64_to_array(*v, data_type));
+            let max = s.max_opt().map(|v| int64_to_array(*v, data_type));
+            (min, max)
+        }
+        ParquetStatistics::Int96(_) => {
+            // Int96 is deprecated, typically used for timestamps in legacy 
Parquet
+            (None, None)

Review Comment:
   The function `parquet_stats_to_min_max_arrays` handles Parquet Int96 by 
returning `(None, None)` with a comment about it being deprecated. However, 
Int96 timestamps are still commonly found in legacy Parquet files. Consider 
logging a warning when Int96 columns are encountered so users understand why 
statistics are unavailable for these columns.



##########
crates/core/src/table/fs_view.rs:
##########
@@ -58,64 +61,150 @@ impl FileSystemView {
         })
     }
 
-    /// Load file groups by listing from the storage.
+    /// Load file groups from the appropriate source (storage or metadata 
table records)
+    /// and apply stats-based pruning.
+    ///
+    /// # File Listing Source
+    /// - If `files_partition_records` is Some: Uses pre-fetched metadata 
table records
+    /// - If `files_partition_records` is None: Uses storage listing via 
FileLister
+    ///
+    /// # Stats Pruning Source (for non-empty file_pruner)
+    /// - Currently: Always extracts stats from Parquet file footers
+    /// - TODO: Use metadata table partitions when available:
+    ///   - partition_stats: Enhance PartitionPruner to prune partitions 
before file listing
+    ///   - column_stats: Prune files without reading Parquet footers
     ///
     /// # Arguments
     /// * `partition_pruner` - Filters which partitions to include
-    /// * `completion_time_view` - View to look up completion timestamps
-    async fn load_file_groups_from_storage<V: CompletionTimeView + Sync>(
+    /// * `file_pruner` - Filters files based on column statistics
+    /// * `table_schema` - Table schema for statistics extraction
+    /// * `timeline_view` - The timeline view providing query timestamp and 
completion time lookups
+    /// * `files_partition_records` - Optional pre-fetched metadata table 
records
+    async fn load_file_groups(
         &self,
         partition_pruner: &PartitionPruner,
-        completion_time_view: &V,
+        file_pruner: &FilePruner,
+        table_schema: &Schema,
+        timeline_view: &TimelineView,
+        files_partition_records: Option<&HashMap<String, 
FilesPartitionRecord>>,
     ) -> Result<()> {
-        let lister = FileLister::new(
-            self.hudi_configs.clone(),
-            self.storage.clone(),
-            partition_pruner.to_owned(),
-        );
-        let file_groups_map = lister
-            .list_file_groups_for_relevant_partitions(completion_time_view)
-            .await?;
+        // TODO: Enhance PartitionPruner with partition_stats support
+        // - Load partition_stats from metadata table into PartitionPruner
+        // - PartitionPruner.should_include() will use both partition column 
values AND partition_stats
+        // - For non-partitioned tables: check 
partition_pruner.can_any_partition_match() for early return
+
+        // Step 1: Get file groups from appropriate source
+        let file_groups_map = if let Some(records) = files_partition_records {
+            // Use pre-fetched metadata table records
+            let base_file_format: String = 
self.hudi_configs.get_or_default(BaseFileFormat).into();
+            file_groups_from_files_partition_records(records, 
&base_file_format, timeline_view)?
+        } else {
+            // Use storage listing
+            let lister = FileLister::new(
+                self.hudi_configs.clone(),
+                self.storage.clone(),
+                partition_pruner.to_owned(),
+            );
+            lister
+                .list_file_groups_for_relevant_partitions(timeline_view)
+                .await?
+        };
+
+        // Step 2: Apply partition pruning (for metadata table path) and stats 
pruning
+        // Note: Storage listing path already applies partition pruning via 
FileLister
+        // TODO: Check if metadata table column_stats partition is available
+        // and use that instead of Parquet footers for better performance
         for (partition_path, file_groups) in file_groups_map {
+            // Skip partitions that don't match the pruner (for metadata table 
path)
+            if files_partition_records.is_some()
+                && !partition_pruner.is_empty()
+                && !partition_pruner.should_include(&partition_path)
+            {
+                continue;
+            }
+
+            let retained = self
+                .apply_stats_pruning_from_footers(
+                    file_groups,
+                    file_pruner,
+                    table_schema,
+                    timeline_view.as_of_timestamp(),
+                )
+                .await;
             self.partition_to_file_groups
-                .insert(partition_path, file_groups);
+                .insert(partition_path, retained);
         }
+
         Ok(())
     }
 
-    /// Load file groups from metadata table records.
-    ///
-    /// This is an alternative to `load_file_groups_from_file_system` that uses
-    /// file listing records fetched from the metadata table. Only partitions 
that
-    /// pass the partition pruner will be loaded.
+    /// Apply file-level stats pruning using Parquet file footers.
     ///
-    /// This method is not async because it operates on pre-fetched records.
-    ///
-    /// # Arguments
-    /// * `records` - Metadata table files partition records
-    /// * `partition_pruner` - Filters which partitions to include
-    /// * `completion_time_view` - View to look up completion timestamps
-    fn load_file_groups_from_metadata_table_records<V: CompletionTimeView>(
+    /// Returns the filtered list of file groups that pass the pruning check.
+    /// Files are included (not pruned) if:
+    /// - The pruner has no filters
+    /// - The file is not a Parquet file
+    /// - Column stats cannot be loaded (conservative behavior)
+    /// - The file's stats indicate it might contain matching rows
+    async fn apply_stats_pruning_from_footers(
         &self,
-        records: &HashMap<String, 
crate::metadata::table_record::FilesPartitionRecord>,
-        partition_pruner: &PartitionPruner,
-        completion_time_view: &V,
-    ) -> Result<()> {
-        let base_file_format: String = 
self.hudi_configs.get_or_default(BaseFileFormat).into();
-        let file_groups_map = file_groups_from_files_partition_records(
-            records,
-            &base_file_format,
-            completion_time_view,
-        )?;
-
-        for entry in file_groups_map.iter() {
-            let partition_path = entry.key();
-            if partition_pruner.is_empty() || 
partition_pruner.should_include(partition_path) {
-                self.partition_to_file_groups
-                    .insert(partition_path.clone(), entry.value().clone());
+        file_groups: Vec<FileGroup>,
+        file_pruner: &FilePruner,
+        table_schema: &Schema,
+        as_of_timestamp: &str,
+    ) -> Vec<FileGroup> {
+        if file_pruner.is_empty() {
+            return file_groups;
+        }
+
+        let mut retained = Vec::with_capacity(file_groups.len());
+
+        for mut fg in file_groups {
+            if let Some(fsl) = fg.get_file_slice_mut_as_of(as_of_timestamp) {
+                let relative_path = match fsl.base_file_relative_path() {
+                    Ok(path) => path,
+                    Err(_) => {
+                        // Cannot get path, include the file group
+                        retained.push(fg);
+                        continue;
+                    }
+                };
+
+                // Case-insensitive check for .parquet extension
+                if !relative_path.to_lowercase().ends_with(".parquet") {
+                    retained.push(fg);
+                    continue;
+                }
+
+                // Load column stats from Parquet footer
+                let stats = match self
+                    .storage
+                    .get_parquet_column_stats(&relative_path, table_schema)
+                    .await
+                {
+                    Ok(s) => s,
+                    Err(e) => {
+                        log::warn!(
+                            "Failed to load column stats for {relative_path}: 
{e}. Including file."
+                        );
+                        retained.push(fg);
+                        continue;
+                    }
+                };
+
+                if file_pruner.should_include(&stats) {
+                    retained.push(fg);
+                } else {
+                    log::debug!("Pruned file {relative_path} based on column 
stats");
+                }
+            } else {
+                // No file slice as of timestamp, include the file group
+                // (it will be filtered out later in collect_file_slices)

Review Comment:
   The comment says "No file slice as of timestamp, include the file group (it 
will be filtered out later in collect_file_slices)", which means such file 
groups are retained during the pruning phase only to be filtered out later. 
Consider skipping the file group here with `continue` instead of retaining it, 
or clarify in the comment why retaining it is necessary for correctness.
   ```suggestion
                   // No file slice as of timestamp. We intentionally keep the 
file group
                   // here so that this method only applies stats-based 
pruning. The
                   // timestamp-based filtering is centralized in 
`collect_file_slices`,
                   // which will call `get_file_slice_mut_as_of` again and 
ignore this
                   // file group if it still has no slice for the requested 
timestamp.
   ```



##########
crates/core/src/storage/mod.rs:
##########
@@ -230,14 +231,31 @@ impl Storage {
     pub async fn get_parquet_file_schema(
         &self,
         relative_path: &str,
-    ) -> Result<arrow::datatypes::Schema> {
+    ) -> Result<arrow_schema::Schema> {
         let parquet_meta = 
self.get_parquet_file_metadata(relative_path).await?;
         Ok(parquet_to_arrow_schema(
             parquet_meta.file_metadata().schema_descr(),
             None,
         )?)
     }
 
+    /// Get column statistics for a Parquet file.
+    ///
+    /// # Arguments
+    /// * `relative_path` - Relative path to the Parquet file
+    /// * `schema` - Arrow schema to use for extracting statistics
+    pub async fn get_parquet_column_stats(
+        &self,
+        relative_path: &str,
+        schema: &arrow_schema::Schema,
+    ) -> Result<StatisticsContainer> {
+        let parquet_meta = 
self.get_parquet_file_metadata(relative_path).await?;
+        Ok(StatisticsContainer::from_parquet_metadata(
+            &parquet_meta,
+            schema,
+        ))
+    }

Review Comment:
   The public API `get_parquet_column_stats` lacks examples in its 
documentation. Consider adding a usage example showing how to call this method 
and what the returned `StatisticsContainer` contains. This would help users 
understand how to use the statistics for their own pruning logic.



##########
crates/core/src/table/file_pruner.rs:
##########
@@ -0,0 +1,587 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//! File-level pruner for filtering files based on column statistics.
+
+use crate::Result;
+use crate::expr::ExprOperator;
+use crate::expr::filter::{Filter, SchemableFilter};
+use crate::statistics::{ColumnStatistics, StatisticsContainer};
+
+use arrow_array::{ArrayRef, Datum};
+use arrow_ord::cmp;
+use arrow_schema::Schema;
+use std::collections::HashSet;
+
+/// A file-level pruner that filters files based on column statistics.
+///
+/// This pruner uses min/max statistics from Parquet files to determine if a 
file
+/// can be skipped (pruned) based on query predicates. A file is pruned if its
+/// statistics prove that no rows in the file can match the predicate.
+#[derive(Debug, Clone)]
+pub struct FilePruner {
+    /// Filters that apply to non-partition columns
+    and_filters: Vec<SchemableFilter>,
+}
+
+impl FilePruner {
+    /// Creates a new file pruner with filters on non-partition columns.
+    ///
+    /// Filters on partition columns are excluded since they are handled by 
PartitionPruner.
+    ///
+    /// # Arguments
+    /// * `and_filters` - List of filters to apply
+    /// * `table_schema` - The table's data schema
+    /// * `partition_schema` - The partition schema (filters on these columns 
are excluded)
+    pub fn new(
+        and_filters: &[Filter],
+        table_schema: &Schema,
+        partition_schema: &Schema,
+    ) -> Result<Self> {
+        // Get partition column names to exclude
+        let partition_columns: HashSet<&str> = partition_schema
+            .fields()
+            .iter()
+            .map(|f| f.name().as_str())
+            .collect();
+
+        // Only keep filters on non-partition columns that exist in the table 
schema
+        let and_filters: Vec<SchemableFilter> = and_filters
+            .iter()
+            .filter(|filter| 
!partition_columns.contains(filter.field_name.as_str()))
+            .filter_map(|filter| SchemableFilter::try_from((filter.clone(), 
table_schema)).ok())
+            .collect();
+
+        Ok(FilePruner { and_filters })
+    }
+
+    /// Creates an empty file pruner that does not filter any files.
+    pub fn empty() -> Self {
+        FilePruner {
+            and_filters: Vec::new(),
+        }
+    }
+
+    /// Returns `true` if the pruner does not have any filters.
+    pub fn is_empty(&self) -> bool {
+        self.and_filters.is_empty()
+    }
+
+    /// Returns `true` if the file should be included based on its statistics.
+    ///
+    /// A file is included if ANY of its rows MIGHT match all the filters.
+    /// A file is excluded (pruned) only if we can prove that NO rows can 
match.
+    ///
+    /// If statistics are missing or incomplete, the file is included (safe 
default).
+    pub fn should_include(&self, stats: &StatisticsContainer) -> bool {
+        // If no filters, include everything
+        if self.and_filters.is_empty() {
+            return true;
+        }
+
+        // All filters must pass (AND semantics)
+        // If any filter definitively excludes the file, return false
+        for filter in &self.and_filters {
+            let col_name = filter.field.name();
+
+            // Get column statistics. When using 
StatisticsContainer::from_parquet_metadata(),
+            // all schema columns will have an entry. However, stats may come 
from other sources
+            // (e.g., manually constructed), so we handle missing columns 
defensively.
+            let Some(col_stats) = stats.columns.get(col_name) else {
+                // No stats for this column, cannot prune - include the file
+                continue;
+            };

Review Comment:
   The comment states that "When using 
StatisticsContainer::from_parquet_metadata(), all schema columns will have an 
entry", which couples this module to an implementation detail of the 
statistics module. If the statistics extraction logic changes, this assumption 
could break silently. Consider making this defensive check more explicit or 
documenting this contract in the StatisticsContainer API.



##########
crates/core/tests/statistics_tests.rs:
##########
@@ -0,0 +1,958 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+//! Integration tests for the statistics module.
+//!
+//! These tests generate Parquet files with known data, then verify that
+//! the statistics extraction correctly reads min/max values.
+
+use std::fs::File;
+use std::sync::Arc;
+
+use arrow_array::{
+    ArrayRef, BooleanArray, Date32Array, Float32Array, Float64Array, Int8Array, Int16Array,
+    Int32Array, Int64Array, RecordBatch, StringArray, TimestampMicrosecondArray,
+    TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, UInt32Array,
+};
+use arrow_schema::{DataType, Field, Schema, TimeUnit};
+use parquet::arrow::ArrowWriter;
+use parquet::basic::Compression;
+use parquet::file::properties::WriterProperties;
+use parquet::file::reader::FileReader;
+use parquet::file::serialized_reader::SerializedFileReader;
+use tempfile::tempdir;
+
+use hudi_core::statistics::{StatisticsContainer, StatsGranularity};
+
+/// Helper to write a RecordBatch to a Parquet file and return the path.
+fn write_parquet_file(batch: &RecordBatch, path: &std::path::Path) {
+    let file = File::create(path).unwrap();
+    let props = WriterProperties::builder()
+        .set_compression(Compression::SNAPPY)
+        .set_statistics_enabled(parquet::file::properties::EnabledStatistics::Page)
+        .build();
+    let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props)).unwrap();
+    writer.write(batch).unwrap();
+    writer.close().unwrap();
+}
+
+/// Helper to write a RecordBatch to a Parquet file without statistics.
+fn write_parquet_file_no_stats(batch: &RecordBatch, path: &std::path::Path) {
+    let file = File::create(path).unwrap();
+    let props = WriterProperties::builder()
+        .set_compression(Compression::SNAPPY)
+        .set_statistics_enabled(parquet::file::properties::EnabledStatistics::None)
+        .build();
+    let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props)).unwrap();
+    writer.write(batch).unwrap();
+    writer.close().unwrap();
+}
+
+/// Helper to write multiple RecordBatches to a single Parquet file (multiple row groups).
+fn write_parquet_file_multiple_row_groups(batches: &[RecordBatch], path: &std::path::Path) {
+    let file = File::create(path).unwrap();
+    let props = WriterProperties::builder()
+        .set_compression(Compression::SNAPPY)
+        .set_statistics_enabled(parquet::file::properties::EnabledStatistics::Page)
+        .set_max_row_group_size(3) // Force smaller row groups
+        .build();
+    let mut writer = ArrowWriter::try_new(file, batches[0].schema(), Some(props)).unwrap();
+    for batch in batches {
+        writer.write(batch).unwrap();
+    }
+    writer.close().unwrap();
+}
+
+/// Helper to read Parquet metadata from a file.
+fn read_parquet_metadata(path: &std::path::Path) -> parquet::file::metadata::ParquetMetaData {
+    let file = File::open(path).unwrap();
+    let reader = SerializedFileReader::new(file).unwrap();
+    reader.metadata().clone()
+}
+
+/// Helper to extract Int32 value from ArrayRef
+fn get_int32(arr: &ArrayRef) -> i32 {
+    arr.as_any().downcast_ref::<Int32Array>().unwrap().value(0)
+}
+
+/// Helper to extract Int64 value from ArrayRef
+fn get_int64(arr: &ArrayRef) -> i64 {
+    arr.as_any().downcast_ref::<Int64Array>().unwrap().value(0)
+}
+
+/// Helper to extract Int8 value from ArrayRef
+fn get_int8(arr: &ArrayRef) -> i8 {
+    arr.as_any().downcast_ref::<Int8Array>().unwrap().value(0)
+}
+
+/// Helper to extract Int16 value from ArrayRef
+fn get_int16(arr: &ArrayRef) -> i16 {
+    arr.as_any().downcast_ref::<Int16Array>().unwrap().value(0)
+}
+
+/// Helper to extract UInt32 value from ArrayRef
+fn get_uint32(arr: &ArrayRef) -> u32 {
+    arr.as_any().downcast_ref::<UInt32Array>().unwrap().value(0)
+}
+
+/// Helper to extract Float32 value from ArrayRef
+fn get_float32(arr: &ArrayRef) -> f32 {
+    arr.as_any()
+        .downcast_ref::<Float32Array>()
+        .unwrap()
+        .value(0)
+}
+
+/// Helper to extract Float64 value from ArrayRef
+fn get_float64(arr: &ArrayRef) -> f64 {
+    arr.as_any()
+        .downcast_ref::<Float64Array>()
+        .unwrap()
+        .value(0)
+}
+
+/// Helper to extract Boolean value from ArrayRef
+fn get_bool(arr: &ArrayRef) -> bool {
+    arr.as_any()
+        .downcast_ref::<BooleanArray>()
+        .unwrap()
+        .value(0)
+}
+
+/// Helper to extract String value from ArrayRef
+fn get_string(arr: &ArrayRef) -> String {
+    arr.as_any()
+        .downcast_ref::<StringArray>()
+        .unwrap()
+        .value(0)
+        .to_string()
+}
+
+/// Helper to extract Date32 value from ArrayRef
+fn get_date32(arr: &ArrayRef) -> i32 {
+    arr.as_any().downcast_ref::<Date32Array>().unwrap().value(0)
+}
+
+/// Helper to extract TimestampMicrosecond value from ArrayRef
+fn get_timestamp_micros(arr: &ArrayRef) -> i64 {
+    arr.as_any()
+        .downcast_ref::<TimestampMicrosecondArray>()
+        .unwrap()
+        .value(0)
+}
+
+/// Helper to extract TimestampSecond value from ArrayRef
+fn get_timestamp_seconds(arr: &ArrayRef) -> i64 {
+    arr.as_any()
+        .downcast_ref::<TimestampSecondArray>()
+        .unwrap()
+        .value(0)
+}
+
+/// Helper to extract TimestampMillisecond value from ArrayRef
+fn get_timestamp_millis(arr: &ArrayRef) -> i64 {
+    arr.as_any()
+        .downcast_ref::<TimestampMillisecondArray>()
+        .unwrap()
+        .value(0)
+}
+
+/// Helper to extract TimestampNanosecond value from ArrayRef
+fn get_timestamp_nanos(arr: &ArrayRef) -> i64 {
+    arr.as_any()
+        .downcast_ref::<TimestampNanosecondArray>()
+        .unwrap()
+        .value(0)
+}

Review Comment:
   The helper functions like `get_int32`, `get_uint32`, etc. all use 
`.unwrap()` which will panic if the downcast fails. While these are test 
helpers and panicking in tests is acceptable, consider adding more descriptive 
panic messages using `.expect()` to make test failures easier to debug. For 
example: `.expect("Failed to downcast to Int32Array")`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to