Copilot commented on code in PR #516:
URL: https://github.com/apache/hudi-rs/pull/516#discussion_r2680738563
##########
crates/core/src/metadata/table/mod.rs:
##########
@@ -250,9 +253,19 @@ impl Table {
        let partition_pruner =
            PartitionPruner::new(&filters, &partition_schema, self.hudi_configs.as_ref())?;
+        // Use empty file pruner for metadata table - no column stats pruning needed
+        // Use empty schema since the pruner is empty and won't use the schema
+        let file_pruner = FilePruner::empty();
+        let table_schema = Schema::empty();
Review Comment:
Consider documenting why an empty file pruner and empty schema are used for
metadata table file listing. While this makes sense (metadata table reads don't
need column stats pruning), adding a comment would clarify the design decision.
For example:
```rust
// Metadata table doesn't need column stats pruning since it's an internal index table
// that needs to be read fully for file listing operations
let file_pruner = FilePruner::empty();
let table_schema = Schema::empty();
```
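Applied in context, the hunk above could then read as follows (a sketch that folds the suggested rationale into the existing comments; surrounding code elided):
```rust
let partition_pruner =
    PartitionPruner::new(&filters, &partition_schema, self.hudi_configs.as_ref())?;

// Metadata table doesn't need column stats pruning since it's an internal index table
// that needs to be read fully for file listing operations; the schema is only
// consulted by a non-empty pruner, so an empty schema is safe here.
let file_pruner = FilePruner::empty();
let table_schema = Schema::empty();
```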
##########
crates/core/src/statistics/mod.rs:
##########
@@ -0,0 +1,528 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+//! Statistics module for column statistics pruning.
+//!
+//! This module provides abstractions for extracting, aggregating, and using column
+//! statistics from Parquet files for query pruning at different granularity levels.
+//!
+//! Core types:
+//! - [`ColumnStatistics`]: Per-column statistics (min, max) for range-based pruning
+//! - [`StatisticsContainer`]: Container for all column statistics at a given granularity
+//!
+//! Min/max values are stored as single-element Arrow arrays (`ArrayRef`), enabling
+//! direct comparison using `arrow_ord::cmp` functions without custom enum types.
+
+use std::collections::HashMap;
+use std::str::FromStr;
+use std::sync::Arc;
+
+use arrow_array::{
+    ArrayRef, BinaryArray, BooleanArray, Date32Array, Float32Array, Float64Array, Int8Array,
+    Int16Array, Int32Array, Int64Array, StringArray, TimestampMicrosecondArray, UInt8Array,
+    UInt16Array, UInt32Array, UInt64Array,
+};
+use arrow_ord::cmp;
+use arrow_schema::{DataType, Schema};
+use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
+use parquet::file::statistics::Statistics as ParquetStatistics;
+
+/// Column statistics pruning granularity level.
+///
+/// Controls how fine-grained the statistics-based pruning is.
+/// Each level offers different trade-offs between memory overhead and pruning
effectiveness.
+///
+/// String parsing is case-insensitive and accepts "row_group" or "rowgroup"
for RowGroup.
+#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
+pub enum StatsGranularity {
+ /// File-level stats (aggregated from all row groups).
+ ///
+ /// Coarsest granularity, lowest memory overhead.
+ /// Can skip entire files that don't match predicates.
+ /// Stats are computed by aggregating row group stats:
+ /// - `file_min = min(row_group_mins)`
+ /// - `file_max = max(row_group_maxs)`
+ #[default]
+ File,
+
+ /// Row group level stats (directly from Parquet footer).
+ ///
+ /// Balanced granularity, moderate memory.
+ /// Can skip row groups within files.
+ /// Stats are read directly from Parquet footer's ColumnChunkMetaData.
+ RowGroup,
+}
+
+impl FromStr for StatsGranularity {
+ type Err = String;
+
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ match s.to_lowercase().as_str() {
+ "file" => Ok(Self::File),
+ "row_group" | "rowgroup" => Ok(Self::RowGroup),
+            _ => Err(format!(
+                "Invalid stats granularity: '{s}'. Valid options: file, row_group"
+            )),
+ }
+ }
+}
+
+impl std::fmt::Display for StatsGranularity {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self {
+ Self::File => write!(f, "file"),
+ Self::RowGroup => write!(f, "row_group"),
+ }
+ }
+}
+
+/// Statistics for a single column at a given granularity.
+///
+/// Tracks min and max values from Parquet footer statistics for range-based pruning.
+/// Values are stored as single-element Arrow arrays for direct comparison using `arrow_ord::cmp`.
+#[derive(Clone, Debug)]
+pub struct ColumnStatistics {
+ /// Column name
+ pub column_name: String,
+ /// Arrow data type
+ pub data_type: DataType,
+ /// Minimum value (stored as a single-element Arrow array)
+ pub min_value: Option<ArrayRef>,
+ /// Maximum value (stored as a single-element Arrow array)
+ pub max_value: Option<ArrayRef>,
+}
+
+impl ColumnStatistics {
+ /// Create a new ColumnStatistics with the given column name and data type.
+ pub fn new(column_name: String, data_type: DataType) -> Self {
+ Self {
+ column_name,
+ data_type,
+ min_value: None,
+ max_value: None,
+ }
+ }
+
+ /// Create from Parquet row group statistics.
+ ///
+    /// Extracts min/max as single-element Arrow arrays using the Arrow data_type
+ /// for correct logical type representation.
+ pub fn from_parquet_statistics(
+ column_name: &str,
+ data_type: &DataType,
+ stats: &ParquetStatistics,
+ ) -> Self {
+        let (min_value, max_value) = parquet_stats_to_min_max_arrays(stats, data_type);
+
+ Self {
+ column_name: column_name.to_string(),
+ data_type: data_type.clone(),
+ min_value,
+ max_value,
+ }
+ }
+
+ /// Merge with another ColumnStatistics (for aggregation).
+ ///
+ /// Takes min of mins, max of maxs.
+ /// Used when aggregating row group stats to file-level stats.
+ pub fn merge(&mut self, other: &ColumnStatistics) {
+ // Merge min values (take the smaller one)
+ self.min_value = match (self.min_value.take(), &other.min_value) {
+ (Some(a), Some(b)) => {
+ if is_less_than(&a, b) {
+ Some(a)
+ } else {
+ Some(Arc::clone(b))
+ }
+ }
+ (Some(a), None) => Some(a),
+ (None, Some(b)) => Some(Arc::clone(b)),
+ (None, None) => None,
+ };
+
+ // Merge max values (take the larger one)
+ self.max_value = match (self.max_value.take(), &other.max_value) {
+ (Some(a), Some(b)) => {
+ if is_greater_than(&a, b) {
+ Some(a)
+ } else {
+ Some(Arc::clone(b))
+ }
+ }
+ (Some(a), None) => Some(a),
+ (None, Some(b)) => Some(Arc::clone(b)),
+ (None, None) => None,
+ };
+ }
+}
+
+/// Returns true if `a < b` using arrow-ord comparison.
+fn is_less_than(a: &ArrayRef, b: &ArrayRef) -> bool {
+ cmp::lt(a, b).map(|result| result.value(0)).unwrap_or(false)
+}
+
+/// Returns true if `a > b` using arrow-ord comparison.
+fn is_greater_than(a: &ArrayRef, b: &ArrayRef) -> bool {
+ cmp::gt(a, b).map(|result| result.value(0)).unwrap_or(false)
+}
+
+/// Container for statistics at a specific granularity level.
+#[derive(Clone, Debug)]
+pub struct StatisticsContainer {
+ /// Granularity of these statistics
+ pub granularity: StatsGranularity,
+ /// Number of rows covered by these statistics
+ pub num_rows: Option<i64>,
+ /// Column statistics by column name
+ pub columns: HashMap<String, ColumnStatistics>,
+}
+
+impl StatisticsContainer {
+ /// Create an empty statistics container.
+ pub fn new(granularity: StatsGranularity) -> Self {
+ Self {
+ granularity,
+ num_rows: None,
+ columns: HashMap::new(),
+ }
+ }
+
+    /// Create file-level stats by aggregating row group stats from Parquet metadata.
+    ///
+    /// This iterates through all row groups, extracts stats for each column,
+    /// and aggregates them to file-level statistics.
+    pub fn from_parquet_metadata(metadata: &ParquetMetaData, schema: &Schema) -> Self {
+ let mut container = Self::new(StatsGranularity::File);
+
+ // Get total rows directly from file metadata
+ container.num_rows = Some(metadata.file_metadata().num_rows());
+
+ // Iterate through row groups and aggregate stats
+ for row_group in metadata.row_groups() {
+ let rg_stats = Self::from_row_group(row_group, schema);
+
+ // Merge row group stats into file-level stats
+ for (col_name, col_stats) in rg_stats.columns {
+ container
+ .columns
+ .entry(col_name)
+ .and_modify(|existing| existing.merge(&col_stats))
+ .or_insert(col_stats);
+ }
+ }
+
+ // Ensure all schema columns have an entry (even if no stats)
+ for field in schema.fields() {
+ let col_name = field.name();
+ if !container.columns.contains_key(col_name) {
+ container.columns.insert(
+ col_name.clone(),
+                    ColumnStatistics::new(col_name.clone(), field.data_type().clone()),
+ );
+ }
+ }
+
+ container
+ }
+
+ /// Create row-group-level stats from a single row group.
+    pub fn from_row_group(row_group: &RowGroupMetaData, schema: &Schema) -> Self {
+ let mut container = Self::new(StatsGranularity::RowGroup);
+ container.num_rows = Some(row_group.num_rows());
+
+ // Build a map of column name to Arrow data type
+ let column_types: HashMap<&str, &DataType> = schema
+ .fields()
+ .iter()
+ .map(|f| (f.name().as_str(), f.data_type()))
+ .collect();
+
+ // Extract stats for each column in the row group
+ for col_chunk in row_group.columns() {
+ let col_path = col_chunk.column_descr().path();
+
+ // Skip nested columns (multi-part paths like "struct.field")
+ if col_path.parts().len() > 1 {
+ continue;
+ }
+
+            let Some(col_name) = col_path.parts().first().map(|s| s.as_str()) else {
+ continue;
+ };
+
+ // Skip if we don't have type info for this column
+ let Some(&data_type) = column_types.get(col_name) else {
+ continue;
+ };
+
+ // Extract statistics if available
+ if let Some(stats) = col_chunk.statistics() {
+                let col_stats =
+                    ColumnStatistics::from_parquet_statistics(col_name, data_type, stats);
+ container.columns.insert(col_name.to_string(), col_stats);
+ } else {
+ // No stats available, create empty entry
+ container.columns.insert(
+ col_name.to_string(),
+                    ColumnStatistics::new(col_name.to_string(), data_type.clone()),
+ );
+ }
+ }
+
+ container
+ }
+}
+
+/// Convert Parquet statistics to Arrow single-element arrays.
+///
+/// Uses the Arrow DataType to create appropriately typed arrays from Parquet
+/// physical type statistics.
+fn parquet_stats_to_min_max_arrays(
+ stats: &ParquetStatistics,
+ data_type: &DataType,
+) -> (Option<ArrayRef>, Option<ArrayRef>) {
+ match stats {
+ ParquetStatistics::Boolean(s) => {
+ let min = s
+ .min_opt()
+ .map(|v| Arc::new(BooleanArray::from(vec![*v])) as ArrayRef);
+ let max = s
+ .max_opt()
+ .map(|v| Arc::new(BooleanArray::from(vec![*v])) as ArrayRef);
+ (min, max)
+ }
+ ParquetStatistics::Int32(s) => {
+ // Create arrays based on the Arrow logical type
+ let min = s.min_opt().map(|v| int32_to_array(*v, data_type));
+ let max = s.max_opt().map(|v| int32_to_array(*v, data_type));
+ (min, max)
+ }
+ ParquetStatistics::Int64(s) => {
+ let min = s.min_opt().map(|v| int64_to_array(*v, data_type));
+ let max = s.max_opt().map(|v| int64_to_array(*v, data_type));
+ (min, max)
+ }
+ ParquetStatistics::Int96(_) => {
+            // Int96 is deprecated, typically used for timestamps in legacy Parquet
+ (None, None)
+ }
+ ParquetStatistics::Float(s) => {
+ let min = s
+ .min_opt()
+ .map(|v| Arc::new(Float32Array::from(vec![*v])) as ArrayRef);
+ let max = s
+ .max_opt()
+ .map(|v| Arc::new(Float32Array::from(vec![*v])) as ArrayRef);
+ (min, max)
+ }
+ ParquetStatistics::Double(s) => {
+ let min = s
+ .min_opt()
+ .map(|v| Arc::new(Float64Array::from(vec![*v])) as ArrayRef);
+ let max = s
+ .max_opt()
+ .map(|v| Arc::new(Float64Array::from(vec![*v])) as ArrayRef);
+ (min, max)
+ }
+ ParquetStatistics::ByteArray(s) => {
+ let min = s.min_opt().map(|b| bytes_to_array(b.data(), data_type));
+ let max = s.max_opt().map(|b| bytes_to_array(b.data(), data_type));
+ (min, max)
+ }
+ ParquetStatistics::FixedLenByteArray(s) => {
+ let min = s.min_opt().map(|b| bytes_to_array(b.data(), data_type));
+ let max = s.max_opt().map(|b| bytes_to_array(b.data(), data_type));
+ (min, max)
+ }
+ }
+}
+
+/// Convert Parquet Int32 physical value to Arrow array based on logical type.
+fn int32_to_array(value: i32, data_type: &DataType) -> ArrayRef {
+ match data_type {
+        DataType::Int8 => Arc::new(Int8Array::from(vec![value as i8])) as ArrayRef,
+        DataType::Int16 => Arc::new(Int16Array::from(vec![value as i16])) as ArrayRef,
+        DataType::Int32 => Arc::new(Int32Array::from(vec![value])) as ArrayRef,
+        DataType::UInt8 => Arc::new(UInt8Array::from(vec![value as u8])) as ArrayRef,
+        DataType::UInt16 => Arc::new(UInt16Array::from(vec![value as u16])) as ArrayRef,
+        DataType::UInt32 => Arc::new(UInt32Array::from(vec![value as u32])) as ArrayRef,
+        DataType::Date32 => Arc::new(Date32Array::from(vec![value])) as ArrayRef,
+ _ => Arc::new(Int32Array::from(vec![value])) as ArrayRef, // fallback
+ }
+}
+
+/// Convert Parquet Int64 physical value to Arrow array based on logical type.
+fn int64_to_array(value: i64, data_type: &DataType) -> ArrayRef {
+ match data_type {
+ DataType::Int64 => Arc::new(Int64Array::from(vec![value])) as ArrayRef,
+        DataType::UInt64 => Arc::new(UInt64Array::from(vec![value as u64])) as ArrayRef,
+ DataType::Timestamp(_, _) => {
+ Arc::new(TimestampMicrosecondArray::from(vec![value])) as ArrayRef
+ }
Review Comment:
The timestamp conversion assumes all timestamps use microseconds
(TimestampMicrosecondArray), but the Arrow DataType::Timestamp can have
different TimeUnits (Second, Millisecond, Microsecond, Nanosecond). This could
lead to incorrect statistics extraction for timestamps with different time
units.
Consider matching on the time unit and creating the appropriate timestamp
array type, or converting the value to match the expected time unit. For
example:
```rust
DataType::Timestamp(time_unit, _) => {
    // Cast each arm to ArrayRef so the match arms unify to a single type.
    match time_unit {
        TimeUnit::Second => Arc::new(TimestampSecondArray::from(vec![value])) as ArrayRef,
        TimeUnit::Millisecond => Arc::new(TimestampMillisecondArray::from(vec![value])) as ArrayRef,
        TimeUnit::Microsecond => Arc::new(TimestampMicrosecondArray::from(vec![value])) as ArrayRef,
        TimeUnit::Nanosecond => Arc::new(TimestampNanosecondArray::from(vec![value])) as ArrayRef,
    }
}
```
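A regression test for the fixed behavior could reuse the helpers from `statistics_tests.rs` later in this review (a sketch; `TimestampMillisecondArray` would also need to be added to that file's imports):
```rust
#[test]
fn test_timestamp_millis_column_stats() {
    let temp_dir = tempdir().unwrap();
    let parquet_path = temp_dir.path().join("timestamp_millis_test.parquet");

    let schema = Arc::new(Schema::new(vec![Field::new(
        "event_time",
        DataType::Timestamp(TimeUnit::Millisecond, None),
        false,
    )]));
    let values = TimestampMillisecondArray::from(vec![1_000_i64, 5_000, 3_000]);
    let batch =
        RecordBatch::try_new(schema.clone(), vec![Arc::new(values) as ArrayRef]).unwrap();

    write_parquet_file(&batch, &parquet_path);

    let metadata = read_parquet_metadata(&parquet_path);
    let stats = StatisticsContainer::from_parquet_metadata(&metadata, &schema);

    // With unit-aware conversion, min/max carry the schema's millisecond unit;
    // with the current code they come back as microsecond arrays and this
    // downcast fails.
    let col_stats = stats.columns.get("event_time").unwrap();
    let min = col_stats.min_value.as_ref().unwrap();
    let min_ms = min
        .as_any()
        .downcast_ref::<TimestampMillisecondArray>()
        .expect("min should match the schema's millisecond time unit");
    assert_eq!(min_ms.value(0), 1_000);
}
```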
##########
crates/core/tests/statistics_tests.rs:
##########
@@ -0,0 +1,840 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+//! Integration tests for the statistics module.
+//!
+//! These tests generate Parquet files with known data, then verify that
+//! the statistics extraction correctly reads min/max values.
+
+use std::fs::File;
+use std::sync::Arc;
+
+use arrow_array::{
+    ArrayRef, BooleanArray, Date32Array, Float32Array, Float64Array, Int8Array, Int16Array,
+    Int32Array, Int64Array, RecordBatch, StringArray, TimestampMicrosecondArray, UInt32Array,
+};
+use arrow_schema::{DataType, Field, Schema, TimeUnit};
+use parquet::arrow::ArrowWriter;
+use parquet::basic::Compression;
+use parquet::file::properties::WriterProperties;
+use parquet::file::reader::FileReader;
+use parquet::file::serialized_reader::SerializedFileReader;
+use tempfile::tempdir;
+
+use hudi_core::statistics::{StatisticsContainer, StatsGranularity};
+
+/// Helper to write a RecordBatch to a Parquet file.
+fn write_parquet_file(batch: &RecordBatch, path: &std::path::Path) {
+    let file = File::create(path).unwrap();
+    let props = WriterProperties::builder()
+        .set_compression(Compression::SNAPPY)
+        .set_statistics_enabled(parquet::file::properties::EnabledStatistics::Page)
+        .build();
+    let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props)).unwrap();
+    writer.write(batch).unwrap();
+    writer.close().unwrap();
+}
+
+/// Helper to write a RecordBatch to a Parquet file without statistics.
+fn write_parquet_file_no_stats(batch: &RecordBatch, path: &std::path::Path) {
+    let file = File::create(path).unwrap();
+    let props = WriterProperties::builder()
+        .set_compression(Compression::SNAPPY)
+        .set_statistics_enabled(parquet::file::properties::EnabledStatistics::None)
+        .build();
+    let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props)).unwrap();
+    writer.write(batch).unwrap();
+    writer.close().unwrap();
+}
+
+/// Helper to write multiple RecordBatches to a single Parquet file (multiple row groups).
+fn write_parquet_file_multiple_row_groups(batches: &[RecordBatch], path: &std::path::Path) {
+    let file = File::create(path).unwrap();
+    let props = WriterProperties::builder()
+        .set_compression(Compression::SNAPPY)
+        .set_statistics_enabled(parquet::file::properties::EnabledStatistics::Page)
+        .set_max_row_group_size(3) // Force smaller row groups
+        .build();
+    let mut writer = ArrowWriter::try_new(file, batches[0].schema(), Some(props)).unwrap();
+    for batch in batches {
+        writer.write(batch).unwrap();
+    }
+    writer.close().unwrap();
+}
+
+/// Helper to read Parquet metadata from a file.
+fn read_parquet_metadata(path: &std::path::Path) -> parquet::file::metadata::ParquetMetaData {
+    let file = File::open(path).unwrap();
+    let reader = SerializedFileReader::new(file).unwrap();
+    reader.metadata().clone()
+}
+
+/// Helper to extract Int32 value from ArrayRef
+fn get_int32(arr: &ArrayRef) -> i32 {
+ arr.as_any().downcast_ref::<Int32Array>().unwrap().value(0)
+}
+
+/// Helper to extract Int64 value from ArrayRef
+fn get_int64(arr: &ArrayRef) -> i64 {
+ arr.as_any().downcast_ref::<Int64Array>().unwrap().value(0)
+}
+
+/// Helper to extract Int8 value from ArrayRef
+fn get_int8(arr: &ArrayRef) -> i8 {
+ arr.as_any().downcast_ref::<Int8Array>().unwrap().value(0)
+}
+
+/// Helper to extract Int16 value from ArrayRef
+fn get_int16(arr: &ArrayRef) -> i16 {
+ arr.as_any().downcast_ref::<Int16Array>().unwrap().value(0)
+}
+
+/// Helper to extract UInt32 value from ArrayRef
+fn get_uint32(arr: &ArrayRef) -> u32 {
+ arr.as_any().downcast_ref::<UInt32Array>().unwrap().value(0)
+}
+
+/// Helper to extract Float32 value from ArrayRef
+fn get_float32(arr: &ArrayRef) -> f32 {
+ arr.as_any()
+ .downcast_ref::<Float32Array>()
+ .unwrap()
+ .value(0)
+}
+
+/// Helper to extract Float64 value from ArrayRef
+fn get_float64(arr: &ArrayRef) -> f64 {
+ arr.as_any()
+ .downcast_ref::<Float64Array>()
+ .unwrap()
+ .value(0)
+}
+
+/// Helper to extract Boolean value from ArrayRef
+fn get_bool(arr: &ArrayRef) -> bool {
+ arr.as_any()
+ .downcast_ref::<BooleanArray>()
+ .unwrap()
+ .value(0)
+}
+
+/// Helper to extract String value from ArrayRef
+fn get_string(arr: &ArrayRef) -> String {
+ arr.as_any()
+ .downcast_ref::<StringArray>()
+ .unwrap()
+ .value(0)
+ .to_string()
+}
+
+/// Helper to extract Date32 value from ArrayRef
+fn get_date32(arr: &ArrayRef) -> i32 {
+ arr.as_any().downcast_ref::<Date32Array>().unwrap().value(0)
+}
+
+/// Helper to extract TimestampMicrosecond value from ArrayRef
+fn get_timestamp_micros(arr: &ArrayRef) -> i64 {
+ arr.as_any()
+ .downcast_ref::<TimestampMicrosecondArray>()
+ .unwrap()
+ .value(0)
+}
+
+mod file_level_statistics {
+ use super::*;
+
+ #[test]
+ fn test_integer_column_stats() {
+ let temp_dir = tempdir().unwrap();
+ let parquet_path = temp_dir.path().join("integer_test.parquet");
+
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("int32_col", DataType::Int32, false),
+ Field::new("int64_col", DataType::Int64, false),
+ Field::new("uint32_col", DataType::UInt32, false),
+ ]));
+ let int32_values = Int32Array::from(vec![10, 50, 30, 20, 40]);
+ let int64_values = Int64Array::from(vec![100_i64, 500, 300, 200, 400]);
+        let uint32_values = UInt32Array::from(vec![100_u32, 500, 300, 200, 400]);
+ let batch = RecordBatch::try_new(
+ schema.clone(),
+ vec![
+ Arc::new(int32_values) as ArrayRef,
+ Arc::new(int64_values) as ArrayRef,
+ Arc::new(uint32_values) as ArrayRef,
+ ],
+ )
+ .unwrap();
+
+ write_parquet_file(&batch, &parquet_path);
+
+ let metadata = read_parquet_metadata(&parquet_path);
+        let stats = StatisticsContainer::from_parquet_metadata(&metadata, &schema);
+
+ assert_eq!(stats.granularity, StatsGranularity::File);
+ assert_eq!(stats.num_rows, Some(5));
+
+ let int32_stats = stats.columns.get("int32_col").unwrap();
+ assert_eq!(get_int32(int32_stats.min_value.as_ref().unwrap()), 10);
+ assert_eq!(get_int32(int32_stats.max_value.as_ref().unwrap()), 50);
+
+ let int64_stats = stats.columns.get("int64_col").unwrap();
+ assert_eq!(get_int64(int64_stats.min_value.as_ref().unwrap()), 100);
+ assert_eq!(get_int64(int64_stats.max_value.as_ref().unwrap()), 500);
+
+ let uint32_stats = stats.columns.get("uint32_col").unwrap();
+ assert_eq!(get_uint32(uint32_stats.min_value.as_ref().unwrap()), 100);
+ assert_eq!(get_uint32(uint32_stats.max_value.as_ref().unwrap()), 500);
+ }
+
+ #[test]
+ fn test_int8_and_int16_column_stats() {
+ let temp_dir = tempdir().unwrap();
+ let parquet_path = temp_dir.path().join("small_int_test.parquet");
+
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("int8_col", DataType::Int8, false),
+ Field::new("int16_col", DataType::Int16, false),
+ ]));
+ let int8_values = Int8Array::from(vec![-10_i8, 50, 30, -20, 40]);
+        let int16_values = Int16Array::from(vec![100_i16, 500, -300, 200, 400]);
+ let batch = RecordBatch::try_new(
+ schema.clone(),
+ vec![
+ Arc::new(int8_values) as ArrayRef,
+ Arc::new(int16_values) as ArrayRef,
+ ],
+ )
+ .unwrap();
+
+ write_parquet_file(&batch, &parquet_path);
+
+ let metadata = read_parquet_metadata(&parquet_path);
+        let stats = StatisticsContainer::from_parquet_metadata(&metadata, &schema);
+
+ let int8_stats = stats.columns.get("int8_col").unwrap();
+ assert_eq!(get_int8(int8_stats.min_value.as_ref().unwrap()), -20);
+ assert_eq!(get_int8(int8_stats.max_value.as_ref().unwrap()), 50);
+
+ let int16_stats = stats.columns.get("int16_col").unwrap();
+ assert_eq!(get_int16(int16_stats.min_value.as_ref().unwrap()), -300);
+ assert_eq!(get_int16(int16_stats.max_value.as_ref().unwrap()), 500);
+ }
+
+ #[test]
+ fn test_float_column_stats() {
+ let temp_dir = tempdir().unwrap();
+ let parquet_path = temp_dir.path().join("float_test.parquet");
+
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("float32_col", DataType::Float32, false),
+ Field::new("float64_col", DataType::Float64, false),
+ ]));
+        let float32_values = Float32Array::from(vec![1.5_f32, 3.5, 2.5, 0.5, 4.5]);
+        let float64_values = Float64Array::from(vec![10.5_f64, 30.5, 20.5, 5.5, 40.5]);
+ let batch = RecordBatch::try_new(
+ schema.clone(),
+ vec![
+ Arc::new(float32_values) as ArrayRef,
+ Arc::new(float64_values) as ArrayRef,
+ ],
+ )
+ .unwrap();
+
+ write_parquet_file(&batch, &parquet_path);
+
+ let metadata = read_parquet_metadata(&parquet_path);
+        let stats = StatisticsContainer::from_parquet_metadata(&metadata, &schema);
+
+ let float32_stats = stats.columns.get("float32_col").unwrap();
+        assert_eq!(get_float32(float32_stats.min_value.as_ref().unwrap()), 0.5);
+        assert_eq!(get_float32(float32_stats.max_value.as_ref().unwrap()), 4.5);
+
+ let float64_stats = stats.columns.get("float64_col").unwrap();
+        assert_eq!(get_float64(float64_stats.min_value.as_ref().unwrap()), 5.5);
+        assert_eq!(get_float64(float64_stats.max_value.as_ref().unwrap()), 40.5);
+ }
+
+ #[test]
+ fn test_string_column_stats() {
+ let temp_dir = tempdir().unwrap();
+ let parquet_path = temp_dir.path().join("string_test.parquet");
+
+ let schema = Arc::new(Schema::new(vec![Field::new("name",
DataType::Utf8, false)]));
+ let names = StringArray::from(vec!["charlie", "alice", "bob", "diana",
"eve"]);
+ let batch =
+ RecordBatch::try_new(schema.clone(), vec![Arc::new(names) as
ArrayRef]).unwrap();
+
+ write_parquet_file(&batch, &parquet_path);
+
+ let metadata = read_parquet_metadata(&parquet_path);
+        let stats = StatisticsContainer::from_parquet_metadata(&metadata, &schema);
+
+ let col_stats = stats.columns.get("name").unwrap();
+ assert_eq!(get_string(col_stats.min_value.as_ref().unwrap()), "alice");
+ assert_eq!(get_string(col_stats.max_value.as_ref().unwrap()), "eve");
+ }
+
+ #[test]
+ fn test_boolean_column_stats() {
+ let temp_dir = tempdir().unwrap();
+ let parquet_path = temp_dir.path().join("bool_test.parquet");
+
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "is_active",
+ DataType::Boolean,
+ false,
+ )]));
+ let values = BooleanArray::from(vec![true, false, true, false, true]);
+        let batch =
+            RecordBatch::try_new(schema.clone(), vec![Arc::new(values) as ArrayRef]).unwrap();
+
+ write_parquet_file(&batch, &parquet_path);
+
+ let metadata = read_parquet_metadata(&parquet_path);
+        let stats = StatisticsContainer::from_parquet_metadata(&metadata, &schema);
+
+ let col_stats = stats.columns.get("is_active").unwrap();
+ assert!(!get_bool(col_stats.min_value.as_ref().unwrap()));
+ assert!(get_bool(col_stats.max_value.as_ref().unwrap()));
+ }
+
+ #[test]
+ fn test_date32_column_stats() {
+ let temp_dir = tempdir().unwrap();
+ let parquet_path = temp_dir.path().join("date32_test.parquet");
+
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "event_date",
+ DataType::Date32,
+ false,
+ )]));
+        // Days since epoch: 19000 = ~2022-01-01, 19100 = ~2022-04-11, etc.
+        let values = Date32Array::from(vec![19000, 19100, 19050, 18900, 19200]);
+        let batch =
+            RecordBatch::try_new(schema.clone(), vec![Arc::new(values) as ArrayRef]).unwrap();
+
+ write_parquet_file(&batch, &parquet_path);
+
+ let metadata = read_parquet_metadata(&parquet_path);
+        let stats = StatisticsContainer::from_parquet_metadata(&metadata, &schema);
+
+ let col_stats = stats.columns.get("event_date").unwrap();
+ assert_eq!(get_date32(col_stats.min_value.as_ref().unwrap()), 18900);
+ assert_eq!(get_date32(col_stats.max_value.as_ref().unwrap()), 19200);
+ }
+
+ #[test]
+ fn test_timestamp_column_stats() {
+ let temp_dir = tempdir().unwrap();
+ let parquet_path = temp_dir.path().join("timestamp_test.parquet");
+
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "event_time",
+ DataType::Timestamp(TimeUnit::Microsecond, None),
+ false,
+ )]));
+ // Microseconds since epoch
+ let values = TimestampMicrosecondArray::from(vec![
+ 1_000_000_i64,
+ 5_000_000,
+ 3_000_000,
+ 2_000_000,
+ 4_000_000,
+ ]);
+        let batch =
+            RecordBatch::try_new(schema.clone(), vec![Arc::new(values) as ArrayRef]).unwrap();
+
+ write_parquet_file(&batch, &parquet_path);
+
+ let metadata = read_parquet_metadata(&parquet_path);
+        let stats = StatisticsContainer::from_parquet_metadata(&metadata, &schema);
+
+ let col_stats = stats.columns.get("event_time").unwrap();
+ assert_eq!(
+ get_timestamp_micros(col_stats.min_value.as_ref().unwrap()),
+ 1_000_000
+ );
+ assert_eq!(
+ get_timestamp_micros(col_stats.max_value.as_ref().unwrap()),
+ 5_000_000
+ );
+    }
+}
+
+mod null_handling {
+ use super::*;
+
+ #[test]
+ fn test_nullable_columns_with_nulls() {
+ let temp_dir = tempdir().unwrap();
+ let parquet_path = temp_dir.path().join("nullable_test.parquet");
+
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("int_value", DataType::Int32, true),
+ Field::new("name", DataType::Utf8, true),
+ ]));
+        let int_values = Int32Array::from(vec![Some(10), None, Some(30), None, Some(20)]);
+ let names = StringArray::from(vec![
+ Some("alice"),
+ None,
+ Some("charlie"),
+ None,
+ Some("bob"),
+ ]);
+ let batch = RecordBatch::try_new(
+ schema.clone(),
+ vec![
+ Arc::new(int_values) as ArrayRef,
+ Arc::new(names) as ArrayRef,
+ ],
+ )
+ .unwrap();
+
+ write_parquet_file(&batch, &parquet_path);
+
+ let metadata = read_parquet_metadata(&parquet_path);
+        let stats = StatisticsContainer::from_parquet_metadata(&metadata, &schema);
+
+ // Verify Int32 column stats ignore nulls
+ let int_stats = stats.columns.get("int_value").unwrap();
+ assert_eq!(get_int32(int_stats.min_value.as_ref().unwrap()), 10);
+ assert_eq!(get_int32(int_stats.max_value.as_ref().unwrap()), 30);
+
+ // Verify String column stats ignore nulls
+ let name_stats = stats.columns.get("name").unwrap();
+        assert_eq!(get_string(name_stats.min_value.as_ref().unwrap()), "alice");
+        assert_eq!(get_string(name_stats.max_value.as_ref().unwrap()), "charlie");
+ }
+
+ #[test]
+ fn test_all_nulls_column() {
+ let temp_dir = tempdir().unwrap();
+ let parquet_path = temp_dir.path().join("all_nulls_test.parquet");
+
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "value",
+ DataType::Int32,
+ true,
+ )]));
+ let values: Int32Array = vec![None, None, None].into_iter().collect();
+        let batch =
+            RecordBatch::try_new(schema.clone(), vec![Arc::new(values) as ArrayRef]).unwrap();
+
+ write_parquet_file(&batch, &parquet_path);
+
+ let metadata = read_parquet_metadata(&parquet_path);
+        let stats = StatisticsContainer::from_parquet_metadata(&metadata, &schema);
+
+ let col_stats = stats.columns.get("value").unwrap();
+ // For all nulls, min/max should be None
+ assert!(col_stats.min_value.is_none());
+ assert!(col_stats.max_value.is_none());
+ }
+}
+
+mod row_group_statistics {
+ use super::*;
+
+ #[test]
+ fn test_single_row_group_stats() {
+ let temp_dir = tempdir().unwrap();
+ let parquet_path = temp_dir.path().join("single_rg_test.parquet");
+
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("id", DataType::Int32, false),
+ Field::new("value", DataType::Float64, false),
+ ]));
+ let ids = Int32Array::from(vec![1, 2, 3, 4, 5]);
+ let values = Float64Array::from(vec![10.0, 20.0, 30.0, 40.0, 50.0]);
+ let batch = RecordBatch::try_new(
+ schema.clone(),
+ vec![Arc::new(ids) as ArrayRef, Arc::new(values) as ArrayRef],
+ )
+ .unwrap();
+
+ write_parquet_file(&batch, &parquet_path);
+
+ let metadata = read_parquet_metadata(&parquet_path);
+ assert_eq!(metadata.num_row_groups(), 1);
+
+ let row_group = &metadata.row_groups()[0];
+ let rg_stats = StatisticsContainer::from_row_group(row_group, &schema);
+
+ assert_eq!(rg_stats.granularity, StatsGranularity::RowGroup);
+ assert_eq!(rg_stats.num_rows, Some(5));
+
+ let id_stats = rg_stats.columns.get("id").unwrap();
+ assert_eq!(get_int32(id_stats.min_value.as_ref().unwrap()), 1);
+ assert_eq!(get_int32(id_stats.max_value.as_ref().unwrap()), 5);
+
+ let value_stats = rg_stats.columns.get("value").unwrap();
+ assert_eq!(get_float64(value_stats.min_value.as_ref().unwrap()), 10.0);
+ assert_eq!(get_float64(value_stats.max_value.as_ref().unwrap()), 50.0);
+ }
+
+ #[test]
+ fn test_multiple_row_groups_aggregation() {
+ let temp_dir = tempdir().unwrap();
+ let parquet_path = temp_dir.path().join("multi_rg_test.parquet");
+
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "value",
+ DataType::Int32,
+ true,
+ )]));
+
+ // Create batches that will span multiple row groups
+ // Batch 1: values 100-110
+ let batch1 = RecordBatch::try_new(
+ schema.clone(),
+ vec![Arc::new(Int32Array::from(vec![100, 105, 110])) as ArrayRef],
+ )
+ .unwrap();
+
+ // Batch 2: values 1-10 (includes global min)
+ let batch2 = RecordBatch::try_new(
+ schema.clone(),
+ vec![Arc::new(Int32Array::from(vec![1, 5, 10])) as ArrayRef],
+ )
+ .unwrap();
+
+ // Batch 3: values 200-220 (includes global max)
+ let batch3 = RecordBatch::try_new(
+ schema.clone(),
+ vec![Arc::new(Int32Array::from(vec![200, 210, 220])) as ArrayRef],
+ )
+ .unwrap();
+
+        write_parquet_file_multiple_row_groups(&[batch1, batch2, batch3], &parquet_path);
+
+ let metadata = read_parquet_metadata(&parquet_path);
+
+ // Verify we have multiple row groups
+ assert!(
+ metadata.num_row_groups() >= 2,
+ "Expected multiple row groups"
+ );
+
+ // Get file-level stats (aggregated from row groups)
+        let file_stats = StatisticsContainer::from_parquet_metadata(&metadata, &schema);
+
+ assert_eq!(file_stats.granularity, StatsGranularity::File);
+ assert_eq!(file_stats.num_rows, Some(9)); // 3 + 3 + 3
+
+ let col_stats = file_stats.columns.get("value").unwrap();
+ // File-level min should be global min across all row groups
+ assert_eq!(get_int32(col_stats.min_value.as_ref().unwrap()), 1);
+ // File-level max should be global max across all row groups
+ assert_eq!(get_int32(col_stats.max_value.as_ref().unwrap()), 220);
+ }
+
+ #[test]
+ fn test_row_group_with_nulls_aggregation() {
+ let temp_dir = tempdir().unwrap();
+ let parquet_path = temp_dir.path().join("rg_nulls_test.parquet");
+
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "value",
+ DataType::Int32,
+ true,
+ )]));
+
+ // Batch with some nulls
+ let batch1 = RecordBatch::try_new(
+ schema.clone(),
+            vec![Arc::new(Int32Array::from(vec![Some(10), None, Some(20)])) as ArrayRef],
+ )
+ .unwrap();
+
+ // Batch with more nulls
+ let batch2 = RecordBatch::try_new(
+ schema.clone(),
+            vec![Arc::new(Int32Array::from(vec![None, Some(30), None])) as ArrayRef],
+ )
+ .unwrap();
+
+        write_parquet_file_multiple_row_groups(&[batch1, batch2], &parquet_path);
+
+ let metadata = read_parquet_metadata(&parquet_path);
+        let file_stats = StatisticsContainer::from_parquet_metadata(&metadata, &schema);
+
+ let col_stats = file_stats.columns.get("value").unwrap();
+ assert_eq!(get_int32(col_stats.min_value.as_ref().unwrap()), 10);
+ assert_eq!(get_int32(col_stats.max_value.as_ref().unwrap()), 30);
+ }
+}
+
+mod multiple_columns {
+ use super::*;
+
+ #[test]
+ fn test_mixed_type_columns() {
+ let temp_dir = tempdir().unwrap();
+ let parquet_path = temp_dir.path().join("mixed_types_test.parquet");
+
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("id", DataType::Int32, false),
+ Field::new("name", DataType::Utf8, false),
+ Field::new("score", DataType::Float64, true),
+ Field::new("is_active", DataType::Boolean, false),
+ ]));
+
+ let ids = Int32Array::from(vec![1, 2, 3, 4, 5]);
+ let names = StringArray::from(vec!["alice", "bob", "charlie", "diana",
"eve"]);
+ let scores = Float64Array::from(vec![Some(85.5), Some(92.0), None,
Some(78.5), Some(95.0)]);
+ let is_active = BooleanArray::from(vec![true, false, true, true,
false]);
+
+ let batch = RecordBatch::try_new(
+ schema.clone(),
+ vec![
+ Arc::new(ids) as ArrayRef,
+ Arc::new(names) as ArrayRef,
+ Arc::new(scores) as ArrayRef,
+ Arc::new(is_active) as ArrayRef,
+ ],
+ )
+ .unwrap();
+
+ write_parquet_file(&batch, &parquet_path);
+
+ let metadata = read_parquet_metadata(&parquet_path);
+        let stats = StatisticsContainer::from_parquet_metadata(&metadata, &schema);
+
+ // Verify all columns have statistics
+ assert_eq!(stats.columns.len(), 4);
+
+ // Check int32 column
+ let id_stats = stats.columns.get("id").unwrap();
+ assert_eq!(get_int32(id_stats.min_value.as_ref().unwrap()), 1);
+ assert_eq!(get_int32(id_stats.max_value.as_ref().unwrap()), 5);
+
+ // Check string column
+ let name_stats = stats.columns.get("name").unwrap();
+        assert_eq!(get_string(name_stats.min_value.as_ref().unwrap()), "alice");
+ assert_eq!(get_string(name_stats.max_value.as_ref().unwrap()), "eve");
+
+ // Check float64 column with null
+ let score_stats = stats.columns.get("score").unwrap();
+ assert_eq!(get_float64(score_stats.min_value.as_ref().unwrap()), 78.5);
+ assert_eq!(get_float64(score_stats.max_value.as_ref().unwrap()), 95.0);
+
+ // Check boolean column
+ let active_stats = stats.columns.get("is_active").unwrap();
+ assert!(!get_bool(active_stats.min_value.as_ref().unwrap()));
+ assert!(get_bool(active_stats.max_value.as_ref().unwrap()));
+ }
+
+ #[test]
+ fn test_schema_column_ordering() {
+ let temp_dir = tempdir().unwrap();
+ let parquet_path = temp_dir.path().join("ordering_test.parquet");
+
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("z_col", DataType::Int32, false),
+ Field::new("a_col", DataType::Int32, false),
+ Field::new("m_col", DataType::Int32, false),
+ ]));
+
+ let z_values = Int32Array::from(vec![1, 2, 3]);
+ let a_values = Int32Array::from(vec![10, 20, 30]);
+ let m_values = Int32Array::from(vec![100, 200, 300]);
+
+ let batch = RecordBatch::try_new(
+ schema.clone(),
+ vec![
+ Arc::new(z_values) as ArrayRef,
+ Arc::new(a_values) as ArrayRef,
+ Arc::new(m_values) as ArrayRef,
+ ],
+ )
+ .unwrap();
+
+ write_parquet_file(&batch, &parquet_path);
+
+ let metadata = read_parquet_metadata(&parquet_path);
+        let stats = StatisticsContainer::from_parquet_metadata(&metadata, &schema);
+
+ // All columns should have stats regardless of their name ordering
+ assert!(stats.columns.contains_key("z_col"));
+ assert!(stats.columns.contains_key("a_col"));
+ assert!(stats.columns.contains_key("m_col"));
+
+ // Verify values
+ let z_stats = stats.columns.get("z_col").unwrap();
+ assert_eq!(get_int32(z_stats.min_value.as_ref().unwrap()), 1);
+ assert_eq!(get_int32(z_stats.max_value.as_ref().unwrap()), 3);
+
+ let a_stats = stats.columns.get("a_col").unwrap();
+ assert_eq!(get_int32(a_stats.min_value.as_ref().unwrap()), 10);
+ assert_eq!(get_int32(a_stats.max_value.as_ref().unwrap()), 30);
+
+ let m_stats = stats.columns.get("m_col").unwrap();
+ assert_eq!(get_int32(m_stats.min_value.as_ref().unwrap()), 100);
+ assert_eq!(get_int32(m_stats.max_value.as_ref().unwrap()), 300);
+ }
+}
+
+mod edge_cases {
+ use super::*;
+
+ #[test]
+ fn test_min_equals_max() {
+ let temp_dir = tempdir().unwrap();
+
+ // Test single row case
+ let single_row_path = temp_dir.path().join("single_row_test.parquet");
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "value",
+ DataType::Int32,
+ false,
+ )]));
+ let values = Int32Array::from(vec![42]);
+        let batch =
+            RecordBatch::try_new(schema.clone(), vec![Arc::new(values) as ArrayRef]).unwrap();
+ write_parquet_file(&batch, &single_row_path);
+
+ let metadata = read_parquet_metadata(&single_row_path);
+        let stats = StatisticsContainer::from_parquet_metadata(&metadata, &schema);
+ assert_eq!(stats.num_rows, Some(1));
+ let col_stats = stats.columns.get("value").unwrap();
+ assert_eq!(get_int32(col_stats.min_value.as_ref().unwrap()), 42);
+ assert_eq!(get_int32(col_stats.max_value.as_ref().unwrap()), 42);
+
+ // Test identical values case
+ let identical_path = temp_dir.path().join("identical_test.parquet");
+ let values = Int32Array::from(vec![100, 100, 100, 100, 100]);
+        let batch =
+            RecordBatch::try_new(schema.clone(), vec![Arc::new(values) as ArrayRef]).unwrap();
+ write_parquet_file(&batch, &identical_path);
+
+ let metadata = read_parquet_metadata(&identical_path);
+        let stats = StatisticsContainer::from_parquet_metadata(&metadata, &schema);
+ let col_stats = stats.columns.get("value").unwrap();
+ assert_eq!(get_int32(col_stats.min_value.as_ref().unwrap()), 100);
+ assert_eq!(get_int32(col_stats.max_value.as_ref().unwrap()), 100);
+ }
+
+ #[test]
+ fn test_negative_numbers() {
+ let temp_dir = tempdir().unwrap();
+ let parquet_path = temp_dir.path().join("negative_test.parquet");
+
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "value",
+ DataType::Int32,
+ false,
+ )]));
+ let values = Int32Array::from(vec![-100, -50, 0, 50, 100]);
+        let batch =
+            RecordBatch::try_new(schema.clone(), vec![Arc::new(values) as ArrayRef]).unwrap();
+
+ write_parquet_file(&batch, &parquet_path);
+
+ let metadata = read_parquet_metadata(&parquet_path);
+        let stats = StatisticsContainer::from_parquet_metadata(&metadata, &schema);
+
+ let col_stats = stats.columns.get("value").unwrap();
+ assert_eq!(get_int32(col_stats.min_value.as_ref().unwrap()), -100);
+ assert_eq!(get_int32(col_stats.max_value.as_ref().unwrap()), 100);
+ }
+
+ #[test]
+ fn test_empty_string() {
+ let temp_dir = tempdir().unwrap();
+ let parquet_path = temp_dir.path().join("empty_string_test.parquet");
+
+ let schema = Arc::new(Schema::new(vec![Field::new("name",
DataType::Utf8, false)]));
+ let names = StringArray::from(vec!["", "apple", "banana"]);
+ let batch =
+ RecordBatch::try_new(schema.clone(), vec![Arc::new(names) as
ArrayRef]).unwrap();
+
+ write_parquet_file(&batch, &parquet_path);
+
+ let metadata = read_parquet_metadata(&parquet_path);
+        let stats = StatisticsContainer::from_parquet_metadata(&metadata, &schema);
+
+ let col_stats = stats.columns.get("name").unwrap();
+ // Empty string should be the minimum (lexicographically smallest)
+ assert_eq!(get_string(col_stats.min_value.as_ref().unwrap()), "");
+        assert_eq!(get_string(col_stats.max_value.as_ref().unwrap()), "banana");
+ }
+
+ #[test]
+ fn test_special_float_values() {
+ let temp_dir = tempdir().unwrap();
+ let parquet_path = temp_dir.path().join("special_float_test.parquet");
+
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "value",
+ DataType::Float64,
+ false,
+ )]));
+ // Test with various float values including very small and very large
+ let values = Float64Array::from(vec![
+ f64::MIN_POSITIVE,
+ 1.0,
+ f64::MAX / 2.0,
+ -f64::MAX / 2.0,
+ 0.0,
+ ]);
+        let batch =
+            RecordBatch::try_new(schema.clone(), vec![Arc::new(values) as ArrayRef]).unwrap();
+
+ write_parquet_file(&batch, &parquet_path);
+
+ let metadata = read_parquet_metadata(&parquet_path);
+        let stats = StatisticsContainer::from_parquet_metadata(&metadata, &schema);
+
+ let col_stats = stats.columns.get("value").unwrap();
+ assert_eq!(
+ get_float64(col_stats.min_value.as_ref().unwrap()),
+ -f64::MAX / 2.0
+ );
+ assert_eq!(
+ get_float64(col_stats.max_value.as_ref().unwrap()),
+ f64::MAX / 2.0
+ );
+ }
+
+ #[test]
+ fn test_parquet_without_statistics() {
+ let temp_dir = tempdir().unwrap();
+ let parquet_path = temp_dir.path().join("no_stats_test.parquet");
+
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "value",
+ DataType::Int32,
+ false,
+ )]));
+ let values = Int32Array::from(vec![10, 50, 30, 20, 40]);
+        let batch =
+            RecordBatch::try_new(schema.clone(), vec![Arc::new(values) as ArrayRef]).unwrap();
+
+ write_parquet_file_no_stats(&batch, &parquet_path);
+
+ let metadata = read_parquet_metadata(&parquet_path);
+        let stats = StatisticsContainer::from_parquet_metadata(&metadata, &schema);
+
+ // When statistics are disabled, min/max should be None
+ let col_stats = stats.columns.get("value").unwrap();
+ assert!(col_stats.min_value.is_none());
+ assert!(col_stats.max_value.is_none());
+ }
+}
Review Comment:
Consider adding an integration test that verifies file-level pruning
actually skips files during query execution. While there are comprehensive unit
tests for statistics extraction and FilePruner logic, an end-to-end test would
validate that:
1. Files are correctly pruned based on column stats during `read_snapshot`
2. The pruning decision is logged (to verify pruning occurred)
3. Query results remain correct despite pruning
Example test scenario (sketched in code after this list):
- Create/use a table with multiple files containing non-overlapping value
ranges
- Apply a filter that should prune some files
- Verify pruning occurred by checking logs or file access metrics
- Verify correct results are returned
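A rough shape for such a test, assuming a fixture table whose files hold non-overlapping `value` ranges and using the crate's `Table::new`/`read_snapshot` API (the fixture path, filter syntax, and expected row count are assumptions to adapt):
```rust
#[tokio::test]
async fn test_file_level_pruning_end_to_end() {
    // Hypothetical fixture: file A holds `value` in 1..=10, file B in 100..=110.
    let table = hudi_core::table::Table::new("/path/to/fixture_table")
        .await
        .unwrap();

    // Only file B can satisfy this predicate; file A should be skipped based on
    // its file-level min/max stats (verify via logs or file-access metrics).
    let batches = table.read_snapshot(&[("value", ">", "50")]).await.unwrap();

    // Correctness despite pruning: all rows from the surviving file are returned.
    let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
    assert_eq!(total_rows, 11, "expected exactly the rows from file B");
}
```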
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]