This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 9845e6eb58 Avoid the usage of intermediate ScalarValue to improve 
performance of extracting statistics from parquet files (#10711)
9845e6eb58 is described below

commit 9845e6eb58d285e965fa1b1b8788d6d69f9fc005
Author: Xin Li <[email protected]>
AuthorDate: Wed Jun 5 22:48:43 2024 +0800

    Avoid the usage of intermediate ScalarValue to improve performance of 
extracting statistics from parquet files (#10711)
    
    * Fix incorrect statistics read for unsigned integers columns in parquet
    
    * Staging the change for faster stat
    
    * Improve performance of extracting statistics from parquet files
    
    * Revert "Improve performance of extracting statistics from parquet files"
    
    This reverts commit 2faec57e0fc89bf6d90c069a54daa7c991f43d09.
    
    * Revert "Staging the change for faster stat"
    
    This reverts commit 095ac391e17f9d4c3d841971316fb47dc371b791.
    
    * Refine using the iterator idea
    
    * Add the rest types
    
    * Consolidate Decimal statistics extraction
    
    * clippy
    
    * Simplify
    
    * Fix dictionary type
    
    * Fix incorrect statistics read for timestamp columns in parquet
    
    * Add exhaustive match
    
    * Update latest datatypes
    
    * fix bad comment
    
    * Remove duplications using paste
    
    * Fix comment
    
    * Update Cargo.lock
    
    * fix docs
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 datafusion-cli/Cargo.lock                          |  37 +-
 datafusion/core/Cargo.toml                         |   1 +
 .../datasource/physical_plan/parquet/statistics.rs | 646 ++++++++++++++-------
 datafusion/core/tests/parquet/arrow_statistics.rs  |   1 -
 4 files changed, 463 insertions(+), 222 deletions(-)

diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index 3040586501..b165070c60 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -942,7 +942,7 @@ version = "3.2.25"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "ae6371b8bdc8b7d3959e9cf7b22d4435ef3e79e138688421ec654acf8c81b008"
 dependencies = [
- "heck",
+ "heck 0.4.1",
  "proc-macro-error",
  "proc-macro2",
  "quote",
@@ -976,7 +976,7 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index";
 checksum = "b34115915337defe99b2aff5c2ce6771e5fbc4079f4b506301f5cf394c8452f7"
 dependencies = [
  "strum 0.26.2",
- "strum_macros 0.26.2",
+ "strum_macros 0.26.4",
  "unicode-width",
 ]
 
@@ -1156,6 +1156,7 @@ dependencies = [
  "object_store",
  "parking_lot",
  "parquet",
+ "paste",
  "pin-project-lite",
  "rand",
  "sqlparser",
@@ -1255,7 +1256,7 @@ dependencies = [
  "serde_json",
  "sqlparser",
  "strum 0.26.2",
- "strum_macros 0.26.2",
+ "strum_macros 0.26.4",
 ]
 
 [[package]]
@@ -1809,6 +1810,12 @@ version = "0.4.1"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8"
 
+[[package]]
+name = "heck"
+version = "0.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
+
 [[package]]
 name = "hermit-abi"
 version = "0.1.19"
@@ -1881,9 +1888,9 @@ checksum = 
"9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
 
 [[package]]
 name = "hyper"
-version = "0.14.28"
+version = "0.14.29"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "bf96e135eb83a2a8ddf766e426a841d8ddd7449d5f00d34ea02b41d2f19eef80"
+checksum = "f361cde2f109281a220d4307746cdfd5ee3f410da58a70377762396775634b33"
 dependencies = [
  "bytes",
  "futures-channel",
@@ -2683,9 +2690,9 @@ dependencies = [
 
 [[package]]
 name = "proc-macro2"
-version = "1.0.84"
+version = "1.0.85"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "ec96c6a92621310b51366f1e28d05ef11489516e93be030060e5fc12024a49d6"
+checksum = "22244ce15aa966053a896d1accb3a6e68469b97c7f33f284b99f0d576879fc23"
 dependencies = [
  "unicode-ident",
 ]
@@ -3218,7 +3225,7 @@ version = "0.7.5"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "990079665f075b699031e9c08fd3ab99be5029b96f3b78dc0709e8f77e4efebf"
 dependencies = [
- "heck",
+ "heck 0.4.1",
  "proc-macro2",
  "quote",
  "syn 1.0.109",
@@ -3303,7 +3310,7 @@ version = "0.26.2"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "5d8cec3501a5194c432b2b7976db6b7d10ec95c253208b45f83f7136aa985e29"
 dependencies = [
- "strum_macros 0.26.2",
+ "strum_macros 0.26.4",
 ]
 
 [[package]]
@@ -3312,7 +3319,7 @@ version = "0.25.3"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "23dc1fa9ac9c169a78ba62f0b841814b7abae11bdd047b9c58f893439e309ea0"
 dependencies = [
- "heck",
+ "heck 0.4.1",
  "proc-macro2",
  "quote",
  "rustversion",
@@ -3321,11 +3328,11 @@ dependencies = [
 
 [[package]]
 name = "strum_macros"
-version = "0.26.2"
+version = "0.26.4"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "c6cf59daf282c0a494ba14fd21610a0325f9f90ec9d1231dea26bcb1d696c946"
+checksum = "4c6bee85a5a24955dc440386795aa378cd9cf82acd5f764469152d2270e581be"
 dependencies = [
- "heck",
+ "heck 0.5.0",
  "proc-macro2",
  "quote",
  "rustversion",
@@ -3711,9 +3718,9 @@ checksum = 
"d4c87d22b6e3f4a18d4d40ef354e97c90fcb14dd91d7dc0aa9d8a1172ebf7202"
 
 [[package]]
 name = "unicode-width"
-version = "0.1.12"
+version = "0.1.13"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "68f5e5f3158ecfd4b8ff6fe086db7c8467a2dfdac97fe420f2b7c4aa97af66d6"
+checksum = "0336d538f7abc86d282a4189614dfaa90810dfc2c6f6427eaf88e16311dd225d"
 
 [[package]]
 name = "untrusted"
diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index 9f1f748435..3946758ff9 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -121,6 +121,7 @@ num_cpus = { workspace = true }
 object_store = { workspace = true }
 parking_lot = { workspace = true }
 parquet = { workspace = true, optional = true, default-features = true }
+paste = "1.0.15"
 pin-project-lite = "^0.2.7"
 rand = { workspace = true }
 sqlparser = { workspace = true }
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs 
b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
index 8d0d30bf41..a73538d02a 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
@@ -19,17 +19,26 @@
 
 // TODO: potentially move this to arrow-rs: 
https://github.com/apache/arrow-rs/issues/4328
 
-use arrow::{array::ArrayRef, datatypes::i256, datatypes::DataType, 
datatypes::TimeUnit};
-use arrow_array::{new_empty_array, new_null_array, UInt64Array};
-use arrow_schema::{Field, FieldRef, Schema};
-use datafusion_common::{
-    internal_datafusion_err, internal_err, plan_err, Result, ScalarValue,
+use arrow::datatypes::i256;
+use arrow::{array::ArrayRef, datatypes::DataType};
+use arrow_array::{
+    new_null_array, BinaryArray, BooleanArray, Date32Array, Date64Array, 
Decimal128Array,
+    Decimal256Array, FixedSizeBinaryArray, Float16Array, Float32Array, 
Float64Array,
+    Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray, 
LargeStringArray,
+    StringArray, Time32MillisecondArray, Time32SecondArray, 
Time64MicrosecondArray,
+    Time64NanosecondArray, TimestampMicrosecondArray, 
TimestampMillisecondArray,
+    TimestampNanosecondArray, TimestampSecondArray, UInt16Array, UInt32Array,
+    UInt64Array, UInt8Array,
 };
+use arrow_schema::{Field, FieldRef, Schema, TimeUnit};
+use datafusion_common::{internal_datafusion_err, internal_err, plan_err, 
Result};
 use half::f16;
 use parquet::file::metadata::ParquetMetaData;
 use parquet::file::statistics::Statistics as ParquetStatistics;
 use parquet::schema::types::SchemaDescriptor;
+use paste::paste;
 use std::sync::Arc;
+
 // Convert the bytes array to i128.
 // The endian of the input bytes array must be big-endian.
 pub(crate) fn from_bytes_to_i128(b: &[u8]) -> i128 {
@@ -66,201 +75,446 @@ pub fn sign_extend_be<const N: usize>(b: &[u8]) -> [u8; 
N] {
     result
 }
 
-/// Extract a single min/max statistics from a [`ParquetStatistics`] object
+/// Define an adapter iterator for extracting statistics from an iterator of
+/// `ParquetStatistics`
+///
+///
+/// Handles checking if the statistics are present and valid with the correct 
type.
 ///
-/// * `$column_statistics` is the `ParquetStatistics` object
-/// * `$func is the function` (`min`/`max`) to call to get the value
-/// * `$bytes_func` is the function (`min_bytes`/`max_bytes`) to call to get 
the value as bytes
-/// * `$target_arrow_type` is the [`DataType`] of the target statistics
-macro_rules! get_statistic {
-    ($column_statistics:expr, $func:ident, $bytes_func:ident, 
$target_arrow_type:expr) => {{
-        if !$column_statistics.has_min_max_set() {
-            return None;
+/// Parameters:
+/// * `$iterator_type` is the name of the iterator type (e.g. 
`MinBooleanStatsIterator`)
+/// * `$func` is the function to call to get the value (e.g. `min` or `max`)
+/// * `$parquet_statistics_type` is the type of the statistics (e.g. 
`ParquetStatistics::Boolean`)
+/// * `$stat_value_type` is the type of the statistics value (e.g. `bool`)
+macro_rules! make_stats_iterator {
+    ($iterator_type:ident, $func:ident, $parquet_statistics_type:path, 
$stat_value_type:ty) => {
+        /// Maps an iterator of `ParquetStatistics` into an iterator of
+        /// `&$stat_value_type`
+        ///
+        /// Yielded elements:
+        /// * Some(stats) if valid
+        /// * None if the statistics are not present, not valid, or not 
$stat_value_type
+        struct $iterator_type<'a, I>
+        where
+            I: Iterator<Item = Option<&'a ParquetStatistics>>,
+        {
+            iter: I,
         }
-        match $column_statistics {
-            ParquetStatistics::Boolean(s) => 
Some(ScalarValue::Boolean(Some(*s.$func()))),
-            ParquetStatistics::Int32(s) => {
-                match $target_arrow_type {
-                    // int32 to decimal with the precision and scale
-                    Some(DataType::Decimal128(precision, scale)) => {
-                        Some(ScalarValue::Decimal128(
-                            Some(*s.$func() as i128),
-                            *precision,
-                            *scale,
-                        ))
-                    }
-                    Some(DataType::Decimal256(precision, scale)) => {
-                        Some(ScalarValue::Decimal256(
-                            Some(i256::from(*s.$func())),
-                            *precision,
-                            *scale,
-                        ))
-                    }
-                    Some(DataType::Int8) => {
-                        
Some(ScalarValue::Int8(Some((*s.$func()).try_into().unwrap())))
-                    }
-                    Some(DataType::Int16) => {
-                        
Some(ScalarValue::Int16(Some((*s.$func()).try_into().unwrap())))
-                    }
-                    Some(DataType::UInt8) => {
-                        
Some(ScalarValue::UInt8(Some((*s.$func()).try_into().unwrap())))
-                    }
-                    Some(DataType::UInt16) => {
-                        
Some(ScalarValue::UInt16(Some((*s.$func()).try_into().unwrap())))
-                    }
-                    Some(DataType::UInt32) => {
-                        Some(ScalarValue::UInt32(Some((*s.$func()) as u32)))
-                    }
-                    Some(DataType::Date32) => {
-                        Some(ScalarValue::Date32(Some(*s.$func())))
-                    }
-                    Some(DataType::Date64) => {
-                        Some(ScalarValue::Date64(Some(i64::from(*s.$func()) * 
24 * 60 * 60 * 1000)))
-                    }
-                    Some(DataType::Time32(TimeUnit::Second)) => {
-                        Some(ScalarValue::Time32Second(Some((*s.$func()))))
-                    }
-                    Some(DataType::Time32(TimeUnit::Millisecond)) => {
-                        
Some(ScalarValue::Time32Millisecond(Some((*s.$func()))))
-                    }
-                    _ => Some(ScalarValue::Int32(Some(*s.$func()))),
-                }
+
+        impl<'a, I> $iterator_type<'a, I>
+        where
+            I: Iterator<Item = Option<&'a ParquetStatistics>>,
+        {
+            /// Create a new iterator to extract the statistics
+            fn new(iter: I) -> Self {
+                Self { iter }
             }
-            ParquetStatistics::Int64(s) => {
-                match $target_arrow_type {
-                    // int64 to decimal with the precision and scale
-                    Some(DataType::Decimal128(precision, scale)) => {
-                        Some(ScalarValue::Decimal128(
-                            Some(*s.$func() as i128),
-                            *precision,
-                            *scale,
-                        ))
-                    }
-                    Some(DataType::Decimal256(precision, scale)) => {
-                        Some(ScalarValue::Decimal256(
-                            Some(i256::from(*s.$func())),
-                            *precision,
-                            *scale,
-                        ))
-                    }
-                    Some(DataType::UInt64) => {
-                        Some(ScalarValue::UInt64(Some((*s.$func()) as u64)))
-                    }
-                    Some(DataType::Time64(TimeUnit::Microsecond)) => {
-                        Some(ScalarValue::Time64Microsecond(Some((*s.$func() 
as i64))))
-                    }
-                    Some(DataType::Time64(TimeUnit::Nanosecond)) => {
-                        Some(ScalarValue::Time64Nanosecond(Some((*s.$func() as 
i64))))
+        }
+
+        /// Implement the Iterator trait for the iterator
+        impl<'a, I> Iterator for $iterator_type<'a, I>
+        where
+            I: Iterator<Item = Option<&'a ParquetStatistics>>,
+        {
+            type Item = Option<&'a $stat_value_type>;
+
+            /// return the next statistics value
+            fn next(&mut self) -> Option<Self::Item> {
+                let next = self.iter.next();
+                next.map(|x| {
+                    x.and_then(|stats| match stats {
+                        $parquet_statistics_type(s) if stats.has_min_max_set() 
=> {
+                            Some(s.$func())
+                        }
+                        _ => None,
+                    })
+                })
+            }
+
+            fn size_hint(&self) -> (usize, Option<usize>) {
+                self.iter.size_hint()
+            }
+        }
+    };
+}
+
+make_stats_iterator!(
+    MinBooleanStatsIterator,
+    min,
+    ParquetStatistics::Boolean,
+    bool
+);
+make_stats_iterator!(
+    MaxBooleanStatsIterator,
+    max,
+    ParquetStatistics::Boolean,
+    bool
+);
+make_stats_iterator!(MinInt32StatsIterator, min, ParquetStatistics::Int32, 
i32);
+make_stats_iterator!(MaxInt32StatsIterator, max, ParquetStatistics::Int32, 
i32);
+make_stats_iterator!(MinInt64StatsIterator, min, ParquetStatistics::Int64, 
i64);
+make_stats_iterator!(MaxInt64StatsIterator, max, ParquetStatistics::Int64, 
i64);
+make_stats_iterator!(MinFloatStatsIterator, min, ParquetStatistics::Float, 
f32);
+make_stats_iterator!(MaxFloatStatsIterator, max, ParquetStatistics::Float, 
f32);
+make_stats_iterator!(MinDoubleStatsIterator, min, ParquetStatistics::Double, 
f64);
+make_stats_iterator!(MaxDoubleStatsIterator, max, ParquetStatistics::Double, 
f64);
+make_stats_iterator!(
+    MinByteArrayStatsIterator,
+    min_bytes,
+    ParquetStatistics::ByteArray,
+    [u8]
+);
+make_stats_iterator!(
+    MaxByteArrayStatsIterator,
+    max_bytes,
+    ParquetStatistics::ByteArray,
+    [u8]
+);
+make_stats_iterator!(
+    MinFixedLenByteArrayStatsIterator,
+    min_bytes,
+    ParquetStatistics::FixedLenByteArray,
+    [u8]
+);
+make_stats_iterator!(
+    MaxFixedLenByteArrayStatsIterator,
+    max_bytes,
+    ParquetStatistics::FixedLenByteArray,
+    [u8]
+);
+
+/// Special iterator adapter for extracting i128 values from an iterator of
+/// `ParquetStatistics`
+///
+/// Handles checking if the statistics are present and valid with the correct 
type.
+///
+/// Depending on the parquet file, the statistics for `Decimal128` can be 
stored as
+/// `Int32`, `Int64`, `ByteArray` or `FixedLenByteArray` :mindblown:
+///
+/// This iterator handles all cases, extracting the values
+/// and converting it to `stat_value_type`.
+///
+/// Parameters:
+/// * `$iterator_type` is the name of the iterator type (e.g. 
`MinDecimal128StatsIterator`)
+/// * `$func` is the function to call to get the value (e.g. `min` or `max`)
+/// * `$bytes_func` is the function to call to get the value as bytes (e.g. 
`min_bytes` or `max_bytes`)
+/// * `$stat_value_type` is the type of the statistics value (e.g. `i128`)
+/// * `$convert_func` is the function to convert the bytes to stats value (e.g. 
`from_bytes_to_i128`)
+macro_rules! make_decimal_stats_iterator {
+    ($iterator_type:ident, $func:ident, $bytes_func:ident, 
$stat_value_type:ident, $convert_func: ident) => {
+        struct $iterator_type<'a, I>
+        where
+            I: Iterator<Item = Option<&'a ParquetStatistics>>,
+        {
+            iter: I,
+        }
+
+        impl<'a, I> $iterator_type<'a, I>
+        where
+            I: Iterator<Item = Option<&'a ParquetStatistics>>,
+        {
+            fn new(iter: I) -> Self {
+                Self { iter }
+            }
+        }
+
+        impl<'a, I> Iterator for $iterator_type<'a, I>
+        where
+            I: Iterator<Item = Option<&'a ParquetStatistics>>,
+        {
+            type Item = Option<$stat_value_type>;
+
+            fn next(&mut self) -> Option<Self::Item> {
+                let next = self.iter.next();
+                next.map(|x| {
+                    x.and_then(|stats| {
+                        if !stats.has_min_max_set() {
+                            return None;
+                        }
+                        match stats {
+                            ParquetStatistics::Int32(s) => {
+                                Some($stat_value_type::from(*s.$func()))
+                            }
+                            ParquetStatistics::Int64(s) => {
+                                Some($stat_value_type::from(*s.$func()))
+                            }
+                            ParquetStatistics::ByteArray(s) => {
+                                Some($convert_func(s.$bytes_func()))
+                            }
+                            ParquetStatistics::FixedLenByteArray(s) => {
+                                Some($convert_func(s.$bytes_func()))
+                            }
+                            _ => None,
+                        }
+                    })
+                })
+            }
+
+            fn size_hint(&self) -> (usize, Option<usize>) {
+                self.iter.size_hint()
+            }
+        }
+    };
+}
+
+make_decimal_stats_iterator!(
+    MinDecimal128StatsIterator,
+    min,
+    min_bytes,
+    i128,
+    from_bytes_to_i128
+);
+make_decimal_stats_iterator!(
+    MaxDecimal128StatsIterator,
+    max,
+    max_bytes,
+    i128,
+    from_bytes_to_i128
+);
+make_decimal_stats_iterator!(
+    MinDecimal256StatsIterator,
+    min,
+    min_bytes,
+    i256,
+    from_bytes_to_i256
+);
+make_decimal_stats_iterator!(
+    MaxDecimal256StatsIterator,
+    max,
+    max_bytes,
+    i256,
+    from_bytes_to_i256
+);
+
+/// Special macro to combine the statistics iterators for min and max using 
the [`mod@paste`] macro.
+/// This is used to avoid repeating the same code for min and max statistics 
extractions
+///
+/// Parameters:
+/// stat_type_prefix: The prefix of the statistics iterator type (e.g. `Min` 
or `Max`)
+/// data_type: The data type of the statistics (e.g. `DataType::Int32`)
+/// iterator: The iterator of [`ParquetStatistics`] to extract the statistics 
from.
+macro_rules! get_statistics {
+    ($stat_type_prefix: ident, $data_type: ident, $iterator: ident) => {
+        paste! {
+        match $data_type {
+            DataType::Boolean => Ok(Arc::new(BooleanArray::from_iter(
+                [<$stat_type_prefix 
BooleanStatsIterator>]::new($iterator).map(|x| x.copied()),
+            ))),
+            DataType::Int8 => Ok(Arc::new(Int8Array::from_iter(
+                [<$stat_type_prefix 
Int32StatsIterator>]::new($iterator).map(|x| {
+                    x.and_then(|x| {
+                        if let Ok(v) = i8::try_from(*x) {
+                            Some(v)
+                        } else {
+                            None
+                        }
+                    })
+                }),
+            ))),
+            DataType::Int16 => Ok(Arc::new(Int16Array::from_iter(
+                [<$stat_type_prefix 
Int32StatsIterator>]::new($iterator).map(|x| {
+                    x.and_then(|x| {
+                        if let Ok(v) = i16::try_from(*x) {
+                            Some(v)
+                        } else {
+                            None
+                        }
+                    })
+                }),
+            ))),
+            DataType::Int32 => Ok(Arc::new(Int32Array::from_iter(
+                [<$stat_type_prefix 
Int32StatsIterator>]::new($iterator).map(|x| x.copied()),
+            ))),
+            DataType::Int64 => Ok(Arc::new(Int64Array::from_iter(
+                [<$stat_type_prefix 
Int64StatsIterator>]::new($iterator).map(|x| x.copied()),
+            ))),
+            DataType::UInt8 => Ok(Arc::new(UInt8Array::from_iter(
+                [<$stat_type_prefix 
Int32StatsIterator>]::new($iterator).map(|x| {
+                    x.and_then(|x| {
+                        if let Ok(v) = u8::try_from(*x) {
+                            Some(v)
+                        } else {
+                            None
+                        }
+                    })
+                }),
+            ))),
+            DataType::UInt16 => Ok(Arc::new(UInt16Array::from_iter(
+                [<$stat_type_prefix 
Int32StatsIterator>]::new($iterator).map(|x| {
+                    x.and_then(|x| {
+                        if let Ok(v) = u16::try_from(*x) {
+                            Some(v)
+                        } else {
+                            None
+                        }
+                    })
+                }),
+            ))),
+            DataType::UInt32 => Ok(Arc::new(UInt32Array::from_iter(
+                [<$stat_type_prefix 
Int32StatsIterator>]::new($iterator).map(|x| x.map(|x| *x as u32)),
+            ))),
+            DataType::UInt64 => Ok(Arc::new(UInt64Array::from_iter(
+                [<$stat_type_prefix 
Int64StatsIterator>]::new($iterator).map(|x| x.map(|x| *x as u64)),
+            ))),
+            DataType::Float16 => Ok(Arc::new(Float16Array::from_iter(
+                [<$stat_type_prefix 
FixedLenByteArrayStatsIterator>]::new($iterator).map(|x| x.and_then(|x| {
+                    from_bytes_to_f16(x)
+                })),
+            ))),
+            DataType::Float32 => Ok(Arc::new(Float32Array::from_iter(
+                [<$stat_type_prefix 
FloatStatsIterator>]::new($iterator).map(|x| x.copied()),
+            ))),
+            DataType::Float64 => Ok(Arc::new(Float64Array::from_iter(
+                [<$stat_type_prefix 
DoubleStatsIterator>]::new($iterator).map(|x| x.copied()),
+            ))),
+            DataType::Date32 => Ok(Arc::new(Date32Array::from_iter(
+                [<$stat_type_prefix 
Int32StatsIterator>]::new($iterator).map(|x| x.copied()),
+            ))),
+            DataType::Date64 => Ok(Arc::new(Date64Array::from_iter(
+                [<$stat_type_prefix Int32StatsIterator>]::new($iterator)
+                    .map(|x| x.map(|x| i64::from(*x) * 24 * 60 * 60 * 1000)),
+            ))),
+            DataType::Timestamp(unit, timezone) =>{
+                let iter = [<$stat_type_prefix 
Int64StatsIterator>]::new($iterator).map(|x| x.copied());
+
+                Ok(match unit {
+                    TimeUnit::Second => {
+                        Arc::new(match timezone {
+                            Some(tz) => 
TimestampSecondArray::from_iter(iter).with_timezone(tz.clone()),
+                            None => TimestampSecondArray::from_iter(iter),
+                        })
                     }
-                    Some(DataType::Timestamp(unit, timezone)) => {
-                        Some(match unit {
-                            TimeUnit::Second => ScalarValue::TimestampSecond(
-                                Some(*s.$func()),
-                                timezone.clone(),
-                            ),
-                            TimeUnit::Millisecond => 
ScalarValue::TimestampMillisecond(
-                                Some(*s.$func()),
-                                timezone.clone(),
-                            ),
-                            TimeUnit::Microsecond => 
ScalarValue::TimestampMicrosecond(
-                                Some(*s.$func()),
-                                timezone.clone(),
-                            ),
-                            TimeUnit::Nanosecond => 
ScalarValue::TimestampNanosecond(
-                                Some(*s.$func()),
-                                timezone.clone(),
-                            ),
+                    TimeUnit::Millisecond => {
+                        Arc::new(match timezone {
+                            Some(tz) => 
TimestampMillisecondArray::from_iter(iter).with_timezone(tz.clone()),
+                            None => TimestampMillisecondArray::from_iter(iter),
                         })
                     }
-                    _ => Some(ScalarValue::Int64(Some(*s.$func()))),
-                }
-            }
-            // 96 bit ints not supported
-            ParquetStatistics::Int96(_) => None,
-            ParquetStatistics::Float(s) => 
Some(ScalarValue::Float32(Some(*s.$func()))),
-            ParquetStatistics::Double(s) => 
Some(ScalarValue::Float64(Some(*s.$func()))),
-            ParquetStatistics::ByteArray(s) => {
-                match $target_arrow_type {
-                    // decimal data type
-                    Some(DataType::Decimal128(precision, scale)) => {
-                        Some(ScalarValue::Decimal128(
-                            Some(from_bytes_to_i128(s.$bytes_func())),
-                            *precision,
-                            *scale,
-                        ))
+                    TimeUnit::Microsecond => {
+                        Arc::new(match timezone {
+                            Some(tz) => 
TimestampMicrosecondArray::from_iter(iter).with_timezone(tz.clone()),
+                            None => TimestampMicrosecondArray::from_iter(iter),
+                        })
                     }
-                    Some(DataType::Decimal256(precision, scale)) => {
-                        Some(ScalarValue::Decimal256(
-                            Some(from_bytes_to_i256(s.$bytes_func())),
-                            *precision,
-                            *scale,
-                        ))
+                    TimeUnit::Nanosecond => {
+                        Arc::new(match timezone {
+                            Some(tz) => 
TimestampNanosecondArray::from_iter(iter).with_timezone(tz.clone()),
+                            None => TimestampNanosecondArray::from_iter(iter),
+                        })
                     }
-                    Some(DataType::Binary) => {
-                        
Some(ScalarValue::Binary(Some(s.$bytes_func().to_vec())))
+                })
+            },
+            DataType::Time32(unit) => {
+                Ok(match unit {
+                    TimeUnit::Second =>  Arc::new(Time32SecondArray::from_iter(
+                        [<$stat_type_prefix 
Int32StatsIterator>]::new($iterator).map(|x| x.copied()),
+                    )),
+                    TimeUnit::Millisecond => 
Arc::new(Time32MillisecondArray::from_iter(
+                        [<$stat_type_prefix 
Int32StatsIterator>]::new($iterator).map(|x| x.copied()),
+                    )),
+                    _ => {
+                        let len = $iterator.count();
+                        // don't know how to extract statistics, so return a 
null array
+                        new_null_array($data_type, len)
                     }
-                    Some(DataType::LargeBinary) => {
-                        
Some(ScalarValue::LargeBinary(Some(s.$bytes_func().to_vec())))
+                })
+            },
+            DataType::Time64(unit) => {
+                Ok(match unit {
+                    TimeUnit::Microsecond =>  
Arc::new(Time64MicrosecondArray::from_iter(
+                        [<$stat_type_prefix 
Int64StatsIterator>]::new($iterator).map(|x| x.copied()),
+                    )),
+                    TimeUnit::Nanosecond => 
Arc::new(Time64NanosecondArray::from_iter(
+                        [<$stat_type_prefix 
Int64StatsIterator>]::new($iterator).map(|x| x.copied()),
+                    )),
+                    _ => {
+                        let len = $iterator.count();
+                        // don't know how to extract statistics, so return a 
null array
+                        new_null_array($data_type, len)
                     }
-                    Some(DataType::LargeUtf8) | _ => {
-                        let utf8_value = std::str::from_utf8(s.$bytes_func())
-                            .map(|s| s.to_string())
-                            .ok();
-                        if utf8_value.is_none() {
+                })
+            },
+            DataType::Binary => Ok(Arc::new(BinaryArray::from_iter(
+                [<$stat_type_prefix 
ByteArrayStatsIterator>]::new($iterator).map(|x| x.map(|x| x.to_vec())),
+            ))),
+            DataType::LargeBinary => Ok(Arc::new(LargeBinaryArray::from_iter(
+                [<$stat_type_prefix 
ByteArrayStatsIterator>]::new($iterator).map(|x| x.map(|x|x.to_vec())),
+            ))),
+            DataType::Utf8 => Ok(Arc::new(StringArray::from_iter(
+                [<$stat_type_prefix 
ByteArrayStatsIterator>]::new($iterator).map(|x| {
+                    x.and_then(|x| {
+                        let res = std::str::from_utf8(x).map(|s| 
s.to_string()).ok();
+                        if res.is_none() {
                             log::debug!("Utf8 statistics is a non-UTF8 value, 
ignoring it.");
                         }
-
-                        match $target_arrow_type {
-                            Some(DataType::LargeUtf8) => 
Some(ScalarValue::LargeUtf8(utf8_value)),
-                            _ => Some(ScalarValue::Utf8(utf8_value)),
-                        }
-                    }
-                }
+                        res
+                    })
+                }),
+            ))),
+            DataType::LargeUtf8 => {
+                Ok(Arc::new(LargeStringArray::from_iter(
+                    [<$stat_type_prefix 
ByteArrayStatsIterator>]::new($iterator).map(|x| {
+                        x.and_then(|x| {
+                            let res = std::str::from_utf8(x).map(|s| 
s.to_string()).ok();
+                            if res.is_none() {
+                                log::debug!("LargeUtf8 statistics is a 
non-UTF8 value, ignoring it.");
+                            }
+                            res
+                        })
+                    }),
+                )))
             }
-            // type not fully supported yet
-            ParquetStatistics::FixedLenByteArray(s) => {
-                match $target_arrow_type {
-                    // just support specific logical data types, there are others each
-                    // with their own ordering
-                    Some(DataType::Decimal128(precision, scale)) => {
-                        Some(ScalarValue::Decimal128(
-                            Some(from_bytes_to_i128(s.$bytes_func())),
-                            *precision,
-                            *scale,
-                        ))
-                    }
-                    Some(DataType::Decimal256(precision, scale)) => {
-                        Some(ScalarValue::Decimal256(
-                            Some(from_bytes_to_i256(s.$bytes_func())),
-                            *precision,
-                            *scale,
-                        ))
-                    }
-                    Some(DataType::FixedSizeBinary(size)) => {
-                        let value = s.$bytes_func().to_vec();
-                        let value = if value.len().try_into() == Ok(*size) {
-                            Some(value)
+            DataType::FixedSizeBinary(size) => Ok(Arc::new(FixedSizeBinaryArray::from(
+                [<$stat_type_prefix FixedLenByteArrayStatsIterator>]::new($iterator).map(|x| {
+                    x.and_then(|x| {
+                        if x.len().try_into() == Ok(*size) {
+                            Some(x)
                         } else {
                             log::debug!(
                                 "FixedSizeBinary({}) statistics is a binary of size {}, ignoring it.",
                                 size,
-                                value.len(),
+                                x.len(),
                             );
                             None
-                        };
-                        Some(ScalarValue::FixedSizeBinary(
-                            *size,
-                            value,
-                        ))
-                    }
-                    Some(DataType::Float16) => {
-                        Some(ScalarValue::Float16(from_bytes_to_f16(s.$bytes_func())))
-                    }
-                    _ => None,
-                }
+                        }
+                    })
+                }).collect::<Vec<_>>(),
+            ))),
+            DataType::Decimal128(precision, scale) => {
+                let arr = Decimal128Array::from_iter(
+                    [<$stat_type_prefix Decimal128StatsIterator>]::new($iterator)
+                ).with_precision_and_scale(*precision, *scale)?;
+                Ok(Arc::new(arr))
+            },
+            DataType::Decimal256(precision, scale) => {
+                let arr = Decimal256Array::from_iter(
+                    [<$stat_type_prefix Decimal256StatsIterator>]::new($iterator)
+                ).with_precision_and_scale(*precision, *scale)?;
+                Ok(Arc::new(arr))
+            },
+            DataType::Dictionary(_, value_type) => {
+                [<$stat_type_prefix:lower _ statistics>](value_type, $iterator)
             }
-        }
-    }};
+
+            DataType::Map(_,_) |
+            DataType::Duration(_) |
+            DataType::Interval(_) |
+            DataType::Null |
+            DataType::BinaryView |
+            DataType::Utf8View |
+            DataType::List(_) |
+            DataType::ListView(_) |
+            DataType::FixedSizeList(_, _) |
+            DataType::LargeList(_) |
+            DataType::LargeListView(_) |
+            DataType::Struct(_) |
+            DataType::Union(_, _) |
+            DataType::RunEndEncoded(_, _) => {
+                let len = $iterator.count();
+                // don't know how to extract statistics, so return a null array
+                Ok(new_null_array($data_type, len))
+            }
+        }}}
 }
 
 /// Lookups up the parquet column by name
@@ -293,9 +547,7 @@ pub(crate) fn min_statistics<'a, I: Iterator<Item = Option<&'a ParquetStatistics
     data_type: &DataType,
     iterator: I,
 ) -> Result<ArrayRef> {
-    let scalars = iterator
-        .map(|x| x.and_then(|s| get_statistic!(s, min, min_bytes, Some(data_type))));
-    collect_scalars(data_type, scalars)
+    get_statistics!(Min, data_type, iterator)
 }
 
 /// Extracts the max statistics from an iterator of [`ParquetStatistics`] to an [`ArrayRef`]
@@ -303,24 +555,7 @@ pub(crate) fn max_statistics<'a, I: Iterator<Item = Option<&'a ParquetStatistics
     data_type: &DataType,
     iterator: I,
 ) -> Result<ArrayRef> {
-    let scalars = iterator
-        .map(|x| x.and_then(|s| get_statistic!(s, max, max_bytes, Some(data_type))));
-    collect_scalars(data_type, scalars)
-}
-
-/// Builds an array from an iterator of ScalarValue
-fn collect_scalars<I: Iterator<Item = Option<ScalarValue>>>(
-    data_type: &DataType,
-    iterator: I,
-) -> Result<ArrayRef> {
-    let mut scalars = iterator.peekable();
-    match scalars.peek().is_none() {
-        true => Ok(new_empty_array(data_type)),
-        false => {
-            let null = ScalarValue::try_from(data_type)?;
-            ScalarValue::iter_to_array(scalars.map(|x| x.unwrap_or_else(|| null.clone())))
-        }
-    }
+    get_statistics!(Max, data_type, iterator)
 }
 
 /// What type of statistics should be extracted?
@@ -474,11 +709,10 @@ mod test {
     use arrow::compute::kernels::cast_utils::Parser;
     use arrow::datatypes::{i256, Date32Type, Date64Type};
     use arrow_array::{
-        new_null_array, Array, BinaryArray, BooleanArray, Date32Array, Date64Array,
-        Decimal128Array, Decimal256Array, Float32Array, Float64Array, Int16Array,
-        Int32Array, Int64Array, Int8Array, LargeBinaryArray, RecordBatch, StringArray,
-        StructArray, TimestampMicrosecondArray, TimestampMillisecondArray,
-        TimestampNanosecondArray, TimestampSecondArray,
+        new_empty_array, new_null_array, Array, BinaryArray, BooleanArray, Date32Array,
+        Date64Array, Decimal128Array, Decimal256Array, Float32Array, Float64Array,
+        Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray, RecordBatch,
+        StringArray, StructArray, TimestampNanosecondArray,
     };
     use arrow_schema::{Field, SchemaRef};
     use bytes::Bytes;
diff --git a/datafusion/core/tests/parquet/arrow_statistics.rs b/datafusion/core/tests/parquet/arrow_statistics.rs
index 19cc4db4d2..2f8fbab647 100644
--- a/datafusion/core/tests/parquet/arrow_statistics.rs
+++ b/datafusion/core/tests/parquet/arrow_statistics.rs
@@ -495,7 +495,6 @@ async fn test_timestamp() {
 
     Test {
         reader: &reader,
-        // mins are [1577840461000000000, 1577840471000000000, 1577841061000000000, 1578704461000000000,]
         expected_min: Arc::new(TimestampNanosecondArray::from(vec![
             TimestampNanosecondType::parse("2020-01-01T01:01:01"),
             TimestampNanosecondType::parse("2020-01-01T01:01:11"),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to