This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 9845e6eb58 Avoid the usage of intermediate ScalarValue to improve
performance of extracting statistics from parquet files (#10711)
9845e6eb58 is described below
commit 9845e6eb58d285e965fa1b1b8788d6d69f9fc005
Author: Xin Li <[email protected]>
AuthorDate: Wed Jun 5 22:48:43 2024 +0800
Avoid the usage of intermediate ScalarValue to improve performance of
extracting statistics from parquet files (#10711)
* Fix incorrect statistics read for unsigned integers columns in parquet
* Staging the change for faster stat
* Improve performance of extracting statistics from parquet files
* Revert "Improve performance of extracting statistics from parquet files"
This reverts commit 2faec57e0fc89bf6d90c069a54daa7c991f43d09.
* Revert "Staging the change for faster stat"
This reverts commit 095ac391e17f9d4c3d841971316fb47dc371b791.
* Refine using the iterator idea
* Add the rest types
* Consolidate Decimal statistics extraction
* clippy
* Simplify
* Fix dictionary type
* Fix incorrect statistics read for timestamp columns in parquet
* Add exhaustive match
* Update latest datatypes
* fix bad comment
* Remove duplications using paste
* Fix comment
* Update Cargo.lock
* fix docs
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
datafusion-cli/Cargo.lock | 37 +-
datafusion/core/Cargo.toml | 1 +
.../datasource/physical_plan/parquet/statistics.rs | 646 ++++++++++++++-------
datafusion/core/tests/parquet/arrow_statistics.rs | 1 -
4 files changed, 463 insertions(+), 222 deletions(-)
diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index 3040586501..b165070c60 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -942,7 +942,7 @@ version = "3.2.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ae6371b8bdc8b7d3959e9cf7b22d4435ef3e79e138688421ec654acf8c81b008"
dependencies = [
- "heck",
+ "heck 0.4.1",
"proc-macro-error",
"proc-macro2",
"quote",
@@ -976,7 +976,7 @@ source =
"registry+https://github.com/rust-lang/crates.io-index"
checksum = "b34115915337defe99b2aff5c2ce6771e5fbc4079f4b506301f5cf394c8452f7"
dependencies = [
"strum 0.26.2",
- "strum_macros 0.26.2",
+ "strum_macros 0.26.4",
"unicode-width",
]
@@ -1156,6 +1156,7 @@ dependencies = [
"object_store",
"parking_lot",
"parquet",
+ "paste",
"pin-project-lite",
"rand",
"sqlparser",
@@ -1255,7 +1256,7 @@ dependencies = [
"serde_json",
"sqlparser",
"strum 0.26.2",
- "strum_macros 0.26.2",
+ "strum_macros 0.26.4",
]
[[package]]
@@ -1809,6 +1810,12 @@ version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8"
+[[package]]
+name = "heck"
+version = "0.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
+
[[package]]
name = "hermit-abi"
version = "0.1.19"
@@ -1881,9 +1888,9 @@ checksum =
"9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
[[package]]
name = "hyper"
-version = "0.14.28"
+version = "0.14.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bf96e135eb83a2a8ddf766e426a841d8ddd7449d5f00d34ea02b41d2f19eef80"
+checksum = "f361cde2f109281a220d4307746cdfd5ee3f410da58a70377762396775634b33"
dependencies = [
"bytes",
"futures-channel",
@@ -2683,9 +2690,9 @@ dependencies = [
[[package]]
name = "proc-macro2"
-version = "1.0.84"
+version = "1.0.85"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ec96c6a92621310b51366f1e28d05ef11489516e93be030060e5fc12024a49d6"
+checksum = "22244ce15aa966053a896d1accb3a6e68469b97c7f33f284b99f0d576879fc23"
dependencies = [
"unicode-ident",
]
@@ -3218,7 +3225,7 @@ version = "0.7.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "990079665f075b699031e9c08fd3ab99be5029b96f3b78dc0709e8f77e4efebf"
dependencies = [
- "heck",
+ "heck 0.4.1",
"proc-macro2",
"quote",
"syn 1.0.109",
@@ -3303,7 +3310,7 @@ version = "0.26.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d8cec3501a5194c432b2b7976db6b7d10ec95c253208b45f83f7136aa985e29"
dependencies = [
- "strum_macros 0.26.2",
+ "strum_macros 0.26.4",
]
[[package]]
@@ -3312,7 +3319,7 @@ version = "0.25.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "23dc1fa9ac9c169a78ba62f0b841814b7abae11bdd047b9c58f893439e309ea0"
dependencies = [
- "heck",
+ "heck 0.4.1",
"proc-macro2",
"quote",
"rustversion",
@@ -3321,11 +3328,11 @@ dependencies = [
[[package]]
name = "strum_macros"
-version = "0.26.2"
+version = "0.26.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c6cf59daf282c0a494ba14fd21610a0325f9f90ec9d1231dea26bcb1d696c946"
+checksum = "4c6bee85a5a24955dc440386795aa378cd9cf82acd5f764469152d2270e581be"
dependencies = [
- "heck",
+ "heck 0.5.0",
"proc-macro2",
"quote",
"rustversion",
@@ -3711,9 +3718,9 @@ checksum =
"d4c87d22b6e3f4a18d4d40ef354e97c90fcb14dd91d7dc0aa9d8a1172ebf7202"
[[package]]
name = "unicode-width"
-version = "0.1.12"
+version = "0.1.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "68f5e5f3158ecfd4b8ff6fe086db7c8467a2dfdac97fe420f2b7c4aa97af66d6"
+checksum = "0336d538f7abc86d282a4189614dfaa90810dfc2c6f6427eaf88e16311dd225d"
[[package]]
name = "untrusted"
diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index 9f1f748435..3946758ff9 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -121,6 +121,7 @@ num_cpus = { workspace = true }
object_store = { workspace = true }
parking_lot = { workspace = true }
parquet = { workspace = true, optional = true, default-features = true }
+paste = "1.0.15"
pin-project-lite = "^0.2.7"
rand = { workspace = true }
sqlparser = { workspace = true }
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
index 8d0d30bf41..a73538d02a 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
@@ -19,17 +19,26 @@
// TODO: potentially move this to arrow-rs:
https://github.com/apache/arrow-rs/issues/4328
-use arrow::{array::ArrayRef, datatypes::i256, datatypes::DataType,
datatypes::TimeUnit};
-use arrow_array::{new_empty_array, new_null_array, UInt64Array};
-use arrow_schema::{Field, FieldRef, Schema};
-use datafusion_common::{
- internal_datafusion_err, internal_err, plan_err, Result, ScalarValue,
+use arrow::datatypes::i256;
+use arrow::{array::ArrayRef, datatypes::DataType};
+use arrow_array::{
+ new_null_array, BinaryArray, BooleanArray, Date32Array, Date64Array,
Decimal128Array,
+ Decimal256Array, FixedSizeBinaryArray, Float16Array, Float32Array,
Float64Array,
+ Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray,
LargeStringArray,
+ StringArray, Time32MillisecondArray, Time32SecondArray,
Time64MicrosecondArray,
+ Time64NanosecondArray, TimestampMicrosecondArray,
TimestampMillisecondArray,
+ TimestampNanosecondArray, TimestampSecondArray, UInt16Array, UInt32Array,
+ UInt64Array, UInt8Array,
};
+use arrow_schema::{Field, FieldRef, Schema, TimeUnit};
+use datafusion_common::{internal_datafusion_err, internal_err, plan_err,
Result};
use half::f16;
use parquet::file::metadata::ParquetMetaData;
use parquet::file::statistics::Statistics as ParquetStatistics;
use parquet::schema::types::SchemaDescriptor;
+use paste::paste;
use std::sync::Arc;
+
// Convert the bytes array to i128.
// The endian of the input bytes array must be big-endian.
pub(crate) fn from_bytes_to_i128(b: &[u8]) -> i128 {
@@ -66,201 +75,446 @@ pub fn sign_extend_be<const N: usize>(b: &[u8]) -> [u8;
N] {
result
}
-/// Extract a single min/max statistics from a [`ParquetStatistics`] object
+/// Define an adapter iterator for extracting statistics from an iterator of
+/// `ParquetStatistics`
+///
+///
+/// Handles checking if the statistics are present and valid with the correct
type.
///
-/// * `$column_statistics` is the `ParquetStatistics` object
-/// * `$func is the function` (`min`/`max`) to call to get the value
-/// * `$bytes_func` is the function (`min_bytes`/`max_bytes`) to call to get
the value as bytes
-/// * `$target_arrow_type` is the [`DataType`] of the target statistics
-macro_rules! get_statistic {
- ($column_statistics:expr, $func:ident, $bytes_func:ident,
$target_arrow_type:expr) => {{
- if !$column_statistics.has_min_max_set() {
- return None;
+/// Parameters:
+/// * `$iterator_type` is the name of the iterator type (e.g.
`MinBooleanStatsIterator`)
+/// * `$func` is the function to call to get the value (e.g. `min` or `max`)
+/// * `$parquet_statistics_type` is the type of the statistics (e.g.
`ParquetStatistics::Boolean`)
+/// * `$stat_value_type` is the type of the statistics value (e.g. `bool`)
+macro_rules! make_stats_iterator {
+ ($iterator_type:ident, $func:ident, $parquet_statistics_type:path,
$stat_value_type:ty) => {
+ /// Maps an iterator of `ParquetStatistics` into an iterator of
+    /// `&$stat_value_type`
+ ///
+ /// Yielded elements:
+ /// * Some(stats) if valid
+ /// * None if the statistics are not present, not valid, or not
$stat_value_type
+ struct $iterator_type<'a, I>
+ where
+ I: Iterator<Item = Option<&'a ParquetStatistics>>,
+ {
+ iter: I,
}
- match $column_statistics {
- ParquetStatistics::Boolean(s) =>
Some(ScalarValue::Boolean(Some(*s.$func()))),
- ParquetStatistics::Int32(s) => {
- match $target_arrow_type {
- // int32 to decimal with the precision and scale
- Some(DataType::Decimal128(precision, scale)) => {
- Some(ScalarValue::Decimal128(
- Some(*s.$func() as i128),
- *precision,
- *scale,
- ))
- }
- Some(DataType::Decimal256(precision, scale)) => {
- Some(ScalarValue::Decimal256(
- Some(i256::from(*s.$func())),
- *precision,
- *scale,
- ))
- }
- Some(DataType::Int8) => {
-
Some(ScalarValue::Int8(Some((*s.$func()).try_into().unwrap())))
- }
- Some(DataType::Int16) => {
-
Some(ScalarValue::Int16(Some((*s.$func()).try_into().unwrap())))
- }
- Some(DataType::UInt8) => {
-
Some(ScalarValue::UInt8(Some((*s.$func()).try_into().unwrap())))
- }
- Some(DataType::UInt16) => {
-
Some(ScalarValue::UInt16(Some((*s.$func()).try_into().unwrap())))
- }
- Some(DataType::UInt32) => {
- Some(ScalarValue::UInt32(Some((*s.$func()) as u32)))
- }
- Some(DataType::Date32) => {
- Some(ScalarValue::Date32(Some(*s.$func())))
- }
- Some(DataType::Date64) => {
- Some(ScalarValue::Date64(Some(i64::from(*s.$func()) *
24 * 60 * 60 * 1000)))
- }
- Some(DataType::Time32(TimeUnit::Second)) => {
- Some(ScalarValue::Time32Second(Some((*s.$func()))))
- }
- Some(DataType::Time32(TimeUnit::Millisecond)) => {
-
Some(ScalarValue::Time32Millisecond(Some((*s.$func()))))
- }
- _ => Some(ScalarValue::Int32(Some(*s.$func()))),
- }
+
+ impl<'a, I> $iterator_type<'a, I>
+ where
+ I: Iterator<Item = Option<&'a ParquetStatistics>>,
+ {
+ /// Create a new iterator to extract the statistics
+ fn new(iter: I) -> Self {
+ Self { iter }
}
- ParquetStatistics::Int64(s) => {
- match $target_arrow_type {
- // int64 to decimal with the precision and scale
- Some(DataType::Decimal128(precision, scale)) => {
- Some(ScalarValue::Decimal128(
- Some(*s.$func() as i128),
- *precision,
- *scale,
- ))
- }
- Some(DataType::Decimal256(precision, scale)) => {
- Some(ScalarValue::Decimal256(
- Some(i256::from(*s.$func())),
- *precision,
- *scale,
- ))
- }
- Some(DataType::UInt64) => {
- Some(ScalarValue::UInt64(Some((*s.$func()) as u64)))
- }
- Some(DataType::Time64(TimeUnit::Microsecond)) => {
- Some(ScalarValue::Time64Microsecond(Some((*s.$func()
as i64))))
- }
- Some(DataType::Time64(TimeUnit::Nanosecond)) => {
- Some(ScalarValue::Time64Nanosecond(Some((*s.$func() as
i64))))
+ }
+
+ /// Implement the Iterator trait for the iterator
+ impl<'a, I> Iterator for $iterator_type<'a, I>
+ where
+ I: Iterator<Item = Option<&'a ParquetStatistics>>,
+ {
+ type Item = Option<&'a $stat_value_type>;
+
+ /// return the next statistics value
+ fn next(&mut self) -> Option<Self::Item> {
+ let next = self.iter.next();
+ next.map(|x| {
+ x.and_then(|stats| match stats {
+ $parquet_statistics_type(s) if stats.has_min_max_set()
=> {
+ Some(s.$func())
+ }
+ _ => None,
+ })
+ })
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ self.iter.size_hint()
+ }
+ }
+ };
+}
+
+make_stats_iterator!(
+ MinBooleanStatsIterator,
+ min,
+ ParquetStatistics::Boolean,
+ bool
+);
+make_stats_iterator!(
+ MaxBooleanStatsIterator,
+ max,
+ ParquetStatistics::Boolean,
+ bool
+);
+make_stats_iterator!(MinInt32StatsIterator, min, ParquetStatistics::Int32,
i32);
+make_stats_iterator!(MaxInt32StatsIterator, max, ParquetStatistics::Int32,
i32);
+make_stats_iterator!(MinInt64StatsIterator, min, ParquetStatistics::Int64,
i64);
+make_stats_iterator!(MaxInt64StatsIterator, max, ParquetStatistics::Int64,
i64);
+make_stats_iterator!(MinFloatStatsIterator, min, ParquetStatistics::Float,
f32);
+make_stats_iterator!(MaxFloatStatsIterator, max, ParquetStatistics::Float,
f32);
+make_stats_iterator!(MinDoubleStatsIterator, min, ParquetStatistics::Double,
f64);
+make_stats_iterator!(MaxDoubleStatsIterator, max, ParquetStatistics::Double,
f64);
+make_stats_iterator!(
+ MinByteArrayStatsIterator,
+ min_bytes,
+ ParquetStatistics::ByteArray,
+ [u8]
+);
+make_stats_iterator!(
+ MaxByteArrayStatsIterator,
+ max_bytes,
+ ParquetStatistics::ByteArray,
+ [u8]
+);
+make_stats_iterator!(
+ MinFixedLenByteArrayStatsIterator,
+ min_bytes,
+ ParquetStatistics::FixedLenByteArray,
+ [u8]
+);
+make_stats_iterator!(
+ MaxFixedLenByteArrayStatsIterator,
+ max_bytes,
+ ParquetStatistics::FixedLenByteArray,
+ [u8]
+);
+
+/// Special iterator adapter for extracting decimal values from an iterator
of
+/// `ParquetStatistics`
+///
+/// Handles checking if the statistics are present and valid with the correct
type.
+///
+/// Depending on the parquet file, the statistics for `Decimal128` can be
stored as
+/// `Int32`, `Int64`, `ByteArray` or `FixedLenByteArray` :mindblown:
+///
+/// This iterator handles all cases, extracting the values
+/// and converting it to `stat_value_type`.
+///
+/// Parameters:
+/// * `$iterator_type` is the name of the iterator type (e.g.
`MinDecimal128StatsIterator`)
+/// * `$func` is the function to call to get the value (e.g. `min` or `max`)
+/// * `$bytes_func` is the function to call to get the value as bytes (e.g.
`min_bytes` or `max_bytes`)
+/// * `$stat_value_type` is the type of the statistics value (e.g. `i128`)
+/// * `$convert_func` is the function to convert the bytes to stats value (e.g.
`from_bytes_to_i128`)
+macro_rules! make_decimal_stats_iterator {
+ ($iterator_type:ident, $func:ident, $bytes_func:ident,
$stat_value_type:ident, $convert_func: ident) => {
+ struct $iterator_type<'a, I>
+ where
+ I: Iterator<Item = Option<&'a ParquetStatistics>>,
+ {
+ iter: I,
+ }
+
+ impl<'a, I> $iterator_type<'a, I>
+ where
+ I: Iterator<Item = Option<&'a ParquetStatistics>>,
+ {
+ fn new(iter: I) -> Self {
+ Self { iter }
+ }
+ }
+
+ impl<'a, I> Iterator for $iterator_type<'a, I>
+ where
+ I: Iterator<Item = Option<&'a ParquetStatistics>>,
+ {
+ type Item = Option<$stat_value_type>;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ let next = self.iter.next();
+ next.map(|x| {
+ x.and_then(|stats| {
+ if !stats.has_min_max_set() {
+ return None;
+ }
+ match stats {
+ ParquetStatistics::Int32(s) => {
+ Some($stat_value_type::from(*s.$func()))
+ }
+ ParquetStatistics::Int64(s) => {
+ Some($stat_value_type::from(*s.$func()))
+ }
+ ParquetStatistics::ByteArray(s) => {
+ Some($convert_func(s.$bytes_func()))
+ }
+ ParquetStatistics::FixedLenByteArray(s) => {
+ Some($convert_func(s.$bytes_func()))
+ }
+ _ => None,
+ }
+ })
+ })
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ self.iter.size_hint()
+ }
+ }
+ };
+}
+
+make_decimal_stats_iterator!(
+ MinDecimal128StatsIterator,
+ min,
+ min_bytes,
+ i128,
+ from_bytes_to_i128
+);
+make_decimal_stats_iterator!(
+ MaxDecimal128StatsIterator,
+ max,
+ max_bytes,
+ i128,
+ from_bytes_to_i128
+);
+make_decimal_stats_iterator!(
+ MinDecimal256StatsIterator,
+ min,
+ min_bytes,
+ i256,
+ from_bytes_to_i256
+);
+make_decimal_stats_iterator!(
+ MaxDecimal256StatsIterator,
+ max,
+ max_bytes,
+ i256,
+ from_bytes_to_i256
+);
+
+/// Special macro to combine the statistics iterators for min and max using
the [`mod@paste`] macro.
+/// This is used to avoid repeating the same code for min and max statistics
extractions
+///
+/// Parameters:
+/// stat_type_prefix: The prefix of the statistics iterator type (e.g. `Min`
or `Max`)
+/// data_type: The data type of the statistics (e.g. `DataType::Int32`)
+/// iterator: The iterator of [`ParquetStatistics`] to extract the statistics
from.
+macro_rules! get_statistics {
+ ($stat_type_prefix: ident, $data_type: ident, $iterator: ident) => {
+ paste! {
+ match $data_type {
+ DataType::Boolean => Ok(Arc::new(BooleanArray::from_iter(
+ [<$stat_type_prefix
BooleanStatsIterator>]::new($iterator).map(|x| x.copied()),
+ ))),
+ DataType::Int8 => Ok(Arc::new(Int8Array::from_iter(
+ [<$stat_type_prefix
Int32StatsIterator>]::new($iterator).map(|x| {
+ x.and_then(|x| {
+ if let Ok(v) = i8::try_from(*x) {
+ Some(v)
+ } else {
+ None
+ }
+ })
+ }),
+ ))),
+ DataType::Int16 => Ok(Arc::new(Int16Array::from_iter(
+ [<$stat_type_prefix
Int32StatsIterator>]::new($iterator).map(|x| {
+ x.and_then(|x| {
+ if let Ok(v) = i16::try_from(*x) {
+ Some(v)
+ } else {
+ None
+ }
+ })
+ }),
+ ))),
+ DataType::Int32 => Ok(Arc::new(Int32Array::from_iter(
+ [<$stat_type_prefix
Int32StatsIterator>]::new($iterator).map(|x| x.copied()),
+ ))),
+ DataType::Int64 => Ok(Arc::new(Int64Array::from_iter(
+ [<$stat_type_prefix
Int64StatsIterator>]::new($iterator).map(|x| x.copied()),
+ ))),
+ DataType::UInt8 => Ok(Arc::new(UInt8Array::from_iter(
+ [<$stat_type_prefix
Int32StatsIterator>]::new($iterator).map(|x| {
+ x.and_then(|x| {
+ if let Ok(v) = u8::try_from(*x) {
+ Some(v)
+ } else {
+ None
+ }
+ })
+ }),
+ ))),
+ DataType::UInt16 => Ok(Arc::new(UInt16Array::from_iter(
+ [<$stat_type_prefix
Int32StatsIterator>]::new($iterator).map(|x| {
+ x.and_then(|x| {
+ if let Ok(v) = u16::try_from(*x) {
+ Some(v)
+ } else {
+ None
+ }
+ })
+ }),
+ ))),
+ DataType::UInt32 => Ok(Arc::new(UInt32Array::from_iter(
+ [<$stat_type_prefix
Int32StatsIterator>]::new($iterator).map(|x| x.map(|x| *x as u32)),
+ ))),
+ DataType::UInt64 => Ok(Arc::new(UInt64Array::from_iter(
+ [<$stat_type_prefix
Int64StatsIterator>]::new($iterator).map(|x| x.map(|x| *x as u64)),
+ ))),
+ DataType::Float16 => Ok(Arc::new(Float16Array::from_iter(
+ [<$stat_type_prefix
FixedLenByteArrayStatsIterator>]::new($iterator).map(|x| x.and_then(|x| {
+ from_bytes_to_f16(x)
+ })),
+ ))),
+ DataType::Float32 => Ok(Arc::new(Float32Array::from_iter(
+ [<$stat_type_prefix
FloatStatsIterator>]::new($iterator).map(|x| x.copied()),
+ ))),
+ DataType::Float64 => Ok(Arc::new(Float64Array::from_iter(
+ [<$stat_type_prefix
DoubleStatsIterator>]::new($iterator).map(|x| x.copied()),
+ ))),
+ DataType::Date32 => Ok(Arc::new(Date32Array::from_iter(
+ [<$stat_type_prefix
Int32StatsIterator>]::new($iterator).map(|x| x.copied()),
+ ))),
+ DataType::Date64 => Ok(Arc::new(Date64Array::from_iter(
+ [<$stat_type_prefix Int32StatsIterator>]::new($iterator)
+ .map(|x| x.map(|x| i64::from(*x) * 24 * 60 * 60 * 1000)),
+ ))),
+ DataType::Timestamp(unit, timezone) =>{
+ let iter = [<$stat_type_prefix
Int64StatsIterator>]::new($iterator).map(|x| x.copied());
+
+ Ok(match unit {
+ TimeUnit::Second => {
+ Arc::new(match timezone {
+ Some(tz) =>
TimestampSecondArray::from_iter(iter).with_timezone(tz.clone()),
+ None => TimestampSecondArray::from_iter(iter),
+ })
}
- Some(DataType::Timestamp(unit, timezone)) => {
- Some(match unit {
- TimeUnit::Second => ScalarValue::TimestampSecond(
- Some(*s.$func()),
- timezone.clone(),
- ),
- TimeUnit::Millisecond =>
ScalarValue::TimestampMillisecond(
- Some(*s.$func()),
- timezone.clone(),
- ),
- TimeUnit::Microsecond =>
ScalarValue::TimestampMicrosecond(
- Some(*s.$func()),
- timezone.clone(),
- ),
- TimeUnit::Nanosecond =>
ScalarValue::TimestampNanosecond(
- Some(*s.$func()),
- timezone.clone(),
- ),
+ TimeUnit::Millisecond => {
+ Arc::new(match timezone {
+ Some(tz) =>
TimestampMillisecondArray::from_iter(iter).with_timezone(tz.clone()),
+ None => TimestampMillisecondArray::from_iter(iter),
})
}
- _ => Some(ScalarValue::Int64(Some(*s.$func()))),
- }
- }
- // 96 bit ints not supported
- ParquetStatistics::Int96(_) => None,
- ParquetStatistics::Float(s) =>
Some(ScalarValue::Float32(Some(*s.$func()))),
- ParquetStatistics::Double(s) =>
Some(ScalarValue::Float64(Some(*s.$func()))),
- ParquetStatistics::ByteArray(s) => {
- match $target_arrow_type {
- // decimal data type
- Some(DataType::Decimal128(precision, scale)) => {
- Some(ScalarValue::Decimal128(
- Some(from_bytes_to_i128(s.$bytes_func())),
- *precision,
- *scale,
- ))
+ TimeUnit::Microsecond => {
+ Arc::new(match timezone {
+ Some(tz) =>
TimestampMicrosecondArray::from_iter(iter).with_timezone(tz.clone()),
+ None => TimestampMicrosecondArray::from_iter(iter),
+ })
}
- Some(DataType::Decimal256(precision, scale)) => {
- Some(ScalarValue::Decimal256(
- Some(from_bytes_to_i256(s.$bytes_func())),
- *precision,
- *scale,
- ))
+ TimeUnit::Nanosecond => {
+ Arc::new(match timezone {
+ Some(tz) =>
TimestampNanosecondArray::from_iter(iter).with_timezone(tz.clone()),
+ None => TimestampNanosecondArray::from_iter(iter),
+ })
}
- Some(DataType::Binary) => {
-
Some(ScalarValue::Binary(Some(s.$bytes_func().to_vec())))
+ })
+ },
+ DataType::Time32(unit) => {
+ Ok(match unit {
+ TimeUnit::Second => Arc::new(Time32SecondArray::from_iter(
+ [<$stat_type_prefix
Int32StatsIterator>]::new($iterator).map(|x| x.copied()),
+ )),
+ TimeUnit::Millisecond =>
Arc::new(Time32MillisecondArray::from_iter(
+ [<$stat_type_prefix
Int32StatsIterator>]::new($iterator).map(|x| x.copied()),
+ )),
+ _ => {
+ let len = $iterator.count();
+ // don't know how to extract statistics, so return a
null array
+ new_null_array($data_type, len)
}
- Some(DataType::LargeBinary) => {
-
Some(ScalarValue::LargeBinary(Some(s.$bytes_func().to_vec())))
+ })
+ },
+ DataType::Time64(unit) => {
+ Ok(match unit {
+ TimeUnit::Microsecond =>
Arc::new(Time64MicrosecondArray::from_iter(
+ [<$stat_type_prefix
Int64StatsIterator>]::new($iterator).map(|x| x.copied()),
+ )),
+ TimeUnit::Nanosecond =>
Arc::new(Time64NanosecondArray::from_iter(
+ [<$stat_type_prefix
Int64StatsIterator>]::new($iterator).map(|x| x.copied()),
+ )),
+ _ => {
+ let len = $iterator.count();
+ // don't know how to extract statistics, so return a
null array
+ new_null_array($data_type, len)
}
- Some(DataType::LargeUtf8) | _ => {
- let utf8_value = std::str::from_utf8(s.$bytes_func())
- .map(|s| s.to_string())
- .ok();
- if utf8_value.is_none() {
+ })
+ },
+ DataType::Binary => Ok(Arc::new(BinaryArray::from_iter(
+ [<$stat_type_prefix
ByteArrayStatsIterator>]::new($iterator).map(|x| x.map(|x| x.to_vec())),
+ ))),
+ DataType::LargeBinary => Ok(Arc::new(LargeBinaryArray::from_iter(
+ [<$stat_type_prefix
ByteArrayStatsIterator>]::new($iterator).map(|x| x.map(|x|x.to_vec())),
+ ))),
+ DataType::Utf8 => Ok(Arc::new(StringArray::from_iter(
+ [<$stat_type_prefix
ByteArrayStatsIterator>]::new($iterator).map(|x| {
+ x.and_then(|x| {
+ let res = std::str::from_utf8(x).map(|s|
s.to_string()).ok();
+ if res.is_none() {
log::debug!("Utf8 statistics is a non-UTF8 value,
ignoring it.");
}
-
- match $target_arrow_type {
- Some(DataType::LargeUtf8) =>
Some(ScalarValue::LargeUtf8(utf8_value)),
- _ => Some(ScalarValue::Utf8(utf8_value)),
- }
- }
- }
+ res
+ })
+ }),
+ ))),
+ DataType::LargeUtf8 => {
+ Ok(Arc::new(LargeStringArray::from_iter(
+ [<$stat_type_prefix
ByteArrayStatsIterator>]::new($iterator).map(|x| {
+ x.and_then(|x| {
+ let res = std::str::from_utf8(x).map(|s|
s.to_string()).ok();
+ if res.is_none() {
+ log::debug!("LargeUtf8 statistics is a
non-UTF8 value, ignoring it.");
+ }
+ res
+ })
+ }),
+ )))
}
- // type not fully supported yet
- ParquetStatistics::FixedLenByteArray(s) => {
- match $target_arrow_type {
- // just support specific logical data types, there are
others each
- // with their own ordering
- Some(DataType::Decimal128(precision, scale)) => {
- Some(ScalarValue::Decimal128(
- Some(from_bytes_to_i128(s.$bytes_func())),
- *precision,
- *scale,
- ))
- }
- Some(DataType::Decimal256(precision, scale)) => {
- Some(ScalarValue::Decimal256(
- Some(from_bytes_to_i256(s.$bytes_func())),
- *precision,
- *scale,
- ))
- }
- Some(DataType::FixedSizeBinary(size)) => {
- let value = s.$bytes_func().to_vec();
- let value = if value.len().try_into() == Ok(*size) {
- Some(value)
+ DataType::FixedSizeBinary(size) =>
Ok(Arc::new(FixedSizeBinaryArray::from(
+ [<$stat_type_prefix
FixedLenByteArrayStatsIterator>]::new($iterator).map(|x| {
+ x.and_then(|x| {
+ if x.len().try_into() == Ok(*size) {
+ Some(x)
} else {
log::debug!(
"FixedSizeBinary({}) statistics is a binary of
size {}, ignoring it.",
size,
- value.len(),
+ x.len(),
);
None
- };
- Some(ScalarValue::FixedSizeBinary(
- *size,
- value,
- ))
- }
- Some(DataType::Float16) => {
-
Some(ScalarValue::Float16(from_bytes_to_f16(s.$bytes_func())))
- }
- _ => None,
- }
+ }
+ })
+ }).collect::<Vec<_>>(),
+ ))),
+ DataType::Decimal128(precision, scale) => {
+ let arr = Decimal128Array::from_iter(
+ [<$stat_type_prefix
Decimal128StatsIterator>]::new($iterator)
+ ).with_precision_and_scale(*precision, *scale)?;
+ Ok(Arc::new(arr))
+ },
+ DataType::Decimal256(precision, scale) => {
+ let arr = Decimal256Array::from_iter(
+ [<$stat_type_prefix
Decimal256StatsIterator>]::new($iterator)
+ ).with_precision_and_scale(*precision, *scale)?;
+ Ok(Arc::new(arr))
+ },
+ DataType::Dictionary(_, value_type) => {
+ [<$stat_type_prefix:lower _ statistics>](value_type, $iterator)
}
- }
- }};
+
+ DataType::Map(_,_) |
+ DataType::Duration(_) |
+ DataType::Interval(_) |
+ DataType::Null |
+ DataType::BinaryView |
+ DataType::Utf8View |
+ DataType::List(_) |
+ DataType::ListView(_) |
+ DataType::FixedSizeList(_, _) |
+ DataType::LargeList(_) |
+ DataType::LargeListView(_) |
+ DataType::Struct(_) |
+ DataType::Union(_, _) |
+ DataType::RunEndEncoded(_, _) => {
+ let len = $iterator.count();
+ // don't know how to extract statistics, so return a null array
+ Ok(new_null_array($data_type, len))
+ }
+ }}}
}
/// Lookups up the parquet column by name
@@ -293,9 +547,7 @@ pub(crate) fn min_statistics<'a, I: Iterator<Item =
Option<&'a ParquetStatistics
data_type: &DataType,
iterator: I,
) -> Result<ArrayRef> {
- let scalars = iterator
- .map(|x| x.and_then(|s| get_statistic!(s, min, min_bytes,
Some(data_type))));
- collect_scalars(data_type, scalars)
+ get_statistics!(Min, data_type, iterator)
}
/// Extracts the max statistics from an iterator of [`ParquetStatistics`] to
an [`ArrayRef`]
@@ -303,24 +555,7 @@ pub(crate) fn max_statistics<'a, I: Iterator<Item =
Option<&'a ParquetStatistics
data_type: &DataType,
iterator: I,
) -> Result<ArrayRef> {
- let scalars = iterator
- .map(|x| x.and_then(|s| get_statistic!(s, max, max_bytes,
Some(data_type))));
- collect_scalars(data_type, scalars)
-}
-
-/// Builds an array from an iterator of ScalarValue
-fn collect_scalars<I: Iterator<Item = Option<ScalarValue>>>(
- data_type: &DataType,
- iterator: I,
-) -> Result<ArrayRef> {
- let mut scalars = iterator.peekable();
- match scalars.peek().is_none() {
- true => Ok(new_empty_array(data_type)),
- false => {
- let null = ScalarValue::try_from(data_type)?;
- ScalarValue::iter_to_array(scalars.map(|x| x.unwrap_or_else(||
null.clone())))
- }
- }
+ get_statistics!(Max, data_type, iterator)
}
/// What type of statistics should be extracted?
@@ -474,11 +709,10 @@ mod test {
use arrow::compute::kernels::cast_utils::Parser;
use arrow::datatypes::{i256, Date32Type, Date64Type};
use arrow_array::{
- new_null_array, Array, BinaryArray, BooleanArray, Date32Array,
Date64Array,
- Decimal128Array, Decimal256Array, Float32Array, Float64Array,
Int16Array,
- Int32Array, Int64Array, Int8Array, LargeBinaryArray, RecordBatch,
StringArray,
- StructArray, TimestampMicrosecondArray, TimestampMillisecondArray,
- TimestampNanosecondArray, TimestampSecondArray,
+ new_empty_array, new_null_array, Array, BinaryArray, BooleanArray,
Date32Array,
+ Date64Array, Decimal128Array, Decimal256Array, Float32Array,
Float64Array,
+ Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray,
RecordBatch,
+ StringArray, StructArray, TimestampNanosecondArray,
};
use arrow_schema::{Field, SchemaRef};
use bytes::Bytes;
diff --git a/datafusion/core/tests/parquet/arrow_statistics.rs
b/datafusion/core/tests/parquet/arrow_statistics.rs
index 19cc4db4d2..2f8fbab647 100644
--- a/datafusion/core/tests/parquet/arrow_statistics.rs
+++ b/datafusion/core/tests/parquet/arrow_statistics.rs
@@ -495,7 +495,6 @@ async fn test_timestamp() {
Test {
reader: &reader,
- // mins are [1577840461000000000, 1577840471000000000,
1577841061000000000, 1578704461000000000,]
expected_min: Arc::new(TimestampNanosecondArray::from(vec![
TimestampNanosecondType::parse("2020-01-01T01:01:01"),
TimestampNanosecondType::parse("2020-01-01T01:01:11"),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]