This is an automated email from the ASF dual-hosted git repository.
zhangyue19921010 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 0cc771c feat: support column stats pruning (#516)
0cc771c is described below
commit 0cc771c05e1f410d0c21af14544669001bd8de3c
Author: Shiyan Xu <[email protected]>
AuthorDate: Mon Jan 12 02:54:57 2026 -0600
feat: support column stats pruning (#516)
* feat: support column stats pruning
* add logs
---
Cargo.toml | 2 +
crates/core/Cargo.toml | 1 +
crates/core/src/lib.rs | 1 +
crates/core/src/metadata/table/mod.rs | 15 +-
crates/core/src/statistics/mod.rs | 544 +++++++++++++++++++
crates/core/src/storage/mod.rs | 20 +-
crates/core/src/table/file_pruner.rs | 587 +++++++++++++++++++++
crates/core/src/table/fs_view.rs | 255 ++++++---
crates/core/src/table/mod.rs | 16 +-
crates/core/tests/statistics_tests.rs | 958 ++++++++++++++++++++++++++++++++++
crates/test/Cargo.toml | 4 +-
11 files changed, 2338 insertions(+), 65 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index c6cf37b..af9b8ba 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -94,3 +94,5 @@ flate2 = { version = "1" }
# testing
serial_test = { version = "3" }
+tempfile = { version = "3" }
+zip-extract = { version = "0.3" }
diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml
index b5414fe..ee9a041 100644
--- a/crates/core/Cargo.toml
+++ b/crates/core/Cargo.toml
@@ -89,6 +89,7 @@ percent-encoding = { workspace = true }
[dev-dependencies]
hudi-test = { path = "../test" }
serial_test = { workspace = true }
+tempfile = { workspace = true }
[lints.clippy]
result_large_err = "allow"
diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs
index 5b1d891..4001b5c 100644
--- a/crates/core/src/lib.rs
+++ b/crates/core/src/lib.rs
@@ -53,6 +53,7 @@ pub mod merge;
pub mod metadata;
mod record;
pub mod schema;
+pub mod statistics;
pub mod storage;
pub mod table;
pub mod timeline;
diff --git a/crates/core/src/metadata/table/mod.rs b/crates/core/src/metadata/table/mod.rs
index 9d3afd8..e5e211c 100644
--- a/crates/core/src/metadata/table/mod.rs
+++ b/crates/core/src/metadata/table/mod.rs
@@ -26,6 +26,8 @@ pub mod records;
use std::collections::HashMap;
+use arrow_schema::Schema;
+
use crate::Result;
use crate::config::read::HudiReadConfig;
use crate::config::table::HudiTableConfig::{
@@ -36,6 +38,7 @@ use crate::expr::filter::from_str_tuples;
use crate::metadata::METADATA_TABLE_PARTITION_FIELD;
use crate::storage::util::join_url_segments;
use crate::table::Table;
+use crate::table::file_pruner::FilePruner;
use crate::table::partition::PartitionPruner;
use records::FilesPartitionRecord;
@@ -250,9 +253,19 @@ impl Table {
let partition_pruner =
PartitionPruner::new(&filters, &partition_schema, self.hudi_configs.as_ref())?;
+ // Use empty file pruner for metadata table - no column stats pruning needed
+ // Use empty schema since the pruner is empty and won't use the schema
+ // Use empty schema since the pruner is empty and won't use the schema
+ let file_pruner = FilePruner::empty();
+ let table_schema = Schema::empty();
+
let file_slices = self
.file_system_view
- .get_file_slices_by_storage_listing(&partition_pruner, &timeline_view)
+ .get_file_slices_by_storage_listing(
+ &partition_pruner,
+ &file_pruner,
+ &table_schema,
+ &timeline_view,
+ )
.await?;
if file_slices.len() != 1 {
diff --git a/crates/core/src/statistics/mod.rs b/crates/core/src/statistics/mod.rs
new file mode 100644
index 0000000..de88dea
--- /dev/null
+++ b/crates/core/src/statistics/mod.rs
@@ -0,0 +1,544 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+//! Statistics module for column statistics pruning.
+//!
+//! This module provides abstractions for extracting, aggregating, and using
+//! column statistics from Parquet files for query pruning at different granularity levels.
+//!
+//! Core types:
+//! - [`ColumnStatistics`]: Per-column statistics (min, max) for range-based pruning
+//! - [`StatisticsContainer`]: Container for all column statistics at a given granularity
+//!
+//! Min/max values are stored as single-element Arrow arrays (`ArrayRef`), enabling
+//! direct comparison using `arrow_ord::cmp` functions without custom enum types.
+
+use std::collections::HashMap;
+use std::str::FromStr;
+use std::sync::Arc;
+
+use arrow_array::{
+ ArrayRef, BinaryArray, BooleanArray, Date32Array, Float32Array, Float64Array, Int8Array,
+ Int16Array, Int32Array, Int64Array, StringArray, TimestampMicrosecondArray,
+ TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, UInt8Array,
+ UInt16Array, UInt32Array, UInt64Array,
+};
+use arrow_ord::cmp;
+use arrow_schema::{DataType, Schema, TimeUnit};
+use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
+use parquet::file::statistics::Statistics as ParquetStatistics;
+
+/// Column statistics pruning granularity level.
+///
+/// Controls how fine-grained the statistics-based pruning is.
+/// Each level offers different trade-offs between memory overhead and pruning effectiveness.
+///
+/// String parsing is case-insensitive and accepts "row_group" or "rowgroup" for RowGroup.
+#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
+pub enum StatsGranularity {
+ /// File-level stats (aggregated from all row groups).
+ ///
+ /// Coarsest granularity, lowest memory overhead.
+ /// Can skip entire files that don't match predicates.
+ /// Stats are computed by aggregating row group stats:
+ /// - `file_min = min(row_group_mins)`
+ /// - `file_max = max(row_group_maxs)`
+ #[default]
+ File,
+
+ /// Row group level stats (directly from Parquet footer).
+ ///
+ /// Balanced granularity, moderate memory.
+ /// Can skip row groups within files.
+ /// Stats are read directly from Parquet footer's ColumnChunkMetaData.
+ RowGroup,
+}
+
+impl FromStr for StatsGranularity {
+ type Err = String;
+
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ match s.to_lowercase().as_str() {
+ "file" => Ok(Self::File),
+ "row_group" | "rowgroup" => Ok(Self::RowGroup),
+ _ => Err(format!(
+ "Invalid stats granularity: '{s}'. Valid options: file, row_group"
+ )),
+ }
+ }
+}
+
+impl std::fmt::Display for StatsGranularity {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self {
+ Self::File => write!(f, "file"),
+ Self::RowGroup => write!(f, "row_group"),
+ }
+ }
+}
+
+/// Statistics for a single column at a given granularity.
+///
+/// Tracks min and max values from Parquet footer statistics for range-based pruning.
+/// Values are stored as single-element Arrow arrays for direct comparison using `arrow_ord::cmp`.
+#[derive(Clone, Debug)]
+pub struct ColumnStatistics {
+ /// Column name
+ pub column_name: String,
+ /// Arrow data type
+ pub data_type: DataType,
+ /// Minimum value (stored as a single-element Arrow array)
+ pub min_value: Option<ArrayRef>,
+ /// Maximum value (stored as a single-element Arrow array)
+ pub max_value: Option<ArrayRef>,
+}
+
+impl ColumnStatistics {
+ /// Create a new ColumnStatistics with the given column name and data type.
+ pub fn new(column_name: String, data_type: DataType) -> Self {
+ Self {
+ column_name,
+ data_type,
+ min_value: None,
+ max_value: None,
+ }
+ }
+
+ /// Create from Parquet row group statistics.
+ ///
+ /// Extracts min/max as single-element Arrow arrays using the Arrow data_type
+ /// for correct logical type representation.
+ pub fn from_parquet_statistics(
+ column_name: &str,
+ data_type: &DataType,
+ stats: &ParquetStatistics,
+ ) -> Self {
+ let (min_value, max_value) = parquet_stats_to_min_max_arrays(stats, data_type);
+
+ Self {
+ column_name: column_name.to_string(),
+ data_type: data_type.clone(),
+ min_value,
+ max_value,
+ }
+ }
+
+ /// Merge with another ColumnStatistics (for aggregation).
+ ///
+ /// Takes min of mins, max of maxs.
+ /// Used when aggregating row group stats to file-level stats.
+ pub fn merge(&mut self, other: &ColumnStatistics) {
+ // Merge min values (take the smaller one)
+ self.min_value = match (self.min_value.take(), &other.min_value) {
+ (Some(a), Some(b)) => {
+ if is_less_than(&a, b) {
+ Some(a)
+ } else {
+ Some(Arc::clone(b))
+ }
+ }
+ (Some(a), None) => Some(a),
+ (None, Some(b)) => Some(Arc::clone(b)),
+ (None, None) => None,
+ };
+
+ // Merge max values (take the larger one)
+ self.max_value = match (self.max_value.take(), &other.max_value) {
+ (Some(a), Some(b)) => {
+ if is_greater_than(&a, b) {
+ Some(a)
+ } else {
+ Some(Arc::clone(b))
+ }
+ }
+ (Some(a), None) => Some(a),
+ (None, Some(b)) => Some(Arc::clone(b)),
+ (None, None) => None,
+ };
+ }
+}
+
+/// Returns true if `a < b` using arrow-ord comparison.
+fn is_less_than(a: &ArrayRef, b: &ArrayRef) -> bool {
+ cmp::lt(a, b).map(|result| result.value(0)).unwrap_or(false)
+}
+
+/// Returns true if `a > b` using arrow-ord comparison.
+fn is_greater_than(a: &ArrayRef, b: &ArrayRef) -> bool {
+ cmp::gt(a, b).map(|result| result.value(0)).unwrap_or(false)
+}
+
+/// Container for statistics at a specific granularity level.
+#[derive(Clone, Debug)]
+pub struct StatisticsContainer {
+ /// Granularity of these statistics
+ pub granularity: StatsGranularity,
+ /// Number of rows covered by these statistics
+ pub num_rows: Option<i64>,
+ /// Column statistics by column name
+ pub columns: HashMap<String, ColumnStatistics>,
+}
+
+impl StatisticsContainer {
+ /// Create an empty statistics container.
+ pub fn new(granularity: StatsGranularity) -> Self {
+ Self {
+ granularity,
+ num_rows: None,
+ columns: HashMap::new(),
+ }
+ }
+
+ /// Create file-level stats by aggregating row group stats from Parquet metadata.
+ ///
+ /// This iterates through all row groups, extracts stats for each column,
+ /// and aggregates them to file-level statistics.
+ pub fn from_parquet_metadata(metadata: &ParquetMetaData, schema: &Schema) -> Self {
+ let mut container = Self::new(StatsGranularity::File);
+
+ // Get total rows directly from file metadata
+ container.num_rows = Some(metadata.file_metadata().num_rows());
+
+ // Iterate through row groups and aggregate stats
+ for row_group in metadata.row_groups() {
+ let rg_stats = Self::from_row_group(row_group, schema);
+
+ // Merge row group stats into file-level stats
+ for (col_name, col_stats) in rg_stats.columns {
+ container
+ .columns
+ .entry(col_name)
+ .and_modify(|existing| existing.merge(&col_stats))
+ .or_insert(col_stats);
+ }
+ }
+
+ // Ensure all schema columns have an entry (even if no stats)
+ for field in schema.fields() {
+ let col_name = field.name();
+ if !container.columns.contains_key(col_name) {
+ container.columns.insert(
+ col_name.clone(),
+ ColumnStatistics::new(col_name.clone(), field.data_type().clone()),
+ );
+ }
+ }
+
+ container
+ }
+
+ /// Create row-group-level stats from a single row group.
+ pub fn from_row_group(row_group: &RowGroupMetaData, schema: &Schema) -> Self {
+ let mut container = Self::new(StatsGranularity::RowGroup);
+ container.num_rows = Some(row_group.num_rows());
+
+ // Build a map of column name to Arrow data type
+ let column_types: HashMap<&str, &DataType> = schema
+ .fields()
+ .iter()
+ .map(|f| (f.name().as_str(), f.data_type()))
+ .collect();
+
+ // Extract stats for each column in the row group
+ for col_chunk in row_group.columns() {
+ let col_path = col_chunk.column_descr().path();
+
+ // Skip nested columns (multi-part paths like "struct.field")
+ if col_path.parts().len() > 1 {
+ continue;
+ }
+
+ let Some(col_name) = col_path.parts().first().map(|s| s.as_str()) else {
+ continue;
+ };
+
+ // Skip if we don't have type info for this column
+ let Some(&data_type) = column_types.get(col_name) else {
+ continue;
+ };
+
+ // Extract statistics if available
+ if let Some(stats) = col_chunk.statistics() {
+ let col_stats =
+ ColumnStatistics::from_parquet_statistics(col_name, data_type, stats);
+ container.columns.insert(col_name.to_string(), col_stats);
+ } else {
+ // No stats available, create empty entry
+ container.columns.insert(
+ col_name.to_string(),
+ ColumnStatistics::new(col_name.to_string(), data_type.clone()),
+ );
+ }
+ }
+
+ container
+ }
+}
+
+/// Convert Parquet statistics to Arrow single-element arrays.
+///
+/// Uses the Arrow DataType to create appropriately typed arrays from Parquet
+/// physical type statistics.
+fn parquet_stats_to_min_max_arrays(
+ stats: &ParquetStatistics,
+ data_type: &DataType,
+) -> (Option<ArrayRef>, Option<ArrayRef>) {
+ match stats {
+ ParquetStatistics::Boolean(s) => {
+ let min = s
+ .min_opt()
+ .map(|v| Arc::new(BooleanArray::from(vec![*v])) as ArrayRef);
+ let max = s
+ .max_opt()
+ .map(|v| Arc::new(BooleanArray::from(vec![*v])) as ArrayRef);
+ (min, max)
+ }
+ ParquetStatistics::Int32(s) => {
+ // Create arrays based on the Arrow logical type
+ let min = s.min_opt().map(|v| int32_to_array(*v, data_type));
+ let max = s.max_opt().map(|v| int32_to_array(*v, data_type));
+ (min, max)
+ }
+ ParquetStatistics::Int64(s) => {
+ let min = s.min_opt().map(|v| int64_to_array(*v, data_type));
+ let max = s.max_opt().map(|v| int64_to_array(*v, data_type));
+ (min, max)
+ }
+ ParquetStatistics::Int96(_) => {
+ // Int96 is deprecated, typically used for timestamps in legacy Parquet
+ log::debug!("Int96 statistics not supported - legacy Parquet timestamp format");
+ (None, None)
+ }
+ ParquetStatistics::Float(s) => {
+ let min = s
+ .min_opt()
+ .map(|v| Arc::new(Float32Array::from(vec![*v])) as ArrayRef);
+ let max = s
+ .max_opt()
+ .map(|v| Arc::new(Float32Array::from(vec![*v])) as ArrayRef);
+ (min, max)
+ }
+ ParquetStatistics::Double(s) => {
+ let min = s
+ .min_opt()
+ .map(|v| Arc::new(Float64Array::from(vec![*v])) as ArrayRef);
+ let max = s
+ .max_opt()
+ .map(|v| Arc::new(Float64Array::from(vec![*v])) as ArrayRef);
+ (min, max)
+ }
+ ParquetStatistics::ByteArray(s) => {
+ let min = s.min_opt().map(|b| bytes_to_array(b.data(), data_type));
+ let max = s.max_opt().map(|b| bytes_to_array(b.data(), data_type));
+ (min, max)
+ }
+ ParquetStatistics::FixedLenByteArray(s) => {
+ let min = s.min_opt().map(|b| bytes_to_array(b.data(), data_type));
+ let max = s.max_opt().map(|b| bytes_to_array(b.data(), data_type));
+ (min, max)
+ }
+ }
+}
+
+/// Convert Parquet Int32 physical value to Arrow array based on logical type.
+fn int32_to_array(value: i32, data_type: &DataType) -> ArrayRef {
+ match data_type {
+ DataType::Int8 => Arc::new(Int8Array::from(vec![value as i8])) as ArrayRef,
+ DataType::Int16 => Arc::new(Int16Array::from(vec![value as i16])) as ArrayRef,
+ DataType::Int32 => Arc::new(Int32Array::from(vec![value])) as ArrayRef,
+ DataType::UInt8 => Arc::new(UInt8Array::from(vec![value as u8])) as ArrayRef,
+ DataType::UInt16 => Arc::new(UInt16Array::from(vec![value as u16])) as ArrayRef,
+ DataType::UInt32 => Arc::new(UInt32Array::from(vec![value as u32])) as ArrayRef,
+ DataType::Date32 => Arc::new(Date32Array::from(vec![value])) as ArrayRef,
+ _ => Arc::new(Int32Array::from(vec![value])) as ArrayRef, // fallback
+ }
+}
+
+/// Convert Parquet Int64 physical value to Arrow array based on logical type.
+fn int64_to_array(value: i64, data_type: &DataType) -> ArrayRef {
+ match data_type {
+ DataType::Int64 => Arc::new(Int64Array::from(vec![value])) as ArrayRef,
+ DataType::UInt64 => Arc::new(UInt64Array::from(vec![value as u64])) as ArrayRef,
+ DataType::Timestamp(time_unit, tz) => {
+ match time_unit {
+ TimeUnit::Second => {
+ Arc::new(TimestampSecondArray::from(vec![value]).with_timezone_opt(tz.clone()))
+ as ArrayRef
+ }
+ TimeUnit::Millisecond => Arc::new(
+ TimestampMillisecondArray::from(vec![value]).with_timezone_opt(tz.clone()),
+ ) as ArrayRef,
+ TimeUnit::Microsecond => Arc::new(
+ TimestampMicrosecondArray::from(vec![value]).with_timezone_opt(tz.clone()),
+ ) as ArrayRef,
+ TimeUnit::Nanosecond => Arc::new(
+ TimestampNanosecondArray::from(vec![value]).with_timezone_opt(tz.clone()),
+ ) as ArrayRef,
+ }
+ }
+ _ => Arc::new(Int64Array::from(vec![value])) as ArrayRef, // fallback
+ }
+}
+
+/// Convert Parquet byte array to Arrow array based on logical type.
+fn bytes_to_array(data: &[u8], data_type: &DataType) -> ArrayRef {
+ match data_type {
+ DataType::Utf8 | DataType::LargeUtf8 => {
+ let s = String::from_utf8_lossy(data).into_owned();
+ Arc::new(StringArray::from(vec![s])) as ArrayRef
+ }
+ _ => Arc::new(BinaryArray::from_vec(vec![data])) as ArrayRef,
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_stats_granularity_from_str() {
+ assert_eq!(
+ StatsGranularity::from_str("file").unwrap(),
+ StatsGranularity::File
+ );
+ assert_eq!(
+ StatsGranularity::from_str("FILE").unwrap(),
+ StatsGranularity::File
+ );
+ assert_eq!(
+ StatsGranularity::from_str("row_group").unwrap(),
+ StatsGranularity::RowGroup
+ );
+ assert_eq!(
+ StatsGranularity::from_str("rowgroup").unwrap(),
+ StatsGranularity::RowGroup
+ );
+ assert!(StatsGranularity::from_str("invalid").is_err());
+ }
+
+ #[test]
+ fn test_stats_granularity_display() {
+ assert_eq!(format!("{}", StatsGranularity::File), "file");
+ assert_eq!(format!("{}", StatsGranularity::RowGroup), "row_group");
+ }
+
+ #[test]
+ fn test_stats_granularity_default() {
+ assert_eq!(StatsGranularity::default(), StatsGranularity::File);
+ }
+
+ /// Helper to create a single-element Int32 array.
+ fn int32_array(value: i32) -> ArrayRef {
+ Arc::new(Int32Array::from(vec![value])) as ArrayRef
+ }
+
+ /// Helper to create a single-element UInt32 array.
+ fn uint32_array(value: u32) -> ArrayRef {
+ Arc::new(UInt32Array::from(vec![value])) as ArrayRef
+ }
+
+ /// Helper to check array value equality for Int32.
+ fn get_int32(arr: &ArrayRef) -> i32 {
+ arr.as_any().downcast_ref::<Int32Array>().unwrap().value(0)
+ }
+
+ /// Helper to check array value equality for UInt32.
+ fn get_uint32(arr: &ArrayRef) -> u32 {
+ arr.as_any().downcast_ref::<UInt32Array>().unwrap().value(0)
+ }
+
+ #[test]
+ fn test_column_statistics_merge() {
+ // Test 1: Both stats have values - takes min of mins, max of maxs
+ let mut stats1 = ColumnStatistics {
+ column_name: "test".to_string(),
+ data_type: DataType::Int32,
+ min_value: Some(int32_array(10)),
+ max_value: Some(int32_array(50)),
+ };
+ let stats2 = ColumnStatistics {
+ column_name: "test".to_string(),
+ data_type: DataType::Int32,
+ min_value: Some(int32_array(5)),
+ max_value: Some(int32_array(100)),
+ };
+ stats1.merge(&stats2);
+ assert_eq!(get_int32(stats1.min_value.as_ref().unwrap()), 5);
+ assert_eq!(get_int32(stats1.max_value.as_ref().unwrap()), 100);
+
+ // Test 2: One side has None values - preserves the Some value
+ let mut stats3 = ColumnStatistics {
+ column_name: "test".to_string(),
+ data_type: DataType::Int32,
+ min_value: Some(int32_array(10)),
+ max_value: None,
+ };
+ let stats4 = ColumnStatistics {
+ column_name: "test".to_string(),
+ data_type: DataType::Int32,
+ min_value: None,
+ max_value: Some(int32_array(100)),
+ };
+ stats3.merge(&stats4);
+ assert_eq!(get_int32(stats3.min_value.as_ref().unwrap()), 10);
+ assert_eq!(get_int32(stats3.max_value.as_ref().unwrap()), 100);
+ }
+
+ #[test]
+ fn test_unsigned_integer_merge() {
+ // Test that UInt32 values are correctly compared using unsigned semantics
+ let mut uint32_stats = ColumnStatistics {
+ column_name: "test".to_string(),
+ data_type: DataType::UInt32,
+ min_value: Some(uint32_array(100)),
+ max_value: Some(uint32_array(200)),
+ };
+ let uint32_large = ColumnStatistics {
+ column_name: "test".to_string(),
+ data_type: DataType::UInt32,
+ min_value: Some(uint32_array(3_000_000_000)),
+ max_value: Some(uint32_array(4_000_000_000)),
+ };
+ uint32_stats.merge(&uint32_large);
+ // 100 < 3B in unsigned comparison
+ assert_eq!(get_uint32(uint32_stats.min_value.as_ref().unwrap()), 100);
+ // 4B > 200 in unsigned comparison
+ assert_eq!(
+ get_uint32(uint32_stats.max_value.as_ref().unwrap()),
+ 4_000_000_000
+ );
+ }
+
+ #[test]
+ fn test_arrow_ord_comparisons() {
+ // Test arrow-ord comparisons directly
+ let a = int32_array(10);
+ let b = int32_array(20);
+
+ assert!(is_less_than(&a, &b));
+ assert!(!is_less_than(&b, &a));
+ assert!(is_greater_than(&b, &a));
+ assert!(!is_greater_than(&a, &b));
+
+ // Test unsigned comparisons
+ let small = uint32_array(100);
+ let large = uint32_array(3_000_000_000);
+ assert!(is_less_than(&small, &large));
+ assert!(is_greater_than(&large, &small));
+ }
+}
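A minimal sketch, separate from the patch above, of what the module doc describes: single-element Arrow arrays compared directly with `arrow_ord::cmp`, plus case-insensitive parsing of `StatsGranularity` (assuming the `hudi_core` crate name used by the integration tests further below):

    use std::sync::Arc;
    use arrow_array::{ArrayRef, Int32Array};
    use arrow_ord::cmp;
    use hudi_core::statistics::StatsGranularity;

    fn main() {
        // Single-element arrays stand in for a column's min/max.
        let min: ArrayRef = Arc::new(Int32Array::from(vec![10]));
        let max: ArrayRef = Arc::new(Int32Array::from(vec![20]));
        // cmp::lt yields a BooleanArray; element 0 carries the comparison result.
        let min_lt_max = cmp::lt(&min, &max).map(|r| r.value(0)).unwrap_or(false);
        assert!(min_lt_max);

        // Granularity parsing is case-insensitive ("row_group" or "rowgroup").
        let g: StatsGranularity = "rowgroup".parse().unwrap();
        assert_eq!(g, StatsGranularity::RowGroup);
    }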
diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs
index e9650d9..e0c0491 100644
--- a/crates/core/src/storage/mod.rs
+++ b/crates/core/src/storage/mod.rs
@@ -38,6 +38,7 @@ use url::Url;
use crate::config::HudiConfigs;
use crate::config::table::HudiTableConfig;
+use crate::statistics::StatisticsContainer;
use crate::storage::error::StorageError::{Creation, InvalidPath};
use crate::storage::error::{Result, StorageError};
use crate::storage::file_metadata::FileMetadata;
@@ -230,7 +231,7 @@ impl Storage {
pub async fn get_parquet_file_schema(
&self,
relative_path: &str,
- ) -> Result<arrow::datatypes::Schema> {
+ ) -> Result<arrow_schema::Schema> {
let parquet_meta = self.get_parquet_file_metadata(relative_path).await?;
Ok(parquet_to_arrow_schema(
parquet_meta.file_metadata().schema_descr(),
@@ -238,6 +239,23 @@ impl Storage {
)?)
}
+ /// Get column statistics for a Parquet file.
+ ///
+ /// # Arguments
+ /// * `relative_path` - Relative path to the Parquet file
+ /// * `schema` - Arrow schema to use for extracting statistics
+ pub async fn get_parquet_column_stats(
+ &self,
+ relative_path: &str,
+ schema: &arrow_schema::Schema,
+ ) -> Result<StatisticsContainer> {
+ let parquet_meta = self.get_parquet_file_metadata(relative_path).await?;
+ Ok(StatisticsContainer::from_parquet_metadata(
+ &parquet_meta,
+ schema,
+ ))
+ }
+
pub async fn get_file_data(&self, relative_path: &str) -> Result<Bytes> {
let obj_url = join_url_segments(&self.base_url, &[relative_path])?;
let obj_path = ObjPath::from_url_path(obj_url.path())?;
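A minimal sketch, separate from the patch, of what the new `get_parquet_column_stats` does under the hood: read the Parquet footer, then let `StatisticsContainer::from_parquet_metadata` aggregate row-group min/max into file-level stats. The file path and schema arguments are placeholders, and error handling is elided:

    use std::fs::File;
    use parquet::file::reader::FileReader;
    use parquet::file::serialized_reader::SerializedFileReader;
    use hudi_core::statistics::StatisticsContainer;

    // `path` and `schema` are hypothetical inputs for illustration only.
    fn file_level_stats(path: &str, schema: &arrow_schema::Schema) -> StatisticsContainer {
        let file = File::open(path).unwrap();
        let reader = SerializedFileReader::new(file).unwrap();
        // Aggregates each row group's min/max into one file-level container.
        StatisticsContainer::from_parquet_metadata(reader.metadata(), schema)
    }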
diff --git a/crates/core/src/table/file_pruner.rs b/crates/core/src/table/file_pruner.rs
new file mode 100644
index 0000000..500a2a3
--- /dev/null
+++ b/crates/core/src/table/file_pruner.rs
@@ -0,0 +1,587 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//! File-level pruner for filtering files based on column statistics.
+
+use crate::Result;
+use crate::expr::ExprOperator;
+use crate::expr::filter::{Filter, SchemableFilter};
+use crate::statistics::{ColumnStatistics, StatisticsContainer};
+
+use arrow_array::{ArrayRef, Datum};
+use arrow_ord::cmp;
+use arrow_schema::Schema;
+use std::collections::HashSet;
+
+/// A file-level pruner that filters files based on column statistics.
+///
+/// This pruner uses min/max statistics from Parquet files to determine if a file
+/// can be skipped (pruned) based on query predicates. A file is pruned if its
+/// statistics prove that no rows in the file can match the predicate.
+#[derive(Debug, Clone)]
+pub struct FilePruner {
+ /// Filters that apply to non-partition columns
+ and_filters: Vec<SchemableFilter>,
+}
+
+impl FilePruner {
+ /// Creates a new file pruner with filters on non-partition columns.
+ ///
+ /// Filters on partition columns are excluded since they are handled by PartitionPruner.
+ ///
+ /// # Arguments
+ /// * `and_filters` - List of filters to apply
+ /// * `table_schema` - The table's data schema
+ /// * `partition_schema` - The partition schema (filters on these columns are excluded)
+ pub fn new(
+ and_filters: &[Filter],
+ table_schema: &Schema,
+ partition_schema: &Schema,
+ ) -> Result<Self> {
+ // Get partition column names to exclude
+ let partition_columns: HashSet<&str> = partition_schema
+ .fields()
+ .iter()
+ .map(|f| f.name().as_str())
+ .collect();
+
+ // Only keep filters on non-partition columns that exist in the table schema
+ let and_filters: Vec<SchemableFilter> = and_filters
+ .iter()
+ .filter(|filter| !partition_columns.contains(filter.field_name.as_str()))
+ .filter_map(|filter| SchemableFilter::try_from((filter.clone(), table_schema)).ok())
+ .collect();
+
+ Ok(FilePruner { and_filters })
+ }
+
+ /// Creates an empty file pruner that does not filter any files.
+ pub fn empty() -> Self {
+ FilePruner {
+ and_filters: Vec::new(),
+ }
+ }
+
+ /// Returns `true` if the pruner does not have any filters.
+ pub fn is_empty(&self) -> bool {
+ self.and_filters.is_empty()
+ }
+
+ /// Returns `true` if the file should be included based on its statistics.
+ ///
+ /// A file is included if ANY of its rows MIGHT match all the filters.
+ /// A file is excluded (pruned) only if we can prove that NO rows can match.
+ ///
+ /// If statistics are missing or incomplete, the file is included (safe default).
+ pub fn should_include(&self, stats: &StatisticsContainer) -> bool {
+ // If no filters, include everything
+ if self.and_filters.is_empty() {
+ return true;
+ }
+
+ // All filters must pass (AND semantics)
+ // If any filter definitively excludes the file, return false
+ for filter in &self.and_filters {
+ let col_name = filter.field.name();
+
+ // Get column statistics. When using StatisticsContainer::from_parquet_metadata(),
+ // all schema columns will have an entry. However, stats may come from other sources
+ // (e.g., manually constructed), so we handle missing columns defensively.
+ let Some(col_stats) = stats.columns.get(col_name) else {
+ // No stats for this column, cannot prune - include the file
+ continue;
+ };
+
+ // Check if this filter can prune the file
+ if self.can_prune_by_filter(filter, col_stats) {
+ return false; // File can be pruned
+ }
+ }
+
+ true // File should be included
+ }
+
+ /// Determines if a file can be pruned based on a single filter and column stats.
+ ///
+ /// Returns `true` if the file can definitely be pruned (no rows can match).
+ fn can_prune_by_filter(&self, filter: &SchemableFilter, col_stats: &ColumnStatistics) -> bool {
+ // Get the filter value as an ArrayRef
+ let filter_array = self.extract_filter_array(filter);
+ let Some(filter_value) = filter_array else {
+ return false; // Cannot extract value, don't prune
+ };
+
+ let min = &col_stats.min_value;
+ let max = &col_stats.max_value;
+
+ match filter.operator {
+ ExprOperator::Eq => {
+ // Prune if: value < min OR value > max
+ self.can_prune_eq(&filter_value, min, max)
+ }
+ ExprOperator::Ne => {
+ // Prune if: min = max = value (all rows equal the excluded value)
+ self.can_prune_ne(&filter_value, min, max)
+ }
+ ExprOperator::Lt => {
+ // Prune if: min >= value (all values are >= the threshold)
+ self.can_prune_lt(&filter_value, min)
+ }
+ ExprOperator::Lte => {
+ // Prune if: min > value
+ self.can_prune_lte(&filter_value, min)
+ }
+ ExprOperator::Gt => {
+ // Prune if: max <= value (all values are <= the threshold)
+ self.can_prune_gt(&filter_value, max)
+ }
+ ExprOperator::Gte => {
+ // Prune if: max < value
+ self.can_prune_gte(&filter_value, max)
+ }
+ }
+ }
+
+ /// Prune for `col = value`: prune if value < min OR value > max
+ fn can_prune_eq(
+ &self,
+ value: &ArrayRef,
+ min: &Option<ArrayRef>,
+ max: &Option<ArrayRef>,
+ ) -> bool {
+ // Need both min and max to make this decision
+ let Some(min_val) = min else {
+ return false;
+ };
+ let Some(max_val) = max else {
+ return false;
+ };
+
+ // Prune if value < min OR value > max
+ let value_lt_min = cmp::lt(value, min_val).map(|r| r.value(0)).unwrap_or(false);
+ let value_gt_max = cmp::gt(value, max_val).map(|r| r.value(0)).unwrap_or(false);
+
+ value_lt_min || value_gt_max
+ }
+
+ /// Prune for `col != value`: prune if min = max = value
+ fn can_prune_ne(
+ &self,
+ value: &ArrayRef,
+ min: &Option<ArrayRef>,
+ max: &Option<ArrayRef>,
+ ) -> bool {
+ // Need both min and max to make this decision
+ let Some(min_val) = min else {
+ return false;
+ };
+ let Some(max_val) = max else {
+ return false;
+ };
+
+ // Prune only if min = max = value (all rows have the excluded value)
+ let min_eq_max = cmp::eq(min_val, max_val)
+ .map(|r| r.value(0))
+ .unwrap_or(false);
+ let min_eq_value = cmp::eq(min_val, value).map(|r| r.value(0)).unwrap_or(false);
+
+ min_eq_max && min_eq_value
+ }
+
+ /// Prune for `col < value`: prune if min >= value
+ fn can_prune_lt(&self, value: &ArrayRef, min: &Option<ArrayRef>) -> bool {
+ let Some(min_val) = min else {
+ return false;
+ };
+
+ // Prune if min >= value
+ cmp::gt_eq(min_val, value)
+ .map(|r| r.value(0))
+ .unwrap_or(false)
+ }
+
+ /// Prune for `col <= value`: prune if min > value
+ fn can_prune_lte(&self, value: &ArrayRef, min: &Option<ArrayRef>) -> bool {
+ let Some(min_val) = min else {
+ return false;
+ };
+
+ // Prune if min > value
+ cmp::gt(min_val, value).map(|r| r.value(0)).unwrap_or(false)
+ }
+
+ /// Prune for `col > value`: prune if max <= value
+ fn can_prune_gt(&self, value: &ArrayRef, max: &Option<ArrayRef>) -> bool {
+ let Some(max_val) = max else {
+ return false;
+ };
+
+ // Prune if max <= value
+ cmp::lt_eq(max_val, value)
+ .map(|r| r.value(0))
+ .unwrap_or(false)
+ }
+
+ /// Prune for `col >= value`: prune if max < value
+ fn can_prune_gte(&self, value: &ArrayRef, max: &Option<ArrayRef>) -> bool {
+ let Some(max_val) = max else {
+ return false;
+ };
+
+ // Prune if max < value
+ cmp::lt(max_val, value).map(|r| r.value(0)).unwrap_or(false)
+ }
+
+ /// Extracts the filter value as an ArrayRef.
+ fn extract_filter_array(&self, filter: &SchemableFilter) -> Option<ArrayRef> {
+ let (array, is_scalar) = filter.value.get();
+ if array.is_empty() {
+ return None;
+ }
+ // Only use scalar values or single-element arrays for min/max pruning.
+ // Multi-element arrays (e.g., IN lists) cannot be used for simple range pruning.
+ if is_scalar || array.len() == 1 {
+ Some(array.slice(0, 1))
+ } else {
+ None
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use arrow_array::{Int64Array, StringArray};
+ use arrow_schema::{DataType, Field};
+ use std::sync::Arc;
+
+ fn create_test_schema() -> Schema {
+ Schema::new(vec![
+ Field::new("id", DataType::Int64, false),
+ Field::new("name", DataType::Utf8, true),
+ Field::new("value", DataType::Float64, true),
+ Field::new("date", DataType::Date32, false),
+ ])
+ }
+
+ fn create_partition_schema() -> Schema {
+ Schema::new(vec![Field::new("date", DataType::Date32, false)])
+ }
+
+ fn create_stats_with_int_range(col_name: &str, min: i64, max: i64) -> StatisticsContainer {
+ let mut stats = StatisticsContainer::new(crate::statistics::StatsGranularity::File);
+ stats.columns.insert(
+ col_name.to_string(),
+ ColumnStatistics {
+ column_name: col_name.to_string(),
+ data_type: DataType::Int64,
+ min_value: Some(Arc::new(Int64Array::from(vec![min])) as ArrayRef),
+ max_value: Some(Arc::new(Int64Array::from(vec![max])) as ArrayRef),
+ },
+ );
+ stats
+ }
+
+ fn create_stats_with_string_range(col_name: &str, min: &str, max: &str) -> StatisticsContainer {
+ let mut stats = StatisticsContainer::new(crate::statistics::StatsGranularity::File);
+ stats.columns.insert(
+ col_name.to_string(),
+ ColumnStatistics {
+ column_name: col_name.to_string(),
+ data_type: DataType::Utf8,
+ min_value: Some(Arc::new(StringArray::from(vec![min])) as ArrayRef),
+ max_value: Some(Arc::new(StringArray::from(vec![max])) as ArrayRef),
+ },
+ );
+ stats
+ }
+
+ #[test]
+ fn test_empty_pruner() {
+ let pruner = FilePruner::empty();
+ assert!(pruner.is_empty());
+
+ let stats = create_stats_with_int_range("id", 1, 100);
+ assert!(pruner.should_include(&stats));
+ }
+
+ #[test]
+ fn test_pruner_excludes_partition_columns() {
+ let table_schema = create_test_schema();
+ let partition_schema = create_partition_schema();
+
+ // Filter on partition column should be excluded
+ let filters = vec![Filter::try_from(("date", "=", "2024-01-01")).unwrap()];
+
+ let pruner = FilePruner::new(&filters, &table_schema, &partition_schema).unwrap();
+ assert!(pruner.is_empty()); // Partition column filter should be excluded
+ }
+
+ #[test]
+ fn test_pruner_keeps_non_partition_columns() {
+ let table_schema = create_test_schema();
+ let partition_schema = create_partition_schema();
+
+ // Filter on non-partition column should be kept
+ let filters = vec![Filter::try_from(("id", ">", "50")).unwrap()];
+
+ let pruner = FilePruner::new(&filters, &table_schema, &partition_schema).unwrap();
+ assert!(!pruner.is_empty());
+ }
+
+ #[test]
+ fn test_eq_filter_prunes_when_value_below_min() {
+ let table_schema = create_test_schema();
+ let partition_schema = Schema::empty();
+
+ let filters = vec![Filter::try_from(("id", "=", "5")).unwrap()];
+ let pruner = FilePruner::new(&filters, &table_schema, &partition_schema).unwrap();
+
+ // Stats: min=10, max=100. Filter: id = 5. Should prune (5 < 10).
+ let stats = create_stats_with_int_range("id", 10, 100);
+ assert!(!pruner.should_include(&stats));
+ }
+
+ #[test]
+ fn test_eq_filter_prunes_when_value_above_max() {
+ let table_schema = create_test_schema();
+ let partition_schema = Schema::empty();
+
+ let filters = vec![Filter::try_from(("id", "=", "200")).unwrap()];
+ let pruner = FilePruner::new(&filters, &table_schema, &partition_schema).unwrap();
+
+ // Stats: min=10, max=100. Filter: id = 200. Should prune (200 > 100).
+ let stats = create_stats_with_int_range("id", 10, 100);
+ assert!(!pruner.should_include(&stats));
+ }
+
+ #[test]
+ fn test_eq_filter_includes_when_value_in_range() {
+ let table_schema = create_test_schema();
+ let partition_schema = Schema::empty();
+
+ let filters = vec![Filter::try_from(("id", "=", "50")).unwrap()];
+ let pruner = FilePruner::new(&filters, &table_schema, &partition_schema).unwrap();
+
+ // Stats: min=10, max=100. Filter: id = 50. Should include (10 <= 50 <= 100).
+ let stats = create_stats_with_int_range("id", 10, 100);
+ assert!(pruner.should_include(&stats));
+ }
+
+ #[test]
+ fn test_ne_filter_prunes_when_all_equal() {
+ let table_schema = create_test_schema();
+ let partition_schema = Schema::empty();
+
+ let filters = vec![Filter::try_from(("id", "!=", "50")).unwrap()];
+ let pruner = FilePruner::new(&filters, &table_schema, &partition_schema).unwrap();
+
+ // Stats: min=50, max=50. Filter: id != 50. Should prune (all values are 50).
+ let stats = create_stats_with_int_range("id", 50, 50);
+ assert!(!pruner.should_include(&stats));
+ }
+
+ #[test]
+ fn test_ne_filter_includes_when_range_exists() {
+ let table_schema = create_test_schema();
+ let partition_schema = Schema::empty();
+
+ let filters = vec![Filter::try_from(("id", "!=", "50")).unwrap()];
+ let pruner = FilePruner::new(&filters, &table_schema, &partition_schema).unwrap();
+
+ // Stats: min=10, max=100. Filter: id != 50. Should include (has other values).
+ let stats = create_stats_with_int_range("id", 10, 100);
+ assert!(pruner.should_include(&stats));
+ }
+
+ #[test]
+ fn test_lt_filter_prunes_when_min_gte_value() {
+ let table_schema = create_test_schema();
+ let partition_schema = Schema::empty();
+
+ let filters = vec![Filter::try_from(("id", "<", "10")).unwrap()];
+ let pruner = FilePruner::new(&filters, &table_schema, &partition_schema).unwrap();
+
+ // Stats: min=10, max=100. Filter: id < 10. Should prune (min >= 10).
+ let stats = create_stats_with_int_range("id", 10, 100);
+ assert!(!pruner.should_include(&stats));
+ }
+
+ #[test]
+ fn test_lt_filter_includes_when_min_lt_value() {
+ let table_schema = create_test_schema();
+ let partition_schema = Schema::empty();
+
+ let filters = vec![Filter::try_from(("id", "<", "50")).unwrap()];
+ let pruner = FilePruner::new(&filters, &table_schema, &partition_schema).unwrap();
+
+ // Stats: min=10, max=100. Filter: id < 50. Should include (some values < 50).
+ let stats = create_stats_with_int_range("id", 10, 100);
+ assert!(pruner.should_include(&stats));
+ }
+
+ #[test]
+ fn test_lte_filter_prunes_when_min_gt_value() {
+ let table_schema = create_test_schema();
+ let partition_schema = Schema::empty();
+
+ let filters = vec![Filter::try_from(("id", "<=", "5")).unwrap()];
+ let pruner = FilePruner::new(&filters, &table_schema, &partition_schema).unwrap();
+
+ // Stats: min=10, max=100. Filter: id <= 5. Should prune (min > 5).
+ let stats = create_stats_with_int_range("id", 10, 100);
+ assert!(!pruner.should_include(&stats));
+ }
+
+ #[test]
+ fn test_gt_filter_prunes_when_max_lte_value() {
+ let table_schema = create_test_schema();
+ let partition_schema = Schema::empty();
+
+ let filters = vec![Filter::try_from(("id", ">", "100")).unwrap()];
+ let pruner = FilePruner::new(&filters, &table_schema, &partition_schema).unwrap();
+
+ // Stats: min=10, max=100. Filter: id > 100. Should prune (max <= 100).
+ let stats = create_stats_with_int_range("id", 10, 100);
+ assert!(!pruner.should_include(&stats));
+ }
+
+ #[test]
+ fn test_gt_filter_includes_when_max_gt_value() {
+ let table_schema = create_test_schema();
+ let partition_schema = Schema::empty();
+
+ let filters = vec![Filter::try_from(("id", ">", "50")).unwrap()];
+ let pruner = FilePruner::new(&filters, &table_schema, &partition_schema).unwrap();
+
+ // Stats: min=10, max=100. Filter: id > 50. Should include (some values > 50).
+ let stats = create_stats_with_int_range("id", 10, 100);
+ assert!(pruner.should_include(&stats));
+ }
+
+ #[test]
+ fn test_gte_filter_prunes_when_max_lt_value() {
+ let table_schema = create_test_schema();
+ let partition_schema = Schema::empty();
+
+ let filters = vec![Filter::try_from(("id", ">=", "150")).unwrap()];
+ let pruner = FilePruner::new(&filters, &table_schema, &partition_schema).unwrap();
+
+ // Stats: min=10, max=100. Filter: id >= 150. Should prune (max < 150).
+ let stats = create_stats_with_int_range("id", 10, 100);
+ assert!(!pruner.should_include(&stats));
+ }
+
+ #[test]
+ fn test_lte_filter_includes_when_value_in_range() {
+ let table_schema = create_test_schema();
+ let partition_schema = Schema::empty();
+
+ let filters = vec![Filter::try_from(("id", "<=", "50")).unwrap()];
+ let pruner = FilePruner::new(&filters, &table_schema, &partition_schema).unwrap();
+
+ // Stats: min=10, max=100. Filter: id <= 50. Should include (some values <= 50).
+ let stats = create_stats_with_int_range("id", 10, 100);
+ assert!(pruner.should_include(&stats));
+ }
+
+ #[test]
+ fn test_gte_filter_includes_when_value_in_range() {
+ let table_schema = create_test_schema();
+ let partition_schema = Schema::empty();
+
+ let filters = vec![Filter::try_from(("id", ">=", "50")).unwrap()];
+ let pruner = FilePruner::new(&filters, &table_schema, &partition_schema).unwrap();
+
+ // Stats: min=10, max=100. Filter: id >= 50. Should include (some values >= 50).
+ let stats = create_stats_with_int_range("id", 10, 100);
+ assert!(pruner.should_include(&stats));
+ }
+
+ #[test]
+ fn test_string_filter() {
+ let table_schema = create_test_schema();
+ let partition_schema = Schema::empty();
+
+ let filters = vec![Filter::try_from(("name", "=", "zebra")).unwrap()];
+ let pruner = FilePruner::new(&filters, &table_schema, &partition_schema).unwrap();
+
+ // Stats: min="apple", max="banana". Filter: name = "zebra". Should prune.
+ let stats = create_stats_with_string_range("name", "apple", "banana");
+ assert!(!pruner.should_include(&stats));
+
+ // Stats: min="apple", max="zebra". Filter: name = "banana". Should include.
+ let stats2 = create_stats_with_string_range("name", "apple", "zebra");
+ assert!(pruner.should_include(&stats2));
+ }
+
+ #[test]
+ fn test_missing_column_stats_includes_file() {
+ let table_schema = create_test_schema();
+ let partition_schema = Schema::empty();
+
+ let filters = vec![Filter::try_from(("id", "=", "50")).unwrap()];
+ let pruner = FilePruner::new(&filters, &table_schema, &partition_schema).unwrap();
+
+ // Stats for different column - should include (cannot prune without stats)
+ let stats = create_stats_with_int_range("other_column", 1, 10);
+ assert!(pruner.should_include(&stats));
+ }
+
+ #[test]
+ fn test_filter_on_column_with_no_stats() {
+ let table_schema = create_test_schema();
+ let partition_schema = Schema::empty();
+
+ let filters = vec![Filter::try_from(("id", "=", "50")).unwrap()];
+ let pruner = FilePruner::new(&filters, &table_schema, &partition_schema).unwrap();
+
+ // Column exists but has no min/max (e.g., Parquet file with statistics disabled)
+ let mut stats = StatisticsContainer::new(crate::statistics::StatsGranularity::File);
+ stats.columns.insert(
+ "id".to_string(),
+ ColumnStatistics {
+ column_name: "id".to_string(),
+ data_type: DataType::Int64,
+ min_value: None,
+ max_value: None,
+ },
+ );
+
+ // Should include file (cannot prune without min/max values)
+ assert!(pruner.should_include(&stats));
+ }
+
+ #[test]
+ fn test_multiple_filters_all_must_pass() {
+ let table_schema = create_test_schema();
+ let partition_schema = Schema::empty();
+
+ let filters = vec![
+ Filter::try_from(("id", ">", "0")).unwrap(),
+ Filter::try_from(("id", "<", "5")).unwrap(),
+ ];
+ let pruner = FilePruner::new(&filters, &table_schema, &partition_schema).unwrap();
+
+ // Stats: min=10, max=100. Filter: id > 0 AND id < 5.
+ // First filter passes (max > 0), second filter prunes (min >= 5).
+ let stats = create_stats_with_int_range("id", 10, 100);
+ assert!(!pruner.should_include(&stats));
+ }
+}
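For reference, a minimal end-to-end sketch of the pruning decision, mirroring the unit tests above (separate from the patch; it assumes `Filter` is publicly exposed at `hudi_core::expr::filter`, so adjust the path if needed):

    use std::sync::Arc;
    use arrow_array::{ArrayRef, Int64Array};
    use arrow_schema::{DataType, Field, Schema};
    use hudi_core::expr::filter::Filter; // assumed public path
    use hudi_core::statistics::{ColumnStatistics, StatisticsContainer, StatsGranularity};
    use hudi_core::table::file_pruner::FilePruner;

    fn main() {
        let table_schema = Schema::new(vec![Field::new("id", DataType::Int64, false)]);
        let partition_schema = Schema::empty();
        let filters = vec![Filter::try_from(("id", ">", "100")).unwrap()];
        let pruner = FilePruner::new(&filters, &table_schema, &partition_schema).unwrap();

        // File-level stats claim id is within [10, 50]; `id > 100` can never match.
        let mut stats = StatisticsContainer::new(StatsGranularity::File);
        stats.columns.insert(
            "id".to_string(),
            ColumnStatistics {
                column_name: "id".to_string(),
                data_type: DataType::Int64,
                min_value: Some(Arc::new(Int64Array::from(vec![10])) as ArrayRef),
                max_value: Some(Arc::new(Int64Array::from(vec![50])) as ArrayRef),
            },
        );
        assert!(!pruner.should_include(&stats)); // pruned
    }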
diff --git a/crates/core/src/table/fs_view.rs b/crates/core/src/table/fs_view.rs
index e6581a3..223a86f 100644
--- a/crates/core/src/table/fs_view.rs
+++ b/crates/core/src/table/fs_view.rs
@@ -20,17 +20,20 @@
use std::collections::HashMap;
use std::sync::Arc;
+use arrow_schema::Schema;
+
use crate::Result;
use crate::config::HudiConfigs;
use crate::config::table::HudiTableConfig::BaseFileFormat;
use crate::file_group::FileGroup;
use crate::file_group::builder::file_groups_from_files_partition_records;
use crate::file_group::file_slice::FileSlice;
+use crate::metadata::table::records::FilesPartitionRecord;
use crate::storage::Storage;
use crate::table::Table;
+use crate::table::file_pruner::FilePruner;
use crate::table::listing::FileLister;
use crate::table::partition::PartitionPruner;
-use crate::timeline::completion_time::CompletionTimeView;
use crate::timeline::view::TimelineView;
use dashmap::DashMap;
@@ -58,64 +61,152 @@ impl FileSystemView {
})
}
- /// Load file groups by listing from the storage.
+ /// Load file groups from the appropriate source (storage or metadata table records)
+ /// and apply stats-based pruning.
+ ///
+ /// # File Listing Source
+ /// - If `files_partition_records` is Some: Uses pre-fetched metadata table records
+ /// - If `files_partition_records` is None: Uses storage listing via FileLister
+ ///
+ /// # Stats Pruning Source (for non-empty file_pruner)
+ /// - Currently: Always extracts stats from Parquet file footers
+ /// - TODO: Use metadata table partitions when available:
+ /// - partition_stats: Enhance PartitionPruner to prune partitions before file listing
+ /// - column_stats: Prune files without reading Parquet footers
///
/// # Arguments
/// * `partition_pruner` - Filters which partitions to include
- /// * `completion_time_view` - View to look up completion timestamps
- async fn load_file_groups_from_storage<V: CompletionTimeView + Sync>(
+ /// * `file_pruner` - Filters files based on column statistics
+ /// * `table_schema` - Table schema for statistics extraction
+ /// * `timeline_view` - The timeline view providing query timestamp and completion time lookups
+ /// * `files_partition_records` - Optional pre-fetched metadata table records
+ async fn load_file_groups(
&self,
partition_pruner: &PartitionPruner,
- completion_time_view: &V,
+ file_pruner: &FilePruner,
+ table_schema: &Schema,
+ timeline_view: &TimelineView,
+ files_partition_records: Option<&HashMap<String, FilesPartitionRecord>>,
) -> Result<()> {
- let lister = FileLister::new(
- self.hudi_configs.clone(),
- self.storage.clone(),
- partition_pruner.to_owned(),
- );
- let file_groups_map = lister
- .list_file_groups_for_relevant_partitions(completion_time_view)
- .await?;
+ // TODO: Enhance PartitionPruner with partition_stats support
+ // - Load partition_stats from metadata table into PartitionPruner
+ // - PartitionPruner.should_include() will use both partition column values AND partition_stats
+ // - For non-partitioned tables: check partition_pruner.can_any_partition_match() for early return
+
+ // Step 1: Get file groups from appropriate source
+ let file_groups_map = if let Some(records) = files_partition_records {
+ // Use pre-fetched metadata table records
+ let base_file_format: String = self.hudi_configs.get_or_default(BaseFileFormat).into();
+ file_groups_from_files_partition_records(records, &base_file_format, timeline_view)?
+ } else {
+ // Use storage listing
+ let lister = FileLister::new(
+ self.hudi_configs.clone(),
+ self.storage.clone(),
+ partition_pruner.to_owned(),
+ );
+ lister
+ .list_file_groups_for_relevant_partitions(timeline_view)
+ .await?
+ };
+
+ // Step 2: Apply partition pruning (for metadata table path) and stats pruning
+ // Note: Storage listing path already applies partition pruning via FileLister
+ // TODO: Check if metadata table column_stats partition is available
+ // and use that instead of Parquet footers for better performance
for (partition_path, file_groups) in file_groups_map {
+ // Skip partitions that don't match the pruner (for metadata table path)
+ if files_partition_records.is_some()
+ && !partition_pruner.is_empty()
+ && !partition_pruner.should_include(&partition_path)
+ {
+ continue;
+ }
+
+ let retained = self
+ .apply_stats_pruning_from_footers(
+ file_groups,
+ file_pruner,
+ table_schema,
+ timeline_view.as_of_timestamp(),
+ )
+ .await;
self.partition_to_file_groups
- .insert(partition_path, file_groups);
+ .insert(partition_path, retained);
}
+
Ok(())
}
- /// Load file groups from metadata table records.
- ///
- /// This is an alternative to `load_file_groups_from_file_system` that uses
- /// file listing records fetched from the metadata table. Only partitions that
- /// pass the partition pruner will be loaded.
+ /// Apply file-level stats pruning using Parquet file footers.
///
- /// This method is not async because it operates on pre-fetched records.
- ///
- /// # Arguments
- /// * `records` - Metadata table files partition records
- /// * `partition_pruner` - Filters which partitions to include
- /// * `completion_time_view` - View to look up completion timestamps
- fn load_file_groups_from_metadata_table_records<V: CompletionTimeView>(
+ /// Returns the filtered list of file groups that pass the pruning check.
+ /// Files are included (not pruned) if:
+ /// - The pruner has no filters
+ /// - The file is not a Parquet file
+ /// - Column stats cannot be loaded (conservative behavior)
+ /// - The file's stats indicate it might contain matching rows
+ async fn apply_stats_pruning_from_footers(
&self,
- records: &HashMap<String, crate::metadata::table_record::FilesPartitionRecord>,
- partition_pruner: &PartitionPruner,
- completion_time_view: &V,
- ) -> Result<()> {
- let base_file_format: String = self.hudi_configs.get_or_default(BaseFileFormat).into();
- let file_groups_map = file_groups_from_files_partition_records(
- records,
- &base_file_format,
- completion_time_view,
- )?;
-
- for entry in file_groups_map.iter() {
- let partition_path = entry.key();
- if partition_pruner.is_empty() || partition_pruner.should_include(partition_path) {
- self.partition_to_file_groups
- .insert(partition_path.clone(), entry.value().clone());
+ file_groups: Vec<FileGroup>,
+ file_pruner: &FilePruner,
+ table_schema: &Schema,
+ as_of_timestamp: &str,
+ ) -> Vec<FileGroup> {
+ if file_pruner.is_empty() {
+ return file_groups;
+ }
+
+ let mut retained = Vec::with_capacity(file_groups.len());
+
+ for mut fg in file_groups {
+ if let Some(fsl) = fg.get_file_slice_mut_as_of(as_of_timestamp) {
+ let relative_path = match fsl.base_file_relative_path() {
+ Ok(path) => path,
+ Err(e) => {
+ log::warn!(
+ "Cannot get base file path for pruning: {e}. Including file group."
+ );
+ retained.push(fg);
+ continue;
+ }
+ };
+
+ // Case-insensitive check for .parquet extension
+ if !relative_path.to_lowercase().ends_with(".parquet") {
+ retained.push(fg);
+ continue;
+ }
+
+ // Load column stats from Parquet footer
+ let stats = match self
+ .storage
+ .get_parquet_column_stats(&relative_path, table_schema)
+ .await
+ {
+ Ok(s) => s,
+ Err(e) => {
+ log::warn!(
+ "Failed to load column stats for {relative_path}: {e}. Including file."
+ );
+ retained.push(fg);
+ continue;
+ }
+ };
+
+ if file_pruner.should_include(&stats) {
+ retained.push(fg);
+ } else {
+ log::debug!("Pruned file {relative_path} based on column stats");
+ }
+ } else {
+ // No file slice as of timestamp, include the file group
+ // (it will be filtered out later in collect_file_slices)
+ retained.push(fg);
}
}
- Ok(())
+
+ retained
}
/// Collect file slices from loaded file groups using the timeline view.
@@ -159,27 +250,34 @@ impl FileSystemView {
///
/// # Arguments
/// * `partition_pruner` - Filters which partitions to include
+ /// * `file_pruner` - Filters files based on column statistics
+ /// * `table_schema` - Table schema for statistics extraction
/// * `timeline_view` - The timeline view containing query context
/// * `metadata_table` - Optional metadata table instance
pub(crate) async fn get_file_slices(
&self,
partition_pruner: &PartitionPruner,
+ file_pruner: &FilePruner,
+ table_schema: &Schema,
timeline_view: &TimelineView,
metadata_table: Option<&Table>,
) -> Result<Vec<FileSlice>> {
- if let Some(mdt) = metadata_table {
- // Use metadata table for file listing
- let records = mdt.fetch_files_partition_records(partition_pruner).await?;
- self.load_file_groups_from_metadata_table_records(
- &records,
- partition_pruner,
- timeline_view,
- )?;
+ // Fetch records from metadata table if available
+ let files_partition_records = if let Some(mdt) = metadata_table {
+ Some(mdt.fetch_files_partition_records(partition_pruner).await?)
} else {
- // Fall back to storage listing
- self.load_file_groups_from_storage(partition_pruner, timeline_view)
- .await?;
- }
+ None
+ };
+
+ self.load_file_groups(
+ partition_pruner,
+ file_pruner,
+ table_schema,
+ timeline_view,
+ files_partition_records.as_ref(),
+ )
+ .await?;
+
self.collect_file_slices(partition_pruner, timeline_view)
.await
}
@@ -191,14 +289,26 @@ impl FileSystemView {
///
/// # Arguments
/// * `partition_pruner` - Filters which partitions to include
+ /// * `file_pruner` - Filters files based on column statistics
+ /// * `table_schema` - Table schema for statistics extraction
/// * `timeline_view` - The timeline view containing query context
pub(crate) async fn get_file_slices_by_storage_listing(
&self,
partition_pruner: &PartitionPruner,
+ file_pruner: &FilePruner,
+ table_schema: &Schema,
timeline_view: &TimelineView,
) -> Result<Vec<FileSlice>> {
- self.load_file_groups_from_storage(partition_pruner, timeline_view)
- .await?;
+ // Pass None to force storage listing (avoids recursion for metadata table)
+ self.load_file_groups(
+ partition_pruner,
+ file_pruner,
+ table_schema,
+ timeline_view,
+ None,
+ )
+ .await?;
+
self.collect_file_slices(partition_pruner, timeline_view)
.await
}
@@ -227,9 +337,17 @@ mod tests {
.await
.unwrap();
let partition_pruner = PartitionPruner::empty();
+ let file_pruner = FilePruner::empty();
+ let table_schema = hudi_table.get_schema().await.unwrap();
let file_slices = fs_view
- .get_file_slices(&partition_pruner, &timeline_view, None)
+ .get_file_slices(
+ &partition_pruner,
+ &file_pruner,
+ &table_schema,
+ &timeline_view,
+ None,
+ )
.await
.unwrap();
@@ -260,9 +378,17 @@ mod tests {
.await
.unwrap();
let partition_pruner = PartitionPruner::empty();
+ let file_pruner = FilePruner::empty();
+ let table_schema = hudi_table.get_schema().await.unwrap();
let file_slices = fs_view
- .get_file_slices(&partition_pruner, &timeline_view, None)
+ .get_file_slices(
+ &partition_pruner,
+ &file_pruner,
+ &table_schema,
+ &timeline_view,
+ None,
+ )
.await
.unwrap();
@@ -293,6 +419,7 @@ mod tests {
.await
.unwrap();
let partition_schema = hudi_table.get_partition_schema().await.unwrap();
+ let table_schema = hudi_table.get_schema().await.unwrap();
let filter_lt_20 = Filter::try_from(("byteField", "<", "20")).unwrap();
let filter_eq_300 = Filter::try_from(("shortField", "=", "300")).unwrap();
@@ -303,8 +430,16 @@ mod tests {
)
.unwrap();
+ let file_pruner = FilePruner::empty();
+
let file_slices = fs_view
- .get_file_slices(&partition_pruner, &timeline_view, None)
+ .get_file_slices(
+ &partition_pruner,
+ &file_pruner,
+ &table_schema,
+ &timeline_view,
+ None,
+ )
.await
.unwrap();
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index f02fd3e..153105e 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -88,6 +88,7 @@
//! ```
pub mod builder;
+pub mod file_pruner;
mod fs_view;
mod listing;
pub mod partition;
@@ -107,6 +108,7 @@ use crate::file_group::reader::FileGroupReader;
use crate::metadata::METADATA_TABLE_PARTITION_FIELD;
use crate::schema::resolver::{resolve_avro_schema, resolve_schema};
use crate::table::builder::TableBuilder;
+use crate::table::file_pruner::FilePruner;
use crate::table::fs_view::FileSystemView;
use crate::table::partition::PartitionPruner;
use crate::timeline::util::format_timestamp;
@@ -481,6 +483,12 @@ impl Table {
let partition_pruner =
PartitionPruner::new(filters, &partition_schema, self.hudi_configs.as_ref())?;
+ // Get table schema for file pruning
+ let table_schema = self.get_schema().await?;
+
+ // Create file pruner with filters on non-partition columns
+ let file_pruner = FilePruner::new(filters, &table_schema, &partition_schema)?;
+
// Try to create metadata table instance if enabled
let metadata_table = if self.is_metadata_table_enabled() {
log::debug!("Using metadata table for file listing");
@@ -498,7 +506,13 @@ impl Table {
};
self.file_system_view
- .get_file_slices(&partition_pruner, &timeline_view, metadata_table.as_ref())
+ .get_file_slices(
+ &partition_pruner,
+ &file_pruner,
+ &table_schema,
+ &timeline_view,
+ metadata_table.as_ref(),
+ )
.await
}
diff --git a/crates/core/tests/statistics_tests.rs b/crates/core/tests/statistics_tests.rs
new file mode 100644
index 0000000..0186b2a
--- /dev/null
+++ b/crates/core/tests/statistics_tests.rs
@@ -0,0 +1,958 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+//! Integration tests for the statistics module.
+//!
+//! These tests generate Parquet files with known data, then verify that
+//! the statistics extraction correctly reads min/max values.
+
+use std::fs::File;
+use std::sync::Arc;
+
+use arrow_array::{
+ ArrayRef, BooleanArray, Date32Array, Float32Array, Float64Array, Int8Array, Int16Array,
+ Int32Array, Int64Array, RecordBatch, StringArray, TimestampMicrosecondArray,
+ TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, UInt32Array,
+};
+use arrow_schema::{DataType, Field, Schema, TimeUnit};
+use parquet::arrow::ArrowWriter;
+use parquet::basic::Compression;
+use parquet::file::properties::WriterProperties;
+use parquet::file::reader::FileReader;
+use parquet::file::serialized_reader::SerializedFileReader;
+use tempfile::tempdir;
+
+use hudi_core::statistics::{StatisticsContainer, StatsGranularity};
+
+/// Helper to write a RecordBatch to a Parquet file and return the path.
+fn write_parquet_file(batch: &RecordBatch, path: &std::path::Path) {
+ let file = File::create(path).unwrap();
+ let props = WriterProperties::builder()
+ .set_compression(Compression::SNAPPY)
+
.set_statistics_enabled(parquet::file::properties::EnabledStatistics::Page)
+ .build();
+ let mut writer = ArrowWriter::try_new(file, batch.schema(),
Some(props)).unwrap();
+ writer.write(batch).unwrap();
+ writer.close().unwrap();
+}
+
+/// Helper to write a RecordBatch to a Parquet file without statistics.
+fn write_parquet_file_no_stats(batch: &RecordBatch, path: &std::path::Path) {
+ let file = File::create(path).unwrap();
+ let props = WriterProperties::builder()
+ .set_compression(Compression::SNAPPY)
+
.set_statistics_enabled(parquet::file::properties::EnabledStatistics::None)
+ .build();
+ let mut writer = ArrowWriter::try_new(file, batch.schema(),
Some(props)).unwrap();
+ writer.write(batch).unwrap();
+ writer.close().unwrap();
+}
+
+/// Helper to write multiple RecordBatches to a single Parquet file (multiple
row groups).
+fn write_parquet_file_multiple_row_groups(batches: &[RecordBatch], path:
&std::path::Path) {
+ let file = File::create(path).unwrap();
+ let props = WriterProperties::builder()
+ .set_compression(Compression::SNAPPY)
+
.set_statistics_enabled(parquet::file::properties::EnabledStatistics::Page)
+ .set_max_row_group_size(3) // Force smaller row groups
+ .build();
+ let mut writer = ArrowWriter::try_new(file, batches[0].schema(),
Some(props)).unwrap();
+ for batch in batches {
+ writer.write(batch).unwrap();
+ }
+ writer.close().unwrap();
+}
+
+/// Helper to read Parquet metadata from a file.
+fn read_parquet_metadata(path: &std::path::Path) ->
parquet::file::metadata::ParquetMetaData {
+ let file = File::open(path).unwrap();
+ let reader = SerializedFileReader::new(file).unwrap();
+ reader.metadata().clone()
+}
+
+/// Helper to extract Int32 value from ArrayRef
+fn get_int32(arr: &ArrayRef) -> i32 {
+ arr.as_any().downcast_ref::<Int32Array>().unwrap().value(0)
+}
+
+/// Helper to extract Int64 value from ArrayRef
+fn get_int64(arr: &ArrayRef) -> i64 {
+ arr.as_any().downcast_ref::<Int64Array>().unwrap().value(0)
+}
+
+/// Helper to extract Int8 value from ArrayRef
+fn get_int8(arr: &ArrayRef) -> i8 {
+ arr.as_any().downcast_ref::<Int8Array>().unwrap().value(0)
+}
+
+/// Helper to extract Int16 value from ArrayRef
+fn get_int16(arr: &ArrayRef) -> i16 {
+ arr.as_any().downcast_ref::<Int16Array>().unwrap().value(0)
+}
+
+/// Helper to extract UInt32 value from ArrayRef
+fn get_uint32(arr: &ArrayRef) -> u32 {
+ arr.as_any().downcast_ref::<UInt32Array>().unwrap().value(0)
+}
+
+/// Helper to extract Float32 value from ArrayRef
+fn get_float32(arr: &ArrayRef) -> f32 {
+ arr.as_any()
+ .downcast_ref::<Float32Array>()
+ .unwrap()
+ .value(0)
+}
+
+/// Helper to extract Float64 value from ArrayRef
+fn get_float64(arr: &ArrayRef) -> f64 {
+ arr.as_any()
+ .downcast_ref::<Float64Array>()
+ .unwrap()
+ .value(0)
+}
+
+/// Helper to extract Boolean value from ArrayRef
+fn get_bool(arr: &ArrayRef) -> bool {
+ arr.as_any()
+ .downcast_ref::<BooleanArray>()
+ .unwrap()
+ .value(0)
+}
+
+/// Helper to extract String value from ArrayRef
+fn get_string(arr: &ArrayRef) -> String {
+ arr.as_any()
+ .downcast_ref::<StringArray>()
+ .unwrap()
+ .value(0)
+ .to_string()
+}
+
+/// Helper to extract Date32 value from ArrayRef
+fn get_date32(arr: &ArrayRef) -> i32 {
+ arr.as_any().downcast_ref::<Date32Array>().unwrap().value(0)
+}
+
+/// Helper to extract TimestampMicrosecond value from ArrayRef
+fn get_timestamp_micros(arr: &ArrayRef) -> i64 {
+ arr.as_any()
+ .downcast_ref::<TimestampMicrosecondArray>()
+ .unwrap()
+ .value(0)
+}
+
+/// Helper to extract TimestampSecond value from ArrayRef
+fn get_timestamp_seconds(arr: &ArrayRef) -> i64 {
+ arr.as_any()
+ .downcast_ref::<TimestampSecondArray>()
+ .unwrap()
+ .value(0)
+}
+
+/// Helper to extract TimestampMillisecond value from ArrayRef
+fn get_timestamp_millis(arr: &ArrayRef) -> i64 {
+ arr.as_any()
+ .downcast_ref::<TimestampMillisecondArray>()
+ .unwrap()
+ .value(0)
+}
+
+/// Helper to extract TimestampNanosecond value from ArrayRef
+fn get_timestamp_nanos(arr: &ArrayRef) -> i64 {
+ arr.as_any()
+ .downcast_ref::<TimestampNanosecondArray>()
+ .unwrap()
+ .value(0)
+}
+
+mod file_level_statistics {
+ use super::*;
+
+ #[test]
+ fn test_integer_column_stats() {
+ let temp_dir = tempdir().unwrap();
+ let parquet_path = temp_dir.path().join("integer_test.parquet");
+
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("int32_col", DataType::Int32, false),
+ Field::new("int64_col", DataType::Int64, false),
+ Field::new("uint32_col", DataType::UInt32, false),
+ ]));
+ let int32_values = Int32Array::from(vec![10, 50, 30, 20, 40]);
+ let int64_values = Int64Array::from(vec![100_i64, 500, 300, 200, 400]);
+ let uint32_values = UInt32Array::from(vec![100_u32, 500, 300, 200,
400]);
+ let batch = RecordBatch::try_new(
+ schema.clone(),
+ vec![
+ Arc::new(int32_values) as ArrayRef,
+ Arc::new(int64_values) as ArrayRef,
+ Arc::new(uint32_values) as ArrayRef,
+ ],
+ )
+ .unwrap();
+
+ write_parquet_file(&batch, &parquet_path);
+
+ let metadata = read_parquet_metadata(&parquet_path);
+ let stats = StatisticsContainer::from_parquet_metadata(&metadata,
&schema);
+
+ assert_eq!(stats.granularity, StatsGranularity::File);
+ assert_eq!(stats.num_rows, Some(5));
+
+ let int32_stats = stats.columns.get("int32_col").unwrap();
+ assert_eq!(get_int32(int32_stats.min_value.as_ref().unwrap()), 10);
+ assert_eq!(get_int32(int32_stats.max_value.as_ref().unwrap()), 50);
+
+ let int64_stats = stats.columns.get("int64_col").unwrap();
+ assert_eq!(get_int64(int64_stats.min_value.as_ref().unwrap()), 100);
+ assert_eq!(get_int64(int64_stats.max_value.as_ref().unwrap()), 500);
+
+ let uint32_stats = stats.columns.get("uint32_col").unwrap();
+ assert_eq!(get_uint32(uint32_stats.min_value.as_ref().unwrap()), 100);
+ assert_eq!(get_uint32(uint32_stats.max_value.as_ref().unwrap()), 500);
+ }
+
+ #[test]
+ fn test_int8_and_int16_column_stats() {
+ let temp_dir = tempdir().unwrap();
+ let parquet_path = temp_dir.path().join("small_int_test.parquet");
+
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("int8_col", DataType::Int8, false),
+ Field::new("int16_col", DataType::Int16, false),
+ ]));
+ let int8_values = Int8Array::from(vec![-10_i8, 50, 30, -20, 40]);
+ let int16_values = Int16Array::from(vec![100_i16, 500, -300, 200,
400]);
+ let batch = RecordBatch::try_new(
+ schema.clone(),
+ vec![
+ Arc::new(int8_values) as ArrayRef,
+ Arc::new(int16_values) as ArrayRef,
+ ],
+ )
+ .unwrap();
+
+ write_parquet_file(&batch, &parquet_path);
+
+ let metadata = read_parquet_metadata(&parquet_path);
+ let stats = StatisticsContainer::from_parquet_metadata(&metadata,
&schema);
+
+ let int8_stats = stats.columns.get("int8_col").unwrap();
+ assert_eq!(get_int8(int8_stats.min_value.as_ref().unwrap()), -20);
+ assert_eq!(get_int8(int8_stats.max_value.as_ref().unwrap()), 50);
+
+ let int16_stats = stats.columns.get("int16_col").unwrap();
+ assert_eq!(get_int16(int16_stats.min_value.as_ref().unwrap()), -300);
+ assert_eq!(get_int16(int16_stats.max_value.as_ref().unwrap()), 500);
+ }
+
+ #[test]
+ fn test_float_column_stats() {
+ let temp_dir = tempdir().unwrap();
+ let parquet_path = temp_dir.path().join("float_test.parquet");
+
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("float32_col", DataType::Float32, false),
+ Field::new("float64_col", DataType::Float64, false),
+ ]));
+ let float32_values = Float32Array::from(vec![1.5_f32, 3.5, 2.5, 0.5,
4.5]);
+ let float64_values = Float64Array::from(vec![10.5_f64, 30.5, 20.5,
5.5, 40.5]);
+ let batch = RecordBatch::try_new(
+ schema.clone(),
+ vec![
+ Arc::new(float32_values) as ArrayRef,
+ Arc::new(float64_values) as ArrayRef,
+ ],
+ )
+ .unwrap();
+
+ write_parquet_file(&batch, &parquet_path);
+
+ let metadata = read_parquet_metadata(&parquet_path);
+ let stats = StatisticsContainer::from_parquet_metadata(&metadata,
&schema);
+
+ let float32_stats = stats.columns.get("float32_col").unwrap();
+ assert_eq!(get_float32(float32_stats.min_value.as_ref().unwrap()),
0.5);
+ assert_eq!(get_float32(float32_stats.max_value.as_ref().unwrap()),
4.5);
+
+ let float64_stats = stats.columns.get("float64_col").unwrap();
+ assert_eq!(get_float64(float64_stats.min_value.as_ref().unwrap()),
5.5);
+ assert_eq!(get_float64(float64_stats.max_value.as_ref().unwrap()),
40.5);
+ }
+
+ #[test]
+ fn test_string_column_stats() {
+ let temp_dir = tempdir().unwrap();
+ let parquet_path = temp_dir.path().join("string_test.parquet");
+
+ let schema = Arc::new(Schema::new(vec![Field::new("name",
DataType::Utf8, false)]));
+ let names = StringArray::from(vec!["charlie", "alice", "bob", "diana",
"eve"]);
+ let batch =
+ RecordBatch::try_new(schema.clone(), vec![Arc::new(names) as
ArrayRef]).unwrap();
+
+ write_parquet_file(&batch, &parquet_path);
+
+ let metadata = read_parquet_metadata(&parquet_path);
+ let stats = StatisticsContainer::from_parquet_metadata(&metadata,
&schema);
+
+ let col_stats = stats.columns.get("name").unwrap();
+ assert_eq!(get_string(col_stats.min_value.as_ref().unwrap()), "alice");
+ assert_eq!(get_string(col_stats.max_value.as_ref().unwrap()), "eve");
+ }
+
+ #[test]
+ fn test_boolean_column_stats() {
+ let temp_dir = tempdir().unwrap();
+ let parquet_path = temp_dir.path().join("bool_test.parquet");
+
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "is_active",
+ DataType::Boolean,
+ false,
+ )]));
+ let values = BooleanArray::from(vec![true, false, true, false, true]);
+ let batch =
+ RecordBatch::try_new(schema.clone(), vec![Arc::new(values) as
ArrayRef]).unwrap();
+
+ write_parquet_file(&batch, &parquet_path);
+
+ let metadata = read_parquet_metadata(&parquet_path);
+ let stats = StatisticsContainer::from_parquet_metadata(&metadata,
&schema);
+
+ let col_stats = stats.columns.get("is_active").unwrap();
+ assert!(!get_bool(col_stats.min_value.as_ref().unwrap()));
+ assert!(get_bool(col_stats.max_value.as_ref().unwrap()));
+ }
+
+ #[test]
+ fn test_date32_column_stats() {
+ let temp_dir = tempdir().unwrap();
+ let parquet_path = temp_dir.path().join("date32_test.parquet");
+
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "event_date",
+ DataType::Date32,
+ false,
+ )]));
+        // Days since epoch: 19000 = 2022-01-08, 19100 = 2022-04-18, etc.
+ let values = Date32Array::from(vec![19000, 19100, 19050, 18900,
19200]);
+ let batch =
+ RecordBatch::try_new(schema.clone(), vec![Arc::new(values) as
ArrayRef]).unwrap();
+
+ write_parquet_file(&batch, &parquet_path);
+
+ let metadata = read_parquet_metadata(&parquet_path);
+ let stats = StatisticsContainer::from_parquet_metadata(&metadata,
&schema);
+
+ let col_stats = stats.columns.get("event_date").unwrap();
+ assert_eq!(get_date32(col_stats.min_value.as_ref().unwrap()), 18900);
+ assert_eq!(get_date32(col_stats.max_value.as_ref().unwrap()), 19200);
+ }
+
+ #[test]
+ fn test_timestamp_column_stats() {
+ let temp_dir = tempdir().unwrap();
+ let parquet_path = temp_dir.path().join("timestamp_test.parquet");
+
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "event_time",
+ DataType::Timestamp(TimeUnit::Microsecond, None),
+ false,
+ )]));
+ // Microseconds since epoch
+ let values = TimestampMicrosecondArray::from(vec![
+ 1_000_000_i64,
+ 5_000_000,
+ 3_000_000,
+ 2_000_000,
+ 4_000_000,
+ ]);
+ let batch =
+ RecordBatch::try_new(schema.clone(), vec![Arc::new(values) as
ArrayRef]).unwrap();
+
+ write_parquet_file(&batch, &parquet_path);
+
+ let metadata = read_parquet_metadata(&parquet_path);
+ let stats = StatisticsContainer::from_parquet_metadata(&metadata,
&schema);
+
+ let col_stats = stats.columns.get("event_time").unwrap();
+ assert_eq!(
+ get_timestamp_micros(col_stats.min_value.as_ref().unwrap()),
+ 1_000_000
+ );
+ assert_eq!(
+ get_timestamp_micros(col_stats.max_value.as_ref().unwrap()),
+ 5_000_000
+ );
+ }
+
+ #[test]
+ fn test_timestamp_all_time_units() {
+ let temp_dir = tempdir().unwrap();
+ let parquet_path =
temp_dir.path().join("timestamp_units_test.parquet");
+
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("ts_sec", DataType::Timestamp(TimeUnit::Second, None),
false),
+ Field::new(
+ "ts_milli",
+ DataType::Timestamp(TimeUnit::Millisecond, None),
+ false,
+ ),
+ Field::new(
+ "ts_micro",
+ DataType::Timestamp(TimeUnit::Microsecond, None),
+ false,
+ ),
+ Field::new(
+ "ts_nano",
+ DataType::Timestamp(TimeUnit::Nanosecond, None),
+ false,
+ ),
+ ]));
+
+ let ts_sec = TimestampSecondArray::from(vec![100_i64, 500, 300, 200,
400]);
+ let ts_milli = TimestampMillisecondArray::from(vec![1000_i64, 5000,
3000, 2000, 4000]);
+ let ts_micro = TimestampMicrosecondArray::from(vec![10000_i64, 50000,
30000, 20000, 40000]);
+ let ts_nano =
+ TimestampNanosecondArray::from(vec![100000_i64, 500000, 300000,
200000, 400000]);
+
+ let batch = RecordBatch::try_new(
+ schema.clone(),
+ vec![
+ Arc::new(ts_sec) as ArrayRef,
+ Arc::new(ts_milli) as ArrayRef,
+ Arc::new(ts_micro) as ArrayRef,
+ Arc::new(ts_nano) as ArrayRef,
+ ],
+ )
+ .unwrap();
+
+ write_parquet_file(&batch, &parquet_path);
+
+ let metadata = read_parquet_metadata(&parquet_path);
+ let stats = StatisticsContainer::from_parquet_metadata(&metadata,
&schema);
+
+ // Verify seconds
+ let col_stats = stats.columns.get("ts_sec").unwrap();
+ assert_eq!(
+ get_timestamp_seconds(col_stats.min_value.as_ref().unwrap()),
+ 100
+ );
+ assert_eq!(
+ get_timestamp_seconds(col_stats.max_value.as_ref().unwrap()),
+ 500
+ );
+
+ // Verify milliseconds
+ let col_stats = stats.columns.get("ts_milli").unwrap();
+ assert_eq!(
+ get_timestamp_millis(col_stats.min_value.as_ref().unwrap()),
+ 1000
+ );
+ assert_eq!(
+ get_timestamp_millis(col_stats.max_value.as_ref().unwrap()),
+ 5000
+ );
+
+ // Verify microseconds
+ let col_stats = stats.columns.get("ts_micro").unwrap();
+ assert_eq!(
+ get_timestamp_micros(col_stats.min_value.as_ref().unwrap()),
+ 10000
+ );
+ assert_eq!(
+ get_timestamp_micros(col_stats.max_value.as_ref().unwrap()),
+ 50000
+ );
+
+ // Verify nanoseconds
+ let col_stats = stats.columns.get("ts_nano").unwrap();
+ assert_eq!(
+ get_timestamp_nanos(col_stats.min_value.as_ref().unwrap()),
+ 100000
+ );
+ assert_eq!(
+ get_timestamp_nanos(col_stats.max_value.as_ref().unwrap()),
+ 500000
+ );
+ }
+}
+
+mod null_handling {
+ use super::*;
+
+ #[test]
+ fn test_nullable_columns_with_nulls() {
+ let temp_dir = tempdir().unwrap();
+ let parquet_path = temp_dir.path().join("nullable_test.parquet");
+
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("int_value", DataType::Int32, true),
+ Field::new("name", DataType::Utf8, true),
+ ]));
+ let int_values = Int32Array::from(vec![Some(10), None, Some(30), None,
Some(20)]);
+ let names = StringArray::from(vec![
+ Some("alice"),
+ None,
+ Some("charlie"),
+ None,
+ Some("bob"),
+ ]);
+ let batch = RecordBatch::try_new(
+ schema.clone(),
+ vec![
+ Arc::new(int_values) as ArrayRef,
+ Arc::new(names) as ArrayRef,
+ ],
+ )
+ .unwrap();
+
+ write_parquet_file(&batch, &parquet_path);
+
+ let metadata = read_parquet_metadata(&parquet_path);
+ let stats = StatisticsContainer::from_parquet_metadata(&metadata,
&schema);
+
+ // Verify Int32 column stats ignore nulls
+ let int_stats = stats.columns.get("int_value").unwrap();
+ assert_eq!(get_int32(int_stats.min_value.as_ref().unwrap()), 10);
+ assert_eq!(get_int32(int_stats.max_value.as_ref().unwrap()), 30);
+
+ // Verify String column stats ignore nulls
+ let name_stats = stats.columns.get("name").unwrap();
+ assert_eq!(get_string(name_stats.min_value.as_ref().unwrap()),
"alice");
+ assert_eq!(
+ get_string(name_stats.max_value.as_ref().unwrap()),
+ "charlie"
+ );
+ }
+
+ #[test]
+ fn test_all_nulls_column() {
+ let temp_dir = tempdir().unwrap();
+ let parquet_path = temp_dir.path().join("all_nulls_test.parquet");
+
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "value",
+ DataType::Int32,
+ true,
+ )]));
+ let values: Int32Array = vec![None, None, None].into_iter().collect();
+ let batch =
+ RecordBatch::try_new(schema.clone(), vec![Arc::new(values) as
ArrayRef]).unwrap();
+
+ write_parquet_file(&batch, &parquet_path);
+
+ let metadata = read_parquet_metadata(&parquet_path);
+ let stats = StatisticsContainer::from_parquet_metadata(&metadata,
&schema);
+
+ let col_stats = stats.columns.get("value").unwrap();
+ // For all nulls, min/max should be None
+ assert!(col_stats.min_value.is_none());
+ assert!(col_stats.max_value.is_none());
+ }
+}
+
+mod row_group_statistics {
+ use super::*;
+
+ #[test]
+ fn test_single_row_group_stats() {
+ let temp_dir = tempdir().unwrap();
+ let parquet_path = temp_dir.path().join("single_rg_test.parquet");
+
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("id", DataType::Int32, false),
+ Field::new("value", DataType::Float64, false),
+ ]));
+ let ids = Int32Array::from(vec![1, 2, 3, 4, 5]);
+ let values = Float64Array::from(vec![10.0, 20.0, 30.0, 40.0, 50.0]);
+ let batch = RecordBatch::try_new(
+ schema.clone(),
+ vec![Arc::new(ids) as ArrayRef, Arc::new(values) as ArrayRef],
+ )
+ .unwrap();
+
+ write_parquet_file(&batch, &parquet_path);
+
+ let metadata = read_parquet_metadata(&parquet_path);
+ assert_eq!(metadata.num_row_groups(), 1);
+
+ let row_group = &metadata.row_groups()[0];
+ let rg_stats = StatisticsContainer::from_row_group(row_group, &schema);
+
+ assert_eq!(rg_stats.granularity, StatsGranularity::RowGroup);
+ assert_eq!(rg_stats.num_rows, Some(5));
+
+ let id_stats = rg_stats.columns.get("id").unwrap();
+ assert_eq!(get_int32(id_stats.min_value.as_ref().unwrap()), 1);
+ assert_eq!(get_int32(id_stats.max_value.as_ref().unwrap()), 5);
+
+ let value_stats = rg_stats.columns.get("value").unwrap();
+ assert_eq!(get_float64(value_stats.min_value.as_ref().unwrap()), 10.0);
+ assert_eq!(get_float64(value_stats.max_value.as_ref().unwrap()), 50.0);
+ }
+
+ #[test]
+ fn test_multiple_row_groups_aggregation() {
+ let temp_dir = tempdir().unwrap();
+ let parquet_path = temp_dir.path().join("multi_rg_test.parquet");
+
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "value",
+ DataType::Int32,
+ true,
+ )]));
+
+ // Create batches that will span multiple row groups
+ // Batch 1: values 100-110
+ let batch1 = RecordBatch::try_new(
+ schema.clone(),
+ vec![Arc::new(Int32Array::from(vec![100, 105, 110])) as ArrayRef],
+ )
+ .unwrap();
+
+ // Batch 2: values 1-10 (includes global min)
+ let batch2 = RecordBatch::try_new(
+ schema.clone(),
+ vec![Arc::new(Int32Array::from(vec![1, 5, 10])) as ArrayRef],
+ )
+ .unwrap();
+
+ // Batch 3: values 200-220 (includes global max)
+ let batch3 = RecordBatch::try_new(
+ schema.clone(),
+ vec![Arc::new(Int32Array::from(vec![200, 210, 220])) as ArrayRef],
+ )
+ .unwrap();
+
+ write_parquet_file_multiple_row_groups(&[batch1, batch2, batch3],
&parquet_path);
+
+ let metadata = read_parquet_metadata(&parquet_path);
+
+ // Verify we have multiple row groups
+ assert!(
+ metadata.num_row_groups() >= 2,
+ "Expected multiple row groups"
+ );
+
+ // Get file-level stats (aggregated from row groups)
+ let file_stats = StatisticsContainer::from_parquet_metadata(&metadata,
&schema);
+
+ assert_eq!(file_stats.granularity, StatsGranularity::File);
+ assert_eq!(file_stats.num_rows, Some(9)); // 3 + 3 + 3
+
+ let col_stats = file_stats.columns.get("value").unwrap();
+ // File-level min should be global min across all row groups
+ assert_eq!(get_int32(col_stats.min_value.as_ref().unwrap()), 1);
+ // File-level max should be global max across all row groups
+ assert_eq!(get_int32(col_stats.max_value.as_ref().unwrap()), 220);
+ }
+
+ #[test]
+ fn test_row_group_with_nulls_aggregation() {
+ let temp_dir = tempdir().unwrap();
+ let parquet_path = temp_dir.path().join("rg_nulls_test.parquet");
+
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "value",
+ DataType::Int32,
+ true,
+ )]));
+
+ // Batch with some nulls
+ let batch1 = RecordBatch::try_new(
+ schema.clone(),
+ vec![Arc::new(Int32Array::from(vec![Some(10), None, Some(20)])) as
ArrayRef],
+ )
+ .unwrap();
+
+ // Batch with more nulls
+ let batch2 = RecordBatch::try_new(
+ schema.clone(),
+ vec![Arc::new(Int32Array::from(vec![None, Some(30), None])) as
ArrayRef],
+ )
+ .unwrap();
+
+ write_parquet_file_multiple_row_groups(&[batch1, batch2],
&parquet_path);
+
+ let metadata = read_parquet_metadata(&parquet_path);
+ let file_stats = StatisticsContainer::from_parquet_metadata(&metadata,
&schema);
+
+ let col_stats = file_stats.columns.get("value").unwrap();
+ assert_eq!(get_int32(col_stats.min_value.as_ref().unwrap()), 10);
+ assert_eq!(get_int32(col_stats.max_value.as_ref().unwrap()), 30);
+ }
+}
+
+mod multiple_columns {
+ use super::*;
+
+ #[test]
+ fn test_mixed_type_columns() {
+ let temp_dir = tempdir().unwrap();
+ let parquet_path = temp_dir.path().join("mixed_types_test.parquet");
+
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("id", DataType::Int32, false),
+ Field::new("name", DataType::Utf8, false),
+ Field::new("score", DataType::Float64, true),
+ Field::new("is_active", DataType::Boolean, false),
+ ]));
+
+ let ids = Int32Array::from(vec![1, 2, 3, 4, 5]);
+ let names = StringArray::from(vec!["alice", "bob", "charlie", "diana",
"eve"]);
+ let scores = Float64Array::from(vec![Some(85.5), Some(92.0), None,
Some(78.5), Some(95.0)]);
+ let is_active = BooleanArray::from(vec![true, false, true, true,
false]);
+
+ let batch = RecordBatch::try_new(
+ schema.clone(),
+ vec![
+ Arc::new(ids) as ArrayRef,
+ Arc::new(names) as ArrayRef,
+ Arc::new(scores) as ArrayRef,
+ Arc::new(is_active) as ArrayRef,
+ ],
+ )
+ .unwrap();
+
+ write_parquet_file(&batch, &parquet_path);
+
+ let metadata = read_parquet_metadata(&parquet_path);
+ let stats = StatisticsContainer::from_parquet_metadata(&metadata,
&schema);
+
+ // Verify all columns have statistics
+ assert_eq!(stats.columns.len(), 4);
+
+ // Check int32 column
+ let id_stats = stats.columns.get("id").unwrap();
+ assert_eq!(get_int32(id_stats.min_value.as_ref().unwrap()), 1);
+ assert_eq!(get_int32(id_stats.max_value.as_ref().unwrap()), 5);
+
+ // Check string column
+ let name_stats = stats.columns.get("name").unwrap();
+ assert_eq!(get_string(name_stats.min_value.as_ref().unwrap()),
"alice");
+ assert_eq!(get_string(name_stats.max_value.as_ref().unwrap()), "eve");
+
+ // Check float64 column with null
+ let score_stats = stats.columns.get("score").unwrap();
+ assert_eq!(get_float64(score_stats.min_value.as_ref().unwrap()), 78.5);
+ assert_eq!(get_float64(score_stats.max_value.as_ref().unwrap()), 95.0);
+
+ // Check boolean column
+ let active_stats = stats.columns.get("is_active").unwrap();
+ assert!(!get_bool(active_stats.min_value.as_ref().unwrap()));
+ assert!(get_bool(active_stats.max_value.as_ref().unwrap()));
+ }
+
+ #[test]
+ fn test_schema_column_ordering() {
+ let temp_dir = tempdir().unwrap();
+ let parquet_path = temp_dir.path().join("ordering_test.parquet");
+
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("z_col", DataType::Int32, false),
+ Field::new("a_col", DataType::Int32, false),
+ Field::new("m_col", DataType::Int32, false),
+ ]));
+
+ let z_values = Int32Array::from(vec![1, 2, 3]);
+ let a_values = Int32Array::from(vec![10, 20, 30]);
+ let m_values = Int32Array::from(vec![100, 200, 300]);
+
+ let batch = RecordBatch::try_new(
+ schema.clone(),
+ vec![
+ Arc::new(z_values) as ArrayRef,
+ Arc::new(a_values) as ArrayRef,
+ Arc::new(m_values) as ArrayRef,
+ ],
+ )
+ .unwrap();
+
+ write_parquet_file(&batch, &parquet_path);
+
+ let metadata = read_parquet_metadata(&parquet_path);
+ let stats = StatisticsContainer::from_parquet_metadata(&metadata,
&schema);
+
+ // All columns should have stats regardless of their name ordering
+ assert!(stats.columns.contains_key("z_col"));
+ assert!(stats.columns.contains_key("a_col"));
+ assert!(stats.columns.contains_key("m_col"));
+
+ // Verify values
+ let z_stats = stats.columns.get("z_col").unwrap();
+ assert_eq!(get_int32(z_stats.min_value.as_ref().unwrap()), 1);
+ assert_eq!(get_int32(z_stats.max_value.as_ref().unwrap()), 3);
+
+ let a_stats = stats.columns.get("a_col").unwrap();
+ assert_eq!(get_int32(a_stats.min_value.as_ref().unwrap()), 10);
+ assert_eq!(get_int32(a_stats.max_value.as_ref().unwrap()), 30);
+
+ let m_stats = stats.columns.get("m_col").unwrap();
+ assert_eq!(get_int32(m_stats.min_value.as_ref().unwrap()), 100);
+ assert_eq!(get_int32(m_stats.max_value.as_ref().unwrap()), 300);
+ }
+}
+
+mod edge_cases {
+ use super::*;
+
+ #[test]
+ fn test_min_equals_max() {
+ let temp_dir = tempdir().unwrap();
+
+ // Test single row case
+ let single_row_path = temp_dir.path().join("single_row_test.parquet");
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "value",
+ DataType::Int32,
+ false,
+ )]));
+ let values = Int32Array::from(vec![42]);
+ let batch =
+ RecordBatch::try_new(schema.clone(), vec![Arc::new(values) as
ArrayRef]).unwrap();
+ write_parquet_file(&batch, &single_row_path);
+
+ let metadata = read_parquet_metadata(&single_row_path);
+ let stats = StatisticsContainer::from_parquet_metadata(&metadata,
&schema);
+ assert_eq!(stats.num_rows, Some(1));
+ let col_stats = stats.columns.get("value").unwrap();
+ assert_eq!(get_int32(col_stats.min_value.as_ref().unwrap()), 42);
+ assert_eq!(get_int32(col_stats.max_value.as_ref().unwrap()), 42);
+
+ // Test identical values case
+ let identical_path = temp_dir.path().join("identical_test.parquet");
+ let values = Int32Array::from(vec![100, 100, 100, 100, 100]);
+ let batch =
+ RecordBatch::try_new(schema.clone(), vec![Arc::new(values) as
ArrayRef]).unwrap();
+ write_parquet_file(&batch, &identical_path);
+
+ let metadata = read_parquet_metadata(&identical_path);
+ let stats = StatisticsContainer::from_parquet_metadata(&metadata,
&schema);
+ let col_stats = stats.columns.get("value").unwrap();
+ assert_eq!(get_int32(col_stats.min_value.as_ref().unwrap()), 100);
+ assert_eq!(get_int32(col_stats.max_value.as_ref().unwrap()), 100);
+ }
+
+ #[test]
+ fn test_negative_numbers() {
+ let temp_dir = tempdir().unwrap();
+ let parquet_path = temp_dir.path().join("negative_test.parquet");
+
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "value",
+ DataType::Int32,
+ false,
+ )]));
+ let values = Int32Array::from(vec![-100, -50, 0, 50, 100]);
+ let batch =
+ RecordBatch::try_new(schema.clone(), vec![Arc::new(values) as
ArrayRef]).unwrap();
+
+ write_parquet_file(&batch, &parquet_path);
+
+ let metadata = read_parquet_metadata(&parquet_path);
+ let stats = StatisticsContainer::from_parquet_metadata(&metadata,
&schema);
+
+ let col_stats = stats.columns.get("value").unwrap();
+ assert_eq!(get_int32(col_stats.min_value.as_ref().unwrap()), -100);
+ assert_eq!(get_int32(col_stats.max_value.as_ref().unwrap()), 100);
+ }
+
+ #[test]
+ fn test_empty_string() {
+ let temp_dir = tempdir().unwrap();
+ let parquet_path = temp_dir.path().join("empty_string_test.parquet");
+
+ let schema = Arc::new(Schema::new(vec![Field::new("name",
DataType::Utf8, false)]));
+ let names = StringArray::from(vec!["", "apple", "banana"]);
+ let batch =
+ RecordBatch::try_new(schema.clone(), vec![Arc::new(names) as
ArrayRef]).unwrap();
+
+ write_parquet_file(&batch, &parquet_path);
+
+ let metadata = read_parquet_metadata(&parquet_path);
+ let stats = StatisticsContainer::from_parquet_metadata(&metadata,
&schema);
+
+ let col_stats = stats.columns.get("name").unwrap();
+ // Empty string should be the minimum (lexicographically smallest)
+ assert_eq!(get_string(col_stats.min_value.as_ref().unwrap()), "");
+ assert_eq!(get_string(col_stats.max_value.as_ref().unwrap()),
"banana");
+ }
+
+ #[test]
+ fn test_special_float_values() {
+ let temp_dir = tempdir().unwrap();
+ let parquet_path = temp_dir.path().join("special_float_test.parquet");
+
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "value",
+ DataType::Float64,
+ false,
+ )]));
+ // Test with various float values including very small and very large
+ let values = Float64Array::from(vec![
+ f64::MIN_POSITIVE,
+ 1.0,
+ f64::MAX / 2.0,
+ -f64::MAX / 2.0,
+ 0.0,
+ ]);
+ let batch =
+ RecordBatch::try_new(schema.clone(), vec![Arc::new(values) as
ArrayRef]).unwrap();
+
+ write_parquet_file(&batch, &parquet_path);
+
+ let metadata = read_parquet_metadata(&parquet_path);
+ let stats = StatisticsContainer::from_parquet_metadata(&metadata,
&schema);
+
+ let col_stats = stats.columns.get("value").unwrap();
+ assert_eq!(
+ get_float64(col_stats.min_value.as_ref().unwrap()),
+ -f64::MAX / 2.0
+ );
+ assert_eq!(
+ get_float64(col_stats.max_value.as_ref().unwrap()),
+ f64::MAX / 2.0
+ );
+ }
+
+ #[test]
+ fn test_parquet_without_statistics() {
+ let temp_dir = tempdir().unwrap();
+ let parquet_path = temp_dir.path().join("no_stats_test.parquet");
+
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "value",
+ DataType::Int32,
+ false,
+ )]));
+ let values = Int32Array::from(vec![10, 50, 30, 20, 40]);
+ let batch =
+ RecordBatch::try_new(schema.clone(), vec![Arc::new(values) as
ArrayRef]).unwrap();
+
+ write_parquet_file_no_stats(&batch, &parquet_path);
+
+ let metadata = read_parquet_metadata(&parquet_path);
+ let stats = StatisticsContainer::from_parquet_metadata(&metadata,
&schema);
+
+ // When statistics are disabled, min/max should be None
+ let col_stats = stats.columns.get("value").unwrap();
+ assert!(col_stats.min_value.is_none());
+ assert!(col_stats.max_value.is_none());
+ }
+}
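
The tests above all follow the same extraction flow: write a RecordBatch to Parquet, read the footer back, and inspect per-column min/max through StatisticsContainer. Condensed into one self-contained sketch, using only names that appear in this change plus the arrow, parquet, and tempfile crates already in the dev-dependencies (default writer properties are assumed to keep chunk-level statistics enabled, as they do in current parquet releases):

    use std::fs::File;
    use std::sync::Arc;

    use arrow_array::{ArrayRef, Int32Array, RecordBatch};
    use arrow_schema::{DataType, Field, Schema};
    use parquet::arrow::ArrowWriter;
    use parquet::file::reader::FileReader;
    use parquet::file::serialized_reader::SerializedFileReader;

    use hudi_core::statistics::{StatisticsContainer, StatsGranularity};

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        let dir = tempfile::tempdir()?;
        let path = dir.path().join("example.parquet");

        // Write a single-column batch; the footer will carry min=10, max=50.
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(Int32Array::from(vec![10, 50, 30])) as ArrayRef],
        )?;
        let mut writer = ArrowWriter::try_new(File::create(&path)?, batch.schema(), None)?;
        writer.write(&batch)?;
        writer.close()?;

        // Read the footer and extract statistics at both granularities.
        let reader = SerializedFileReader::new(File::open(&path)?)?;
        let metadata = reader.metadata();

        let file_stats = StatisticsContainer::from_parquet_metadata(metadata, &schema);
        assert_eq!(file_stats.granularity, StatsGranularity::File);
        assert_eq!(file_stats.num_rows, Some(3));
        let id_stats = file_stats.columns.get("id").unwrap();
        assert!(id_stats.min_value.is_some() && id_stats.max_value.is_some());

        let rg_stats =
            StatisticsContainer::from_row_group(&metadata.row_groups()[0], &schema);
        assert_eq!(rg_stats.granularity, StatsGranularity::RowGroup);
        Ok(())
    }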
diff --git a/crates/test/Cargo.toml b/crates/test/Cargo.toml
index 1f129d3..114f013 100644
--- a/crates/test/Cargo.toml
+++ b/crates/test/Cargo.toml
@@ -38,5 +38,5 @@ strum_macros = { workspace = true }
url = { workspace = true }
# testing
-tempfile = "3.20.0"
-zip-extract = "0.3.0"
+tempfile = { workspace = true }
+zip-extract = { workspace = true }