efredine commented on code in PR #6046: URL: https://github.com/apache/arrow-rs/pull/6046#discussion_r1676878384
########## parquet/src/arrow/arrow_reader/statistics.rs: ########## @@ -0,0 +1,2536 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`StatisticsConverter`] to convert statistics in parquet format to arrow [`ArrayRef`]. + +use crate::arrow::buffer::bit_util::sign_extend_be; +use crate::data_type::{ByteArray, FixedLenByteArray}; +use crate::errors::{ParquetError, Result}; +use crate::file::metadata::{ParquetColumnIndex, ParquetOffsetIndex, RowGroupMetaData}; +use crate::file::page_index::index::{Index, PageIndex}; +use crate::file::statistics::Statistics as ParquetStatistics; +use crate::schema::types::SchemaDescriptor; +use arrow_array::builder::{ + BooleanBuilder, FixedSizeBinaryBuilder, LargeStringBuilder, StringBuilder, +}; +use arrow_array::{ + new_empty_array, new_null_array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Date64Array, + Decimal128Array, Decimal256Array, Float16Array, Float32Array, Float64Array, Int16Array, + Int32Array, Int64Array, Int8Array, LargeBinaryArray, Time32MillisecondArray, Time32SecondArray, + Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray, + TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, UInt16Array, + UInt32Array, 
UInt64Array, UInt8Array, +}; +use arrow_buffer::i256; +use arrow_schema::{DataType, Field, FieldRef, Schema, TimeUnit}; +use half::f16; +use paste::paste; +use std::sync::Arc; + +// Convert the bytes array to i128. +// The endian of the input bytes array must be big-endian. +pub(crate) fn from_bytes_to_i128(b: &[u8]) -> i128 { + // The bytes array are from parquet file and must be the big-endian. + // The endian is defined by parquet format, and the reference document + // https://github.com/apache/parquet-format/blob/54e53e5d7794d383529dd30746378f19a12afd58/src/main/thrift/parquet.thrift#L66 + i128::from_be_bytes(sign_extend_be::<16>(b)) +} + +// Convert the bytes array to i256. +// The endian of the input bytes array must be big-endian. +pub(crate) fn from_bytes_to_i256(b: &[u8]) -> i256 { + i256::from_be_bytes(sign_extend_be::<32>(b)) +} + +// Convert the bytes array to f16 +pub(crate) fn from_bytes_to_f16(b: &[u8]) -> Option<f16> { + match b { + [low, high] => Some(f16::from_be_bytes([*high, *low])), + _ => None, + } +} + +/// Define an adapter iterator for extracting statistics from an iterator of +/// `ParquetStatistics` +/// +/// +/// Handles checking if the statistics are present and valid with the correct type. +/// +/// Parameters: +/// * `$iterator_type` is the name of the iterator type (e.g. `MinBooleanStatsIterator`) +/// * `$func` is the function to call to get the value (e.g. `min` or `max`) +/// * `$parquet_statistics_type` is the type of the statistics (e.g. `ParquetStatistics::Boolean`) +/// * `$stat_value_type` is the type of the statistics value (e.g. `bool`) +macro_rules! 
make_stats_iterator { + ($iterator_type:ident, $func:ident, $parquet_statistics_type:path, $stat_value_type:ty) => { + /// Maps an iterator of `ParquetStatistics` into an iterator of + /// `&$stat_value_type`` + /// + /// Yielded elements: + /// * Some(stats) if valid + /// * None if the statistics are not present, not valid, or not $stat_value_type + struct $iterator_type<'a, I> + where + I: Iterator<Item = Option<&'a ParquetStatistics>>, + { + iter: I, + } + + impl<'a, I> $iterator_type<'a, I> + where + I: Iterator<Item = Option<&'a ParquetStatistics>>, + { + /// Create a new iterator to extract the statistics + fn new(iter: I) -> Self { + Self { iter } + } + } + + /// Implement the Iterator trait for the iterator + impl<'a, I> Iterator for $iterator_type<'a, I> + where + I: Iterator<Item = Option<&'a ParquetStatistics>>, + { + type Item = Option<&'a $stat_value_type>; + + /// return the next statistics value + fn next(&mut self) -> Option<Self::Item> { + let next = self.iter.next(); + next.map(|x| { + x.and_then(|stats| match stats { + $parquet_statistics_type(s) if stats.has_min_max_set() => Some(s.$func()), + _ => None, + }) + }) + } + + fn size_hint(&self) -> (usize, Option<usize>) { + self.iter.size_hint() + } + } + }; +} + +make_stats_iterator!( + MinBooleanStatsIterator, + min, + ParquetStatistics::Boolean, + bool +); +make_stats_iterator!( + MaxBooleanStatsIterator, + max, + ParquetStatistics::Boolean, + bool +); +make_stats_iterator!(MinInt32StatsIterator, min, ParquetStatistics::Int32, i32); +make_stats_iterator!(MaxInt32StatsIterator, max, ParquetStatistics::Int32, i32); +make_stats_iterator!(MinInt64StatsIterator, min, ParquetStatistics::Int64, i64); +make_stats_iterator!(MaxInt64StatsIterator, max, ParquetStatistics::Int64, i64); +make_stats_iterator!(MinFloatStatsIterator, min, ParquetStatistics::Float, f32); +make_stats_iterator!(MaxFloatStatsIterator, max, ParquetStatistics::Float, f32); +make_stats_iterator!(MinDoubleStatsIterator, min, 
ParquetStatistics::Double, f64); +make_stats_iterator!(MaxDoubleStatsIterator, max, ParquetStatistics::Double, f64); +make_stats_iterator!( + MinByteArrayStatsIterator, + min_bytes, + ParquetStatistics::ByteArray, + [u8] +); +make_stats_iterator!( + MaxByteArrayStatsIterator, + max_bytes, + ParquetStatistics::ByteArray, + [u8] +); +make_stats_iterator!( + MinFixedLenByteArrayStatsIterator, + min_bytes, + ParquetStatistics::FixedLenByteArray, + [u8] +); +make_stats_iterator!( + MaxFixedLenByteArrayStatsIterator, + max_bytes, + ParquetStatistics::FixedLenByteArray, + [u8] +); + +/// Special iterator adapter for extracting i128 values from from an iterator of +/// `ParquetStatistics` +/// +/// Handles checking if the statistics are present and valid with the correct type. +/// +/// Depending on the parquet file, the statistics for `Decimal128` can be stored as +/// `Int32`, `Int64` or `ByteArray` or `FixedSizeByteArray` :mindblown: +/// +/// This iterator handles all cases, extracting the values +/// and converting it to `stat_value_type`. +/// +/// Parameters: +/// * `$iterator_type` is the name of the iterator type (e.g. `MinBooleanStatsIterator`) +/// * `$func` is the function to call to get the value (e.g. `min` or `max`) +/// * `$bytes_func` is the function to call to get the value as bytes (e.g. `min_bytes` or `max_bytes`) +/// * `$stat_value_type` is the type of the statistics value (e.g. `i128`) +/// * `convert_func` is the function to convert the bytes to stats value (e.g. `from_bytes_to_i128`) +macro_rules! 
make_decimal_stats_iterator { + ($iterator_type:ident, $func:ident, $bytes_func:ident, $stat_value_type:ident, $convert_func: ident) => { + struct $iterator_type<'a, I> + where + I: Iterator<Item = Option<&'a ParquetStatistics>>, + { + iter: I, + } + + impl<'a, I> $iterator_type<'a, I> + where + I: Iterator<Item = Option<&'a ParquetStatistics>>, + { + fn new(iter: I) -> Self { + Self { iter } + } + } + + impl<'a, I> Iterator for $iterator_type<'a, I> + where + I: Iterator<Item = Option<&'a ParquetStatistics>>, + { + type Item = Option<$stat_value_type>; + + fn next(&mut self) -> Option<Self::Item> { + let next = self.iter.next(); + next.map(|x| { + x.and_then(|stats| { + if !stats.has_min_max_set() { + return None; + } + match stats { + ParquetStatistics::Int32(s) => Some($stat_value_type::from(*s.$func())), + ParquetStatistics::Int64(s) => Some($stat_value_type::from(*s.$func())), + ParquetStatistics::ByteArray(s) => Some($convert_func(s.$bytes_func())), + ParquetStatistics::FixedLenByteArray(s) => { + Some($convert_func(s.$bytes_func())) + } + _ => None, + } + }) + }) + } + + fn size_hint(&self) -> (usize, Option<usize>) { + self.iter.size_hint() + } + } + }; +} + +make_decimal_stats_iterator!( + MinDecimal128StatsIterator, + min, + min_bytes, + i128, + from_bytes_to_i128 +); +make_decimal_stats_iterator!( + MaxDecimal128StatsIterator, + max, + max_bytes, + i128, + from_bytes_to_i128 +); +make_decimal_stats_iterator!( + MinDecimal256StatsIterator, + min, + min_bytes, + i256, + from_bytes_to_i256 +); +make_decimal_stats_iterator!( + MaxDecimal256StatsIterator, + max, + max_bytes, + i256, + from_bytes_to_i256 +); + +/// Special macro to combine the statistics iterators for min and max using the [`mod@paste`] macro. +/// This is used to avoid repeating the same code for min and max statistics extractions +/// +/// Parameters: +/// stat_type_prefix: The prefix of the statistics iterator type (e.g. `Min` or `Max`) +/// data_type: The data type of the statistics (e.g. 
`DataType::Int32`) +/// iterator: The iterator of [`ParquetStatistics`] to extract the statistics from. +macro_rules! get_statistics { + ($stat_type_prefix: ident, $data_type: ident, $iterator: ident) => { + paste! { + match $data_type { + DataType::Boolean => Ok(Arc::new(BooleanArray::from_iter( + [<$stat_type_prefix BooleanStatsIterator>]::new($iterator).map(|x| x.copied()), + ))), + DataType::Int8 => Ok(Arc::new(Int8Array::from_iter( + [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| { + x.and_then(|x| i8::try_from(*x).ok()) + }), + ))), + DataType::Int16 => Ok(Arc::new(Int16Array::from_iter( + [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| { + x.and_then(|x| i16::try_from(*x).ok()) + }), + ))), + DataType::Int32 => Ok(Arc::new(Int32Array::from_iter( + [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| x.copied()), + ))), + DataType::Int64 => Ok(Arc::new(Int64Array::from_iter( + [<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|x| x.copied()), + ))), + DataType::UInt8 => Ok(Arc::new(UInt8Array::from_iter( + [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| { + x.and_then(|x| u8::try_from(*x).ok()) + }), + ))), + DataType::UInt16 => Ok(Arc::new(UInt16Array::from_iter( + [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| { + x.and_then(|x| u16::try_from(*x).ok()) + }), + ))), + DataType::UInt32 => Ok(Arc::new(UInt32Array::from_iter( + [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| x.map(|x| *x as u32)), + ))), + DataType::UInt64 => Ok(Arc::new(UInt64Array::from_iter( + [<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|x| x.map(|x| *x as u64)), + ))), + DataType::Float16 => Ok(Arc::new(Float16Array::from_iter( + [<$stat_type_prefix FixedLenByteArrayStatsIterator>]::new($iterator).map(|x| x.and_then(|x| { + from_bytes_to_f16(x) + })), + ))), + DataType::Float32 => Ok(Arc::new(Float32Array::from_iter( + [<$stat_type_prefix 
FloatStatsIterator>]::new($iterator).map(|x| x.copied()), + ))), + DataType::Float64 => Ok(Arc::new(Float64Array::from_iter( + [<$stat_type_prefix DoubleStatsIterator>]::new($iterator).map(|x| x.copied()), + ))), + DataType::Date32 => Ok(Arc::new(Date32Array::from_iter( + [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| x.copied()), + ))), + DataType::Date64 => Ok(Arc::new(Date64Array::from_iter( + [<$stat_type_prefix Int32StatsIterator>]::new($iterator) + .map(|x| x.map(|x| i64::from(*x) * 24 * 60 * 60 * 1000)), + ))), + DataType::Timestamp(unit, timezone) =>{ + let iter = [<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|x| x.copied()); + Ok(match unit { + TimeUnit::Second => Arc::new(TimestampSecondArray::from_iter(iter).with_timezone_opt(timezone.clone())), + TimeUnit::Millisecond => Arc::new(TimestampMillisecondArray::from_iter(iter).with_timezone_opt(timezone.clone())), + TimeUnit::Microsecond => Arc::new(TimestampMicrosecondArray::from_iter(iter).with_timezone_opt(timezone.clone())), + TimeUnit::Nanosecond => Arc::new(TimestampNanosecondArray::from_iter(iter).with_timezone_opt(timezone.clone())), + }) + }, + DataType::Time32(unit) => { + Ok(match unit { + TimeUnit::Second => Arc::new(Time32SecondArray::from_iter( + [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| x.copied()), + )), + TimeUnit::Millisecond => Arc::new(Time32MillisecondArray::from_iter( + [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| x.copied()), + )), + _ => { + let len = $iterator.count(); + // don't know how to extract statistics, so return a null array + new_null_array($data_type, len) + } + }) + }, + DataType::Time64(unit) => { + Ok(match unit { + TimeUnit::Microsecond => Arc::new(Time64MicrosecondArray::from_iter( + [<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|x| x.copied()), + )), + TimeUnit::Nanosecond => Arc::new(Time64NanosecondArray::from_iter( + [<$stat_type_prefix 
Int64StatsIterator>]::new($iterator).map(|x| x.copied()), + )), + _ => { + let len = $iterator.count(); + // don't know how to extract statistics, so return a null array + new_null_array($data_type, len) + } + }) + }, + DataType::Binary => Ok(Arc::new(BinaryArray::from_iter( + [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator) + ))), + DataType::LargeBinary => Ok(Arc::new(LargeBinaryArray::from_iter( + [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator) + ))), + DataType::Utf8 => { + let iterator = [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator); + let mut builder = StringBuilder::new(); + for x in iterator { + let Some(x) = x else { + builder.append_null(); // no statistics value + continue; + }; + + let Ok(x) = std::str::from_utf8(x) else { + builder.append_null(); + continue; + }; + + builder.append_value(x); + } + Ok(Arc::new(builder.finish())) + }, + DataType::LargeUtf8 => { + let iterator = [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator); + let mut builder = LargeStringBuilder::new(); + for x in iterator { + let Some(x) = x else { + builder.append_null(); // no statistics value + continue; + }; + + let Ok(x) = std::str::from_utf8(x) else { + builder.append_null(); + continue; + }; + + builder.append_value(x); + } + Ok(Arc::new(builder.finish())) + }, + DataType::FixedSizeBinary(size) => { + let iterator = [<$stat_type_prefix FixedLenByteArrayStatsIterator>]::new($iterator); + let mut builder = FixedSizeBinaryBuilder::new(*size); + for x in iterator { + let Some(x) = x else { + builder.append_null(); // no statistics value + continue; + }; + + // ignore invalid values + if x.len().try_into() != Ok(*size){ + builder.append_null(); + continue; + } + + builder.append_value(x).expect("ensure to append successfully here, because size have been checked before"); + } + Ok(Arc::new(builder.finish())) + }, + DataType::Decimal128(precision, scale) => { + let arr = Decimal128Array::from_iter( + [<$stat_type_prefix 
Decimal128StatsIterator>]::new($iterator) + ).with_precision_and_scale(*precision, *scale)?; + Ok(Arc::new(arr)) + }, + DataType::Decimal256(precision, scale) => { + let arr = Decimal256Array::from_iter( + [<$stat_type_prefix Decimal256StatsIterator>]::new($iterator) + ).with_precision_and_scale(*precision, *scale)?; + Ok(Arc::new(arr)) + }, + DataType::Dictionary(_, value_type) => { + [<$stat_type_prefix:lower _ statistics>](value_type, $iterator) + } + + DataType::Map(_,_) | + DataType::Duration(_) | + DataType::Interval(_) | + DataType::Null | + DataType::BinaryView | + DataType::Utf8View | + DataType::List(_) | + DataType::ListView(_) | + DataType::FixedSizeList(_, _) | + DataType::LargeList(_) | + DataType::LargeListView(_) | + DataType::Struct(_) | + DataType::Union(_, _) | + DataType::RunEndEncoded(_, _) => { + let len = $iterator.count(); + // don't know how to extract statistics, so return a null array + Ok(new_null_array($data_type, len)) + } + }}} +} + +macro_rules! make_data_page_stats_iterator { + ($iterator_type: ident, $func: expr, $index_type: path, $stat_value_type: ty) => { + struct $iterator_type<'a, I> + where + I: Iterator<Item = (usize, &'a Index)>, + { + iter: I, + } + + impl<'a, I> $iterator_type<'a, I> + where + I: Iterator<Item = (usize, &'a Index)>, + { + fn new(iter: I) -> Self { + Self { iter } + } + } + + impl<'a, I> Iterator for $iterator_type<'a, I> + where + I: Iterator<Item = (usize, &'a Index)>, + { + type Item = Vec<Option<$stat_value_type>>; + + fn next(&mut self) -> Option<Self::Item> { + let next = self.iter.next(); + match next { + Some((len, index)) => match index { + $index_type(native_index) => Some( + native_index + .indexes + .iter() + .map(|x| $func(x)) + .collect::<Vec<_>>(), + ), + // No matching `Index` found; + // thus no statistics that can be extracted. 
+ // We return vec![None; len] to effectively + // create an arrow null-array with the length + // corresponding to the number of entries in + // `ParquetOffsetIndex` per row group per column. + _ => Some(vec![None; len]), + }, + _ => None, + } + } + + fn size_hint(&self) -> (usize, Option<usize>) { + self.iter.size_hint() + } + } + }; +} + +make_data_page_stats_iterator!( + MinBooleanDataPageStatsIterator, + |x: &PageIndex<bool>| { x.min }, + Index::BOOLEAN, + bool +); +make_data_page_stats_iterator!( + MaxBooleanDataPageStatsIterator, + |x: &PageIndex<bool>| { x.max }, + Index::BOOLEAN, + bool +); +make_data_page_stats_iterator!( + MinInt32DataPageStatsIterator, + |x: &PageIndex<i32>| { x.min }, + Index::INT32, + i32 +); +make_data_page_stats_iterator!( + MaxInt32DataPageStatsIterator, + |x: &PageIndex<i32>| { x.max }, + Index::INT32, + i32 +); +make_data_page_stats_iterator!( + MinInt64DataPageStatsIterator, + |x: &PageIndex<i64>| { x.min }, + Index::INT64, + i64 +); +make_data_page_stats_iterator!( + MaxInt64DataPageStatsIterator, + |x: &PageIndex<i64>| { x.max }, + Index::INT64, + i64 +); +make_data_page_stats_iterator!( + MinFloat16DataPageStatsIterator, + |x: &PageIndex<FixedLenByteArray>| { x.min.clone() }, + Index::FIXED_LEN_BYTE_ARRAY, + FixedLenByteArray +); +make_data_page_stats_iterator!( + MaxFloat16DataPageStatsIterator, + |x: &PageIndex<FixedLenByteArray>| { x.max.clone() }, + Index::FIXED_LEN_BYTE_ARRAY, + FixedLenByteArray +); +make_data_page_stats_iterator!( + MinFloat32DataPageStatsIterator, + |x: &PageIndex<f32>| { x.min }, + Index::FLOAT, + f32 +); +make_data_page_stats_iterator!( + MaxFloat32DataPageStatsIterator, + |x: &PageIndex<f32>| { x.max }, + Index::FLOAT, + f32 +); +make_data_page_stats_iterator!( + MinFloat64DataPageStatsIterator, + |x: &PageIndex<f64>| { x.min }, + Index::DOUBLE, + f64 +); +make_data_page_stats_iterator!( + MaxFloat64DataPageStatsIterator, + |x: &PageIndex<f64>| { x.max }, + Index::DOUBLE, + f64 +); 
+make_data_page_stats_iterator!( + MinByteArrayDataPageStatsIterator, + |x: &PageIndex<ByteArray>| { x.min.clone() }, + Index::BYTE_ARRAY, + ByteArray +); +make_data_page_stats_iterator!( + MaxByteArrayDataPageStatsIterator, + |x: &PageIndex<ByteArray>| { x.max.clone() }, + Index::BYTE_ARRAY, + ByteArray +); +make_data_page_stats_iterator!( + MaxFixedLenByteArrayDataPageStatsIterator, + |x: &PageIndex<FixedLenByteArray>| { x.max.clone() }, + Index::FIXED_LEN_BYTE_ARRAY, + FixedLenByteArray +); + +make_data_page_stats_iterator!( + MinFixedLenByteArrayDataPageStatsIterator, + |x: &PageIndex<FixedLenByteArray>| { x.min.clone() }, + Index::FIXED_LEN_BYTE_ARRAY, + FixedLenByteArray +); + +macro_rules! get_decimal_page_stats_iterator { + ($iterator_type: ident, $func: ident, $stat_value_type: ident, $convert_func: ident) => { + struct $iterator_type<'a, I> + where + I: Iterator<Item = (usize, &'a Index)>, + { + iter: I, + } + + impl<'a, I> $iterator_type<'a, I> + where + I: Iterator<Item = (usize, &'a Index)>, + { + fn new(iter: I) -> Self { + Self { iter } + } + } + + impl<'a, I> Iterator for $iterator_type<'a, I> + where + I: Iterator<Item = (usize, &'a Index)>, + { + type Item = Vec<Option<$stat_value_type>>; + + fn next(&mut self) -> Option<Self::Item> { + let next = self.iter.next(); + match next { + Some((len, index)) => match index { + Index::INT32(native_index) => Some( + native_index + .indexes + .iter() + .map(|x| x.$func.and_then(|x| Some($stat_value_type::from(x)))) + .collect::<Vec<_>>(), + ), + Index::INT64(native_index) => Some( + native_index + .indexes + .iter() + .map(|x| x.$func.and_then(|x| Some($stat_value_type::from(x)))) + .collect::<Vec<_>>(), + ), + Index::BYTE_ARRAY(native_index) => Some( + native_index + .indexes + .iter() + .map(|x| { + x.clone().$func.and_then(|x| Some($convert_func(x.data()))) + }) + .collect::<Vec<_>>(), + ), + Index::FIXED_LEN_BYTE_ARRAY(native_index) => Some( + native_index + .indexes + .iter() + .map(|x| { + 
x.clone().$func.and_then(|x| Some($convert_func(x.data()))) + }) + .collect::<Vec<_>>(), + ), + _ => Some(vec![None; len]), + }, + _ => None, + } + } + + fn size_hint(&self) -> (usize, Option<usize>) { + self.iter.size_hint() + } + } + }; +} + +get_decimal_page_stats_iterator!( + MinDecimal128DataPageStatsIterator, + min, + i128, + from_bytes_to_i128 +); + +get_decimal_page_stats_iterator!( + MaxDecimal128DataPageStatsIterator, + max, + i128, + from_bytes_to_i128 +); + +get_decimal_page_stats_iterator!( + MinDecimal256DataPageStatsIterator, + min, + i256, + from_bytes_to_i256 +); + +get_decimal_page_stats_iterator!( + MaxDecimal256DataPageStatsIterator, + max, + i256, + from_bytes_to_i256 +); + +macro_rules! get_data_page_statistics { + ($stat_type_prefix: ident, $data_type: ident, $iterator: ident) => { + paste! { + match $data_type { + Some(DataType::Boolean) => { + let iterator = [<$stat_type_prefix BooleanDataPageStatsIterator>]::new($iterator); + let mut builder = BooleanBuilder::new(); + for x in iterator { + for x in x.into_iter() { + let Some(x) = x else { + builder.append_null(); // no statistics value + continue; + }; + builder.append_value(x); + } + } + Ok(Arc::new(builder.finish())) + }, + Some(DataType::UInt8) => Ok(Arc::new( + UInt8Array::from_iter( + [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator) + .map(|x| { + x.into_iter().map(|x| { + x.and_then(|x| u8::try_from(x).ok()) + }) + }) + .flatten() + ) + )), + Some(DataType::UInt16) => Ok(Arc::new( + UInt16Array::from_iter( + [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator) + .map(|x| { + x.into_iter().map(|x| { + x.and_then(|x| u16::try_from(x).ok()) + }) + }) + .flatten() + ) + )), + Some(DataType::UInt32) => Ok(Arc::new( + UInt32Array::from_iter( + [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator) + .map(|x| { + x.into_iter().map(|x| { + x.and_then(|x| Some(x as u32)) + }) + }) + .flatten() + ))), + Some(DataType::UInt64) => Ok(Arc::new( + 
UInt64Array::from_iter( + [<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator) + .map(|x| { + x.into_iter().map(|x| { + x.and_then(|x| Some(x as u64)) + }) + }) + .flatten() + ))), + Some(DataType::Int8) => Ok(Arc::new( + Int8Array::from_iter( + [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator) + .map(|x| { + x.into_iter().map(|x| { + x.and_then(|x| i8::try_from(x).ok()) + }) + }) + .flatten() + ) + )), + Some(DataType::Int16) => Ok(Arc::new( + Int16Array::from_iter( + [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator) + .map(|x| { + x.into_iter().map(|x| { + x.and_then(|x| i16::try_from(x).ok()) + }) + }) + .flatten() + ) + )), + Some(DataType::Int32) => Ok(Arc::new(Int32Array::from_iter([<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator).flatten()))), + Some(DataType::Int64) => Ok(Arc::new(Int64Array::from_iter([<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten()))), + Some(DataType::Float16) => Ok(Arc::new( + Float16Array::from_iter( + [<$stat_type_prefix Float16DataPageStatsIterator>]::new($iterator) + .map(|x| { + x.into_iter().map(|x| { + x.and_then(|x| from_bytes_to_f16(x.data())) + }) + }) + .flatten() + ) + )), + Some(DataType::Float32) => Ok(Arc::new(Float32Array::from_iter([<$stat_type_prefix Float32DataPageStatsIterator>]::new($iterator).flatten()))), + Some(DataType::Float64) => Ok(Arc::new(Float64Array::from_iter([<$stat_type_prefix Float64DataPageStatsIterator>]::new($iterator).flatten()))), + Some(DataType::Binary) => Ok(Arc::new(BinaryArray::from_iter([<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).flatten()))), + Some(DataType::LargeBinary) => Ok(Arc::new(LargeBinaryArray::from_iter([<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).flatten()))), + Some(DataType::Utf8) => { + let mut builder = StringBuilder::new(); + let iterator = [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator); + for x in iterator { + for x in 
x.into_iter() { + let Some(x) = x else { + builder.append_null(); // no statistics value + continue; + }; + + let Ok(x) = std::str::from_utf8(x.data()) else { + builder.append_null(); + continue; + }; + + builder.append_value(x); + } + } + Ok(Arc::new(builder.finish())) + }, + Some(DataType::LargeUtf8) => { + let mut builder = LargeStringBuilder::new(); + let iterator = [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator); + for x in iterator { + for x in x.into_iter() { + let Some(x) = x else { + builder.append_null(); // no statistics value + continue; + }; + + let Ok(x) = std::str::from_utf8(x.data()) else { + builder.append_null(); + continue; + }; + + builder.append_value(x); + } + } + Ok(Arc::new(builder.finish())) + }, + Some(DataType::Dictionary(_, value_type)) => { + [<$stat_type_prefix:lower _ page_statistics>](Some(value_type), $iterator) + }, + Some(DataType::Timestamp(unit, timezone)) => { + let iter = [<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten(); + Ok(match unit { + TimeUnit::Second => Arc::new(TimestampSecondArray::from_iter(iter).with_timezone_opt(timezone.clone())), + TimeUnit::Millisecond => Arc::new(TimestampMillisecondArray::from_iter(iter).with_timezone_opt(timezone.clone())), + TimeUnit::Microsecond => Arc::new(TimestampMicrosecondArray::from_iter(iter).with_timezone_opt(timezone.clone())), + TimeUnit::Nanosecond => Arc::new(TimestampNanosecondArray::from_iter(iter).with_timezone_opt(timezone.clone())), + }) + }, + Some(DataType::Date32) => Ok(Arc::new(Date32Array::from_iter([<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator).flatten()))), + Some(DataType::Date64) => Ok( + Arc::new( + Date64Array::from_iter([<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator) + .map(|x| { + x.into_iter() + .map(|x| { + x.and_then(|x| i64::try_from(x).ok()) + }) + .map(|x| x.map(|x| x * 24 * 60 * 60 * 1000)) + }).flatten() + ) + ) + ), + Some(DataType::Decimal128(precision, scale)) => 
Ok(Arc::new( + Decimal128Array::from_iter([<$stat_type_prefix Decimal128DataPageStatsIterator>]::new($iterator).flatten()).with_precision_and_scale(*precision, *scale)?)), + Some(DataType::Decimal256(precision, scale)) => Ok(Arc::new( + Decimal256Array::from_iter([<$stat_type_prefix Decimal256DataPageStatsIterator>]::new($iterator).flatten()).with_precision_and_scale(*precision, *scale)?)), + Some(DataType::Time32(unit)) => { + Ok(match unit { + TimeUnit::Second => Arc::new(Time32SecondArray::from_iter( + [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator).flatten(), + )), + TimeUnit::Millisecond => Arc::new(Time32MillisecondArray::from_iter( + [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator).flatten(), + )), + _ => { + // don't know how to extract statistics, so return an empty array + new_empty_array(&DataType::Time32(unit.clone())) + } + }) + } + Some(DataType::Time64(unit)) => { + Ok(match unit { + TimeUnit::Microsecond => Arc::new(Time64MicrosecondArray::from_iter( + [<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten(), + )), + TimeUnit::Nanosecond => Arc::new(Time64NanosecondArray::from_iter( + [<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten(), + )), + _ => { + // don't know how to extract statistics, so return an empty array + new_empty_array(&DataType::Time64(unit.clone())) + } + }) + }, + Some(DataType::FixedSizeBinary(size)) => { + let mut builder = FixedSizeBinaryBuilder::new(*size); + let iterator = [<$stat_type_prefix FixedLenByteArrayDataPageStatsIterator>]::new($iterator); + for x in iterator { + for x in x.into_iter() { + let Some(x) = x else { + builder.append_null(); // no statistics value + continue; + }; + + if x.len() == *size as usize { + let _ = builder.append_value(x.data()); + } else { + // log::debug!( + // "FixedSizeBinary({}) statistics is a binary of size {}, ignoring it.", + // size, + // x.len(), + // ); + builder.append_null(); + } + } + } + 
Ok(Arc::new(builder.finish())) + }, + _ => unimplemented!() + } + } + } +} + +/// Lookups up the parquet column by name +/// +/// Returns the parquet column index and the corresponding arrow field +pub fn parquet_column<'a>( + parquet_schema: &SchemaDescriptor, + arrow_schema: &'a Schema, + name: &str, +) -> Option<(usize, &'a FieldRef)> { + let (root_idx, field) = arrow_schema.fields.find(name)?; + if field.data_type().is_nested() { + // Nested fields are not supported and require non-trivial logic + // to correctly walk the parquet schema accounting for the + // logical type rules - <https://github.com/apache/parquet-format/blob/master/LogicalTypes.md> + // + // For example a ListArray could correspond to anything from 1 to 3 levels + // in the parquet schema + return None; + } + + // This could be made more efficient (#TBD) + let parquet_idx = (0..parquet_schema.columns().len()) + .find(|x| parquet_schema.get_column_root_idx(*x) == root_idx)?; + Some((parquet_idx, field)) +} + +/// Extracts the min statistics from an iterator of [`ParquetStatistics`] to an +/// [`ArrayRef`] +/// +/// This is an internal helper -- see [`StatisticsConverter`] for public API +fn min_statistics<'a, I: Iterator<Item = Option<&'a ParquetStatistics>>>( + data_type: &DataType, + iterator: I, +) -> Result<ArrayRef> { + get_statistics!(Min, data_type, iterator) +} + +/// Extracts the max statistics from an iterator of [`ParquetStatistics`] to an [`ArrayRef`] +/// +/// This is an internal helper -- see [`StatisticsConverter`] for public API +fn max_statistics<'a, I: Iterator<Item = Option<&'a ParquetStatistics>>>( + data_type: &DataType, + iterator: I, +) -> Result<ArrayRef> { + get_statistics!(Max, data_type, iterator) +} + +/// Extracts the min statistics from an iterator +/// of parquet page [`Index`]'es to an [`ArrayRef`] +pub(crate) fn min_page_statistics<'a, I>( + data_type: Option<&DataType>, + iterator: I, +) -> Result<ArrayRef> +where + I: Iterator<Item = (usize, &'a Index)>, +{ + 
get_data_page_statistics!(Min, data_type, iterator) +} + +/// Extracts the max statistics from an iterator +/// of parquet page [`Index`]'es to an [`ArrayRef`] +pub(crate) fn max_page_statistics<'a, I>( + data_type: Option<&DataType>, + iterator: I, +) -> Result<ArrayRef> +where + I: Iterator<Item = (usize, &'a Index)>, +{ + get_data_page_statistics!(Max, data_type, iterator) +} + +/// Extracts the null count statistics from an iterator +/// of parquet page [`Index`]'es to an [`ArrayRef`] +/// +/// The returned Array is an [`UInt64Array`] +pub(crate) fn null_counts_page_statistics<'a, I>(iterator: I) -> Result<UInt64Array> +where + I: Iterator<Item = (usize, &'a Index)>, +{ + let iter = iterator.flat_map(|(len, index)| match index { + Index::NONE => vec![None; len], + Index::BOOLEAN(native_index) => native_index + .indexes + .iter() + .map(|x| x.null_count.map(|x| x as u64)) + .collect::<Vec<_>>(), + Index::INT32(native_index) => native_index + .indexes + .iter() + .map(|x| x.null_count.map(|x| x as u64)) + .collect::<Vec<_>>(), + Index::INT64(native_index) => native_index + .indexes + .iter() + .map(|x| x.null_count.map(|x| x as u64)) + .collect::<Vec<_>>(), + Index::FLOAT(native_index) => native_index + .indexes + .iter() + .map(|x| x.null_count.map(|x| x as u64)) + .collect::<Vec<_>>(), + Index::DOUBLE(native_index) => native_index + .indexes + .iter() + .map(|x| x.null_count.map(|x| x as u64)) + .collect::<Vec<_>>(), + Index::FIXED_LEN_BYTE_ARRAY(native_index) => native_index + .indexes + .iter() + .map(|x| x.null_count.map(|x| x as u64)) + .collect::<Vec<_>>(), + Index::BYTE_ARRAY(native_index) => native_index + .indexes + .iter() + .map(|x| x.null_count.map(|x| x as u64)) + .collect::<Vec<_>>(), + _ => unimplemented!(), + }); + + Ok(UInt64Array::from_iter(iter)) +} + +/// Extracts Parquet statistics as Arrow arrays +/// +/// This is used to convert Parquet statistics to Arrow arrays, with proper type +/// conversions. 
This information can be used for pruning parquet files or row +/// groups based on the statistics embedded in parquet files +/// +/// # Schemas +/// +/// The schema of the parquet file and the arrow schema are used to convert the +/// underlying statistics value (stored as a parquet value) into the +/// corresponding Arrow value. For example, Decimals are stored as binary in +/// parquet files. +/// Review Comment: As part of the port, I changed the visibility of `parquet_column` from `pub(crate)` to `pub`, because with `pub(crate)` the documentation tests failed (doctests are compiled as a separate crate, so they can only reference public items). However, this also adds `parquet_column` to the crate's public API, so I'm not sure this was the right way to resolve the issue. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
