Copilot commented on code in PR #516:
URL: https://github.com/apache/hudi-rs/pull/516#discussion_r2678989481


##########
crates/datafusion/src/lib.rs:
##########
@@ -324,12 +336,29 @@ impl TableProvider for HudiDataSource {
             crypto: Default::default(),
         };
         let table_schema = self.schema();
+
+        // Read column stats pruning level configuration
+        let stats_level: String = self
+            .table
+            .hudi_configs
+            .get_or_default(ColumnStatsPruningLevel)
+            .into();
+
         let mut parquet_source = ParquetSource::new(parquet_opts);
+
+        // Enable page index for page-level pruning
+        if stats_level == "page" {

Review Comment:
   The string comparison `stats_level == "page"` is fragile and could fail 
silently if the configuration value is set to "Page" or "PAGE". Since 
`StatsGranularity::from_str` already handles case-insensitive parsing, consider 
parsing the configuration value to `StatsGranularity` and comparing the enum 
values instead of raw strings. This would make the code more robust and 
maintainable.
   ```suggestion
           // Enable page index for page-level pruning (case-insensitive)
           if stats_level.eq_ignore_ascii_case("page") {
   ```



##########
crates/core/src/statistics/mod.rs:
##########
@@ -0,0 +1,608 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+//! Statistics module for column statistics pruning.
+//!
+//! This module provides abstractions for extracting, aggregating, and using
+//! column statistics from Parquet files for query pruning at different 
granularity levels.
+
+use std::collections::HashMap;
+use std::str::FromStr;
+
+use arrow_schema::{DataType, Schema};
+use datafusion_common::ScalarValue;
+use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
+use parquet::file::statistics::Statistics as ParquetStatistics;
+
+/// Column statistics pruning granularity level.
+///
+/// Controls how fine-grained the statistics-based pruning is.
+/// Each level offers different trade-offs between memory overhead and pruning 
effectiveness.
+#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
+pub enum StatsGranularity {
+    /// File-level stats (aggregated from all row groups).
+    ///
+    /// Coarsest granularity, lowest memory overhead.
+    /// Can skip entire files that don't match predicates.
+    /// Stats are computed by aggregating row group stats:
+    /// - `file_min = min(row_group_mins)`
+    /// - `file_max = max(row_group_maxs)`
+    /// - `file_null_count = sum(row_group_null_counts)`
+    #[default]
+    File,
+
+    /// Row group level stats (directly from Parquet footer).
+    ///
+    /// Balanced granularity, moderate memory.
+    /// Can skip row groups within files (typically 64-128MB chunks).
+    /// Stats are read directly from Parquet footer's ColumnChunkMetaData.
+    RowGroup,
+
+    /// Page level stats (from ColumnIndex, requires Parquet 1.11+).
+    ///
+    /// Finest granularity, highest memory.
+    /// Can skip individual pages (typically ~1MB chunks).
+    /// Most effective when data is sorted by filter columns.
+    /// Requires Parquet files to be written with page index enabled.
+    Page,
+}
+
+impl FromStr for StatsGranularity {
+    type Err = String;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s.to_lowercase().as_str() {
+            "file" => Ok(Self::File),
+            "row_group" | "rowgroup" => Ok(Self::RowGroup),
+            "page" => Ok(Self::Page),
+            _ => Err(format!(
+                "Invalid stats granularity: '{s}'. Valid options: file, 
row_group, page"
+            )),
+        }
+    }
+}
+
+impl std::fmt::Display for StatsGranularity {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Self::File => write!(f, "file"),
+            Self::RowGroup => write!(f, "row_group"),
+            Self::Page => write!(f, "page"),
+        }
+    }
+}
+
+/// Statistics for a single column at a given granularity.
+#[derive(Clone, Debug)]
+pub struct ColumnStatistics {
+    /// Column name
+    pub column_name: String,
+    /// Arrow data type
+    pub data_type: DataType,
+    /// Minimum value (as ScalarValue for compatibility with DataFusion)
+    pub min_value: Option<ScalarValue>,
+    /// Maximum value (as ScalarValue for compatibility with DataFusion)
+    pub max_value: Option<ScalarValue>,
+    /// Number of null values
+    pub null_count: Option<i64>,
+    /// Number of distinct values (if available)
+    pub distinct_count: Option<u64>,
+}
+
+impl ColumnStatistics {
+    /// Create a new ColumnStatistics with the given column name and data type.
+    pub fn new(column_name: String, data_type: DataType) -> Self {
+        Self {
+            column_name,
+            data_type,
+            min_value: None,
+            max_value: None,
+            null_count: None,
+            distinct_count: None,
+        }
+    }
+
+    /// Create from Parquet row group statistics.
+    ///
+    /// Converts Parquet's `Statistics` to `ScalarValue` based on the Arrow 
data type.
+    pub fn from_parquet_statistics(
+        column_name: &str,
+        data_type: &DataType,
+        stats: &ParquetStatistics,
+    ) -> Self {
+        let (min_value, max_value) = parquet_stats_to_scalar(stats, data_type);
+        let null_count = stats.null_count_opt().map(|n| n as i64);
+
+        Self {
+            column_name: column_name.to_string(),
+            data_type: data_type.clone(),
+            min_value,
+            max_value,
+            null_count,
+            distinct_count: None,
+        }
+    }
+
+    /// Merge with another ColumnStatistics (for aggregation).
+    ///
+    /// Takes min of mins, max of maxs, sums null counts.
+    /// Used when aggregating row group stats to file-level stats.
+    pub fn merge(&mut self, other: &ColumnStatistics) {
+        // Merge min values (take the smaller one)
+        self.min_value = match (&self.min_value, &other.min_value) {
+            (Some(a), Some(b)) => scalar_min(a, b),
+            (Some(a), None) => Some(a.clone()),
+            (None, Some(b)) => Some(b.clone()),
+            (None, None) => None,
+        };
+
+        // Merge max values (take the larger one)
+        self.max_value = match (&self.max_value, &other.max_value) {
+            (Some(a), Some(b)) => scalar_max(a, b),
+            (Some(a), None) => Some(a.clone()),
+            (None, Some(b)) => Some(b.clone()),
+            (None, None) => None,
+        };
+
+        // Sum null counts
+        self.null_count = match (self.null_count, other.null_count) {
+            (Some(a), Some(b)) => Some(a + b),
+            (Some(a), None) => Some(a),
+            (None, Some(b)) => Some(b),
+            (None, None) => None,
+        };
+
+        // Distinct count cannot be accurately merged, so we set it to None
+        self.distinct_count = None;
+    }
+}
+
+/// Container for statistics at a specific granularity level.
+#[derive(Clone, Debug)]
+pub struct StatisticsContainer {
+    /// Granularity of these statistics
+    pub granularity: StatsGranularity,
+    /// Number of rows covered by these statistics
+    pub num_rows: Option<i64>,
+    /// Column statistics by column name
+    pub columns: HashMap<String, ColumnStatistics>,
+}
+
+impl StatisticsContainer {
+    /// Create an empty statistics container.
+    pub fn new(granularity: StatsGranularity) -> Self {
+        Self {
+            granularity,
+            num_rows: None,
+            columns: HashMap::new(),
+        }
+    }
+
+    /// Create file-level stats by aggregating row group stats from Parquet 
metadata.
+    ///
+    /// This iterates through all row groups, extracts stats for each column,
+    /// and aggregates them to file-level statistics.
+    pub fn from_parquet_metadata(metadata: &ParquetMetaData, schema: &Schema) 
-> Self {
+        let mut container = Self::new(StatsGranularity::File);
+
+        // Sum up total rows across all row groups
+        let total_rows: i64 = metadata.row_groups().iter().map(|rg| 
rg.num_rows()).sum();
+        container.num_rows = Some(total_rows);
+
+        // Iterate through row groups and aggregate stats
+        for row_group in metadata.row_groups() {
+            let rg_stats = Self::from_row_group(row_group, schema);
+
+            // Merge row group stats into file-level stats
+            for (col_name, col_stats) in rg_stats.columns {
+                container
+                    .columns
+                    .entry(col_name.clone())
+                    .and_modify(|existing| existing.merge(&col_stats))
+                    .or_insert(col_stats);
+            }
+        }
+
+        // Ensure all schema columns have an entry (even if no stats)
+        for field in schema.fields() {
+            let col_name = field.name();
+            if !container.columns.contains_key(col_name) {
+                container.columns.insert(
+                    col_name.clone(),
+                    ColumnStatistics::new(col_name.clone(), 
field.data_type().clone()),
+                );
+            }
+        }
+
+        container
+    }
+
+    /// Create row-group-level stats from a single row group.
+    pub fn from_row_group(row_group: &RowGroupMetaData, schema: &Schema) -> 
Self {
+        let mut container = Self::new(StatsGranularity::RowGroup);
+        container.num_rows = Some(row_group.num_rows());
+
+        // Build a map of column name to Arrow data type
+        let column_types: HashMap<&str, &DataType> = schema
+            .fields()
+            .iter()
+            .map(|f| (f.name().as_str(), f.data_type()))
+            .collect();
+
+        // Extract stats for each column in the row group
+        for col_chunk in row_group.columns() {
+            // Get column name from the column descriptor
+            let col_path = col_chunk.column_descr().path();
+            let col_name = col_path.parts().last().map(|s| 
s.as_str()).unwrap_or("");
+
+            // Skip if we don't have type info for this column
+            let Some(&data_type) = column_types.get(col_name) else {
+                continue;
+            };
+
+            // Extract statistics if available
+            if let Some(stats) = col_chunk.statistics() {
+                let col_stats =
+                    ColumnStatistics::from_parquet_statistics(col_name, 
data_type, stats);
+                container.columns.insert(col_name.to_string(), col_stats);
+            } else {
+                // No stats available, create empty entry
+                container.columns.insert(
+                    col_name.to_string(),
+                    ColumnStatistics::new(col_name.to_string(), 
data_type.clone()),
+                );
+            }
+        }
+
+        container
+    }
+
+    /// Convert to DataFusion Statistics for query planning.

Review Comment:
   The `to_datafusion_statistics` method is missing documentation. As a public 
API method that performs an important conversion, it should have a doc comment 
explaining what it does, its parameters, return value, and any important notes 
about the conversion (e.g., that it maintains the same order as the input 
schema). This is especially important since the method is likely to be a key 
integration point with DataFusion.
   ```suggestion
       /// Convert this container into DataFusion [`Statistics`] for query 
planning.
       ///
       /// # Arguments
       ///
       /// * `schema` - The Arrow [`Schema`] describing the logical columns of 
the table.
       ///   The fields in this schema determine both which columns are looked 
up in
       ///   this statistics container and the order of the returned per-column
       ///   statistics.
       ///
       /// # Returns
       ///
       /// A [`datafusion_common::Statistics`] value where:
       ///
       /// * `num_rows` is populated from `self.num_rows` as an
       ///   [`Precision::Exact`] value when available, or [`Precision::Absent`]
       ///   when the row count is not known.
       /// * `total_byte_size` is currently always [`Precision::Absent`].
       /// * `column_statistics` contains one entry per field in `schema`, in 
the
       ///   same order as `schema.fields()`. Each entry is populated from the
       ///   corresponding [`ColumnStatistics`] in this container if present, or
       ///   left as the default (all [`Precision::Absent`]) if no statistics
       ///   are available for that column.
       ///
       /// # Notes
       ///
       /// * Column lookup is performed by field name; if a field in `schema`
       ///   has no matching entry in `self.columns`, the default
       ///   [`datafusion_common::ColumnStatistics`] is used.
       /// * Minimum and maximum values are converted to [`ScalarValue`] and
       ///   wrapped in [`Precision::Exact`] when available, otherwise they are
       ///   set to [`Precision::Absent`].
   ```



##########
crates/core/src/statistics/mod.rs:
##########
@@ -0,0 +1,608 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+//! Statistics module for column statistics pruning.
+//!
+//! This module provides abstractions for extracting, aggregating, and using
+//! column statistics from Parquet files for query pruning at different 
granularity levels.
+
+use std::collections::HashMap;
+use std::str::FromStr;
+
+use arrow_schema::{DataType, Schema};
+use datafusion_common::ScalarValue;
+use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
+use parquet::file::statistics::Statistics as ParquetStatistics;
+
+/// Column statistics pruning granularity level.
+///
+/// Controls how fine-grained the statistics-based pruning is.
+/// Each level offers different trade-offs between memory overhead and pruning 
effectiveness.
+#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
+pub enum StatsGranularity {
+    /// File-level stats (aggregated from all row groups).
+    ///
+    /// Coarsest granularity, lowest memory overhead.
+    /// Can skip entire files that don't match predicates.
+    /// Stats are computed by aggregating row group stats:
+    /// - `file_min = min(row_group_mins)`
+    /// - `file_max = max(row_group_maxs)`
+    /// - `file_null_count = sum(row_group_null_counts)`
+    #[default]
+    File,
+
+    /// Row group level stats (directly from Parquet footer).
+    ///
+    /// Balanced granularity, moderate memory.
+    /// Can skip row groups within files (typically 64-128MB chunks).
+    /// Stats are read directly from Parquet footer's ColumnChunkMetaData.
+    RowGroup,
+
+    /// Page level stats (from ColumnIndex, requires Parquet 1.11+).
+    ///
+    /// Finest granularity, highest memory.
+    /// Can skip individual pages (typically ~1MB chunks).
+    /// Most effective when data is sorted by filter columns.
+    /// Requires Parquet files to be written with page index enabled.
+    Page,
+}
+
+impl FromStr for StatsGranularity {
+    type Err = String;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s.to_lowercase().as_str() {
+            "file" => Ok(Self::File),
+            "row_group" | "rowgroup" => Ok(Self::RowGroup),
+            "page" => Ok(Self::Page),
+            _ => Err(format!(
+                "Invalid stats granularity: '{s}'. Valid options: file, 
row_group, page"
+            )),
+        }
+    }
+}
+
+impl std::fmt::Display for StatsGranularity {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Self::File => write!(f, "file"),
+            Self::RowGroup => write!(f, "row_group"),
+            Self::Page => write!(f, "page"),
+        }
+    }
+}
+
+/// Statistics for a single column at a given granularity.
+#[derive(Clone, Debug)]
+pub struct ColumnStatistics {
+    /// Column name
+    pub column_name: String,
+    /// Arrow data type
+    pub data_type: DataType,
+    /// Minimum value (as ScalarValue for compatibility with DataFusion)
+    pub min_value: Option<ScalarValue>,
+    /// Maximum value (as ScalarValue for compatibility with DataFusion)
+    pub max_value: Option<ScalarValue>,
+    /// Number of null values
+    pub null_count: Option<i64>,
+    /// Number of distinct values (if available)
+    pub distinct_count: Option<u64>,
+}
+
+impl ColumnStatistics {
+    /// Create a new ColumnStatistics with the given column name and data type.
+    pub fn new(column_name: String, data_type: DataType) -> Self {
+        Self {
+            column_name,
+            data_type,
+            min_value: None,
+            max_value: None,
+            null_count: None,
+            distinct_count: None,
+        }
+    }
+
+    /// Create from Parquet row group statistics.
+    ///
+    /// Converts Parquet's `Statistics` to `ScalarValue` based on the Arrow 
data type.
+    pub fn from_parquet_statistics(
+        column_name: &str,
+        data_type: &DataType,
+        stats: &ParquetStatistics,
+    ) -> Self {
+        let (min_value, max_value) = parquet_stats_to_scalar(stats, data_type);
+        let null_count = stats.null_count_opt().map(|n| n as i64);
+
+        Self {
+            column_name: column_name.to_string(),
+            data_type: data_type.clone(),
+            min_value,
+            max_value,
+            null_count,
+            distinct_count: None,
+        }
+    }
+
+    /// Merge with another ColumnStatistics (for aggregation).
+    ///
+    /// Takes min of mins, max of maxs, sums null counts.
+    /// Used when aggregating row group stats to file-level stats.
+    pub fn merge(&mut self, other: &ColumnStatistics) {
+        // Merge min values (take the smaller one)
+        self.min_value = match (&self.min_value, &other.min_value) {
+            (Some(a), Some(b)) => scalar_min(a, b),
+            (Some(a), None) => Some(a.clone()),
+            (None, Some(b)) => Some(b.clone()),
+            (None, None) => None,
+        };
+
+        // Merge max values (take the larger one)
+        self.max_value = match (&self.max_value, &other.max_value) {
+            (Some(a), Some(b)) => scalar_max(a, b),
+            (Some(a), None) => Some(a.clone()),
+            (None, Some(b)) => Some(b.clone()),
+            (None, None) => None,
+        };
+
+        // Sum null counts
+        self.null_count = match (self.null_count, other.null_count) {
+            (Some(a), Some(b)) => Some(a + b),
+            (Some(a), None) => Some(a),
+            (None, Some(b)) => Some(b),
+            (None, None) => None,
+        };
+
+        // Distinct count cannot be accurately merged, so we set it to None
+        self.distinct_count = None;
+    }
+}
+
+/// Container for statistics at a specific granularity level.
+#[derive(Clone, Debug)]
+pub struct StatisticsContainer {
+    /// Granularity of these statistics
+    pub granularity: StatsGranularity,
+    /// Number of rows covered by these statistics
+    pub num_rows: Option<i64>,
+    /// Column statistics by column name
+    pub columns: HashMap<String, ColumnStatistics>,
+}
+
+impl StatisticsContainer {
+    /// Create an empty statistics container.
+    pub fn new(granularity: StatsGranularity) -> Self {
+        Self {
+            granularity,
+            num_rows: None,
+            columns: HashMap::new(),
+        }
+    }
+
+    /// Create file-level stats by aggregating row group stats from Parquet 
metadata.
+    ///
+    /// This iterates through all row groups, extracts stats for each column,
+    /// and aggregates them to file-level statistics.
+    pub fn from_parquet_metadata(metadata: &ParquetMetaData, schema: &Schema) 
-> Self {
+        let mut container = Self::new(StatsGranularity::File);
+
+        // Sum up total rows across all row groups
+        let total_rows: i64 = metadata.row_groups().iter().map(|rg| 
rg.num_rows()).sum();
+        container.num_rows = Some(total_rows);
+
+        // Iterate through row groups and aggregate stats
+        for row_group in metadata.row_groups() {
+            let rg_stats = Self::from_row_group(row_group, schema);
+
+            // Merge row group stats into file-level stats
+            for (col_name, col_stats) in rg_stats.columns {
+                container
+                    .columns
+                    .entry(col_name.clone())
+                    .and_modify(|existing| existing.merge(&col_stats))
+                    .or_insert(col_stats);
+            }
+        }
+
+        // Ensure all schema columns have an entry (even if no stats)
+        for field in schema.fields() {
+            let col_name = field.name();
+            if !container.columns.contains_key(col_name) {
+                container.columns.insert(
+                    col_name.clone(),
+                    ColumnStatistics::new(col_name.clone(), 
field.data_type().clone()),
+                );
+            }
+        }
+
+        container
+    }
+
+    /// Create row-group-level stats from a single row group.
+    pub fn from_row_group(row_group: &RowGroupMetaData, schema: &Schema) -> 
Self {
+        let mut container = Self::new(StatsGranularity::RowGroup);
+        container.num_rows = Some(row_group.num_rows());
+
+        // Build a map of column name to Arrow data type
+        let column_types: HashMap<&str, &DataType> = schema
+            .fields()
+            .iter()
+            .map(|f| (f.name().as_str(), f.data_type()))
+            .collect();
+
+        // Extract stats for each column in the row group
+        for col_chunk in row_group.columns() {
+            // Get column name from the column descriptor
+            let col_path = col_chunk.column_descr().path();
+            let col_name = col_path.parts().last().map(|s| 
s.as_str()).unwrap_or("");
+
+            // Skip if we don't have type info for this column
+            let Some(&data_type) = column_types.get(col_name) else {
+                continue;
+            };
+
+            // Extract statistics if available
+            if let Some(stats) = col_chunk.statistics() {
+                let col_stats =
+                    ColumnStatistics::from_parquet_statistics(col_name, 
data_type, stats);
+                container.columns.insert(col_name.to_string(), col_stats);
+            } else {
+                // No stats available, create empty entry
+                container.columns.insert(
+                    col_name.to_string(),
+                    ColumnStatistics::new(col_name.to_string(), 
data_type.clone()),
+                );
+            }
+        }
+
+        container
+    }
+
+    /// Convert to DataFusion Statistics for query planning.
+    pub fn to_datafusion_statistics(&self, schema: &Schema) -> 
datafusion_common::Statistics {
+        use datafusion_common::ColumnStatistics as DFColStats;
+        use datafusion_common::Statistics;
+        use datafusion_common::stats::Precision;
+
+        Statistics {
+            num_rows: self
+                .num_rows
+                .map(|n| Precision::Exact(n as usize))
+                .unwrap_or(Precision::Absent),
+            total_byte_size: Precision::Absent,
+            column_statistics: schema
+                .fields()
+                .iter()
+                .map(|field| {
+                    self.columns
+                        .get(field.name())
+                        .map(|col_stats| DFColStats {
+                            null_count: col_stats
+                                .null_count
+                                .map(|n| Precision::Exact(n as usize))
+                                .unwrap_or(Precision::Absent),
+                            min_value: col_stats
+                                .min_value
+                                .clone()
+                                .map(Precision::Exact)
+                                .unwrap_or(Precision::Absent),
+                            max_value: col_stats
+                                .max_value
+                                .clone()
+                                .map(Precision::Exact)
+                                .unwrap_or(Precision::Absent),
+                            sum_value: Precision::Absent,
+                            distinct_count: col_stats
+                                .distinct_count
+                                .map(|n| Precision::Exact(n as usize))
+                                .unwrap_or(Precision::Absent),
+                        })
+                        .unwrap_or_default()
+                })
+                .collect(),
+        }
+    }
+}
+
+/// Convert Parquet statistics to ScalarValue min/max pair.
+fn parquet_stats_to_scalar(
+    stats: &ParquetStatistics,
+    data_type: &DataType,
+) -> (Option<ScalarValue>, Option<ScalarValue>) {
+    match stats {
+        ParquetStatistics::Boolean(s) => {
+            let min = s.min_opt().map(|v| ScalarValue::Boolean(Some(*v)));
+            let max = s.max_opt().map(|v| ScalarValue::Boolean(Some(*v)));
+            (min, max)
+        }
+        ParquetStatistics::Int32(s) => {
+            // Int32 in Parquet can map to various Arrow types
+            match data_type {
+                DataType::Int32 => {
+                    let min = s.min_opt().map(|v| 
ScalarValue::Int32(Some(*v)));
+                    let max = s.max_opt().map(|v| 
ScalarValue::Int32(Some(*v)));
+                    (min, max)
+                }
+                DataType::Int16 => {
+                    let min = s.min_opt().map(|v| ScalarValue::Int16(Some(*v 
as i16)));
+                    let max = s.max_opt().map(|v| ScalarValue::Int16(Some(*v 
as i16)));
+                    (min, max)
+                }
+                DataType::Int8 => {
+                    let min = s.min_opt().map(|v| ScalarValue::Int8(Some(*v as 
i8)));
+                    let max = s.max_opt().map(|v| ScalarValue::Int8(Some(*v as 
i8)));
+                    (min, max)
+                }
+                DataType::UInt32 => {
+                    let min = s.min_opt().map(|v| ScalarValue::UInt32(Some(*v 
as u32)));
+                    let max = s.max_opt().map(|v| ScalarValue::UInt32(Some(*v 
as u32)));
+                    (min, max)
+                }
+                DataType::UInt16 => {
+                    let min = s.min_opt().map(|v| ScalarValue::UInt16(Some(*v 
as u16)));
+                    let max = s.max_opt().map(|v| ScalarValue::UInt16(Some(*v 
as u16)));
+                    (min, max)
+                }
+                DataType::UInt8 => {
+                    let min = s.min_opt().map(|v| ScalarValue::UInt8(Some(*v 
as u8)));
+                    let max = s.max_opt().map(|v| ScalarValue::UInt8(Some(*v 
as u8)));
+                    (min, max)
+                }
+                DataType::Date32 => {
+                    let min = s.min_opt().map(|v| 
ScalarValue::Date32(Some(*v)));
+                    let max = s.max_opt().map(|v| 
ScalarValue::Date32(Some(*v)));
+                    (min, max)
+                }
+                _ => {
+                    // Default to Int32
+                    let min = s.min_opt().map(|v| 
ScalarValue::Int32(Some(*v)));
+                    let max = s.max_opt().map(|v| 
ScalarValue::Int32(Some(*v)));
+                    (min, max)
+                }
+            }
+        }
+        ParquetStatistics::Int64(s) => match data_type {
+            DataType::Int64 => {
+                let min = s.min_opt().map(|v| ScalarValue::Int64(Some(*v)));
+                let max = s.max_opt().map(|v| ScalarValue::Int64(Some(*v)));
+                (min, max)
+            }
+            DataType::UInt64 => {
+                let min = s.min_opt().map(|v| ScalarValue::UInt64(Some(*v as 
u64)));
+                let max = s.max_opt().map(|v| ScalarValue::UInt64(Some(*v as 
u64)));
+                (min, max)

Review Comment:
   Similar to the Int32 case, converting `i64` to `u64` using `as` cast is 
potentially unsafe. Negative i64 values will be reinterpreted as large unsigned 
values, producing incorrect statistics. Consider adding validation to ensure 
the value is non-negative before converting to unsigned types, and return 
`None` for statistics if the conversion is invalid.
   ```suggestion
                   // Safely convert i64 stats to u64 stats.
                   // If any statistic is negative, drop both bounds to avoid 
incorrect pruning.
                   let min = s.min_opt().and_then(|v| {
                       if *v >= 0 {
                           Some(ScalarValue::UInt64(Some(*v as u64)))
                       } else {
                           None
                       }
                   });
                   let max = s.max_opt().and_then(|v| {
                       if *v >= 0 {
                           Some(ScalarValue::UInt64(Some(*v as u64)))
                       } else {
                           None
                       }
                   });
   
                   // If we cannot safely represent both bounds, return no 
stats.
                   if min.is_some() && max.is_some() {
                       (min, max)
                   } else {
                       (None, None)
                   }
   ```



##########
crates/core/src/statistics/mod.rs:
##########
@@ -0,0 +1,608 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+//! Statistics module for column statistics pruning.
+//!
+//! This module provides abstractions for extracting, aggregating, and using
+//! column statistics from Parquet files for query pruning at different 
granularity levels.
+
+use std::collections::HashMap;
+use std::str::FromStr;
+
+use arrow_schema::{DataType, Schema};
+use datafusion_common::ScalarValue;
+use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
+use parquet::file::statistics::Statistics as ParquetStatistics;
+
+/// Column statistics pruning granularity level.
+///
+/// Controls how fine-grained the statistics-based pruning is.
+/// Each level offers different trade-offs between memory overhead and pruning 
effectiveness.
+#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
+pub enum StatsGranularity {
+    /// File-level stats (aggregated from all row groups).
+    ///
+    /// Coarsest granularity, lowest memory overhead.
+    /// Can skip entire files that don't match predicates.
+    /// Stats are computed by aggregating row group stats:
+    /// - `file_min = min(row_group_mins)`
+    /// - `file_max = max(row_group_maxs)`
+    /// - `file_null_count = sum(row_group_null_counts)`
+    #[default]
+    File,
+
+    /// Row group level stats (directly from Parquet footer).
+    ///
+    /// Balanced granularity, moderate memory.
+    /// Can skip row groups within files (typically 64-128MB chunks).
+    /// Stats are read directly from Parquet footer's ColumnChunkMetaData.
+    RowGroup,
+
+    /// Page level stats (from ColumnIndex, requires Parquet 1.11+).
+    ///
+    /// Finest granularity, highest memory.
+    /// Can skip individual pages (typically ~1MB chunks).
+    /// Most effective when data is sorted by filter columns.
+    /// Requires Parquet files to be written with page index enabled.
+    Page,
+}
+
+impl FromStr for StatsGranularity {
+    type Err = String;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s.to_lowercase().as_str() {
+            "file" => Ok(Self::File),
+            "row_group" | "rowgroup" => Ok(Self::RowGroup),
+            "page" => Ok(Self::Page),
+            _ => Err(format!(
+                "Invalid stats granularity: '{s}'. Valid options: file, 
row_group, page"
+            )),
+        }
+    }
+}
+
+impl std::fmt::Display for StatsGranularity {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Self::File => write!(f, "file"),
+            Self::RowGroup => write!(f, "row_group"),
+            Self::Page => write!(f, "page"),
+        }
+    }
+}
+
+/// Statistics for a single column at a given granularity.
+#[derive(Clone, Debug)]
+pub struct ColumnStatistics {
+    /// Column name
+    pub column_name: String,
+    /// Arrow data type
+    pub data_type: DataType,
+    /// Minimum value (as ScalarValue for compatibility with DataFusion)
+    pub min_value: Option<ScalarValue>,
+    /// Maximum value (as ScalarValue for compatibility with DataFusion)
+    pub max_value: Option<ScalarValue>,
+    /// Number of null values
+    pub null_count: Option<i64>,
+    /// Number of distinct values (if available)
+    pub distinct_count: Option<u64>,
+}
+
+impl ColumnStatistics {
+    /// Create a new ColumnStatistics with the given column name and data type.
+    pub fn new(column_name: String, data_type: DataType) -> Self {
+        Self {
+            column_name,
+            data_type,
+            min_value: None,
+            max_value: None,
+            null_count: None,
+            distinct_count: None,
+        }
+    }
+
+    /// Create from Parquet row group statistics.
+    ///
+    /// Converts Parquet's `Statistics` to `ScalarValue` based on the Arrow 
data type.
+    pub fn from_parquet_statistics(
+        column_name: &str,
+        data_type: &DataType,
+        stats: &ParquetStatistics,
+    ) -> Self {
+        let (min_value, max_value) = parquet_stats_to_scalar(stats, data_type);
+        let null_count = stats.null_count_opt().map(|n| n as i64);
+
+        Self {
+            column_name: column_name.to_string(),
+            data_type: data_type.clone(),
+            min_value,
+            max_value,
+            null_count,
+            distinct_count: None,
+        }
+    }
+
+    /// Merge with another ColumnStatistics (for aggregation).
+    ///
+    /// Takes min of mins, max of maxs, sums null counts.
+    /// Used when aggregating row group stats to file-level stats.
+    pub fn merge(&mut self, other: &ColumnStatistics) {
+        // Merge min values (take the smaller one)
+        self.min_value = match (&self.min_value, &other.min_value) {
+            (Some(a), Some(b)) => scalar_min(a, b),
+            (Some(a), None) => Some(a.clone()),
+            (None, Some(b)) => Some(b.clone()),
+            (None, None) => None,
+        };
+
+        // Merge max values (take the larger one)
+        self.max_value = match (&self.max_value, &other.max_value) {
+            (Some(a), Some(b)) => scalar_max(a, b),
+            (Some(a), None) => Some(a.clone()),
+            (None, Some(b)) => Some(b.clone()),
+            (None, None) => None,
+        };
+
+        // Sum null counts
+        self.null_count = match (self.null_count, other.null_count) {
+            (Some(a), Some(b)) => Some(a + b),
+            (Some(a), None) => Some(a),
+            (None, Some(b)) => Some(b),
+            (None, None) => None,
+        };
+
+        // Distinct count cannot be accurately merged, so we set it to None
+        self.distinct_count = None;
+    }
+}
+
+/// Container for statistics at a specific granularity level.
+#[derive(Clone, Debug)]
+pub struct StatisticsContainer {
+    /// Granularity of these statistics
+    pub granularity: StatsGranularity,
+    /// Number of rows covered by these statistics
+    pub num_rows: Option<i64>,
+    /// Column statistics by column name
+    pub columns: HashMap<String, ColumnStatistics>,
+}
+
+impl StatisticsContainer {
+    /// Create an empty statistics container.
+    pub fn new(granularity: StatsGranularity) -> Self {
+        Self {
+            granularity,
+            num_rows: None,
+            columns: HashMap::new(),
+        }
+    }
+
+    /// Create file-level stats by aggregating row group stats from Parquet 
metadata.
+    ///
+    /// This iterates through all row groups, extracts stats for each column,
+    /// and aggregates them to file-level statistics.
+    pub fn from_parquet_metadata(metadata: &ParquetMetaData, schema: &Schema) 
-> Self {
+        let mut container = Self::new(StatsGranularity::File);
+
+        // Sum up total rows across all row groups
+        let total_rows: i64 = metadata.row_groups().iter().map(|rg| 
rg.num_rows()).sum();
+        container.num_rows = Some(total_rows);
+
+        // Iterate through row groups and aggregate stats
+        for row_group in metadata.row_groups() {
+            let rg_stats = Self::from_row_group(row_group, schema);
+
+            // Merge row group stats into file-level stats
+            for (col_name, col_stats) in rg_stats.columns {
+                container
+                    .columns
+                    .entry(col_name.clone())
+                    .and_modify(|existing| existing.merge(&col_stats))
+                    .or_insert(col_stats);
+            }
+        }
+
+        // Ensure all schema columns have an entry (even if no stats)
+        for field in schema.fields() {
+            let col_name = field.name();
+            if !container.columns.contains_key(col_name) {
+                container.columns.insert(
+                    col_name.clone(),
+                    ColumnStatistics::new(col_name.clone(), 
field.data_type().clone()),
+                );
+            }
+        }
+
+        container
+    }
+
+    /// Create row-group-level stats from a single row group.
+    pub fn from_row_group(row_group: &RowGroupMetaData, schema: &Schema) -> 
Self {
+        let mut container = Self::new(StatsGranularity::RowGroup);
+        container.num_rows = Some(row_group.num_rows());
+
+        // Build a map of column name to Arrow data type
+        let column_types: HashMap<&str, &DataType> = schema
+            .fields()
+            .iter()
+            .map(|f| (f.name().as_str(), f.data_type()))
+            .collect();
+
+        // Extract stats for each column in the row group
+        for col_chunk in row_group.columns() {
+            // Get column name from the column descriptor
+            let col_path = col_chunk.column_descr().path();
+            let col_name = col_path.parts().last().map(|s| 
s.as_str()).unwrap_or("");
+
+            // Skip if we don't have type info for this column
+            let Some(&data_type) = column_types.get(col_name) else {
+                continue;
+            };
+
+            // Extract statistics if available
+            if let Some(stats) = col_chunk.statistics() {
+                let col_stats =
+                    ColumnStatistics::from_parquet_statistics(col_name, 
data_type, stats);
+                container.columns.insert(col_name.to_string(), col_stats);
+            } else {
+                // No stats available, create empty entry
+                container.columns.insert(
+                    col_name.to_string(),
+                    ColumnStatistics::new(col_name.to_string(), 
data_type.clone()),

Review Comment:
   The `DataType` is cloned multiple times in this module (lines 112, 133, 227, 
267). Arrow's `DataType` can be a complex nested structure, and cloning it 
repeatedly could be inefficient, especially for tables with many columns. 
Consider using `Arc<DataType>` if the type needs to be stored in multiple 
places, or borrowing where possible. This is particularly important in the hot 
path of statistics aggregation.



##########
crates/datafusion/src/lib.rs:
##########
@@ -324,12 +336,29 @@ impl TableProvider for HudiDataSource {
             crypto: Default::default(),
         };
         let table_schema = self.schema();
+
+        // Read column stats pruning level configuration
+        let stats_level: String = self
+            .table
+            .hudi_configs
+            .get_or_default(ColumnStatsPruningLevel)
+            .into();
+
         let mut parquet_source = ParquetSource::new(parquet_opts);
+
+        // Enable page index for page-level pruning
+        if stats_level == "page" {
+            parquet_source = parquet_source.with_enable_page_index(true);

Review Comment:
   The configuration is only applied for "page" level, but "row_group" level is 
not handled. According to the `StatsGranularity` enum and the configuration 
documentation, there are three levels: "file", "row_group", and "page". If 
"row_group" requires different handling or if it's handled by default, this 
should be documented in a comment. Otherwise, consider adding handling for all 
three levels explicitly to make the code's behavior clear.
   ```suggestion
           // Configure pruning behavior based on stats granularity:
           // - "file": rely on file-level statistics (default ParquetSource 
behavior).
           // - "row_group": rely on row-group statistics (also default 
behavior).
           // - "page": additionally enable the Parquet page index for 
finer-grained pruning.
           match stats_level.as_str() {
               "page" => {
                   parquet_source = parquet_source.with_enable_page_index(true);
               }
               "file" | "row_group" => {
                   // No extra configuration required beyond the default 
ParquetSource settings.
               }
               _ => {
                   // Unknown value: fall back to default behavior (no page 
index).
               }
   ```



##########
crates/core/src/config/read.rs:
##########
@@ -64,6 +64,21 @@ pub enum HudiReadConfig {
     /// Target number of rows per batch for streaming reads.
     /// This controls the batch size when using streaming APIs.
     StreamBatchSize,
+
+    /// Column statistics pruning granularity level.
+    /// Options: "file" (default), "row_group", "page"
+    /// - file: Aggregate stats from row groups, can skip entire files
+    /// - row_group: Per-row-group stats, can skip 64-128MB chunks
+    /// - page: Per-page stats (Parquet 1.11+), can skip ~1MB chunks
+    ColumnStatsPruningLevel,
+
+    /// Maximum number of Parquet footers to cache.
+    /// Default: 1000
+    FooterCacheSize,
+
+    /// TTL for cached footers in seconds.
+    /// Default: 300 (5 minutes)

Review Comment:
   The configuration options `FooterCacheSize` and `FooterCacheTtlSeconds` are 
defined but never used in the codebase. According to the comment in the 
`statistics()` method, footer caching will be "implemented in Phase 4-5", but 
these configurations are already present. Consider either:
   1. Removing these unused configurations until the feature is actually 
implemented, or
   2. Adding a TODO comment in the configuration file referencing the future 
phase where they will be used.
   
   Having unused configurations can be confusing for users and makes the 
codebase harder to maintain.
   ```suggestion
       /// Default: 1000
       ///
       /// TODO(Phase 4-5): Wire this into the Parquet footer cache 
implementation once added.
       FooterCacheSize,
   
       /// TTL for cached footers in seconds.
       /// Default: 300 (5 minutes)
       ///
       /// TODO(Phase 4-5): Wire this into the Parquet footer cache 
implementation once added.
   ```



##########
crates/core/src/statistics/mod.rs:
##########
@@ -0,0 +1,608 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+//! Statistics module for column statistics pruning.
+//!
+//! This module provides abstractions for extracting, aggregating, and using
+//! column statistics from Parquet files for query pruning at different 
granularity levels.
+
+use std::collections::HashMap;
+use std::str::FromStr;
+
+use arrow_schema::{DataType, Schema};
+use datafusion_common::ScalarValue;
+use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
+use parquet::file::statistics::Statistics as ParquetStatistics;
+
+/// Column statistics pruning granularity level.
+///
+/// Controls how fine-grained the statistics-based pruning is.
+/// Each level offers different trade-offs between memory overhead and pruning 
effectiveness.
+#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
+pub enum StatsGranularity {
+    /// File-level stats (aggregated from all row groups).
+    ///
+    /// Coarsest granularity, lowest memory overhead.
+    /// Can skip entire files that don't match predicates.
+    /// Stats are computed by aggregating row group stats:
+    /// - `file_min = min(row_group_mins)`
+    /// - `file_max = max(row_group_maxs)`
+    /// - `file_null_count = sum(row_group_null_counts)`
+    #[default]
+    File,
+
+    /// Row group level stats (directly from Parquet footer).
+    ///
+    /// Balanced granularity, moderate memory.
+    /// Can skip row groups within files (typically 64-128MB chunks).
+    /// Stats are read directly from Parquet footer's ColumnChunkMetaData.
+    RowGroup,
+
+    /// Page level stats (from ColumnIndex, requires Parquet 1.11+).
+    ///
+    /// Finest granularity, highest memory.
+    /// Can skip individual pages (typically ~1MB chunks).
+    /// Most effective when data is sorted by filter columns.
+    /// Requires Parquet files to be written with page index enabled.
+    Page,
+}
+
+impl FromStr for StatsGranularity {
+    type Err = String;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s.to_lowercase().as_str() {
+            "file" => Ok(Self::File),
+            "row_group" | "rowgroup" => Ok(Self::RowGroup),
+            "page" => Ok(Self::Page),
+            _ => Err(format!(
+                "Invalid stats granularity: '{s}'. Valid options: file, 
row_group, page"
+            )),
+        }
+    }
+}
+
+impl std::fmt::Display for StatsGranularity {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Self::File => write!(f, "file"),
+            Self::RowGroup => write!(f, "row_group"),
+            Self::Page => write!(f, "page"),
+        }
+    }
+}
+
+/// Statistics for a single column at a given granularity.
+#[derive(Clone, Debug)]
+pub struct ColumnStatistics {
+    /// Column name
+    pub column_name: String,
+    /// Arrow data type
+    pub data_type: DataType,
+    /// Minimum value (as ScalarValue for compatibility with DataFusion)
+    pub min_value: Option<ScalarValue>,
+    /// Maximum value (as ScalarValue for compatibility with DataFusion)
+    pub max_value: Option<ScalarValue>,
+    /// Number of null values
+    pub null_count: Option<i64>,
+    /// Number of distinct values (if available)
+    pub distinct_count: Option<u64>,
+}
+
+impl ColumnStatistics {
+    /// Create a new ColumnStatistics with the given column name and data type.
+    pub fn new(column_name: String, data_type: DataType) -> Self {
+        Self {
+            column_name,
+            data_type,
+            min_value: None,
+            max_value: None,
+            null_count: None,
+            distinct_count: None,
+        }
+    }
+
+    /// Create from Parquet row group statistics.
+    ///
+    /// Converts Parquet's `Statistics` to `ScalarValue` based on the Arrow 
data type.
+    pub fn from_parquet_statistics(
+        column_name: &str,
+        data_type: &DataType,
+        stats: &ParquetStatistics,
+    ) -> Self {
+        let (min_value, max_value) = parquet_stats_to_scalar(stats, data_type);
+        let null_count = stats.null_count_opt().map(|n| n as i64);
+
+        Self {
+            column_name: column_name.to_string(),
+            data_type: data_type.clone(),
+            min_value,
+            max_value,
+            null_count,
+            distinct_count: None,
+        }
+    }
+
+    /// Merge with another ColumnStatistics (for aggregation).
+    ///
+    /// Takes min of mins, max of maxs, sums null counts.
+    /// Used when aggregating row group stats to file-level stats.
+    pub fn merge(&mut self, other: &ColumnStatistics) {
+        // Merge min values (take the smaller one)
+        self.min_value = match (&self.min_value, &other.min_value) {
+            (Some(a), Some(b)) => scalar_min(a, b),
+            (Some(a), None) => Some(a.clone()),
+            (None, Some(b)) => Some(b.clone()),
+            (None, None) => None,
+        };
+
+        // Merge max values (take the larger one)
+        self.max_value = match (&self.max_value, &other.max_value) {
+            (Some(a), Some(b)) => scalar_max(a, b),
+            (Some(a), None) => Some(a.clone()),
+            (None, Some(b)) => Some(b.clone()),
+            (None, None) => None,
+        };
+
+        // Sum null counts
+        self.null_count = match (self.null_count, other.null_count) {
+            (Some(a), Some(b)) => Some(a + b),
+            (Some(a), None) => Some(a),
+            (None, Some(b)) => Some(b),
+            (None, None) => None,
+        };
+
+        // Distinct count cannot be accurately merged, so we set it to None
+        self.distinct_count = None;
+    }
+}
+
+/// Container for statistics at a specific granularity level.
+#[derive(Clone, Debug)]
+pub struct StatisticsContainer {
+    /// Granularity of these statistics
+    pub granularity: StatsGranularity,
+    /// Number of rows covered by these statistics
+    pub num_rows: Option<i64>,
+    /// Column statistics by column name
+    pub columns: HashMap<String, ColumnStatistics>,
+}
+
+impl StatisticsContainer {
+    /// Create an empty statistics container.
+    pub fn new(granularity: StatsGranularity) -> Self {
+        Self {
+            granularity,
+            num_rows: None,
+            columns: HashMap::new(),
+        }
+    }
+
+    /// Create file-level stats by aggregating row group stats from Parquet 
metadata.
+    ///
+    /// This iterates through all row groups, extracts stats for each column,
+    /// and aggregates them to file-level statistics.
+    pub fn from_parquet_metadata(metadata: &ParquetMetaData, schema: &Schema) 
-> Self {
+        let mut container = Self::new(StatsGranularity::File);
+
+        // Sum up total rows across all row groups
+        let total_rows: i64 = metadata.row_groups().iter().map(|rg| 
rg.num_rows()).sum();
+        container.num_rows = Some(total_rows);
+
+        // Iterate through row groups and aggregate stats
+        for row_group in metadata.row_groups() {
+            let rg_stats = Self::from_row_group(row_group, schema);
+
+            // Merge row group stats into file-level stats
+            for (col_name, col_stats) in rg_stats.columns {
+                container
+                    .columns
+                    .entry(col_name.clone())
+                    .and_modify(|existing| existing.merge(&col_stats))
+                    .or_insert(col_stats);
+            }
+        }
+
+        // Ensure all schema columns have an entry (even if no stats)
+        for field in schema.fields() {
+            let col_name = field.name();
+            if !container.columns.contains_key(col_name) {
+                container.columns.insert(
+                    col_name.clone(),
+                    ColumnStatistics::new(col_name.clone(), 
field.data_type().clone()),
+                );
+            }
+        }
+
+        container
+    }
+
+    /// Create row-group-level stats from a single row group.
+    pub fn from_row_group(row_group: &RowGroupMetaData, schema: &Schema) -> 
Self {
+        let mut container = Self::new(StatsGranularity::RowGroup);
+        container.num_rows = Some(row_group.num_rows());
+
+        // Build a map of column name to Arrow data type
+        let column_types: HashMap<&str, &DataType> = schema
+            .fields()
+            .iter()
+            .map(|f| (f.name().as_str(), f.data_type()))
+            .collect();
+
+        // Extract stats for each column in the row group
+        for col_chunk in row_group.columns() {
+            // Get column name from the column descriptor
+            let col_path = col_chunk.column_descr().path();
+            let col_name = col_path.parts().last().map(|s| 
s.as_str()).unwrap_or("");
+
+            // Skip if we don't have type info for this column
+            let Some(&data_type) = column_types.get(col_name) else {
+                continue;
+            };
+
+            // Extract statistics if available
+            if let Some(stats) = col_chunk.statistics() {
+                let col_stats =
+                    ColumnStatistics::from_parquet_statistics(col_name, 
data_type, stats);
+                container.columns.insert(col_name.to_string(), col_stats);
+            } else {
+                // No stats available, create empty entry
+                container.columns.insert(
+                    col_name.to_string(),
+                    ColumnStatistics::new(col_name.to_string(), 
data_type.clone()),
+                );
+            }
+        }
+
+        container
+    }
+
+    /// Convert to DataFusion Statistics for query planning.
+    pub fn to_datafusion_statistics(&self, schema: &Schema) -> 
datafusion_common::Statistics {
+        use datafusion_common::ColumnStatistics as DFColStats;
+        use datafusion_common::Statistics;
+        use datafusion_common::stats::Precision;
+
+        Statistics {
+            num_rows: self
+                .num_rows
+                .map(|n| Precision::Exact(n as usize))
+                .unwrap_or(Precision::Absent),
+            total_byte_size: Precision::Absent,
+            column_statistics: schema
+                .fields()
+                .iter()
+                .map(|field| {
+                    self.columns
+                        .get(field.name())
+                        .map(|col_stats| DFColStats {
+                            null_count: col_stats
+                                .null_count
+                                .map(|n| Precision::Exact(n as usize))
+                                .unwrap_or(Precision::Absent),
+                            min_value: col_stats
+                                .min_value
+                                .clone()
+                                .map(Precision::Exact)
+                                .unwrap_or(Precision::Absent),
+                            max_value: col_stats
+                                .max_value
+                                .clone()
+                                .map(Precision::Exact)
+                                .unwrap_or(Precision::Absent),
+                            sum_value: Precision::Absent,
+                            distinct_count: col_stats
+                                .distinct_count
+                                .map(|n| Precision::Exact(n as usize))
+                                .unwrap_or(Precision::Absent),
+                        })
+                        .unwrap_or_default()
+                })
+                .collect(),
+        }
+    }
+}
+
+/// Convert Parquet statistics to ScalarValue min/max pair.
+fn parquet_stats_to_scalar(
+    stats: &ParquetStatistics,
+    data_type: &DataType,
+) -> (Option<ScalarValue>, Option<ScalarValue>) {
+    match stats {
+        ParquetStatistics::Boolean(s) => {
+            let min = s.min_opt().map(|v| ScalarValue::Boolean(Some(*v)));
+            let max = s.max_opt().map(|v| ScalarValue::Boolean(Some(*v)));
+            (min, max)
+        }
+        ParquetStatistics::Int32(s) => {
+            // Int32 in Parquet can map to various Arrow types
+            match data_type {
+                DataType::Int32 => {
+                    let min = s.min_opt().map(|v| 
ScalarValue::Int32(Some(*v)));
+                    let max = s.max_opt().map(|v| 
ScalarValue::Int32(Some(*v)));
+                    (min, max)
+                }
+                DataType::Int16 => {
+                    let min = s.min_opt().map(|v| ScalarValue::Int16(Some(*v 
as i16)));
+                    let max = s.max_opt().map(|v| ScalarValue::Int16(Some(*v 
as i16)));
+                    (min, max)
+                }
+                DataType::Int8 => {
+                    let min = s.min_opt().map(|v| ScalarValue::Int8(Some(*v as 
i8)));
+                    let max = s.max_opt().map(|v| ScalarValue::Int8(Some(*v as 
i8)));
+                    (min, max)
+                }
+                DataType::UInt32 => {
+                    let min = s.min_opt().map(|v| ScalarValue::UInt32(Some(*v 
as u32)));
+                    let max = s.max_opt().map(|v| ScalarValue::UInt32(Some(*v 
as u32)));
+                    (min, max)
+                }
+                DataType::UInt16 => {
+                    let min = s.min_opt().map(|v| ScalarValue::UInt16(Some(*v 
as u16)));
+                    let max = s.max_opt().map(|v| ScalarValue::UInt16(Some(*v 
as u16)));
+                    (min, max)
+                }
+                DataType::UInt8 => {
+                    let min = s.min_opt().map(|v| ScalarValue::UInt8(Some(*v 
as u8)));
+                    let max = s.max_opt().map(|v| ScalarValue::UInt8(Some(*v 
as u8)));
+                    (min, max)

Review Comment:
   The type conversions from `i32` to smaller integer types (i16, i8, u32, u16, 
u8) use unsafe `as` casts without checking if the values are within the valid 
range for the target type. If a Parquet file has Int32 statistics with values 
outside the range of, say, i8 (-128 to 127), the cast will silently overflow 
and produce incorrect statistics. Consider adding validation or using checked 
conversions (e.g., `try_into()`) and returning `None` for statistics when 
values are out of range, rather than producing incorrect min/max values that 
could lead to incorrect query results.
   ```suggestion
                       let min_i16 = s.min_opt().and_then(|v| 
i16::try_from(*v).ok());
                       let max_i16 = s.max_opt().and_then(|v| 
i16::try_from(*v).ok());
                       match (min_i16, max_i16) {
                           (Some(min), Some(max)) => (
                               Some(ScalarValue::Int16(Some(min))),
                               Some(ScalarValue::Int16(Some(max))),
                           ),
                           _ => (None, None),
                       }
                   }
                   DataType::Int8 => {
                       let min_i8 = s.min_opt().and_then(|v| 
i8::try_from(*v).ok());
                       let max_i8 = s.max_opt().and_then(|v| 
i8::try_from(*v).ok());
                       match (min_i8, max_i8) {
                           (Some(min), Some(max)) => (
                               Some(ScalarValue::Int8(Some(min))),
                               Some(ScalarValue::Int8(Some(max))),
                           ),
                           _ => (None, None),
                       }
                   }
                   DataType::UInt32 => {
                       let min_u32 = s.min_opt().and_then(|v| 
u32::try_from(*v).ok());
                       let max_u32 = s.max_opt().and_then(|v| 
u32::try_from(*v).ok());
                       match (min_u32, max_u32) {
                           (Some(min), Some(max)) => (
                               Some(ScalarValue::UInt32(Some(min))),
                               Some(ScalarValue::UInt32(Some(max))),
                           ),
                           _ => (None, None),
                       }
                   }
                   DataType::UInt16 => {
                       let min_u16 = s.min_opt().and_then(|v| 
u16::try_from(*v).ok());
                       let max_u16 = s.max_opt().and_then(|v| 
u16::try_from(*v).ok());
                       match (min_u16, max_u16) {
                           (Some(min), Some(max)) => (
                               Some(ScalarValue::UInt16(Some(min))),
                               Some(ScalarValue::UInt16(Some(max))),
                           ),
                           _ => (None, None),
                       }
                   }
                   DataType::UInt8 => {
                       let min_u8 = s.min_opt().and_then(|v| 
u8::try_from(*v).ok());
                       let max_u8 = s.max_opt().and_then(|v| 
u8::try_from(*v).ok());
                       match (min_u8, max_u8) {
                           (Some(min), Some(max)) => (
                               Some(ScalarValue::UInt8(Some(min))),
                               Some(ScalarValue::UInt8(Some(max))),
                           ),
                           _ => (None, None),
                       }
   ```



##########
crates/core/src/statistics/mod.rs:
##########
@@ -0,0 +1,608 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+//! Statistics module for column statistics pruning.
+//!
+//! This module provides abstractions for extracting, aggregating, and using
+//! column statistics from Parquet files for query pruning at different 
granularity levels.
+
+use std::collections::HashMap;
+use std::str::FromStr;
+
+use arrow_schema::{DataType, Schema};
+use datafusion_common::ScalarValue;
+use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
+use parquet::file::statistics::Statistics as ParquetStatistics;
+
+/// Column statistics pruning granularity level.
+///
+/// Controls how fine-grained the statistics-based pruning is.
+/// Each level offers different trade-offs between memory overhead and pruning 
effectiveness.
+#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
+pub enum StatsGranularity {
+    /// File-level stats (aggregated from all row groups).
+    ///
+    /// Coarsest granularity, lowest memory overhead.
+    /// Can skip entire files that don't match predicates.
+    /// Stats are computed by aggregating row group stats:
+    /// - `file_min = min(row_group_mins)`
+    /// - `file_max = max(row_group_maxs)`
+    /// - `file_null_count = sum(row_group_null_counts)`
+    #[default]
+    File,
+
+    /// Row group level stats (directly from Parquet footer).
+    ///
+    /// Balanced granularity, moderate memory.
+    /// Can skip row groups within files (typically 64-128MB chunks).
+    /// Stats are read directly from Parquet footer's ColumnChunkMetaData.
+    RowGroup,
+
+    /// Page level stats (from ColumnIndex, requires Parquet 1.11+).
+    ///
+    /// Finest granularity, highest memory.
+    /// Can skip individual pages (typically ~1MB chunks).
+    /// Most effective when data is sorted by filter columns.
+    /// Requires Parquet files to be written with page index enabled.
+    Page,
+}
+
+impl FromStr for StatsGranularity {
+    type Err = String;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s.to_lowercase().as_str() {
+            "file" => Ok(Self::File),
+            "row_group" | "rowgroup" => Ok(Self::RowGroup),
+            "page" => Ok(Self::Page),
+            _ => Err(format!(
+                "Invalid stats granularity: '{s}'. Valid options: file, 
row_group, page"
+            )),
+        }
+    }
+}
+
+impl std::fmt::Display for StatsGranularity {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Self::File => write!(f, "file"),
+            Self::RowGroup => write!(f, "row_group"),
+            Self::Page => write!(f, "page"),
+        }
+    }
+}
+
+/// Statistics for a single column at a given granularity.
+#[derive(Clone, Debug)]
+pub struct ColumnStatistics {
+    /// Column name
+    pub column_name: String,
+    /// Arrow data type
+    pub data_type: DataType,
+    /// Minimum value (as ScalarValue for compatibility with DataFusion)
+    pub min_value: Option<ScalarValue>,
+    /// Maximum value (as ScalarValue for compatibility with DataFusion)
+    pub max_value: Option<ScalarValue>,
+    /// Number of null values
+    pub null_count: Option<i64>,
+    /// Number of distinct values (if available)
+    pub distinct_count: Option<u64>,
+}
+
+impl ColumnStatistics {
+    /// Create a new ColumnStatistics with the given column name and data type.
+    pub fn new(column_name: String, data_type: DataType) -> Self {
+        Self {
+            column_name,
+            data_type,
+            min_value: None,
+            max_value: None,
+            null_count: None,
+            distinct_count: None,
+        }
+    }
+
+    /// Create from Parquet row group statistics.
+    ///
+    /// Converts Parquet's `Statistics` to `ScalarValue` based on the Arrow 
data type.
+    pub fn from_parquet_statistics(
+        column_name: &str,
+        data_type: &DataType,
+        stats: &ParquetStatistics,
+    ) -> Self {
+        let (min_value, max_value) = parquet_stats_to_scalar(stats, data_type);
+        let null_count = stats.null_count_opt().map(|n| n as i64);
+
+        Self {
+            column_name: column_name.to_string(),
+            data_type: data_type.clone(),
+            min_value,
+            max_value,
+            null_count,
+            distinct_count: None,
+        }
+    }
+
+    /// Merge with another ColumnStatistics (for aggregation).
+    ///
+    /// Takes min of mins, max of maxs, sums null counts.
+    /// Used when aggregating row group stats to file-level stats.
+    pub fn merge(&mut self, other: &ColumnStatistics) {
+        // Merge min values (take the smaller one)
+        self.min_value = match (&self.min_value, &other.min_value) {
+            (Some(a), Some(b)) => scalar_min(a, b),
+            (Some(a), None) => Some(a.clone()),
+            (None, Some(b)) => Some(b.clone()),
+            (None, None) => None,
+        };
+
+        // Merge max values (take the larger one)
+        self.max_value = match (&self.max_value, &other.max_value) {
+            (Some(a), Some(b)) => scalar_max(a, b),
+            (Some(a), None) => Some(a.clone()),
+            (None, Some(b)) => Some(b.clone()),
+            (None, None) => None,
+        };
+
+        // Sum null counts
+        self.null_count = match (self.null_count, other.null_count) {
+            (Some(a), Some(b)) => Some(a + b),
+            (Some(a), None) => Some(a),
+            (None, Some(b)) => Some(b),
+            (None, None) => None,
+        };
+
+        // Distinct count cannot be accurately merged, so we set it to None
+        self.distinct_count = None;
+    }
+}
+
+/// Container for statistics at a specific granularity level.
+#[derive(Clone, Debug)]
+pub struct StatisticsContainer {
+    /// Granularity of these statistics
+    pub granularity: StatsGranularity,
+    /// Number of rows covered by these statistics
+    pub num_rows: Option<i64>,
+    /// Column statistics by column name
+    pub columns: HashMap<String, ColumnStatistics>,
+}
+
+impl StatisticsContainer {
+    /// Create an empty statistics container.
+    pub fn new(granularity: StatsGranularity) -> Self {
+        Self {
+            granularity,
+            num_rows: None,
+            columns: HashMap::new(),
+        }
+    }
+
+    /// Create file-level stats by aggregating row group stats from Parquet 
metadata.
+    ///
+    /// This iterates through all row groups, extracts stats for each column,
+    /// and aggregates them to file-level statistics.
+    pub fn from_parquet_metadata(metadata: &ParquetMetaData, schema: &Schema) 
-> Self {
+        let mut container = Self::new(StatsGranularity::File);
+
+        // Sum up total rows across all row groups
+        let total_rows: i64 = metadata.row_groups().iter().map(|rg| 
rg.num_rows()).sum();
+        container.num_rows = Some(total_rows);
+
+        // Iterate through row groups and aggregate stats
+        for row_group in metadata.row_groups() {
+            let rg_stats = Self::from_row_group(row_group, schema);
+
+            // Merge row group stats into file-level stats
+            for (col_name, col_stats) in rg_stats.columns {
+                container
+                    .columns
+                    .entry(col_name.clone())
+                    .and_modify(|existing| existing.merge(&col_stats))
+                    .or_insert(col_stats);
+            }
+        }
+
+        // Ensure all schema columns have an entry (even if no stats)
+        for field in schema.fields() {
+            let col_name = field.name();
+            if !container.columns.contains_key(col_name) {
+                container.columns.insert(
+                    col_name.clone(),
+                    ColumnStatistics::new(col_name.clone(), 
field.data_type().clone()),
+                );
+            }
+        }
+
+        container
+    }
+
+    /// Create row-group-level stats from a single row group.
+    pub fn from_row_group(row_group: &RowGroupMetaData, schema: &Schema) -> 
Self {
+        let mut container = Self::new(StatsGranularity::RowGroup);
+        container.num_rows = Some(row_group.num_rows());
+
+        // Build a map of column name to Arrow data type
+        let column_types: HashMap<&str, &DataType> = schema
+            .fields()
+            .iter()
+            .map(|f| (f.name().as_str(), f.data_type()))
+            .collect();
+
+        // Extract stats for each column in the row group
+        for col_chunk in row_group.columns() {
+            // Get column name from the column descriptor
+            let col_path = col_chunk.column_descr().path();
+            let col_name = col_path.parts().last().map(|s| 
s.as_str()).unwrap_or("");

Review Comment:
   The implementation only extracts the last part of the column path, which may 
not correctly handle nested columns in structs or lists. For example, a nested 
column path like "parent.child.grandchild" would only extract "grandchild", 
potentially causing mismatches with the schema's field names if they use fully 
qualified paths. Consider testing with nested column structures or documenting 
this limitation if nested columns are not yet supported.
   ```suggestion
               // Get column name from the column descriptor.
               // Use the first path component (top-level field) to align with 
the Arrow schema.
               let col_path = col_chunk.column_descr().path();
               let col_name = col_path.parts().first().map(|s| 
s.as_str()).unwrap_or("");
   ```



##########
crates/core/src/statistics/mod.rs:
##########
@@ -0,0 +1,608 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+//! Statistics module for column statistics pruning.
+//!
+//! This module provides abstractions for extracting, aggregating, and using
+//! column statistics from Parquet files for query pruning at different 
granularity levels.
+
+use std::collections::HashMap;
+use std::str::FromStr;
+
+use arrow_schema::{DataType, Schema};
+use datafusion_common::ScalarValue;
+use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
+use parquet::file::statistics::Statistics as ParquetStatistics;
+
+/// Column statistics pruning granularity level.
+///
+/// Controls how fine-grained the statistics-based pruning is.
+/// Each level offers different trade-offs between memory overhead and pruning 
effectiveness.
+#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
+pub enum StatsGranularity {
+    /// File-level stats (aggregated from all row groups).
+    ///
+    /// Coarsest granularity, lowest memory overhead.
+    /// Can skip entire files that don't match predicates.
+    /// Stats are computed by aggregating row group stats:
+    /// - `file_min = min(row_group_mins)`
+    /// - `file_max = max(row_group_maxs)`
+    /// - `file_null_count = sum(row_group_null_counts)`
+    #[default]
+    File,
+
+    /// Row group level stats (directly from Parquet footer).
+    ///
+    /// Balanced granularity, moderate memory.
+    /// Can skip row groups within files (typically 64-128MB chunks).
+    /// Stats are read directly from Parquet footer's ColumnChunkMetaData.
+    RowGroup,
+
+    /// Page level stats (from ColumnIndex, requires Parquet 1.11+).
+    ///
+    /// Finest granularity, highest memory.
+    /// Can skip individual pages (typically ~1MB chunks).
+    /// Most effective when data is sorted by filter columns.
+    /// Requires Parquet files to be written with page index enabled.
+    Page,
+}
+
+impl FromStr for StatsGranularity {
+    type Err = String;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s.to_lowercase().as_str() {
+            "file" => Ok(Self::File),
+            "row_group" | "rowgroup" => Ok(Self::RowGroup),
+            "page" => Ok(Self::Page),
+            _ => Err(format!(
+                "Invalid stats granularity: '{s}'. Valid options: file, 
row_group, page"
+            )),
+        }
+    }
+}
+
+impl std::fmt::Display for StatsGranularity {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Self::File => write!(f, "file"),
+            Self::RowGroup => write!(f, "row_group"),
+            Self::Page => write!(f, "page"),
+        }
+    }
+}
+
+/// Statistics for a single column at a given granularity.
+#[derive(Clone, Debug)]
+pub struct ColumnStatistics {
+    /// Column name
+    pub column_name: String,
+    /// Arrow data type
+    pub data_type: DataType,
+    /// Minimum value (as ScalarValue for compatibility with DataFusion)
+    pub min_value: Option<ScalarValue>,
+    /// Maximum value (as ScalarValue for compatibility with DataFusion)
+    pub max_value: Option<ScalarValue>,
+    /// Number of null values
+    pub null_count: Option<i64>,
+    /// Number of distinct values (if available)
+    pub distinct_count: Option<u64>,
+}
+
+impl ColumnStatistics {
+    /// Create a new ColumnStatistics with the given column name and data type.
+    pub fn new(column_name: String, data_type: DataType) -> Self {
+        Self {
+            column_name,
+            data_type,
+            min_value: None,
+            max_value: None,
+            null_count: None,
+            distinct_count: None,
+        }
+    }
+
+    /// Create from Parquet row group statistics.
+    ///
+    /// Converts Parquet's `Statistics` to `ScalarValue` based on the Arrow 
data type.
+    pub fn from_parquet_statistics(
+        column_name: &str,
+        data_type: &DataType,
+        stats: &ParquetStatistics,
+    ) -> Self {
+        let (min_value, max_value) = parquet_stats_to_scalar(stats, data_type);
+        let null_count = stats.null_count_opt().map(|n| n as i64);
+
+        Self {
+            column_name: column_name.to_string(),
+            data_type: data_type.clone(),
+            min_value,
+            max_value,
+            null_count,
+            distinct_count: None,
+        }
+    }
+
+    /// Merge with another ColumnStatistics (for aggregation).
+    ///
+    /// Takes min of mins, max of maxs, sums null counts.
+    /// Used when aggregating row group stats to file-level stats.
+    pub fn merge(&mut self, other: &ColumnStatistics) {
+        // Merge min values (take the smaller one)
+        self.min_value = match (&self.min_value, &other.min_value) {
+            (Some(a), Some(b)) => scalar_min(a, b),
+            (Some(a), None) => Some(a.clone()),
+            (None, Some(b)) => Some(b.clone()),
+            (None, None) => None,
+        };
+
+        // Merge max values (take the larger one)
+        self.max_value = match (&self.max_value, &other.max_value) {
+            (Some(a), Some(b)) => scalar_max(a, b),
+            (Some(a), None) => Some(a.clone()),
+            (None, Some(b)) => Some(b.clone()),
+            (None, None) => None,
+        };
+
+        // Sum null counts
+        self.null_count = match (self.null_count, other.null_count) {
+            (Some(a), Some(b)) => Some(a + b),
+            (Some(a), None) => Some(a),
+            (None, Some(b)) => Some(b),
+            (None, None) => None,
+        };
+
+        // Distinct count cannot be accurately merged, so we set it to None
+        self.distinct_count = None;
+    }
+}
+
+/// Container for statistics at a specific granularity level.
+#[derive(Clone, Debug)]
+pub struct StatisticsContainer {
+    /// Granularity of these statistics
+    pub granularity: StatsGranularity,
+    /// Number of rows covered by these statistics
+    pub num_rows: Option<i64>,
+    /// Column statistics by column name
+    pub columns: HashMap<String, ColumnStatistics>,
+}
+
+impl StatisticsContainer {
+    /// Create an empty statistics container.
+    pub fn new(granularity: StatsGranularity) -> Self {
+        Self {
+            granularity,
+            num_rows: None,
+            columns: HashMap::new(),
+        }
+    }
+
+    /// Create file-level stats by aggregating row group stats from Parquet 
metadata.
+    ///
+    /// This iterates through all row groups, extracts stats for each column,
+    /// and aggregates them to file-level statistics.
+    pub fn from_parquet_metadata(metadata: &ParquetMetaData, schema: &Schema) 
-> Self {
+        let mut container = Self::new(StatsGranularity::File);
+
+        // Sum up total rows across all row groups
+        let total_rows: i64 = metadata.row_groups().iter().map(|rg| 
rg.num_rows()).sum();
+        container.num_rows = Some(total_rows);
+
+        // Iterate through row groups and aggregate stats
+        for row_group in metadata.row_groups() {
+            let rg_stats = Self::from_row_group(row_group, schema);
+
+            // Merge row group stats into file-level stats
+            for (col_name, col_stats) in rg_stats.columns {
+                container
+                    .columns
+                    .entry(col_name.clone())
+                    .and_modify(|existing| existing.merge(&col_stats))
+                    .or_insert(col_stats);
+            }
+        }
+
+        // Ensure all schema columns have an entry (even if no stats)
+        for field in schema.fields() {
+            let col_name = field.name();
+            if !container.columns.contains_key(col_name) {
+                container.columns.insert(
+                    col_name.clone(),
+                    ColumnStatistics::new(col_name.clone(), 
field.data_type().clone()),
+                );
+            }
+        }
+
+        container
+    }
+
+    /// Create row-group-level stats from a single row group.
+    pub fn from_row_group(row_group: &RowGroupMetaData, schema: &Schema) -> 
Self {
+        let mut container = Self::new(StatsGranularity::RowGroup);
+        container.num_rows = Some(row_group.num_rows());
+
+        // Build a map of column name to Arrow data type
+        let column_types: HashMap<&str, &DataType> = schema
+            .fields()
+            .iter()
+            .map(|f| (f.name().as_str(), f.data_type()))
+            .collect();
+
+        // Extract stats for each column in the row group
+        for col_chunk in row_group.columns() {
+            // Get column name from the column descriptor
+            let col_path = col_chunk.column_descr().path();
+            let col_name = col_path.parts().last().map(|s| 
s.as_str()).unwrap_or("");

Review Comment:
   The code uses `.unwrap_or("")` to get the column name from the Parquet path, 
which could result in an empty string for columns. An empty string column name 
is then used as a HashMap key. This could cause incorrect statistics 
aggregation if multiple columns somehow end up with empty names. Consider 
logging a warning and skipping columns that don't have valid names, or using a 
more robust error handling approach.
   ```suggestion
               let Some(col_name) = col_path.parts().last().map(|s| s.as_str()) 
else {
                   // Skip columns without a valid name to avoid incorrect 
statistics aggregation
                   continue;
               };
   ```



##########
crates/datafusion/tests/read_tests.rs:
##########
@@ -129,15 +129,40 @@ async fn verify_plan(
     let explaining_rb = explaining_rb.first().unwrap();
     let plan = get_str_column(explaining_rb, "plan").join("");
     let plan_lines: Vec<&str> = plan.lines().map(str::trim).collect();
+
+    // Verify sort and projection are in the plan
     assert!(plan_lines[1].starts_with("SortExec: TopK(fetch=10)"));
     assert!(plan_lines[2].starts_with(&format!(
         "ProjectionExec: expr=[id@0 as id, name@1 as name, isActive@2 as 
isActive, \
         get_field(structField@3, field2) as {table_name}.structField[field2]]"
     )));
-    assert!(plan_lines[4].starts_with(
-        "FilterExec: CAST(id@0 AS Int64) % 2 = 0 AND name@1 != Alice AND 
get_field(structField@3, field2) > 30"
-    ));
-    
assert!(plan_lines[5].contains(&format!("input_partitions={planned_input_partitioned}")));
+
+    // With pushdown enabled, some filters are pushed to parquet reader.
+    // The struct field filter remains in FilterExec, simpler predicates go to 
DataSourceExec.
+    assert!(
+        plan.contains("FilterExec"),
+        "Plan should contain FilterExec"
+    );
+    assert!(
+        plan.contains("get_field(structField@3, field2) > 30"),
+        "Plan should filter on structField.field2 > 30"
+    );
+
+    // Verify predicate pushdown is working - some predicates pushed to 
DataSourceExec
+    assert!(
+        plan.contains("predicate="),
+        "DataSourceExec should have predicate pushdown"
+    );
+    assert!(
+        plan.contains("name") && plan.contains("Alice"),
+        "Name filter should be present in plan (either FilterExec or predicate 
pushdown)"
+    );
+
+    // Verify input partitions
+    assert!(
+        
plan.contains(&format!("input_partitions={planned_input_partitioned}")),
+        "Plan should have correct input_partitions"
+    );

Review Comment:
   The test assertions have been changed from checking specific plan line 
positions to using looser `contains` checks. While this makes tests less 
brittle to plan format changes, it also makes them less precise. The test no 
longer verifies the exact structure of the query plan. Consider:
   1. Adding comments explaining why the more flexible assertions are needed
   2. Ensuring that the key aspects of the plan structure are still validated 
(e.g., that FilterExec comes before DataSourceExec in the plan hierarchy)
   3. Adding a new test that specifically validates the predicate pushdown 
behavior with different column stats pruning levels



##########
crates/datafusion/src/lib.rs:
##########
@@ -324,12 +336,29 @@ impl TableProvider for HudiDataSource {
             crypto: Default::default(),
         };
         let table_schema = self.schema();
+
+        // Read column stats pruning level configuration
+        let stats_level: String = self
+            .table
+            .hudi_configs
+            .get_or_default(ColumnStatsPruningLevel)
+            .into();
+
         let mut parquet_source = ParquetSource::new(parquet_opts);
+
+        // Enable page index for page-level pruning
+        if stats_level == "page" {
+            parquet_source = parquet_source.with_enable_page_index(true);
+        }
+
         let filter = filters.iter().cloned().reduce(|acc, new| acc.and(new));
         if let Some(expr) = filter {
             let df_schema = DFSchema::try_from(table_schema.clone())?;
             let predicate = create_physical_expr(&expr, &df_schema, 
state.execution_props())?;
-            parquet_source = parquet_source.with_predicate(predicate)
+            parquet_source = parquet_source
+                .with_predicate(predicate)
+                .with_pushdown_filters(true)
+                .with_reorder_filters(true);

Review Comment:
   The methods `.with_pushdown_filters(true)` and `.with_reorder_filters(true)` 
are now always enabled when there are filters, but this behavior change is not 
tested. The test assertions were modified to be more lenient (using `contains` 
instead of exact line matching), but there's no test that specifically verifies 
that these new methods improve pruning effectiveness or that they work 
correctly. Consider adding a test that demonstrates the effectiveness of these 
settings, perhaps by checking that fewer files or row groups are scanned.



##########
crates/datafusion/src/lib.rs:
##########
@@ -324,12 +336,29 @@ impl TableProvider for HudiDataSource {
             crypto: Default::default(),
         };
         let table_schema = self.schema();
+
+        // Read column stats pruning level configuration
+        let stats_level: String = self
+            .table
+            .hudi_configs
+            .get_or_default(ColumnStatsPruningLevel)
+            .into();

Review Comment:
   The configuration value is read as a String but never validated against the 
valid values ("file", "row_group", "page"). If a user provides an invalid value 
like "invalid_level", it will be silently used without error, potentially 
causing unexpected behavior. Consider validating the configuration value using 
`StatsGranularity::from_str()` when reading it, and returning an error if the 
value is invalid. This would provide immediate feedback to users about 
configuration errors.



##########
crates/core/src/statistics/mod.rs:
##########
@@ -0,0 +1,608 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+//! Statistics module for column statistics pruning.
+//!
+//! This module provides abstractions for extracting, aggregating, and using
+//! column statistics from Parquet files for query pruning at different 
granularity levels.
+
+use std::collections::HashMap;
+use std::str::FromStr;
+
+use arrow_schema::{DataType, Schema};
+use datafusion_common::ScalarValue;
+use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
+use parquet::file::statistics::Statistics as ParquetStatistics;
+
+/// Column statistics pruning granularity level.
+///
+/// Controls how fine-grained the statistics-based pruning is.
+/// Each level offers different trade-offs between memory overhead and pruning 
effectiveness.
+#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
+pub enum StatsGranularity {
+    /// File-level stats (aggregated from all row groups).
+    ///
+    /// Coarsest granularity, lowest memory overhead.
+    /// Can skip entire files that don't match predicates.
+    /// Stats are computed by aggregating row group stats:
+    /// - `file_min = min(row_group_mins)`
+    /// - `file_max = max(row_group_maxs)`
+    /// - `file_null_count = sum(row_group_null_counts)`
+    #[default]
+    File,
+
+    /// Row group level stats (directly from Parquet footer).
+    ///
+    /// Balanced granularity, moderate memory.
+    /// Can skip row groups within files (typically 64-128MB chunks).
+    /// Stats are read directly from Parquet footer's ColumnChunkMetaData.
+    RowGroup,
+
+    /// Page level stats (from ColumnIndex, requires Parquet 1.11+).
+    ///
+    /// Finest granularity, highest memory.
+    /// Can skip individual pages (typically ~1MB chunks).
+    /// Most effective when data is sorted by filter columns.
+    /// Requires Parquet files to be written with page index enabled.
+    Page,
+}
+
+impl FromStr for StatsGranularity {
+    type Err = String;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s.to_lowercase().as_str() {
+            "file" => Ok(Self::File),
+            "row_group" | "rowgroup" => Ok(Self::RowGroup),
+            "page" => Ok(Self::Page),
+            _ => Err(format!(
+                "Invalid stats granularity: '{s}'. Valid options: file, 
row_group, page"
+            )),
+        }
+    }
+}
+
+impl std::fmt::Display for StatsGranularity {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Self::File => write!(f, "file"),
+            Self::RowGroup => write!(f, "row_group"),
+            Self::Page => write!(f, "page"),
+        }
+    }
+}
+
+/// Statistics for a single column at a given granularity.
+#[derive(Clone, Debug)]
+pub struct ColumnStatistics {
+    /// Column name
+    pub column_name: String,
+    /// Arrow data type
+    pub data_type: DataType,
+    /// Minimum value (as ScalarValue for compatibility with DataFusion)
+    pub min_value: Option<ScalarValue>,
+    /// Maximum value (as ScalarValue for compatibility with DataFusion)
+    pub max_value: Option<ScalarValue>,
+    /// Number of null values
+    pub null_count: Option<i64>,
+    /// Number of distinct values (if available)
+    pub distinct_count: Option<u64>,
+}
+
+impl ColumnStatistics {
+    /// Create a new ColumnStatistics with the given column name and data type.
+    pub fn new(column_name: String, data_type: DataType) -> Self {
+        Self {
+            column_name,
+            data_type,
+            min_value: None,
+            max_value: None,
+            null_count: None,
+            distinct_count: None,
+        }
+    }
+
+    /// Create from Parquet row group statistics.
+    ///
+    /// Converts Parquet's `Statistics` to `ScalarValue` based on the Arrow 
data type.
+    pub fn from_parquet_statistics(
+        column_name: &str,
+        data_type: &DataType,
+        stats: &ParquetStatistics,
+    ) -> Self {
+        let (min_value, max_value) = parquet_stats_to_scalar(stats, data_type);
+        let null_count = stats.null_count_opt().map(|n| n as i64);
+
+        Self {
+            column_name: column_name.to_string(),
+            data_type: data_type.clone(),
+            min_value,
+            max_value,
+            null_count,
+            distinct_count: None,
+        }
+    }
+
+    /// Merge with another ColumnStatistics (for aggregation).
+    ///
+    /// Takes min of mins, max of maxs, sums null counts.
+    /// Used when aggregating row group stats to file-level stats.
+    pub fn merge(&mut self, other: &ColumnStatistics) {
+        // Merge min values (take the smaller one)
+        self.min_value = match (&self.min_value, &other.min_value) {
+            (Some(a), Some(b)) => scalar_min(a, b),
+            (Some(a), None) => Some(a.clone()),
+            (None, Some(b)) => Some(b.clone()),
+            (None, None) => None,
+        };
+
+        // Merge max values (take the larger one)
+        self.max_value = match (&self.max_value, &other.max_value) {
+            (Some(a), Some(b)) => scalar_max(a, b),
+            (Some(a), None) => Some(a.clone()),
+            (None, Some(b)) => Some(b.clone()),
+            (None, None) => None,
+        };
+
+        // Sum null counts
+        self.null_count = match (self.null_count, other.null_count) {
+            (Some(a), Some(b)) => Some(a + b),
+            (Some(a), None) => Some(a),
+            (None, Some(b)) => Some(b),
+            (None, None) => None,
+        };
+
+        // Distinct count cannot be accurately merged, so we set it to None
+        self.distinct_count = None;
+    }
+}
+
+/// Container for statistics at a specific granularity level.
+#[derive(Clone, Debug)]
+pub struct StatisticsContainer {
+    /// Granularity of these statistics
+    pub granularity: StatsGranularity,
+    /// Number of rows covered by these statistics
+    pub num_rows: Option<i64>,
+    /// Column statistics by column name
+    pub columns: HashMap<String, ColumnStatistics>,
+}
+
+impl StatisticsContainer {
+    /// Create an empty statistics container.
+    pub fn new(granularity: StatsGranularity) -> Self {
+        Self {
+            granularity,
+            num_rows: None,
+            columns: HashMap::new(),
+        }
+    }
+
+    /// Create file-level stats by aggregating row group stats from Parquet 
metadata.
+    ///
+    /// This iterates through all row groups, extracts stats for each column,
+    /// and aggregates them to file-level statistics.
+    pub fn from_parquet_metadata(metadata: &ParquetMetaData, schema: &Schema) 
-> Self {
+        let mut container = Self::new(StatsGranularity::File);
+
+        // Sum up total rows across all row groups
+        let total_rows: i64 = metadata.row_groups().iter().map(|rg| 
rg.num_rows()).sum();
+        container.num_rows = Some(total_rows);
+
+        // Iterate through row groups and aggregate stats
+        for row_group in metadata.row_groups() {
+            let rg_stats = Self::from_row_group(row_group, schema);
+
+            // Merge row group stats into file-level stats
+            for (col_name, col_stats) in rg_stats.columns {
+                container
+                    .columns
+                    .entry(col_name.clone())
+                    .and_modify(|existing| existing.merge(&col_stats))
+                    .or_insert(col_stats);
+            }
+        }
+
+        // Ensure all schema columns have an entry (even if no stats)
+        for field in schema.fields() {
+            let col_name = field.name();
+            if !container.columns.contains_key(col_name) {
+                container.columns.insert(
+                    col_name.clone(),
+                    ColumnStatistics::new(col_name.clone(), 
field.data_type().clone()),
+                );
+            }
+        }
+
+        container
+    }
+
+    /// Create row-group-level stats from a single row group.
+    pub fn from_row_group(row_group: &RowGroupMetaData, schema: &Schema) -> 
Self {
+        let mut container = Self::new(StatsGranularity::RowGroup);
+        container.num_rows = Some(row_group.num_rows());
+
+        // Build a map of column name to Arrow data type
+        let column_types: HashMap<&str, &DataType> = schema
+            .fields()
+            .iter()
+            .map(|f| (f.name().as_str(), f.data_type()))
+            .collect();
+
+        // Extract stats for each column in the row group
+        for col_chunk in row_group.columns() {
+            // Get column name from the column descriptor
+            let col_path = col_chunk.column_descr().path();
+            let col_name = col_path.parts().last().map(|s| 
s.as_str()).unwrap_or("");
+
+            // Skip if we don't have type info for this column
+            let Some(&data_type) = column_types.get(col_name) else {
+                continue;
+            };
+
+            // Extract statistics if available
+            if let Some(stats) = col_chunk.statistics() {
+                let col_stats =
+                    ColumnStatistics::from_parquet_statistics(col_name, 
data_type, stats);
+                container.columns.insert(col_name.to_string(), col_stats);
+            } else {
+                // No stats available, create empty entry
+                container.columns.insert(
+                    col_name.to_string(),
+                    ColumnStatistics::new(col_name.to_string(), 
data_type.clone()),
+                );
+            }
+        }
+
+        container
+    }
+
+    /// Convert to DataFusion Statistics for query planning.
+    pub fn to_datafusion_statistics(&self, schema: &Schema) -> 
datafusion_common::Statistics {
+        use datafusion_common::ColumnStatistics as DFColStats;
+        use datafusion_common::Statistics;
+        use datafusion_common::stats::Precision;
+
+        Statistics {
+            num_rows: self
+                .num_rows
+                .map(|n| Precision::Exact(n as usize))
+                .unwrap_or(Precision::Absent),
+            total_byte_size: Precision::Absent,
+            column_statistics: schema
+                .fields()
+                .iter()
+                .map(|field| {
+                    self.columns
+                        .get(field.name())
+                        .map(|col_stats| DFColStats {
+                            null_count: col_stats
+                                .null_count
+                                .map(|n| Precision::Exact(n as usize))
+                                .unwrap_or(Precision::Absent),
+                            min_value: col_stats
+                                .min_value
+                                .clone()
+                                .map(Precision::Exact)
+                                .unwrap_or(Precision::Absent),
+                            max_value: col_stats
+                                .max_value
+                                .clone()
+                                .map(Precision::Exact)
+                                .unwrap_or(Precision::Absent),
+                            sum_value: Precision::Absent,
+                            distinct_count: col_stats
+                                .distinct_count
+                                .map(|n| Precision::Exact(n as usize))
+                                .unwrap_or(Precision::Absent),
+                        })
+                        .unwrap_or_default()
+                })
+                .collect(),
+        }
+    }
+}

Review Comment:
   The statistics module infrastructure (`StatisticsContainer`, 
`ColumnStatistics`, etc.) is defined but not actually used anywhere in the 
codebase. The `statistics()` method in `HudiDataSource` returns `None` with a 
comment indicating it will be implemented later. Additionally, the 
configuration `ColumnStatsPruningLevel` only affects the 
`with_enable_page_index` setting on the ParquetSource, but doesn't use the 
abstraction defined in the statistics module. Consider adding integration tests 
that demonstrate the statistics module working end-to-end, or documenting 
clearly that this is preparatory infrastructure for future work.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to