This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 06bbe1298f Extract parquet statistics to its own module, add tests
(#8294)
06bbe1298f is described below
commit 06bbe1298fa8aa042b6a6462e55b2890969d884a
Author: Andrew Lamb <[email protected]>
AuthorDate: Wed Nov 29 14:10:06 2023 -0500
Extract parquet statistics to its own module, add tests (#8294)
* Extract parquet statistics to its own module, add tests
* Update datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
Co-authored-by: Raphael Taylor-Davies
<[email protected]>
* rename enum
* Improve API
* Add test for reading struct array statistics
* Add test for column after statistics
* improve tests
* simplify
* clippy
* Update datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
* Update datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
* Add test showing incorrect statistics
* Rework statistics
* Fix clippy
* Update documentation and make it clear the statistics are not publicly
accessible
* Add link to upstream arrow ticket
---------
Co-authored-by: Raphael Taylor-Davies
<[email protected]>
Co-authored-by: Raphael Taylor-Davies <[email protected]>
---
.../src/datasource/physical_plan/parquet/mod.rs | 24 +-
.../physical_plan/parquet/page_filter.rs | 5 +-
.../datasource/physical_plan/parquet/row_groups.rs | 189 ++---
.../datasource/physical_plan/parquet/statistics.rs | 899 +++++++++++++++++++++
4 files changed, 951 insertions(+), 166 deletions(-)
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index 731672ceb8..95aae71c77 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -66,6 +66,7 @@ mod metrics;
pub mod page_filter;
mod row_filter;
mod row_groups;
+mod statistics;
pub use metrics::ParquetFileMetrics;
@@ -506,6 +507,7 @@ impl FileOpener for ParquetOpener {
let file_metadata = builder.metadata().clone();
let predicate = pruning_predicate.as_ref().map(|p| p.as_ref());
let mut row_groups = row_groups::prune_row_groups_by_statistics(
+ builder.parquet_schema(),
file_metadata.row_groups(),
file_range,
predicate,
@@ -718,28 +720,6 @@ pub async fn plan_to_parquet(
Ok(())
}
-// Copy from the arrow-rs
-//
https://github.com/apache/arrow-rs/blob/733b7e7fd1e8c43a404c3ce40ecf741d493c21b4/parquet/src/arrow/buffer/bit_util.rs#L55
-// Convert the byte slice to fixed length byte array with the length of 16
-fn sign_extend_be(b: &[u8]) -> [u8; 16] {
- assert!(b.len() <= 16, "Array too large, expected less than 16");
- let is_negative = (b[0] & 128u8) == 128u8;
- let mut result = if is_negative { [255u8; 16] } else { [0u8; 16] };
- for (d, s) in result.iter_mut().skip(16 - b.len()).zip(b) {
- *d = *s;
- }
- result
-}
-
-// Convert the bytes array to i128.
-// The endian of the input bytes array must be big-endian.
-pub(crate) fn from_bytes_to_i128(b: &[u8]) -> i128 {
- // The bytes array are from parquet file and must be the big-endian.
- // The endian is defined by parquet format, and the reference document
- //
https://github.com/apache/parquet-format/blob/54e53e5d7794d383529dd30746378f19a12afd58/src/main/thrift/parquet.thrift#L66
- i128::from_be_bytes(sign_extend_be(b))
-}
-
// Convert parquet column schema to arrow data type, and just consider the
// decimal data type.
pub(crate) fn parquet_to_arrow_decimal_type(
diff --git
a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs
b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs
index b5b5f154f7..42bfef3599 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs
@@ -39,9 +39,8 @@ use parquet::{
};
use std::sync::Arc;
-use crate::datasource::physical_plan::parquet::{
- from_bytes_to_i128, parquet_to_arrow_decimal_type,
-};
+use crate::datasource::physical_plan::parquet::parquet_to_arrow_decimal_type;
+use crate::datasource::physical_plan::parquet::statistics::from_bytes_to_i128;
use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
use super::metrics::ParquetFileMetrics;
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
index 0079368f9c..0ab2046097 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
@@ -15,25 +15,25 @@
// specific language governing permissions and limitations
// under the License.
-use arrow::{
- array::ArrayRef,
- datatypes::{DataType, Schema},
-};
+use arrow::{array::ArrayRef, datatypes::Schema};
+use arrow_schema::FieldRef;
use datafusion_common::tree_node::{TreeNode, VisitRecursion};
use datafusion_common::{Column, DataFusionError, Result, ScalarValue};
+use parquet::file::metadata::ColumnChunkMetaData;
+use parquet::schema::types::SchemaDescriptor;
use parquet::{
arrow::{async_reader::AsyncFileReader, ParquetRecordBatchStreamBuilder},
bloom_filter::Sbbf,
- file::{metadata::RowGroupMetaData, statistics::Statistics as
ParquetStatistics},
+ file::metadata::RowGroupMetaData,
};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
-use crate::datasource::{
- listing::FileRange,
- physical_plan::parquet::{from_bytes_to_i128,
parquet_to_arrow_decimal_type},
+use crate::datasource::listing::FileRange;
+use crate::datasource::physical_plan::parquet::statistics::{
+ max_statistics, min_statistics, parquet_column,
};
use crate::logical_expr::Operator;
use crate::physical_expr::expressions as phys_expr;
@@ -51,7 +51,11 @@ use super::ParquetFileMetrics;
///
/// If an index IS present in the returned Vec it means the predicate
/// did not filter out that row group.
+///
+/// Note: This method currently ignores ColumnOrder
+/// <https://github.com/apache/arrow-datafusion/issues/8335>
pub(crate) fn prune_row_groups_by_statistics(
+ parquet_schema: &SchemaDescriptor,
groups: &[RowGroupMetaData],
range: Option<FileRange>,
predicate: Option<&PruningPredicate>,
@@ -74,8 +78,9 @@ pub(crate) fn prune_row_groups_by_statistics(
if let Some(predicate) = predicate {
let pruning_stats = RowGroupPruningStatistics {
+ parquet_schema,
row_group_metadata: metadata,
- parquet_schema: predicate.schema().as_ref(),
+ arrow_schema: predicate.schema().as_ref(),
};
match predicate.prune(&pruning_stats) {
Ok(values) => {
@@ -296,146 +301,33 @@ impl BloomFilterPruningPredicate {
}
}
-/// Wraps parquet statistics in a way
-/// that implements [`PruningStatistics`]
+/// Wraps [`RowGroupMetaData`] in a way that implements [`PruningStatistics`]
+///
+/// Note: This should be implemented for an array of [`RowGroupMetaData`] instead
+/// of per row group
struct RowGroupPruningStatistics<'a> {
+ parquet_schema: &'a SchemaDescriptor,
row_group_metadata: &'a RowGroupMetaData,
- parquet_schema: &'a Schema,
+ arrow_schema: &'a Schema,
}
-/// Extract the min/max statistics from a `ParquetStatistics` object
-macro_rules! get_statistic {
- ($column_statistics:expr, $func:ident, $bytes_func:ident,
$target_arrow_type:expr) => {{
- if !$column_statistics.has_min_max_set() {
- return None;
- }
- match $column_statistics {
- ParquetStatistics::Boolean(s) =>
Some(ScalarValue::Boolean(Some(*s.$func()))),
- ParquetStatistics::Int32(s) => {
- match $target_arrow_type {
- // int32 to decimal with the precision and scale
- Some(DataType::Decimal128(precision, scale)) => {
- Some(ScalarValue::Decimal128(
- Some(*s.$func() as i128),
- precision,
- scale,
- ))
- }
- _ => Some(ScalarValue::Int32(Some(*s.$func()))),
- }
- }
- ParquetStatistics::Int64(s) => {
- match $target_arrow_type {
- // int64 to decimal with the precision and scale
- Some(DataType::Decimal128(precision, scale)) => {
- Some(ScalarValue::Decimal128(
- Some(*s.$func() as i128),
- precision,
- scale,
- ))
- }
- _ => Some(ScalarValue::Int64(Some(*s.$func()))),
- }
- }
- // 96 bit ints not supported
- ParquetStatistics::Int96(_) => None,
- ParquetStatistics::Float(s) =>
Some(ScalarValue::Float32(Some(*s.$func()))),
- ParquetStatistics::Double(s) =>
Some(ScalarValue::Float64(Some(*s.$func()))),
- ParquetStatistics::ByteArray(s) => {
- match $target_arrow_type {
- // decimal data type
- Some(DataType::Decimal128(precision, scale)) => {
- Some(ScalarValue::Decimal128(
- Some(from_bytes_to_i128(s.$bytes_func())),
- precision,
- scale,
- ))
- }
- _ => {
- let s = std::str::from_utf8(s.$bytes_func())
- .map(|s| s.to_string())
- .ok();
- Some(ScalarValue::Utf8(s))
- }
- }
- }
- // type not supported yet
- ParquetStatistics::FixedLenByteArray(s) => {
- match $target_arrow_type {
- // just support the decimal data type
- Some(DataType::Decimal128(precision, scale)) => {
- Some(ScalarValue::Decimal128(
- Some(from_bytes_to_i128(s.$bytes_func())),
- precision,
- scale,
- ))
- }
- _ => None,
- }
- }
- }
- }};
-}
-
-// Extract the min or max value calling `func` or `bytes_func` on the
ParquetStatistics as appropriate
-macro_rules! get_min_max_values {
- ($self:expr, $column:expr, $func:ident, $bytes_func:ident) => {{
- let (_column_index, field) =
- if let Some((v, f)) =
$self.parquet_schema.column_with_name(&$column.name) {
- (v, f)
- } else {
- // Named column was not present
- return None;
- };
-
- let data_type = field.data_type();
- // The result may be None, because DataFusion doesn't have support for
ScalarValues of the column type
- let null_scalar: ScalarValue = data_type.try_into().ok()?;
-
- $self.row_group_metadata
- .columns()
- .iter()
- .find(|c| c.column_descr().name() == &$column.name)
- .and_then(|c| if c.statistics().is_some()
{Some((c.statistics().unwrap(), c.column_descr()))} else {None})
- .map(|(stats, column_descr)|
- {
- let target_data_type =
parquet_to_arrow_decimal_type(column_descr);
- get_statistic!(stats, $func, $bytes_func, target_data_type)
- })
- .flatten()
- // column either didn't have statistics at all or didn't have
min/max values
- .or_else(|| Some(null_scalar.clone()))
- .and_then(|s| s.to_array().ok())
- }}
-}
-
-// Extract the null count value on the ParquetStatistics
-macro_rules! get_null_count_values {
- ($self:expr, $column:expr) => {{
- let value = ScalarValue::UInt64(
- if let Some(col) = $self
- .row_group_metadata
- .columns()
- .iter()
- .find(|c| c.column_descr().name() == &$column.name)
- {
- col.statistics().map(|s| s.null_count())
- } else {
- Some($self.row_group_metadata.num_rows() as u64)
- },
- );
-
- value.to_array().ok()
- }};
+impl<'a> RowGroupPruningStatistics<'a> {
+ /// Looks up the parquet column chunk and arrow field for the column `name`
+ ///
+ /// Returns `None` if [`parquet_column`] cannot resolve `name` in the
+ /// schemas (for example because the column is missing or nested)
+ fn column(&self, name: &str) -> Option<(&ColumnChunkMetaData, &FieldRef)> {
+ let (idx, field) = parquet_column(self.parquet_schema,
self.arrow_schema, name)?;
+ Some((self.row_group_metadata.column(idx), field))
+ }
}
impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
fn min_values(&self, column: &Column) -> Option<ArrayRef> {
- get_min_max_values!(self, column, min, min_bytes)
+ let (column, field) = self.column(&column.name)?;
+ min_statistics(field.data_type(),
std::iter::once(column.statistics())).ok()
}
fn max_values(&self, column: &Column) -> Option<ArrayRef> {
- get_min_max_values!(self, column, max, max_bytes)
+ let (column, field) = self.column(&column.name)?;
+ max_statistics(field.data_type(),
std::iter::once(column.statistics())).ok()
}
fn num_containers(&self) -> usize {
@@ -443,7 +335,9 @@ impl<'a> PruningStatistics for
RowGroupPruningStatistics<'a> {
}
fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
- get_null_count_values!(self, column)
+ let (c, _) = self.column(&column.name)?;
+ let scalar = ScalarValue::UInt64(Some(c.statistics()?.null_count()));
+ scalar.to_array().ok()
}
}
@@ -463,6 +357,7 @@ mod tests {
use datafusion_physical_expr::execution_props::ExecutionProps;
use datafusion_physical_expr::{create_physical_expr, PhysicalExpr};
use datafusion_sql::planner::ContextProvider;
+ use parquet::arrow::arrow_to_parquet_schema;
use parquet::arrow::async_reader::ParquetObjectReader;
use parquet::basic::LogicalType;
use parquet::data_type::{ByteArray, FixedLenByteArray};
@@ -540,6 +435,7 @@ mod tests {
let metrics = parquet_file_metrics();
assert_eq!(
prune_row_groups_by_statistics(
+ &schema_descr,
&[rgm1, rgm2],
None,
Some(&pruning_predicate),
@@ -574,6 +470,7 @@ mod tests {
// is null / undefined so the first row group can't be filtered out
assert_eq!(
prune_row_groups_by_statistics(
+ &schema_descr,
&[rgm1, rgm2],
None,
Some(&pruning_predicate),
@@ -621,6 +518,7 @@ mod tests {
// when conditions are joined using AND
assert_eq!(
prune_row_groups_by_statistics(
+ &schema_descr,
groups,
None,
Some(&pruning_predicate),
@@ -639,6 +537,7 @@ mod tests {
// this bypasses the entire predicate expression and no row groups are
filtered out
assert_eq!(
prune_row_groups_by_statistics(
+ &schema_descr,
groups,
None,
Some(&pruning_predicate),
@@ -678,6 +577,7 @@ mod tests {
Field::new("c1", DataType::Int32, false),
Field::new("c2", DataType::Boolean, false),
]));
+ let schema_descr = arrow_to_parquet_schema(&schema).unwrap();
let expr = col("c1").gt(lit(15)).and(col("c2").is_null());
let expr = logical2physical(&expr, &schema);
let pruning_predicate = PruningPredicate::try_new(expr,
schema).unwrap();
@@ -687,6 +587,7 @@ mod tests {
// First row group was filtered out because it contains no null value
on "c2".
assert_eq!(
prune_row_groups_by_statistics(
+ &schema_descr,
&groups,
None,
Some(&pruning_predicate),
@@ -706,6 +607,7 @@ mod tests {
Field::new("c1", DataType::Int32, false),
Field::new("c2", DataType::Boolean, false),
]));
+ let schema_descr = arrow_to_parquet_schema(&schema).unwrap();
let expr = col("c1")
.gt(lit(15))
.and(col("c2").eq(lit(ScalarValue::Boolean(None))));
@@ -718,6 +620,7 @@ mod tests {
// pass predicates. Ideally these should both be false
assert_eq!(
prune_row_groups_by_statistics(
+ &schema_descr,
&groups,
None,
Some(&pruning_predicate),
@@ -776,6 +679,7 @@ mod tests {
let metrics = parquet_file_metrics();
assert_eq!(
prune_row_groups_by_statistics(
+ &schema_descr,
&[rgm1, rgm2, rgm3],
None,
Some(&pruning_predicate),
@@ -839,6 +743,7 @@ mod tests {
let metrics = parquet_file_metrics();
assert_eq!(
prune_row_groups_by_statistics(
+ &schema_descr,
&[rgm1, rgm2, rgm3, rgm4],
None,
Some(&pruning_predicate),
@@ -886,6 +791,7 @@ mod tests {
let metrics = parquet_file_metrics();
assert_eq!(
prune_row_groups_by_statistics(
+ &schema_descr,
&[rgm1, rgm2, rgm3],
None,
Some(&pruning_predicate),
@@ -956,6 +862,7 @@ mod tests {
let metrics = parquet_file_metrics();
assert_eq!(
prune_row_groups_by_statistics(
+ &schema_descr,
&[rgm1, rgm2, rgm3],
None,
Some(&pruning_predicate),
@@ -1015,6 +922,7 @@ mod tests {
let metrics = parquet_file_metrics();
assert_eq!(
prune_row_groups_by_statistics(
+ &schema_descr,
&[rgm1, rgm2, rgm3],
None,
Some(&pruning_predicate),
@@ -1028,7 +936,6 @@ mod tests {
schema_descr: &SchemaDescPtr,
column_statistics: Vec<ParquetStatistics>,
) -> RowGroupMetaData {
- use parquet::file::metadata::ColumnChunkMetaData;
let mut columns = vec![];
for (i, s) in column_statistics.iter().enumerate() {
let column = ColumnChunkMetaData::builder(schema_descr.column(i))
@@ -1046,7 +953,7 @@ mod tests {
}
fn get_test_schema_descr(fields: Vec<PrimitiveTypeField>) -> SchemaDescPtr
{
- use parquet::schema::types::{SchemaDescriptor, Type as SchemaType};
+ use parquet::schema::types::Type as SchemaType;
let schema_fields = fields
.iter()
.map(|field| {
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
new file mode 100644
index 0000000000..4e472606da
--- /dev/null
+++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
@@ -0,0 +1,899 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`min_statistics`] and [`max_statistics`] convert statistics in parquet
+//! format to arrow [`ArrayRef`].
+
+// TODO: potentially move this to arrow-rs:
+// https://github.com/apache/arrow-rs/issues/4328
+
+use arrow::{array::ArrayRef, datatypes::DataType};
+use arrow_array::new_empty_array;
+use arrow_schema::{FieldRef, Schema};
+use datafusion_common::{Result, ScalarValue};
+use parquet::file::statistics::Statistics as ParquetStatistics;
+use parquet::schema::types::SchemaDescriptor;
+
+/// Converts a big-endian, two's-complement byte slice into an `i128`.
+///
+/// Parquet stores decimals backed by `BYTE_ARRAY` / `FIXED_LEN_BYTE_ARRAY` as
+/// big-endian two's-complement integers, see
+/// <https://github.com/apache/parquet-format/blob/54e53e5d7794d383529dd30746378f19a12afd58/src/main/thrift/parquet.thrift#L66>
+///
+/// Inputs shorter than 16 bytes are sign extended; an empty slice yields `0`.
+///
+/// # Panics
+///
+/// Panics if `b` is longer than 16 bytes.
+pub(crate) fn from_bytes_to_i128(b: &[u8]) -> i128 {
+    i128::from_be_bytes(sign_extend_be(b))
+}
+
+/// Sign extends a big-endian byte slice of at most 16 bytes to `[u8; 16]`.
+///
+/// Adapted from arrow-rs:
+/// <https://github.com/apache/arrow-rs/blob/733b7e7fd1e8c43a404c3ce40ecf741d493c21b4/parquet/src/arrow/buffer/bit_util.rs#L55>
+///
+/// Unlike the original, an empty slice is treated as zero rather than
+/// panicking on an out-of-bounds `b[0]`.
+///
+/// # Panics
+///
+/// Panics if `b` is longer than 16 bytes.
+fn sign_extend_be(b: &[u8]) -> [u8; 16] {
+    assert!(b.len() <= 16, "Array too large, expected at most 16 bytes");
+    // The sign bit is the most significant bit of the first (big-endian) byte.
+    let is_negative = matches!(b.first(), Some(byte) if byte & 0x80 != 0);
+    let mut result = if is_negative { [0xFFu8; 16] } else { [0u8; 16] };
+    // Copy the input into the low-order (trailing) bytes.
+    result[16 - b.len()..].copy_from_slice(b);
+    result
+}
+
+/// Extracts a single min or max value from a [`ParquetStatistics`] object,
+/// evaluating to an `Option<ScalarValue>`
+///
+/// * `$column_statistics` is the `ParquetStatistics` object
+/// * `$func` is the function (`min`/`max`) to call to get the value
+/// * `$bytes_func` is the function (`min_bytes`/`max_bytes`) to call to get the
+///   value as bytes
+/// * `$target_arrow_type` is an `Option<&DataType>` for the target statistics;
+///   it is only inspected to detect `Decimal128` targets
+///
+/// Note: when the statistics have no min/max set this macro executes
+/// `return None`, which returns from the *enclosing* function or closure.
+///
+/// Note: apart from decimals, the resulting [`ScalarValue`] type follows the
+/// physical parquet type rather than `$target_arrow_type` (e.g. `ByteArray`
+/// always becomes `Utf8`), see <https://github.com/apache/arrow-datafusion/issues/8295>
+macro_rules! get_statistic {
+ ($column_statistics:expr, $func:ident, $bytes_func:ident,
$target_arrow_type:expr) => {{
+ // early return: leaves the caller's function/closure with `None`
+ if !$column_statistics.has_min_max_set() {
+ return None;
+ }
+ match $column_statistics {
+ ParquetStatistics::Boolean(s) =>
Some(ScalarValue::Boolean(Some(*s.$func()))),
+ ParquetStatistics::Int32(s) => {
+ match $target_arrow_type {
+ // int32 to decimal with the precision and scale
+ Some(DataType::Decimal128(precision, scale)) => {
+ Some(ScalarValue::Decimal128(
+ Some(*s.$func() as i128),
+ *precision,
+ *scale,
+ ))
+ }
+ // any other target keeps the physical Int32 type
+ _ => Some(ScalarValue::Int32(Some(*s.$func()))),
+ }
+ }
+ ParquetStatistics::Int64(s) => {
+ match $target_arrow_type {
+ // int64 to decimal with the precision and scale
+ Some(DataType::Decimal128(precision, scale)) => {
+ Some(ScalarValue::Decimal128(
+ Some(*s.$func() as i128),
+ *precision,
+ *scale,
+ ))
+ }
+ // any other target keeps the physical Int64 type
+ _ => Some(ScalarValue::Int64(Some(*s.$func()))),
+ }
+ }
+ // 96 bit ints not supported
+ ParquetStatistics::Int96(_) => None,
+ ParquetStatistics::Float(s) =>
Some(ScalarValue::Float32(Some(*s.$func()))),
+ ParquetStatistics::Double(s) =>
Some(ScalarValue::Float64(Some(*s.$func()))),
+ ParquetStatistics::ByteArray(s) => {
+ match $target_arrow_type {
+ // decimal data type
+ Some(DataType::Decimal128(precision, scale)) => {
+ Some(ScalarValue::Decimal128(
+ Some(from_bytes_to_i128(s.$bytes_func())),
+ *precision,
+ *scale,
+ ))
+ }
+ // any other target is decoded as UTF-8; invalid UTF-8 yields Utf8(None)
+ _ => {
+ let s = std::str::from_utf8(s.$bytes_func())
+ .map(|s| s.to_string())
+ .ok();
+ Some(ScalarValue::Utf8(s))
+ }
+ }
+ }
+ // FixedLenByteArray is only supported for decimal targets
+ ParquetStatistics::FixedLenByteArray(s) => {
+ match $target_arrow_type {
+ // just support the decimal data type
+ Some(DataType::Decimal128(precision, scale)) => {
+ Some(ScalarValue::Decimal128(
+ Some(from_bytes_to_i128(s.$bytes_func())),
+ *precision,
+ *scale,
+ ))
+ }
+ _ => None,
+ }
+ }
+ }
+ }};
+}
+
+/// Looks up the parquet leaf column for the top-level arrow field `name`
+///
+/// Returns the parquet column (leaf) index and the corresponding arrow field,
+/// or `None` if:
+/// * `arrow_schema` has no field named `name`,
+/// * the arrow field is nested (struct, list, ...), which is not supported, or
+/// * `parquet_schema` has no primitive top-level column named `name`
+///
+/// The parquet column is matched by *name*, not by the position of the field
+/// in `arrow_schema`. The arrow schema may be a table schema whose field order
+/// or field set differs from the schema of this particular file (e.g. after
+/// schema evolution), in which case positional matching would silently return
+/// the statistics of a different column.
+pub(crate) fn parquet_column<'a>(
+    parquet_schema: &SchemaDescriptor,
+    arrow_schema: &'a Schema,
+    name: &str,
+) -> Option<(usize, &'a FieldRef)> {
+    let (_, field) = arrow_schema.fields.find(name)?;
+    if field.data_type().is_nested() {
+        // Nested fields are not supported and require non-trivial logic
+        // to correctly walk the parquet schema accounting for the
+        // logical type rules - <https://github.com/apache/parquet-format/blob/master/LogicalTypes.md>
+        //
+        // For example a ListArray could correspond to anything from 1 to 3 levels
+        // in the parquet schema
+        return None;
+    }
+
+    // A non-nested arrow field corresponds to a primitive parquet root column,
+    // whose single leaf is the column we want. For leaves of nested columns
+    // `get_column_root` returns the enclosing group, so requiring a primitive
+    // root ensures a struct child that shadows `name` is never selected.
+    (0..parquet_schema.num_columns()).find_map(|idx| {
+        let root = parquet_schema.get_column_root(idx);
+        (root.is_primitive() && root.name() == name).then_some((idx, field))
+    })
+}
+
+/// Extracts the min statistics from an iterator of [`ParquetStatistics`] into
+/// an [`ArrayRef`] of type `data_type`
+///
+/// Each item of `iterator` produces one element of the result: `None` items,
+/// and statistics from which no min value can be extracted, become nulls.
+pub(crate) fn min_statistics<'a, I: Iterator<Item = Option<&'a ParquetStatistics>>>(
+    data_type: &DataType,
+    iterator: I,
+) -> Result<ArrayRef> {
+    let target_type = Some(data_type);
+    let mins = iterator.map(|maybe_stats| match maybe_stats {
+        Some(stats) => get_statistic!(stats, min, min_bytes, target_type),
+        None => None,
+    });
+    collect_scalars(data_type, mins)
+}
+
+/// Extracts the max statistics from an iterator of [`ParquetStatistics`] into
+/// an [`ArrayRef`] of type `data_type`
+///
+/// Each item of `iterator` produces one element of the result: `None` items,
+/// and statistics from which no max value can be extracted, become nulls.
+pub(crate) fn max_statistics<'a, I: Iterator<Item = Option<&'a ParquetStatistics>>>(
+    data_type: &DataType,
+    iterator: I,
+) -> Result<ArrayRef> {
+    let target_type = Some(data_type);
+    let maxes = iterator.map(|maybe_stats| match maybe_stats {
+        Some(stats) => get_statistic!(stats, max, max_bytes, target_type),
+        None => None,
+    });
+    collect_scalars(data_type, maxes)
+}
+
+/// Builds a single [`ArrayRef`] from an iterator of optional [`ScalarValue`]s,
+/// substituting a null of type `data_type` for every `None`
+///
+/// An empty iterator yields an empty array of type `data_type`.
+fn collect_scalars<I: Iterator<Item = Option<ScalarValue>>>(
+    data_type: &DataType,
+    iterator: I,
+) -> Result<ArrayRef> {
+    let mut scalars = iterator.peekable();
+    if scalars.peek().is_none() {
+        return Ok(new_empty_array(data_type));
+    }
+    let null = ScalarValue::try_from(data_type)?;
+    let values = scalars.map(|value| value.unwrap_or_else(|| null.clone()));
+    ScalarValue::iter_to_array(values)
+}
+
+#[cfg(test)]
+mod test {
+ use super::*;
+ use arrow_array::{
+ new_null_array, Array, BinaryArray, BooleanArray, Decimal128Array,
Float32Array,
+ Float64Array, Int32Array, Int64Array, RecordBatch, StringArray,
StructArray,
+ TimestampNanosecondArray,
+ };
+ use arrow_schema::{Field, SchemaRef};
+ use bytes::Bytes;
+ use datafusion_common::test_util::parquet_test_data;
+ use parquet::arrow::arrow_reader::ArrowReaderBuilder;
+ use parquet::arrow::arrow_writer::ArrowWriter;
+ use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
+ use parquet::file::properties::{EnabledStatistics, WriterProperties};
+ use std::path::PathBuf;
+ use std::sync::Arc;
+
+ // TODO error cases (with parquet statistics that are mismatched in
expected type)
+
+ #[test]
+ fn roundtrip_empty() {
+ let empty_bool_array = new_empty_array(&DataType::Boolean);
+ Test {
+ input: empty_bool_array.clone(),
+ expected_min: empty_bool_array.clone(),
+ expected_max: empty_bool_array.clone(),
+ }
+ .run()
+ }
+
+ #[test]
+ fn roundtrip_bool() {
+ Test {
+ input: bool_array([
+ // row group 1
+ Some(true),
+ None,
+ Some(true),
+ // row group 2
+ Some(true),
+ Some(false),
+ None,
+ // row group 3
+ None,
+ None,
+ None,
+ ]),
+ expected_min: bool_array([Some(true), Some(false), None]),
+ expected_max: bool_array([Some(true), Some(true), None]),
+ }
+ .run()
+ }
+
+ #[test]
+ fn roundtrip_int32() {
+ Test {
+ input: i32_array([
+ // row group 1
+ Some(1),
+ None,
+ Some(3),
+ // row group 2
+ Some(0),
+ Some(5),
+ None,
+ // row group 3
+ None,
+ None,
+ None,
+ ]),
+ expected_min: i32_array([Some(1), Some(0), None]),
+ expected_max: i32_array([Some(3), Some(5), None]),
+ }
+ .run()
+ }
+
+ #[test]
+ fn roundtrip_int64() {
+ Test {
+ input: i64_array([
+ // row group 1
+ Some(1),
+ None,
+ Some(3),
+ // row group 2
+ Some(0),
+ Some(5),
+ None,
+ // row group 3
+ None,
+ None,
+ None,
+ ]),
+ expected_min: i64_array([Some(1), Some(0), None]),
+ expected_max: i64_array(vec![Some(3), Some(5), None]),
+ }
+ .run()
+ }
+
+ #[test]
+ fn roundtrip_f32() {
+ Test {
+ input: f32_array([
+ // row group 1
+ Some(1.0),
+ None,
+ Some(3.0),
+ // row group 2
+ Some(-1.0),
+ Some(5.0),
+ None,
+ // row group 3
+ None,
+ None,
+ None,
+ ]),
+ expected_min: f32_array([Some(1.0), Some(-1.0), None]),
+ expected_max: f32_array([Some(3.0), Some(5.0), None]),
+ }
+ .run()
+ }
+
+ #[test]
+ fn roundtrip_f64() {
+ Test {
+ input: f64_array([
+ // row group 1
+ Some(1.0),
+ None,
+ Some(3.0),
+ // row group 2
+ Some(-1.0),
+ Some(5.0),
+ None,
+ // row group 3
+ None,
+ None,
+ None,
+ ]),
+ expected_min: f64_array([Some(1.0), Some(-1.0), None]),
+ expected_max: f64_array([Some(3.0), Some(5.0), None]),
+ }
+ .run()
+ }
+
+ #[test]
+ #[should_panic(
+ expected = "Inconsistent types in ScalarValue::iter_to_array. Expected
Int64, got TimestampNanosecond(NULL, None)"
+ )]
+ // Due to https://github.com/apache/arrow-datafusion/issues/8295
+ fn roundtrip_timestamp() {
+ Test {
+ input: timestamp_array([
+ // row group 1
+ Some(1),
+ None,
+ Some(3),
+ // row group 2
+ Some(9),
+ Some(5),
+ None,
+ // row group 3
+ None,
+ None,
+ None,
+ ]),
+ expected_min: timestamp_array([Some(1), Some(5), None]),
+ expected_max: timestamp_array([Some(3), Some(9), None]),
+ }
+ .run()
+ }
+
+ #[test]
+ fn roundtrip_decimal() {
+ Test {
+ input: Arc::new(
+ Decimal128Array::from(vec![
+ // row group 1
+ Some(100),
+ None,
+ Some(22000),
+ // row group 2
+ Some(500000),
+ Some(330000),
+ None,
+ // row group 3
+ None,
+ None,
+ None,
+ ])
+ .with_precision_and_scale(9, 2)
+ .unwrap(),
+ ),
+ expected_min: Arc::new(
+ Decimal128Array::from(vec![Some(100), Some(330000), None])
+ .with_precision_and_scale(9, 2)
+ .unwrap(),
+ ),
+ expected_max: Arc::new(
+ Decimal128Array::from(vec![Some(22000), Some(500000), None])
+ .with_precision_and_scale(9, 2)
+ .unwrap(),
+ ),
+ }
+ .run()
+ }
+
+ #[test]
+ fn roundtrip_utf8() {
+ Test {
+ input: utf8_array([
+ // row group 1
+ Some("A"),
+ None,
+ Some("Q"),
+ // row group 2
+ Some("ZZ"),
+ Some("AA"),
+ None,
+ // row group 3
+ None,
+ None,
+ None,
+ ]),
+ expected_min: utf8_array([Some("A"), Some("AA"), None]),
+ expected_max: utf8_array([Some("Q"), Some("ZZ"), None]),
+ }
+ .run()
+ }
+
+ #[test]
+ fn roundtrip_struct() {
+ let mut test = Test {
+ input: struct_array(vec![
+ // row group 1
+ (Some(true), Some(1)),
+ (None, None),
+ (Some(true), Some(3)),
+ // row group 2
+ (Some(true), Some(0)),
+ (Some(false), Some(5)),
+ (None, None),
+ // row group 3
+ (None, None),
+ (None, None),
+ (None, None),
+ ]),
+ expected_min: struct_array(vec![
+ (Some(true), Some(1)),
+ (Some(true), Some(0)),
+ (None, None),
+ ]),
+
+ expected_max: struct_array(vec![
+ (Some(true), Some(3)),
+ (Some(true), Some(0)),
+ (None, None),
+ ]),
+ };
+ // Due to https://github.com/apache/arrow-datafusion/issues/8334,
+ // statistics for struct arrays are not supported
+ test.expected_min =
+ new_null_array(test.input.data_type(), test.expected_min.len());
+ test.expected_max =
+ new_null_array(test.input.data_type(), test.expected_min.len());
+ test.run()
+ }
+
+ #[test]
+ #[should_panic(
+ expected = "Inconsistent types in ScalarValue::iter_to_array. Expected
Utf8, got Binary(NULL)"
+ )]
+ // Due to https://github.com/apache/arrow-datafusion/issues/8295
+ fn roundtrip_binary() {
+ Test {
+ input: Arc::new(BinaryArray::from_opt_vec(vec![
+ // row group 1
+ Some(b"A"),
+ None,
+ Some(b"Q"),
+ // row group 2
+ Some(b"ZZ"),
+ Some(b"AA"),
+ None,
+ // row group 3
+ None,
+ None,
+ None,
+ ])),
+ expected_min: Arc::new(BinaryArray::from_opt_vec(vec![
+ Some(b"A"),
+ Some(b"AA"),
+ None,
+ ])),
+ expected_max: Arc::new(BinaryArray::from_opt_vec(vec![
+ Some(b"Q"),
+ Some(b"ZZ"),
+ None,
+ ])),
+ }
+ .run()
+ }
+
+ #[test]
+ fn struct_and_non_struct() {
+ // Ensures that statistics for an array that appears *after* a struct
+ // array are not wrong
+ let struct_col = struct_array(vec![
+ // row group 1
+ (Some(true), Some(1)),
+ (None, None),
+ (Some(true), Some(3)),
+ ]);
+ let int_col = i32_array([Some(100), Some(200), Some(300)]);
+ let expected_min = i32_array([Some(100)]);
+ let expected_max = i32_array(vec![Some(300)]);
+
+ // use a name that shadows a name in the struct column
+ match struct_col.data_type() {
+ DataType::Struct(fields) => {
+ assert_eq!(fields.get(1).unwrap().name(), "int_col")
+ }
+ _ => panic!("unexpected data type for struct column"),
+ };
+
+ let input_batch = RecordBatch::try_from_iter([
+ ("struct_col", struct_col),
+ ("int_col", int_col),
+ ])
+ .unwrap();
+
+ let schema = input_batch.schema();
+
+ let metadata = parquet_metadata(schema.clone(), input_batch);
+ let parquet_schema = metadata.file_metadata().schema_descr();
+
+ // read the int_col statistics
+ let (idx, _) = parquet_column(parquet_schema, &schema,
"int_col").unwrap();
+ assert_eq!(idx, 2);
+
+ let row_groups = metadata.row_groups();
+ let iter = row_groups.iter().map(|x| x.column(idx).statistics());
+
+ let min = min_statistics(&DataType::Int32, iter.clone()).unwrap();
+ assert_eq!(
+ &min,
+ &expected_min,
+ "Min. Statistics\n\n{}\n\n",
+ DisplayStats(row_groups)
+ );
+
+ let max = max_statistics(&DataType::Int32, iter).unwrap();
+ assert_eq!(
+ &max,
+ &expected_max,
+ "Max. Statistics\n\n{}\n\n",
+ DisplayStats(row_groups)
+ );
+ }
+
+ #[test]
+ fn nan_in_stats() {
+ // /parquet-testing/data/nan_in_stats.parquet
+ // row_groups: 1
+ // "x": Double({min: Some(1.0), max: Some(NaN), distinct_count: None,
null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false})
+
+ TestFile::new("nan_in_stats.parquet")
+ .with_column(ExpectedColumn {
+ name: "x",
+ expected_min: Arc::new(Float64Array::from(vec![Some(1.0)])),
+ expected_max:
Arc::new(Float64Array::from(vec![Some(f64::NAN)])),
+ })
+ .run();
+ }
+
+ #[test]
+ fn alltypes_plain() {
+ // /parquet-testing/data/datapage_v1-snappy-compressed-checksum.parquet
+ // row_groups: 1
+ // (has no statistics)
+ TestFile::new("alltypes_plain.parquet")
+ // No column statistics should be read as NULL, but with the right
type
+ .with_column(ExpectedColumn {
+ name: "id",
+ expected_min: i32_array([None]),
+ expected_max: i32_array([None]),
+ })
+ .with_column(ExpectedColumn {
+ name: "bool_col",
+ expected_min: bool_array([None]),
+ expected_max: bool_array([None]),
+ })
+ .run();
+ }
+
#[test]
fn alltypes_tiny_pages() {
    // Source file: /parquet-testing/data/alltypes_tiny_pages.parquet (row_groups: 1)
    //
    // Statistics stored in the file. Every column has distinct_count: None and
    // null_count: 0; min_max_deprecated and min_max_backwards_compatible are
    // false unless noted otherwise.
    //   "id":              Int32      min Some(0)           max Some(7299)
    //   "bool_col":        Boolean    min Some(false)       max Some(true)
    //   "tinyint_col":     Int32      min Some(0)           max Some(9)
    //   "smallint_col":    Int32      min Some(0)           max Some(9)
    //   "int_col":         Int32      min Some(0)           max Some(9)
    //   "bigint_col":      Int64      min Some(0)           max Some(90)
    //   "float_col":       Float      min Some(0.0)         max Some(9.9)
    //   "double_col":      Double     min Some(0.0)         max Some(90.89999999999999)
    //   "date_string_col": ByteArray  min Some("01/01/09")  max Some("12/31/10")
    //   "string_col":      ByteArray  min Some("0")         max Some("9")
    //   "timestamp_col":   Int96      min None              max None
    //                      (min_max_deprecated: true, min_max_backwards_compatible: true)
    //   "year":            Int32      min Some(2009)        max Some(2010)
    //   "month":           Int32      min Some(1)           max Some(12)
    let columns = [
        ExpectedColumn {
            name: "id",
            expected_min: i32_array([Some(0)]),
            expected_max: i32_array([Some(7299)]),
        },
        ExpectedColumn {
            name: "bool_col",
            expected_min: bool_array([Some(false)]),
            expected_max: bool_array([Some(true)]),
        },
        ExpectedColumn {
            name: "tinyint_col",
            expected_min: i32_array([Some(0)]),
            expected_max: i32_array([Some(9)]),
        },
        ExpectedColumn {
            name: "smallint_col",
            expected_min: i32_array([Some(0)]),
            expected_max: i32_array([Some(9)]),
        },
        ExpectedColumn {
            name: "int_col",
            expected_min: i32_array([Some(0)]),
            expected_max: i32_array([Some(9)]),
        },
        ExpectedColumn {
            name: "bigint_col",
            expected_min: i64_array([Some(0)]),
            expected_max: i64_array([Some(90)]),
        },
        ExpectedColumn {
            name: "float_col",
            expected_min: f32_array([Some(0.0)]),
            expected_max: f32_array([Some(9.9)]),
        },
        ExpectedColumn {
            name: "double_col",
            expected_min: f64_array([Some(0.0)]),
            expected_max: f64_array([Some(90.89999999999999)]),
        },
        ExpectedColumn {
            name: "date_string_col",
            expected_min: utf8_array([Some("01/01/09")]),
            expected_max: utf8_array([Some("12/31/10")]),
        },
        ExpectedColumn {
            name: "string_col",
            expected_min: utf8_array([Some("0")]),
            expected_max: utf8_array([Some("9")]),
        },
        // The file stores no min/max for timestamp_col, so both read back as NULL
        ExpectedColumn {
            name: "timestamp_col",
            expected_min: timestamp_array([None]),
            expected_max: timestamp_array([None]),
        },
        ExpectedColumn {
            name: "year",
            expected_min: i32_array([Some(2009)]),
            expected_max: i32_array([Some(2010)]),
        },
        ExpectedColumn {
            name: "month",
            expected_min: i32_array([Some(1)]),
            expected_max: i32_array([Some(12)]),
        },
    ];

    // Register the columns in declaration order, then validate them all
    columns
        .into_iter()
        .fold(
            TestFile::new("alltypes_tiny_pages.parquet"),
            TestFile::with_column,
        )
        .run();
}
+
#[test]
fn fixed_length_decimal_legacy() {
    // Source file: /parquet-testing/data/fixed_length_decimal_legacy.parquet (row_groups: 1)
    //   "value": FixedLenByteArray({
    //       min: Some(FixedLenByteArray(ByteArray { data: Some(ByteBufferPtr { data: b"\0\0\0\0\0\xc8" }) })),
    //       max: Some(FixedLenByteArray(ByteArray { data: "\0\0\0\0\t`" })),
    //       distinct_count: None, null_count: 0,
    //       min_max_deprecated: true, min_max_backwards_compatible: true})
    //
    // The big-endian bytes decode to 200 (0xc8) and 2400 (0x0960), i.e. 2.00
    // and 24.00 at Decimal(13, 2).
    let decimal = |unscaled: i128| {
        Decimal128Array::from(vec![Some(unscaled)])
            .with_precision_and_scale(13, 2)
            .unwrap()
    };

    let value_column = ExpectedColumn {
        name: "value",
        expected_min: Arc::new(decimal(200)),
        expected_max: Arc::new(decimal(2400)),
    };

    TestFile::new("fixed_length_decimal_legacy.parquet")
        .with_column(value_column)
        .run();
}
+
+ const ROWS_PER_ROW_GROUP: usize = 3;
+
+ /// Writes the input batch into a parquet file, with every every three
rows as
+ /// their own row group, and compares the min/maxes to the expected values
+ struct Test {
+ input: ArrayRef,
+ expected_min: ArrayRef,
+ expected_max: ArrayRef,
+ }
+
+ impl Test {
+ fn run(self) {
+ let Self {
+ input,
+ expected_min,
+ expected_max,
+ } = self;
+
+ let input_batch = RecordBatch::try_from_iter([("c1",
input)]).unwrap();
+
+ let schema = input_batch.schema();
+
+ let metadata = parquet_metadata(schema.clone(), input_batch);
+ let parquet_schema = metadata.file_metadata().schema_descr();
+
+ let row_groups = metadata.row_groups();
+
+ for field in schema.fields() {
+ if field.data_type().is_nested() {
+ let lookup = parquet_column(parquet_schema, &schema,
field.name());
+ assert_eq!(lookup, None);
+ continue;
+ }
+
+ let (idx, f) =
+ parquet_column(parquet_schema, &schema,
field.name()).unwrap();
+ assert_eq!(f, field);
+
+ let iter = row_groups.iter().map(|x|
x.column(idx).statistics());
+ let min = min_statistics(f.data_type(), iter.clone()).unwrap();
+ assert_eq!(
+ &min,
+ &expected_min,
+ "Min. Statistics\n\n{}\n\n",
+ DisplayStats(row_groups)
+ );
+
+ let max = max_statistics(f.data_type(), iter).unwrap();
+ assert_eq!(
+ &max,
+ &expected_max,
+ "Max. Statistics\n\n{}\n\n",
+ DisplayStats(row_groups)
+ );
+ }
+ }
+ }
+
+ /// Write the specified batches out as parquet and return the metadata
+ fn parquet_metadata(schema: SchemaRef, batch: RecordBatch) ->
Arc<ParquetMetaData> {
+ let props = WriterProperties::builder()
+ .set_statistics_enabled(EnabledStatistics::Chunk)
+ .set_max_row_group_size(ROWS_PER_ROW_GROUP)
+ .build();
+
+ let mut buffer = Vec::new();
+ let mut writer = ArrowWriter::try_new(&mut buffer, schema,
Some(props)).unwrap();
+ writer.write(&batch).unwrap();
+ writer.close().unwrap();
+
+ let reader = ArrowReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
+ reader.metadata().clone()
+ }
+
+ /// Formats the statistics nicely for display
+ struct DisplayStats<'a>(&'a [RowGroupMetaData]);
+ impl<'a> std::fmt::Display for DisplayStats<'a> {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ let row_groups = self.0;
+ writeln!(f, " row_groups: {}", row_groups.len())?;
+ for rg in row_groups {
+ for col in rg.columns() {
+ if let Some(statistics) = col.statistics() {
+ writeln!(f, " {}: {:?}", col.column_path(),
statistics)?;
+ }
+ }
+ }
+ Ok(())
+ }
+ }
+
+ struct ExpectedColumn {
+ name: &'static str,
+ expected_min: ArrayRef,
+ expected_max: ArrayRef,
+ }
+
+ /// Reads statistics out of the specified, and compares them to the
expected values
+ struct TestFile {
+ file_name: &'static str,
+ expected_columns: Vec<ExpectedColumn>,
+ }
+
+ impl TestFile {
+ fn new(file_name: &'static str) -> Self {
+ Self {
+ file_name,
+ expected_columns: Vec::new(),
+ }
+ }
+
+ fn with_column(mut self, column: ExpectedColumn) -> Self {
+ self.expected_columns.push(column);
+ self
+ }
+
+ /// Reads the specified parquet file and validates that the exepcted
min/max
+ /// values for the specified columns are as expected.
+ fn run(self) {
+ let path = PathBuf::from(parquet_test_data()).join(self.file_name);
+ let file = std::fs::File::open(path).unwrap();
+ let reader = ArrowReaderBuilder::try_new(file).unwrap();
+ let arrow_schema = reader.schema();
+ let metadata = reader.metadata();
+ let row_groups = metadata.row_groups();
+ let parquet_schema = metadata.file_metadata().schema_descr();
+
+ for expected_column in self.expected_columns {
+ let ExpectedColumn {
+ name,
+ expected_min,
+ expected_max,
+ } = expected_column;
+
+ let (idx, field) =
+ parquet_column(parquet_schema, arrow_schema,
name).unwrap();
+
+ let iter = row_groups.iter().map(|x|
x.column(idx).statistics());
+ let actual_min = min_statistics(field.data_type(),
iter.clone()).unwrap();
+ assert_eq!(&expected_min, &actual_min, "column {name}");
+
+ let actual_max = max_statistics(field.data_type(),
iter).unwrap();
+ assert_eq!(&expected_max, &actual_max, "column {name}");
+ }
+ }
+ }
+
+ fn bool_array(input: impl IntoIterator<Item = Option<bool>>) -> ArrayRef {
+ let array: BooleanArray = input.into_iter().collect();
+ Arc::new(array)
+ }
+
+ fn i32_array(input: impl IntoIterator<Item = Option<i32>>) -> ArrayRef {
+ let array: Int32Array = input.into_iter().collect();
+ Arc::new(array)
+ }
+
+ fn i64_array(input: impl IntoIterator<Item = Option<i64>>) -> ArrayRef {
+ let array: Int64Array = input.into_iter().collect();
+ Arc::new(array)
+ }
+
+ fn f32_array(input: impl IntoIterator<Item = Option<f32>>) -> ArrayRef {
+ let array: Float32Array = input.into_iter().collect();
+ Arc::new(array)
+ }
+
+ fn f64_array(input: impl IntoIterator<Item = Option<f64>>) -> ArrayRef {
+ let array: Float64Array = input.into_iter().collect();
+ Arc::new(array)
+ }
+
+ fn timestamp_array(input: impl IntoIterator<Item = Option<i64>>) ->
ArrayRef {
+ let array: TimestampNanosecondArray = input.into_iter().collect();
+ Arc::new(array)
+ }
+
+ fn utf8_array<'a>(input: impl IntoIterator<Item = Option<&'a str>>) ->
ArrayRef {
+ let array: StringArray = input
+ .into_iter()
+ .map(|s| s.map(|s| s.to_string()))
+ .collect();
+ Arc::new(array)
+ }
+
+ // returns a struct array with columns "bool_col" and "int_col" with the
specified values
+ fn struct_array(input: Vec<(Option<bool>, Option<i32>)>) -> ArrayRef {
+ let boolean: BooleanArray = input.iter().map(|(b, _i)| b).collect();
+ let int: Int32Array = input.iter().map(|(_b, i)| i).collect();
+
+ let nullable = true;
+ let struct_array = StructArray::from(vec![
+ (
+ Arc::new(Field::new("bool_col", DataType::Boolean, nullable)),
+ Arc::new(boolean) as ArrayRef,
+ ),
+ (
+ Arc::new(Field::new("int_col", DataType::Int32, nullable)),
+ Arc::new(int) as ArrayRef,
+ ),
+ ]);
+ Arc::new(struct_array)
+ }
+}