Copilot commented on code in PR #516:
URL: https://github.com/apache/hudi-rs/pull/516#discussion_r2678989481
##########
crates/datafusion/src/lib.rs:
##########
@@ -324,12 +336,29 @@ impl TableProvider for HudiDataSource {
crypto: Default::default(),
};
let table_schema = self.schema();
+
+ // Read column stats pruning level configuration
+ let stats_level: String = self
+ .table
+ .hudi_configs
+ .get_or_default(ColumnStatsPruningLevel)
+ .into();
+
let mut parquet_source = ParquetSource::new(parquet_opts);
+
+ // Enable page index for page-level pruning
+ if stats_level == "page" {
Review Comment:
The string comparison `stats_level == "page"` is fragile and could fail
silently if the configuration value is set to "Page" or "PAGE". Since
`StatsGranularity::from_str` already handles case-insensitive parsing, the
most robust fix is to parse the configuration value into `StatsGranularity`
and compare enum values instead of raw strings. As a minimal change, the
comparison should at least be made case-insensitive:
```suggestion
// Enable page index for page-level pruning (case-insensitive)
if stats_level.eq_ignore_ascii_case("page") {
```
##########
crates/core/src/statistics/mod.rs:
##########
@@ -0,0 +1,608 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+//! Statistics module for column statistics pruning.
+//!
+//! This module provides abstractions for extracting, aggregating, and using
+//! column statistics from Parquet files for query pruning at different
granularity levels.
+
+use std::collections::HashMap;
+use std::str::FromStr;
+
+use arrow_schema::{DataType, Schema};
+use datafusion_common::ScalarValue;
+use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
+use parquet::file::statistics::Statistics as ParquetStatistics;
+
+/// Column statistics pruning granularity level.
+///
+/// Controls how fine-grained the statistics-based pruning is.
+/// Each level offers different trade-offs between memory overhead and pruning
effectiveness.
+#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
+pub enum StatsGranularity {
+ /// File-level stats (aggregated from all row groups).
+ ///
+ /// Coarsest granularity, lowest memory overhead.
+ /// Can skip entire files that don't match predicates.
+ /// Stats are computed by aggregating row group stats:
+ /// - `file_min = min(row_group_mins)`
+ /// - `file_max = max(row_group_maxs)`
+ /// - `file_null_count = sum(row_group_null_counts)`
+ #[default]
+ File,
+
+ /// Row group level stats (directly from Parquet footer).
+ ///
+ /// Balanced granularity, moderate memory.
+ /// Can skip row groups within files (typically 64-128MB chunks).
+ /// Stats are read directly from Parquet footer's ColumnChunkMetaData.
+ RowGroup,
+
+ /// Page level stats (from ColumnIndex, requires Parquet 1.11+).
+ ///
+ /// Finest granularity, highest memory.
+ /// Can skip individual pages (typically ~1MB chunks).
+ /// Most effective when data is sorted by filter columns.
+ /// Requires Parquet files to be written with page index enabled.
+ Page,
+}
+
+impl FromStr for StatsGranularity {
+ type Err = String;
+
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ match s.to_lowercase().as_str() {
+ "file" => Ok(Self::File),
+ "row_group" | "rowgroup" => Ok(Self::RowGroup),
+ "page" => Ok(Self::Page),
+ _ => Err(format!(
+ "Invalid stats granularity: '{s}'. Valid options: file,
row_group, page"
+ )),
+ }
+ }
+}
+
+impl std::fmt::Display for StatsGranularity {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self {
+ Self::File => write!(f, "file"),
+ Self::RowGroup => write!(f, "row_group"),
+ Self::Page => write!(f, "page"),
+ }
+ }
+}
+
+/// Statistics for a single column at a given granularity.
+#[derive(Clone, Debug)]
+pub struct ColumnStatistics {
+ /// Column name
+ pub column_name: String,
+ /// Arrow data type
+ pub data_type: DataType,
+ /// Minimum value (as ScalarValue for compatibility with DataFusion)
+ pub min_value: Option<ScalarValue>,
+ /// Maximum value (as ScalarValue for compatibility with DataFusion)
+ pub max_value: Option<ScalarValue>,
+ /// Number of null values
+ pub null_count: Option<i64>,
+ /// Number of distinct values (if available)
+ pub distinct_count: Option<u64>,
+}
+
+impl ColumnStatistics {
+ /// Create a new ColumnStatistics with the given column name and data type.
+ pub fn new(column_name: String, data_type: DataType) -> Self {
+ Self {
+ column_name,
+ data_type,
+ min_value: None,
+ max_value: None,
+ null_count: None,
+ distinct_count: None,
+ }
+ }
+
+ /// Create from Parquet row group statistics.
+ ///
+ /// Converts Parquet's `Statistics` to `ScalarValue` based on the Arrow
data type.
+ pub fn from_parquet_statistics(
+ column_name: &str,
+ data_type: &DataType,
+ stats: &ParquetStatistics,
+ ) -> Self {
+ let (min_value, max_value) = parquet_stats_to_scalar(stats, data_type);
+ let null_count = stats.null_count_opt().map(|n| n as i64);
+
+ Self {
+ column_name: column_name.to_string(),
+ data_type: data_type.clone(),
+ min_value,
+ max_value,
+ null_count,
+ distinct_count: None,
+ }
+ }
+
+ /// Merge with another ColumnStatistics (for aggregation).
+ ///
+ /// Takes min of mins, max of maxs, sums null counts.
+ /// Used when aggregating row group stats to file-level stats.
+ pub fn merge(&mut self, other: &ColumnStatistics) {
+ // Merge min values (take the smaller one)
+ self.min_value = match (&self.min_value, &other.min_value) {
+ (Some(a), Some(b)) => scalar_min(a, b),
+ (Some(a), None) => Some(a.clone()),
+ (None, Some(b)) => Some(b.clone()),
+ (None, None) => None,
+ };
+
+ // Merge max values (take the larger one)
+ self.max_value = match (&self.max_value, &other.max_value) {
+ (Some(a), Some(b)) => scalar_max(a, b),
+ (Some(a), None) => Some(a.clone()),
+ (None, Some(b)) => Some(b.clone()),
+ (None, None) => None,
+ };
+
+ // Sum null counts
+ self.null_count = match (self.null_count, other.null_count) {
+ (Some(a), Some(b)) => Some(a + b),
+ (Some(a), None) => Some(a),
+ (None, Some(b)) => Some(b),
+ (None, None) => None,
+ };
+
+ // Distinct count cannot be accurately merged, so we set it to None
+ self.distinct_count = None;
+ }
+}
+
+/// Container for statistics at a specific granularity level.
+#[derive(Clone, Debug)]
+pub struct StatisticsContainer {
+ /// Granularity of these statistics
+ pub granularity: StatsGranularity,
+ /// Number of rows covered by these statistics
+ pub num_rows: Option<i64>,
+ /// Column statistics by column name
+ pub columns: HashMap<String, ColumnStatistics>,
+}
+
+impl StatisticsContainer {
+ /// Create an empty statistics container.
+ pub fn new(granularity: StatsGranularity) -> Self {
+ Self {
+ granularity,
+ num_rows: None,
+ columns: HashMap::new(),
+ }
+ }
+
+ /// Create file-level stats by aggregating row group stats from Parquet
metadata.
+ ///
+ /// This iterates through all row groups, extracts stats for each column,
+ /// and aggregates them to file-level statistics.
+ pub fn from_parquet_metadata(metadata: &ParquetMetaData, schema: &Schema)
-> Self {
+ let mut container = Self::new(StatsGranularity::File);
+
+ // Sum up total rows across all row groups
+ let total_rows: i64 = metadata.row_groups().iter().map(|rg|
rg.num_rows()).sum();
+ container.num_rows = Some(total_rows);
+
+ // Iterate through row groups and aggregate stats
+ for row_group in metadata.row_groups() {
+ let rg_stats = Self::from_row_group(row_group, schema);
+
+ // Merge row group stats into file-level stats
+ for (col_name, col_stats) in rg_stats.columns {
+ container
+ .columns
+ .entry(col_name.clone())
+ .and_modify(|existing| existing.merge(&col_stats))
+ .or_insert(col_stats);
+ }
+ }
+
+ // Ensure all schema columns have an entry (even if no stats)
+ for field in schema.fields() {
+ let col_name = field.name();
+ if !container.columns.contains_key(col_name) {
+ container.columns.insert(
+ col_name.clone(),
+ ColumnStatistics::new(col_name.clone(),
field.data_type().clone()),
+ );
+ }
+ }
+
+ container
+ }
+
+ /// Create row-group-level stats from a single row group.
+ pub fn from_row_group(row_group: &RowGroupMetaData, schema: &Schema) ->
Self {
+ let mut container = Self::new(StatsGranularity::RowGroup);
+ container.num_rows = Some(row_group.num_rows());
+
+ // Build a map of column name to Arrow data type
+ let column_types: HashMap<&str, &DataType> = schema
+ .fields()
+ .iter()
+ .map(|f| (f.name().as_str(), f.data_type()))
+ .collect();
+
+ // Extract stats for each column in the row group
+ for col_chunk in row_group.columns() {
+ // Get column name from the column descriptor
+ let col_path = col_chunk.column_descr().path();
+ let col_name = col_path.parts().last().map(|s|
s.as_str()).unwrap_or("");
+
+ // Skip if we don't have type info for this column
+ let Some(&data_type) = column_types.get(col_name) else {
+ continue;
+ };
+
+ // Extract statistics if available
+ if let Some(stats) = col_chunk.statistics() {
+ let col_stats =
+ ColumnStatistics::from_parquet_statistics(col_name,
data_type, stats);
+ container.columns.insert(col_name.to_string(), col_stats);
+ } else {
+ // No stats available, create empty entry
+ container.columns.insert(
+ col_name.to_string(),
+ ColumnStatistics::new(col_name.to_string(),
data_type.clone()),
+ );
+ }
+ }
+
+ container
+ }
+
+ /// Convert to DataFusion Statistics for query planning.
Review Comment:
The `to_datafusion_statistics` method is missing documentation. As a public
API method that performs an important conversion, it should have a doc comment
explaining what it does, its parameters, return value, and any important notes
about the conversion (e.g., that it maintains the same order as the input
schema). This is especially important since the method is likely to be a key
integration point with DataFusion.
```suggestion
/// Convert this container into DataFusion [`Statistics`] for query
planning.
///
/// # Arguments
///
/// * `schema` - The Arrow [`Schema`] describing the logical columns of
the table.
/// The fields in this schema determine both which columns are looked
up in
/// this statistics container and the order of the returned per-column
/// statistics.
///
/// # Returns
///
/// A [`datafusion_common::Statistics`] value where:
///
/// * `num_rows` is populated from `self.num_rows` as an
/// [`Precision::Exact`] value when available, or [`Precision::Absent`]
/// when the row count is not known.
/// * `total_byte_size` is currently always [`Precision::Absent`].
/// * `column_statistics` contains one entry per field in `schema`, in
the
/// same order as `schema.fields()`. Each entry is populated from the
/// corresponding [`ColumnStatistics`] in this container if present, or
/// left as the default (all [`Precision::Absent`]) if no statistics
/// are available for that column.
///
/// # Notes
///
/// * Column lookup is performed by field name; if a field in `schema`
/// has no matching entry in `self.columns`, the default
/// [`datafusion_common::ColumnStatistics`] is used.
/// * Minimum and maximum values are converted to [`ScalarValue`] and
/// wrapped in [`Precision::Exact`] when available, otherwise they are
/// set to [`Precision::Absent`].
```
##########
crates/core/src/statistics/mod.rs:
##########
@@ -0,0 +1,608 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+//! Statistics module for column statistics pruning.
+//!
+//! This module provides abstractions for extracting, aggregating, and using
+//! column statistics from Parquet files for query pruning at different
granularity levels.
+
+use std::collections::HashMap;
+use std::str::FromStr;
+
+use arrow_schema::{DataType, Schema};
+use datafusion_common::ScalarValue;
+use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
+use parquet::file::statistics::Statistics as ParquetStatistics;
+
+/// Column statistics pruning granularity level.
+///
+/// Controls how fine-grained the statistics-based pruning is.
+/// Each level offers different trade-offs between memory overhead and pruning
effectiveness.
+#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
+pub enum StatsGranularity {
+ /// File-level stats (aggregated from all row groups).
+ ///
+ /// Coarsest granularity, lowest memory overhead.
+ /// Can skip entire files that don't match predicates.
+ /// Stats are computed by aggregating row group stats:
+ /// - `file_min = min(row_group_mins)`
+ /// - `file_max = max(row_group_maxs)`
+ /// - `file_null_count = sum(row_group_null_counts)`
+ #[default]
+ File,
+
+ /// Row group level stats (directly from Parquet footer).
+ ///
+ /// Balanced granularity, moderate memory.
+ /// Can skip row groups within files (typically 64-128MB chunks).
+ /// Stats are read directly from Parquet footer's ColumnChunkMetaData.
+ RowGroup,
+
+ /// Page level stats (from ColumnIndex, requires Parquet 1.11+).
+ ///
+ /// Finest granularity, highest memory.
+ /// Can skip individual pages (typically ~1MB chunks).
+ /// Most effective when data is sorted by filter columns.
+ /// Requires Parquet files to be written with page index enabled.
+ Page,
+}
+
+impl FromStr for StatsGranularity {
+ type Err = String;
+
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ match s.to_lowercase().as_str() {
+ "file" => Ok(Self::File),
+ "row_group" | "rowgroup" => Ok(Self::RowGroup),
+ "page" => Ok(Self::Page),
+ _ => Err(format!(
+ "Invalid stats granularity: '{s}'. Valid options: file,
row_group, page"
+ )),
+ }
+ }
+}
+
+impl std::fmt::Display for StatsGranularity {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self {
+ Self::File => write!(f, "file"),
+ Self::RowGroup => write!(f, "row_group"),
+ Self::Page => write!(f, "page"),
+ }
+ }
+}
+
+/// Statistics for a single column at a given granularity.
+#[derive(Clone, Debug)]
+pub struct ColumnStatistics {
+ /// Column name
+ pub column_name: String,
+ /// Arrow data type
+ pub data_type: DataType,
+ /// Minimum value (as ScalarValue for compatibility with DataFusion)
+ pub min_value: Option<ScalarValue>,
+ /// Maximum value (as ScalarValue for compatibility with DataFusion)
+ pub max_value: Option<ScalarValue>,
+ /// Number of null values
+ pub null_count: Option<i64>,
+ /// Number of distinct values (if available)
+ pub distinct_count: Option<u64>,
+}
+
+impl ColumnStatistics {
+ /// Create a new ColumnStatistics with the given column name and data type.
+ pub fn new(column_name: String, data_type: DataType) -> Self {
+ Self {
+ column_name,
+ data_type,
+ min_value: None,
+ max_value: None,
+ null_count: None,
+ distinct_count: None,
+ }
+ }
+
+ /// Create from Parquet row group statistics.
+ ///
+ /// Converts Parquet's `Statistics` to `ScalarValue` based on the Arrow
data type.
+ pub fn from_parquet_statistics(
+ column_name: &str,
+ data_type: &DataType,
+ stats: &ParquetStatistics,
+ ) -> Self {
+ let (min_value, max_value) = parquet_stats_to_scalar(stats, data_type);
+ let null_count = stats.null_count_opt().map(|n| n as i64);
+
+ Self {
+ column_name: column_name.to_string(),
+ data_type: data_type.clone(),
+ min_value,
+ max_value,
+ null_count,
+ distinct_count: None,
+ }
+ }
+
+ /// Merge with another ColumnStatistics (for aggregation).
+ ///
+ /// Takes min of mins, max of maxs, sums null counts.
+ /// Used when aggregating row group stats to file-level stats.
+ pub fn merge(&mut self, other: &ColumnStatistics) {
+ // Merge min values (take the smaller one)
+ self.min_value = match (&self.min_value, &other.min_value) {
+ (Some(a), Some(b)) => scalar_min(a, b),
+ (Some(a), None) => Some(a.clone()),
+ (None, Some(b)) => Some(b.clone()),
+ (None, None) => None,
+ };
+
+ // Merge max values (take the larger one)
+ self.max_value = match (&self.max_value, &other.max_value) {
+ (Some(a), Some(b)) => scalar_max(a, b),
+ (Some(a), None) => Some(a.clone()),
+ (None, Some(b)) => Some(b.clone()),
+ (None, None) => None,
+ };
+
+ // Sum null counts
+ self.null_count = match (self.null_count, other.null_count) {
+ (Some(a), Some(b)) => Some(a + b),
+ (Some(a), None) => Some(a),
+ (None, Some(b)) => Some(b),
+ (None, None) => None,
+ };
+
+ // Distinct count cannot be accurately merged, so we set it to None
+ self.distinct_count = None;
+ }
+}
+
+/// Container for statistics at a specific granularity level.
+#[derive(Clone, Debug)]
+pub struct StatisticsContainer {
+ /// Granularity of these statistics
+ pub granularity: StatsGranularity,
+ /// Number of rows covered by these statistics
+ pub num_rows: Option<i64>,
+ /// Column statistics by column name
+ pub columns: HashMap<String, ColumnStatistics>,
+}
+
+impl StatisticsContainer {
+ /// Create an empty statistics container.
+ pub fn new(granularity: StatsGranularity) -> Self {
+ Self {
+ granularity,
+ num_rows: None,
+ columns: HashMap::new(),
+ }
+ }
+
+ /// Create file-level stats by aggregating row group stats from Parquet
metadata.
+ ///
+ /// This iterates through all row groups, extracts stats for each column,
+ /// and aggregates them to file-level statistics.
+ pub fn from_parquet_metadata(metadata: &ParquetMetaData, schema: &Schema)
-> Self {
+ let mut container = Self::new(StatsGranularity::File);
+
+ // Sum up total rows across all row groups
+ let total_rows: i64 = metadata.row_groups().iter().map(|rg|
rg.num_rows()).sum();
+ container.num_rows = Some(total_rows);
+
+ // Iterate through row groups and aggregate stats
+ for row_group in metadata.row_groups() {
+ let rg_stats = Self::from_row_group(row_group, schema);
+
+ // Merge row group stats into file-level stats
+ for (col_name, col_stats) in rg_stats.columns {
+ container
+ .columns
+ .entry(col_name.clone())
+ .and_modify(|existing| existing.merge(&col_stats))
+ .or_insert(col_stats);
+ }
+ }
+
+ // Ensure all schema columns have an entry (even if no stats)
+ for field in schema.fields() {
+ let col_name = field.name();
+ if !container.columns.contains_key(col_name) {
+ container.columns.insert(
+ col_name.clone(),
+ ColumnStatistics::new(col_name.clone(),
field.data_type().clone()),
+ );
+ }
+ }
+
+ container
+ }
+
+ /// Create row-group-level stats from a single row group.
+ pub fn from_row_group(row_group: &RowGroupMetaData, schema: &Schema) ->
Self {
+ let mut container = Self::new(StatsGranularity::RowGroup);
+ container.num_rows = Some(row_group.num_rows());
+
+ // Build a map of column name to Arrow data type
+ let column_types: HashMap<&str, &DataType> = schema
+ .fields()
+ .iter()
+ .map(|f| (f.name().as_str(), f.data_type()))
+ .collect();
+
+ // Extract stats for each column in the row group
+ for col_chunk in row_group.columns() {
+ // Get column name from the column descriptor
+ let col_path = col_chunk.column_descr().path();
+ let col_name = col_path.parts().last().map(|s|
s.as_str()).unwrap_or("");
+
+ // Skip if we don't have type info for this column
+ let Some(&data_type) = column_types.get(col_name) else {
+ continue;
+ };
+
+ // Extract statistics if available
+ if let Some(stats) = col_chunk.statistics() {
+ let col_stats =
+ ColumnStatistics::from_parquet_statistics(col_name,
data_type, stats);
+ container.columns.insert(col_name.to_string(), col_stats);
+ } else {
+ // No stats available, create empty entry
+ container.columns.insert(
+ col_name.to_string(),
+ ColumnStatistics::new(col_name.to_string(),
data_type.clone()),
+ );
+ }
+ }
+
+ container
+ }
+
+ /// Convert to DataFusion Statistics for query planning.
+ pub fn to_datafusion_statistics(&self, schema: &Schema) ->
datafusion_common::Statistics {
+ use datafusion_common::ColumnStatistics as DFColStats;
+ use datafusion_common::Statistics;
+ use datafusion_common::stats::Precision;
+
+ Statistics {
+ num_rows: self
+ .num_rows
+ .map(|n| Precision::Exact(n as usize))
+ .unwrap_or(Precision::Absent),
+ total_byte_size: Precision::Absent,
+ column_statistics: schema
+ .fields()
+ .iter()
+ .map(|field| {
+ self.columns
+ .get(field.name())
+ .map(|col_stats| DFColStats {
+ null_count: col_stats
+ .null_count
+ .map(|n| Precision::Exact(n as usize))
+ .unwrap_or(Precision::Absent),
+ min_value: col_stats
+ .min_value
+ .clone()
+ .map(Precision::Exact)
+ .unwrap_or(Precision::Absent),
+ max_value: col_stats
+ .max_value
+ .clone()
+ .map(Precision::Exact)
+ .unwrap_or(Precision::Absent),
+ sum_value: Precision::Absent,
+ distinct_count: col_stats
+ .distinct_count
+ .map(|n| Precision::Exact(n as usize))
+ .unwrap_or(Precision::Absent),
+ })
+ .unwrap_or_default()
+ })
+ .collect(),
+ }
+ }
+}
+
+/// Convert Parquet statistics to ScalarValue min/max pair.
+fn parquet_stats_to_scalar(
+ stats: &ParquetStatistics,
+ data_type: &DataType,
+) -> (Option<ScalarValue>, Option<ScalarValue>) {
+ match stats {
+ ParquetStatistics::Boolean(s) => {
+ let min = s.min_opt().map(|v| ScalarValue::Boolean(Some(*v)));
+ let max = s.max_opt().map(|v| ScalarValue::Boolean(Some(*v)));
+ (min, max)
+ }
+ ParquetStatistics::Int32(s) => {
+ // Int32 in Parquet can map to various Arrow types
+ match data_type {
+ DataType::Int32 => {
+ let min = s.min_opt().map(|v|
ScalarValue::Int32(Some(*v)));
+ let max = s.max_opt().map(|v|
ScalarValue::Int32(Some(*v)));
+ (min, max)
+ }
+ DataType::Int16 => {
+ let min = s.min_opt().map(|v| ScalarValue::Int16(Some(*v
as i16)));
+ let max = s.max_opt().map(|v| ScalarValue::Int16(Some(*v
as i16)));
+ (min, max)
+ }
+ DataType::Int8 => {
+ let min = s.min_opt().map(|v| ScalarValue::Int8(Some(*v as
i8)));
+ let max = s.max_opt().map(|v| ScalarValue::Int8(Some(*v as
i8)));
+ (min, max)
+ }
+ DataType::UInt32 => {
+ let min = s.min_opt().map(|v| ScalarValue::UInt32(Some(*v
as u32)));
+ let max = s.max_opt().map(|v| ScalarValue::UInt32(Some(*v
as u32)));
+ (min, max)
+ }
+ DataType::UInt16 => {
+ let min = s.min_opt().map(|v| ScalarValue::UInt16(Some(*v
as u16)));
+ let max = s.max_opt().map(|v| ScalarValue::UInt16(Some(*v
as u16)));
+ (min, max)
+ }
+ DataType::UInt8 => {
+ let min = s.min_opt().map(|v| ScalarValue::UInt8(Some(*v
as u8)));
+ let max = s.max_opt().map(|v| ScalarValue::UInt8(Some(*v
as u8)));
+ (min, max)
+ }
+ DataType::Date32 => {
+ let min = s.min_opt().map(|v|
ScalarValue::Date32(Some(*v)));
+ let max = s.max_opt().map(|v|
ScalarValue::Date32(Some(*v)));
+ (min, max)
+ }
+ _ => {
+ // Default to Int32
+ let min = s.min_opt().map(|v|
ScalarValue::Int32(Some(*v)));
+ let max = s.max_opt().map(|v|
ScalarValue::Int32(Some(*v)));
+ (min, max)
+ }
+ }
+ }
+ ParquetStatistics::Int64(s) => match data_type {
+ DataType::Int64 => {
+ let min = s.min_opt().map(|v| ScalarValue::Int64(Some(*v)));
+ let max = s.max_opt().map(|v| ScalarValue::Int64(Some(*v)));
+ (min, max)
+ }
+ DataType::UInt64 => {
+ let min = s.min_opt().map(|v| ScalarValue::UInt64(Some(*v as
u64)));
+ let max = s.max_opt().map(|v| ScalarValue::UInt64(Some(*v as
u64)));
+ (min, max)
Review Comment:
Similar to the Int32 case, converting `i64` to `u64` with an `as` cast is
lossy for negative values: a negative i64 is reinterpreted as a large unsigned
value, producing incorrect statistics. Consider validating that the value is
non-negative before converting to an unsigned type, and returning `None` for
the statistics if the conversion would be invalid.
```suggestion
// Safely convert i64 stats to u64 stats.
// If any statistic is negative, drop both bounds to avoid
incorrect pruning.
let min = s.min_opt().and_then(|v| {
if *v >= 0 {
Some(ScalarValue::UInt64(Some(*v as u64)))
} else {
None
}
});
let max = s.max_opt().and_then(|v| {
if *v >= 0 {
Some(ScalarValue::UInt64(Some(*v as u64)))
} else {
None
}
});
// If we cannot safely represent both bounds, return no
stats.
if min.is_some() && max.is_some() {
(min, max)
} else {
(None, None)
}
```
##########
crates/core/src/statistics/mod.rs:
##########
@@ -0,0 +1,608 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+//! Statistics module for column statistics pruning.
+//!
+//! This module provides abstractions for extracting, aggregating, and using
+//! column statistics from Parquet files for query pruning at different
granularity levels.
+
+use std::collections::HashMap;
+use std::str::FromStr;
+
+use arrow_schema::{DataType, Schema};
+use datafusion_common::ScalarValue;
+use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
+use parquet::file::statistics::Statistics as ParquetStatistics;
+
+/// Column statistics pruning granularity level.
+///
+/// Controls how fine-grained the statistics-based pruning is.
+/// Each level offers different trade-offs between memory overhead and pruning
effectiveness.
+#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
+pub enum StatsGranularity {
+ /// File-level stats (aggregated from all row groups).
+ ///
+ /// Coarsest granularity, lowest memory overhead.
+ /// Can skip entire files that don't match predicates.
+ /// Stats are computed by aggregating row group stats:
+ /// - `file_min = min(row_group_mins)`
+ /// - `file_max = max(row_group_maxs)`
+ /// - `file_null_count = sum(row_group_null_counts)`
+ #[default]
+ File,
+
+ /// Row group level stats (directly from Parquet footer).
+ ///
+ /// Balanced granularity, moderate memory.
+ /// Can skip row groups within files (typically 64-128MB chunks).
+ /// Stats are read directly from Parquet footer's ColumnChunkMetaData.
+ RowGroup,
+
+ /// Page level stats (from ColumnIndex, requires Parquet 1.11+).
+ ///
+ /// Finest granularity, highest memory.
+ /// Can skip individual pages (typically ~1MB chunks).
+ /// Most effective when data is sorted by filter columns.
+ /// Requires Parquet files to be written with page index enabled.
+ Page,
+}
+
+impl FromStr for StatsGranularity {
+ type Err = String;
+
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ match s.to_lowercase().as_str() {
+ "file" => Ok(Self::File),
+ "row_group" | "rowgroup" => Ok(Self::RowGroup),
+ "page" => Ok(Self::Page),
+ _ => Err(format!(
+ "Invalid stats granularity: '{s}'. Valid options: file,
row_group, page"
+ )),
+ }
+ }
+}
+
+impl std::fmt::Display for StatsGranularity {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self {
+ Self::File => write!(f, "file"),
+ Self::RowGroup => write!(f, "row_group"),
+ Self::Page => write!(f, "page"),
+ }
+ }
+}
+
+/// Statistics for a single column at a given granularity.
+#[derive(Clone, Debug)]
+pub struct ColumnStatistics {
+ /// Column name
+ pub column_name: String,
+ /// Arrow data type
+ pub data_type: DataType,
+ /// Minimum value (as ScalarValue for compatibility with DataFusion)
+ pub min_value: Option<ScalarValue>,
+ /// Maximum value (as ScalarValue for compatibility with DataFusion)
+ pub max_value: Option<ScalarValue>,
+ /// Number of null values
+ pub null_count: Option<i64>,
+ /// Number of distinct values (if available)
+ pub distinct_count: Option<u64>,
+}
+
+impl ColumnStatistics {
+ /// Create a new ColumnStatistics with the given column name and data type.
+ pub fn new(column_name: String, data_type: DataType) -> Self {
+ Self {
+ column_name,
+ data_type,
+ min_value: None,
+ max_value: None,
+ null_count: None,
+ distinct_count: None,
+ }
+ }
+
+ /// Create from Parquet row group statistics.
+ ///
+ /// Converts Parquet's `Statistics` to `ScalarValue` based on the Arrow
data type.
+ pub fn from_parquet_statistics(
+ column_name: &str,
+ data_type: &DataType,
+ stats: &ParquetStatistics,
+ ) -> Self {
+ let (min_value, max_value) = parquet_stats_to_scalar(stats, data_type);
+ let null_count = stats.null_count_opt().map(|n| n as i64);
+
+ Self {
+ column_name: column_name.to_string(),
+ data_type: data_type.clone(),
+ min_value,
+ max_value,
+ null_count,
+ distinct_count: None,
+ }
+ }
+
+ /// Merge with another ColumnStatistics (for aggregation).
+ ///
+ /// Takes min of mins, max of maxs, sums null counts.
+ /// Used when aggregating row group stats to file-level stats.
+ pub fn merge(&mut self, other: &ColumnStatistics) {
+ // Merge min values (take the smaller one)
+ self.min_value = match (&self.min_value, &other.min_value) {
+ (Some(a), Some(b)) => scalar_min(a, b),
+ (Some(a), None) => Some(a.clone()),
+ (None, Some(b)) => Some(b.clone()),
+ (None, None) => None,
+ };
+
+ // Merge max values (take the larger one)
+ self.max_value = match (&self.max_value, &other.max_value) {
+ (Some(a), Some(b)) => scalar_max(a, b),
+ (Some(a), None) => Some(a.clone()),
+ (None, Some(b)) => Some(b.clone()),
+ (None, None) => None,
+ };
+
+ // Sum null counts
+ self.null_count = match (self.null_count, other.null_count) {
+ (Some(a), Some(b)) => Some(a + b),
+ (Some(a), None) => Some(a),
+ (None, Some(b)) => Some(b),
+ (None, None) => None,
+ };
+
+ // Distinct count cannot be accurately merged, so we set it to None
+ self.distinct_count = None;
+ }
+}
+
+/// Container for statistics at a specific granularity level.
+#[derive(Clone, Debug)]
+pub struct StatisticsContainer {
+ /// Granularity of these statistics
+ pub granularity: StatsGranularity,
+ /// Number of rows covered by these statistics
+ pub num_rows: Option<i64>,
+ /// Column statistics by column name
+ pub columns: HashMap<String, ColumnStatistics>,
+}
+
+impl StatisticsContainer {
+ /// Create an empty statistics container.
+ pub fn new(granularity: StatsGranularity) -> Self {
+ Self {
+ granularity,
+ num_rows: None,
+ columns: HashMap::new(),
+ }
+ }
+
+ /// Create file-level stats by aggregating row group stats from Parquet
metadata.
+ ///
+ /// This iterates through all row groups, extracts stats for each column,
+ /// and aggregates them to file-level statistics.
+ pub fn from_parquet_metadata(metadata: &ParquetMetaData, schema: &Schema)
-> Self {
+ let mut container = Self::new(StatsGranularity::File);
+
+ // Sum up total rows across all row groups
+ let total_rows: i64 = metadata.row_groups().iter().map(|rg|
rg.num_rows()).sum();
+ container.num_rows = Some(total_rows);
+
+ // Iterate through row groups and aggregate stats
+ for row_group in metadata.row_groups() {
+ let rg_stats = Self::from_row_group(row_group, schema);
+
+ // Merge row group stats into file-level stats
+ for (col_name, col_stats) in rg_stats.columns {
+ container
+ .columns
+ .entry(col_name.clone())
+ .and_modify(|existing| existing.merge(&col_stats))
+ .or_insert(col_stats);
+ }
+ }
+
+ // Ensure all schema columns have an entry (even if no stats)
+ for field in schema.fields() {
+ let col_name = field.name();
+ if !container.columns.contains_key(col_name) {
+ container.columns.insert(
+ col_name.clone(),
+ ColumnStatistics::new(col_name.clone(),
field.data_type().clone()),
+ );
+ }
+ }
+
+ container
+ }
+
+ /// Create row-group-level stats from a single row group.
+ pub fn from_row_group(row_group: &RowGroupMetaData, schema: &Schema) ->
Self {
+ let mut container = Self::new(StatsGranularity::RowGroup);
+ container.num_rows = Some(row_group.num_rows());
+
+ // Build a map of column name to Arrow data type
+ let column_types: HashMap<&str, &DataType> = schema
+ .fields()
+ .iter()
+ .map(|f| (f.name().as_str(), f.data_type()))
+ .collect();
+
+ // Extract stats for each column in the row group
+ for col_chunk in row_group.columns() {
+ // Get column name from the column descriptor
+ let col_path = col_chunk.column_descr().path();
+ let col_name = col_path.parts().last().map(|s|
s.as_str()).unwrap_or("");
+
+ // Skip if we don't have type info for this column
+ let Some(&data_type) = column_types.get(col_name) else {
+ continue;
+ };
+
+ // Extract statistics if available
+ if let Some(stats) = col_chunk.statistics() {
+ let col_stats =
+ ColumnStatistics::from_parquet_statistics(col_name,
data_type, stats);
+ container.columns.insert(col_name.to_string(), col_stats);
+ } else {
+ // No stats available, create empty entry
+ container.columns.insert(
+ col_name.to_string(),
+ ColumnStatistics::new(col_name.to_string(),
data_type.clone()),
Review Comment:
The `DataType` is cloned multiple times in this module (lines 112, 133, 227,
267). Arrow's `DataType` can be a complex nested structure, and cloning it
repeatedly could be inefficient, especially for tables with many columns.
Consider using `Arc<DataType>` if the type needs to be stored in multiple
places, or borrowing where possible. This is particularly important in the hot
path of statistics aggregation.
##########
crates/datafusion/src/lib.rs:
##########
@@ -324,12 +336,29 @@ impl TableProvider for HudiDataSource {
crypto: Default::default(),
};
let table_schema = self.schema();
+
+ // Read column stats pruning level configuration
+ let stats_level: String = self
+ .table
+ .hudi_configs
+ .get_or_default(ColumnStatsPruningLevel)
+ .into();
+
let mut parquet_source = ParquetSource::new(parquet_opts);
+
+ // Enable page index for page-level pruning
+ if stats_level == "page" {
+ parquet_source = parquet_source.with_enable_page_index(true);
Review Comment:
The configuration is only applied for "page" level, but "row_group" level is
not handled. According to the `StatsGranularity` enum and the configuration
documentation, there are three levels: "file", "row_group", and "page". If
"row_group" requires different handling or if it's handled by default, this
should be documented in a comment. Otherwise, consider adding handling for all
three levels explicitly to make the code's behavior clear.
```suggestion
// Configure pruning behavior based on stats granularity:
// - "file": rely on file-level statistics (default ParquetSource
behavior).
// - "row_group": rely on row-group statistics (also default
behavior).
// - "page": additionally enable the Parquet page index for
finer-grained pruning.
match stats_level.as_str() {
"page" => {
parquet_source = parquet_source.with_enable_page_index(true);
}
"file" | "row_group" => {
// No extra configuration required beyond the default
ParquetSource settings.
}
_ => {
// Unknown value: fall back to default behavior (no page
index).
}
```
##########
crates/core/src/config/read.rs:
##########
@@ -64,6 +64,21 @@ pub enum HudiReadConfig {
/// Target number of rows per batch for streaming reads.
/// This controls the batch size when using streaming APIs.
StreamBatchSize,
+
+ /// Column statistics pruning granularity level.
+ /// Options: "file" (default), "row_group", "page"
+ /// - file: Aggregate stats from row groups, can skip entire files
+ /// - row_group: Per-row-group stats, can skip 64-128MB chunks
+ /// - page: Per-page stats (Parquet 1.11+), can skip ~1MB chunks
+ ColumnStatsPruningLevel,
+
+ /// Maximum number of Parquet footers to cache.
+ /// Default: 1000
+ FooterCacheSize,
+
+ /// TTL for cached footers in seconds.
+ /// Default: 300 (5 minutes)
Review Comment:
The configuration options `FooterCacheSize` and `FooterCacheTtlSeconds` are
defined but never used in the codebase. According to the comment in the
`statistics()` method, footer caching will be "implemented in Phase 4-5", but
these configurations are already present. Consider either:
1. Removing these unused configurations until the feature is actually
implemented, or
2. Adding a TODO comment in the configuration file referencing the future
phase where they will be used.
Having unused configurations can be confusing for users and makes the
codebase harder to maintain.
```suggestion
/// Default: 1000
///
/// TODO(Phase 4-5): Wire this into the Parquet footer cache
implementation once added.
FooterCacheSize,
/// TTL for cached footers in seconds.
/// Default: 300 (5 minutes)
///
/// TODO(Phase 4-5): Wire this into the Parquet footer cache
implementation once added.
```
##########
crates/core/src/statistics/mod.rs:
##########
@@ -0,0 +1,608 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+//! Statistics module for column statistics pruning.
+//!
+//! This module provides abstractions for extracting, aggregating, and using
+//! column statistics from Parquet files for query pruning at different
granularity levels.
+
+use std::collections::HashMap;
+use std::str::FromStr;
+
+use arrow_schema::{DataType, Schema};
+use datafusion_common::ScalarValue;
+use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
+use parquet::file::statistics::Statistics as ParquetStatistics;
+
/// Column statistics pruning granularity level.
///
/// Controls how fine-grained the statistics-based pruning is.
/// Each level offers a different trade-off between memory overhead
/// and pruning effectiveness.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
pub enum StatsGranularity {
    /// File-level stats (aggregated from all row groups).
    ///
    /// Coarsest granularity, lowest memory overhead.
    /// Can skip entire files that don't match predicates.
    /// Stats are computed by aggregating row group stats:
    /// - `file_min = min(row_group_mins)`
    /// - `file_max = max(row_group_maxs)`
    /// - `file_null_count = sum(row_group_null_counts)`
    #[default]
    File,

    /// Row group level stats (directly from Parquet footer).
    ///
    /// Balanced granularity, moderate memory.
    /// Can skip row groups within files (typically 64-128MB chunks).
    /// Stats are read directly from Parquet footer's ColumnChunkMetaData.
    RowGroup,

    /// Page level stats (from ColumnIndex, requires Parquet 1.11+).
    ///
    /// Finest granularity, highest memory.
    /// Can skip individual pages (typically ~1MB chunks).
    /// Most effective when data is sorted by filter columns.
    /// Requires Parquet files to be written with page index enabled.
    Page,
}

impl FromStr for StatsGranularity {
    type Err = String;

    /// Parses a granularity name case-insensitively.
    ///
    /// Accepted spellings: `"file"`, `"row_group"` / `"rowgroup"`, `"page"`.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // Normalize once so each arm can match the canonical lowercase form.
        let normalized = s.to_lowercase();
        match normalized.as_str() {
            "file" => Ok(Self::File),
            "row_group" | "rowgroup" => Ok(Self::RowGroup),
            "page" => Ok(Self::Page),
            _ => Err(format!(
                "Invalid stats granularity: '{s}'. Valid options: file, row_group, page"
            )),
        }
    }
}

impl std::fmt::Display for StatsGranularity {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Canonical lowercase names; these round-trip through `FromStr`.
        let name = match self {
            Self::File => "file",
            Self::RowGroup => "row_group",
            Self::Page => "page",
        };
        f.write_str(name)
    }
}
+
/// Statistics for a single column at a given granularity.
///
/// A `None` in any field means that statistic is unknown/unavailable for
/// this column (e.g. the Parquet footer did not record it); consumers must
/// treat `None` as "no information", never as zero or empty.
#[derive(Clone, Debug)]
pub struct ColumnStatistics {
    /// Column name
    pub column_name: String,
    /// Arrow data type
    pub data_type: DataType,
    /// Minimum value (as ScalarValue for compatibility with DataFusion)
    pub min_value: Option<ScalarValue>,
    /// Maximum value (as ScalarValue for compatibility with DataFusion)
    pub max_value: Option<ScalarValue>,
    /// Number of null values
    pub null_count: Option<i64>,
    /// Number of distinct values (if available)
    pub distinct_count: Option<u64>,
}
+
+impl ColumnStatistics {
+ /// Create a new ColumnStatistics with the given column name and data type.
+ pub fn new(column_name: String, data_type: DataType) -> Self {
+ Self {
+ column_name,
+ data_type,
+ min_value: None,
+ max_value: None,
+ null_count: None,
+ distinct_count: None,
+ }
+ }
+
+ /// Create from Parquet row group statistics.
+ ///
+ /// Converts Parquet's `Statistics` to `ScalarValue` based on the Arrow
data type.
+ pub fn from_parquet_statistics(
+ column_name: &str,
+ data_type: &DataType,
+ stats: &ParquetStatistics,
+ ) -> Self {
+ let (min_value, max_value) = parquet_stats_to_scalar(stats, data_type);
+ let null_count = stats.null_count_opt().map(|n| n as i64);
+
+ Self {
+ column_name: column_name.to_string(),
+ data_type: data_type.clone(),
+ min_value,
+ max_value,
+ null_count,
+ distinct_count: None,
+ }
+ }
+
+ /// Merge with another ColumnStatistics (for aggregation).
+ ///
+ /// Takes min of mins, max of maxs, sums null counts.
+ /// Used when aggregating row group stats to file-level stats.
+ pub fn merge(&mut self, other: &ColumnStatistics) {
+ // Merge min values (take the smaller one)
+ self.min_value = match (&self.min_value, &other.min_value) {
+ (Some(a), Some(b)) => scalar_min(a, b),
+ (Some(a), None) => Some(a.clone()),
+ (None, Some(b)) => Some(b.clone()),
+ (None, None) => None,
+ };
+
+ // Merge max values (take the larger one)
+ self.max_value = match (&self.max_value, &other.max_value) {
+ (Some(a), Some(b)) => scalar_max(a, b),
+ (Some(a), None) => Some(a.clone()),
+ (None, Some(b)) => Some(b.clone()),
+ (None, None) => None,
+ };
+
+ // Sum null counts
+ self.null_count = match (self.null_count, other.null_count) {
+ (Some(a), Some(b)) => Some(a + b),
+ (Some(a), None) => Some(a),
+ (None, Some(b)) => Some(b),
+ (None, None) => None,
+ };
+
+ // Distinct count cannot be accurately merged, so we set it to None
+ self.distinct_count = None;
+ }
+}
+
/// Container for statistics at a specific granularity level.
///
/// Plain data holder: the associated constructors populate it from Parquet
/// metadata, and `to_datafusion_statistics` converts it into DataFusion's
/// planner-facing representation.
#[derive(Clone, Debug)]
pub struct StatisticsContainer {
    /// Granularity of these statistics
    pub granularity: StatsGranularity,
    /// Number of rows covered by these statistics (`None` if unknown)
    pub num_rows: Option<i64>,
    /// Column statistics by column name
    pub columns: HashMap<String, ColumnStatistics>,
}
+
+impl StatisticsContainer {
+ /// Create an empty statistics container.
+ pub fn new(granularity: StatsGranularity) -> Self {
+ Self {
+ granularity,
+ num_rows: None,
+ columns: HashMap::new(),
+ }
+ }
+
+ /// Create file-level stats by aggregating row group stats from Parquet
metadata.
+ ///
+ /// This iterates through all row groups, extracts stats for each column,
+ /// and aggregates them to file-level statistics.
+ pub fn from_parquet_metadata(metadata: &ParquetMetaData, schema: &Schema)
-> Self {
+ let mut container = Self::new(StatsGranularity::File);
+
+ // Sum up total rows across all row groups
+ let total_rows: i64 = metadata.row_groups().iter().map(|rg|
rg.num_rows()).sum();
+ container.num_rows = Some(total_rows);
+
+ // Iterate through row groups and aggregate stats
+ for row_group in metadata.row_groups() {
+ let rg_stats = Self::from_row_group(row_group, schema);
+
+ // Merge row group stats into file-level stats
+ for (col_name, col_stats) in rg_stats.columns {
+ container
+ .columns
+ .entry(col_name.clone())
+ .and_modify(|existing| existing.merge(&col_stats))
+ .or_insert(col_stats);
+ }
+ }
+
+ // Ensure all schema columns have an entry (even if no stats)
+ for field in schema.fields() {
+ let col_name = field.name();
+ if !container.columns.contains_key(col_name) {
+ container.columns.insert(
+ col_name.clone(),
+ ColumnStatistics::new(col_name.clone(),
field.data_type().clone()),
+ );
+ }
+ }
+
+ container
+ }
+
+ /// Create row-group-level stats from a single row group.
+ pub fn from_row_group(row_group: &RowGroupMetaData, schema: &Schema) ->
Self {
+ let mut container = Self::new(StatsGranularity::RowGroup);
+ container.num_rows = Some(row_group.num_rows());
+
+ // Build a map of column name to Arrow data type
+ let column_types: HashMap<&str, &DataType> = schema
+ .fields()
+ .iter()
+ .map(|f| (f.name().as_str(), f.data_type()))
+ .collect();
+
+ // Extract stats for each column in the row group
+ for col_chunk in row_group.columns() {
+ // Get column name from the column descriptor
+ let col_path = col_chunk.column_descr().path();
+ let col_name = col_path.parts().last().map(|s|
s.as_str()).unwrap_or("");
+
+ // Skip if we don't have type info for this column
+ let Some(&data_type) = column_types.get(col_name) else {
+ continue;
+ };
+
+ // Extract statistics if available
+ if let Some(stats) = col_chunk.statistics() {
+ let col_stats =
+ ColumnStatistics::from_parquet_statistics(col_name,
data_type, stats);
+ container.columns.insert(col_name.to_string(), col_stats);
+ } else {
+ // No stats available, create empty entry
+ container.columns.insert(
+ col_name.to_string(),
+ ColumnStatistics::new(col_name.to_string(),
data_type.clone()),
+ );
+ }
+ }
+
+ container
+ }
+
+ /// Convert to DataFusion Statistics for query planning.
+ pub fn to_datafusion_statistics(&self, schema: &Schema) ->
datafusion_common::Statistics {
+ use datafusion_common::ColumnStatistics as DFColStats;
+ use datafusion_common::Statistics;
+ use datafusion_common::stats::Precision;
+
+ Statistics {
+ num_rows: self
+ .num_rows
+ .map(|n| Precision::Exact(n as usize))
+ .unwrap_or(Precision::Absent),
+ total_byte_size: Precision::Absent,
+ column_statistics: schema
+ .fields()
+ .iter()
+ .map(|field| {
+ self.columns
+ .get(field.name())
+ .map(|col_stats| DFColStats {
+ null_count: col_stats
+ .null_count
+ .map(|n| Precision::Exact(n as usize))
+ .unwrap_or(Precision::Absent),
+ min_value: col_stats
+ .min_value
+ .clone()
+ .map(Precision::Exact)
+ .unwrap_or(Precision::Absent),
+ max_value: col_stats
+ .max_value
+ .clone()
+ .map(Precision::Exact)
+ .unwrap_or(Precision::Absent),
+ sum_value: Precision::Absent,
+ distinct_count: col_stats
+ .distinct_count
+ .map(|n| Precision::Exact(n as usize))
+ .unwrap_or(Precision::Absent),
+ })
+ .unwrap_or_default()
+ })
+ .collect(),
+ }
+ }
+}
+
+/// Convert Parquet statistics to ScalarValue min/max pair.
+fn parquet_stats_to_scalar(
+ stats: &ParquetStatistics,
+ data_type: &DataType,
+) -> (Option<ScalarValue>, Option<ScalarValue>) {
+ match stats {
+ ParquetStatistics::Boolean(s) => {
+ let min = s.min_opt().map(|v| ScalarValue::Boolean(Some(*v)));
+ let max = s.max_opt().map(|v| ScalarValue::Boolean(Some(*v)));
+ (min, max)
+ }
+ ParquetStatistics::Int32(s) => {
+ // Int32 in Parquet can map to various Arrow types
+ match data_type {
+ DataType::Int32 => {
+ let min = s.min_opt().map(|v|
ScalarValue::Int32(Some(*v)));
+ let max = s.max_opt().map(|v|
ScalarValue::Int32(Some(*v)));
+ (min, max)
+ }
+ DataType::Int16 => {
+ let min = s.min_opt().map(|v| ScalarValue::Int16(Some(*v
as i16)));
+ let max = s.max_opt().map(|v| ScalarValue::Int16(Some(*v
as i16)));
+ (min, max)
+ }
+ DataType::Int8 => {
+ let min = s.min_opt().map(|v| ScalarValue::Int8(Some(*v as
i8)));
+ let max = s.max_opt().map(|v| ScalarValue::Int8(Some(*v as
i8)));
+ (min, max)
+ }
+ DataType::UInt32 => {
+ let min = s.min_opt().map(|v| ScalarValue::UInt32(Some(*v
as u32)));
+ let max = s.max_opt().map(|v| ScalarValue::UInt32(Some(*v
as u32)));
+ (min, max)
+ }
+ DataType::UInt16 => {
+ let min = s.min_opt().map(|v| ScalarValue::UInt16(Some(*v
as u16)));
+ let max = s.max_opt().map(|v| ScalarValue::UInt16(Some(*v
as u16)));
+ (min, max)
+ }
+ DataType::UInt8 => {
+ let min = s.min_opt().map(|v| ScalarValue::UInt8(Some(*v
as u8)));
+ let max = s.max_opt().map(|v| ScalarValue::UInt8(Some(*v
as u8)));
+ (min, max)
Review Comment:
The type conversions from `i32` to smaller integer types (i16, i8, u32, u16,
u8) use unchecked `as` casts without verifying that the values are within the valid
range for the target type. If a Parquet file has Int32 statistics with values
outside the range of, say, i8 (-128 to 127), the cast will silently overflow
and produce incorrect statistics. Consider adding validation or using checked
conversions (e.g., `try_into()`) and returning `None` for statistics when
values are out of range, rather than producing incorrect min/max values that
could lead to incorrect query results.
```suggestion
let min_i16 = s.min_opt().and_then(|v|
i16::try_from(*v).ok());
let max_i16 = s.max_opt().and_then(|v|
i16::try_from(*v).ok());
match (min_i16, max_i16) {
(Some(min), Some(max)) => (
Some(ScalarValue::Int16(Some(min))),
Some(ScalarValue::Int16(Some(max))),
),
_ => (None, None),
}
}
DataType::Int8 => {
let min_i8 = s.min_opt().and_then(|v|
i8::try_from(*v).ok());
let max_i8 = s.max_opt().and_then(|v|
i8::try_from(*v).ok());
match (min_i8, max_i8) {
(Some(min), Some(max)) => (
Some(ScalarValue::Int8(Some(min))),
Some(ScalarValue::Int8(Some(max))),
),
_ => (None, None),
}
}
DataType::UInt32 => {
let min_u32 = s.min_opt().and_then(|v|
u32::try_from(*v).ok());
let max_u32 = s.max_opt().and_then(|v|
u32::try_from(*v).ok());
match (min_u32, max_u32) {
(Some(min), Some(max)) => (
Some(ScalarValue::UInt32(Some(min))),
Some(ScalarValue::UInt32(Some(max))),
),
_ => (None, None),
}
}
DataType::UInt16 => {
let min_u16 = s.min_opt().and_then(|v|
u16::try_from(*v).ok());
let max_u16 = s.max_opt().and_then(|v|
u16::try_from(*v).ok());
match (min_u16, max_u16) {
(Some(min), Some(max)) => (
Some(ScalarValue::UInt16(Some(min))),
Some(ScalarValue::UInt16(Some(max))),
),
_ => (None, None),
}
}
DataType::UInt8 => {
let min_u8 = s.min_opt().and_then(|v|
u8::try_from(*v).ok());
let max_u8 = s.max_opt().and_then(|v|
u8::try_from(*v).ok());
match (min_u8, max_u8) {
(Some(min), Some(max)) => (
Some(ScalarValue::UInt8(Some(min))),
Some(ScalarValue::UInt8(Some(max))),
),
_ => (None, None),
}
```
##########
crates/core/src/statistics/mod.rs:
##########
@@ -0,0 +1,608 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+//! Statistics module for column statistics pruning.
+//!
+//! This module provides abstractions for extracting, aggregating, and using
+//! column statistics from Parquet files for query pruning at different
granularity levels.
+
+use std::collections::HashMap;
+use std::str::FromStr;
+
+use arrow_schema::{DataType, Schema};
+use datafusion_common::ScalarValue;
+use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
+use parquet::file::statistics::Statistics as ParquetStatistics;
+
+/// Column statistics pruning granularity level.
+///
+/// Controls how fine-grained the statistics-based pruning is.
+/// Each level offers different trade-offs between memory overhead and pruning
effectiveness.
+#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
+pub enum StatsGranularity {
+ /// File-level stats (aggregated from all row groups).
+ ///
+ /// Coarsest granularity, lowest memory overhead.
+ /// Can skip entire files that don't match predicates.
+ /// Stats are computed by aggregating row group stats:
+ /// - `file_min = min(row_group_mins)`
+ /// - `file_max = max(row_group_maxs)`
+ /// - `file_null_count = sum(row_group_null_counts)`
+ #[default]
+ File,
+
+ /// Row group level stats (directly from Parquet footer).
+ ///
+ /// Balanced granularity, moderate memory.
+ /// Can skip row groups within files (typically 64-128MB chunks).
+ /// Stats are read directly from Parquet footer's ColumnChunkMetaData.
+ RowGroup,
+
+ /// Page level stats (from ColumnIndex, requires Parquet 1.11+).
+ ///
+ /// Finest granularity, highest memory.
+ /// Can skip individual pages (typically ~1MB chunks).
+ /// Most effective when data is sorted by filter columns.
+ /// Requires Parquet files to be written with page index enabled.
+ Page,
+}
+
+impl FromStr for StatsGranularity {
+ type Err = String;
+
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ match s.to_lowercase().as_str() {
+ "file" => Ok(Self::File),
+ "row_group" | "rowgroup" => Ok(Self::RowGroup),
+ "page" => Ok(Self::Page),
+ _ => Err(format!(
+ "Invalid stats granularity: '{s}'. Valid options: file,
row_group, page"
+ )),
+ }
+ }
+}
+
+impl std::fmt::Display for StatsGranularity {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self {
+ Self::File => write!(f, "file"),
+ Self::RowGroup => write!(f, "row_group"),
+ Self::Page => write!(f, "page"),
+ }
+ }
+}
+
+/// Statistics for a single column at a given granularity.
+#[derive(Clone, Debug)]
+pub struct ColumnStatistics {
+ /// Column name
+ pub column_name: String,
+ /// Arrow data type
+ pub data_type: DataType,
+ /// Minimum value (as ScalarValue for compatibility with DataFusion)
+ pub min_value: Option<ScalarValue>,
+ /// Maximum value (as ScalarValue for compatibility with DataFusion)
+ pub max_value: Option<ScalarValue>,
+ /// Number of null values
+ pub null_count: Option<i64>,
+ /// Number of distinct values (if available)
+ pub distinct_count: Option<u64>,
+}
+
+impl ColumnStatistics {
+ /// Create a new ColumnStatistics with the given column name and data type.
+ pub fn new(column_name: String, data_type: DataType) -> Self {
+ Self {
+ column_name,
+ data_type,
+ min_value: None,
+ max_value: None,
+ null_count: None,
+ distinct_count: None,
+ }
+ }
+
+ /// Create from Parquet row group statistics.
+ ///
+ /// Converts Parquet's `Statistics` to `ScalarValue` based on the Arrow
data type.
+ pub fn from_parquet_statistics(
+ column_name: &str,
+ data_type: &DataType,
+ stats: &ParquetStatistics,
+ ) -> Self {
+ let (min_value, max_value) = parquet_stats_to_scalar(stats, data_type);
+ let null_count = stats.null_count_opt().map(|n| n as i64);
+
+ Self {
+ column_name: column_name.to_string(),
+ data_type: data_type.clone(),
+ min_value,
+ max_value,
+ null_count,
+ distinct_count: None,
+ }
+ }
+
+ /// Merge with another ColumnStatistics (for aggregation).
+ ///
+ /// Takes min of mins, max of maxs, sums null counts.
+ /// Used when aggregating row group stats to file-level stats.
+ pub fn merge(&mut self, other: &ColumnStatistics) {
+ // Merge min values (take the smaller one)
+ self.min_value = match (&self.min_value, &other.min_value) {
+ (Some(a), Some(b)) => scalar_min(a, b),
+ (Some(a), None) => Some(a.clone()),
+ (None, Some(b)) => Some(b.clone()),
+ (None, None) => None,
+ };
+
+ // Merge max values (take the larger one)
+ self.max_value = match (&self.max_value, &other.max_value) {
+ (Some(a), Some(b)) => scalar_max(a, b),
+ (Some(a), None) => Some(a.clone()),
+ (None, Some(b)) => Some(b.clone()),
+ (None, None) => None,
+ };
+
+ // Sum null counts
+ self.null_count = match (self.null_count, other.null_count) {
+ (Some(a), Some(b)) => Some(a + b),
+ (Some(a), None) => Some(a),
+ (None, Some(b)) => Some(b),
+ (None, None) => None,
+ };
+
+ // Distinct count cannot be accurately merged, so we set it to None
+ self.distinct_count = None;
+ }
+}
+
+/// Container for statistics at a specific granularity level.
+#[derive(Clone, Debug)]
+pub struct StatisticsContainer {
+ /// Granularity of these statistics
+ pub granularity: StatsGranularity,
+ /// Number of rows covered by these statistics
+ pub num_rows: Option<i64>,
+ /// Column statistics by column name
+ pub columns: HashMap<String, ColumnStatistics>,
+}
+
+impl StatisticsContainer {
+ /// Create an empty statistics container.
+ pub fn new(granularity: StatsGranularity) -> Self {
+ Self {
+ granularity,
+ num_rows: None,
+ columns: HashMap::new(),
+ }
+ }
+
+ /// Create file-level stats by aggregating row group stats from Parquet
metadata.
+ ///
+ /// This iterates through all row groups, extracts stats for each column,
+ /// and aggregates them to file-level statistics.
+ pub fn from_parquet_metadata(metadata: &ParquetMetaData, schema: &Schema)
-> Self {
+ let mut container = Self::new(StatsGranularity::File);
+
+ // Sum up total rows across all row groups
+ let total_rows: i64 = metadata.row_groups().iter().map(|rg|
rg.num_rows()).sum();
+ container.num_rows = Some(total_rows);
+
+ // Iterate through row groups and aggregate stats
+ for row_group in metadata.row_groups() {
+ let rg_stats = Self::from_row_group(row_group, schema);
+
+ // Merge row group stats into file-level stats
+ for (col_name, col_stats) in rg_stats.columns {
+ container
+ .columns
+ .entry(col_name.clone())
+ .and_modify(|existing| existing.merge(&col_stats))
+ .or_insert(col_stats);
+ }
+ }
+
+ // Ensure all schema columns have an entry (even if no stats)
+ for field in schema.fields() {
+ let col_name = field.name();
+ if !container.columns.contains_key(col_name) {
+ container.columns.insert(
+ col_name.clone(),
+ ColumnStatistics::new(col_name.clone(),
field.data_type().clone()),
+ );
+ }
+ }
+
+ container
+ }
+
+ /// Create row-group-level stats from a single row group.
+ pub fn from_row_group(row_group: &RowGroupMetaData, schema: &Schema) ->
Self {
+ let mut container = Self::new(StatsGranularity::RowGroup);
+ container.num_rows = Some(row_group.num_rows());
+
+ // Build a map of column name to Arrow data type
+ let column_types: HashMap<&str, &DataType> = schema
+ .fields()
+ .iter()
+ .map(|f| (f.name().as_str(), f.data_type()))
+ .collect();
+
+ // Extract stats for each column in the row group
+ for col_chunk in row_group.columns() {
+ // Get column name from the column descriptor
+ let col_path = col_chunk.column_descr().path();
+ let col_name = col_path.parts().last().map(|s|
s.as_str()).unwrap_or("");
Review Comment:
The implementation only extracts the last part of the column path, which may
not correctly handle nested columns in structs or lists. For example, a nested
column path like "parent.child.grandchild" would only extract "grandchild",
potentially causing mismatches with the schema's field names if they use fully
qualified paths. Consider testing with nested column structures or documenting
this limitation if nested columns are not yet supported.
```suggestion
// Get column name from the column descriptor.
// Use the first path component (top-level field) to align with
the Arrow schema.
let col_path = col_chunk.column_descr().path();
let col_name = col_path.parts().first().map(|s|
s.as_str()).unwrap_or("");
```
##########
crates/core/src/statistics/mod.rs:
##########
@@ -0,0 +1,608 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+//! Statistics module for column statistics pruning.
+//!
+//! This module provides abstractions for extracting, aggregating, and using
+//! column statistics from Parquet files for query pruning at different
granularity levels.
+
+use std::collections::HashMap;
+use std::str::FromStr;
+
+use arrow_schema::{DataType, Schema};
+use datafusion_common::ScalarValue;
+use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
+use parquet::file::statistics::Statistics as ParquetStatistics;
+
+/// Column statistics pruning granularity level.
+///
+/// Controls how fine-grained the statistics-based pruning is.
+/// Each level offers different trade-offs between memory overhead and pruning
effectiveness.
+#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
+pub enum StatsGranularity {
+ /// File-level stats (aggregated from all row groups).
+ ///
+ /// Coarsest granularity, lowest memory overhead.
+ /// Can skip entire files that don't match predicates.
+ /// Stats are computed by aggregating row group stats:
+ /// - `file_min = min(row_group_mins)`
+ /// - `file_max = max(row_group_maxs)`
+ /// - `file_null_count = sum(row_group_null_counts)`
+ #[default]
+ File,
+
+ /// Row group level stats (directly from Parquet footer).
+ ///
+ /// Balanced granularity, moderate memory.
+ /// Can skip row groups within files (typically 64-128MB chunks).
+ /// Stats are read directly from Parquet footer's ColumnChunkMetaData.
+ RowGroup,
+
+ /// Page level stats (from ColumnIndex, requires Parquet 1.11+).
+ ///
+ /// Finest granularity, highest memory.
+ /// Can skip individual pages (typically ~1MB chunks).
+ /// Most effective when data is sorted by filter columns.
+ /// Requires Parquet files to be written with page index enabled.
+ Page,
+}
+
+impl FromStr for StatsGranularity {
+ type Err = String;
+
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ match s.to_lowercase().as_str() {
+ "file" => Ok(Self::File),
+ "row_group" | "rowgroup" => Ok(Self::RowGroup),
+ "page" => Ok(Self::Page),
+ _ => Err(format!(
+ "Invalid stats granularity: '{s}'. Valid options: file,
row_group, page"
+ )),
+ }
+ }
+}
+
+impl std::fmt::Display for StatsGranularity {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self {
+ Self::File => write!(f, "file"),
+ Self::RowGroup => write!(f, "row_group"),
+ Self::Page => write!(f, "page"),
+ }
+ }
+}
+
+/// Statistics for a single column at a given granularity.
+#[derive(Clone, Debug)]
+pub struct ColumnStatistics {
+ /// Column name
+ pub column_name: String,
+ /// Arrow data type
+ pub data_type: DataType,
+ /// Minimum value (as ScalarValue for compatibility with DataFusion)
+ pub min_value: Option<ScalarValue>,
+ /// Maximum value (as ScalarValue for compatibility with DataFusion)
+ pub max_value: Option<ScalarValue>,
+ /// Number of null values
+ pub null_count: Option<i64>,
+ /// Number of distinct values (if available)
+ pub distinct_count: Option<u64>,
+}
+
+impl ColumnStatistics {
+ /// Create a new ColumnStatistics with the given column name and data type.
+ pub fn new(column_name: String, data_type: DataType) -> Self {
+ Self {
+ column_name,
+ data_type,
+ min_value: None,
+ max_value: None,
+ null_count: None,
+ distinct_count: None,
+ }
+ }
+
+ /// Create from Parquet row group statistics.
+ ///
+ /// Converts Parquet's `Statistics` to `ScalarValue` based on the Arrow
data type.
+ pub fn from_parquet_statistics(
+ column_name: &str,
+ data_type: &DataType,
+ stats: &ParquetStatistics,
+ ) -> Self {
+ let (min_value, max_value) = parquet_stats_to_scalar(stats, data_type);
+ let null_count = stats.null_count_opt().map(|n| n as i64);
+
+ Self {
+ column_name: column_name.to_string(),
+ data_type: data_type.clone(),
+ min_value,
+ max_value,
+ null_count,
+ distinct_count: None,
+ }
+ }
+
+ /// Merge with another ColumnStatistics (for aggregation).
+ ///
+ /// Takes min of mins, max of maxs, sums null counts.
+ /// Used when aggregating row group stats to file-level stats.
+ pub fn merge(&mut self, other: &ColumnStatistics) {
+ // Merge min values (take the smaller one)
+ self.min_value = match (&self.min_value, &other.min_value) {
+ (Some(a), Some(b)) => scalar_min(a, b),
+ (Some(a), None) => Some(a.clone()),
+ (None, Some(b)) => Some(b.clone()),
+ (None, None) => None,
+ };
+
+ // Merge max values (take the larger one)
+ self.max_value = match (&self.max_value, &other.max_value) {
+ (Some(a), Some(b)) => scalar_max(a, b),
+ (Some(a), None) => Some(a.clone()),
+ (None, Some(b)) => Some(b.clone()),
+ (None, None) => None,
+ };
+
+ // Sum null counts
+ self.null_count = match (self.null_count, other.null_count) {
+ (Some(a), Some(b)) => Some(a + b),
+ (Some(a), None) => Some(a),
+ (None, Some(b)) => Some(b),
+ (None, None) => None,
+ };
+
+ // Distinct count cannot be accurately merged, so we set it to None
+ self.distinct_count = None;
+ }
+}
+
+/// Container for statistics at a specific granularity level.
+#[derive(Clone, Debug)]
+pub struct StatisticsContainer {
+ /// Granularity of these statistics
+ pub granularity: StatsGranularity,
+ /// Number of rows covered by these statistics
+ pub num_rows: Option<i64>,
+ /// Column statistics by column name
+ pub columns: HashMap<String, ColumnStatistics>,
+}
+
+impl StatisticsContainer {
+ /// Create an empty statistics container.
+ pub fn new(granularity: StatsGranularity) -> Self {
+ Self {
+ granularity,
+ num_rows: None,
+ columns: HashMap::new(),
+ }
+ }
+
+ /// Create file-level stats by aggregating row group stats from Parquet
metadata.
+ ///
+ /// This iterates through all row groups, extracts stats for each column,
+ /// and aggregates them to file-level statistics.
+ pub fn from_parquet_metadata(metadata: &ParquetMetaData, schema: &Schema)
-> Self {
+ let mut container = Self::new(StatsGranularity::File);
+
+ // Sum up total rows across all row groups
+ let total_rows: i64 = metadata.row_groups().iter().map(|rg|
rg.num_rows()).sum();
+ container.num_rows = Some(total_rows);
+
+ // Iterate through row groups and aggregate stats
+ for row_group in metadata.row_groups() {
+ let rg_stats = Self::from_row_group(row_group, schema);
+
+ // Merge row group stats into file-level stats
+ for (col_name, col_stats) in rg_stats.columns {
+ container
+ .columns
+ .entry(col_name.clone())
+ .and_modify(|existing| existing.merge(&col_stats))
+ .or_insert(col_stats);
+ }
+ }
+
+ // Ensure all schema columns have an entry (even if no stats)
+ for field in schema.fields() {
+ let col_name = field.name();
+ if !container.columns.contains_key(col_name) {
+ container.columns.insert(
+ col_name.clone(),
+ ColumnStatistics::new(col_name.clone(),
field.data_type().clone()),
+ );
+ }
+ }
+
+ container
+ }
+
+ /// Create row-group-level stats from a single row group.
+ pub fn from_row_group(row_group: &RowGroupMetaData, schema: &Schema) ->
Self {
+ let mut container = Self::new(StatsGranularity::RowGroup);
+ container.num_rows = Some(row_group.num_rows());
+
+ // Build a map of column name to Arrow data type
+ let column_types: HashMap<&str, &DataType> = schema
+ .fields()
+ .iter()
+ .map(|f| (f.name().as_str(), f.data_type()))
+ .collect();
+
+ // Extract stats for each column in the row group
+ for col_chunk in row_group.columns() {
+ // Get column name from the column descriptor
+ let col_path = col_chunk.column_descr().path();
+ let col_name = col_path.parts().last().map(|s|
s.as_str()).unwrap_or("");
Review Comment:
The code uses `.unwrap_or("")` to get the column name from the Parquet path,
which could result in an empty string for columns. An empty string column name
is then used as a HashMap key. This could cause incorrect statistics
aggregation if multiple columns somehow end up with empty names. Consider
logging a warning and skipping columns that don't have valid names, or using a
more robust error handling approach.
```suggestion
let Some(col_name) = col_path.parts().last().map(|s| s.as_str())
else {
// Skip columns without a valid name to avoid incorrect
statistics aggregation
continue;
};
```
##########
crates/datafusion/tests/read_tests.rs:
##########
@@ -129,15 +129,40 @@ async fn verify_plan(
let explaining_rb = explaining_rb.first().unwrap();
let plan = get_str_column(explaining_rb, "plan").join("");
let plan_lines: Vec<&str> = plan.lines().map(str::trim).collect();
+
+ // Verify sort and projection are in the plan
assert!(plan_lines[1].starts_with("SortExec: TopK(fetch=10)"));
assert!(plan_lines[2].starts_with(&format!(
"ProjectionExec: expr=[id@0 as id, name@1 as name, isActive@2 as
isActive, \
get_field(structField@3, field2) as {table_name}.structField[field2]]"
)));
- assert!(plan_lines[4].starts_with(
- "FilterExec: CAST(id@0 AS Int64) % 2 = 0 AND name@1 != Alice AND
get_field(structField@3, field2) > 30"
- ));
-
assert!(plan_lines[5].contains(&format!("input_partitions={planned_input_partitioned}")));
+
+ // With pushdown enabled, some filters are pushed to parquet reader.
+ // The struct field filter remains in FilterExec, simpler predicates go to
DataSourceExec.
+ assert!(
+ plan.contains("FilterExec"),
+ "Plan should contain FilterExec"
+ );
+ assert!(
+ plan.contains("get_field(structField@3, field2) > 30"),
+ "Plan should filter on structField.field2 > 30"
+ );
+
+ // Verify predicate pushdown is working - some predicates pushed to
DataSourceExec
+ assert!(
+ plan.contains("predicate="),
+ "DataSourceExec should have predicate pushdown"
+ );
+ assert!(
+ plan.contains("name") && plan.contains("Alice"),
+ "Name filter should be present in plan (either FilterExec or predicate
pushdown)"
+ );
+
+ // Verify input partitions
+ assert!(
+
plan.contains(&format!("input_partitions={planned_input_partitioned}")),
+ "Plan should have correct input_partitions"
+ );
Review Comment:
The test assertions have been changed from checking specific plan line
positions to using looser `contains` checks. While this makes tests less
brittle to plan format changes, it also makes them less precise. The test no
longer verifies the exact structure of the query plan. Consider:
1. Adding comments explaining why the more flexible assertions are needed
2. Ensuring that the key aspects of the plan structure are still validated
(e.g., that FilterExec comes before DataSourceExec in the plan hierarchy)
3. Adding a new test that specifically validates the predicate pushdown
behavior with different column stats pruning levels
##########
crates/datafusion/src/lib.rs:
##########
@@ -324,12 +336,29 @@ impl TableProvider for HudiDataSource {
crypto: Default::default(),
};
let table_schema = self.schema();
+
+ // Read column stats pruning level configuration
+ let stats_level: String = self
+ .table
+ .hudi_configs
+ .get_or_default(ColumnStatsPruningLevel)
+ .into();
+
let mut parquet_source = ParquetSource::new(parquet_opts);
+
+ // Enable page index for page-level pruning
+ if stats_level == "page" {
+ parquet_source = parquet_source.with_enable_page_index(true);
+ }
+
let filter = filters.iter().cloned().reduce(|acc, new| acc.and(new));
if let Some(expr) = filter {
let df_schema = DFSchema::try_from(table_schema.clone())?;
let predicate = create_physical_expr(&expr, &df_schema,
state.execution_props())?;
- parquet_source = parquet_source.with_predicate(predicate)
+ parquet_source = parquet_source
+ .with_predicate(predicate)
+ .with_pushdown_filters(true)
+ .with_reorder_filters(true);
Review Comment:
The methods `.with_pushdown_filters(true)` and `.with_reorder_filters(true)`
are now always applied when filters are present, but this behavior change is not
covered by tests. The test assertions were modified to be more lenient (using `contains`
instead of exact line matching), but there's no test that specifically verifies
that these new settings improve pruning effectiveness or that they work
correctly. Consider adding a test that demonstrates the effectiveness of these
settings, perhaps by checking that fewer files or row groups are scanned.
##########
crates/datafusion/src/lib.rs:
##########
@@ -324,12 +336,29 @@ impl TableProvider for HudiDataSource {
crypto: Default::default(),
};
let table_schema = self.schema();
+
+ // Read column stats pruning level configuration
+ let stats_level: String = self
+ .table
+ .hudi_configs
+ .get_or_default(ColumnStatsPruningLevel)
+ .into();
Review Comment:
The configuration value is read as a String but never validated against the
valid values ("file", "row_group", "page"). If a user provides an invalid value
like "invalid_level", it will be silently used without error, potentially
causing unexpected behavior. Consider validating the configuration value using
`StatsGranularity::from_str()` when reading it, and returning an error if the
value is invalid. This would provide immediate feedback to users about
configuration errors.
##########
crates/core/src/statistics/mod.rs:
##########
@@ -0,0 +1,608 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+//! Statistics module for column statistics pruning.
+//!
+//! This module provides abstractions for extracting, aggregating, and using
+//! column statistics from Parquet files for query pruning at different
granularity levels.
+
+use std::collections::HashMap;
+use std::str::FromStr;
+
+use arrow_schema::{DataType, Schema};
+use datafusion_common::ScalarValue;
+use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
+use parquet::file::statistics::Statistics as ParquetStatistics;
+
+/// Column statistics pruning granularity level.
+///
+/// Controls how fine-grained the statistics-based pruning is.
+/// Each level offers different trade-offs between memory overhead and pruning
effectiveness.
+#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
+pub enum StatsGranularity {
+ /// File-level stats (aggregated from all row groups).
+ ///
+ /// Coarsest granularity, lowest memory overhead.
+ /// Can skip entire files that don't match predicates.
+ /// Stats are computed by aggregating row group stats:
+ /// - `file_min = min(row_group_mins)`
+ /// - `file_max = max(row_group_maxs)`
+ /// - `file_null_count = sum(row_group_null_counts)`
+ #[default]
+ File,
+
+ /// Row group level stats (directly from Parquet footer).
+ ///
+ /// Balanced granularity, moderate memory.
+ /// Can skip row groups within files (typically 64-128MB chunks).
+ /// Stats are read directly from Parquet footer's ColumnChunkMetaData.
+ RowGroup,
+
+ /// Page level stats (from ColumnIndex, requires Parquet 1.11+).
+ ///
+ /// Finest granularity, highest memory.
+ /// Can skip individual pages (typically ~1MB chunks).
+ /// Most effective when data is sorted by filter columns.
+ /// Requires Parquet files to be written with page index enabled.
+ Page,
+}
+
+impl FromStr for StatsGranularity {
+ type Err = String;
+
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ match s.to_lowercase().as_str() {
+ "file" => Ok(Self::File),
+ "row_group" | "rowgroup" => Ok(Self::RowGroup),
+ "page" => Ok(Self::Page),
+ _ => Err(format!(
+ "Invalid stats granularity: '{s}'. Valid options: file,
row_group, page"
+ )),
+ }
+ }
+}
+
+impl std::fmt::Display for StatsGranularity {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self {
+ Self::File => write!(f, "file"),
+ Self::RowGroup => write!(f, "row_group"),
+ Self::Page => write!(f, "page"),
+ }
+ }
+}
+
+/// Statistics for a single column at a given granularity.
+#[derive(Clone, Debug)]
+pub struct ColumnStatistics {
+ /// Column name
+ pub column_name: String,
+ /// Arrow data type
+ pub data_type: DataType,
+ /// Minimum value (as ScalarValue for compatibility with DataFusion)
+ pub min_value: Option<ScalarValue>,
+ /// Maximum value (as ScalarValue for compatibility with DataFusion)
+ pub max_value: Option<ScalarValue>,
+ /// Number of null values
+ pub null_count: Option<i64>,
+ /// Number of distinct values (if available)
+ pub distinct_count: Option<u64>,
+}
+
+impl ColumnStatistics {
+ /// Create a new ColumnStatistics with the given column name and data type.
+ pub fn new(column_name: String, data_type: DataType) -> Self {
+ Self {
+ column_name,
+ data_type,
+ min_value: None,
+ max_value: None,
+ null_count: None,
+ distinct_count: None,
+ }
+ }
+
+ /// Create from Parquet row group statistics.
+ ///
+ /// Converts Parquet's `Statistics` to `ScalarValue` based on the Arrow
data type.
+ pub fn from_parquet_statistics(
+ column_name: &str,
+ data_type: &DataType,
+ stats: &ParquetStatistics,
+ ) -> Self {
+ let (min_value, max_value) = parquet_stats_to_scalar(stats, data_type);
+ let null_count = stats.null_count_opt().map(|n| n as i64);
+
+ Self {
+ column_name: column_name.to_string(),
+ data_type: data_type.clone(),
+ min_value,
+ max_value,
+ null_count,
+ distinct_count: None,
+ }
+ }
+
+ /// Merge with another ColumnStatistics (for aggregation).
+ ///
+ /// Takes min of mins, max of maxs, sums null counts.
+ /// Used when aggregating row group stats to file-level stats.
+ pub fn merge(&mut self, other: &ColumnStatistics) {
+ // Merge min values (take the smaller one)
+ self.min_value = match (&self.min_value, &other.min_value) {
+ (Some(a), Some(b)) => scalar_min(a, b),
+ (Some(a), None) => Some(a.clone()),
+ (None, Some(b)) => Some(b.clone()),
+ (None, None) => None,
+ };
+
+ // Merge max values (take the larger one)
+ self.max_value = match (&self.max_value, &other.max_value) {
+ (Some(a), Some(b)) => scalar_max(a, b),
+ (Some(a), None) => Some(a.clone()),
+ (None, Some(b)) => Some(b.clone()),
+ (None, None) => None,
+ };
+
+ // Sum null counts
+ self.null_count = match (self.null_count, other.null_count) {
+ (Some(a), Some(b)) => Some(a + b),
+ (Some(a), None) => Some(a),
+ (None, Some(b)) => Some(b),
+ (None, None) => None,
+ };
+
+ // Distinct count cannot be accurately merged, so we set it to None
+ self.distinct_count = None;
+ }
+}
+
+/// Container for statistics at a specific granularity level.
+#[derive(Clone, Debug)]
+pub struct StatisticsContainer {
+ /// Granularity of these statistics
+ pub granularity: StatsGranularity,
+ /// Number of rows covered by these statistics
+ pub num_rows: Option<i64>,
+ /// Column statistics by column name
+ pub columns: HashMap<String, ColumnStatistics>,
+}
+
+impl StatisticsContainer {
+ /// Create an empty statistics container.
+ pub fn new(granularity: StatsGranularity) -> Self {
+ Self {
+ granularity,
+ num_rows: None,
+ columns: HashMap::new(),
+ }
+ }
+
+ /// Create file-level stats by aggregating row group stats from Parquet
metadata.
+ ///
+ /// This iterates through all row groups, extracts stats for each column,
+ /// and aggregates them to file-level statistics.
+ pub fn from_parquet_metadata(metadata: &ParquetMetaData, schema: &Schema)
-> Self {
+ let mut container = Self::new(StatsGranularity::File);
+
+ // Sum up total rows across all row groups
+ let total_rows: i64 = metadata.row_groups().iter().map(|rg|
rg.num_rows()).sum();
+ container.num_rows = Some(total_rows);
+
+ // Iterate through row groups and aggregate stats
+ for row_group in metadata.row_groups() {
+ let rg_stats = Self::from_row_group(row_group, schema);
+
+ // Merge row group stats into file-level stats
+ for (col_name, col_stats) in rg_stats.columns {
+ container
+ .columns
+ .entry(col_name.clone())
+ .and_modify(|existing| existing.merge(&col_stats))
+ .or_insert(col_stats);
+ }
+ }
+
+ // Ensure all schema columns have an entry (even if no stats)
+ for field in schema.fields() {
+ let col_name = field.name();
+ if !container.columns.contains_key(col_name) {
+ container.columns.insert(
+ col_name.clone(),
+ ColumnStatistics::new(col_name.clone(),
field.data_type().clone()),
+ );
+ }
+ }
+
+ container
+ }
+
+ /// Create row-group-level stats from a single row group.
+ pub fn from_row_group(row_group: &RowGroupMetaData, schema: &Schema) ->
Self {
+ let mut container = Self::new(StatsGranularity::RowGroup);
+ container.num_rows = Some(row_group.num_rows());
+
+ // Build a map of column name to Arrow data type
+ let column_types: HashMap<&str, &DataType> = schema
+ .fields()
+ .iter()
+ .map(|f| (f.name().as_str(), f.data_type()))
+ .collect();
+
+ // Extract stats for each column in the row group
+ for col_chunk in row_group.columns() {
+ // Get column name from the column descriptor
+ let col_path = col_chunk.column_descr().path();
+ let col_name = col_path.parts().last().map(|s|
s.as_str()).unwrap_or("");
+
+ // Skip if we don't have type info for this column
+ let Some(&data_type) = column_types.get(col_name) else {
+ continue;
+ };
+
+ // Extract statistics if available
+ if let Some(stats) = col_chunk.statistics() {
+ let col_stats =
+ ColumnStatistics::from_parquet_statistics(col_name,
data_type, stats);
+ container.columns.insert(col_name.to_string(), col_stats);
+ } else {
+ // No stats available, create empty entry
+ container.columns.insert(
+ col_name.to_string(),
+ ColumnStatistics::new(col_name.to_string(),
data_type.clone()),
+ );
+ }
+ }
+
+ container
+ }
+
+ /// Convert to DataFusion Statistics for query planning.
+ pub fn to_datafusion_statistics(&self, schema: &Schema) ->
datafusion_common::Statistics {
+ use datafusion_common::ColumnStatistics as DFColStats;
+ use datafusion_common::Statistics;
+ use datafusion_common::stats::Precision;
+
+ Statistics {
+ num_rows: self
+ .num_rows
+ .map(|n| Precision::Exact(n as usize))
+ .unwrap_or(Precision::Absent),
+ total_byte_size: Precision::Absent,
+ column_statistics: schema
+ .fields()
+ .iter()
+ .map(|field| {
+ self.columns
+ .get(field.name())
+ .map(|col_stats| DFColStats {
+ null_count: col_stats
+ .null_count
+ .map(|n| Precision::Exact(n as usize))
+ .unwrap_or(Precision::Absent),
+ min_value: col_stats
+ .min_value
+ .clone()
+ .map(Precision::Exact)
+ .unwrap_or(Precision::Absent),
+ max_value: col_stats
+ .max_value
+ .clone()
+ .map(Precision::Exact)
+ .unwrap_or(Precision::Absent),
+ sum_value: Precision::Absent,
+ distinct_count: col_stats
+ .distinct_count
+ .map(|n| Precision::Exact(n as usize))
+ .unwrap_or(Precision::Absent),
+ })
+ .unwrap_or_default()
+ })
+ .collect(),
+ }
+ }
+}
Review Comment:
The statistics module infrastructure (`StatisticsContainer`,
`ColumnStatistics`, etc.) is defined but not actually used anywhere in the
codebase. The `statistics()` method in `HudiDataSource` returns `None` with a
comment indicating it will be implemented later. Additionally, the
configuration `ColumnStatsPruningLevel` only affects the
`with_enable_page_index` setting on the ParquetSource, but doesn't use the
abstraction defined in the statistics module. Consider adding integration tests
that demonstrate the statistics module working end-to-end, or documenting
clearly that this is preparatory infrastructure for future work.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]