alamb commented on code in PR #16014: URL: https://github.com/apache/datafusion/pull/16014#discussion_r2093518770
########## datafusion/common/src/pruning.rs:
##########
@@ -0,0 +1,490 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashSet;
+use std::sync::Arc;
+
+use crate::stats::Precision;
+use arrow::array::UInt64Array;
+use arrow::datatypes::FieldRef;
+use arrow::{
+    array::{ArrayRef, BooleanArray},
+    datatypes::{Schema, SchemaRef},
+};
+
+use crate::Column;
+use crate::{ScalarValue, Statistics};
+
+/// A source of runtime statistical information to [`PruningPredicate`]s.
+///
+/// # Supported Information
+///
+/// 1. Minimum and maximum values for columns
+///
+/// 2. Null counts and row counts for columns
+///
+/// 3. Whether the values in a column are contained in a set of literals
+///
+/// # Vectorized Interface
+///
+/// Information for containers / files is returned as Arrow [`ArrayRef`], so
+/// the evaluation happens once on a single `RecordBatch`, which amortizes the
+/// overhead of evaluating the predicate. This is important when pruning 1000s
+/// of containers, which often happens in analytic systems that have 1000s of
+/// potential files to consider.
+///
+/// For example, for the following three files with a single column `a`:
+/// ```text
+/// file1: column a: min=5, max=10
+/// file2: column a: No stats
+/// file3: column a: min=20, max=30
+/// ```
+///
+/// PruningStatistics would return:
+///
+/// ```text
+/// min_values("a") -> Some([5, Null, 20])
+/// max_values("a") -> Some([10, Null, 30])
+/// min_values("X") -> None
+/// ```
+///
+/// [`PruningPredicate`]: https://docs.rs/datafusion/latest/datafusion/physical_optimizer/pruning/struct.PruningPredicate.html
+pub trait PruningStatistics {
+    /// Return the minimum values for the named column, if known.
+    ///
+    /// If the minimum value for a particular container is not known, the
+    /// returned array should have `null` in that row. If the minimum value is
+    /// not known for any row, return `None`.
+    ///
+    /// Note: the returned array must contain [`Self::num_containers`] rows
+    fn min_values(&self, column: &Column) -> Option<ArrayRef>;
+
+    /// Return the maximum values for the named column, if known.
+    ///
+    /// See [`Self::min_values`] for when to return `None` and null values.
+    ///
+    /// Note: the returned array must contain [`Self::num_containers`] rows
+    fn max_values(&self, column: &Column) -> Option<ArrayRef>;
+
+    /// Return the number of containers (e.g. Row Groups) being pruned with
+    /// these statistics.
+    ///
+    /// This value corresponds to the size of the [`ArrayRef`] returned by
+    /// [`Self::min_values`], [`Self::max_values`], [`Self::null_counts`],
+    /// and [`Self::row_counts`].
+    fn num_containers(&self) -> usize;
+
+    /// Return the number of null values for the named column as a
+    /// [`UInt64Array`]
+    ///
+    /// See [`Self::min_values`] for when to return `None` and null values.
+    ///
+    /// Note: the returned array must contain [`Self::num_containers`] rows
+    ///
+    /// [`UInt64Array`]: arrow::array::UInt64Array
+    fn null_counts(&self, column: &Column) -> Option<ArrayRef>;
+
+    /// Return the number of rows for the named column in each container
+    /// as a [`UInt64Array`].
+    ///
+    /// See [`Self::min_values`] for when to return `None` and null values.
+    ///
+    /// Note: the returned array must contain [`Self::num_containers`] rows
+    ///
+    /// [`UInt64Array`]: arrow::array::UInt64Array
+    fn row_counts(&self, column: &Column) -> Option<ArrayRef>;
+
+    /// Returns a [`BooleanArray`] where each row represents information known
+    /// about specific literal `values` in a column.
+    ///
+    /// For example, Parquet Bloom Filters implement this API to communicate
+    /// that `values` are known not to be present in a Row Group.
+    ///
+    /// The returned array has one row for each container, with the following
+    /// meanings:
+    /// * `true` if the values in `column` ONLY contain values from `values`
+    /// * `false` if the values in `column` are NOT ANY of `values`
+    /// * `null` if neither of the above holds or is unknown.
+    ///
+    /// If these statistics cannot determine column membership for any
+    /// container, return `None` (the default).
+    ///
+    /// Note: the returned array must contain [`Self::num_containers`] rows
+    fn contained(
+        &self,
+        column: &Column,
+        values: &HashSet<ScalarValue>,
+    ) -> Option<BooleanArray>;
+}
+
+pub struct PartitionPruningStatistics {
+    /// Values for each column for each container.
+    /// The outer vectors represent the columns while the inner
+    /// vectors represent the containers.
+    /// The order must match the order of the partition columns in
+    /// [`PartitionPruningStatistics::partition_schema`].
+    partition_values: Vec<Vec<ScalarValue>>,
+    /// The number of containers.
+    /// Stored since the partition values are column-major and if
+    /// there are no columns we wouldn't know the number of containers.
+    num_containers: usize,
+    /// The schema of the partition columns.
+    /// This must **not** be the schema of the entire file or table:
+    /// it must only be the schema of the partition columns,
+    /// in the same order as the values in [`PartitionPruningStatistics::partition_values`].
+    partition_schema: SchemaRef,
+}
+
+impl PartitionPruningStatistics {
+    /// Create a new instance of [`PartitionPruningStatistics`].
+    ///
+    /// Args:
+    /// * `partition_values`: A vector of vectors of [`ScalarValue`]s.
+    ///   The outer vector represents the containers while the inner
+    ///   vector represents the partition values for each column.
+    ///   Note that this is the **opposite** of the order of the
+    ///   partition columns in [`PartitionPruningStatistics::partition_schema`].
+    /// * `partition_fields`: The fields of the partition columns.
+    ///   This must **not** be the schema of the entire file or table:
+    ///   instead it must only be the schema of the partition columns,
+    ///   in the same order as the values in `partition_values`.
+    pub fn new(
+        partition_values: Vec<Vec<ScalarValue>>,
+        partition_fields: Vec<FieldRef>,
+    ) -> Self {
+        let num_containers = partition_values.len();
+        let partition_schema = Arc::new(Schema::new(partition_fields));
+        let mut partition_values_by_column =
+            vec![vec![]; partition_schema.fields().len()];
+        for partition_value in partition_values.iter() {
+            for (i, value) in partition_value.iter().enumerate() {
+                partition_values_by_column[i].push(value.clone());

Review Comment:
   it would be great to avoid these clones if possible
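Since `new` already takes `partition_values` by value, one way to honor this is to consume the row-major input and move each value into its column-major slot. A minimal sketch, not code from this PR (the `transpose_partition_values` helper is illustrative):

```rust
use datafusion_common::ScalarValue;

// Sketch: consume the rows so each ScalarValue is moved, not cloned.
fn transpose_partition_values(
    partition_values: Vec<Vec<ScalarValue>>,
    num_columns: usize,
) -> Vec<Vec<ScalarValue>> {
    let num_containers = partition_values.len();
    let mut by_column: Vec<Vec<ScalarValue>> = (0..num_columns)
        .map(|_| Vec::with_capacity(num_containers))
        .collect();
    for row in partition_values {
        // into_iter yields owned values, so no clone is needed
        for (i, value) in row.into_iter().enumerate() {
            by_column[i].push(value);
        }
    }
    by_column
}
```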
########## datafusion/common/src/pruning.rs:
##########
@@ -0,0 +1,490 @@
+impl PruningStatistics for PartitionPruningStatistics {
+    fn min_values(&self, column: &Column) -> Option<ArrayRef> {
+        let index = self.partition_schema.index_of(column.name()).ok()?;
+        let partition_values = self.partition_values.get(index)?;
+        let mut values = Vec::with_capacity(self.partition_values.len());
+        for partition_value in partition_values {
+            match partition_value {
+                ScalarValue::Null => values.push(ScalarValue::Null),
+                _ => values.push(partition_value.clone()),
+            }
+        }
+        match ScalarValue::iter_to_array(values) {
+            Ok(array) => Some(array),
+            Err(_) => {
+                log::warn!(
+                    "Failed to convert min values to array for column {}",
+                    column.name()
+                );
+                None
+            }
+        }
+    }
+
+    fn max_values(&self, column: &Column) -> Option<ArrayRef> {
+        self.min_values(column)
+    }
+
+    fn num_containers(&self) -> usize {
+        self.num_containers
+    }
+
+    fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
+        None
+    }
+
+    fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
+        None
+    }
+
+    fn contained(
+        &self,
+        column: &Column,
+        values: &HashSet<ScalarValue>,
+    ) -> Option<BooleanArray> {
+        let index = self.partition_schema.index_of(column.name()).ok()?;
+        let partition_values = self.partition_values.get(index)?;
+        let mut contained = Vec::with_capacity(self.partition_values.len());
+        for partition_value in partition_values {
+            let contained_value = if values.contains(partition_value) {
+                Some(true)
+            } else {
+                Some(false)
+            };
+            contained.push(contained_value);
+        }
+        let array = BooleanArray::from(contained);
+        Some(array)
+    }
+}
+
+/// Prune a set of containers represented by their statistics.
+/// Each [`Statistics`] represents a container (e.g. a file or a partition of files).
+pub struct PrunableStatistics {

Review Comment:
   I think this is a good pattern -- it turns out we have something very similar in influxdb_iox: https://github.com/influxdata/influxdb3_core/blob/af9fabea05e2135a094a69dc5b7d549e713420f9/iox_query/src/pruning.rs#L157

########## datafusion/datasource-parquet/src/opener.rs:
##########
@@ -111,19 +120,61 @@ impl FileOpener for ParquetOpener {
             .create(projected_schema, Arc::clone(&self.table_schema));
         let predicate = self.predicate.clone();
         let table_schema = Arc::clone(&self.table_schema);
+        let partition_fields = self.partition_fields.clone();
         let reorder_predicates = self.reorder_filters;
         let pushdown_filters = self.pushdown_filters;
         let coerce_int96 = self.coerce_int96;
         let enable_bloom_filter = self.enable_bloom_filter;
         let enable_row_group_stats_pruning = self.enable_row_group_stats_pruning;
         let limit = self.limit;
-        let predicate_creation_errors = MetricBuilder::new(&self.metrics)
-            .global_counter("num_predicate_creation_errors");
-
         let enable_page_index = self.enable_page_index;

         Ok(Box::pin(async move {
+            // Prune this file using the file level statistics.

Review Comment:
   I worry that in the case when there aren't any dynamic predicates, trying to prune again on file opening is simply going to be pure overhead / wasteful. Therefore, I think it would be good if we could somehow control / disable this extra filtering when it is known that it would not help.
   
   For example, maybe we can have a field on `ParquetOpener` with something like `prune_on_open` which can be set to true if there are dynamic predicates present. This would also likely ensure the tests can pass again.
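A sketch of that suggestion; the `prune_on_open` field and its wiring are hypothetical, drawn from the comment rather than from the PR:

```rust
// Hypothetical: gate the new file-level pruning pass behind a flag that
// the planner sets only when a dynamic predicate may tighten after
// planning; purely static predicates were already applied at listing time.
pub struct ParquetOpener {
    /// Attempt file-level statistics pruning in `open()` only when true.
    prune_on_open: bool,
    // ... existing fields elided ...
}

// and, inside `FileOpener::open`, the pruning pass would be gated:
//
//     if prune_on_open {
//         // evaluate the predicate against this file's statistics and
//         // partition values; return an empty stream on a provable miss
//     }
```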
########## datafusion/common/src/pruning.rs:
##########
@@ -0,0 +1,490 @@
+pub struct PrunableStatistics {
+    /// Statistics for each container.
+    statistics: Vec<Arc<Statistics>>,

Review Comment:
   I suspect we could just use references here and save a bunch of `Arc`s (not a big deal) -- something like:
   ```suggestion
       statistics: Vec<&'a Statistics>,
   ```
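Spelled out, that suggestion means threading a lifetime through the type. A minimal sketch, assuming callers can keep the `Statistics` alive for the whole pruning pass:

```rust
use arrow::datatypes::SchemaRef;
use datafusion_common::Statistics;

/// Borrowed variant of PrunableStatistics (sketch): avoids Arc clones,
/// at the cost of tying the pruner to the owner's lifetime.
pub struct PrunableStatistics<'a> {
    statistics: Vec<&'a Statistics>,
    schema: SchemaRef,
}

impl<'a> PrunableStatistics<'a> {
    pub fn new(statistics: Vec<&'a Statistics>, schema: SchemaRef) -> Self {
        Self { statistics, schema }
    }
}
```

The trade-off is that borrowing is awkward across the `async` boundary in `FileOpener::open`, which may be why the PR reaches for `Arc`.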
########## datafusion/common/src/pruning.rs:
##########
@@ -0,0 +1,490 @@
+pub struct PartitionPruningStatistics {

Review Comment:
   I think we should document this struct, specifically including information about how the partition values are mapped to the main schema.
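One possible shape for that documentation; a sketch of wording only, with the semantics taken from the impl shown above (where `max_values` simply delegates to `min_values`):

```rust
/// Implements [`PruningStatistics`] for Hive-style partition columns.
///
/// Each container contributes exactly one value per partition column,
/// and every row in the container shares that value, so it serves as
/// both the minimum and the maximum for pruning. Columns are resolved
/// by name against `partition_schema` (the partition columns only),
/// not against the full file or table schema.
pub struct PartitionPruningStatistics {
    // ... fields as in the diff above ...
}
```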
########## datafusion/common/src/pruning.rs:
##########
@@ -0,0 +1,490 @@
+pub struct PrunableStatistics {
+    /// Statistics for each container.
+    statistics: Vec<Arc<Statistics>>,
+    /// The schema of the file these statistics are for.
+    schema: SchemaRef,
+}
+
+impl PrunableStatistics {
+    /// Create a new instance of [`PrunableStatistics`].
+    /// Each [`Statistics`] represents a container (e.g. a file or a partition of files).
+    /// The `schema` is the schema of the data in the containers and should apply to all files.
+    pub fn new(statistics: Vec<Arc<Statistics>>, schema: SchemaRef) -> Self {
+        Self { statistics, schema }
+    }
+}
+
+impl PruningStatistics for PrunableStatistics {
+    /// Return the minimum values for the named column, if known.
+    ///
+    /// If the minimum value for a particular container is not known, the
+    /// returned array should have `null` in that row. If the minimum value is
+    /// not known for any row, return `None`.
+    ///
+    /// Note: the returned array must contain [`Self::num_containers`] rows
+    fn min_values(&self, column: &Column) -> Option<ArrayRef> {
+        let index = self.schema.index_of(column.name()).ok()?;
+        let mut values = Vec::with_capacity(self.statistics.len());
+        for stats in &self.statistics {
+            let stat = stats.column_statistics.get(index)?;
+            match &stat.min_value {
+                Precision::Exact(min) => {
+                    values.push(min.clone());
+                }
+                _ => values.push(ScalarValue::Null),
+            }
+        }
+        match ScalarValue::iter_to_array(values) {
+            Ok(array) => Some(array),
+            Err(_) => {
+                log::warn!(
+                    "Failed to convert min values to array for column {}",
+                    column.name()
+                );
+                None
+            }
+        }
+    }
+
+    /// Return the maximum values for the named column, if known.
+    ///
+    /// See [`Self::min_values`] for when to return `None` and null values.
+    ///
+    /// Note: the returned array must contain [`Self::num_containers`] rows
+    fn max_values(&self, column: &Column) -> Option<ArrayRef> {
+        let index = self.schema.index_of(column.name()).ok()?;
+        let mut values = Vec::with_capacity(self.statistics.len());
+        for stats in &self.statistics {
+            let stat = stats.column_statistics.get(index)?;
+            match &stat.max_value {
+                Precision::Exact(max) => {
+                    values.push(max.clone());
+                }
+                _ => values.push(ScalarValue::Null),
+            }
+        }
+        match ScalarValue::iter_to_array(values) {
+            Ok(array) => Some(array),
+            Err(_) => {
+                log::warn!(
+                    "Failed to convert max values to array for column {}",
+                    column.name()
+                );
+                None
+            }
+        }
+    }
+
+    /// Return the number of containers (e.g. Row Groups) being pruned with
+    /// these statistics.
+    ///
+    /// This value corresponds to the size of the [`ArrayRef`] returned by
+    /// [`Self::min_values`], [`Self::max_values`], [`Self::null_counts`],
+    /// and [`Self::row_counts`].
+    fn num_containers(&self) -> usize {
+        self.statistics.len()
+    }
+
+    /// Return the number of null values for the named column as a
+    /// [`UInt64Array`]
+    ///
+    /// See [`Self::min_values`] for when to return `None` and null values.
+    ///
+    /// Note: the returned array must contain [`Self::num_containers`] rows
+    ///
+    /// [`UInt64Array`]: arrow::array::UInt64Array
+    fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
+        let index = self.schema.index_of(column.name()).ok()?;
+        let mut values = Vec::with_capacity(self.statistics.len());
+        let mut has_null_count = false;
+        for stats in &self.statistics {
+            let stat = stats.column_statistics.get(index)?;
+            match &stat.null_count {
+                Precision::Exact(null_count) => match u64::try_from(*null_count) {
+                    Ok(null_count) => {
+                        has_null_count = true;
+                        values.push(Some(null_count));
+                    }
+                    Err(_) => {
+                        values.push(None);
+                    }
+                },
+                _ => values.push(None),
+            }
+        }
+        if has_null_count {
+            Some(Arc::new(UInt64Array::from(values)))
+        } else {
+            None
+        }
+    }
+
+    /// Return the number of rows for the named column in each container
+    /// as a [`UInt64Array`].
+    ///
+    /// See [`Self::min_values`] for when to return `None` and null values.
+    ///
+    /// Note: the returned array must contain [`Self::num_containers`] rows
+    ///
+    /// [`UInt64Array`]: arrow::array::UInt64Array
+    fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
+        let mut values = Vec::with_capacity(self.statistics.len());
+        let mut has_row_count = false;
+        for stats in &self.statistics {
+            match &stats.num_rows {
+                Precision::Exact(row_count) => match u64::try_from(*row_count) {
+                    Ok(row_count) => {
+                        has_row_count = true;
+                        values.push(Some(row_count));
+                    }
+                    Err(_) => {
+                        values.push(None);
+                    }
+                },
+                _ => values.push(None),
+            }
+        }
+        if has_row_count {
+            Some(Arc::new(UInt64Array::from(values)))
+        } else {
+            None
+        }
+    }
+
+    /// Returns a [`BooleanArray`] where each row represents information known
+    /// about specific literal `values` in a column.
+    ///
+    /// For example, Parquet Bloom Filters implement this API to communicate
+    /// that `values` are known not to be present in a Row Group.
+    ///
+    /// The returned array has one row for each container, with the following
+    /// meanings:
+    /// * `true` if the values in `column` ONLY contain values from `values`
+    /// * `false` if the values in `column` are NOT ANY of `values`
+    /// * `null` if neither of the above holds or is unknown.
+    ///
+    /// If these statistics cannot determine column membership for any
+    /// container, return `None` (the default).
+    ///
+    /// Note: the returned array must contain [`Self::num_containers`] rows
+    fn contained(
+        &self,
+        _column: &Column,
+        _values: &HashSet<ScalarValue>,
+    ) -> Option<BooleanArray> {
+        None
+    }
+}
+
+pub struct CompositePruningStatistics {

Review Comment:
   This is a very fancy idea -- it probably needs some more comments about what it does (namely, it combines multiple sources together: if one source doesn't have information for a particular column, it tries the other `PruningStatistics` in turn).
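A sketch of both the explanation and the delegation itself, assuming the struct simply holds its sources in priority order (this hunk does not show the PR's actual field layout):

```rust
/// Combines multiple [`PruningStatistics`] sources (sketch). For each
/// query the sources are consulted in order and the first `Some` answer
/// wins; if no source knows the column, the result is `None`. All
/// sources must agree on `num_containers`, and at least one is assumed.
pub struct CompositePruningStatistics {
    statistics: Vec<Box<dyn PruningStatistics>>, // hypothetical layout
}

impl PruningStatistics for CompositePruningStatistics {
    fn min_values(&self, column: &Column) -> Option<ArrayRef> {
        self.statistics.iter().find_map(|s| s.min_values(column))
    }
    fn max_values(&self, column: &Column) -> Option<ArrayRef> {
        self.statistics.iter().find_map(|s| s.max_values(column))
    }
    fn num_containers(&self) -> usize {
        self.statistics[0].num_containers()
    }
    fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
        self.statistics.iter().find_map(|s| s.null_counts(column))
    }
    fn row_counts(&self, column: &Column) -> Option<ArrayRef> {
        self.statistics.iter().find_map(|s| s.row_counts(column))
    }
    fn contained(
        &self,
        column: &Column,
        values: &HashSet<ScalarValue>,
    ) -> Option<BooleanArray> {
        self.statistics
            .iter()
            .find_map(|s| s.contained(column, values))
    }
}
```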
########## datafusion/datasource-parquet/src/opener.rs:
##########
@@ -450,3 +501,146 @@ fn should_enable_page_index(
         .map(|p| p.filter_number() > 0)
         .unwrap_or(false)
 }
+
+#[cfg(test)]
+mod test {
+    use std::sync::Arc;
+
+    use bytes::{BufMut, BytesMut};
+    use chrono::Utc;
+    use datafusion_common::{
+        record_batch, stats::Precision, ColumnStatistics, ScalarValue, Statistics,
+    };
+    use datafusion_datasource::{
+        file_meta::FileMeta, file_stream::FileOpener,
+        schema_adapter::DefaultSchemaAdapterFactory, PartitionedFile,
+    };
+    use datafusion_expr::{col, lit};
+    use datafusion_physical_expr::planner::logical2physical;
+    use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
+    use futures::{Stream, StreamExt};
+    use object_store::{memory::InMemory, path::Path, ObjectMeta, ObjectStore};
+    use parquet::arrow::ArrowWriter;
+
+    use crate::{opener::ParquetOpener, DefaultParquetFileReaderFactory};
+
+    async fn count_batches_and_rows(
+        mut stream: std::pin::Pin<
+            Box<
+                dyn Stream<
+                        Item = Result<
+                            arrow::array::RecordBatch,
+                            arrow::error::ArrowError,
+                        >,
+                    > + Send,
+            >,
+        >,
+    ) -> (usize, usize) {
+        let mut num_batches = 0;
+        let mut num_rows = 0;
+        while let Some(Ok(batch)) = stream.next().await {
+            num_rows += batch.num_rows();
+            num_batches += 1;
+        }
+        (num_batches, num_rows)
+    }
+
+    #[tokio::test]
+    async fn test_prune_based_on_statistics() {
+        let batch = record_batch!(
+            ("a", Int32, vec![Some(1), Some(2), Some(2)]),
+            ("b", Float32, vec![Some(1.0), Some(2.0), None])
+        )
+        .unwrap();
+
+        let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+        let mut out = BytesMut::new().writer();
+        {
+            let mut writer =
+                ArrowWriter::try_new(&mut out, batch.schema(), None).unwrap();
+            writer.write(&batch).unwrap();
+            writer.finish().unwrap();
+        }
+        let data = out.into_inner().freeze();
+        let data_size = data.len();
+        store
+            .put(&Path::from("test.parquet"), data.into())
+            .await
+            .unwrap();
+
+        let schema = batch.schema();
+        let file = PartitionedFile::new(
+            "test.parquet".to_string(),
+            u64::try_from(data_size).unwrap(),
+        )
+        .with_statistics(Arc::new(
+            Statistics::new_unknown(&schema)
+                .add_column_statistics(ColumnStatistics::new_unknown())
+                .add_column_statistics(
+                    ColumnStatistics::new_unknown()
+                        .with_min_value(Precision::Exact(ScalarValue::Float32(Some(1.0))))
+                        .with_max_value(Precision::Exact(ScalarValue::Float32(Some(2.0))))
+                        .with_null_count(Precision::Exact(1)),
+                ),
+        ));
+
+        let make_opener = |predicate| {
+            ParquetOpener {
+                partition_index: 0,
+                projection: Arc::new([0, 1]),
+                batch_size: 1024,
+                limit: None,
+                predicate: Some(predicate),
+                table_schema: schema.clone(),
+                metadata_size_hint: None,
+                metrics: ExecutionPlanMetricsSet::new(),
+                parquet_file_reader_factory: Arc::new(
+                    DefaultParquetFileReaderFactory::new(Arc::clone(&store)),
+                ),
+                partition_fields: vec![],

Review Comment:
   we probably need a test for pruning on partition_fields as well
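A sketch of the requested partition-column test. It reuses `count_batches_and_rows` from above; `write_test_parquet`, `make_opener_with_partition_fields`, and `open_file` are hypothetical stand-ins for the store/`FileMeta`/opener plumbing already shown in `test_prune_based_on_statistics`:

```rust
#[tokio::test]
async fn test_prune_based_on_partition_values() {
    // Hypothetical helper: same ArrowWriter/InMemory setup as above,
    // returning the schema and the size of the written file.
    let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
    let (schema, size) = write_test_parquet(&store).await;

    // The file lives in Hive partition part=1.
    let mut file = PartitionedFile::new("test.parquet".to_string(), size);
    file.partition_values = vec![ScalarValue::Int32(Some(1))];

    // Predicate on the partition column that no container can satisfy.
    let predicate = logical2physical(&col("part").eq(lit(2i32)), &schema);

    // Same construction as `make_opener`, but with
    // partition_fields: vec![Arc::new(Field::new("part", DataType::Int32, false))].
    let opener = make_opener_with_partition_fields(predicate);

    // The whole file should be pruned before any bytes are decoded.
    let stream = open_file(&opener, file).await;
    let (num_batches, num_rows) = count_batches_and_rows(stream).await;
    assert_eq!(num_batches, 0);
    assert_eq!(num_rows, 0);
}
```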
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org