This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 8524d58e30 Implement `contained` API in PruningPredicate (#8440)
8524d58e30 is described below
commit 8524d58e303b65597eeebc41c75025a6f0822793
Author: Andrew Lamb <[email protected]>
AuthorDate: Sat Dec 23 07:10:56 2023 -0500
Implement `contained` API in PruningPredicate (#8440)
* Implement `contains` API in PruningPredicate
* Apply suggestions from code review
Co-authored-by: Nga Tran <[email protected]>
* Add comment to len(), fix fmt
* rename BoolVecBuilder::append* to BoolVecBuilder::combine*
---------
Co-authored-by: Nga Tran <[email protected]>
---
.../physical_plan/parquet/page_filter.rs | 11 +-
.../datasource/physical_plan/parquet/row_groups.rs | 9 +
datafusion/core/src/physical_optimizer/pruning.rs | 1073 +++++++++++++++-----
3 files changed, 857 insertions(+), 236 deletions(-)
diff --git
a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs
b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs
index 42bfef3599..f6310c49bc 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs
@@ -23,7 +23,7 @@ use arrow::array::{
};
use arrow::datatypes::DataType;
use arrow::{array::ArrayRef, datatypes::SchemaRef, error::ArrowError};
-use datafusion_common::{DataFusionError, Result};
+use datafusion_common::{DataFusionError, Result, ScalarValue};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::{split_conjunction, PhysicalExpr};
use log::{debug, trace};
@@ -37,6 +37,7 @@ use parquet::{
},
format::PageLocation,
};
+use std::collections::HashSet;
use std::sync::Arc;
use crate::datasource::physical_plan::parquet::parquet_to_arrow_decimal_type;
@@ -554,4 +555,12 @@ impl<'a> PruningStatistics for PagesPruningStatistics<'a> {
))),
}
}
+
+ fn contained(
+ &self,
+ _column: &datafusion_common::Column,
+ _values: &HashSet<ScalarValue>,
+ ) -> Option<BooleanArray> {
+ None
+ }
}
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
index 7c3f7d9384..09e4907c94 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
@@ -16,6 +16,7 @@
// under the License.
use arrow::{array::ArrayRef, datatypes::Schema};
+use arrow_array::BooleanArray;
use arrow_schema::FieldRef;
use datafusion_common::tree_node::{TreeNode, VisitRecursion};
use datafusion_common::{Column, DataFusionError, Result, ScalarValue};
@@ -340,6 +341,14 @@ impl<'a> PruningStatistics for
RowGroupPruningStatistics<'a> {
let scalar = ScalarValue::UInt64(Some(c.statistics()?.null_count()));
scalar.to_array().ok()
}
+
+ fn contained(
+ &self,
+ _column: &Column,
+ _values: &HashSet<ScalarValue>,
+ ) -> Option<BooleanArray> {
+ None
+ }
}
#[cfg(test)]
diff --git a/datafusion/core/src/physical_optimizer/pruning.rs
b/datafusion/core/src/physical_optimizer/pruning.rs
index b2ba7596db..79e084d7b7 100644
--- a/datafusion/core/src/physical_optimizer/pruning.rs
+++ b/datafusion/core/src/physical_optimizer/pruning.rs
@@ -35,12 +35,13 @@ use arrow::{
datatypes::{DataType, Field, Schema, SchemaRef},
record_batch::RecordBatch,
};
-use datafusion_common::{downcast_value, plan_datafusion_err, ScalarValue};
+use arrow_array::cast::AsArray;
use datafusion_common::{
internal_err, plan_err,
tree_node::{Transformed, TreeNode},
};
-use datafusion_physical_expr::utils::collect_columns;
+use datafusion_common::{plan_datafusion_err, ScalarValue};
+use datafusion_physical_expr::utils::{collect_columns, Guarantee,
LiteralGuarantee};
use datafusion_physical_expr::{expressions as phys_expr, PhysicalExprRef};
use log::trace;
@@ -93,6 +94,30 @@ pub trait PruningStatistics {
///
/// Note: the returned array must contain [`Self::num_containers`] rows
fn null_counts(&self, column: &Column) -> Option<ArrayRef>;
+
+ /// Returns an array where each row represents information known about
+ /// the `values` contained in a column.
+ ///
+ /// This API is designed to be used along with [`LiteralGuarantee`] to
prove
+ /// that predicates can not possibly evaluate to `true` and thus prune
+ /// containers. For example, Parquet Bloom Filters can prove that values
are
+ /// not present.
+ ///
+ /// The returned array has one row for each container, with the following
+ /// meanings:
+ /// * `true` if the values in `column` ONLY contain values from `values`
+ /// * `false` if the values in `column` are NOT ANY of `values`
+ /// * `null` if neither of the above holds or it is unknown.
+ ///
+ /// If these statistics can not determine column membership for any
+ /// container, return `None` (the default).
+ ///
+ /// Note: the returned array must contain [`Self::num_containers`] rows
+ fn contained(
+ &self,
+ column: &Column,
+ values: &HashSet<ScalarValue>,
+ ) -> Option<BooleanArray>;
}
/// Evaluates filter expressions on statistics such as min/max values and null
@@ -142,12 +167,17 @@ pub trait PruningStatistics {
pub struct PruningPredicate {
/// The input schema against which the predicate will be evaluated
schema: SchemaRef,
- /// Actual pruning predicate (rewritten in terms of column min/max
statistics)
+ /// A min/max pruning predicate (rewritten in terms of column min/max
+ /// values, which are supplied by statistics)
predicate_expr: Arc<dyn PhysicalExpr>,
- /// The statistics required to evaluate this predicate
- required_columns: RequiredStatColumns,
- /// Original physical predicate from which this predicate expr is derived
(required for serialization)
+ /// Description of which statistics are required to evaluate
`predicate_expr`
+ required_columns: RequiredColumns,
+ /// Original physical predicate from which this predicate expr is derived
+ /// (required for serialization)
orig_expr: Arc<dyn PhysicalExpr>,
+ /// [`LiteralGuarantee`]s that are used to try and prove a predicate can
not
+ /// possibly evaluate to `true`.
+ literal_guarantees: Vec<LiteralGuarantee>,
}
impl PruningPredicate {
@@ -172,14 +202,18 @@ impl PruningPredicate {
/// `(column_min / 2) <= 4 && 4 <= (column_max / 2))`
pub fn try_new(expr: Arc<dyn PhysicalExpr>, schema: SchemaRef) ->
Result<Self> {
// build predicate expression once
- let mut required_columns = RequiredStatColumns::new();
+ let mut required_columns = RequiredColumns::new();
let predicate_expr =
build_predicate_expression(&expr, schema.as_ref(), &mut
required_columns);
+
+ let literal_guarantees = LiteralGuarantee::analyze(&expr);
+
Ok(Self {
schema,
predicate_expr,
required_columns,
orig_expr: expr,
+ literal_guarantees,
})
}
@@ -198,40 +232,47 @@ impl PruningPredicate {
///
/// [`ExprSimplifier`]:
crate::optimizer::simplify_expressions::ExprSimplifier
pub fn prune<S: PruningStatistics>(&self, statistics: &S) ->
Result<Vec<bool>> {
+ let mut builder = BoolVecBuilder::new(statistics.num_containers());
+
+ // Try to prove the predicate can't be true for the containers based on
+ // literal guarantees
+ for literal_guarantee in &self.literal_guarantees {
+ let LiteralGuarantee {
+ column,
+ guarantee,
+ literals,
+ } = literal_guarantee;
+ if let Some(results) = statistics.contained(column, literals) {
+ match guarantee {
+ // `In` means the values in the column must be one of the
+ // values in the set for the predicate to evaluate to true.
+ // If `contained` returns false, that means the column is
+ // not any of the values so we can prune the container
+ Guarantee::In => builder.combine_array(&results),
+ // `NotIn` means the values in the column must not be
+ // any of the values in the set for the predicate to
+ // evaluate to true. If contained returns true, it means
the
+ // column is only in the set of values so we can prune the
+ // container
+ Guarantee::NotIn => {
+ builder.combine_array(&arrow::compute::not(&results)?)
+ }
+ }
+ }
+ }
+
+ // Next, try to prove the predicate can't be true for the containers
based
+ // on min/max values
+
// build a RecordBatch that contains the min/max values in the
- // appropriate statistics columns
+ // appropriate statistics columns for the min/max predicate
let statistics_batch =
build_statistics_record_batch(statistics, &self.required_columns)?;
- // Evaluate the pruning predicate on that record batch.
- //
- // Use true when the result of evaluating a predicate
- // expression on a row group is null (aka `None`). Null can
- // arise when the statistics are unknown or some calculation
- // in the predicate means we don't know for sure if the row
- // group can be filtered out or not. To maintain correctness
- // the row group must be kept and thus `true` is returned.
- match self.predicate_expr.evaluate(&statistics_batch)? {
- ColumnarValue::Array(array) => {
- let predicate_array = downcast_value!(array, BooleanArray);
+ // Evaluate the pruning predicate on that record batch and append any
results to the builder
+
builder.combine_value(self.predicate_expr.evaluate(&statistics_batch)?);
- Ok(predicate_array
- .into_iter()
- .map(|x| x.unwrap_or(true)) // None -> true per comments
above
- .collect::<Vec<_>>())
- }
- // result was a column
- ColumnarValue::Scalar(ScalarValue::Boolean(v)) => {
- let v = v.unwrap_or(true); // None -> true per comments above
- Ok(vec![v; statistics.num_containers()])
- }
- other => {
- internal_err!(
- "Unexpected result of pruning predicate evaluation.
Expected Boolean array \
- or scalar but got {other:?}"
- )
- }
- }
+ Ok(builder.build())
}
/// Return a reference to the input schema
@@ -254,9 +295,91 @@ impl PruningPredicate {
is_always_true(&self.predicate_expr)
}
- pub(crate) fn required_columns(&self) -> &RequiredStatColumns {
+ pub(crate) fn required_columns(&self) -> &RequiredColumns {
&self.required_columns
}
+
+ /// Names of the columns that are known to be / not be in a set
+ /// of literals (constants). These are the columns that may be passed
to
+ /// [`PruningStatistics::contained`] during pruning.
+ ///
+ /// This is useful to avoid fetching statistics for columns that will not
be
+ /// used in the predicate. For example, it can be used to avoid reading
+ /// unneeded bloom filters (a non-trivial operation).
+ pub fn literal_columns(&self) -> Vec<String> {
+ let mut seen = HashSet::new();
+ self.literal_guarantees
+ .iter()
+ .map(|e| &e.column.name)
+ // avoid duplicates
+ .filter(|name| seen.insert(*name))
+ .map(|s| s.to_string())
+ .collect()
+ }
+}
+
+/// Builds the return `Vec` for [`PruningPredicate::prune`].
+#[derive(Debug)]
+struct BoolVecBuilder {
+ /// One element per container. Each element is
+ /// * `true`: if the container has rows that may pass the predicate
+ /// * `false`: if the container has rows that DEFINITELY DO NOT pass the
predicate
+ inner: Vec<bool>,
+}
+
+impl BoolVecBuilder {
+ /// Create a new `BoolVecBuilder` with `num_containers` elements
+ fn new(num_containers: usize) -> Self {
+ Self {
+ // assume by default all containers may pass the predicate
+ inner: vec![true; num_containers],
+ }
+ }
+
+ /// Combines result `array` for a conjunct (e.g. `AND` clause) of a
+ /// predicate into the currently in progress array.
+ ///
+ /// Each `array` element is:
+ /// * `true`: container has rows that may pass the predicate
+ /// * `false`: all container rows DEFINITELY DO NOT pass the predicate
+ /// * `null`: container may or may not have rows that pass the predicate
+ fn combine_array(&mut self, array: &BooleanArray) {
+ assert_eq!(array.len(), self.inner.len());
+ for (cur, new) in self.inner.iter_mut().zip(array.iter()) {
+ // `false` for this conjunct means we know for sure no rows could
+ // pass the predicate and thus we set the corresponding container
+ // location to false.
+ if let Some(false) = new {
+ *cur = false;
+ }
+ }
+ }
+
+ /// Combines the results in the [`ColumnarValue`] to the currently in
+ /// progress array, following the same rules as [`Self::combine_array`].
+ ///
+ /// # Panics
+ /// If `value` is not boolean
+ fn combine_value(&mut self, value: ColumnarValue) {
+ match value {
+ ColumnarValue::Array(array) => {
+ self.combine_array(array.as_boolean());
+ }
+ ColumnarValue::Scalar(ScalarValue::Boolean(Some(false))) => {
+ // False means no container can pass the predicate
+ self.inner = vec![false; self.inner.len()];
+ }
+ _ => {
+ // Null or true means the rows in container may pass this
+ // conjunct so we can't prune any containers based on that
+ }
+ }
+ }
+
+ /// Convert this builder into a Vec of bools
+ fn build(self) -> Vec<bool> {
+ self.inner
+ }
}
fn is_always_true(expr: &Arc<dyn PhysicalExpr>) -> bool {
@@ -276,21 +399,21 @@ fn is_always_true(expr: &Arc<dyn PhysicalExpr>) -> bool {
/// Handles creating references to the min/max statistics
/// for columns as well as recording which statistics are needed
#[derive(Debug, Default, Clone)]
-pub(crate) struct RequiredStatColumns {
+pub(crate) struct RequiredColumns {
/// The statistics required to evaluate this predicate:
/// * The unqualified column in the input schema
/// * Statistics type (e.g. Min or Max or Null_Count)
/// * The field the statistics value should be placed in for
- /// pruning predicate evaluation
+ /// pruning predicate evaluation (e.g. `min_value` or `max_value`)
columns: Vec<(phys_expr::Column, StatisticsType, Field)>,
}
-impl RequiredStatColumns {
+impl RequiredColumns {
fn new() -> Self {
Self::default()
}
- /// Returns number of unique columns.
+ /// Returns number of unique columns
pub(crate) fn n_columns(&self) -> usize {
self.iter()
.map(|(c, _s, _f)| c)
@@ -344,11 +467,10 @@ impl RequiredStatColumns {
// only add statistics column if not previously added
if need_to_insert {
- let stat_field = Field::new(
- stat_column.name(),
- field.data_type().clone(),
- field.is_nullable(),
- );
+ // may be null if statistics are not present
+ let nullable = true;
+ let stat_field =
+ Field::new(stat_column.name(), field.data_type().clone(),
nullable);
self.columns.push((column.clone(), stat_type, stat_field));
}
rewrite_column_expr(column_expr.clone(), column, &stat_column)
@@ -391,7 +513,7 @@ impl RequiredStatColumns {
}
}
-impl From<Vec<(phys_expr::Column, StatisticsType, Field)>> for
RequiredStatColumns {
+impl From<Vec<(phys_expr::Column, StatisticsType, Field)>> for RequiredColumns
{
fn from(columns: Vec<(phys_expr::Column, StatisticsType, Field)>) -> Self {
Self { columns }
}
@@ -424,7 +546,7 @@ impl From<Vec<(phys_expr::Column, StatisticsType, Field)>>
for RequiredStatColum
/// ```
fn build_statistics_record_batch<S: PruningStatistics>(
statistics: &S,
- required_columns: &RequiredStatColumns,
+ required_columns: &RequiredColumns,
) -> Result<RecordBatch> {
let mut fields = Vec::<Field>::new();
let mut arrays = Vec::<ArrayRef>::new();
@@ -480,7 +602,7 @@ struct PruningExpressionBuilder<'a> {
op: Operator,
scalar_expr: Arc<dyn PhysicalExpr>,
field: &'a Field,
- required_columns: &'a mut RequiredStatColumns,
+ required_columns: &'a mut RequiredColumns,
}
impl<'a> PruningExpressionBuilder<'a> {
@@ -489,7 +611,7 @@ impl<'a> PruningExpressionBuilder<'a> {
right: &'a Arc<dyn PhysicalExpr>,
op: Operator,
schema: &'a Schema,
- required_columns: &'a mut RequiredStatColumns,
+ required_columns: &'a mut RequiredColumns,
) -> Result<Self> {
// find column name; input could be a more complicated expression
let left_columns = collect_columns(left);
@@ -704,7 +826,7 @@ fn reverse_operator(op: Operator) -> Result<Operator> {
fn build_single_column_expr(
column: &phys_expr::Column,
schema: &Schema,
- required_columns: &mut RequiredStatColumns,
+ required_columns: &mut RequiredColumns,
is_not: bool, // if true, treat as !col
) -> Option<Arc<dyn PhysicalExpr>> {
let field = schema.field_with_name(column.name()).ok()?;
@@ -745,7 +867,7 @@ fn build_single_column_expr(
fn build_is_null_column_expr(
expr: &Arc<dyn PhysicalExpr>,
schema: &Schema,
- required_columns: &mut RequiredStatColumns,
+ required_columns: &mut RequiredColumns,
) -> Option<Arc<dyn PhysicalExpr>> {
if let Some(col) = expr.as_any().downcast_ref::<phys_expr::Column>() {
let field = schema.field_with_name(col.name()).ok()?;
@@ -775,7 +897,7 @@ fn build_is_null_column_expr(
fn build_predicate_expression(
expr: &Arc<dyn PhysicalExpr>,
schema: &Schema,
- required_columns: &mut RequiredStatColumns,
+ required_columns: &mut RequiredColumns,
) -> Arc<dyn PhysicalExpr> {
// Returned for unsupported expressions. Such expressions are
// converted to TRUE.
@@ -984,7 +1106,7 @@ mod tests {
use std::collections::HashMap;
use std::ops::{Not, Rem};
- #[derive(Debug)]
+ #[derive(Debug, Default)]
/// Mock statistic provider for tests
///
/// Each row represents the statistics for a "container" (which
@@ -993,95 +1115,142 @@ mod tests {
///
/// Note All `ArrayRefs` must be the same size.
struct ContainerStats {
- min: ArrayRef,
- max: ArrayRef,
+ min: Option<ArrayRef>,
+ max: Option<ArrayRef>,
/// Optional values
null_counts: Option<ArrayRef>,
+ /// Optional known values (e.g. mimic a bloom filter)
+ /// (values, contained)
+ /// If present, all BooleanArrays must be the same size as min/max
+ contained: Vec<(HashSet<ScalarValue>, BooleanArray)>,
}
impl ContainerStats {
+ fn new() -> Self {
+ Default::default()
+ }
fn new_decimal128(
min: impl IntoIterator<Item = Option<i128>>,
max: impl IntoIterator<Item = Option<i128>>,
precision: u8,
scale: i8,
) -> Self {
- Self {
- min: Arc::new(
+ Self::new()
+ .with_min(Arc::new(
min.into_iter()
.collect::<Decimal128Array>()
.with_precision_and_scale(precision, scale)
.unwrap(),
- ),
- max: Arc::new(
+ ))
+ .with_max(Arc::new(
max.into_iter()
.collect::<Decimal128Array>()
.with_precision_and_scale(precision, scale)
.unwrap(),
- ),
- null_counts: None,
- }
+ ))
}
fn new_i64(
min: impl IntoIterator<Item = Option<i64>>,
max: impl IntoIterator<Item = Option<i64>>,
) -> Self {
- Self {
- min: Arc::new(min.into_iter().collect::<Int64Array>()),
- max: Arc::new(max.into_iter().collect::<Int64Array>()),
- null_counts: None,
- }
+ Self::new()
+ .with_min(Arc::new(min.into_iter().collect::<Int64Array>()))
+ .with_max(Arc::new(max.into_iter().collect::<Int64Array>()))
}
fn new_i32(
min: impl IntoIterator<Item = Option<i32>>,
max: impl IntoIterator<Item = Option<i32>>,
) -> Self {
- Self {
- min: Arc::new(min.into_iter().collect::<Int32Array>()),
- max: Arc::new(max.into_iter().collect::<Int32Array>()),
- null_counts: None,
- }
+ Self::new()
+ .with_min(Arc::new(min.into_iter().collect::<Int32Array>()))
+ .with_max(Arc::new(max.into_iter().collect::<Int32Array>()))
}
fn new_utf8<'a>(
min: impl IntoIterator<Item = Option<&'a str>>,
max: impl IntoIterator<Item = Option<&'a str>>,
) -> Self {
- Self {
- min: Arc::new(min.into_iter().collect::<StringArray>()),
- max: Arc::new(max.into_iter().collect::<StringArray>()),
- null_counts: None,
- }
+ Self::new()
+ .with_min(Arc::new(min.into_iter().collect::<StringArray>()))
+ .with_max(Arc::new(max.into_iter().collect::<StringArray>()))
}
fn new_bool(
min: impl IntoIterator<Item = Option<bool>>,
max: impl IntoIterator<Item = Option<bool>>,
) -> Self {
- Self {
- min: Arc::new(min.into_iter().collect::<BooleanArray>()),
- max: Arc::new(max.into_iter().collect::<BooleanArray>()),
- null_counts: None,
- }
+ Self::new()
+ .with_min(Arc::new(min.into_iter().collect::<BooleanArray>()))
+ .with_max(Arc::new(max.into_iter().collect::<BooleanArray>()))
}
fn min(&self) -> Option<ArrayRef> {
- Some(self.min.clone())
+ self.min.clone()
}
fn max(&self) -> Option<ArrayRef> {
- Some(self.max.clone())
+ self.max.clone()
}
fn null_counts(&self) -> Option<ArrayRef> {
self.null_counts.clone()
}
+ /// return a Vec of all arrays in these statistics
+ fn arrays(&self) -> Vec<ArrayRef> {
+ let contained_arrays = self
+ .contained
+ .iter()
+ .map(|(_values, contained)| Arc::new(contained.clone()) as
ArrayRef);
+
+ [
+ self.min.as_ref().cloned(),
+ self.max.as_ref().cloned(),
+ self.null_counts.as_ref().cloned(),
+ ]
+ .into_iter()
+ .flatten()
+ .chain(contained_arrays)
+ .collect()
+ }
+
+ /// Returns the number of containers represented by these statistics.
This
+ /// picks the length of the first array as all arrays must have the
same
+ /// length (which is verified by `assert_invariants`).
fn len(&self) -> usize {
- assert_eq!(self.min.len(), self.max.len());
- self.min.len()
+ // pick the length of the first array, or 0 if there are no arrays
+ self.arrays().iter().map(|a| a.len()).next().unwrap_or(0)
+ }
+
+ /// Ensure that the lengths of all arrays are consistent
+ fn assert_invariants(&self) {
+ let mut prev_len = None;
+
+ for len in self.arrays().iter().map(|a| a.len()) {
+ // Get a length, if we don't already have one
+ match prev_len {
+ None => {
+ prev_len = Some(len);
+ }
+ Some(prev_len) => {
+ assert_eq!(prev_len, len);
+ }
+ }
+ }
+ }
+
+ /// Add min values
+ fn with_min(mut self, min: ArrayRef) -> Self {
+ self.min = Some(min);
+ self
+ }
+
+ /// Add max values
+ fn with_max(mut self, max: ArrayRef) -> Self {
+ self.max = Some(max);
+ self
}
/// Add null counts. There must be the same number of null counts as
@@ -1090,14 +1259,36 @@ mod tests {
mut self,
counts: impl IntoIterator<Item = Option<i64>>,
) -> Self {
- // take stats out and update them
let null_counts: ArrayRef =
Arc::new(counts.into_iter().collect::<Int64Array>());
- assert_eq!(null_counts.len(), self.len());
+ self.assert_invariants();
self.null_counts = Some(null_counts);
self
}
+
+ /// Add contained information.
+ pub fn with_contained(
+ mut self,
+ values: impl IntoIterator<Item = ScalarValue>,
+ contained: impl IntoIterator<Item = Option<bool>>,
+ ) -> Self {
+ let contained: BooleanArray = contained.into_iter().collect();
+ let values: HashSet<_> = values.into_iter().collect();
+
+ self.contained.push((values, contained));
+ self.assert_invariants();
+ self
+ }
+
+ /// get any contained information for the specified values
+ fn contained(&self, find_values: &HashSet<ScalarValue>) ->
Option<BooleanArray> {
+ // find the one with the matching values
+ self.contained
+ .iter()
+ .find(|(values, _contained)| values == find_values)
+ .map(|(_values, contained)| contained.clone())
+ }
}
#[derive(Debug, Default)]
@@ -1135,13 +1326,34 @@ mod tests {
let container_stats = self
.stats
.remove(&col)
- .expect("Can not find stats for column")
+ .unwrap_or_default()
.with_null_counts(counts);
// put stats back in
self.stats.insert(col, container_stats);
self
}
+
+ /// Add contained information for the specified column.
+ fn with_contained(
+ mut self,
+ name: impl Into<String>,
+ values: impl IntoIterator<Item = ScalarValue>,
+ contained: impl IntoIterator<Item = Option<bool>>,
+ ) -> Self {
+ let col = Column::from_name(name.into());
+
+ // take stats out and update them
+ let container_stats = self
+ .stats
+ .remove(&col)
+ .unwrap_or_default()
+ .with_contained(values, contained);
+
+ // put stats back in
+ self.stats.insert(col, container_stats);
+ self
+ }
}
impl PruningStatistics for TestStatistics {
@@ -1173,6 +1385,16 @@ mod tests {
.map(|container_stats| container_stats.null_counts())
.unwrap_or(None)
}
+
+ fn contained(
+ &self,
+ column: &Column,
+ values: &HashSet<ScalarValue>,
+ ) -> Option<BooleanArray> {
+ self.stats
+ .get(column)
+ .and_then(|container_stats| container_stats.contained(values))
+ }
}
/// Returns the specified min/max container values
@@ -1198,12 +1420,20 @@ mod tests {
fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
None
}
+
+ fn contained(
+ &self,
+ _column: &Column,
+ _values: &HashSet<ScalarValue>,
+ ) -> Option<BooleanArray> {
+ None
+ }
}
#[test]
fn test_build_statistics_record_batch() {
// Request a record batch with of s1_min, s2_max, s3_max, s3_min
- let required_columns = RequiredStatColumns::from(vec![
+ let required_columns = RequiredColumns::from(vec![
// min of original column s1, named s1_min
(
phys_expr::Column::new("s1", 1),
@@ -1275,7 +1505,7 @@ mod tests {
// which is what Parquet does
// Request a record batch with of s1_min as a timestamp
- let required_columns = RequiredStatColumns::from(vec![(
+ let required_columns = RequiredColumns::from(vec![(
phys_expr::Column::new("s3", 3),
StatisticsType::Min,
Field::new(
@@ -1307,7 +1537,7 @@ mod tests {
#[test]
fn test_build_statistics_no_required_stats() {
- let required_columns = RequiredStatColumns::new();
+ let required_columns = RequiredColumns::new();
let statistics = OneContainerStats {
min_values: Some(Arc::new(Int64Array::from(vec![Some(10)]))),
@@ -1325,7 +1555,7 @@ mod tests {
// Test requesting a Utf8 column when the stats return some other type
// Request a record batch with of s1_min as a timestamp
- let required_columns = RequiredStatColumns::from(vec![(
+ let required_columns = RequiredColumns::from(vec![(
phys_expr::Column::new("s3", 3),
StatisticsType::Min,
Field::new("s1_min", DataType::Utf8, true),
@@ -1354,7 +1584,7 @@ mod tests {
#[test]
fn test_build_statistics_inconsistent_length() {
// return an inconsistent length to the actual statistics arrays
- let required_columns = RequiredStatColumns::from(vec![(
+ let required_columns = RequiredColumns::from(vec![(
phys_expr::Column::new("s1", 3),
StatisticsType::Min,
Field::new("s1_min", DataType::Int64, true),
@@ -1385,20 +1615,14 @@ mod tests {
// test column on the left
let expr = col("c1").eq(lit(1));
- let predicate_expr = test_build_predicate_expression(
- &expr,
- &schema,
- &mut RequiredStatColumns::new(),
- );
+ let predicate_expr =
+ test_build_predicate_expression(&expr, &schema, &mut
RequiredColumns::new());
assert_eq!(predicate_expr.to_string(), expected_expr);
// test column on the right
let expr = lit(1).eq(col("c1"));
- let predicate_expr = test_build_predicate_expression(
- &expr,
- &schema,
- &mut RequiredStatColumns::new(),
- );
+ let predicate_expr =
+ test_build_predicate_expression(&expr, &schema, &mut
RequiredColumns::new());
assert_eq!(predicate_expr.to_string(), expected_expr);
Ok(())
@@ -1411,20 +1635,14 @@ mod tests {
// test column on the left
let expr = col("c1").not_eq(lit(1));
- let predicate_expr = test_build_predicate_expression(
- &expr,
- &schema,
- &mut RequiredStatColumns::new(),
- );
+ let predicate_expr =
+ test_build_predicate_expression(&expr, &schema, &mut
RequiredColumns::new());
assert_eq!(predicate_expr.to_string(), expected_expr);
// test column on the right
let expr = lit(1).not_eq(col("c1"));
- let predicate_expr = test_build_predicate_expression(
- &expr,
- &schema,
- &mut RequiredStatColumns::new(),
- );
+ let predicate_expr =
+ test_build_predicate_expression(&expr, &schema, &mut
RequiredColumns::new());
assert_eq!(predicate_expr.to_string(), expected_expr);
Ok(())
@@ -1437,20 +1655,14 @@ mod tests {
// test column on the left
let expr = col("c1").gt(lit(1));
- let predicate_expr = test_build_predicate_expression(
- &expr,
- &schema,
- &mut RequiredStatColumns::new(),
- );
+ let predicate_expr =
+ test_build_predicate_expression(&expr, &schema, &mut
RequiredColumns::new());
assert_eq!(predicate_expr.to_string(), expected_expr);
// test column on the right
let expr = lit(1).lt(col("c1"));
- let predicate_expr = test_build_predicate_expression(
- &expr,
- &schema,
- &mut RequiredStatColumns::new(),
- );
+ let predicate_expr =
+ test_build_predicate_expression(&expr, &schema, &mut
RequiredColumns::new());
assert_eq!(predicate_expr.to_string(), expected_expr);
Ok(())
@@ -1463,19 +1675,13 @@ mod tests {
// test column on the left
let expr = col("c1").gt_eq(lit(1));
- let predicate_expr = test_build_predicate_expression(
- &expr,
- &schema,
- &mut RequiredStatColumns::new(),
- );
+ let predicate_expr =
+ test_build_predicate_expression(&expr, &schema, &mut
RequiredColumns::new());
assert_eq!(predicate_expr.to_string(), expected_expr);
// test column on the right
let expr = lit(1).lt_eq(col("c1"));
- let predicate_expr = test_build_predicate_expression(
- &expr,
- &schema,
- &mut RequiredStatColumns::new(),
- );
+ let predicate_expr =
+ test_build_predicate_expression(&expr, &schema, &mut
RequiredColumns::new());
assert_eq!(predicate_expr.to_string(), expected_expr);
Ok(())
@@ -1488,20 +1694,14 @@ mod tests {
// test column on the left
let expr = col("c1").lt(lit(1));
- let predicate_expr = test_build_predicate_expression(
- &expr,
- &schema,
- &mut RequiredStatColumns::new(),
- );
+ let predicate_expr =
+ test_build_predicate_expression(&expr, &schema, &mut
RequiredColumns::new());
assert_eq!(predicate_expr.to_string(), expected_expr);
// test column on the right
let expr = lit(1).gt(col("c1"));
- let predicate_expr = test_build_predicate_expression(
- &expr,
- &schema,
- &mut RequiredStatColumns::new(),
- );
+ let predicate_expr =
+ test_build_predicate_expression(&expr, &schema, &mut
RequiredColumns::new());
assert_eq!(predicate_expr.to_string(), expected_expr);
Ok(())
@@ -1514,19 +1714,13 @@ mod tests {
// test column on the left
let expr = col("c1").lt_eq(lit(1));
- let predicate_expr = test_build_predicate_expression(
- &expr,
- &schema,
- &mut RequiredStatColumns::new(),
- );
+ let predicate_expr =
+ test_build_predicate_expression(&expr, &schema, &mut
RequiredColumns::new());
assert_eq!(predicate_expr.to_string(), expected_expr);
// test column on the right
let expr = lit(1).gt_eq(col("c1"));
- let predicate_expr = test_build_predicate_expression(
- &expr,
- &schema,
- &mut RequiredStatColumns::new(),
- );
+ let predicate_expr =
+ test_build_predicate_expression(&expr, &schema, &mut
RequiredColumns::new());
assert_eq!(predicate_expr.to_string(), expected_expr);
Ok(())
@@ -1542,11 +1736,8 @@ mod tests {
// test AND operator joining supported c1 < 1 expression and
unsupported c2 > c3 expression
let expr = col("c1").lt(lit(1)).and(col("c2").lt(col("c3")));
let expected_expr = "c1_min@0 < 1";
- let predicate_expr = test_build_predicate_expression(
- &expr,
- &schema,
- &mut RequiredStatColumns::new(),
- );
+ let predicate_expr =
+ test_build_predicate_expression(&expr, &schema, &mut
RequiredColumns::new());
assert_eq!(predicate_expr.to_string(), expected_expr);
Ok(())
@@ -1561,11 +1752,8 @@ mod tests {
// test OR operator joining supported c1 < 1 expression and
unsupported c2 % 2 = 0 expression
let expr = col("c1").lt(lit(1)).or(col("c2").rem(lit(2)).eq(lit(0)));
let expected_expr = "true";
- let predicate_expr = test_build_predicate_expression(
- &expr,
- &schema,
- &mut RequiredStatColumns::new(),
- );
+ let predicate_expr =
+ test_build_predicate_expression(&expr, &schema, &mut
RequiredColumns::new());
assert_eq!(predicate_expr.to_string(), expected_expr);
Ok(())
@@ -1577,11 +1765,8 @@ mod tests {
let expected_expr = "true";
let expr = col("c1").not();
- let predicate_expr = test_build_predicate_expression(
- &expr,
- &schema,
- &mut RequiredStatColumns::new(),
- );
+ let predicate_expr =
+ test_build_predicate_expression(&expr, &schema, &mut
RequiredColumns::new());
assert_eq!(predicate_expr.to_string(), expected_expr);
Ok(())
@@ -1593,11 +1778,8 @@ mod tests {
let expected_expr = "NOT c1_min@0 AND c1_max@1";
let expr = col("c1").not();
- let predicate_expr = test_build_predicate_expression(
- &expr,
- &schema,
- &mut RequiredStatColumns::new(),
- );
+ let predicate_expr =
+ test_build_predicate_expression(&expr, &schema, &mut
RequiredColumns::new());
assert_eq!(predicate_expr.to_string(), expected_expr);
Ok(())
@@ -1609,11 +1791,8 @@ mod tests {
let expected_expr = "c1_min@0 OR c1_max@1";
let expr = col("c1");
- let predicate_expr = test_build_predicate_expression(
- &expr,
- &schema,
- &mut RequiredStatColumns::new(),
- );
+ let predicate_expr =
+ test_build_predicate_expression(&expr, &schema, &mut
RequiredColumns::new());
assert_eq!(predicate_expr.to_string(), expected_expr);
Ok(())
@@ -1627,11 +1806,8 @@ mod tests {
// DF doesn't support arithmetic on boolean columns so
// this predicate will error when evaluated
let expr = col("c1").lt(lit(true));
- let predicate_expr = test_build_predicate_expression(
- &expr,
- &schema,
- &mut RequiredStatColumns::new(),
- );
+ let predicate_expr =
+ test_build_predicate_expression(&expr, &schema, &mut
RequiredColumns::new());
assert_eq!(predicate_expr.to_string(), expected_expr);
Ok(())
@@ -1643,7 +1819,7 @@ mod tests {
Field::new("c1", DataType::Int32, false),
Field::new("c2", DataType::Int32, false),
]);
- let mut required_columns = RequiredStatColumns::new();
+ let mut required_columns = RequiredColumns::new();
// c1 < 1 and (c2 = 2 or c2 = 3)
let expr = col("c1")
.lt(lit(1))
@@ -1659,7 +1835,7 @@ mod tests {
(
phys_expr::Column::new("c1", 0),
StatisticsType::Min,
- c1_min_field
+ c1_min_field.with_nullable(true) // could be nullable if stats
are not present
)
);
// c2 = 2 should add c2_min and c2_max
@@ -1669,7 +1845,7 @@ mod tests {
(
phys_expr::Column::new("c2", 1),
StatisticsType::Min,
- c2_min_field
+ c2_min_field.with_nullable(true) // could be nullable if stats
are not present
)
);
let c2_max_field = Field::new("c2_max", DataType::Int32, false);
@@ -1678,7 +1854,7 @@ mod tests {
(
phys_expr::Column::new("c2", 1),
StatisticsType::Max,
- c2_max_field
+ c2_max_field.with_nullable(true) // could be nullable if stats
are not present
)
);
// c2 = 3 shouldn't add any new statistics fields
@@ -1700,11 +1876,8 @@ mod tests {
false,
));
let expected_expr = "c1_min@0 <= 1 AND 1 <= c1_max@1 OR c1_min@0 <= 2
AND 2 <= c1_max@1 OR c1_min@0 <= 3 AND 3 <= c1_max@1";
- let predicate_expr = test_build_predicate_expression(
- &expr,
- &schema,
- &mut RequiredStatColumns::new(),
- );
+ let predicate_expr =
+ test_build_predicate_expression(&expr, &schema, &mut
RequiredColumns::new());
assert_eq!(predicate_expr.to_string(), expected_expr);
Ok(())
@@ -1719,11 +1892,8 @@ mod tests {
// test c1 in()
let expr = Expr::InList(InList::new(Box::new(col("c1")), vec![],
false));
let expected_expr = "true";
- let predicate_expr = test_build_predicate_expression(
- &expr,
- &schema,
- &mut RequiredStatColumns::new(),
- );
+ let predicate_expr =
+ test_build_predicate_expression(&expr, &schema, &mut
RequiredColumns::new());
assert_eq!(predicate_expr.to_string(), expected_expr);
Ok(())
@@ -1744,11 +1914,8 @@ mod tests {
let expected_expr = "(c1_min@0 != 1 OR 1 != c1_max@1) \
AND (c1_min@0 != 2 OR 2 != c1_max@1) \
AND (c1_min@0 != 3 OR 3 != c1_max@1)";
- let predicate_expr = test_build_predicate_expression(
- &expr,
- &schema,
- &mut RequiredStatColumns::new(),
- );
+ let predicate_expr =
+ test_build_predicate_expression(&expr, &schema, &mut
RequiredColumns::new());
assert_eq!(predicate_expr.to_string(), expected_expr);
Ok(())
@@ -1762,20 +1929,14 @@ mod tests {
// test column on the left
let expr = cast(col("c1"),
DataType::Int64).eq(lit(ScalarValue::Int64(Some(1))));
- let predicate_expr = test_build_predicate_expression(
- &expr,
- &schema,
- &mut RequiredStatColumns::new(),
- );
+ let predicate_expr =
+ test_build_predicate_expression(&expr, &schema, &mut
RequiredColumns::new());
assert_eq!(predicate_expr.to_string(), expected_expr);
// test column on the right
let expr = lit(ScalarValue::Int64(Some(1))).eq(cast(col("c1"),
DataType::Int64));
- let predicate_expr = test_build_predicate_expression(
- &expr,
- &schema,
- &mut RequiredStatColumns::new(),
- );
+ let predicate_expr =
+ test_build_predicate_expression(&expr, &schema, &mut
RequiredColumns::new());
assert_eq!(predicate_expr.to_string(), expected_expr);
let expected_expr = "TRY_CAST(c1_max@0 AS Int64) > 1";
@@ -1783,21 +1944,15 @@ mod tests {
// test column on the left
let expr =
try_cast(col("c1"),
DataType::Int64).gt(lit(ScalarValue::Int64(Some(1))));
- let predicate_expr = test_build_predicate_expression(
- &expr,
- &schema,
- &mut RequiredStatColumns::new(),
- );
+ let predicate_expr =
+ test_build_predicate_expression(&expr, &schema, &mut
RequiredColumns::new());
assert_eq!(predicate_expr.to_string(), expected_expr);
// test column on the right
let expr =
lit(ScalarValue::Int64(Some(1))).lt(try_cast(col("c1"),
DataType::Int64));
- let predicate_expr = test_build_predicate_expression(
- &expr,
- &schema,
- &mut RequiredStatColumns::new(),
- );
+ let predicate_expr =
+ test_build_predicate_expression(&expr, &schema, &mut
RequiredColumns::new());
assert_eq!(predicate_expr.to_string(), expected_expr);
Ok(())
@@ -1817,11 +1972,8 @@ mod tests {
false,
));
let expected_expr = "CAST(c1_min@0 AS Int64) <= 1 AND 1 <=
CAST(c1_max@1 AS Int64) OR CAST(c1_min@0 AS Int64) <= 2 AND 2 <= CAST(c1_max@1
AS Int64) OR CAST(c1_min@0 AS Int64) <= 3 AND 3 <= CAST(c1_max@1 AS Int64)";
- let predicate_expr = test_build_predicate_expression(
- &expr,
- &schema,
- &mut RequiredStatColumns::new(),
- );
+ let predicate_expr =
+ test_build_predicate_expression(&expr, &schema, &mut
RequiredColumns::new());
assert_eq!(predicate_expr.to_string(), expected_expr);
let expr = Expr::InList(InList::new(
@@ -1837,11 +1989,8 @@ mod tests {
"(CAST(c1_min@0 AS Int64) != 1 OR 1 != CAST(c1_max@1 AS Int64)) \
AND (CAST(c1_min@0 AS Int64) != 2 OR 2 != CAST(c1_max@1 AS Int64)) \
AND (CAST(c1_min@0 AS Int64) != 3 OR 3 != CAST(c1_max@1 AS Int64))";
- let predicate_expr = test_build_predicate_expression(
- &expr,
- &schema,
- &mut RequiredStatColumns::new(),
- );
+ let predicate_expr =
+ test_build_predicate_expression(&expr, &schema, &mut
RequiredColumns::new());
assert_eq!(predicate_expr.to_string(), expected_expr);
Ok(())
@@ -2484,10 +2633,464 @@ mod tests {
// TODO: add other negative test for other case and op
}
+ #[test]
+ fn prune_with_contained_one_column() {
+ let schema = Arc::new(Schema::new(vec![Field::new("s1",
DataType::Utf8, true)]));
+
+ // Model having information like a bloom filter for s1
+ let statistics = TestStatistics::new()
+ .with_contained(
+ "s1",
+ [ScalarValue::from("foo")],
+ [
+ // container 0 known to only contain "foo"
+ Some(true),
+ // container 1 known to not contain "foo"
+ Some(false),
+ // container 2 unknown about "foo"
+ None,
+ // container 3 known to only contain "foo"
+ Some(true),
+ // container 4 known to not contain "foo"
+ Some(false),
+ // container 5 unknown about "foo"
+ None,
+ // container 6 known to only contain "foo"
+ Some(true),
+ // container 7 known to not contain "foo"
+ Some(false),
+ // container 8 unknown about "foo"
+ None,
+ ],
+ )
+ .with_contained(
+ "s1",
+ [ScalarValue::from("bar")],
+ [
+ // containers 0,1,2 known to only contain "bar"
+ Some(true),
+ Some(true),
+ Some(true),
+ // container 3,4,5 known to not contain "bar"
+ Some(false),
+ Some(false),
+ Some(false),
+ // container 6,7,8 unknown about "bar"
+ None,
+ None,
+ None,
+ ],
+ )
+ .with_contained(
+ // the way the tests are setup, this data is
+ // consulted if the "foo" and "bar" are being checked at the
same time
+ "s1",
+ [ScalarValue::from("foo"), ScalarValue::from("bar")],
+ [
+ // container 0,1,2 unknown about ("foo", "bar")
+ None,
+ None,
+ None,
+ // container 3,4,5 known to contain only "foo" or
"bar"
+ Some(true),
+ Some(true),
+ Some(true),
+ // container 6,7,8 known to contain neither "foo" nor
"bar"
+ Some(false),
+ Some(false),
+ Some(false),
+ ],
+ );
+
+ // s1 = 'foo'
+ prune_with_expr(
+ col("s1").eq(lit("foo")),
+ &schema,
+ &statistics,
+ // rule out containers (false) where we know foo is not present
+ vec![true, false, true, true, false, true, true, false, true],
+ );
+
+ // s1 = 'bar'
+ prune_with_expr(
+ col("s1").eq(lit("bar")),
+ &schema,
+ &statistics,
+ // rule out containers where we know bar is not present
+ vec![true, true, true, false, false, false, true, true, true],
+ );
+
+ // s1 = 'baz' (unknown value)
+ prune_with_expr(
+ col("s1").eq(lit("baz")),
+ &schema,
+ &statistics,
+ // can't rule out anything
+ vec![true, true, true, true, true, true, true, true, true],
+ );
+
+ // s1 = 'foo' AND s1 = 'bar'
+ prune_with_expr(
+ col("s1").eq(lit("foo")).and(col("s1").eq(lit("bar"))),
+ &schema,
+ &statistics,
+ // logically this predicate can't possibly be true (the column
can't
+ // take on both values) but we could rule it out if the stats tell
+ // us that both values are not present
+ vec![true, true, true, true, true, true, true, true, true],
+ );
+
+ // s1 = 'foo' OR s1 = 'bar'
+ prune_with_expr(
+ col("s1").eq(lit("foo")).or(col("s1").eq(lit("bar"))),
+ &schema,
+ &statistics,
+ // can rule out containers that we know contain neither foo nor bar
+ vec![true, true, true, true, true, true, false, false, false],
+ );
+
+ // s1 = 'foo' OR s1 = 'baz'
+ prune_with_expr(
+ col("s1").eq(lit("foo")).or(col("s1").eq(lit("baz"))),
+ &schema,
+ &statistics,
+ // can't rule out any container
+ vec![true, true, true, true, true, true, true, true, true],
+ );
+
+ // s1 = 'foo' OR s1 = 'bar' OR s1 = 'baz'
+ prune_with_expr(
+ col("s1")
+ .eq(lit("foo"))
+ .or(col("s1").eq(lit("bar")))
+ .or(col("s1").eq(lit("baz"))),
+ &schema,
+ &statistics,
+ // can't rule out any containers based on knowledge of s1 and `foo`,
+ // `bar` and (`foo`, `bar`)
+ vec![true, true, true, true, true, true, true, true, true],
+ );
+
+ // s1 != foo
+ prune_with_expr(
+ col("s1").not_eq(lit("foo")),
+ &schema,
+ &statistics,
+ // rule out containers we know for sure only contain foo
+ vec![false, true, true, false, true, true, false, true, true],
+ );
+
+ // s1 != bar
+ prune_with_expr(
+ col("s1").not_eq(lit("bar")),
+ &schema,
+ &statistics,
+ // rule out when we know for sure s1 has the value bar
+ vec![false, false, false, true, true, true, true, true, true],
+ );
+
+ // s1 != foo AND s1 != bar
+ prune_with_expr(
+ col("s1")
+ .not_eq(lit("foo"))
+ .and(col("s1").not_eq(lit("bar"))),
+ &schema,
+ &statistics,
+ // can rule out any container where we know s1 contains only the
values 'foo' or 'bar'
+ vec![true, true, true, false, false, false, true, true, true],
+ );
+
+ // s1 != foo AND s1 != bar AND s1 != baz
+ prune_with_expr(
+ col("s1")
+ .not_eq(lit("foo"))
+ .and(col("s1").not_eq(lit("bar")))
+ .and(col("s1").not_eq(lit("baz"))),
+ &schema,
+ &statistics,
+ // can't rule out any container based on knowledge of s1
+ vec![true, true, true, true, true, true, true, true, true],
+ );
+
+ // s1 != foo OR s1 != bar
+ prune_with_expr(
+ col("s1")
+ .not_eq(lit("foo"))
+ .or(col("s1").not_eq(lit("bar"))),
+ &schema,
+ &statistics,
+ // can't rule out anything based on contained information
+ vec![true, true, true, true, true, true, true, true, true],
+ );
+
+ // s1 != foo OR s1 != bar OR s1 != baz
+ prune_with_expr(
+ col("s1")
+ .not_eq(lit("foo"))
+ .or(col("s1").not_eq(lit("bar")))
+ .or(col("s1").not_eq(lit("baz"))),
+ &schema,
+ &statistics,
+ // can't rule out anything based on contained information
+ vec![true, true, true, true, true, true, true, true, true],
+ );
+ }
+
+ #[test]
+ fn prune_with_contained_two_columns() {
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("s1", DataType::Utf8, true),
+ Field::new("s2", DataType::Utf8, true),
+ ]));
+
+ // Model having information like bloom filters for s1 and s2
+ let statistics = TestStatistics::new()
+ .with_contained(
+ "s1",
+ [ScalarValue::from("foo")],
+ [
+ // container 0, s1 known to only contain "foo"
+ Some(true),
+ // container 1, s1 known to not contain "foo"
+ Some(false),
+ // container 2, s1 unknown about "foo"
+ None,
+ // container 3, s1 known to only contain "foo"
+ Some(true),
+ // container 4, s1 known to not contain "foo"
+ Some(false),
+ // container 5, s1 unknown about "foo"
+ None,
+ // container 6, s1 known to only contain "foo"
+ Some(true),
+ // container 7, s1 known to not contain "foo"
+ Some(false),
+ // container 8, s1 unknown about "foo"
+ None,
+ ],
+ )
+ .with_contained(
+ "s2", // for column s2
+ [ScalarValue::from("bar")],
+ [
+ // containers 0,1,2 s2 known to only contain "bar"
+ Some(true),
+ Some(true),
+ Some(true),
+ // container 3,4,5 s2 known to not contain "bar"
+ Some(false),
+ Some(false),
+ Some(false),
+ // container 6,7,8 s2 unknown about "bar"
+ None,
+ None,
+ None,
+ ],
+ );
+
+ // s1 = 'foo'
+ prune_with_expr(
+ col("s1").eq(lit("foo")),
+ &schema,
+ &statistics,
+ // rule out containers where we know 'foo' is not present in s1
+ vec![true, false, true, true, false, true, true, false, true],
+ );
+
+ // s1 = 'foo' OR s2 = 'bar'
+ let expr = col("s1").eq(lit("foo")).or(col("s2").eq(lit("bar")));
+ prune_with_expr(
+ expr,
+ &schema,
+ &statistics,
+ // can't rule out any container (would need to prove that s1 !=
foo AND s2 != bar)
+ vec![true, true, true, true, true, true, true, true, true],
+ );
+
+ // s1 = 'foo' AND s2 != 'bar'
+ prune_with_expr(
+ col("s1").eq(lit("foo")).and(col("s2").not_eq(lit("bar"))),
+ &schema,
+ &statistics,
+ // can only rule out container where we know either:
+ // 1. s1 doesn't have the value 'foo' or
+ // 2. s2 has only the value of 'bar'
+ vec![false, false, false, true, false, true, true, false, true],
+ );
+
+ // s1 != 'foo' AND s2 != 'bar'
+ prune_with_expr(
+ col("s1")
+ .not_eq(lit("foo"))
+ .and(col("s2").not_eq(lit("bar"))),
+ &schema,
+ &statistics,
+ // Can rule out any container where we know either
+ // 1. s1 has only the value 'foo' or
+ // 2. s2 has only the value 'bar'
+ vec![false, false, false, false, true, true, false, true, true],
+ );
+
+ // s1 != 'foo' AND (s2 = 'bar' OR s2 = 'baz')
+ prune_with_expr(
+ col("s1")
+ .not_eq(lit("foo"))
+ .and(col("s2").eq(lit("bar")).or(col("s2").eq(lit("baz")))),
+ &schema,
+ &statistics,
+ // Can rule out any container where we know s1 has only the value
+ // 'foo'. Can't use knowledge of s2 and bar to rule out anything
+ vec![false, true, true, false, true, true, false, true, true],
+ );
+
+ // s1 like 'foo%bar%'
+ prune_with_expr(
+ col("s1").like(lit("foo%bar%")),
+ &schema,
+ &statistics,
+ // can't rule out anything with information we know
+ vec![true, true, true, true, true, true, true, true, true],
+ );
+
+ // s1 like 'foo%bar%' AND s2 = 'bar'
+ prune_with_expr(
+ col("s1")
+ .like(lit("foo%bar%"))
+ .and(col("s2").eq(lit("bar"))),
+ &schema,
+ &statistics,
+ // can rule out any container where we know s2 does not have the
value 'bar'
+ vec![true, true, true, false, false, false, true, true, true],
+ );
+
+ // s1 like 'foo%bar%' OR s2 = 'bar'
+ prune_with_expr(
+ col("s1").like(lit("foo%bar%")).or(col("s2").eq(lit("bar"))),
+ &schema,
+ &statistics,
+ // can't rule out anything (we would have to prove that both the
+ // like and the equality must be false)
+ vec![true, true, true, true, true, true, true, true, true],
+ );
+ }
+
+ #[test]
+ fn prune_with_range_and_contained() {
+ // Setup mimics range information for i, a bloom filter for s
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("i", DataType::Int32, true),
+ Field::new("s", DataType::Utf8, true),
+ ]));
+
+ let statistics = TestStatistics::new()
+ .with(
+ "i",
+ ContainerStats::new_i32(
+ // Container 0, 3, 6: [-5 to 5]
+ // Container 1, 4, 7: [10 to 20]
+ // Container 2, 5, 8: unknown
+ vec![
+ Some(-5),
+ Some(10),
+ None,
+ Some(-5),
+ Some(10),
+ None,
+ Some(-5),
+ Some(10),
+ None,
+ ], // min
+ vec![
+ Some(5),
+ Some(20),
+ None,
+ Some(5),
+ Some(20),
+ None,
+ Some(5),
+ Some(20),
+ None,
+ ], // max
+ ),
+ )
+ // Add contained information about s and "foo"
+ .with_contained(
+ "s",
+ [ScalarValue::from("foo")],
+ [
+ // container 0,1,2 known to only contain "foo"
+ Some(true),
+ Some(true),
+ Some(true),
+ // container 3,4,5 known to not contain "foo"
+ Some(false),
+ Some(false),
+ Some(false),
+ // container 6,7,8 unknown about "foo"
+ None,
+ None,
+ None,
+ ],
+ );
+
+ // i = 0 and s = 'foo'
+ prune_with_expr(
+ col("i").eq(lit(0)).and(col("s").eq(lit("foo"))),
+ &schema,
+ &statistics,
+ // Can rule out container where we know that either:
+ // 1. 0 is outside the min/max range of i
+ // 2. s does not contain foo
+ // (range is false, or contained is false)
+ vec![true, false, true, false, false, false, true, false, true],
+ );
+
+ // i = 0 and s != 'foo'
+ prune_with_expr(
+ col("i").eq(lit(0)).and(col("s").not_eq(lit("foo"))),
+ &schema,
+ &statistics,
+ // Can rule out containers where either:
+ // 1. 0 is outside the min/max range of i
+ // 2. s only contains foo
+ vec![false, false, false, true, false, true, true, false, true],
+ );
+
+ // i = 0 OR s = 'foo'
+ prune_with_expr(
+ col("i").eq(lit(0)).or(col("s").eq(lit("foo"))),
+ &schema,
+ &statistics,
+ // in theory could rule out containers if we had min/max values for
+ // s as well. But in this case we don't so we can't rule out
anything
+ vec![true, true, true, true, true, true, true, true, true],
+ );
+ }
+
+ /// Prunes `expr` using `schema` and `statistics`, and asserts that the
+ /// resulting keep/prune vector equals `expected`.
+ ///
+ /// `expected` has one bool per container, where true means the container
+ /// should be kept, and false means it should be pruned.
+ ///
+ // TODO refactor other tests to use this to reduce boilerplate
+ fn prune_with_expr(
+ expr: Expr,
+ schema: &SchemaRef,
+ statistics: &TestStatistics,
+ expected: Vec<bool>,
+ ) {
+ println!("Pruning with expr: {}", expr);
+ let expr = logical2physical(&expr, schema);
+ let p = PruningPredicate::try_new(expr, schema.clone()).unwrap();
+ let result = p.prune(statistics).unwrap();
+ assert_eq!(result, expected);
+ }
+
fn test_build_predicate_expression(
expr: &Expr,
schema: &Schema,
- required_columns: &mut RequiredStatColumns,
+ required_columns: &mut RequiredColumns,
) -> Arc<dyn PhysicalExpr> {
let expr = logical2physical(expr, schema);
build_predicate_expression(&expr, schema, required_columns)