This is an automated email from the ASF dual-hosted git repository.

zhangyue19921010 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 0cc771c  feat: support column stats pruning (#516)
0cc771c is described below

commit 0cc771c05e1f410d0c21af14544669001bd8de3c
Author: Shiyan Xu <[email protected]>
AuthorDate: Mon Jan 12 02:54:57 2026 -0600

    feat: support column stats pruning (#516)
    
    * feat: support column stats pruning
    
    * add logs
---
 Cargo.toml                            |   2 +
 crates/core/Cargo.toml                |   1 +
 crates/core/src/lib.rs                |   1 +
 crates/core/src/metadata/table/mod.rs |  15 +-
 crates/core/src/statistics/mod.rs     | 544 +++++++++++++++++++
 crates/core/src/storage/mod.rs        |  20 +-
 crates/core/src/table/file_pruner.rs  | 587 +++++++++++++++++++++
 crates/core/src/table/fs_view.rs      | 255 ++++++---
 crates/core/src/table/mod.rs          |  16 +-
 crates/core/tests/statistics_tests.rs | 958 ++++++++++++++++++++++++++++++++++
 crates/test/Cargo.toml                |   4 +-
 11 files changed, 2338 insertions(+), 65 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index c6cf37b..af9b8ba 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -94,3 +94,5 @@ flate2 = { version = "1" }
 
 # testing
 serial_test = { version = "3" }
+tempfile = { version = "3" }
+zip-extract = { version = "0.3" }
diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml
index b5414fe..ee9a041 100644
--- a/crates/core/Cargo.toml
+++ b/crates/core/Cargo.toml
@@ -89,6 +89,7 @@ percent-encoding = { workspace = true }
 [dev-dependencies]
 hudi-test = { path = "../test" }
 serial_test = { workspace = true }
+tempfile = { workspace = true }
 
 [lints.clippy]
 result_large_err = "allow"
diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs
index 5b1d891..4001b5c 100644
--- a/crates/core/src/lib.rs
+++ b/crates/core/src/lib.rs
@@ -53,6 +53,7 @@ pub mod merge;
 pub mod metadata;
 mod record;
 pub mod schema;
+pub mod statistics;
 pub mod storage;
 pub mod table;
 pub mod timeline;
diff --git a/crates/core/src/metadata/table/mod.rs b/crates/core/src/metadata/table/mod.rs
index 9d3afd8..e5e211c 100644
--- a/crates/core/src/metadata/table/mod.rs
+++ b/crates/core/src/metadata/table/mod.rs
@@ -26,6 +26,8 @@ pub mod records;
 
 use std::collections::HashMap;
 
+use arrow_schema::Schema;
+
 use crate::Result;
 use crate::config::read::HudiReadConfig;
 use crate::config::table::HudiTableConfig::{
@@ -36,6 +38,7 @@ use crate::expr::filter::from_str_tuples;
 use crate::metadata::METADATA_TABLE_PARTITION_FIELD;
 use crate::storage::util::join_url_segments;
 use crate::table::Table;
+use crate::table::file_pruner::FilePruner;
 use crate::table::partition::PartitionPruner;
 
 use records::FilesPartitionRecord;
@@ -250,9 +253,19 @@ impl Table {
         let partition_pruner =
             PartitionPruner::new(&filters, &partition_schema, self.hudi_configs.as_ref())?;
 
+        // Use empty file pruner for metadata table - no column stats pruning needed
+        // Use empty schema since the pruner is empty and won't use the schema
+        let file_pruner = FilePruner::empty();
+        let table_schema = Schema::empty();
+
         let file_slices = self
             .file_system_view
-            .get_file_slices_by_storage_listing(&partition_pruner, &timeline_view)
+            .get_file_slices_by_storage_listing(
+                &partition_pruner,
+                &file_pruner,
+                &table_schema,
+                &timeline_view,
+            )
             .await?;
 
         if file_slices.len() != 1 {
diff --git a/crates/core/src/statistics/mod.rs b/crates/core/src/statistics/mod.rs
new file mode 100644
index 0000000..de88dea
--- /dev/null
+++ b/crates/core/src/statistics/mod.rs
@@ -0,0 +1,544 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+//! Statistics module for column statistics pruning.
+//!
+//! This module provides abstractions for extracting, aggregating, and using
+//! column statistics from Parquet files for query pruning at different granularity levels.
+//!
+//! Core types:
+//! - [`ColumnStatistics`]: Per-column statistics (min, max) for range-based pruning
+//! - [`StatisticsContainer`]: Container for all column statistics at a given granularity
+//!
+//! Min/max values are stored as single-element Arrow arrays (`ArrayRef`), enabling
+//! direct comparison using `arrow_ord::cmp` functions without custom enum types.
+
+use std::collections::HashMap;
+use std::str::FromStr;
+use std::sync::Arc;
+
+use arrow_array::{
+    ArrayRef, BinaryArray, BooleanArray, Date32Array, Float32Array, Float64Array, Int8Array,
+    Int16Array, Int32Array, Int64Array, StringArray, TimestampMicrosecondArray,
+    TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, UInt8Array,
+    UInt16Array, UInt32Array, UInt64Array,
+};
+use arrow_ord::cmp;
+use arrow_schema::{DataType, Schema, TimeUnit};
+use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
+use parquet::file::statistics::Statistics as ParquetStatistics;
+
+/// Column statistics pruning granularity level.
+///
+/// Controls how fine-grained the statistics-based pruning is.
+/// Each level offers different trade-offs between memory overhead and pruning effectiveness.
+///
+/// String parsing is case-insensitive and accepts "row_group" or "rowgroup" for RowGroup.
+#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
+pub enum StatsGranularity {
+    /// File-level stats (aggregated from all row groups).
+    ///
+    /// Coarsest granularity, lowest memory overhead.
+    /// Can skip entire files that don't match predicates.
+    /// Stats are computed by aggregating row group stats:
+    /// - `file_min = min(row_group_mins)`
+    /// - `file_max = max(row_group_maxs)`
+    #[default]
+    File,
+
+    /// Row group level stats (directly from Parquet footer).
+    ///
+    /// Balanced granularity, moderate memory.
+    /// Can skip row groups within files.
+    /// Stats are read directly from Parquet footer's ColumnChunkMetaData.
+    RowGroup,
+}
+
+impl FromStr for StatsGranularity {
+    type Err = String;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s.to_lowercase().as_str() {
+            "file" => Ok(Self::File),
+            "row_group" | "rowgroup" => Ok(Self::RowGroup),
+            _ => Err(format!(
+                "Invalid stats granularity: '{s}'. Valid options: file, 
row_group"
+            )),
+        }
+    }
+}
+
+impl std::fmt::Display for StatsGranularity {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Self::File => write!(f, "file"),
+            Self::RowGroup => write!(f, "row_group"),
+        }
+    }
+}
+
+/// Statistics for a single column at a given granularity.
+///
+/// Tracks min and max values from Parquet footer statistics for range-based pruning.
+/// Values are stored as single-element Arrow arrays for direct comparison using `arrow_ord::cmp`.
+#[derive(Clone, Debug)]
+pub struct ColumnStatistics {
+    /// Column name
+    pub column_name: String,
+    /// Arrow data type
+    pub data_type: DataType,
+    /// Minimum value (stored as a single-element Arrow array)
+    pub min_value: Option<ArrayRef>,
+    /// Maximum value (stored as a single-element Arrow array)
+    pub max_value: Option<ArrayRef>,
+}
+
+impl ColumnStatistics {
+    /// Create a new ColumnStatistics with the given column name and data type.
+    pub fn new(column_name: String, data_type: DataType) -> Self {
+        Self {
+            column_name,
+            data_type,
+            min_value: None,
+            max_value: None,
+        }
+    }
+
+    /// Create from Parquet row group statistics.
+    ///
+    /// Extracts min/max as single-element Arrow arrays using the Arrow data_type
+    /// for correct logical type representation.
+    pub fn from_parquet_statistics(
+        column_name: &str,
+        data_type: &DataType,
+        stats: &ParquetStatistics,
+    ) -> Self {
+        let (min_value, max_value) = parquet_stats_to_min_max_arrays(stats, data_type);
+
+        Self {
+            column_name: column_name.to_string(),
+            data_type: data_type.clone(),
+            min_value,
+            max_value,
+        }
+    }
+
+    /// Merge with another ColumnStatistics (for aggregation).
+    ///
+    /// Takes min of mins, max of maxs.
+    /// Used when aggregating row group stats to file-level stats.
+    pub fn merge(&mut self, other: &ColumnStatistics) {
+        // Merge min values (take the smaller one)
+        self.min_value = match (self.min_value.take(), &other.min_value) {
+            (Some(a), Some(b)) => {
+                if is_less_than(&a, b) {
+                    Some(a)
+                } else {
+                    Some(Arc::clone(b))
+                }
+            }
+            (Some(a), None) => Some(a),
+            (None, Some(b)) => Some(Arc::clone(b)),
+            (None, None) => None,
+        };
+
+        // Merge max values (take the larger one)
+        self.max_value = match (self.max_value.take(), &other.max_value) {
+            (Some(a), Some(b)) => {
+                if is_greater_than(&a, b) {
+                    Some(a)
+                } else {
+                    Some(Arc::clone(b))
+                }
+            }
+            (Some(a), None) => Some(a),
+            (None, Some(b)) => Some(Arc::clone(b)),
+            (None, None) => None,
+        };
+    }
+}
+
+/// Returns true if `a < b` using arrow-ord comparison.
+fn is_less_than(a: &ArrayRef, b: &ArrayRef) -> bool {
+    cmp::lt(a, b).map(|result| result.value(0)).unwrap_or(false)
+}
+
+/// Returns true if `a > b` using arrow-ord comparison.
+fn is_greater_than(a: &ArrayRef, b: &ArrayRef) -> bool {
+    cmp::gt(a, b).map(|result| result.value(0)).unwrap_or(false)
+}
+
+/// Container for statistics at a specific granularity level.
+#[derive(Clone, Debug)]
+pub struct StatisticsContainer {
+    /// Granularity of these statistics
+    pub granularity: StatsGranularity,
+    /// Number of rows covered by these statistics
+    pub num_rows: Option<i64>,
+    /// Column statistics by column name
+    pub columns: HashMap<String, ColumnStatistics>,
+}
+
+impl StatisticsContainer {
+    /// Create an empty statistics container.
+    pub fn new(granularity: StatsGranularity) -> Self {
+        Self {
+            granularity,
+            num_rows: None,
+            columns: HashMap::new(),
+        }
+    }
+
+    /// Create file-level stats by aggregating row group stats from Parquet 
metadata.
+    ///
+    /// This iterates through all row groups, extracts stats for each column,
+    /// and aggregates them to file-level statistics.
+    pub fn from_parquet_metadata(metadata: &ParquetMetaData, schema: &Schema) -> Self {
+        let mut container = Self::new(StatsGranularity::File);
+
+        // Get total rows directly from file metadata
+        container.num_rows = Some(metadata.file_metadata().num_rows());
+
+        // Iterate through row groups and aggregate stats
+        for row_group in metadata.row_groups() {
+            let rg_stats = Self::from_row_group(row_group, schema);
+
+            // Merge row group stats into file-level stats
+            for (col_name, col_stats) in rg_stats.columns {
+                container
+                    .columns
+                    .entry(col_name)
+                    .and_modify(|existing| existing.merge(&col_stats))
+                    .or_insert(col_stats);
+            }
+        }
+
+        // Ensure all schema columns have an entry (even if no stats)
+        for field in schema.fields() {
+            let col_name = field.name();
+            if !container.columns.contains_key(col_name) {
+                container.columns.insert(
+                    col_name.clone(),
+                    ColumnStatistics::new(col_name.clone(), field.data_type().clone()),
+                );
+            }
+        }
+
+        container
+    }
+
+    /// Create row-group-level stats from a single row group.
+    pub fn from_row_group(row_group: &RowGroupMetaData, schema: &Schema) -> Self {
+        let mut container = Self::new(StatsGranularity::RowGroup);
+        container.num_rows = Some(row_group.num_rows());
+
+        // Build a map of column name to Arrow data type
+        let column_types: HashMap<&str, &DataType> = schema
+            .fields()
+            .iter()
+            .map(|f| (f.name().as_str(), f.data_type()))
+            .collect();
+
+        // Extract stats for each column in the row group
+        for col_chunk in row_group.columns() {
+            let col_path = col_chunk.column_descr().path();
+
+            // Skip nested columns (multi-part paths like "struct.field")
+            if col_path.parts().len() > 1 {
+                continue;
+            }
+
+            let Some(col_name) = col_path.parts().first().map(|s| s.as_str()) else {
+                continue;
+            };
+
+            // Skip if we don't have type info for this column
+            let Some(&data_type) = column_types.get(col_name) else {
+                continue;
+            };
+
+            // Extract statistics if available
+            if let Some(stats) = col_chunk.statistics() {
+                let col_stats =
+                    ColumnStatistics::from_parquet_statistics(col_name, data_type, stats);
+                container.columns.insert(col_name.to_string(), col_stats);
+            } else {
+                // No stats available, create empty entry
+                container.columns.insert(
+                    col_name.to_string(),
+                    ColumnStatistics::new(col_name.to_string(), data_type.clone()),
+                );
+            }
+        }
+
+        container
+    }
+}
+
+/// Convert Parquet statistics to Arrow single-element arrays.
+///
+/// Uses the Arrow DataType to create appropriately typed arrays from Parquet
+/// physical type statistics.
+fn parquet_stats_to_min_max_arrays(
+    stats: &ParquetStatistics,
+    data_type: &DataType,
+) -> (Option<ArrayRef>, Option<ArrayRef>) {
+    match stats {
+        ParquetStatistics::Boolean(s) => {
+            let min = s
+                .min_opt()
+                .map(|v| Arc::new(BooleanArray::from(vec![*v])) as ArrayRef);
+            let max = s
+                .max_opt()
+                .map(|v| Arc::new(BooleanArray::from(vec![*v])) as ArrayRef);
+            (min, max)
+        }
+        ParquetStatistics::Int32(s) => {
+            // Create arrays based on the Arrow logical type
+            let min = s.min_opt().map(|v| int32_to_array(*v, data_type));
+            let max = s.max_opt().map(|v| int32_to_array(*v, data_type));
+            (min, max)
+        }
+        ParquetStatistics::Int64(s) => {
+            let min = s.min_opt().map(|v| int64_to_array(*v, data_type));
+            let max = s.max_opt().map(|v| int64_to_array(*v, data_type));
+            (min, max)
+        }
+        ParquetStatistics::Int96(_) => {
+            // Int96 is deprecated, typically used for timestamps in legacy Parquet
+            log::debug!("Int96 statistics not supported - legacy Parquet timestamp format");
+            (None, None)
+        }
+        ParquetStatistics::Float(s) => {
+            let min = s
+                .min_opt()
+                .map(|v| Arc::new(Float32Array::from(vec![*v])) as ArrayRef);
+            let max = s
+                .max_opt()
+                .map(|v| Arc::new(Float32Array::from(vec![*v])) as ArrayRef);
+            (min, max)
+        }
+        ParquetStatistics::Double(s) => {
+            let min = s
+                .min_opt()
+                .map(|v| Arc::new(Float64Array::from(vec![*v])) as ArrayRef);
+            let max = s
+                .max_opt()
+                .map(|v| Arc::new(Float64Array::from(vec![*v])) as ArrayRef);
+            (min, max)
+        }
+        ParquetStatistics::ByteArray(s) => {
+            let min = s.min_opt().map(|b| bytes_to_array(b.data(), data_type));
+            let max = s.max_opt().map(|b| bytes_to_array(b.data(), data_type));
+            (min, max)
+        }
+        ParquetStatistics::FixedLenByteArray(s) => {
+            let min = s.min_opt().map(|b| bytes_to_array(b.data(), data_type));
+            let max = s.max_opt().map(|b| bytes_to_array(b.data(), data_type));
+            (min, max)
+        }
+    }
+}
+
+/// Convert Parquet Int32 physical value to Arrow array based on logical type.
+fn int32_to_array(value: i32, data_type: &DataType) -> ArrayRef {
+    match data_type {
+        DataType::Int8 => Arc::new(Int8Array::from(vec![value as i8])) as ArrayRef,
+        DataType::Int16 => Arc::new(Int16Array::from(vec![value as i16])) as ArrayRef,
+        DataType::Int32 => Arc::new(Int32Array::from(vec![value])) as ArrayRef,
+        DataType::UInt8 => Arc::new(UInt8Array::from(vec![value as u8])) as ArrayRef,
+        DataType::UInt16 => Arc::new(UInt16Array::from(vec![value as u16])) as ArrayRef,
+        DataType::UInt32 => Arc::new(UInt32Array::from(vec![value as u32])) as ArrayRef,
+        DataType::Date32 => Arc::new(Date32Array::from(vec![value])) as ArrayRef,
+        _ => Arc::new(Int32Array::from(vec![value])) as ArrayRef, // fallback
+    }
+}
+
+/// Convert Parquet Int64 physical value to Arrow array based on logical type.
+fn int64_to_array(value: i64, data_type: &DataType) -> ArrayRef {
+    match data_type {
+        DataType::Int64 => Arc::new(Int64Array::from(vec![value])) as ArrayRef,
+        DataType::UInt64 => Arc::new(UInt64Array::from(vec![value as u64])) as ArrayRef,
+        DataType::Timestamp(time_unit, tz) => {
+            match time_unit {
+                TimeUnit::Second => {
+                    Arc::new(TimestampSecondArray::from(vec![value]).with_timezone_opt(tz.clone()))
+                        as ArrayRef
+                }
+                TimeUnit::Millisecond => Arc::new(
+                    TimestampMillisecondArray::from(vec![value]).with_timezone_opt(tz.clone()),
+                ) as ArrayRef,
+                TimeUnit::Microsecond => Arc::new(
+                    TimestampMicrosecondArray::from(vec![value]).with_timezone_opt(tz.clone()),
+                ) as ArrayRef,
+                TimeUnit::Nanosecond => Arc::new(
+                    TimestampNanosecondArray::from(vec![value]).with_timezone_opt(tz.clone()),
+                ) as ArrayRef,
+            }
+        }
+        _ => Arc::new(Int64Array::from(vec![value])) as ArrayRef, // fallback
+    }
+}
+
+/// Convert Parquet byte array to Arrow array based on logical type.
+fn bytes_to_array(data: &[u8], data_type: &DataType) -> ArrayRef {
+    match data_type {
+        DataType::Utf8 | DataType::LargeUtf8 => {
+            let s = String::from_utf8_lossy(data).into_owned();
+            Arc::new(StringArray::from(vec![s])) as ArrayRef
+        }
+        _ => Arc::new(BinaryArray::from_vec(vec![data])) as ArrayRef,
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_stats_granularity_from_str() {
+        assert_eq!(
+            StatsGranularity::from_str("file").unwrap(),
+            StatsGranularity::File
+        );
+        assert_eq!(
+            StatsGranularity::from_str("FILE").unwrap(),
+            StatsGranularity::File
+        );
+        assert_eq!(
+            StatsGranularity::from_str("row_group").unwrap(),
+            StatsGranularity::RowGroup
+        );
+        assert_eq!(
+            StatsGranularity::from_str("rowgroup").unwrap(),
+            StatsGranularity::RowGroup
+        );
+        assert!(StatsGranularity::from_str("invalid").is_err());
+    }
+
+    #[test]
+    fn test_stats_granularity_display() {
+        assert_eq!(format!("{}", StatsGranularity::File), "file");
+        assert_eq!(format!("{}", StatsGranularity::RowGroup), "row_group");
+    }
+
+    #[test]
+    fn test_stats_granularity_default() {
+        assert_eq!(StatsGranularity::default(), StatsGranularity::File);
+    }
+
+    /// Helper to create a single-element Int32 array.
+    fn int32_array(value: i32) -> ArrayRef {
+        Arc::new(Int32Array::from(vec![value])) as ArrayRef
+    }
+
+    /// Helper to create a single-element UInt32 array.
+    fn uint32_array(value: u32) -> ArrayRef {
+        Arc::new(UInt32Array::from(vec![value])) as ArrayRef
+    }
+
+    /// Helper to check array value equality for Int32.
+    fn get_int32(arr: &ArrayRef) -> i32 {
+        arr.as_any().downcast_ref::<Int32Array>().unwrap().value(0)
+    }
+
+    /// Helper to check array value equality for UInt32.
+    fn get_uint32(arr: &ArrayRef) -> u32 {
+        arr.as_any().downcast_ref::<UInt32Array>().unwrap().value(0)
+    }
+
+    #[test]
+    fn test_column_statistics_merge() {
+        // Test 1: Both stats have values - takes min of mins, max of maxs
+        let mut stats1 = ColumnStatistics {
+            column_name: "test".to_string(),
+            data_type: DataType::Int32,
+            min_value: Some(int32_array(10)),
+            max_value: Some(int32_array(50)),
+        };
+        let stats2 = ColumnStatistics {
+            column_name: "test".to_string(),
+            data_type: DataType::Int32,
+            min_value: Some(int32_array(5)),
+            max_value: Some(int32_array(100)),
+        };
+        stats1.merge(&stats2);
+        assert_eq!(get_int32(stats1.min_value.as_ref().unwrap()), 5);
+        assert_eq!(get_int32(stats1.max_value.as_ref().unwrap()), 100);
+
+        // Test 2: One side has None values - preserves the Some value
+        let mut stats3 = ColumnStatistics {
+            column_name: "test".to_string(),
+            data_type: DataType::Int32,
+            min_value: Some(int32_array(10)),
+            max_value: None,
+        };
+        let stats4 = ColumnStatistics {
+            column_name: "test".to_string(),
+            data_type: DataType::Int32,
+            min_value: None,
+            max_value: Some(int32_array(100)),
+        };
+        stats3.merge(&stats4);
+        assert_eq!(get_int32(stats3.min_value.as_ref().unwrap()), 10);
+        assert_eq!(get_int32(stats3.max_value.as_ref().unwrap()), 100);
+    }
+
+    #[test]
+    fn test_unsigned_integer_merge() {
+        // Test that UInt32 values are correctly compared using unsigned semantics
+        let mut uint32_stats = ColumnStatistics {
+            column_name: "test".to_string(),
+            data_type: DataType::UInt32,
+            min_value: Some(uint32_array(100)),
+            max_value: Some(uint32_array(200)),
+        };
+        let uint32_large = ColumnStatistics {
+            column_name: "test".to_string(),
+            data_type: DataType::UInt32,
+            min_value: Some(uint32_array(3_000_000_000)),
+            max_value: Some(uint32_array(4_000_000_000)),
+        };
+        uint32_stats.merge(&uint32_large);
+        // 100 < 3B in unsigned comparison
+        assert_eq!(get_uint32(uint32_stats.min_value.as_ref().unwrap()), 100);
+        // 4B > 200 in unsigned comparison
+        assert_eq!(
+            get_uint32(uint32_stats.max_value.as_ref().unwrap()),
+            4_000_000_000
+        );
+    }
+
+    #[test]
+    fn test_arrow_ord_comparisons() {
+        // Test arrow-ord comparisons directly
+        let a = int32_array(10);
+        let b = int32_array(20);
+
+        assert!(is_less_than(&a, &b));
+        assert!(!is_less_than(&b, &a));
+        assert!(is_greater_than(&b, &a));
+        assert!(!is_greater_than(&a, &b));
+
+        // Test unsigned comparisons
+        let small = uint32_array(100);
+        let large = uint32_array(3_000_000_000);
+        assert!(is_less_than(&small, &large));
+        assert!(is_greater_than(&large, &small));
+    }
+}
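
For reference, a minimal sketch of how the statistics API added above composes, written as crate-internal Rust: the `crate::statistics` paths and struct-literal construction mirror the unit tests in this patch, and the function name is illustrative only.

    use std::str::FromStr;
    use std::sync::Arc;

    use arrow_array::{ArrayRef, Int32Array};
    use arrow_schema::DataType;

    use crate::statistics::{ColumnStatistics, StatsGranularity};

    fn merge_sketch() {
        // Granularity parsing is case-insensitive; "rowgroup" is also accepted.
        assert_eq!(StatsGranularity::from_str("FILE").unwrap(), StatsGranularity::File);

        // Two row groups with id ranges [10, 50] and [5, 100]; after merging,
        // the file-level range is [5, 100] (min of mins, max of maxs).
        let mut file_stats = ColumnStatistics {
            column_name: "id".to_string(),
            data_type: DataType::Int32,
            min_value: Some(Arc::new(Int32Array::from(vec![10])) as ArrayRef),
            max_value: Some(Arc::new(Int32Array::from(vec![50])) as ArrayRef),
        };
        let rg_stats = ColumnStatistics {
            column_name: "id".to_string(),
            data_type: DataType::Int32,
            min_value: Some(Arc::new(Int32Array::from(vec![5])) as ArrayRef),
            max_value: Some(Arc::new(Int32Array::from(vec![100])) as ArrayRef),
        };
        file_stats.merge(&rg_stats);
        // file_stats now holds min = 5 and max = 100, still as single-element arrays.
    }
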
diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs
index e9650d9..e0c0491 100644
--- a/crates/core/src/storage/mod.rs
+++ b/crates/core/src/storage/mod.rs
@@ -38,6 +38,7 @@ use url::Url;
 
 use crate::config::HudiConfigs;
 use crate::config::table::HudiTableConfig;
+use crate::statistics::StatisticsContainer;
 use crate::storage::error::StorageError::{Creation, InvalidPath};
 use crate::storage::error::{Result, StorageError};
 use crate::storage::file_metadata::FileMetadata;
@@ -230,7 +231,7 @@ impl Storage {
     pub async fn get_parquet_file_schema(
         &self,
         relative_path: &str,
-    ) -> Result<arrow::datatypes::Schema> {
+    ) -> Result<arrow_schema::Schema> {
         let parquet_meta = self.get_parquet_file_metadata(relative_path).await?;
         Ok(parquet_to_arrow_schema(
             parquet_meta.file_metadata().schema_descr(),
@@ -238,6 +239,23 @@ impl Storage {
         )?)
     }
 
+    /// Get column statistics for a Parquet file.
+    ///
+    /// # Arguments
+    /// * `relative_path` - Relative path to the Parquet file
+    /// * `schema` - Arrow schema to use for extracting statistics
+    pub async fn get_parquet_column_stats(
+        &self,
+        relative_path: &str,
+        schema: &arrow_schema::Schema,
+    ) -> Result<StatisticsContainer> {
+        let parquet_meta = self.get_parquet_file_metadata(relative_path).await?;
+        Ok(StatisticsContainer::from_parquet_metadata(
+            &parquet_meta,
+            schema,
+        ))
+    }
+
     pub async fn get_file_data(&self, relative_path: &str) -> Result<Bytes> {
         let obj_url = join_url_segments(&self.base_url, &[relative_path])?;
         let obj_path = ObjPath::from_url_path(obj_url.path())?;
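
The new `get_parquet_column_stats` is a thin wrapper over `StatisticsContainer::from_parquet_metadata`. Below is a standalone sketch of the same extraction using the parquet crate directly; the `hudi_core` crate name and the "data.parquet" path are assumptions for illustration.

    use std::fs::File;

    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

    // Assumed external path to the new module added in this patch.
    use hudi_core::statistics::StatisticsContainer;

    fn footer_stats_sketch() -> Result<(), Box<dyn std::error::Error>> {
        // "data.parquet" is a placeholder path.
        let file = File::open("data.parquet")?;
        let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
        let schema = builder.schema().clone();

        // Aggregate row-group min/max from the footer into file-level statistics.
        let stats = StatisticsContainer::from_parquet_metadata(builder.metadata(), &schema);
        for (name, col) in &stats.columns {
            println!("{name}: min={:?} max={:?}", col.min_value, col.max_value);
        }
        Ok(())
    }
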
diff --git a/crates/core/src/table/file_pruner.rs b/crates/core/src/table/file_pruner.rs
new file mode 100644
index 0000000..500a2a3
--- /dev/null
+++ b/crates/core/src/table/file_pruner.rs
@@ -0,0 +1,587 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//! File-level pruner for filtering files based on column statistics.
+
+use crate::Result;
+use crate::expr::ExprOperator;
+use crate::expr::filter::{Filter, SchemableFilter};
+use crate::statistics::{ColumnStatistics, StatisticsContainer};
+
+use arrow_array::{ArrayRef, Datum};
+use arrow_ord::cmp;
+use arrow_schema::Schema;
+use std::collections::HashSet;
+
+/// A file-level pruner that filters files based on column statistics.
+///
+/// This pruner uses min/max statistics from Parquet files to determine if a file
+/// can be skipped (pruned) based on query predicates. A file is pruned if its
+/// statistics prove that no rows in the file can match the predicate.
+#[derive(Debug, Clone)]
+pub struct FilePruner {
+    /// Filters that apply to non-partition columns
+    and_filters: Vec<SchemableFilter>,
+}
+
+impl FilePruner {
+    /// Creates a new file pruner with filters on non-partition columns.
+    ///
+    /// Filters on partition columns are excluded since they are handled by PartitionPruner.
+    ///
+    /// # Arguments
+    /// * `and_filters` - List of filters to apply
+    /// * `table_schema` - The table's data schema
+    /// * `partition_schema` - The partition schema (filters on these columns are excluded)
+    pub fn new(
+        and_filters: &[Filter],
+        table_schema: &Schema,
+        partition_schema: &Schema,
+    ) -> Result<Self> {
+        // Get partition column names to exclude
+        let partition_columns: HashSet<&str> = partition_schema
+            .fields()
+            .iter()
+            .map(|f| f.name().as_str())
+            .collect();
+
+        // Only keep filters on non-partition columns that exist in the table schema
+        let and_filters: Vec<SchemableFilter> = and_filters
+            .iter()
+            .filter(|filter| !partition_columns.contains(filter.field_name.as_str()))
+            .filter_map(|filter| SchemableFilter::try_from((filter.clone(), table_schema)).ok())
+            .collect();
+
+        Ok(FilePruner { and_filters })
+    }
+
+    /// Creates an empty file pruner that does not filter any files.
+    pub fn empty() -> Self {
+        FilePruner {
+            and_filters: Vec::new(),
+        }
+    }
+
+    /// Returns `true` if the pruner does not have any filters.
+    pub fn is_empty(&self) -> bool {
+        self.and_filters.is_empty()
+    }
+
+    /// Returns `true` if the file should be included based on its statistics.
+    ///
+    /// A file is included if ANY of its rows MIGHT match all the filters.
+    /// A file is excluded (pruned) only if we can prove that NO rows can match.
+    ///
+    /// If statistics are missing or incomplete, the file is included (safe default).
+    pub fn should_include(&self, stats: &StatisticsContainer) -> bool {
+        // If no filters, include everything
+        if self.and_filters.is_empty() {
+            return true;
+        }
+
+        // All filters must pass (AND semantics)
+        // If any filter definitively excludes the file, return false
+        for filter in &self.and_filters {
+            let col_name = filter.field.name();
+
+            // Get column statistics. When using StatisticsContainer::from_parquet_metadata(),
+            // all schema columns will have an entry. However, stats may come from other sources
+            // (e.g., manually constructed), so we handle missing columns defensively.
+            let Some(col_stats) = stats.columns.get(col_name) else {
+                // No stats for this column, cannot prune - include the file
+                continue;
+            };
+
+            // Check if this filter can prune the file
+            if self.can_prune_by_filter(filter, col_stats) {
+                return false; // File can be pruned
+            }
+        }
+
+        true // File should be included
+    }
+
+    /// Determines if a file can be pruned based on a single filter and column stats.
+    ///
+    /// Returns `true` if the file can definitely be pruned (no rows can match).
+    fn can_prune_by_filter(&self, filter: &SchemableFilter, col_stats: &ColumnStatistics) -> bool {
+        // Get the filter value as an ArrayRef
+        let filter_array = self.extract_filter_array(filter);
+        let Some(filter_value) = filter_array else {
+            return false; // Cannot extract value, don't prune
+        };
+
+        let min = &col_stats.min_value;
+        let max = &col_stats.max_value;
+
+        match filter.operator {
+            ExprOperator::Eq => {
+                // Prune if: value < min OR value > max
+                self.can_prune_eq(&filter_value, min, max)
+            }
+            ExprOperator::Ne => {
+                // Prune if: min = max = value (all rows equal the excluded value)
+                self.can_prune_ne(&filter_value, min, max)
+            }
+            ExprOperator::Lt => {
+                // Prune if: min >= value (all values are >= the threshold)
+                self.can_prune_lt(&filter_value, min)
+            }
+            ExprOperator::Lte => {
+                // Prune if: min > value
+                self.can_prune_lte(&filter_value, min)
+            }
+            ExprOperator::Gt => {
+                // Prune if: max <= value (all values are <= the threshold)
+                self.can_prune_gt(&filter_value, max)
+            }
+            ExprOperator::Gte => {
+                // Prune if: max < value
+                self.can_prune_gte(&filter_value, max)
+            }
+        }
+    }
+
+    /// Prune for `col = value`: prune if value < min OR value > max
+    fn can_prune_eq(
+        &self,
+        value: &ArrayRef,
+        min: &Option<ArrayRef>,
+        max: &Option<ArrayRef>,
+    ) -> bool {
+        // Need both min and max to make this decision
+        let Some(min_val) = min else {
+            return false;
+        };
+        let Some(max_val) = max else {
+            return false;
+        };
+
+        // Prune if value < min OR value > max
+        let value_lt_min = cmp::lt(value, min_val).map(|r| r.value(0)).unwrap_or(false);
+        let value_gt_max = cmp::gt(value, max_val).map(|r| r.value(0)).unwrap_or(false);
+
+        value_lt_min || value_gt_max
+    }
+
+    /// Prune for `col != value`: prune if min = max = value
+    fn can_prune_ne(
+        &self,
+        value: &ArrayRef,
+        min: &Option<ArrayRef>,
+        max: &Option<ArrayRef>,
+    ) -> bool {
+        // Need both min and max to make this decision
+        let Some(min_val) = min else {
+            return false;
+        };
+        let Some(max_val) = max else {
+            return false;
+        };
+
+        // Prune only if min = max = value (all rows have the excluded value)
+        let min_eq_max = cmp::eq(min_val, max_val)
+            .map(|r| r.value(0))
+            .unwrap_or(false);
+        let min_eq_value = cmp::eq(min_val, value).map(|r| r.value(0)).unwrap_or(false);
+
+        min_eq_max && min_eq_value
+    }
+
+    /// Prune for `col < value`: prune if min >= value
+    fn can_prune_lt(&self, value: &ArrayRef, min: &Option<ArrayRef>) -> bool {
+        let Some(min_val) = min else {
+            return false;
+        };
+
+        // Prune if min >= value
+        cmp::gt_eq(min_val, value)
+            .map(|r| r.value(0))
+            .unwrap_or(false)
+    }
+
+    /// Prune for `col <= value`: prune if min > value
+    fn can_prune_lte(&self, value: &ArrayRef, min: &Option<ArrayRef>) -> bool {
+        let Some(min_val) = min else {
+            return false;
+        };
+
+        // Prune if min > value
+        cmp::gt(min_val, value).map(|r| r.value(0)).unwrap_or(false)
+    }
+
+    /// Prune for `col > value`: prune if max <= value
+    fn can_prune_gt(&self, value: &ArrayRef, max: &Option<ArrayRef>) -> bool {
+        let Some(max_val) = max else {
+            return false;
+        };
+
+        // Prune if max <= value
+        cmp::lt_eq(max_val, value)
+            .map(|r| r.value(0))
+            .unwrap_or(false)
+    }
+
+    /// Prune for `col >= value`: prune if max < value
+    fn can_prune_gte(&self, value: &ArrayRef, max: &Option<ArrayRef>) -> bool {
+        let Some(max_val) = max else {
+            return false;
+        };
+
+        // Prune if max < value
+        cmp::lt(max_val, value).map(|r| r.value(0)).unwrap_or(false)
+    }
+
+    /// Extracts the filter value as an ArrayRef.
+    fn extract_filter_array(&self, filter: &SchemableFilter) -> Option<ArrayRef> {
+        let (array, is_scalar) = filter.value.get();
+        if array.is_empty() {
+            return None;
+        }
+        // Only use scalar values or single-element arrays for min/max pruning.
+        // Multi-element arrays (e.g., IN lists) cannot be used for simple range pruning.
+        if is_scalar || array.len() == 1 {
+            Some(array.slice(0, 1))
+        } else {
+            None
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow_array::{Int64Array, StringArray};
+    use arrow_schema::{DataType, Field};
+    use std::sync::Arc;
+
+    fn create_test_schema() -> Schema {
+        Schema::new(vec![
+            Field::new("id", DataType::Int64, false),
+            Field::new("name", DataType::Utf8, true),
+            Field::new("value", DataType::Float64, true),
+            Field::new("date", DataType::Date32, false),
+        ])
+    }
+
+    fn create_partition_schema() -> Schema {
+        Schema::new(vec![Field::new("date", DataType::Date32, false)])
+    }
+
+    fn create_stats_with_int_range(col_name: &str, min: i64, max: i64) -> StatisticsContainer {
+        let mut stats = StatisticsContainer::new(crate::statistics::StatsGranularity::File);
+        stats.columns.insert(
+            col_name.to_string(),
+            ColumnStatistics {
+                column_name: col_name.to_string(),
+                data_type: DataType::Int64,
+                min_value: Some(Arc::new(Int64Array::from(vec![min])) as ArrayRef),
+                max_value: Some(Arc::new(Int64Array::from(vec![max])) as ArrayRef),
+            },
+        );
+        stats
+    }
+
+    fn create_stats_with_string_range(col_name: &str, min: &str, max: &str) -> StatisticsContainer {
+        let mut stats = StatisticsContainer::new(crate::statistics::StatsGranularity::File);
+        stats.columns.insert(
+            col_name.to_string(),
+            ColumnStatistics {
+                column_name: col_name.to_string(),
+                data_type: DataType::Utf8,
+                min_value: Some(Arc::new(StringArray::from(vec![min])) as ArrayRef),
+                max_value: Some(Arc::new(StringArray::from(vec![max])) as ArrayRef),
+            },
+        );
+        stats
+    }
+
+    #[test]
+    fn test_empty_pruner() {
+        let pruner = FilePruner::empty();
+        assert!(pruner.is_empty());
+
+        let stats = create_stats_with_int_range("id", 1, 100);
+        assert!(pruner.should_include(&stats));
+    }
+
+    #[test]
+    fn test_pruner_excludes_partition_columns() {
+        let table_schema = create_test_schema();
+        let partition_schema = create_partition_schema();
+
+        // Filter on partition column should be excluded
+        let filters = vec![Filter::try_from(("date", "=", "2024-01-01")).unwrap()];
+
+        let pruner = FilePruner::new(&filters, &table_schema, &partition_schema).unwrap();
+        assert!(pruner.is_empty()); // Partition column filter should be excluded
+    }
+
+    #[test]
+    fn test_pruner_keeps_non_partition_columns() {
+        let table_schema = create_test_schema();
+        let partition_schema = create_partition_schema();
+
+        // Filter on non-partition column should be kept
+        let filters = vec![Filter::try_from(("id", ">", "50")).unwrap()];
+
+        let pruner = FilePruner::new(&filters, &table_schema, &partition_schema).unwrap();
+        assert!(!pruner.is_empty());
+    }
+
+    #[test]
+    fn test_eq_filter_prunes_when_value_below_min() {
+        let table_schema = create_test_schema();
+        let partition_schema = Schema::empty();
+
+        let filters = vec![Filter::try_from(("id", "=", "5")).unwrap()];
+        let pruner = FilePruner::new(&filters, &table_schema, &partition_schema).unwrap();
+
+        // Stats: min=10, max=100. Filter: id = 5. Should prune (5 < 10).
+        let stats = create_stats_with_int_range("id", 10, 100);
+        assert!(!pruner.should_include(&stats));
+    }
+
+    #[test]
+    fn test_eq_filter_prunes_when_value_above_max() {
+        let table_schema = create_test_schema();
+        let partition_schema = Schema::empty();
+
+        let filters = vec![Filter::try_from(("id", "=", "200")).unwrap()];
+        let pruner = FilePruner::new(&filters, &table_schema, &partition_schema).unwrap();
+
+        // Stats: min=10, max=100. Filter: id = 200. Should prune (200 > 100).
+        let stats = create_stats_with_int_range("id", 10, 100);
+        assert!(!pruner.should_include(&stats));
+    }
+
+    #[test]
+    fn test_eq_filter_includes_when_value_in_range() {
+        let table_schema = create_test_schema();
+        let partition_schema = Schema::empty();
+
+        let filters = vec![Filter::try_from(("id", "=", "50")).unwrap()];
+        let pruner = FilePruner::new(&filters, &table_schema, &partition_schema).unwrap();
+
+        // Stats: min=10, max=100. Filter: id = 50. Should include (10 <= 50 <= 100).
+        let stats = create_stats_with_int_range("id", 10, 100);
+        assert!(pruner.should_include(&stats));
+    }
+
+    #[test]
+    fn test_ne_filter_prunes_when_all_equal() {
+        let table_schema = create_test_schema();
+        let partition_schema = Schema::empty();
+
+        let filters = vec![Filter::try_from(("id", "!=", "50")).unwrap()];
+        let pruner = FilePruner::new(&filters, &table_schema, &partition_schema).unwrap();
+
+        // Stats: min=50, max=50. Filter: id != 50. Should prune (all values are 50).
+        let stats = create_stats_with_int_range("id", 50, 50);
+        assert!(!pruner.should_include(&stats));
+    }
+
+    #[test]
+    fn test_ne_filter_includes_when_range_exists() {
+        let table_schema = create_test_schema();
+        let partition_schema = Schema::empty();
+
+        let filters = vec![Filter::try_from(("id", "!=", "50")).unwrap()];
+        let pruner = FilePruner::new(&filters, &table_schema, &partition_schema).unwrap();
+
+        // Stats: min=10, max=100. Filter: id != 50. Should include (has other values).
+        let stats = create_stats_with_int_range("id", 10, 100);
+        assert!(pruner.should_include(&stats));
+    }
+
+    #[test]
+    fn test_lt_filter_prunes_when_min_gte_value() {
+        let table_schema = create_test_schema();
+        let partition_schema = Schema::empty();
+
+        let filters = vec![Filter::try_from(("id", "<", "10")).unwrap()];
+        let pruner = FilePruner::new(&filters, &table_schema, &partition_schema).unwrap();
+
+        // Stats: min=10, max=100. Filter: id < 10. Should prune (min >= 10).
+        let stats = create_stats_with_int_range("id", 10, 100);
+        assert!(!pruner.should_include(&stats));
+    }
+
+    #[test]
+    fn test_lt_filter_includes_when_min_lt_value() {
+        let table_schema = create_test_schema();
+        let partition_schema = Schema::empty();
+
+        let filters = vec![Filter::try_from(("id", "<", "50")).unwrap()];
+        let pruner = FilePruner::new(&filters, &table_schema, &partition_schema).unwrap();
+
+        // Stats: min=10, max=100. Filter: id < 50. Should include (some values < 50).
+        let stats = create_stats_with_int_range("id", 10, 100);
+        assert!(pruner.should_include(&stats));
+    }
+
+    #[test]
+    fn test_lte_filter_prunes_when_min_gt_value() {
+        let table_schema = create_test_schema();
+        let partition_schema = Schema::empty();
+
+        let filters = vec![Filter::try_from(("id", "<=", "5")).unwrap()];
+        let pruner = FilePruner::new(&filters, &table_schema, &partition_schema).unwrap();
+
+        // Stats: min=10, max=100. Filter: id <= 5. Should prune (min > 5).
+        let stats = create_stats_with_int_range("id", 10, 100);
+        assert!(!pruner.should_include(&stats));
+    }
+
+    #[test]
+    fn test_gt_filter_prunes_when_max_lte_value() {
+        let table_schema = create_test_schema();
+        let partition_schema = Schema::empty();
+
+        let filters = vec![Filter::try_from(("id", ">", "100")).unwrap()];
+        let pruner = FilePruner::new(&filters, &table_schema, &partition_schema).unwrap();
+
+        // Stats: min=10, max=100. Filter: id > 100. Should prune (max <= 100).
+        let stats = create_stats_with_int_range("id", 10, 100);
+        assert!(!pruner.should_include(&stats));
+    }
+
+    #[test]
+    fn test_gt_filter_includes_when_max_gt_value() {
+        let table_schema = create_test_schema();
+        let partition_schema = Schema::empty();
+
+        let filters = vec![Filter::try_from(("id", ">", "50")).unwrap()];
+        let pruner = FilePruner::new(&filters, &table_schema, &partition_schema).unwrap();
+
+        // Stats: min=10, max=100. Filter: id > 50. Should include (some values > 50).
+        let stats = create_stats_with_int_range("id", 10, 100);
+        assert!(pruner.should_include(&stats));
+    }
+
+    #[test]
+    fn test_gte_filter_prunes_when_max_lt_value() {
+        let table_schema = create_test_schema();
+        let partition_schema = Schema::empty();
+
+        let filters = vec![Filter::try_from(("id", ">=", "150")).unwrap()];
+        let pruner = FilePruner::new(&filters, &table_schema, &partition_schema).unwrap();
+
+        // Stats: min=10, max=100. Filter: id >= 150. Should prune (max < 150).
+        let stats = create_stats_with_int_range("id", 10, 100);
+        assert!(!pruner.should_include(&stats));
+    }
+
+    #[test]
+    fn test_lte_filter_includes_when_value_in_range() {
+        let table_schema = create_test_schema();
+        let partition_schema = Schema::empty();
+
+        let filters = vec![Filter::try_from(("id", "<=", "50")).unwrap()];
+        let pruner = FilePruner::new(&filters, &table_schema, &partition_schema).unwrap();
+
+        // Stats: min=10, max=100. Filter: id <= 50. Should include (some values <= 50).
+        let stats = create_stats_with_int_range("id", 10, 100);
+        assert!(pruner.should_include(&stats));
+    }
+
+    #[test]
+    fn test_gte_filter_includes_when_value_in_range() {
+        let table_schema = create_test_schema();
+        let partition_schema = Schema::empty();
+
+        let filters = vec![Filter::try_from(("id", ">=", "50")).unwrap()];
+        let pruner = FilePruner::new(&filters, &table_schema, &partition_schema).unwrap();
+
+        // Stats: min=10, max=100. Filter: id >= 50. Should include (some values >= 50).
+        let stats = create_stats_with_int_range("id", 10, 100);
+        assert!(pruner.should_include(&stats));
+    }
+
+    #[test]
+    fn test_string_filter() {
+        let table_schema = create_test_schema();
+        let partition_schema = Schema::empty();
+
+        let filters = vec![Filter::try_from(("name", "=", "zebra")).unwrap()];
+        let pruner = FilePruner::new(&filters, &table_schema, &partition_schema).unwrap();
+
+        // Stats: min="apple", max="banana". Filter: name = "zebra". Should prune.
+        let stats = create_stats_with_string_range("name", "apple", "banana");
+        assert!(!pruner.should_include(&stats));
+
+        // Stats: min="apple", max="zebra". Filter: name = "banana". Should include.
+        let stats2 = create_stats_with_string_range("name", "apple", "zebra");
+        assert!(pruner.should_include(&stats2));
+    }
+
+    #[test]
+    fn test_missing_column_stats_includes_file() {
+        let table_schema = create_test_schema();
+        let partition_schema = Schema::empty();
+
+        let filters = vec![Filter::try_from(("id", "=", "50")).unwrap()];
+        let pruner = FilePruner::new(&filters, &table_schema, &partition_schema).unwrap();
+
+        // Stats for different column - should include (cannot prune without stats)
+        let stats = create_stats_with_int_range("other_column", 1, 10);
+        assert!(pruner.should_include(&stats));
+    }
+
+    #[test]
+    fn test_filter_on_column_with_no_stats() {
+        let table_schema = create_test_schema();
+        let partition_schema = Schema::empty();
+
+        let filters = vec![Filter::try_from(("id", "=", "50")).unwrap()];
+        let pruner = FilePruner::new(&filters, &table_schema, &partition_schema).unwrap();
+
+        // Column exists but has no min/max (e.g., Parquet file with statistics disabled)
+        let mut stats = StatisticsContainer::new(crate::statistics::StatsGranularity::File);
+        stats.columns.insert(
+            "id".to_string(),
+            ColumnStatistics {
+                column_name: "id".to_string(),
+                data_type: DataType::Int64,
+                min_value: None,
+                max_value: None,
+            },
+        );
+
+        // Should include file (cannot prune without min/max values)
+        assert!(pruner.should_include(&stats));
+    }
+
+    #[test]
+    fn test_multiple_filters_all_must_pass() {
+        let table_schema = create_test_schema();
+        let partition_schema = Schema::empty();
+
+        let filters = vec![
+            Filter::try_from(("id", ">", "0")).unwrap(),
+            Filter::try_from(("id", "<", "5")).unwrap(),
+        ];
+        let pruner = FilePruner::new(&filters, &table_schema, &partition_schema).unwrap();
+
+        // Stats: min=10, max=100. Filter: id > 0 AND id < 5.
+        // First filter passes (max > 0), second filter prunes (min >= 5).
+        let stats = create_stats_with_int_range("id", 10, 100);
+        assert!(!pruner.should_include(&stats));
+    }
+}
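
A minimal sketch of the pruning decision end to end, test-style with crate-internal paths as in the unit tests above: a filter `id > 50` against file-level stats claiming `id` lies in [10, 40] proves no row can match, so the file is pruned. The function name is illustrative only.

    use std::sync::Arc;

    use arrow_array::{ArrayRef, Int64Array};
    use arrow_schema::{DataType, Field, Schema};

    use crate::expr::filter::Filter;
    use crate::statistics::{ColumnStatistics, StatisticsContainer, StatsGranularity};
    use crate::table::file_pruner::FilePruner;

    fn pruning_sketch() {
        let table_schema = Schema::new(vec![Field::new("id", DataType::Int64, false)]);
        let partition_schema = Schema::empty();

        // Keep only files that might contain rows with id > 50.
        let filters = vec![Filter::try_from(("id", ">", "50")).unwrap()];
        let pruner = FilePruner::new(&filters, &table_schema, &partition_schema).unwrap();

        // File-level stats: id in [10, 40], so `id > 50` cannot match any row.
        let mut stats = StatisticsContainer::new(StatsGranularity::File);
        stats.columns.insert(
            "id".to_string(),
            ColumnStatistics {
                column_name: "id".to_string(),
                data_type: DataType::Int64,
                min_value: Some(Arc::new(Int64Array::from(vec![10_i64])) as ArrayRef),
                max_value: Some(Arc::new(Int64Array::from(vec![40_i64])) as ArrayRef),
            },
        );
        assert!(!pruner.should_include(&stats)); // pruned: max (40) <= 50
    }
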
diff --git a/crates/core/src/table/fs_view.rs b/crates/core/src/table/fs_view.rs
index e6581a3..223a86f 100644
--- a/crates/core/src/table/fs_view.rs
+++ b/crates/core/src/table/fs_view.rs
@@ -20,17 +20,20 @@
 use std::collections::HashMap;
 use std::sync::Arc;
 
+use arrow_schema::Schema;
+
 use crate::Result;
 use crate::config::HudiConfigs;
 use crate::config::table::HudiTableConfig::BaseFileFormat;
 use crate::file_group::FileGroup;
 use crate::file_group::builder::file_groups_from_files_partition_records;
 use crate::file_group::file_slice::FileSlice;
+use crate::metadata::table::records::FilesPartitionRecord;
 use crate::storage::Storage;
 use crate::table::Table;
+use crate::table::file_pruner::FilePruner;
 use crate::table::listing::FileLister;
 use crate::table::partition::PartitionPruner;
-use crate::timeline::completion_time::CompletionTimeView;
 use crate::timeline::view::TimelineView;
 use dashmap::DashMap;
 
@@ -58,64 +61,152 @@ impl FileSystemView {
         })
     }
 
-    /// Load file groups by listing from the storage.
+    /// Load file groups from the appropriate source (storage or metadata table records)
+    /// and apply stats-based pruning.
+    ///
+    /// # File Listing Source
+    /// - If `files_partition_records` is Some: Uses pre-fetched metadata table records
+    /// - If `files_partition_records` is None: Uses storage listing via FileLister
+    ///
+    /// # Stats Pruning Source (for non-empty file_pruner)
+    /// - Currently: Always extracts stats from Parquet file footers
+    /// - TODO: Use metadata table partitions when available:
+    ///   - partition_stats: Enhance PartitionPruner to prune partitions before file listing
+    ///   - column_stats: Prune files without reading Parquet footers
     ///
     /// # Arguments
     /// * `partition_pruner` - Filters which partitions to include
-    /// * `completion_time_view` - View to look up completion timestamps
-    async fn load_file_groups_from_storage<V: CompletionTimeView + Sync>(
+    /// * `file_pruner` - Filters files based on column statistics
+    /// * `table_schema` - Table schema for statistics extraction
+    /// * `timeline_view` - The timeline view providing query timestamp and completion time lookups
+    /// * `files_partition_records` - Optional pre-fetched metadata table records
+    async fn load_file_groups(
         &self,
         partition_pruner: &PartitionPruner,
-        completion_time_view: &V,
+        file_pruner: &FilePruner,
+        table_schema: &Schema,
+        timeline_view: &TimelineView,
+        files_partition_records: Option<&HashMap<String, FilesPartitionRecord>>,
     ) -> Result<()> {
-        let lister = FileLister::new(
-            self.hudi_configs.clone(),
-            self.storage.clone(),
-            partition_pruner.to_owned(),
-        );
-        let file_groups_map = lister
-            .list_file_groups_for_relevant_partitions(completion_time_view)
-            .await?;
+        // TODO: Enhance PartitionPruner with partition_stats support
+        // - Load partition_stats from metadata table into PartitionPruner
+        // - PartitionPruner.should_include() will use both partition column values AND partition_stats
+        // - For non-partitioned tables: check partition_pruner.can_any_partition_match() for early return
+
+        // Step 1: Get file groups from appropriate source
+        let file_groups_map = if let Some(records) = files_partition_records {
+            // Use pre-fetched metadata table records
+            let base_file_format: String = self.hudi_configs.get_or_default(BaseFileFormat).into();
+            file_groups_from_files_partition_records(records, &base_file_format, timeline_view)?
+        } else {
+            // Use storage listing
+            let lister = FileLister::new(
+                self.hudi_configs.clone(),
+                self.storage.clone(),
+                partition_pruner.to_owned(),
+            );
+            lister
+                .list_file_groups_for_relevant_partitions(timeline_view)
+                .await?
+        };
+
+        // Step 2: Apply partition pruning (for metadata table path) and stats pruning
+        // Note: Storage listing path already applies partition pruning via FileLister
+        // TODO: Check if metadata table column_stats partition is available
+        // and use that instead of Parquet footers for better performance
         for (partition_path, file_groups) in file_groups_map {
+            // Skip partitions that don't match the pruner (for metadata table path)
+            if files_partition_records.is_some()
+                && !partition_pruner.is_empty()
+                && !partition_pruner.should_include(&partition_path)
+            {
+                continue;
+            }
+
+            let retained = self
+                .apply_stats_pruning_from_footers(
+                    file_groups,
+                    file_pruner,
+                    table_schema,
+                    timeline_view.as_of_timestamp(),
+                )
+                .await;
             self.partition_to_file_groups
-                .insert(partition_path, file_groups);
+                .insert(partition_path, retained);
         }
+
         Ok(())
     }
 
-    /// Load file groups from metadata table records.
-    ///
-    /// This is an alternative to `load_file_groups_from_file_system` that uses
-    /// file listing records fetched from the metadata table. Only partitions that
-    /// pass the partition pruner will be loaded.
+    /// Apply file-level stats pruning using Parquet file footers.
     ///
-    /// This method is not async because it operates on pre-fetched records.
-    ///
-    /// # Arguments
-    /// * `records` - Metadata table files partition records
-    /// * `partition_pruner` - Filters which partitions to include
-    /// * `completion_time_view` - View to look up completion timestamps
-    fn load_file_groups_from_metadata_table_records<V: CompletionTimeView>(
+    /// Returns the filtered list of file groups that pass the pruning check.
+    /// Files are included (not pruned) if:
+    /// - The pruner has no filters
+    /// - The file is not a Parquet file
+    /// - Column stats cannot be loaded (conservative behavior)
+    /// - The file's stats indicate it might contain matching rows
+    async fn apply_stats_pruning_from_footers(
         &self,
-        records: &HashMap<String, crate::metadata::table_record::FilesPartitionRecord>,
-        partition_pruner: &PartitionPruner,
-        completion_time_view: &V,
-    ) -> Result<()> {
-        let base_file_format: String = self.hudi_configs.get_or_default(BaseFileFormat).into();
-        let file_groups_map = file_groups_from_files_partition_records(
-            records,
-            &base_file_format,
-            completion_time_view,
-        )?;
-
-        for entry in file_groups_map.iter() {
-            let partition_path = entry.key();
-            if partition_pruner.is_empty() || partition_pruner.should_include(partition_path) {
-                self.partition_to_file_groups
-                    .insert(partition_path.clone(), entry.value().clone());
+        file_groups: Vec<FileGroup>,
+        file_pruner: &FilePruner,
+        table_schema: &Schema,
+        as_of_timestamp: &str,
+    ) -> Vec<FileGroup> {
+        if file_pruner.is_empty() {
+            return file_groups;
+        }
+
+        let mut retained = Vec::with_capacity(file_groups.len());
+
+        for mut fg in file_groups {
+            if let Some(fsl) = fg.get_file_slice_mut_as_of(as_of_timestamp) {
+                let relative_path = match fsl.base_file_relative_path() {
+                    Ok(path) => path,
+                    Err(e) => {
+                        log::warn!(
+                            "Cannot get base file path for pruning: {e}. 
Including file group."
+                        );
+                        retained.push(fg);
+                        continue;
+                    }
+                };
+
+                // Case-insensitive check for .parquet extension
+                if !relative_path.to_lowercase().ends_with(".parquet") {
+                    retained.push(fg);
+                    continue;
+                }
+
+                // Load column stats from Parquet footer
+                let stats = match self
+                    .storage
+                    .get_parquet_column_stats(&relative_path, table_schema)
+                    .await
+                {
+                    Ok(s) => s,
+                    Err(e) => {
+                        log::warn!(
+                            "Failed to load column stats for {relative_path}: 
{e}. Including file."
+                        );
+                        retained.push(fg);
+                        continue;
+                    }
+                };
+
+                if file_pruner.should_include(&stats) {
+                    retained.push(fg);
+                } else {
+                    log::debug!("Pruned file {relative_path} based on column stats");
+                }
+            } else {
+                // No file slice as of timestamp, include the file group
+                // (it will be filtered out later in collect_file_slices)
+                retained.push(fg);
             }
         }
-        Ok(())
+
+        retained
     }
 
     /// Collect file slices from loaded file groups using the timeline view.
@@ -159,27 +250,34 @@ impl FileSystemView {
     ///
     /// # Arguments
     /// * `partition_pruner` - Filters which partitions to include
+    /// * `file_pruner` - Filters files based on column statistics
+    /// * `table_schema` - Table schema for statistics extraction
     /// * `timeline_view` - The timeline view containing query context
     /// * `metadata_table` - Optional metadata table instance
     pub(crate) async fn get_file_slices(
         &self,
         partition_pruner: &PartitionPruner,
+        file_pruner: &FilePruner,
+        table_schema: &Schema,
         timeline_view: &TimelineView,
         metadata_table: Option<&Table>,
     ) -> Result<Vec<FileSlice>> {
-        if let Some(mdt) = metadata_table {
-            // Use metadata table for file listing
-            let records = mdt.fetch_files_partition_records(partition_pruner).await?;
-            self.load_file_groups_from_metadata_table_records(
-                &records,
-                partition_pruner,
-                timeline_view,
-            )?;
+        // Fetch records from metadata table if available
+        let files_partition_records = if let Some(mdt) = metadata_table {
+            Some(mdt.fetch_files_partition_records(partition_pruner).await?)
         } else {
-            // Fall back to storage listing
-            self.load_file_groups_from_storage(partition_pruner, timeline_view)
-                .await?;
-        }
+            None
+        };
+
+        self.load_file_groups(
+            partition_pruner,
+            file_pruner,
+            table_schema,
+            timeline_view,
+            files_partition_records.as_ref(),
+        )
+        .await?;
+
         self.collect_file_slices(partition_pruner, timeline_view)
             .await
     }
@@ -191,14 +289,26 @@ impl FileSystemView {
     ///
     /// # Arguments
     /// * `partition_pruner` - Filters which partitions to include
+    /// * `file_pruner` - Filters files based on column statistics
+    /// * `table_schema` - Table schema for statistics extraction
     /// * `timeline_view` - The timeline view containing query context
     pub(crate) async fn get_file_slices_by_storage_listing(
         &self,
         partition_pruner: &PartitionPruner,
+        file_pruner: &FilePruner,
+        table_schema: &Schema,
         timeline_view: &TimelineView,
     ) -> Result<Vec<FileSlice>> {
-        self.load_file_groups_from_storage(partition_pruner, timeline_view)
-            .await?;
+        // Pass None to force storage listing (avoids recursion for metadata table)
+        self.load_file_groups(
+            partition_pruner,
+            file_pruner,
+            table_schema,
+            timeline_view,
+            None,
+        )
+        .await?;
+
         self.collect_file_slices(partition_pruner, timeline_view)
             .await
     }
@@ -227,9 +337,17 @@ mod tests {
             .await
             .unwrap();
         let partition_pruner = PartitionPruner::empty();
+        let file_pruner = FilePruner::empty();
+        let table_schema = hudi_table.get_schema().await.unwrap();
 
         let file_slices = fs_view
-            .get_file_slices(&partition_pruner, &timeline_view, None)
+            .get_file_slices(
+                &partition_pruner,
+                &file_pruner,
+                &table_schema,
+                &timeline_view,
+                None,
+            )
             .await
             .unwrap();
 
@@ -260,9 +378,17 @@ mod tests {
             .await
             .unwrap();
         let partition_pruner = PartitionPruner::empty();
+        let file_pruner = FilePruner::empty();
+        let table_schema = hudi_table.get_schema().await.unwrap();
 
         let file_slices = fs_view
-            .get_file_slices(&partition_pruner, &timeline_view, None)
+            .get_file_slices(
+                &partition_pruner,
+                &file_pruner,
+                &table_schema,
+                &timeline_view,
+                None,
+            )
             .await
             .unwrap();
 
@@ -293,6 +419,7 @@ mod tests {
             .await
             .unwrap();
         let partition_schema = hudi_table.get_partition_schema().await.unwrap();
+        let table_schema = hudi_table.get_schema().await.unwrap();
 
         let filter_lt_20 = Filter::try_from(("byteField", "<", "20")).unwrap();
         let filter_eq_300 = Filter::try_from(("shortField", "=", "300")).unwrap();
@@ -303,8 +430,16 @@ mod tests {
         )
         .unwrap();
 
+        let file_pruner = FilePruner::empty();
+
         let file_slices = fs_view
-            .get_file_slices(&partition_pruner, &timeline_view, None)
+            .get_file_slices(
+                &partition_pruner,
+                &file_pruner,
+                &table_schema,
+                &timeline_view,
+                None,
+            )
             .await
             .unwrap();
 
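The fs_view.rs changes above boil down to a small flow: build a FilePruner from the query filters, read a base file's Parquet footer into a StatisticsContainer, and keep the file only if should_include reports that it may still contain matching rows. Below is a minimal sketch of that flow, not part of the patch, assuming the constructor signatures shown in this commit and the public module paths hudi_core::table::file_pruner, hudi_core::statistics, and hudi_core::expr::filter; the schemas, file path, and error handling are illustrative placeholders.

    // Illustrative sketch only; not part of the commit.
    use std::fs::File;
    use std::path::Path;

    use arrow_schema::Schema;
    use parquet::file::reader::FileReader;
    use parquet::file::serialized_reader::SerializedFileReader;

    use hudi_core::expr::filter::Filter;
    use hudi_core::statistics::StatisticsContainer;
    use hudi_core::table::file_pruner::FilePruner;

    /// Decide whether a single Parquet base file must be read for the given filters.
    fn keep_base_file(
        filters: &[Filter],
        table_schema: &Schema,
        partition_schema: &Schema,
        base_file: &Path,
    ) -> Result<bool, Box<dyn std::error::Error>> {
        let file_pruner = FilePruner::new(filters, table_schema, partition_schema)?;
        if file_pruner.is_empty() {
            return Ok(true); // no usable filters, so never prune
        }
        // Footer-only read; no data pages are touched.
        let reader = SerializedFileReader::new(File::open(base_file)?)?;
        let stats = StatisticsContainer::from_parquet_metadata(reader.metadata(), table_schema);
        // Conservative: keep the file unless its stats rule out a match.
        Ok(file_pruner.should_include(&stats))
    }

The same conservative behavior is what the warnings in apply_stats_pruning_from_footers implement: any failure to obtain stats results in the file being included rather than dropped.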
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index f02fd3e..153105e 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -88,6 +88,7 @@
 //! ```
 
 pub mod builder;
+pub mod file_pruner;
 mod fs_view;
 mod listing;
 pub mod partition;
@@ -107,6 +108,7 @@ use crate::file_group::reader::FileGroupReader;
 use crate::metadata::METADATA_TABLE_PARTITION_FIELD;
 use crate::schema::resolver::{resolve_avro_schema, resolve_schema};
 use crate::table::builder::TableBuilder;
+use crate::table::file_pruner::FilePruner;
 use crate::table::fs_view::FileSystemView;
 use crate::table::partition::PartitionPruner;
 use crate::timeline::util::format_timestamp;
@@ -481,6 +483,12 @@ impl Table {
         let partition_pruner =
             PartitionPruner::new(filters, &partition_schema, self.hudi_configs.as_ref())?;
 
+        // Get table schema for file pruning
+        let table_schema = self.get_schema().await?;
+
+        // Create file pruner with filters on non-partition columns
+        let file_pruner = FilePruner::new(filters, &table_schema, &partition_schema)?;
+
         // Try to create metadata table instance if enabled
         let metadata_table = if self.is_metadata_table_enabled() {
             log::debug!("Using metadata table for file listing");
@@ -498,7 +506,13 @@ impl Table {
         };
 
         self.file_system_view
-            .get_file_slices(&partition_pruner, &timeline_view, metadata_table.as_ref())
+            .get_file_slices(
+                &partition_pruner,
+                &file_pruner,
+                &table_schema,
+                &timeline_view,
+                metadata_table.as_ref(),
+            )
             .await
     }
 
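As the comments in the hunk above note, the FilePruner is built from the same filter list as the PartitionPruner but only acts on non-partition columns. A hedged sketch of that split, using the Filter tuple syntax from the tests in this patch; "city" and "fare" are invented column names, and the constructor signature is the one shown above:

    // Illustrative only: partition-column filters are handled by PartitionPruner,
    // data-column filters by FilePruner via Parquet footer statistics.
    use arrow_schema::Schema;

    use hudi_core::expr::filter::Filter;
    use hudi_core::table::file_pruner::FilePruner;

    fn build_file_pruner(
        table_schema: &Schema,
        partition_schema: &Schema,
    ) -> Result<FilePruner, Box<dyn std::error::Error>> {
        let filters = vec![
            Filter::try_from(("city", "=", "san_francisco"))?, // partition column, ignored here
            Filter::try_from(("fare", ">", "20"))?,            // data column, drives stats pruning
        ];
        Ok(FilePruner::new(&filters, table_schema, partition_schema)?)
    }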
diff --git a/crates/core/tests/statistics_tests.rs b/crates/core/tests/statistics_tests.rs
new file mode 100644
index 0000000..0186b2a
--- /dev/null
+++ b/crates/core/tests/statistics_tests.rs
@@ -0,0 +1,958 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+//! Integration tests for the statistics module.
+//!
+//! These tests generate Parquet files with known data, then verify that
+//! the statistics extraction correctly reads min/max values.
+
+use std::fs::File;
+use std::sync::Arc;
+
+use arrow_array::{
+    ArrayRef, BooleanArray, Date32Array, Float32Array, Float64Array, Int8Array, Int16Array,
+    Int32Array, Int64Array, RecordBatch, StringArray, TimestampMicrosecondArray,
+    TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, UInt32Array,
+};
+use arrow_schema::{DataType, Field, Schema, TimeUnit};
+use parquet::arrow::ArrowWriter;
+use parquet::basic::Compression;
+use parquet::file::properties::WriterProperties;
+use parquet::file::reader::FileReader;
+use parquet::file::serialized_reader::SerializedFileReader;
+use tempfile::tempdir;
+
+use hudi_core::statistics::{StatisticsContainer, StatsGranularity};
+
+/// Helper to write a RecordBatch to a Parquet file and return the path.
+fn write_parquet_file(batch: &RecordBatch, path: &std::path::Path) {
+    let file = File::create(path).unwrap();
+    let props = WriterProperties::builder()
+        .set_compression(Compression::SNAPPY)
+        .set_statistics_enabled(parquet::file::properties::EnabledStatistics::Page)
+        .build();
+    let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props)).unwrap();
+    writer.write(batch).unwrap();
+    writer.close().unwrap();
+}
+
+/// Helper to write a RecordBatch to a Parquet file without statistics.
+fn write_parquet_file_no_stats(batch: &RecordBatch, path: &std::path::Path) {
+    let file = File::create(path).unwrap();
+    let props = WriterProperties::builder()
+        .set_compression(Compression::SNAPPY)
+        .set_statistics_enabled(parquet::file::properties::EnabledStatistics::None)
+        .build();
+    let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props)).unwrap();
+    writer.write(batch).unwrap();
+    writer.close().unwrap();
+}
+
+/// Helper to write multiple RecordBatches to a single Parquet file (multiple row groups).
+fn write_parquet_file_multiple_row_groups(batches: &[RecordBatch], path: &std::path::Path) {
+    let file = File::create(path).unwrap();
+    let props = WriterProperties::builder()
+        .set_compression(Compression::SNAPPY)
+        .set_statistics_enabled(parquet::file::properties::EnabledStatistics::Page)
+        .set_max_row_group_size(3) // Force smaller row groups
+        .build();
+    let mut writer = ArrowWriter::try_new(file, batches[0].schema(), Some(props)).unwrap();
+    for batch in batches {
+        writer.write(batch).unwrap();
+    }
+    writer.close().unwrap();
+}
+
+/// Helper to read Parquet metadata from a file.
+fn read_parquet_metadata(path: &std::path::Path) -> parquet::file::metadata::ParquetMetaData {
+    let file = File::open(path).unwrap();
+    let reader = SerializedFileReader::new(file).unwrap();
+    reader.metadata().clone()
+}
+
+/// Helper to extract Int32 value from ArrayRef
+fn get_int32(arr: &ArrayRef) -> i32 {
+    arr.as_any().downcast_ref::<Int32Array>().unwrap().value(0)
+}
+
+/// Helper to extract Int64 value from ArrayRef
+fn get_int64(arr: &ArrayRef) -> i64 {
+    arr.as_any().downcast_ref::<Int64Array>().unwrap().value(0)
+}
+
+/// Helper to extract Int8 value from ArrayRef
+fn get_int8(arr: &ArrayRef) -> i8 {
+    arr.as_any().downcast_ref::<Int8Array>().unwrap().value(0)
+}
+
+/// Helper to extract Int16 value from ArrayRef
+fn get_int16(arr: &ArrayRef) -> i16 {
+    arr.as_any().downcast_ref::<Int16Array>().unwrap().value(0)
+}
+
+/// Helper to extract UInt32 value from ArrayRef
+fn get_uint32(arr: &ArrayRef) -> u32 {
+    arr.as_any().downcast_ref::<UInt32Array>().unwrap().value(0)
+}
+
+/// Helper to extract Float32 value from ArrayRef
+fn get_float32(arr: &ArrayRef) -> f32 {
+    arr.as_any()
+        .downcast_ref::<Float32Array>()
+        .unwrap()
+        .value(0)
+}
+
+/// Helper to extract Float64 value from ArrayRef
+fn get_float64(arr: &ArrayRef) -> f64 {
+    arr.as_any()
+        .downcast_ref::<Float64Array>()
+        .unwrap()
+        .value(0)
+}
+
+/// Helper to extract Boolean value from ArrayRef
+fn get_bool(arr: &ArrayRef) -> bool {
+    arr.as_any()
+        .downcast_ref::<BooleanArray>()
+        .unwrap()
+        .value(0)
+}
+
+/// Helper to extract String value from ArrayRef
+fn get_string(arr: &ArrayRef) -> String {
+    arr.as_any()
+        .downcast_ref::<StringArray>()
+        .unwrap()
+        .value(0)
+        .to_string()
+}
+
+/// Helper to extract Date32 value from ArrayRef
+fn get_date32(arr: &ArrayRef) -> i32 {
+    arr.as_any().downcast_ref::<Date32Array>().unwrap().value(0)
+}
+
+/// Helper to extract TimestampMicrosecond value from ArrayRef
+fn get_timestamp_micros(arr: &ArrayRef) -> i64 {
+    arr.as_any()
+        .downcast_ref::<TimestampMicrosecondArray>()
+        .unwrap()
+        .value(0)
+}
+
+/// Helper to extract TimestampSecond value from ArrayRef
+fn get_timestamp_seconds(arr: &ArrayRef) -> i64 {
+    arr.as_any()
+        .downcast_ref::<TimestampSecondArray>()
+        .unwrap()
+        .value(0)
+}
+
+/// Helper to extract TimestampMillisecond value from ArrayRef
+fn get_timestamp_millis(arr: &ArrayRef) -> i64 {
+    arr.as_any()
+        .downcast_ref::<TimestampMillisecondArray>()
+        .unwrap()
+        .value(0)
+}
+
+/// Helper to extract TimestampNanosecond value from ArrayRef
+fn get_timestamp_nanos(arr: &ArrayRef) -> i64 {
+    arr.as_any()
+        .downcast_ref::<TimestampNanosecondArray>()
+        .unwrap()
+        .value(0)
+}
+
+mod file_level_statistics {
+    use super::*;
+
+    #[test]
+    fn test_integer_column_stats() {
+        let temp_dir = tempdir().unwrap();
+        let parquet_path = temp_dir.path().join("integer_test.parquet");
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("int32_col", DataType::Int32, false),
+            Field::new("int64_col", DataType::Int64, false),
+            Field::new("uint32_col", DataType::UInt32, false),
+        ]));
+        let int32_values = Int32Array::from(vec![10, 50, 30, 20, 40]);
+        let int64_values = Int64Array::from(vec![100_i64, 500, 300, 200, 400]);
+        let uint32_values = UInt32Array::from(vec![100_u32, 500, 300, 200, 400]);
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![
+                Arc::new(int32_values) as ArrayRef,
+                Arc::new(int64_values) as ArrayRef,
+                Arc::new(uint32_values) as ArrayRef,
+            ],
+        )
+        .unwrap();
+
+        write_parquet_file(&batch, &parquet_path);
+
+        let metadata = read_parquet_metadata(&parquet_path);
+        let stats = StatisticsContainer::from_parquet_metadata(&metadata, &schema);
+
+        assert_eq!(stats.granularity, StatsGranularity::File);
+        assert_eq!(stats.num_rows, Some(5));
+
+        let int32_stats = stats.columns.get("int32_col").unwrap();
+        assert_eq!(get_int32(int32_stats.min_value.as_ref().unwrap()), 10);
+        assert_eq!(get_int32(int32_stats.max_value.as_ref().unwrap()), 50);
+
+        let int64_stats = stats.columns.get("int64_col").unwrap();
+        assert_eq!(get_int64(int64_stats.min_value.as_ref().unwrap()), 100);
+        assert_eq!(get_int64(int64_stats.max_value.as_ref().unwrap()), 500);
+
+        let uint32_stats = stats.columns.get("uint32_col").unwrap();
+        assert_eq!(get_uint32(uint32_stats.min_value.as_ref().unwrap()), 100);
+        assert_eq!(get_uint32(uint32_stats.max_value.as_ref().unwrap()), 500);
+    }
+
+    #[test]
+    fn test_int8_and_int16_column_stats() {
+        let temp_dir = tempdir().unwrap();
+        let parquet_path = temp_dir.path().join("small_int_test.parquet");
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("int8_col", DataType::Int8, false),
+            Field::new("int16_col", DataType::Int16, false),
+        ]));
+        let int8_values = Int8Array::from(vec![-10_i8, 50, 30, -20, 40]);
+        let int16_values = Int16Array::from(vec![100_i16, 500, -300, 200, 400]);
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![
+                Arc::new(int8_values) as ArrayRef,
+                Arc::new(int16_values) as ArrayRef,
+            ],
+        )
+        .unwrap();
+
+        write_parquet_file(&batch, &parquet_path);
+
+        let metadata = read_parquet_metadata(&parquet_path);
+        let stats = StatisticsContainer::from_parquet_metadata(&metadata, &schema);
+
+        let int8_stats = stats.columns.get("int8_col").unwrap();
+        assert_eq!(get_int8(int8_stats.min_value.as_ref().unwrap()), -20);
+        assert_eq!(get_int8(int8_stats.max_value.as_ref().unwrap()), 50);
+
+        let int16_stats = stats.columns.get("int16_col").unwrap();
+        assert_eq!(get_int16(int16_stats.min_value.as_ref().unwrap()), -300);
+        assert_eq!(get_int16(int16_stats.max_value.as_ref().unwrap()), 500);
+    }
+
+    #[test]
+    fn test_float_column_stats() {
+        let temp_dir = tempdir().unwrap();
+        let parquet_path = temp_dir.path().join("float_test.parquet");
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("float32_col", DataType::Float32, false),
+            Field::new("float64_col", DataType::Float64, false),
+        ]));
+        let float32_values = Float32Array::from(vec![1.5_f32, 3.5, 2.5, 0.5, 4.5]);
+        let float64_values = Float64Array::from(vec![10.5_f64, 30.5, 20.5, 5.5, 40.5]);
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![
+                Arc::new(float32_values) as ArrayRef,
+                Arc::new(float64_values) as ArrayRef,
+            ],
+        )
+        .unwrap();
+
+        write_parquet_file(&batch, &parquet_path);
+
+        let metadata = read_parquet_metadata(&parquet_path);
+        let stats = StatisticsContainer::from_parquet_metadata(&metadata, &schema);
+
+        let float32_stats = stats.columns.get("float32_col").unwrap();
+        assert_eq!(get_float32(float32_stats.min_value.as_ref().unwrap()), 0.5);
+        assert_eq!(get_float32(float32_stats.max_value.as_ref().unwrap()), 4.5);
+
+        let float64_stats = stats.columns.get("float64_col").unwrap();
+        assert_eq!(get_float64(float64_stats.min_value.as_ref().unwrap()), 5.5);
+        assert_eq!(get_float64(float64_stats.max_value.as_ref().unwrap()), 40.5);
+    }
+
+    #[test]
+    fn test_string_column_stats() {
+        let temp_dir = tempdir().unwrap();
+        let parquet_path = temp_dir.path().join("string_test.parquet");
+
+        let schema = Arc::new(Schema::new(vec![Field::new("name", 
DataType::Utf8, false)]));
+        let names = StringArray::from(vec!["charlie", "alice", "bob", "diana", 
"eve"]);
+        let batch =
+            RecordBatch::try_new(schema.clone(), vec![Arc::new(names) as 
ArrayRef]).unwrap();
+
+        write_parquet_file(&batch, &parquet_path);
+
+        let metadata = read_parquet_metadata(&parquet_path);
+        let stats = StatisticsContainer::from_parquet_metadata(&metadata, &schema);
+
+        let col_stats = stats.columns.get("name").unwrap();
+        assert_eq!(get_string(col_stats.min_value.as_ref().unwrap()), "alice");
+        assert_eq!(get_string(col_stats.max_value.as_ref().unwrap()), "eve");
+    }
+
+    #[test]
+    fn test_boolean_column_stats() {
+        let temp_dir = tempdir().unwrap();
+        let parquet_path = temp_dir.path().join("bool_test.parquet");
+
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "is_active",
+            DataType::Boolean,
+            false,
+        )]));
+        let values = BooleanArray::from(vec![true, false, true, false, true]);
+        let batch =
+            RecordBatch::try_new(schema.clone(), vec![Arc::new(values) as ArrayRef]).unwrap();
+
+        write_parquet_file(&batch, &parquet_path);
+
+        let metadata = read_parquet_metadata(&parquet_path);
+        let stats = StatisticsContainer::from_parquet_metadata(&metadata, &schema);
+
+        let col_stats = stats.columns.get("is_active").unwrap();
+        assert!(!get_bool(col_stats.min_value.as_ref().unwrap()));
+        assert!(get_bool(col_stats.max_value.as_ref().unwrap()));
+    }
+
+    #[test]
+    fn test_date32_column_stats() {
+        let temp_dir = tempdir().unwrap();
+        let parquet_path = temp_dir.path().join("date32_test.parquet");
+
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "event_date",
+            DataType::Date32,
+            false,
+        )]));
+        // Days since epoch: 19000 = ~2022-01-01, 19100 = ~2022-04-11, etc.
+        let values = Date32Array::from(vec![19000, 19100, 19050, 18900, 19200]);
+        let batch =
+            RecordBatch::try_new(schema.clone(), vec![Arc::new(values) as ArrayRef]).unwrap();
+
+        write_parquet_file(&batch, &parquet_path);
+
+        let metadata = read_parquet_metadata(&parquet_path);
+        let stats = StatisticsContainer::from_parquet_metadata(&metadata, &schema);
+
+        let col_stats = stats.columns.get("event_date").unwrap();
+        assert_eq!(get_date32(col_stats.min_value.as_ref().unwrap()), 18900);
+        assert_eq!(get_date32(col_stats.max_value.as_ref().unwrap()), 19200);
+    }
+
+    #[test]
+    fn test_timestamp_column_stats() {
+        let temp_dir = tempdir().unwrap();
+        let parquet_path = temp_dir.path().join("timestamp_test.parquet");
+
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "event_time",
+            DataType::Timestamp(TimeUnit::Microsecond, None),
+            false,
+        )]));
+        // Microseconds since epoch
+        let values = TimestampMicrosecondArray::from(vec![
+            1_000_000_i64,
+            5_000_000,
+            3_000_000,
+            2_000_000,
+            4_000_000,
+        ]);
+        let batch =
+            RecordBatch::try_new(schema.clone(), vec![Arc::new(values) as ArrayRef]).unwrap();
+
+        write_parquet_file(&batch, &parquet_path);
+
+        let metadata = read_parquet_metadata(&parquet_path);
+        let stats = StatisticsContainer::from_parquet_metadata(&metadata, &schema);
+
+        let col_stats = stats.columns.get("event_time").unwrap();
+        assert_eq!(
+            get_timestamp_micros(col_stats.min_value.as_ref().unwrap()),
+            1_000_000
+        );
+        assert_eq!(
+            get_timestamp_micros(col_stats.max_value.as_ref().unwrap()),
+            5_000_000
+        );
+    }
+
+    #[test]
+    fn test_timestamp_all_time_units() {
+        let temp_dir = tempdir().unwrap();
+        let parquet_path = temp_dir.path().join("timestamp_units_test.parquet");
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("ts_sec", DataType::Timestamp(TimeUnit::Second, None), 
false),
+            Field::new(
+                "ts_milli",
+                DataType::Timestamp(TimeUnit::Millisecond, None),
+                false,
+            ),
+            Field::new(
+                "ts_micro",
+                DataType::Timestamp(TimeUnit::Microsecond, None),
+                false,
+            ),
+            Field::new(
+                "ts_nano",
+                DataType::Timestamp(TimeUnit::Nanosecond, None),
+                false,
+            ),
+        ]));
+
+        let ts_sec = TimestampSecondArray::from(vec![100_i64, 500, 300, 200, 400]);
+        let ts_milli = TimestampMillisecondArray::from(vec![1000_i64, 5000, 3000, 2000, 4000]);
+        let ts_micro = TimestampMicrosecondArray::from(vec![10000_i64, 50000, 30000, 20000, 40000]);
+        let ts_nano =
+            TimestampNanosecondArray::from(vec![100000_i64, 500000, 300000, 200000, 400000]);
+
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![
+                Arc::new(ts_sec) as ArrayRef,
+                Arc::new(ts_milli) as ArrayRef,
+                Arc::new(ts_micro) as ArrayRef,
+                Arc::new(ts_nano) as ArrayRef,
+            ],
+        )
+        .unwrap();
+
+        write_parquet_file(&batch, &parquet_path);
+
+        let metadata = read_parquet_metadata(&parquet_path);
+        let stats = StatisticsContainer::from_parquet_metadata(&metadata, &schema);
+
+        // Verify seconds
+        let col_stats = stats.columns.get("ts_sec").unwrap();
+        assert_eq!(
+            get_timestamp_seconds(col_stats.min_value.as_ref().unwrap()),
+            100
+        );
+        assert_eq!(
+            get_timestamp_seconds(col_stats.max_value.as_ref().unwrap()),
+            500
+        );
+
+        // Verify milliseconds
+        let col_stats = stats.columns.get("ts_milli").unwrap();
+        assert_eq!(
+            get_timestamp_millis(col_stats.min_value.as_ref().unwrap()),
+            1000
+        );
+        assert_eq!(
+            get_timestamp_millis(col_stats.max_value.as_ref().unwrap()),
+            5000
+        );
+
+        // Verify microseconds
+        let col_stats = stats.columns.get("ts_micro").unwrap();
+        assert_eq!(
+            get_timestamp_micros(col_stats.min_value.as_ref().unwrap()),
+            10000
+        );
+        assert_eq!(
+            get_timestamp_micros(col_stats.max_value.as_ref().unwrap()),
+            50000
+        );
+
+        // Verify nanoseconds
+        let col_stats = stats.columns.get("ts_nano").unwrap();
+        assert_eq!(
+            get_timestamp_nanos(col_stats.min_value.as_ref().unwrap()),
+            100000
+        );
+        assert_eq!(
+            get_timestamp_nanos(col_stats.max_value.as_ref().unwrap()),
+            500000
+        );
+    }
+}
+
+mod null_handling {
+    use super::*;
+
+    #[test]
+    fn test_nullable_columns_with_nulls() {
+        let temp_dir = tempdir().unwrap();
+        let parquet_path = temp_dir.path().join("nullable_test.parquet");
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("int_value", DataType::Int32, true),
+            Field::new("name", DataType::Utf8, true),
+        ]));
+        let int_values = Int32Array::from(vec![Some(10), None, Some(30), None, Some(20)]);
+        let names = StringArray::from(vec![
+            Some("alice"),
+            None,
+            Some("charlie"),
+            None,
+            Some("bob"),
+        ]);
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![
+                Arc::new(int_values) as ArrayRef,
+                Arc::new(names) as ArrayRef,
+            ],
+        )
+        .unwrap();
+
+        write_parquet_file(&batch, &parquet_path);
+
+        let metadata = read_parquet_metadata(&parquet_path);
+        let stats = StatisticsContainer::from_parquet_metadata(&metadata, &schema);
+
+        // Verify Int32 column stats ignore nulls
+        let int_stats = stats.columns.get("int_value").unwrap();
+        assert_eq!(get_int32(int_stats.min_value.as_ref().unwrap()), 10);
+        assert_eq!(get_int32(int_stats.max_value.as_ref().unwrap()), 30);
+
+        // Verify String column stats ignore nulls
+        let name_stats = stats.columns.get("name").unwrap();
+        assert_eq!(get_string(name_stats.min_value.as_ref().unwrap()), "alice");
+        assert_eq!(
+            get_string(name_stats.max_value.as_ref().unwrap()),
+            "charlie"
+        );
+    }
+
+    #[test]
+    fn test_all_nulls_column() {
+        let temp_dir = tempdir().unwrap();
+        let parquet_path = temp_dir.path().join("all_nulls_test.parquet");
+
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "value",
+            DataType::Int32,
+            true,
+        )]));
+        let values: Int32Array = vec![None, None, None].into_iter().collect();
+        let batch =
+            RecordBatch::try_new(schema.clone(), vec![Arc::new(values) as ArrayRef]).unwrap();
+
+        write_parquet_file(&batch, &parquet_path);
+
+        let metadata = read_parquet_metadata(&parquet_path);
+        let stats = StatisticsContainer::from_parquet_metadata(&metadata, &schema);
+
+        let col_stats = stats.columns.get("value").unwrap();
+        // For all nulls, min/max should be None
+        assert!(col_stats.min_value.is_none());
+        assert!(col_stats.max_value.is_none());
+    }
+}
+
+mod row_group_statistics {
+    use super::*;
+
+    #[test]
+    fn test_single_row_group_stats() {
+        let temp_dir = tempdir().unwrap();
+        let parquet_path = temp_dir.path().join("single_rg_test.parquet");
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("value", DataType::Float64, false),
+        ]));
+        let ids = Int32Array::from(vec![1, 2, 3, 4, 5]);
+        let values = Float64Array::from(vec![10.0, 20.0, 30.0, 40.0, 50.0]);
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![Arc::new(ids) as ArrayRef, Arc::new(values) as ArrayRef],
+        )
+        .unwrap();
+
+        write_parquet_file(&batch, &parquet_path);
+
+        let metadata = read_parquet_metadata(&parquet_path);
+        assert_eq!(metadata.num_row_groups(), 1);
+
+        let row_group = &metadata.row_groups()[0];
+        let rg_stats = StatisticsContainer::from_row_group(row_group, &schema);
+
+        assert_eq!(rg_stats.granularity, StatsGranularity::RowGroup);
+        assert_eq!(rg_stats.num_rows, Some(5));
+
+        let id_stats = rg_stats.columns.get("id").unwrap();
+        assert_eq!(get_int32(id_stats.min_value.as_ref().unwrap()), 1);
+        assert_eq!(get_int32(id_stats.max_value.as_ref().unwrap()), 5);
+
+        let value_stats = rg_stats.columns.get("value").unwrap();
+        assert_eq!(get_float64(value_stats.min_value.as_ref().unwrap()), 10.0);
+        assert_eq!(get_float64(value_stats.max_value.as_ref().unwrap()), 50.0);
+    }
+
+    #[test]
+    fn test_multiple_row_groups_aggregation() {
+        let temp_dir = tempdir().unwrap();
+        let parquet_path = temp_dir.path().join("multi_rg_test.parquet");
+
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "value",
+            DataType::Int32,
+            true,
+        )]));
+
+        // Create batches that will span multiple row groups
+        // Batch 1: values 100-110
+        let batch1 = RecordBatch::try_new(
+            schema.clone(),
+            vec![Arc::new(Int32Array::from(vec![100, 105, 110])) as ArrayRef],
+        )
+        .unwrap();
+
+        // Batch 2: values 1-10 (includes global min)
+        let batch2 = RecordBatch::try_new(
+            schema.clone(),
+            vec![Arc::new(Int32Array::from(vec![1, 5, 10])) as ArrayRef],
+        )
+        .unwrap();
+
+        // Batch 3: values 200-220 (includes global max)
+        let batch3 = RecordBatch::try_new(
+            schema.clone(),
+            vec![Arc::new(Int32Array::from(vec![200, 210, 220])) as ArrayRef],
+        )
+        .unwrap();
+
+        write_parquet_file_multiple_row_groups(&[batch1, batch2, batch3], &parquet_path);
+
+        let metadata = read_parquet_metadata(&parquet_path);
+
+        // Verify we have multiple row groups
+        assert!(
+            metadata.num_row_groups() >= 2,
+            "Expected multiple row groups"
+        );
+
+        // Get file-level stats (aggregated from row groups)
+        let file_stats = StatisticsContainer::from_parquet_metadata(&metadata, 
&schema);
+
+        assert_eq!(file_stats.granularity, StatsGranularity::File);
+        assert_eq!(file_stats.num_rows, Some(9)); // 3 + 3 + 3
+
+        let col_stats = file_stats.columns.get("value").unwrap();
+        // File-level min should be global min across all row groups
+        assert_eq!(get_int32(col_stats.min_value.as_ref().unwrap()), 1);
+        // File-level max should be global max across all row groups
+        assert_eq!(get_int32(col_stats.max_value.as_ref().unwrap()), 220);
+    }
+
+    #[test]
+    fn test_row_group_with_nulls_aggregation() {
+        let temp_dir = tempdir().unwrap();
+        let parquet_path = temp_dir.path().join("rg_nulls_test.parquet");
+
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "value",
+            DataType::Int32,
+            true,
+        )]));
+
+        // Batch with some nulls
+        let batch1 = RecordBatch::try_new(
+            schema.clone(),
+            vec![Arc::new(Int32Array::from(vec![Some(10), None, Some(20)])) as ArrayRef],
+        )
+        .unwrap();
+
+        // Batch with more nulls
+        let batch2 = RecordBatch::try_new(
+            schema.clone(),
+            vec![Arc::new(Int32Array::from(vec![None, Some(30), None])) as ArrayRef],
+        )
+        .unwrap();
+
+        write_parquet_file_multiple_row_groups(&[batch1, batch2], &parquet_path);
+
+        let metadata = read_parquet_metadata(&parquet_path);
+        let file_stats = StatisticsContainer::from_parquet_metadata(&metadata, &schema);
+
+        let col_stats = file_stats.columns.get("value").unwrap();
+        assert_eq!(get_int32(col_stats.min_value.as_ref().unwrap()), 10);
+        assert_eq!(get_int32(col_stats.max_value.as_ref().unwrap()), 30);
+    }
+}
+
+mod multiple_columns {
+    use super::*;
+
+    #[test]
+    fn test_mixed_type_columns() {
+        let temp_dir = tempdir().unwrap();
+        let parquet_path = temp_dir.path().join("mixed_types_test.parquet");
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("name", DataType::Utf8, false),
+            Field::new("score", DataType::Float64, true),
+            Field::new("is_active", DataType::Boolean, false),
+        ]));
+
+        let ids = Int32Array::from(vec![1, 2, 3, 4, 5]);
+        let names = StringArray::from(vec!["alice", "bob", "charlie", "diana", 
"eve"]);
+        let scores = Float64Array::from(vec![Some(85.5), Some(92.0), None, 
Some(78.5), Some(95.0)]);
+        let is_active = BooleanArray::from(vec![true, false, true, true, 
false]);
+
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![
+                Arc::new(ids) as ArrayRef,
+                Arc::new(names) as ArrayRef,
+                Arc::new(scores) as ArrayRef,
+                Arc::new(is_active) as ArrayRef,
+            ],
+        )
+        .unwrap();
+
+        write_parquet_file(&batch, &parquet_path);
+
+        let metadata = read_parquet_metadata(&parquet_path);
+        let stats = StatisticsContainer::from_parquet_metadata(&metadata, &schema);
+
+        // Verify all columns have statistics
+        assert_eq!(stats.columns.len(), 4);
+
+        // Check int32 column
+        let id_stats = stats.columns.get("id").unwrap();
+        assert_eq!(get_int32(id_stats.min_value.as_ref().unwrap()), 1);
+        assert_eq!(get_int32(id_stats.max_value.as_ref().unwrap()), 5);
+
+        // Check string column
+        let name_stats = stats.columns.get("name").unwrap();
+        assert_eq!(get_string(name_stats.min_value.as_ref().unwrap()), "alice");
+        assert_eq!(get_string(name_stats.max_value.as_ref().unwrap()), "eve");
+
+        // Check float64 column with null
+        let score_stats = stats.columns.get("score").unwrap();
+        assert_eq!(get_float64(score_stats.min_value.as_ref().unwrap()), 78.5);
+        assert_eq!(get_float64(score_stats.max_value.as_ref().unwrap()), 95.0);
+
+        // Check boolean column
+        let active_stats = stats.columns.get("is_active").unwrap();
+        assert!(!get_bool(active_stats.min_value.as_ref().unwrap()));
+        assert!(get_bool(active_stats.max_value.as_ref().unwrap()));
+    }
+
+    #[test]
+    fn test_schema_column_ordering() {
+        let temp_dir = tempdir().unwrap();
+        let parquet_path = temp_dir.path().join("ordering_test.parquet");
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("z_col", DataType::Int32, false),
+            Field::new("a_col", DataType::Int32, false),
+            Field::new("m_col", DataType::Int32, false),
+        ]));
+
+        let z_values = Int32Array::from(vec![1, 2, 3]);
+        let a_values = Int32Array::from(vec![10, 20, 30]);
+        let m_values = Int32Array::from(vec![100, 200, 300]);
+
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![
+                Arc::new(z_values) as ArrayRef,
+                Arc::new(a_values) as ArrayRef,
+                Arc::new(m_values) as ArrayRef,
+            ],
+        )
+        .unwrap();
+
+        write_parquet_file(&batch, &parquet_path);
+
+        let metadata = read_parquet_metadata(&parquet_path);
+        let stats = StatisticsContainer::from_parquet_metadata(&metadata, &schema);
+
+        // All columns should have stats regardless of their name ordering
+        assert!(stats.columns.contains_key("z_col"));
+        assert!(stats.columns.contains_key("a_col"));
+        assert!(stats.columns.contains_key("m_col"));
+
+        // Verify values
+        let z_stats = stats.columns.get("z_col").unwrap();
+        assert_eq!(get_int32(z_stats.min_value.as_ref().unwrap()), 1);
+        assert_eq!(get_int32(z_stats.max_value.as_ref().unwrap()), 3);
+
+        let a_stats = stats.columns.get("a_col").unwrap();
+        assert_eq!(get_int32(a_stats.min_value.as_ref().unwrap()), 10);
+        assert_eq!(get_int32(a_stats.max_value.as_ref().unwrap()), 30);
+
+        let m_stats = stats.columns.get("m_col").unwrap();
+        assert_eq!(get_int32(m_stats.min_value.as_ref().unwrap()), 100);
+        assert_eq!(get_int32(m_stats.max_value.as_ref().unwrap()), 300);
+    }
+}
+
+mod edge_cases {
+    use super::*;
+
+    #[test]
+    fn test_min_equals_max() {
+        let temp_dir = tempdir().unwrap();
+
+        // Test single row case
+        let single_row_path = temp_dir.path().join("single_row_test.parquet");
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "value",
+            DataType::Int32,
+            false,
+        )]));
+        let values = Int32Array::from(vec![42]);
+        let batch =
+            RecordBatch::try_new(schema.clone(), vec![Arc::new(values) as ArrayRef]).unwrap();
+        write_parquet_file(&batch, &single_row_path);
+
+        let metadata = read_parquet_metadata(&single_row_path);
+        let stats = StatisticsContainer::from_parquet_metadata(&metadata, &schema);
+        assert_eq!(stats.num_rows, Some(1));
+        let col_stats = stats.columns.get("value").unwrap();
+        assert_eq!(get_int32(col_stats.min_value.as_ref().unwrap()), 42);
+        assert_eq!(get_int32(col_stats.max_value.as_ref().unwrap()), 42);
+
+        // Test identical values case
+        let identical_path = temp_dir.path().join("identical_test.parquet");
+        let values = Int32Array::from(vec![100, 100, 100, 100, 100]);
+        let batch =
+            RecordBatch::try_new(schema.clone(), vec![Arc::new(values) as ArrayRef]).unwrap();
+        write_parquet_file(&batch, &identical_path);
+
+        let metadata = read_parquet_metadata(&identical_path);
+        let stats = StatisticsContainer::from_parquet_metadata(&metadata, &schema);
+        let col_stats = stats.columns.get("value").unwrap();
+        assert_eq!(get_int32(col_stats.min_value.as_ref().unwrap()), 100);
+        assert_eq!(get_int32(col_stats.max_value.as_ref().unwrap()), 100);
+    }
+
+    #[test]
+    fn test_negative_numbers() {
+        let temp_dir = tempdir().unwrap();
+        let parquet_path = temp_dir.path().join("negative_test.parquet");
+
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "value",
+            DataType::Int32,
+            false,
+        )]));
+        let values = Int32Array::from(vec![-100, -50, 0, 50, 100]);
+        let batch =
+            RecordBatch::try_new(schema.clone(), vec![Arc::new(values) as ArrayRef]).unwrap();
+
+        write_parquet_file(&batch, &parquet_path);
+
+        let metadata = read_parquet_metadata(&parquet_path);
+        let stats = StatisticsContainer::from_parquet_metadata(&metadata, &schema);
+
+        let col_stats = stats.columns.get("value").unwrap();
+        assert_eq!(get_int32(col_stats.min_value.as_ref().unwrap()), -100);
+        assert_eq!(get_int32(col_stats.max_value.as_ref().unwrap()), 100);
+    }
+
+    #[test]
+    fn test_empty_string() {
+        let temp_dir = tempdir().unwrap();
+        let parquet_path = temp_dir.path().join("empty_string_test.parquet");
+
+        let schema = Arc::new(Schema::new(vec![Field::new("name", 
DataType::Utf8, false)]));
+        let names = StringArray::from(vec!["", "apple", "banana"]);
+        let batch =
+            RecordBatch::try_new(schema.clone(), vec![Arc::new(names) as ArrayRef]).unwrap();
+
+        write_parquet_file(&batch, &parquet_path);
+
+        let metadata = read_parquet_metadata(&parquet_path);
+        let stats = StatisticsContainer::from_parquet_metadata(&metadata, &schema);
+
+        let col_stats = stats.columns.get("name").unwrap();
+        // Empty string should be the minimum (lexicographically smallest)
+        assert_eq!(get_string(col_stats.min_value.as_ref().unwrap()), "");
+        assert_eq!(get_string(col_stats.max_value.as_ref().unwrap()), "banana");
+    }
+
+    #[test]
+    fn test_special_float_values() {
+        let temp_dir = tempdir().unwrap();
+        let parquet_path = temp_dir.path().join("special_float_test.parquet");
+
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "value",
+            DataType::Float64,
+            false,
+        )]));
+        // Test with various float values including very small and very large
+        let values = Float64Array::from(vec![
+            f64::MIN_POSITIVE,
+            1.0,
+            f64::MAX / 2.0,
+            -f64::MAX / 2.0,
+            0.0,
+        ]);
+        let batch =
+            RecordBatch::try_new(schema.clone(), vec![Arc::new(values) as ArrayRef]).unwrap();
+
+        write_parquet_file(&batch, &parquet_path);
+
+        let metadata = read_parquet_metadata(&parquet_path);
+        let stats = StatisticsContainer::from_parquet_metadata(&metadata, &schema);
+
+        let col_stats = stats.columns.get("value").unwrap();
+        assert_eq!(
+            get_float64(col_stats.min_value.as_ref().unwrap()),
+            -f64::MAX / 2.0
+        );
+        assert_eq!(
+            get_float64(col_stats.max_value.as_ref().unwrap()),
+            f64::MAX / 2.0
+        );
+    }
+
+    #[test]
+    fn test_parquet_without_statistics() {
+        let temp_dir = tempdir().unwrap();
+        let parquet_path = temp_dir.path().join("no_stats_test.parquet");
+
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "value",
+            DataType::Int32,
+            false,
+        )]));
+        let values = Int32Array::from(vec![10, 50, 30, 20, 40]);
+        let batch =
+            RecordBatch::try_new(schema.clone(), vec![Arc::new(values) as ArrayRef]).unwrap();
+
+        write_parquet_file_no_stats(&batch, &parquet_path);
+
+        let metadata = read_parquet_metadata(&parquet_path);
+        let stats = StatisticsContainer::from_parquet_metadata(&metadata, &schema);
+
+        // When statistics are disabled, min/max should be None
+        let col_stats = stats.columns.get("value").unwrap();
+        assert!(col_stats.min_value.is_none());
+        assert!(col_stats.max_value.is_none());
+    }
+}
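The row_group_statistics tests above rely on a roll-up relationship: the file-level min is the smallest row-group min, the file-level max is the largest row-group max, and num_rows is the sum over row groups. A small hedged sketch of that relationship for the row count, using only APIs exercised by these tests; the roll-up is the behavior the tests assert, not a re-implementation of the crate internals.

    // Illustrative sketch only; not part of the commit.
    use arrow_schema::Schema;
    use parquet::file::metadata::ParquetMetaData;

    use hudi_core::statistics::{StatisticsContainer, StatsGranularity};

    /// Check that per-row-group stats roll up to the file-level stats.
    fn check_row_count_rollup(metadata: &ParquetMetaData, schema: &Schema) {
        let file_stats = StatisticsContainer::from_parquet_metadata(metadata, schema);
        assert_eq!(file_stats.granularity, StatsGranularity::File);

        let mut total_rows = 0;
        for rg in metadata.row_groups() {
            let rg_stats = StatisticsContainer::from_row_group(rg, schema);
            assert_eq!(rg_stats.granularity, StatsGranularity::RowGroup);
            total_rows += rg_stats.num_rows.unwrap_or(0);
        }
        // e.g. 3 + 3 + 3 = 9 in test_multiple_row_groups_aggregation above.
        assert_eq!(file_stats.num_rows, Some(total_rows));
    }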
diff --git a/crates/test/Cargo.toml b/crates/test/Cargo.toml
index 1f129d3..114f013 100644
--- a/crates/test/Cargo.toml
+++ b/crates/test/Cargo.toml
@@ -38,5 +38,5 @@ strum_macros = { workspace = true }
 url = { workspace = true }
 
 # testing
-tempfile = "3.20.0"
-zip-extract = "0.3.0"
+tempfile = { workspace = true }
+zip-extract = { workspace = true }
