alamb commented on code in PR #9303:
URL: https://github.com/apache/arrow-rs/pull/9303#discussion_r2746524798
##########
parquet/src/arrow/arrow_reader/statistics.rs:
##########
@@ -596,473 +600,572 @@ macro_rules! get_statistics {
}}}
}
-macro_rules! make_data_page_stats_iterator {
- ($iterator_type: ident, $func: ident, $stat_value_type: ty) => {
- struct $iterator_type<'a, I>
- where
- I: Iterator<Item = (usize, &'a ColumnIndexMetaData)>,
- {
- iter: I,
- }
-
- impl<'a, I> $iterator_type<'a, I>
- where
- I: Iterator<Item = (usize, &'a ColumnIndexMetaData)>,
- {
- fn new(iter: I) -> Self {
- Self { iter }
- }
- }
-
- impl<'a, I> Iterator for $iterator_type<'a, I>
- where
- I: Iterator<Item = (usize, &'a ColumnIndexMetaData)>,
- {
- type Item = Vec<Option<$stat_value_type>>;
-
- fn next(&mut self) -> Option<Self::Item> {
- let next = self.iter.next();
- match next {
- Some((len, index)) => match index {
- // No matching `Index` found;
- // thus no statistics that can be extracted.
- // We return vec![None; len] to effectively
- // create an arrow null-array with the length
- // corresponding to the number of entries in
- // `ParquetOffsetIndex` per row group per column.
- ColumnIndexMetaData::NONE => Some(vec![None; len]),
- _ =>
Some(<$stat_value_type>::$func(&index).collect::<Vec<_>>()),
- },
- _ => None,
- }
- }
-
- fn size_hint(&self) -> (usize, Option<usize>) {
- self.iter.size_hint()
- }
- }
- };
-}
-
-make_data_page_stats_iterator!(MinBooleanDataPageStatsIterator,
min_values_iter, bool);
-make_data_page_stats_iterator!(MaxBooleanDataPageStatsIterator,
max_values_iter, bool);
-make_data_page_stats_iterator!(MinInt32DataPageStatsIterator, min_values_iter,
i32);
-make_data_page_stats_iterator!(MaxInt32DataPageStatsIterator, max_values_iter,
i32);
-make_data_page_stats_iterator!(MinInt64DataPageStatsIterator, min_values_iter,
i64);
-make_data_page_stats_iterator!(MaxInt64DataPageStatsIterator, max_values_iter,
i64);
-make_data_page_stats_iterator!(
- MinFloat16DataPageStatsIterator,
- min_values_iter,
- FixedLenByteArray
-);
-make_data_page_stats_iterator!(
- MaxFloat16DataPageStatsIterator,
- max_values_iter,
- FixedLenByteArray
-);
-make_data_page_stats_iterator!(MinFloat32DataPageStatsIterator,
min_values_iter, f32);
-make_data_page_stats_iterator!(MaxFloat32DataPageStatsIterator,
max_values_iter, f32);
-make_data_page_stats_iterator!(MinFloat64DataPageStatsIterator,
min_values_iter, f64);
-make_data_page_stats_iterator!(MaxFloat64DataPageStatsIterator,
max_values_iter, f64);
-make_data_page_stats_iterator!(
- MinByteArrayDataPageStatsIterator,
- min_values_iter,
- ByteArray
-);
-make_data_page_stats_iterator!(
- MaxByteArrayDataPageStatsIterator,
- max_values_iter,
- ByteArray
-);
-make_data_page_stats_iterator!(
- MaxFixedLenByteArrayDataPageStatsIterator,
- max_values_iter,
- FixedLenByteArray
-);
-
-make_data_page_stats_iterator!(
- MinFixedLenByteArrayDataPageStatsIterator,
- min_values_iter,
- FixedLenByteArray
-);
-
-macro_rules! get_decimal_page_stats_iterator {
- ($iterator_type: ident, $func: ident, $stat_value_type: ident,
$convert_func: ident) => {
- struct $iterator_type<'a, I>
- where
- I: Iterator<Item = (usize, &'a ColumnIndexMetaData)>,
- {
- iter: I,
- }
-
- impl<'a, I> $iterator_type<'a, I>
- where
- I: Iterator<Item = (usize, &'a ColumnIndexMetaData)>,
- {
- fn new(iter: I) -> Self {
- Self { iter }
- }
- }
-
- impl<'a, I> Iterator for $iterator_type<'a, I>
- where
- I: Iterator<Item = (usize, &'a ColumnIndexMetaData)>,
- {
- type Item = Vec<Option<$stat_value_type>>;
-
- // Some(native_index.$func().map(|v|
v.map($conv)).collect::<Vec<_>>())
- fn next(&mut self) -> Option<Self::Item> {
- let next = self.iter.next();
- match next {
- Some((len, index)) => match index {
- ColumnIndexMetaData::INT32(native_index) => Some(
- native_index
- .$func()
- .map(|x| x.map(|x| $stat_value_type::from(*x)))
- .collect::<Vec<_>>(),
- ),
- ColumnIndexMetaData::INT64(native_index) => Some(
- native_index
- .$func()
- .map(|x| x.map(|x|
$stat_value_type::try_from(*x).unwrap()))
- .collect::<Vec<_>>(),
- ),
- ColumnIndexMetaData::BYTE_ARRAY(native_index) => Some(
- native_index
- .$func()
- .map(|x| x.map(|x| $convert_func(x)))
- .collect::<Vec<_>>(),
- ),
-
ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(native_index) => Some(
- native_index
- .$func()
- .map(|x| x.map(|x| $convert_func(x)))
- .collect::<Vec<_>>(),
- ),
- _ => Some(vec![None; len]),
- },
- _ => None,
- }
- }
-
- fn size_hint(&self) -> (usize, Option<usize>) {
- self.iter.size_hint()
- }
- }
- };
-}
-
-get_decimal_page_stats_iterator!(
- MinDecimal32DataPageStatsIterator,
- min_values_iter,
- i32,
- from_bytes_to_i32
-);
-
-get_decimal_page_stats_iterator!(
- MaxDecimal32DataPageStatsIterator,
- max_values_iter,
- i32,
- from_bytes_to_i32
-);
-
-get_decimal_page_stats_iterator!(
- MinDecimal64DataPageStatsIterator,
- min_values_iter,
- i64,
- from_bytes_to_i64
-);
-
-get_decimal_page_stats_iterator!(
- MaxDecimal64DataPageStatsIterator,
- max_values_iter,
- i64,
- from_bytes_to_i64
-);
-
-get_decimal_page_stats_iterator!(
- MinDecimal128DataPageStatsIterator,
- min_values_iter,
- i128,
- from_bytes_to_i128
-);
-
-get_decimal_page_stats_iterator!(
- MaxDecimal128DataPageStatsIterator,
- max_values_iter,
- i128,
- from_bytes_to_i128
-);
-
-get_decimal_page_stats_iterator!(
- MinDecimal256DataPageStatsIterator,
- min_values_iter,
- i256,
- from_bytes_to_i256
-);
-
-get_decimal_page_stats_iterator!(
- MaxDecimal256DataPageStatsIterator,
- max_values_iter,
- i256,
- from_bytes_to_i256
-);
-
macro_rules! get_data_page_statistics {
($stat_type_prefix: ident, $data_type: ident, $iterator: ident,
$physical_type: ident) => {
- paste! {
- match $data_type {
+ {
+ let chunks: Vec<(usize, &ColumnIndexMetaData)> =
$iterator.collect();
+ let capacity: usize = chunks.iter().map(|c| c.0).sum();
+ paste! {
+ match $data_type {
DataType::Boolean => {
- let iterator = [<$stat_type_prefix
BooleanDataPageStatsIterator>]::new($iterator);
- let mut builder = BooleanBuilder::new();
- for x in iterator {
- for x in x.into_iter() {
- let Some(x) = x else {
- builder.append_null(); // no statistics value
- continue;
- };
- builder.append_value(x);
+ let mut b = BooleanBuilder::with_capacity(capacity);
+ for (len, index) in chunks {
+ match index {
+ ColumnIndexMetaData::BOOLEAN(index) => {
+ for val in index.[<$stat_type_prefix:lower
_values_iter>]() {
+ b.append_option(val.copied());
+ }
+ }
+ _ => b.append_nulls(len),
+ }
+ }
+ Ok(Arc::new(b.finish()))
+ },
+ DataType::UInt8 => {
+ let mut b = UInt8Builder::with_capacity(capacity);
+ for (len, index) in chunks {
+ match index {
+ ColumnIndexMetaData::INT32(index) => {
+ for val in index.[<$stat_type_prefix:lower
_values_iter>]() {
+ b.append_option(val.and_then(|&x|
u8::try_from(x).ok()));
+ }
+ }
+ _ => b.append_nulls(len),
+ }
+ }
+ Ok(Arc::new(b.finish()))
Review Comment:
Yeah, I agree the win is likely pretty small, but every little allocation
we avoid helps.
FWIW we could probably make it even better by not decoding into the
ParquetMetadata structures at all (and instead decoding Thrift directly into
Arrow arrays 🤔)
##########
parquet/src/file/page_index/column_index.rs:
##########
@@ -191,10 +191,17 @@ impl<T> PrimitiveColumnIndex<T> {
}
})
}
+ /// Returns the null pages.
+ ///
+ /// Values may be `None` when [`ColumnIndex::is_null_page()`] is `true`.
Review Comment:
I don't understand this comment -- this API returns a vector of which pages
are null. What is `None`? Or is it trying to say that calling `Self::min_value`
and `Self::max_value` will return `None` when `is_null_page` is true?
##########
parquet/src/arrow/arrow_reader/statistics.rs:
##########
@@ -596,473 +600,635 @@ macro_rules! get_statistics {
}}}
}
-macro_rules! make_data_page_stats_iterator {
- ($iterator_type: ident, $func: ident, $stat_value_type: ty) => {
- struct $iterator_type<'a, I>
- where
- I: Iterator<Item = (usize, &'a ColumnIndexMetaData)>,
- {
- iter: I,
- }
-
- impl<'a, I> $iterator_type<'a, I>
- where
- I: Iterator<Item = (usize, &'a ColumnIndexMetaData)>,
- {
- fn new(iter: I) -> Self {
- Self { iter }
- }
- }
-
- impl<'a, I> Iterator for $iterator_type<'a, I>
- where
- I: Iterator<Item = (usize, &'a ColumnIndexMetaData)>,
- {
- type Item = Vec<Option<$stat_value_type>>;
-
- fn next(&mut self) -> Option<Self::Item> {
- let next = self.iter.next();
- match next {
- Some((len, index)) => match index {
- // No matching `Index` found;
- // thus no statistics that can be extracted.
- // We return vec![None; len] to effectively
- // create an arrow null-array with the length
- // corresponding to the number of entries in
- // `ParquetOffsetIndex` per row group per column.
- ColumnIndexMetaData::NONE => Some(vec![None; len]),
- _ =>
Some(<$stat_value_type>::$func(&index).collect::<Vec<_>>()),
- },
- _ => None,
- }
- }
-
- fn size_hint(&self) -> (usize, Option<usize>) {
- self.iter.size_hint()
- }
- }
- };
-}
-
-make_data_page_stats_iterator!(MinBooleanDataPageStatsIterator,
min_values_iter, bool);
-make_data_page_stats_iterator!(MaxBooleanDataPageStatsIterator,
max_values_iter, bool);
-make_data_page_stats_iterator!(MinInt32DataPageStatsIterator, min_values_iter,
i32);
-make_data_page_stats_iterator!(MaxInt32DataPageStatsIterator, max_values_iter,
i32);
-make_data_page_stats_iterator!(MinInt64DataPageStatsIterator, min_values_iter,
i64);
-make_data_page_stats_iterator!(MaxInt64DataPageStatsIterator, max_values_iter,
i64);
-make_data_page_stats_iterator!(
- MinFloat16DataPageStatsIterator,
- min_values_iter,
- FixedLenByteArray
-);
-make_data_page_stats_iterator!(
- MaxFloat16DataPageStatsIterator,
- max_values_iter,
- FixedLenByteArray
-);
-make_data_page_stats_iterator!(MinFloat32DataPageStatsIterator,
min_values_iter, f32);
-make_data_page_stats_iterator!(MaxFloat32DataPageStatsIterator,
max_values_iter, f32);
-make_data_page_stats_iterator!(MinFloat64DataPageStatsIterator,
min_values_iter, f64);
-make_data_page_stats_iterator!(MaxFloat64DataPageStatsIterator,
max_values_iter, f64);
-make_data_page_stats_iterator!(
- MinByteArrayDataPageStatsIterator,
- min_values_iter,
- ByteArray
-);
-make_data_page_stats_iterator!(
- MaxByteArrayDataPageStatsIterator,
- max_values_iter,
- ByteArray
-);
-make_data_page_stats_iterator!(
- MaxFixedLenByteArrayDataPageStatsIterator,
- max_values_iter,
- FixedLenByteArray
-);
-
-make_data_page_stats_iterator!(
- MinFixedLenByteArrayDataPageStatsIterator,
- min_values_iter,
- FixedLenByteArray
-);
-
-macro_rules! get_decimal_page_stats_iterator {
- ($iterator_type: ident, $func: ident, $stat_value_type: ident,
$convert_func: ident) => {
- struct $iterator_type<'a, I>
- where
- I: Iterator<Item = (usize, &'a ColumnIndexMetaData)>,
- {
- iter: I,
- }
-
- impl<'a, I> $iterator_type<'a, I>
- where
- I: Iterator<Item = (usize, &'a ColumnIndexMetaData)>,
- {
- fn new(iter: I) -> Self {
- Self { iter }
- }
- }
-
- impl<'a, I> Iterator for $iterator_type<'a, I>
- where
- I: Iterator<Item = (usize, &'a ColumnIndexMetaData)>,
- {
- type Item = Vec<Option<$stat_value_type>>;
-
- // Some(native_index.$func().map(|v|
v.map($conv)).collect::<Vec<_>>())
- fn next(&mut self) -> Option<Self::Item> {
- let next = self.iter.next();
- match next {
- Some((len, index)) => match index {
- ColumnIndexMetaData::INT32(native_index) => Some(
- native_index
- .$func()
- .map(|x| x.map(|x| $stat_value_type::from(*x)))
- .collect::<Vec<_>>(),
- ),
- ColumnIndexMetaData::INT64(native_index) => Some(
- native_index
- .$func()
- .map(|x| x.map(|x|
$stat_value_type::try_from(*x).unwrap()))
- .collect::<Vec<_>>(),
- ),
- ColumnIndexMetaData::BYTE_ARRAY(native_index) => Some(
- native_index
- .$func()
- .map(|x| x.map(|x| $convert_func(x)))
- .collect::<Vec<_>>(),
- ),
-
ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(native_index) => Some(
- native_index
- .$func()
- .map(|x| x.map(|x| $convert_func(x)))
- .collect::<Vec<_>>(),
- ),
- _ => Some(vec![None; len]),
- },
- _ => None,
- }
- }
-
- fn size_hint(&self) -> (usize, Option<usize>) {
- self.iter.size_hint()
- }
- }
- };
-}
-
-get_decimal_page_stats_iterator!(
- MinDecimal32DataPageStatsIterator,
- min_values_iter,
- i32,
- from_bytes_to_i32
-);
-
-get_decimal_page_stats_iterator!(
- MaxDecimal32DataPageStatsIterator,
- max_values_iter,
- i32,
- from_bytes_to_i32
-);
-
-get_decimal_page_stats_iterator!(
- MinDecimal64DataPageStatsIterator,
- min_values_iter,
- i64,
- from_bytes_to_i64
-);
-
-get_decimal_page_stats_iterator!(
- MaxDecimal64DataPageStatsIterator,
- max_values_iter,
- i64,
- from_bytes_to_i64
-);
-
-get_decimal_page_stats_iterator!(
- MinDecimal128DataPageStatsIterator,
- min_values_iter,
- i128,
- from_bytes_to_i128
-);
-
-get_decimal_page_stats_iterator!(
- MaxDecimal128DataPageStatsIterator,
- max_values_iter,
- i128,
- from_bytes_to_i128
-);
-
-get_decimal_page_stats_iterator!(
- MinDecimal256DataPageStatsIterator,
- min_values_iter,
- i256,
- from_bytes_to_i256
-);
-
-get_decimal_page_stats_iterator!(
- MaxDecimal256DataPageStatsIterator,
- max_values_iter,
- i256,
- from_bytes_to_i256
-);
-
macro_rules! get_data_page_statistics {
($stat_type_prefix: ident, $data_type: ident, $iterator: ident,
$physical_type: ident) => {
- paste! {
- match $data_type {
+ {
+ let chunks: Vec<(usize, &ColumnIndexMetaData)> =
$iterator.collect();
+ let capacity: usize = chunks.iter().map(|c| c.0).sum();
+ paste! {
+ match $data_type {
DataType::Boolean => {
- let iterator = [<$stat_type_prefix
BooleanDataPageStatsIterator>]::new($iterator);
- let mut builder = BooleanBuilder::new();
- for x in iterator {
- for x in x.into_iter() {
- let Some(x) = x else {
- builder.append_null(); // no statistics value
- continue;
- };
- builder.append_value(x);
+ let mut b = BooleanBuilder::with_capacity(capacity);
+ for (len, index) in chunks {
+ match index {
+ ColumnIndexMetaData::BOOLEAN(index) => {
+ for val in index.[<$stat_type_prefix:lower
_values_iter>]() {
+ b.append_option(val.copied());
+ }
+ }
+ _ => b.append_nulls(len),
+ }
+ }
+ Ok(Arc::new(b.finish()))
+ },
+ DataType::UInt8 => {
+ let mut b = UInt8Builder::with_capacity(capacity);
+ for (len, index) in chunks {
+ match index {
+ ColumnIndexMetaData::INT32(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.and_then(|&x|
u8::try_from(x).ok())),
+ );
+ }
+ _ => b.append_nulls(len),
+ }
+ }
+ Ok(Arc::new(b.finish()))
+ },
+ DataType::UInt16 => {
+ let mut b = UInt16Builder::with_capacity(capacity);
+ for (len, index) in chunks {
+ match index {
+ ColumnIndexMetaData::INT32(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.and_then(|&x|
u16::try_from(x).ok())),
+ );
+ }
+ _ => b.append_nulls(len),
+ }
+ }
+ Ok(Arc::new(b.finish()))
+ },
+ DataType::UInt32 => {
+ let mut b = UInt32Builder::with_capacity(capacity);
+ for (len, index) in chunks {
+ match index {
+ ColumnIndexMetaData::INT32(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.map(|&x| x as u32)),
+ );
+ }
+ _ => b.append_nulls(len),
+ }
+ }
+ Ok(Arc::new(b.finish()))
+ },
+ DataType::UInt64 => {
+ let mut b = UInt64Builder::with_capacity(capacity);
+ for (len, index) in chunks {
+ match index {
+ ColumnIndexMetaData::INT64(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.map(|&x| x as u64)),
+ );
+ }
+ _ => b.append_nulls(len),
+ }
+ }
+ Ok(Arc::new(b.finish()))
+ },
+ DataType::Int8 => {
+ let mut b = Int8Builder::with_capacity(capacity);
+ for (len, index) in chunks {
+ match index {
+ ColumnIndexMetaData::INT32(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.and_then(|&x|
i8::try_from(x).ok())),
+ );
+ }
+ _ => b.append_nulls(len),
+ }
+ }
+ Ok(Arc::new(b.finish()))
+ },
+ DataType::Int16 => {
+ let mut b = Int16Builder::with_capacity(capacity);
+ for (len, index) in chunks {
+ match index {
+ ColumnIndexMetaData::INT32(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.and_then(|&x|
i16::try_from(x).ok())),
+ );
+ }
+ _ => b.append_nulls(len),
+ }
+ }
+ Ok(Arc::new(b.finish()))
+ },
+ DataType::Int32 => {
+ let mut b = Int32Builder::with_capacity(capacity);
+ for (len, index) in chunks {
+ match index {
+ ColumnIndexMetaData::INT32(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.copied()),
+ );
+ }
+ _ => b.append_nulls(len),
+ }
+ }
+ Ok(Arc::new(b.finish()))
+ },
+ DataType::Int64 => {
+ let mut b = Int64Builder::with_capacity(capacity);
+ for (len, index) in chunks {
+ match index {
+ ColumnIndexMetaData::INT64(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.copied()),
+ );
+ }
+ _ => b.append_nulls(len),
+ }
+ }
+ Ok(Arc::new(b.finish()))
+ },
+ DataType::Float16 => {
+ let mut b = Float16Builder::with_capacity(capacity);
+ for (len, index) in chunks {
+ match index {
+ ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(index)
=> {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.and_then(|x|
from_bytes_to_f16(x))),
+ );
+ }
+ _ => b.append_nulls(len),
}
}
- Ok(Arc::new(builder.finish()))
+ Ok(Arc::new(b.finish()))
+ },
+ DataType::Float32 => {
+ let mut b = Float32Builder::with_capacity(capacity);
+ for (len, index) in chunks {
+ match index {
+ ColumnIndexMetaData::FLOAT(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.copied()),
+ );
+ }
+ _ => b.append_nulls(len),
+ }
+ }
+ Ok(Arc::new(b.finish()))
+ },
+ DataType::Float64 => {
+ let mut b = Float64Builder::with_capacity(capacity);
+ for (len, index) in chunks {
+ match index {
+ ColumnIndexMetaData::DOUBLE(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.copied()),
+ );
+ }
+ _ => b.append_nulls(len),
+ }
+ }
+ Ok(Arc::new(b.finish()))
+ },
+ DataType::Binary => {
+ let mut b = BinaryBuilder::with_capacity(capacity,
capacity * 10);
+ for (len, index) in chunks {
+ match index {
+ ColumnIndexMetaData::BYTE_ARRAY(index) => {
+ for val in index.[<$stat_type_prefix:lower
_values_iter>]() {
+ b.append_option(val.map(|x| x.as_ref()));
+ }
+ }
+ _ => b.append_nulls(len),
+ }
+ }
+ Ok(Arc::new(b.finish()))
+ },
+ DataType::LargeBinary => {
+ let mut b = LargeBinaryBuilder::with_capacity(capacity,
capacity * 10);
+ for (len, index) in chunks {
+ match index {
+ ColumnIndexMetaData::BYTE_ARRAY(index) => {
+ for val in index.[<$stat_type_prefix:lower
_values_iter>]() {
+ b.append_option(val.map(|x| x.as_ref()));
+ }
+ }
+ _ => b.append_nulls(len),
+ }
+ }
+ Ok(Arc::new(b.finish()))
},
- DataType::UInt8 => Ok(Arc::new(
- UInt8Array::from_iter(
- [<$stat_type_prefix
Int32DataPageStatsIterator>]::new($iterator)
- .map(|x| {
- x.into_iter().map(|x| {
- x.and_then(|x| u8::try_from(x).ok())
- })
- })
- .flatten()
- )
- )),
- DataType::UInt16 => Ok(Arc::new(
- UInt16Array::from_iter(
- [<$stat_type_prefix
Int32DataPageStatsIterator>]::new($iterator)
- .map(|x| {
- x.into_iter().map(|x| {
- x.and_then(|x| u16::try_from(x).ok())
- })
- })
- .flatten()
- )
- )),
- DataType::UInt32 => Ok(Arc::new(
- UInt32Array::from_iter(
- [<$stat_type_prefix
Int32DataPageStatsIterator>]::new($iterator)
- .map(|x| {
- x.into_iter().map(|x| {
- x.and_then(|x| Some(x as u32))
- })
- })
- .flatten()
- ))),
- DataType::UInt64 => Ok(Arc::new(
- UInt64Array::from_iter(
- [<$stat_type_prefix
Int64DataPageStatsIterator>]::new($iterator)
- .map(|x| {
- x.into_iter().map(|x| {
- x.and_then(|x| Some(x as u64))
- })
- })
- .flatten()
- ))),
- DataType::Int8 => Ok(Arc::new(
- Int8Array::from_iter(
- [<$stat_type_prefix
Int32DataPageStatsIterator>]::new($iterator)
- .map(|x| {
- x.into_iter().map(|x| {
- x.and_then(|x| i8::try_from(x).ok())
- })
- })
- .flatten()
- )
- )),
- DataType::Int16 => Ok(Arc::new(
- Int16Array::from_iter(
- [<$stat_type_prefix
Int32DataPageStatsIterator>]::new($iterator)
- .map(|x| {
- x.into_iter().map(|x| {
- x.and_then(|x| i16::try_from(x).ok())
- })
- })
- .flatten()
- )
- )),
- DataType::Int32 =>
Ok(Arc::new(Int32Array::from_iter([<$stat_type_prefix
Int32DataPageStatsIterator>]::new($iterator).flatten()))),
- DataType::Int64 =>
Ok(Arc::new(Int64Array::from_iter([<$stat_type_prefix
Int64DataPageStatsIterator>]::new($iterator).flatten()))),
- DataType::Float16 => Ok(Arc::new(
- Float16Array::from_iter(
- [<$stat_type_prefix
Float16DataPageStatsIterator>]::new($iterator)
- .map(|x| {
- x.into_iter().map(|x| {
- x.and_then(|x| from_bytes_to_f16(x.data()))
- })
- })
- .flatten()
- )
- )),
- DataType::Float32 =>
Ok(Arc::new(Float32Array::from_iter([<$stat_type_prefix
Float32DataPageStatsIterator>]::new($iterator).flatten()))),
- DataType::Float64 =>
Ok(Arc::new(Float64Array::from_iter([<$stat_type_prefix
Float64DataPageStatsIterator>]::new($iterator).flatten()))),
- DataType::Binary =>
Ok(Arc::new(BinaryArray::from_iter([<$stat_type_prefix
ByteArrayDataPageStatsIterator>]::new($iterator).flatten()))),
- DataType::LargeBinary =>
Ok(Arc::new(LargeBinaryArray::from_iter([<$stat_type_prefix
ByteArrayDataPageStatsIterator>]::new($iterator).flatten()))),
DataType::Utf8 => {
- let mut builder = StringBuilder::new();
- let iterator = [<$stat_type_prefix
ByteArrayDataPageStatsIterator>]::new($iterator);
- for x in iterator {
- for x in x.into_iter() {
- let Some(x) = x else {
- builder.append_null(); // no statistics value
- continue;
- };
-
- let Ok(x) = std::str::from_utf8(x.data()) else {
- builder.append_null();
- continue;
- };
-
- builder.append_value(x);
+ let mut b = StringBuilder::with_capacity(capacity,
capacity * 10);
+ for (len, index) in chunks {
+ match index {
+ ColumnIndexMetaData::BYTE_ARRAY(index) => {
+ for val in index.[<$stat_type_prefix:lower
_values_iter>]() {
+ match val {
+ Some(x) => match
std::str::from_utf8(x.as_ref()) {
+ Ok(s) => b.append_value(s),
+ _ => b.append_null(),
+ }
+ None => b.append_null(),
+ }
+ }
+ }
+ _ => b.append_nulls(len),
}
}
- Ok(Arc::new(builder.finish()))
+ Ok(Arc::new(b.finish()))
},
DataType::LargeUtf8 => {
- let mut builder = LargeStringBuilder::new();
- let iterator = [<$stat_type_prefix
ByteArrayDataPageStatsIterator>]::new($iterator);
- for x in iterator {
- for x in x.into_iter() {
- let Some(x) = x else {
- builder.append_null(); // no statistics value
- continue;
- };
-
- let Ok(x) = std::str::from_utf8(x.data()) else {
- builder.append_null();
- continue;
- };
-
- builder.append_value(x);
+ let mut b = LargeStringBuilder::with_capacity(capacity,
capacity * 10);
+ for (len, index) in chunks {
+ match index {
+ ColumnIndexMetaData::BYTE_ARRAY(index) => {
+ for val in index.[<$stat_type_prefix:lower
_values_iter>]() {
+ match val {
+ Some(x) => match
std::str::from_utf8(x.as_ref()) {
+ Ok(s) => b.append_value(s),
+ _ => b.append_null(),
+ }
+ None => b.append_null(),
+ }
+ }
+ }
+ _ => b.append_nulls(len),
}
}
- Ok(Arc::new(builder.finish()))
+ Ok(Arc::new(b.finish()))
},
DataType::Dictionary(_, value_type) => {
- [<$stat_type_prefix:lower _ page_statistics>](value_type,
$iterator, $physical_type)
+ [<$stat_type_prefix:lower _ page_statistics>](value_type,
chunks.into_iter(), $physical_type)
},
DataType::Timestamp(unit, timezone) => {
- let iter = [<$stat_type_prefix
Int64DataPageStatsIterator>]::new($iterator).flatten();
- Ok(match unit {
- TimeUnit::Second =>
Arc::new(TimestampSecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
- TimeUnit::Millisecond =>
Arc::new(TimestampMillisecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
- TimeUnit::Microsecond =>
Arc::new(TimestampMicrosecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
- TimeUnit::Nanosecond =>
Arc::new(TimestampNanosecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
- })
+ match unit {
+ TimeUnit::Second => {
+ let mut b =
TimestampSecondBuilder::with_capacity(capacity);
+ for (len, index) in chunks {
+ match index {
+ ColumnIndexMetaData::INT64(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.copied()),
+ );
+ }
+ _ => b.append_nulls(len),
+ }
+ }
+
Ok(Arc::new(b.finish().with_timezone_opt(timezone.clone())))
+ }
+ TimeUnit::Millisecond => {
+ let mut b =
TimestampMillisecondBuilder::with_capacity(capacity);
+ for (len, index) in chunks {
+ match index {
+ ColumnIndexMetaData::INT64(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.copied()),
+ );
+ }
+ _ => b.append_nulls(len),
+ }
+ }
+
Ok(Arc::new(b.finish().with_timezone_opt(timezone.clone())))
+ }
+ TimeUnit::Microsecond => {
+ let mut b =
TimestampMicrosecondBuilder::with_capacity(capacity);
+ for (len, index) in chunks {
+ match index {
+ ColumnIndexMetaData::INT64(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.copied()),
+ );
+ }
+ _ => b.append_nulls(len),
+ }
+ }
+
Ok(Arc::new(b.finish().with_timezone_opt(timezone.clone())))
+ }
+ TimeUnit::Nanosecond => {
+ let mut b =
TimestampNanosecondBuilder::with_capacity(capacity);
+ for (len, index) in chunks {
+ match index {
+ ColumnIndexMetaData::INT64(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.copied()),
+ );
+ }
+ _ => b.append_nulls(len),
+ }
+ }
+
Ok(Arc::new(b.finish().with_timezone_opt(timezone.clone())))
+ }
+ }
+ },
+ DataType::Date32 => {
+ let mut b = Date32Builder::with_capacity(capacity);
+ for (len, index) in chunks {
+ match index {
+ ColumnIndexMetaData::INT32(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.copied()),
+ );
+ }
+ _ => b.append_nulls(len),
+ }
+ }
+ Ok(Arc::new(b.finish()))
+ },
+ DataType::Date64 if $physical_type ==
Some(PhysicalType::INT32)=> {
+ let mut b = Date64Builder::with_capacity(capacity);
+ for (len, index) in chunks {
+ match index {
+ ColumnIndexMetaData::INT32(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.map(|&x| (x as i64) *
24 * 60 * 60 * 1000)),
+ );
+ }
+ _ => b.append_nulls(len),
+ }
+ }
+ Ok(Arc::new(b.finish()))
+ },
+ DataType::Date64 if $physical_type ==
Some(PhysicalType::INT64) => {
+ let mut b = Date64Builder::with_capacity(capacity);
+ for (len, index) in chunks {
+ match index {
+ ColumnIndexMetaData::INT64(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.copied()),
+ );
+ }
+ _ => b.append_nulls(len),
+ }
+ }
+ Ok(Arc::new(b.finish()))
+ },
+ DataType::Decimal32(precision, scale) => {
+ let mut b = Decimal32Builder::with_capacity(capacity);
+ for (len, index) in chunks {
+ match index {
+ ColumnIndexMetaData::INT32(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.copied()),
+ );
+ }
+ ColumnIndexMetaData::INT64(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.and_then(|&x|
i32::try_from(x).ok())),
+ );
+ }
+ ColumnIndexMetaData::BYTE_ARRAY(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.map(|x|
from_bytes_to_i32(x.as_ref()))),
+ );
+ }
+ ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(index)
=> {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.map(|x|
from_bytes_to_i32(x.as_ref()))),
+ );
+ }
+ _ => b.append_nulls(len),
+ }
+ }
+ Ok(Arc::new(b.with_precision_and_scale(*precision,
*scale)?.finish()))
+ },
+ DataType::Decimal64(precision, scale) => {
+ let mut b = Decimal64Builder::with_capacity(capacity);
+ for (len, index) in chunks {
+ match index {
+ ColumnIndexMetaData::INT32(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.map(|x| *x as i64)),
+ );
+ }
+ ColumnIndexMetaData::INT64(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.copied()),
+ );
+ }
+ ColumnIndexMetaData::BYTE_ARRAY(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.map(|x|
from_bytes_to_i64(x.as_ref()))),
+ );
+ }
+ ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(index)
=> {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.map(|x|
from_bytes_to_i64(x.as_ref()))),
+ );
+ }
+ _ => b.append_nulls(len),
+ }
+ }
+ Ok(Arc::new(b.with_precision_and_scale(*precision,
*scale)?.finish()))
+ },
+ DataType::Decimal128(precision, scale) => {
+ let mut b = Decimal128Array::builder(capacity);
+ for (len, index) in chunks {
+ match index {
+ ColumnIndexMetaData::INT32(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.map(|x| *x as i128)),
+ );
+ }
+ ColumnIndexMetaData::INT64(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.map(|x| *x as i128)),
+ );
+ }
+ ColumnIndexMetaData::BYTE_ARRAY(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.map(|x|
from_bytes_to_i128(x.as_ref()))),
+ );
+ }
+ ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(index)
=> {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.map(|x|
from_bytes_to_i128(x.as_ref()))),
+ );
+ }
+ _ => b.append_nulls(len),
+ }
+ }
+ Ok(Arc::new(b.with_precision_and_scale(*precision,
*scale)?.finish()))
+ },
+ DataType::Decimal256(precision, scale) => {
+ let mut b = Decimal256Array::builder(capacity);
+ for (len, index) in chunks {
+ match index {
+ ColumnIndexMetaData::INT32(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.map(|x|
i256::from_i128(*x as i128))),
+ );
+ }
+ ColumnIndexMetaData::INT64(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.map(|x|
i256::from_i128(*x as i128))),
+ );
+ }
+ ColumnIndexMetaData::BYTE_ARRAY(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.map(|x|
from_bytes_to_i256(x.as_ref()))),
+ );
+ }
+ ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(index)
=> {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.map(|x|
from_bytes_to_i256(x.as_ref()))),
+ );
+ }
+ _ => b.append_nulls(len),
+ }
+ }
+ Ok(Arc::new(b.with_precision_and_scale(*precision,
*scale)?.finish()))
},
- DataType::Date32 =>
Ok(Arc::new(Date32Array::from_iter([<$stat_type_prefix
Int32DataPageStatsIterator>]::new($iterator).flatten()))),
- DataType::Date64 if $physical_type ==
Some(PhysicalType::INT32)=> Ok(
- Arc::new(
- Date64Array::from_iter([<$stat_type_prefix
Int32DataPageStatsIterator>]::new($iterator)
- .map(|x| {
- x.into_iter()
- .map(|x| {
- x.and_then(|x| i64::try_from(x).ok())
- })
- .map(|x| x.map(|x| x * 24 * 60 * 60 * 1000))
- }).flatten()
- )
- )
- ),
- DataType::Date64 if $physical_type ==
Some(PhysicalType::INT64) =>
Ok(Arc::new(Date64Array::from_iter([<$stat_type_prefix
Int64DataPageStatsIterator>]::new($iterator).flatten()))),
- DataType::Decimal32(precision, scale) => Ok(Arc::new(
- Decimal32Array::from_iter([<$stat_type_prefix
Decimal32DataPageStatsIterator>]::new($iterator).flatten()).with_precision_and_scale(*precision,
*scale)?)),
- DataType::Decimal64(precision, scale) => Ok(Arc::new(
- Decimal64Array::from_iter([<$stat_type_prefix
Decimal64DataPageStatsIterator>]::new($iterator).flatten()).with_precision_and_scale(*precision,
*scale)?)),
- DataType::Decimal128(precision, scale) => Ok(Arc::new(
- Decimal128Array::from_iter([<$stat_type_prefix
Decimal128DataPageStatsIterator>]::new($iterator).flatten()).with_precision_and_scale(*precision,
*scale)?)),
- DataType::Decimal256(precision, scale) => Ok(Arc::new(
- Decimal256Array::from_iter([<$stat_type_prefix
Decimal256DataPageStatsIterator>]::new($iterator).flatten()).with_precision_and_scale(*precision,
*scale)?)),
DataType::Time32(unit) => {
- Ok(match unit {
- TimeUnit::Second =>
Arc::new(Time32SecondArray::from_iter(
- [<$stat_type_prefix
Int32DataPageStatsIterator>]::new($iterator).flatten(),
- )),
- TimeUnit::Millisecond =>
Arc::new(Time32MillisecondArray::from_iter(
- [<$stat_type_prefix
Int32DataPageStatsIterator>]::new($iterator).flatten(),
- )),
+ match unit {
+ TimeUnit::Second => {
+ let mut b =
Time32SecondBuilder::with_capacity(capacity);
+ for (len, index) in chunks {
+ match index {
+ ColumnIndexMetaData::INT32(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.copied()),
+ );
+ }
+ _ => b.append_nulls(len),
+ }
+ }
+ Ok(Arc::new(b.finish()))
+ }
+ TimeUnit::Millisecond => {
+ let mut b =
Time32MillisecondBuilder::with_capacity(capacity);
+ for (len, index) in chunks {
+ match index {
+ ColumnIndexMetaData::INT32(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.copied()),
+ );
+ }
+ _ => b.append_nulls(len),
+ }
+ }
+ Ok(Arc::new(b.finish()))
+ }
_ => {
- // don't know how to extract statistics, so return
an empty array
- new_empty_array(&DataType::Time32(unit.clone()))
+ Ok(new_null_array($data_type, capacity))
}
- })
+ }
}
DataType::Time64(unit) => {
- Ok(match unit {
- TimeUnit::Microsecond =>
Arc::new(Time64MicrosecondArray::from_iter(
- [<$stat_type_prefix
Int64DataPageStatsIterator>]::new($iterator).flatten(),
- )),
- TimeUnit::Nanosecond =>
Arc::new(Time64NanosecondArray::from_iter(
- [<$stat_type_prefix
Int64DataPageStatsIterator>]::new($iterator).flatten(),
- )),
+ match unit {
+ TimeUnit::Microsecond => {
+ let mut b =
Time64MicrosecondBuilder::with_capacity(capacity);
+ for (len, index) in chunks {
+ match index {
+ ColumnIndexMetaData::INT64(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.copied()),
+ );
+ }
+ _ => b.append_nulls(len),
+ }
+ }
+ Ok(Arc::new(b.finish()))
+ }
+ TimeUnit::Nanosecond => {
+ let mut b =
Time64NanosecondBuilder::with_capacity(capacity);
+ for (len, index) in chunks {
+ match index {
+ ColumnIndexMetaData::INT64(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.copied()),
+ );
+ }
+ _ => b.append_nulls(len),
+ }
+ }
+ Ok(Arc::new(b.finish()))
+ }
_ => {
- // don't know how to extract statistics, so return
an empty array
- new_empty_array(&DataType::Time64(unit.clone()))
+ Ok(new_null_array($data_type, capacity))
}
- })
+ }
},
DataType::FixedSizeBinary(size) => {
- let mut builder = FixedSizeBinaryBuilder::new(*size);
- let iterator = [<$stat_type_prefix
FixedLenByteArrayDataPageStatsIterator>]::new($iterator);
- for x in iterator {
- for x in x.into_iter() {
- let Some(x) = x else {
- builder.append_null(); // no statistics value
- continue;
- };
-
- if x.len() == *size as usize {
Review Comment:
The previous code for FixedSizeBinary tried to append the value and ignored
the error, but the new code will now return the error -- I left a suggestion
for how to restore the old behavior.
##########
parquet/src/arrow/arrow_reader/statistics.rs:
##########
@@ -596,473 +600,635 @@ macro_rules! get_statistics {
}}}
}
-macro_rules! make_data_page_stats_iterator {
- ($iterator_type: ident, $func: ident, $stat_value_type: ty) => {
- struct $iterator_type<'a, I>
- where
- I: Iterator<Item = (usize, &'a ColumnIndexMetaData)>,
- {
- iter: I,
- }
-
- impl<'a, I> $iterator_type<'a, I>
- where
- I: Iterator<Item = (usize, &'a ColumnIndexMetaData)>,
- {
- fn new(iter: I) -> Self {
- Self { iter }
- }
- }
-
- impl<'a, I> Iterator for $iterator_type<'a, I>
- where
- I: Iterator<Item = (usize, &'a ColumnIndexMetaData)>,
- {
- type Item = Vec<Option<$stat_value_type>>;
-
- fn next(&mut self) -> Option<Self::Item> {
- let next = self.iter.next();
- match next {
- Some((len, index)) => match index {
- // No matching `Index` found;
- // thus no statistics that can be extracted.
- // We return vec![None; len] to effectively
- // create an arrow null-array with the length
- // corresponding to the number of entries in
- // `ParquetOffsetIndex` per row group per column.
- ColumnIndexMetaData::NONE => Some(vec![None; len]),
- _ =>
Some(<$stat_value_type>::$func(&index).collect::<Vec<_>>()),
- },
- _ => None,
- }
- }
-
- fn size_hint(&self) -> (usize, Option<usize>) {
- self.iter.size_hint()
- }
- }
- };
-}
-
-make_data_page_stats_iterator!(MinBooleanDataPageStatsIterator,
min_values_iter, bool);
-make_data_page_stats_iterator!(MaxBooleanDataPageStatsIterator,
max_values_iter, bool);
-make_data_page_stats_iterator!(MinInt32DataPageStatsIterator, min_values_iter,
i32);
-make_data_page_stats_iterator!(MaxInt32DataPageStatsIterator, max_values_iter,
i32);
-make_data_page_stats_iterator!(MinInt64DataPageStatsIterator, min_values_iter,
i64);
-make_data_page_stats_iterator!(MaxInt64DataPageStatsIterator, max_values_iter,
i64);
-make_data_page_stats_iterator!(
- MinFloat16DataPageStatsIterator,
- min_values_iter,
- FixedLenByteArray
-);
-make_data_page_stats_iterator!(
- MaxFloat16DataPageStatsIterator,
- max_values_iter,
- FixedLenByteArray
-);
-make_data_page_stats_iterator!(MinFloat32DataPageStatsIterator,
min_values_iter, f32);
-make_data_page_stats_iterator!(MaxFloat32DataPageStatsIterator,
max_values_iter, f32);
-make_data_page_stats_iterator!(MinFloat64DataPageStatsIterator,
min_values_iter, f64);
-make_data_page_stats_iterator!(MaxFloat64DataPageStatsIterator,
max_values_iter, f64);
-make_data_page_stats_iterator!(
- MinByteArrayDataPageStatsIterator,
- min_values_iter,
- ByteArray
-);
-make_data_page_stats_iterator!(
- MaxByteArrayDataPageStatsIterator,
- max_values_iter,
- ByteArray
-);
-make_data_page_stats_iterator!(
- MaxFixedLenByteArrayDataPageStatsIterator,
- max_values_iter,
- FixedLenByteArray
-);
-
-make_data_page_stats_iterator!(
- MinFixedLenByteArrayDataPageStatsIterator,
- min_values_iter,
- FixedLenByteArray
-);
-
-macro_rules! get_decimal_page_stats_iterator {
- ($iterator_type: ident, $func: ident, $stat_value_type: ident,
$convert_func: ident) => {
- struct $iterator_type<'a, I>
- where
- I: Iterator<Item = (usize, &'a ColumnIndexMetaData)>,
- {
- iter: I,
- }
-
- impl<'a, I> $iterator_type<'a, I>
- where
- I: Iterator<Item = (usize, &'a ColumnIndexMetaData)>,
- {
- fn new(iter: I) -> Self {
- Self { iter }
- }
- }
-
- impl<'a, I> Iterator for $iterator_type<'a, I>
- where
- I: Iterator<Item = (usize, &'a ColumnIndexMetaData)>,
- {
- type Item = Vec<Option<$stat_value_type>>;
-
- // Some(native_index.$func().map(|v|
v.map($conv)).collect::<Vec<_>>())
- fn next(&mut self) -> Option<Self::Item> {
- let next = self.iter.next();
- match next {
- Some((len, index)) => match index {
- ColumnIndexMetaData::INT32(native_index) => Some(
- native_index
- .$func()
- .map(|x| x.map(|x| $stat_value_type::from(*x)))
- .collect::<Vec<_>>(),
- ),
- ColumnIndexMetaData::INT64(native_index) => Some(
- native_index
- .$func()
- .map(|x| x.map(|x|
$stat_value_type::try_from(*x).unwrap()))
- .collect::<Vec<_>>(),
- ),
- ColumnIndexMetaData::BYTE_ARRAY(native_index) => Some(
- native_index
- .$func()
- .map(|x| x.map(|x| $convert_func(x)))
- .collect::<Vec<_>>(),
- ),
-
ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(native_index) => Some(
- native_index
- .$func()
- .map(|x| x.map(|x| $convert_func(x)))
- .collect::<Vec<_>>(),
- ),
- _ => Some(vec![None; len]),
- },
- _ => None,
- }
- }
-
- fn size_hint(&self) -> (usize, Option<usize>) {
- self.iter.size_hint()
- }
- }
- };
-}
-
-get_decimal_page_stats_iterator!(
- MinDecimal32DataPageStatsIterator,
- min_values_iter,
- i32,
- from_bytes_to_i32
-);
-
-get_decimal_page_stats_iterator!(
- MaxDecimal32DataPageStatsIterator,
- max_values_iter,
- i32,
- from_bytes_to_i32
-);
-
-get_decimal_page_stats_iterator!(
- MinDecimal64DataPageStatsIterator,
- min_values_iter,
- i64,
- from_bytes_to_i64
-);
-
-get_decimal_page_stats_iterator!(
- MaxDecimal64DataPageStatsIterator,
- max_values_iter,
- i64,
- from_bytes_to_i64
-);
-
-get_decimal_page_stats_iterator!(
- MinDecimal128DataPageStatsIterator,
- min_values_iter,
- i128,
- from_bytes_to_i128
-);
-
-get_decimal_page_stats_iterator!(
- MaxDecimal128DataPageStatsIterator,
- max_values_iter,
- i128,
- from_bytes_to_i128
-);
-
-get_decimal_page_stats_iterator!(
- MinDecimal256DataPageStatsIterator,
- min_values_iter,
- i256,
- from_bytes_to_i256
-);
-
-get_decimal_page_stats_iterator!(
- MaxDecimal256DataPageStatsIterator,
- max_values_iter,
- i256,
- from_bytes_to_i256
-);
-
macro_rules! get_data_page_statistics {
($stat_type_prefix: ident, $data_type: ident, $iterator: ident,
$physical_type: ident) => {
- paste! {
- match $data_type {
+ {
+ let chunks: Vec<(usize, &ColumnIndexMetaData)> =
$iterator.collect();
+ let capacity: usize = chunks.iter().map(|c| c.0).sum();
+ paste! {
+ match $data_type {
DataType::Boolean => {
- let iterator = [<$stat_type_prefix
BooleanDataPageStatsIterator>]::new($iterator);
- let mut builder = BooleanBuilder::new();
- for x in iterator {
- for x in x.into_iter() {
- let Some(x) = x else {
- builder.append_null(); // no statistics value
- continue;
- };
- builder.append_value(x);
+ let mut b = BooleanBuilder::with_capacity(capacity);
+ for (len, index) in chunks {
+ match index {
+ ColumnIndexMetaData::BOOLEAN(index) => {
+ for val in index.[<$stat_type_prefix:lower
_values_iter>]() {
+ b.append_option(val.copied());
+ }
+ }
+ _ => b.append_nulls(len),
+ }
+ }
+ Ok(Arc::new(b.finish()))
+ },
+ DataType::UInt8 => {
+ let mut b = UInt8Builder::with_capacity(capacity);
+ for (len, index) in chunks {
+ match index {
+ ColumnIndexMetaData::INT32(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.and_then(|&x|
u8::try_from(x).ok())),
+ );
+ }
+ _ => b.append_nulls(len),
+ }
+ }
+ Ok(Arc::new(b.finish()))
+ },
+ DataType::UInt16 => {
+ let mut b = UInt16Builder::with_capacity(capacity);
+ for (len, index) in chunks {
+ match index {
+ ColumnIndexMetaData::INT32(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.and_then(|&x|
u16::try_from(x).ok())),
+ );
+ }
+ _ => b.append_nulls(len),
+ }
+ }
+ Ok(Arc::new(b.finish()))
+ },
+ DataType::UInt32 => {
+ let mut b = UInt32Builder::with_capacity(capacity);
+ for (len, index) in chunks {
+ match index {
+ ColumnIndexMetaData::INT32(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.map(|&x| x as u32)),
+ );
+ }
+ _ => b.append_nulls(len),
+ }
+ }
+ Ok(Arc::new(b.finish()))
+ },
+ DataType::UInt64 => {
+ let mut b = UInt64Builder::with_capacity(capacity);
+ for (len, index) in chunks {
+ match index {
+ ColumnIndexMetaData::INT64(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.map(|&x| x as u64)),
+ );
+ }
+ _ => b.append_nulls(len),
+ }
+ }
+ Ok(Arc::new(b.finish()))
+ },
+ DataType::Int8 => {
+ let mut b = Int8Builder::with_capacity(capacity);
+ for (len, index) in chunks {
+ match index {
+ ColumnIndexMetaData::INT32(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.and_then(|&x|
i8::try_from(x).ok())),
+ );
+ }
+ _ => b.append_nulls(len),
+ }
+ }
+ Ok(Arc::new(b.finish()))
+ },
+ DataType::Int16 => {
+ let mut b = Int16Builder::with_capacity(capacity);
+ for (len, index) in chunks {
+ match index {
+ ColumnIndexMetaData::INT32(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.and_then(|&x|
i16::try_from(x).ok())),
+ );
+ }
+ _ => b.append_nulls(len),
+ }
+ }
+ Ok(Arc::new(b.finish()))
+ },
+ DataType::Int32 => {
+ let mut b = Int32Builder::with_capacity(capacity);
+ for (len, index) in chunks {
+ match index {
+ ColumnIndexMetaData::INT32(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.copied()),
+ );
+ }
+ _ => b.append_nulls(len),
+ }
+ }
+ Ok(Arc::new(b.finish()))
+ },
+ DataType::Int64 => {
+ let mut b = Int64Builder::with_capacity(capacity);
+ for (len, index) in chunks {
+ match index {
+ ColumnIndexMetaData::INT64(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.copied()),
+ );
+ }
+ _ => b.append_nulls(len),
+ }
+ }
+ Ok(Arc::new(b.finish()))
+ },
+ DataType::Float16 => {
+ let mut b = Float16Builder::with_capacity(capacity);
+ for (len, index) in chunks {
+ match index {
+ ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(index)
=> {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.and_then(|x|
from_bytes_to_f16(x))),
+ );
+ }
+ _ => b.append_nulls(len),
}
}
- Ok(Arc::new(builder.finish()))
+ Ok(Arc::new(b.finish()))
+ },
+ DataType::Float32 => {
+ let mut b = Float32Builder::with_capacity(capacity);
+ for (len, index) in chunks {
+ match index {
+ ColumnIndexMetaData::FLOAT(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.copied()),
+ );
+ }
+ _ => b.append_nulls(len),
+ }
+ }
+ Ok(Arc::new(b.finish()))
+ },
+ DataType::Float64 => {
+ let mut b = Float64Builder::with_capacity(capacity);
+ for (len, index) in chunks {
+ match index {
+ ColumnIndexMetaData::DOUBLE(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.copied()),
+ );
+ }
+ _ => b.append_nulls(len),
+ }
+ }
+ Ok(Arc::new(b.finish()))
+ },
+ DataType::Binary => {
+ let mut b = BinaryBuilder::with_capacity(capacity,
capacity * 10);
+ for (len, index) in chunks {
+ match index {
+ ColumnIndexMetaData::BYTE_ARRAY(index) => {
+ for val in index.[<$stat_type_prefix:lower
_values_iter>]() {
+ b.append_option(val.map(|x| x.as_ref()));
+ }
+ }
+ _ => b.append_nulls(len),
+ }
+ }
+ Ok(Arc::new(b.finish()))
+ },
+ DataType::LargeBinary => {
+ let mut b = LargeBinaryBuilder::with_capacity(capacity,
capacity * 10);
+ for (len, index) in chunks {
+ match index {
+ ColumnIndexMetaData::BYTE_ARRAY(index) => {
+ for val in index.[<$stat_type_prefix:lower
_values_iter>]() {
+ b.append_option(val.map(|x| x.as_ref()));
+ }
+ }
+ _ => b.append_nulls(len),
+ }
+ }
+ Ok(Arc::new(b.finish()))
},
- DataType::UInt8 => Ok(Arc::new(
- UInt8Array::from_iter(
- [<$stat_type_prefix
Int32DataPageStatsIterator>]::new($iterator)
- .map(|x| {
- x.into_iter().map(|x| {
- x.and_then(|x| u8::try_from(x).ok())
- })
- })
- .flatten()
- )
- )),
- DataType::UInt16 => Ok(Arc::new(
- UInt16Array::from_iter(
- [<$stat_type_prefix
Int32DataPageStatsIterator>]::new($iterator)
- .map(|x| {
- x.into_iter().map(|x| {
- x.and_then(|x| u16::try_from(x).ok())
- })
- })
- .flatten()
- )
- )),
- DataType::UInt32 => Ok(Arc::new(
- UInt32Array::from_iter(
- [<$stat_type_prefix
Int32DataPageStatsIterator>]::new($iterator)
- .map(|x| {
- x.into_iter().map(|x| {
- x.and_then(|x| Some(x as u32))
- })
- })
- .flatten()
- ))),
- DataType::UInt64 => Ok(Arc::new(
- UInt64Array::from_iter(
- [<$stat_type_prefix
Int64DataPageStatsIterator>]::new($iterator)
- .map(|x| {
- x.into_iter().map(|x| {
- x.and_then(|x| Some(x as u64))
- })
- })
- .flatten()
- ))),
- DataType::Int8 => Ok(Arc::new(
- Int8Array::from_iter(
- [<$stat_type_prefix
Int32DataPageStatsIterator>]::new($iterator)
- .map(|x| {
- x.into_iter().map(|x| {
- x.and_then(|x| i8::try_from(x).ok())
- })
- })
- .flatten()
- )
- )),
- DataType::Int16 => Ok(Arc::new(
- Int16Array::from_iter(
- [<$stat_type_prefix
Int32DataPageStatsIterator>]::new($iterator)
- .map(|x| {
- x.into_iter().map(|x| {
- x.and_then(|x| i16::try_from(x).ok())
- })
- })
- .flatten()
- )
- )),
- DataType::Int32 =>
Ok(Arc::new(Int32Array::from_iter([<$stat_type_prefix
Int32DataPageStatsIterator>]::new($iterator).flatten()))),
- DataType::Int64 =>
Ok(Arc::new(Int64Array::from_iter([<$stat_type_prefix
Int64DataPageStatsIterator>]::new($iterator).flatten()))),
- DataType::Float16 => Ok(Arc::new(
- Float16Array::from_iter(
- [<$stat_type_prefix
Float16DataPageStatsIterator>]::new($iterator)
- .map(|x| {
- x.into_iter().map(|x| {
- x.and_then(|x| from_bytes_to_f16(x.data()))
- })
- })
- .flatten()
- )
- )),
- DataType::Float32 =>
Ok(Arc::new(Float32Array::from_iter([<$stat_type_prefix
Float32DataPageStatsIterator>]::new($iterator).flatten()))),
- DataType::Float64 =>
Ok(Arc::new(Float64Array::from_iter([<$stat_type_prefix
Float64DataPageStatsIterator>]::new($iterator).flatten()))),
- DataType::Binary =>
Ok(Arc::new(BinaryArray::from_iter([<$stat_type_prefix
ByteArrayDataPageStatsIterator>]::new($iterator).flatten()))),
- DataType::LargeBinary =>
Ok(Arc::new(LargeBinaryArray::from_iter([<$stat_type_prefix
ByteArrayDataPageStatsIterator>]::new($iterator).flatten()))),
DataType::Utf8 => {
- let mut builder = StringBuilder::new();
- let iterator = [<$stat_type_prefix
ByteArrayDataPageStatsIterator>]::new($iterator);
- for x in iterator {
- for x in x.into_iter() {
- let Some(x) = x else {
- builder.append_null(); // no statistics value
- continue;
- };
-
- let Ok(x) = std::str::from_utf8(x.data()) else {
- builder.append_null();
- continue;
- };
-
- builder.append_value(x);
+ let mut b = StringBuilder::with_capacity(capacity,
capacity * 10);
+ for (len, index) in chunks {
+ match index {
+ ColumnIndexMetaData::BYTE_ARRAY(index) => {
+ for val in index.[<$stat_type_prefix:lower
_values_iter>]() {
+ match val {
+ Some(x) => match
std::str::from_utf8(x.as_ref()) {
+ Ok(s) => b.append_value(s),
+ _ => b.append_null(),
+ }
+ None => b.append_null(),
+ }
+ }
+ }
+ _ => b.append_nulls(len),
}
}
- Ok(Arc::new(builder.finish()))
+ Ok(Arc::new(b.finish()))
},
DataType::LargeUtf8 => {
- let mut builder = LargeStringBuilder::new();
- let iterator = [<$stat_type_prefix
ByteArrayDataPageStatsIterator>]::new($iterator);
- for x in iterator {
- for x in x.into_iter() {
- let Some(x) = x else {
- builder.append_null(); // no statistics value
- continue;
- };
-
- let Ok(x) = std::str::from_utf8(x.data()) else {
- builder.append_null();
- continue;
- };
-
- builder.append_value(x);
+ let mut b = LargeStringBuilder::with_capacity(capacity,
capacity * 10);
+ for (len, index) in chunks {
+ match index {
+ ColumnIndexMetaData::BYTE_ARRAY(index) => {
+ for val in index.[<$stat_type_prefix:lower
_values_iter>]() {
+ match val {
+ Some(x) => match
std::str::from_utf8(x.as_ref()) {
+ Ok(s) => b.append_value(s),
+ _ => b.append_null(),
+ }
+ None => b.append_null(),
+ }
+ }
+ }
+ _ => b.append_nulls(len),
}
}
- Ok(Arc::new(builder.finish()))
+ Ok(Arc::new(b.finish()))
},
DataType::Dictionary(_, value_type) => {
- [<$stat_type_prefix:lower _ page_statistics>](value_type,
$iterator, $physical_type)
+ [<$stat_type_prefix:lower _ page_statistics>](value_type,
chunks.into_iter(), $physical_type)
},
DataType::Timestamp(unit, timezone) => {
- let iter = [<$stat_type_prefix
Int64DataPageStatsIterator>]::new($iterator).flatten();
- Ok(match unit {
- TimeUnit::Second =>
Arc::new(TimestampSecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
- TimeUnit::Millisecond =>
Arc::new(TimestampMillisecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
- TimeUnit::Microsecond =>
Arc::new(TimestampMicrosecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
- TimeUnit::Nanosecond =>
Arc::new(TimestampNanosecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
- })
+ match unit {
+ TimeUnit::Second => {
+ let mut b =
TimestampSecondBuilder::with_capacity(capacity);
+ for (len, index) in chunks {
+ match index {
+ ColumnIndexMetaData::INT64(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.copied()),
+ );
+ }
+ _ => b.append_nulls(len),
+ }
+ }
+
Ok(Arc::new(b.finish().with_timezone_opt(timezone.clone())))
+ }
+ TimeUnit::Millisecond => {
+ let mut b =
TimestampMillisecondBuilder::with_capacity(capacity);
+ for (len, index) in chunks {
+ match index {
+ ColumnIndexMetaData::INT64(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.copied()),
+ );
+ }
+ _ => b.append_nulls(len),
+ }
+ }
+
Ok(Arc::new(b.finish().with_timezone_opt(timezone.clone())))
+ }
+ TimeUnit::Microsecond => {
+ let mut b =
TimestampMicrosecondBuilder::with_capacity(capacity);
+ for (len, index) in chunks {
+ match index {
+ ColumnIndexMetaData::INT64(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.copied()),
+ );
+ }
+ _ => b.append_nulls(len),
+ }
+ }
+
Ok(Arc::new(b.finish().with_timezone_opt(timezone.clone())))
+ }
+ TimeUnit::Nanosecond => {
+ let mut b =
TimestampNanosecondBuilder::with_capacity(capacity);
+ for (len, index) in chunks {
+ match index {
+ ColumnIndexMetaData::INT64(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.copied()),
+ );
+ }
+ _ => b.append_nulls(len),
+ }
+ }
+
Ok(Arc::new(b.finish().with_timezone_opt(timezone.clone())))
+ }
+ }
+ },
+ DataType::Date32 => {
+ let mut b = Date32Builder::with_capacity(capacity);
+ for (len, index) in chunks {
+ match index {
+ ColumnIndexMetaData::INT32(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.copied()),
+ );
+ }
+ _ => b.append_nulls(len),
+ }
+ }
+ Ok(Arc::new(b.finish()))
+ },
+ DataType::Date64 if $physical_type ==
Some(PhysicalType::INT32)=> {
+ let mut b = Date64Builder::with_capacity(capacity);
+ for (len, index) in chunks {
+ match index {
+ ColumnIndexMetaData::INT32(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.map(|&x| (x as i64) *
24 * 60 * 60 * 1000)),
+ );
+ }
+ _ => b.append_nulls(len),
+ }
+ }
+ Ok(Arc::new(b.finish()))
+ },
+ DataType::Date64 if $physical_type ==
Some(PhysicalType::INT64) => {
+ let mut b = Date64Builder::with_capacity(capacity);
+ for (len, index) in chunks {
+ match index {
+ ColumnIndexMetaData::INT64(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.copied()),
+ );
+ }
+ _ => b.append_nulls(len),
+ }
+ }
+ Ok(Arc::new(b.finish()))
+ },
+ DataType::Decimal32(precision, scale) => {
+ let mut b = Decimal32Builder::with_capacity(capacity);
+ for (len, index) in chunks {
+ match index {
+ ColumnIndexMetaData::INT32(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.copied()),
+ );
+ }
+ ColumnIndexMetaData::INT64(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.and_then(|&x|
i32::try_from(x).ok())),
+ );
+ }
+ ColumnIndexMetaData::BYTE_ARRAY(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.map(|x|
from_bytes_to_i32(x.as_ref()))),
+ );
+ }
+ ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(index)
=> {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.map(|x|
from_bytes_to_i32(x.as_ref()))),
+ );
+ }
+ _ => b.append_nulls(len),
+ }
+ }
+ Ok(Arc::new(b.with_precision_and_scale(*precision,
*scale)?.finish()))
+ },
+ DataType::Decimal64(precision, scale) => {
+ let mut b = Decimal64Builder::with_capacity(capacity);
+ for (len, index) in chunks {
+ match index {
+ ColumnIndexMetaData::INT32(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.map(|x| *x as i64)),
+ );
+ }
+ ColumnIndexMetaData::INT64(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.copied()),
+ );
+ }
+ ColumnIndexMetaData::BYTE_ARRAY(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.map(|x|
from_bytes_to_i64(x.as_ref()))),
+ );
+ }
+ ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(index)
=> {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.map(|x|
from_bytes_to_i64(x.as_ref()))),
+ );
+ }
+ _ => b.append_nulls(len),
+ }
+ }
+ Ok(Arc::new(b.with_precision_and_scale(*precision,
*scale)?.finish()))
+ },
+ DataType::Decimal128(precision, scale) => {
+ let mut b = Decimal128Array::builder(capacity);
+ for (len, index) in chunks {
+ match index {
+ ColumnIndexMetaData::INT32(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.map(|x| *x as i128)),
+ );
+ }
+ ColumnIndexMetaData::INT64(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.map(|x| *x as i128)),
+ );
+ }
+ ColumnIndexMetaData::BYTE_ARRAY(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.map(|x|
from_bytes_to_i128(x.as_ref()))),
+ );
+ }
+ ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(index)
=> {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.map(|x|
from_bytes_to_i128(x.as_ref()))),
+ );
+ }
+ _ => b.append_nulls(len),
+ }
+ }
+ Ok(Arc::new(b.with_precision_and_scale(*precision,
*scale)?.finish()))
+ },
+ DataType::Decimal256(precision, scale) => {
+ let mut b = Decimal256Array::builder(capacity);
+ for (len, index) in chunks {
+ match index {
+ ColumnIndexMetaData::INT32(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.map(|x|
i256::from_i128(*x as i128))),
+ );
+ }
+ ColumnIndexMetaData::INT64(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.map(|x|
i256::from_i128(*x as i128))),
+ );
+ }
+ ColumnIndexMetaData::BYTE_ARRAY(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.map(|x|
from_bytes_to_i256(x.as_ref()))),
+ );
+ }
+ ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(index)
=> {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.map(|x|
from_bytes_to_i256(x.as_ref()))),
+ );
+ }
+ _ => b.append_nulls(len),
+ }
+ }
+ Ok(Arc::new(b.with_precision_and_scale(*precision,
*scale)?.finish()))
},
- DataType::Date32 =>
Ok(Arc::new(Date32Array::from_iter([<$stat_type_prefix
Int32DataPageStatsIterator>]::new($iterator).flatten()))),
- DataType::Date64 if $physical_type ==
Some(PhysicalType::INT32)=> Ok(
- Arc::new(
- Date64Array::from_iter([<$stat_type_prefix
Int32DataPageStatsIterator>]::new($iterator)
- .map(|x| {
- x.into_iter()
- .map(|x| {
- x.and_then(|x| i64::try_from(x).ok())
- })
- .map(|x| x.map(|x| x * 24 * 60 * 60 * 1000))
- }).flatten()
- )
- )
- ),
- DataType::Date64 if $physical_type ==
Some(PhysicalType::INT64) =>
Ok(Arc::new(Date64Array::from_iter([<$stat_type_prefix
Int64DataPageStatsIterator>]::new($iterator).flatten()))),
- DataType::Decimal32(precision, scale) => Ok(Arc::new(
- Decimal32Array::from_iter([<$stat_type_prefix
Decimal32DataPageStatsIterator>]::new($iterator).flatten()).with_precision_and_scale(*precision,
*scale)?)),
- DataType::Decimal64(precision, scale) => Ok(Arc::new(
- Decimal64Array::from_iter([<$stat_type_prefix
Decimal64DataPageStatsIterator>]::new($iterator).flatten()).with_precision_and_scale(*precision,
*scale)?)),
- DataType::Decimal128(precision, scale) => Ok(Arc::new(
- Decimal128Array::from_iter([<$stat_type_prefix
Decimal128DataPageStatsIterator>]::new($iterator).flatten()).with_precision_and_scale(*precision,
*scale)?)),
- DataType::Decimal256(precision, scale) => Ok(Arc::new(
- Decimal256Array::from_iter([<$stat_type_prefix
Decimal256DataPageStatsIterator>]::new($iterator).flatten()).with_precision_and_scale(*precision,
*scale)?)),
DataType::Time32(unit) => {
- Ok(match unit {
- TimeUnit::Second =>
Arc::new(Time32SecondArray::from_iter(
- [<$stat_type_prefix
Int32DataPageStatsIterator>]::new($iterator).flatten(),
- )),
- TimeUnit::Millisecond =>
Arc::new(Time32MillisecondArray::from_iter(
- [<$stat_type_prefix
Int32DataPageStatsIterator>]::new($iterator).flatten(),
- )),
+ match unit {
+ TimeUnit::Second => {
+ let mut b =
Time32SecondBuilder::with_capacity(capacity);
+ for (len, index) in chunks {
+ match index {
+ ColumnIndexMetaData::INT32(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.copied()),
+ );
+ }
+ _ => b.append_nulls(len),
+ }
+ }
+ Ok(Arc::new(b.finish()))
+ }
+ TimeUnit::Millisecond => {
+ let mut b =
Time32MillisecondBuilder::with_capacity(capacity);
+ for (len, index) in chunks {
+ match index {
+ ColumnIndexMetaData::INT32(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.copied()),
+ );
+ }
+ _ => b.append_nulls(len),
+ }
+ }
+ Ok(Arc::new(b.finish()))
+ }
_ => {
- // don't know how to extract statistics, so return
an empty array
- new_empty_array(&DataType::Time32(unit.clone()))
+ Ok(new_null_array($data_type, capacity))
}
- })
+ }
}
DataType::Time64(unit) => {
- Ok(match unit {
- TimeUnit::Microsecond =>
Arc::new(Time64MicrosecondArray::from_iter(
- [<$stat_type_prefix
Int64DataPageStatsIterator>]::new($iterator).flatten(),
- )),
- TimeUnit::Nanosecond =>
Arc::new(Time64NanosecondArray::from_iter(
- [<$stat_type_prefix
Int64DataPageStatsIterator>]::new($iterator).flatten(),
- )),
+ match unit {
+ TimeUnit::Microsecond => {
+ let mut b =
Time64MicrosecondBuilder::with_capacity(capacity);
+ for (len, index) in chunks {
+ match index {
+ ColumnIndexMetaData::INT64(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.copied()),
+ );
+ }
+ _ => b.append_nulls(len),
+ }
+ }
+ Ok(Arc::new(b.finish()))
+ }
+ TimeUnit::Nanosecond => {
+ let mut b =
Time64NanosecondBuilder::with_capacity(capacity);
+ for (len, index) in chunks {
+ match index {
+ ColumnIndexMetaData::INT64(index) => {
+ b.extend_from_iter_option(
+ index.[<$stat_type_prefix:lower
_values_iter>]()
+ .map(|val| val.copied()),
+ );
+ }
+ _ => b.append_nulls(len),
+ }
+ }
+ Ok(Arc::new(b.finish()))
+ }
_ => {
- // don't know how to extract statistics, so return
an empty array
- new_empty_array(&DataType::Time64(unit.clone()))
+ Ok(new_null_array($data_type, capacity))
}
- })
+ }
},
DataType::FixedSizeBinary(size) => {
- let mut builder = FixedSizeBinaryBuilder::new(*size);
- let iterator = [<$stat_type_prefix
FixedLenByteArrayDataPageStatsIterator>]::new($iterator);
- for x in iterator {
- for x in x.into_iter() {
- let Some(x) = x else {
- builder.append_null(); // no statistics value
- continue;
- };
-
- if x.len() == *size as usize {
- let _ = builder.append_value(x.data());
- } else {
- builder.append_null();
+ let mut b =
FixedSizeBinaryBuilder::with_capacity(capacity, *size);
+ for (len, index) in chunks {
+ match index {
+ ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(index)
=> {
+ for val in index.[<$stat_type_prefix:lower
_values_iter>]() {
+ match val {
+ Some(v) => b.append_value(v.as_ref())?,
Review Comment:
I think this would be more similar to the old code (and would be more
tolerant of malformed stats)
I am not sure what I think about just ignoring the error (would that leave
the builder in a bad state / the rows offset?) I personally think it would be
safer to maintain the old behavior
```suggestion
Some(v) => {
if v.len() == *size as usize {
let _ =
b.append_value(v.as_ref())?;
} else {
b.append_null();
}
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]