This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 8524d58e30 Implement `contained` API in PruningPredicate (#8440)
8524d58e30 is described below

commit 8524d58e303b65597eeebc41c75025a6f0822793
Author: Andrew Lamb <[email protected]>
AuthorDate: Sat Dec 23 07:10:56 2023 -0500

    Implement `contained` API in PruningPredicate (#8440)
    
    * Implement `contains` API in PruningPredicate
    
    * Apply suggestions from code review
    
    Co-authored-by: Nga Tran <[email protected]>
    
    * Add comment to len(), fix fmt
    
    * rename BoolVecBuilder::append* to BoolVecBuilder::combine*
    
    ---------
    
    Co-authored-by: Nga Tran <[email protected]>
---
 .../physical_plan/parquet/page_filter.rs           |   11 +-
 .../datasource/physical_plan/parquet/row_groups.rs |    9 +
 datafusion/core/src/physical_optimizer/pruning.rs  | 1073 +++++++++++++++-----
 3 files changed, 857 insertions(+), 236 deletions(-)

diff --git 
a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs 
b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs
index 42bfef3599..f6310c49bc 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs
@@ -23,7 +23,7 @@ use arrow::array::{
 };
 use arrow::datatypes::DataType;
 use arrow::{array::ArrayRef, datatypes::SchemaRef, error::ArrowError};
-use datafusion_common::{DataFusionError, Result};
+use datafusion_common::{DataFusionError, Result, ScalarValue};
 use datafusion_physical_expr::expressions::Column;
 use datafusion_physical_expr::{split_conjunction, PhysicalExpr};
 use log::{debug, trace};
@@ -37,6 +37,7 @@ use parquet::{
     },
     format::PageLocation,
 };
+use std::collections::HashSet;
 use std::sync::Arc;
 
 use crate::datasource::physical_plan::parquet::parquet_to_arrow_decimal_type;
@@ -554,4 +555,12 @@ impl<'a> PruningStatistics for PagesPruningStatistics<'a> {
             ))),
         }
     }
+
+    fn contained(
+        &self,
+        _column: &datafusion_common::Column,
+        _values: &HashSet<ScalarValue>,
+    ) -> Option<BooleanArray> {
+        None
+    }
 }
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs 
b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
index 7c3f7d9384..09e4907c94 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 use arrow::{array::ArrayRef, datatypes::Schema};
+use arrow_array::BooleanArray;
 use arrow_schema::FieldRef;
 use datafusion_common::tree_node::{TreeNode, VisitRecursion};
 use datafusion_common::{Column, DataFusionError, Result, ScalarValue};
@@ -340,6 +341,14 @@ impl<'a> PruningStatistics for 
RowGroupPruningStatistics<'a> {
         let scalar = ScalarValue::UInt64(Some(c.statistics()?.null_count()));
         scalar.to_array().ok()
     }
+
+    fn contained(
+        &self,
+        _column: &Column,
+        _values: &HashSet<ScalarValue>,
+    ) -> Option<BooleanArray> {
+        None
+    }
 }
 
 #[cfg(test)]
diff --git a/datafusion/core/src/physical_optimizer/pruning.rs 
b/datafusion/core/src/physical_optimizer/pruning.rs
index b2ba7596db..79e084d7b7 100644
--- a/datafusion/core/src/physical_optimizer/pruning.rs
+++ b/datafusion/core/src/physical_optimizer/pruning.rs
@@ -35,12 +35,13 @@ use arrow::{
     datatypes::{DataType, Field, Schema, SchemaRef},
     record_batch::RecordBatch,
 };
-use datafusion_common::{downcast_value, plan_datafusion_err, ScalarValue};
+use arrow_array::cast::AsArray;
 use datafusion_common::{
     internal_err, plan_err,
     tree_node::{Transformed, TreeNode},
 };
-use datafusion_physical_expr::utils::collect_columns;
+use datafusion_common::{plan_datafusion_err, ScalarValue};
+use datafusion_physical_expr::utils::{collect_columns, Guarantee, 
LiteralGuarantee};
 use datafusion_physical_expr::{expressions as phys_expr, PhysicalExprRef};
 use log::trace;
 
@@ -93,6 +94,30 @@ pub trait PruningStatistics {
     ///
     /// Note: the returned array must contain [`Self::num_containers`] rows
     fn null_counts(&self, column: &Column) -> Option<ArrayRef>;
+
+    /// Returns an array where each row represents information known about
+    /// the `values` contained in a column.
+    ///
+    /// This API is designed to be used along with [`LiteralGuarantee`] to 
prove
+    /// that predicates can not possibly evaluate to `true` and thus prune
+    /// containers. For example, Parquet Bloom Filters can prove that values 
are
+    /// not present.
+    ///
+    /// The returned array has one row for each container, with the following
+    /// meanings:
+    /// * `true` if the values in `column` ONLY contain values from `values`
+    /// * `false` if the values in `column` are NOT ANY of `values`
+    /// * `null` if neither of the above holds or is unknown.
+    ///
+    /// If these statistics can not determine column membership for any
+    /// container, return `None` (the default).
+    ///
+    /// Note: the returned array must contain [`Self::num_containers`] rows
+    fn contained(
+        &self,
+        column: &Column,
+        values: &HashSet<ScalarValue>,
+    ) -> Option<BooleanArray>;
 }
 
 /// Evaluates filter expressions on statistics such as min/max values and null
@@ -142,12 +167,17 @@ pub trait PruningStatistics {
 pub struct PruningPredicate {
     /// The input schema against which the predicate will be evaluated
     schema: SchemaRef,
-    /// Actual pruning predicate (rewritten in terms of column min/max 
statistics)
+    /// A min/max pruning predicate (rewritten in terms of column min/max
+    /// values, which are supplied by statistics)
     predicate_expr: Arc<dyn PhysicalExpr>,
-    /// The statistics required to evaluate this predicate
-    required_columns: RequiredStatColumns,
-    /// Original physical predicate from which this predicate expr is derived 
(required for serialization)
+    /// Description of which statistics are required to evaluate 
`predicate_expr`
+    required_columns: RequiredColumns,
+    /// Original physical predicate from which this predicate expr is derived
+    /// (required for serialization)
     orig_expr: Arc<dyn PhysicalExpr>,
+    /// [`LiteralGuarantee`]s that are used to try and prove a predicate can 
not
+    /// possibly evaluate to `true`.
+    literal_guarantees: Vec<LiteralGuarantee>,
 }
 
 impl PruningPredicate {
@@ -172,14 +202,18 @@ impl PruningPredicate {
     /// `(column_min / 2) <= 4 && 4 <= (column_max / 2))`
     pub fn try_new(expr: Arc<dyn PhysicalExpr>, schema: SchemaRef) -> 
Result<Self> {
         // build predicate expression once
-        let mut required_columns = RequiredStatColumns::new();
+        let mut required_columns = RequiredColumns::new();
         let predicate_expr =
             build_predicate_expression(&expr, schema.as_ref(), &mut 
required_columns);
+
+        let literal_guarantees = LiteralGuarantee::analyze(&expr);
+
         Ok(Self {
             schema,
             predicate_expr,
             required_columns,
             orig_expr: expr,
+            literal_guarantees,
         })
     }
 
@@ -198,40 +232,47 @@ impl PruningPredicate {
     ///
     /// [`ExprSimplifier`]: 
crate::optimizer::simplify_expressions::ExprSimplifier
     pub fn prune<S: PruningStatistics>(&self, statistics: &S) -> 
Result<Vec<bool>> {
+        let mut builder = BoolVecBuilder::new(statistics.num_containers());
+
+        // Try to prove the predicate can't be true for the containers based on
+        // literal guarantees
+        for literal_guarantee in &self.literal_guarantees {
+            let LiteralGuarantee {
+                column,
+                guarantee,
+                literals,
+            } = literal_guarantee;
+            if let Some(results) = statistics.contained(column, literals) {
+                match guarantee {
+                    // `In` means the values in the column must be one of the
+                    // values in the set for the predicate to evaluate to true.
+                    // If `contained` returns false, that means the column is
+                    // not any of the values so we can prune the container
+                    Guarantee::In => builder.combine_array(&results),
+                    // `NotIn` means the values in the column must not be
+                    // any of the values in the set for the predicate to
+                    // evaluate to true. If contained returns true, it means 
the
+                    // column is only in the set of values so we can prune the
+                    // container
+                    Guarantee::NotIn => {
+                        builder.combine_array(&arrow::compute::not(&results)?)
+                    }
+                }
+            }
+        }
+
+        // Next, try to prove the predicate can't be true for the containers 
based
+        // on min/max values
+
         // build a RecordBatch that contains the min/max values in the
-        // appropriate statistics columns
+        // appropriate statistics columns for the min/max predicate
         let statistics_batch =
             build_statistics_record_batch(statistics, &self.required_columns)?;
 
-        // Evaluate the pruning predicate on that record batch.
-        //
-        // Use true when the result of evaluating a predicate
-        // expression on a row group is null (aka `None`). Null can
-        // arise when the statistics are unknown or some calculation
-        // in the predicate means we don't know for sure if the row
-        // group can be filtered out or not. To maintain correctness
-        // the row group must be kept and thus `true` is returned.
-        match self.predicate_expr.evaluate(&statistics_batch)? {
-            ColumnarValue::Array(array) => {
-                let predicate_array = downcast_value!(array, BooleanArray);
+        // Evaluate the pruning predicate on that record batch and append any 
results to the builder
+        
builder.combine_value(self.predicate_expr.evaluate(&statistics_batch)?);
 
-                Ok(predicate_array
-                    .into_iter()
-                    .map(|x| x.unwrap_or(true)) // None -> true per comments 
above
-                    .collect::<Vec<_>>())
-            }
-            // result was a column
-            ColumnarValue::Scalar(ScalarValue::Boolean(v)) => {
-                let v = v.unwrap_or(true); // None -> true per comments above
-                Ok(vec![v; statistics.num_containers()])
-            }
-            other => {
-                internal_err!(
-                    "Unexpected result of pruning predicate evaluation. 
Expected Boolean array \
-                     or scalar but got {other:?}"
-                )
-            }
-        }
+        Ok(builder.build())
     }
 
     /// Return a reference to the input schema
@@ -254,9 +295,91 @@ impl PruningPredicate {
         is_always_true(&self.predicate_expr)
     }
 
-    pub(crate) fn required_columns(&self) -> &RequiredStatColumns {
+    pub(crate) fn required_columns(&self) -> &RequiredColumns {
         &self.required_columns
     }
+
+    /// Names of the columns that are known to be / not be in a set
+    /// of literals (constants). These are the columns that may be passed to
+    /// [`PruningStatistics::contained`] during pruning.
+    ///
+    /// This is useful to avoid fetching statistics for columns that will not 
be
+    /// used in the predicate. For example, it can be used to avoid reading
+    /// unneeded bloom filters (a non-trivial operation).
+    pub fn literal_columns(&self) -> Vec<String> {
+        let mut seen = HashSet::new();
+        self.literal_guarantees
+            .iter()
+            .map(|e| &e.column.name)
+            // avoid duplicates
+            .filter(|name| seen.insert(*name))
+            .map(|s| s.to_string())
+            .collect()
+    }
+}
+
+/// Builds the return `Vec` for [`PruningPredicate::prune`].
+#[derive(Debug)]
+struct BoolVecBuilder {
+    /// One element per container. Each element is
+    /// * `true`: if the container has rows that may pass the predicate
+    /// * `false`: if the container has rows that DEFINITELY DO NOT pass the 
predicate
+    inner: Vec<bool>,
+}
+
+impl BoolVecBuilder {
+    /// Create a new `BoolVecBuilder` with `num_containers` elements
+    fn new(num_containers: usize) -> Self {
+        Self {
+            // assume by default all containers may pass the predicate
+            inner: vec![true; num_containers],
+        }
+    }
+
+    /// Combines result `array` for a conjunct (e.g. `AND` clause) of a
+    /// predicate into the currently in progress array.
+    ///
+    /// Each `array` element is:
+    /// * `true`: container has rows that may pass the predicate
+    /// * `false`: all container rows DEFINITELY DO NOT pass the predicate
+    /// * `null`: container may or may not have rows that pass the predicate
+    fn combine_array(&mut self, array: &BooleanArray) {
+        assert_eq!(array.len(), self.inner.len());
+        for (cur, new) in self.inner.iter_mut().zip(array.iter()) {
+            // `false` for this conjunct means we know for sure no rows could
+            // pass the predicate and thus we set the corresponding container
+            // location to false.
+            if let Some(false) = new {
+                *cur = false;
+            }
+        }
+    }
+
+    /// Combines the results in the [`ColumnarValue`] to the currently in
+    /// progress array, following the same rules as [`Self::combine_array`].
+    ///
+    /// # Panics
+    /// If `value` is not boolean
+    fn combine_value(&mut self, value: ColumnarValue) {
+        match value {
+            ColumnarValue::Array(array) => {
+                self.combine_array(array.as_boolean());
+            }
+            ColumnarValue::Scalar(ScalarValue::Boolean(Some(false))) => {
+                // False means all containers can not pass the predicate
+                self.inner = vec![false; self.inner.len()];
+            }
+            _ => {
+                // Null or true means the rows in container may pass this
+                // conjunct so we can't prune any containers based on that
+            }
+        }
+    }
+
+    /// Convert this builder into a Vec of bools
+    fn build(self) -> Vec<bool> {
+        self.inner
+    }
 }
 
 fn is_always_true(expr: &Arc<dyn PhysicalExpr>) -> bool {
@@ -276,21 +399,21 @@ fn is_always_true(expr: &Arc<dyn PhysicalExpr>) -> bool {
 /// Handles creating references to the min/max statistics
 /// for columns as well as recording which statistics are needed
 #[derive(Debug, Default, Clone)]
-pub(crate) struct RequiredStatColumns {
+pub(crate) struct RequiredColumns {
     /// The statistics required to evaluate this predicate:
     /// * The unqualified column in the input schema
     /// * Statistics type (e.g. Min or Max or Null_Count)
     /// * The field the statistics value should be placed in for
-    ///   pruning predicate evaluation
+    ///   pruning predicate evaluation (e.g. `min_value` or `max_value`)
     columns: Vec<(phys_expr::Column, StatisticsType, Field)>,
 }
 
-impl RequiredStatColumns {
+impl RequiredColumns {
     fn new() -> Self {
         Self::default()
     }
 
-    /// Returns number of unique columns.
+    /// Returns number of unique columns
     pub(crate) fn n_columns(&self) -> usize {
         self.iter()
             .map(|(c, _s, _f)| c)
@@ -344,11 +467,10 @@ impl RequiredStatColumns {
 
         // only add statistics column if not previously added
         if need_to_insert {
-            let stat_field = Field::new(
-                stat_column.name(),
-                field.data_type().clone(),
-                field.is_nullable(),
-            );
+            // may be null if statistics are not present
+            let nullable = true;
+            let stat_field =
+                Field::new(stat_column.name(), field.data_type().clone(), 
nullable);
             self.columns.push((column.clone(), stat_type, stat_field));
         }
         rewrite_column_expr(column_expr.clone(), column, &stat_column)
@@ -391,7 +513,7 @@ impl RequiredStatColumns {
     }
 }
 
-impl From<Vec<(phys_expr::Column, StatisticsType, Field)>> for 
RequiredStatColumns {
+impl From<Vec<(phys_expr::Column, StatisticsType, Field)>> for RequiredColumns 
{
     fn from(columns: Vec<(phys_expr::Column, StatisticsType, Field)>) -> Self {
         Self { columns }
     }
@@ -424,7 +546,7 @@ impl From<Vec<(phys_expr::Column, StatisticsType, Field)>> 
for RequiredStatColum
 /// ```
 fn build_statistics_record_batch<S: PruningStatistics>(
     statistics: &S,
-    required_columns: &RequiredStatColumns,
+    required_columns: &RequiredColumns,
 ) -> Result<RecordBatch> {
     let mut fields = Vec::<Field>::new();
     let mut arrays = Vec::<ArrayRef>::new();
@@ -480,7 +602,7 @@ struct PruningExpressionBuilder<'a> {
     op: Operator,
     scalar_expr: Arc<dyn PhysicalExpr>,
     field: &'a Field,
-    required_columns: &'a mut RequiredStatColumns,
+    required_columns: &'a mut RequiredColumns,
 }
 
 impl<'a> PruningExpressionBuilder<'a> {
@@ -489,7 +611,7 @@ impl<'a> PruningExpressionBuilder<'a> {
         right: &'a Arc<dyn PhysicalExpr>,
         op: Operator,
         schema: &'a Schema,
-        required_columns: &'a mut RequiredStatColumns,
+        required_columns: &'a mut RequiredColumns,
     ) -> Result<Self> {
         // find column name; input could be a more complicated expression
         let left_columns = collect_columns(left);
@@ -704,7 +826,7 @@ fn reverse_operator(op: Operator) -> Result<Operator> {
 fn build_single_column_expr(
     column: &phys_expr::Column,
     schema: &Schema,
-    required_columns: &mut RequiredStatColumns,
+    required_columns: &mut RequiredColumns,
     is_not: bool, // if true, treat as !col
 ) -> Option<Arc<dyn PhysicalExpr>> {
     let field = schema.field_with_name(column.name()).ok()?;
@@ -745,7 +867,7 @@ fn build_single_column_expr(
 fn build_is_null_column_expr(
     expr: &Arc<dyn PhysicalExpr>,
     schema: &Schema,
-    required_columns: &mut RequiredStatColumns,
+    required_columns: &mut RequiredColumns,
 ) -> Option<Arc<dyn PhysicalExpr>> {
     if let Some(col) = expr.as_any().downcast_ref::<phys_expr::Column>() {
         let field = schema.field_with_name(col.name()).ok()?;
@@ -775,7 +897,7 @@ fn build_is_null_column_expr(
 fn build_predicate_expression(
     expr: &Arc<dyn PhysicalExpr>,
     schema: &Schema,
-    required_columns: &mut RequiredStatColumns,
+    required_columns: &mut RequiredColumns,
 ) -> Arc<dyn PhysicalExpr> {
     // Returned for unsupported expressions. Such expressions are
     // converted to TRUE.
@@ -984,7 +1106,7 @@ mod tests {
     use std::collections::HashMap;
     use std::ops::{Not, Rem};
 
-    #[derive(Debug)]
+    #[derive(Debug, Default)]
     /// Mock statistic provider for tests
     ///
     /// Each row represents the statistics for a "container" (which
@@ -993,95 +1115,142 @@ mod tests {
     ///
     /// Note All `ArrayRefs` must be the same size.
     struct ContainerStats {
-        min: ArrayRef,
-        max: ArrayRef,
+        min: Option<ArrayRef>,
+        max: Option<ArrayRef>,
         /// Optional values
         null_counts: Option<ArrayRef>,
+        /// Optional known values (e.g. mimic a bloom filter)
+        /// (value, contained)
+        /// If present, all BooleanArrays must be the same size as min/max
+        contained: Vec<(HashSet<ScalarValue>, BooleanArray)>,
     }
 
     impl ContainerStats {
+        fn new() -> Self {
+            Default::default()
+        }
         fn new_decimal128(
             min: impl IntoIterator<Item = Option<i128>>,
             max: impl IntoIterator<Item = Option<i128>>,
             precision: u8,
             scale: i8,
         ) -> Self {
-            Self {
-                min: Arc::new(
+            Self::new()
+                .with_min(Arc::new(
                     min.into_iter()
                         .collect::<Decimal128Array>()
                         .with_precision_and_scale(precision, scale)
                         .unwrap(),
-                ),
-                max: Arc::new(
+                ))
+                .with_max(Arc::new(
                     max.into_iter()
                         .collect::<Decimal128Array>()
                         .with_precision_and_scale(precision, scale)
                         .unwrap(),
-                ),
-                null_counts: None,
-            }
+                ))
         }
 
         fn new_i64(
             min: impl IntoIterator<Item = Option<i64>>,
             max: impl IntoIterator<Item = Option<i64>>,
         ) -> Self {
-            Self {
-                min: Arc::new(min.into_iter().collect::<Int64Array>()),
-                max: Arc::new(max.into_iter().collect::<Int64Array>()),
-                null_counts: None,
-            }
+            Self::new()
+                .with_min(Arc::new(min.into_iter().collect::<Int64Array>()))
+                .with_max(Arc::new(max.into_iter().collect::<Int64Array>()))
         }
 
         fn new_i32(
             min: impl IntoIterator<Item = Option<i32>>,
             max: impl IntoIterator<Item = Option<i32>>,
         ) -> Self {
-            Self {
-                min: Arc::new(min.into_iter().collect::<Int32Array>()),
-                max: Arc::new(max.into_iter().collect::<Int32Array>()),
-                null_counts: None,
-            }
+            Self::new()
+                .with_min(Arc::new(min.into_iter().collect::<Int32Array>()))
+                .with_max(Arc::new(max.into_iter().collect::<Int32Array>()))
         }
 
         fn new_utf8<'a>(
             min: impl IntoIterator<Item = Option<&'a str>>,
             max: impl IntoIterator<Item = Option<&'a str>>,
         ) -> Self {
-            Self {
-                min: Arc::new(min.into_iter().collect::<StringArray>()),
-                max: Arc::new(max.into_iter().collect::<StringArray>()),
-                null_counts: None,
-            }
+            Self::new()
+                .with_min(Arc::new(min.into_iter().collect::<StringArray>()))
+                .with_max(Arc::new(max.into_iter().collect::<StringArray>()))
         }
 
         fn new_bool(
             min: impl IntoIterator<Item = Option<bool>>,
             max: impl IntoIterator<Item = Option<bool>>,
         ) -> Self {
-            Self {
-                min: Arc::new(min.into_iter().collect::<BooleanArray>()),
-                max: Arc::new(max.into_iter().collect::<BooleanArray>()),
-                null_counts: None,
-            }
+            Self::new()
+                .with_min(Arc::new(min.into_iter().collect::<BooleanArray>()))
+                .with_max(Arc::new(max.into_iter().collect::<BooleanArray>()))
         }
 
         fn min(&self) -> Option<ArrayRef> {
-            Some(self.min.clone())
+            self.min.clone()
         }
 
         fn max(&self) -> Option<ArrayRef> {
-            Some(self.max.clone())
+            self.max.clone()
         }
 
         fn null_counts(&self) -> Option<ArrayRef> {
             self.null_counts.clone()
         }
 
+        /// return an iterator over all arrays in this statistics
+        fn arrays(&self) -> Vec<ArrayRef> {
+            let contained_arrays = self
+                .contained
+                .iter()
+                .map(|(_values, contained)| Arc::new(contained.clone()) as 
ArrayRef);
+
+            [
+                self.min.as_ref().cloned(),
+                self.max.as_ref().cloned(),
+                self.null_counts.as_ref().cloned(),
+            ]
+            .into_iter()
+            .flatten()
+            .chain(contained_arrays)
+            .collect()
+        }
+
+        /// Returns the number of containers represented by these statistics. This
+        /// picks the length of the first array as all arrays must have the 
same
+        /// length (which is verified by `assert_invariants`).
         fn len(&self) -> usize {
-            assert_eq!(self.min.len(), self.max.len());
-            self.min.len()
+            // use the length of the first array (0 if there are no arrays)
+            self.arrays().iter().map(|a| a.len()).next().unwrap_or(0)
+        }
+
+        /// Ensure that the lengths of all arrays are consistent
+        fn assert_invariants(&self) {
+            let mut prev_len = None;
+
+            for len in self.arrays().iter().map(|a| a.len()) {
+                // Get a length, if we don't already have one
+                match prev_len {
+                    None => {
+                        prev_len = Some(len);
+                    }
+                    Some(prev_len) => {
+                        assert_eq!(prev_len, len);
+                    }
+                }
+            }
+        }
+
+        /// Add min values
+        fn with_min(mut self, min: ArrayRef) -> Self {
+            self.min = Some(min);
+            self
+        }
+
+        /// Add max values
+        fn with_max(mut self, max: ArrayRef) -> Self {
+            self.max = Some(max);
+            self
         }
 
         /// Add null counts. There must be the same number of null counts as
@@ -1090,14 +1259,36 @@ mod tests {
             mut self,
             counts: impl IntoIterator<Item = Option<i64>>,
         ) -> Self {
-            // take stats out and update them
             let null_counts: ArrayRef =
                 Arc::new(counts.into_iter().collect::<Int64Array>());
 
-            assert_eq!(null_counts.len(), self.len());
+            self.assert_invariants();
             self.null_counts = Some(null_counts);
             self
         }
+
+        /// Add contained information.
+        pub fn with_contained(
+            mut self,
+            values: impl IntoIterator<Item = ScalarValue>,
+            contained: impl IntoIterator<Item = Option<bool>>,
+        ) -> Self {
+            let contained: BooleanArray = contained.into_iter().collect();
+            let values: HashSet<_> = values.into_iter().collect();
+
+            self.contained.push((values, contained));
+            self.assert_invariants();
+            self
+        }
+
+        /// get any contained information for the specified values
+        fn contained(&self, find_values: &HashSet<ScalarValue>) -> 
Option<BooleanArray> {
+            // find the one with the matching values
+            self.contained
+                .iter()
+                .find(|(values, _contained)| values == find_values)
+                .map(|(_values, contained)| contained.clone())
+        }
     }
 
     #[derive(Debug, Default)]
@@ -1135,13 +1326,34 @@ mod tests {
             let container_stats = self
                 .stats
                 .remove(&col)
-                .expect("Can not find stats for column")
+                .unwrap_or_default()
                 .with_null_counts(counts);
 
             // put stats back in
             self.stats.insert(col, container_stats);
             self
         }
+
+        /// Add contained information for the specified column.
+        fn with_contained(
+            mut self,
+            name: impl Into<String>,
+            values: impl IntoIterator<Item = ScalarValue>,
+            contained: impl IntoIterator<Item = Option<bool>>,
+        ) -> Self {
+            let col = Column::from_name(name.into());
+
+            // take stats out and update them
+            let container_stats = self
+                .stats
+                .remove(&col)
+                .unwrap_or_default()
+                .with_contained(values, contained);
+
+            // put stats back in
+            self.stats.insert(col, container_stats);
+            self
+        }
     }
 
     impl PruningStatistics for TestStatistics {
@@ -1173,6 +1385,16 @@ mod tests {
                 .map(|container_stats| container_stats.null_counts())
                 .unwrap_or(None)
         }
+
+        fn contained(
+            &self,
+            column: &Column,
+            values: &HashSet<ScalarValue>,
+        ) -> Option<BooleanArray> {
+            self.stats
+                .get(column)
+                .and_then(|container_stats| container_stats.contained(values))
+        }
     }
 
     /// Returns the specified min/max container values
@@ -1198,12 +1420,20 @@ mod tests {
         fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
             None
         }
+
+        fn contained(
+            &self,
+            _column: &Column,
+            _values: &HashSet<ScalarValue>,
+        ) -> Option<BooleanArray> {
+            None
+        }
     }
 
     #[test]
     fn test_build_statistics_record_batch() {
         // Request a record batch with of s1_min, s2_max, s3_max, s3_min
-        let required_columns = RequiredStatColumns::from(vec![
+        let required_columns = RequiredColumns::from(vec![
             // min of original column s1, named s1_min
             (
                 phys_expr::Column::new("s1", 1),
@@ -1275,7 +1505,7 @@ mod tests {
         // which is what Parquet does
 
         // Request a record batch with of s1_min as a timestamp
-        let required_columns = RequiredStatColumns::from(vec![(
+        let required_columns = RequiredColumns::from(vec![(
             phys_expr::Column::new("s3", 3),
             StatisticsType::Min,
             Field::new(
@@ -1307,7 +1537,7 @@ mod tests {
 
     #[test]
     fn test_build_statistics_no_required_stats() {
-        let required_columns = RequiredStatColumns::new();
+        let required_columns = RequiredColumns::new();
 
         let statistics = OneContainerStats {
             min_values: Some(Arc::new(Int64Array::from(vec![Some(10)]))),
@@ -1325,7 +1555,7 @@ mod tests {
         // Test requesting a Utf8 column when the stats return some other type
 
         // Request a record batch with of s1_min as a timestamp
-        let required_columns = RequiredStatColumns::from(vec![(
+        let required_columns = RequiredColumns::from(vec![(
             phys_expr::Column::new("s3", 3),
             StatisticsType::Min,
             Field::new("s1_min", DataType::Utf8, true),
@@ -1354,7 +1584,7 @@ mod tests {
     #[test]
     fn test_build_statistics_inconsistent_length() {
         // return an inconsistent length to the actual statistics arrays
-        let required_columns = RequiredStatColumns::from(vec![(
+        let required_columns = RequiredColumns::from(vec![(
             phys_expr::Column::new("s1", 3),
             StatisticsType::Min,
             Field::new("s1_min", DataType::Int64, true),
@@ -1385,20 +1615,14 @@ mod tests {
 
         // test column on the left
         let expr = col("c1").eq(lit(1));
-        let predicate_expr = test_build_predicate_expression(
-            &expr,
-            &schema,
-            &mut RequiredStatColumns::new(),
-        );
+        let predicate_expr =
+            test_build_predicate_expression(&expr, &schema, &mut 
RequiredColumns::new());
         assert_eq!(predicate_expr.to_string(), expected_expr);
 
         // test column on the right
         let expr = lit(1).eq(col("c1"));
-        let predicate_expr = test_build_predicate_expression(
-            &expr,
-            &schema,
-            &mut RequiredStatColumns::new(),
-        );
+        let predicate_expr =
+            test_build_predicate_expression(&expr, &schema, &mut 
RequiredColumns::new());
         assert_eq!(predicate_expr.to_string(), expected_expr);
 
         Ok(())
@@ -1411,20 +1635,14 @@ mod tests {
 
         // test column on the left
         let expr = col("c1").not_eq(lit(1));
-        let predicate_expr = test_build_predicate_expression(
-            &expr,
-            &schema,
-            &mut RequiredStatColumns::new(),
-        );
+        let predicate_expr =
+            test_build_predicate_expression(&expr, &schema, &mut 
RequiredColumns::new());
         assert_eq!(predicate_expr.to_string(), expected_expr);
 
         // test column on the right
         let expr = lit(1).not_eq(col("c1"));
-        let predicate_expr = test_build_predicate_expression(
-            &expr,
-            &schema,
-            &mut RequiredStatColumns::new(),
-        );
+        let predicate_expr =
+            test_build_predicate_expression(&expr, &schema, &mut 
RequiredColumns::new());
         assert_eq!(predicate_expr.to_string(), expected_expr);
 
         Ok(())
@@ -1437,20 +1655,14 @@ mod tests {
 
         // test column on the left
         let expr = col("c1").gt(lit(1));
-        let predicate_expr = test_build_predicate_expression(
-            &expr,
-            &schema,
-            &mut RequiredStatColumns::new(),
-        );
+        let predicate_expr =
+            test_build_predicate_expression(&expr, &schema, &mut 
RequiredColumns::new());
         assert_eq!(predicate_expr.to_string(), expected_expr);
 
         // test column on the right
         let expr = lit(1).lt(col("c1"));
-        let predicate_expr = test_build_predicate_expression(
-            &expr,
-            &schema,
-            &mut RequiredStatColumns::new(),
-        );
+        let predicate_expr =
+            test_build_predicate_expression(&expr, &schema, &mut 
RequiredColumns::new());
         assert_eq!(predicate_expr.to_string(), expected_expr);
 
         Ok(())
@@ -1463,19 +1675,13 @@ mod tests {
 
         // test column on the left
         let expr = col("c1").gt_eq(lit(1));
-        let predicate_expr = test_build_predicate_expression(
-            &expr,
-            &schema,
-            &mut RequiredStatColumns::new(),
-        );
+        let predicate_expr =
+            test_build_predicate_expression(&expr, &schema, &mut 
RequiredColumns::new());
         assert_eq!(predicate_expr.to_string(), expected_expr);
         // test column on the right
         let expr = lit(1).lt_eq(col("c1"));
-        let predicate_expr = test_build_predicate_expression(
-            &expr,
-            &schema,
-            &mut RequiredStatColumns::new(),
-        );
+        let predicate_expr =
+            test_build_predicate_expression(&expr, &schema, &mut 
RequiredColumns::new());
         assert_eq!(predicate_expr.to_string(), expected_expr);
 
         Ok(())
@@ -1488,20 +1694,14 @@ mod tests {
 
         // test column on the left
         let expr = col("c1").lt(lit(1));
-        let predicate_expr = test_build_predicate_expression(
-            &expr,
-            &schema,
-            &mut RequiredStatColumns::new(),
-        );
+        let predicate_expr =
+            test_build_predicate_expression(&expr, &schema, &mut 
RequiredColumns::new());
         assert_eq!(predicate_expr.to_string(), expected_expr);
 
         // test column on the right
         let expr = lit(1).gt(col("c1"));
-        let predicate_expr = test_build_predicate_expression(
-            &expr,
-            &schema,
-            &mut RequiredStatColumns::new(),
-        );
+        let predicate_expr =
+            test_build_predicate_expression(&expr, &schema, &mut 
RequiredColumns::new());
         assert_eq!(predicate_expr.to_string(), expected_expr);
 
         Ok(())
@@ -1514,19 +1714,13 @@ mod tests {
 
         // test column on the left
         let expr = col("c1").lt_eq(lit(1));
-        let predicate_expr = test_build_predicate_expression(
-            &expr,
-            &schema,
-            &mut RequiredStatColumns::new(),
-        );
+        let predicate_expr =
+            test_build_predicate_expression(&expr, &schema, &mut 
RequiredColumns::new());
         assert_eq!(predicate_expr.to_string(), expected_expr);
         // test column on the right
         let expr = lit(1).gt_eq(col("c1"));
-        let predicate_expr = test_build_predicate_expression(
-            &expr,
-            &schema,
-            &mut RequiredStatColumns::new(),
-        );
+        let predicate_expr =
+            test_build_predicate_expression(&expr, &schema, &mut 
RequiredColumns::new());
         assert_eq!(predicate_expr.to_string(), expected_expr);
 
         Ok(())
@@ -1542,11 +1736,8 @@ mod tests {
         // test AND operator joining supported c1 < 1 expression and 
unsupported c2 > c3 expression
         let expr = col("c1").lt(lit(1)).and(col("c2").lt(col("c3")));
         let expected_expr = "c1_min@0 < 1";
-        let predicate_expr = test_build_predicate_expression(
-            &expr,
-            &schema,
-            &mut RequiredStatColumns::new(),
-        );
+        let predicate_expr =
+            test_build_predicate_expression(&expr, &schema, &mut 
RequiredColumns::new());
         assert_eq!(predicate_expr.to_string(), expected_expr);
 
         Ok(())
@@ -1561,11 +1752,8 @@ mod tests {
         // test OR operator joining supported c1 < 1 expression and 
unsupported c2 % 2 = 0 expression
         let expr = col("c1").lt(lit(1)).or(col("c2").rem(lit(2)).eq(lit(0)));
         let expected_expr = "true";
-        let predicate_expr = test_build_predicate_expression(
-            &expr,
-            &schema,
-            &mut RequiredStatColumns::new(),
-        );
+        let predicate_expr =
+            test_build_predicate_expression(&expr, &schema, &mut 
RequiredColumns::new());
         assert_eq!(predicate_expr.to_string(), expected_expr);
 
         Ok(())
@@ -1577,11 +1765,8 @@ mod tests {
         let expected_expr = "true";
 
         let expr = col("c1").not();
-        let predicate_expr = test_build_predicate_expression(
-            &expr,
-            &schema,
-            &mut RequiredStatColumns::new(),
-        );
+        let predicate_expr =
+            test_build_predicate_expression(&expr, &schema, &mut 
RequiredColumns::new());
         assert_eq!(predicate_expr.to_string(), expected_expr);
 
         Ok(())
@@ -1593,11 +1778,8 @@ mod tests {
         let expected_expr = "NOT c1_min@0 AND c1_max@1";
 
         let expr = col("c1").not();
-        let predicate_expr = test_build_predicate_expression(
-            &expr,
-            &schema,
-            &mut RequiredStatColumns::new(),
-        );
+        let predicate_expr =
+            test_build_predicate_expression(&expr, &schema, &mut 
RequiredColumns::new());
         assert_eq!(predicate_expr.to_string(), expected_expr);
 
         Ok(())
@@ -1609,11 +1791,8 @@ mod tests {
         let expected_expr = "c1_min@0 OR c1_max@1";
 
         let expr = col("c1");
-        let predicate_expr = test_build_predicate_expression(
-            &expr,
-            &schema,
-            &mut RequiredStatColumns::new(),
-        );
+        let predicate_expr =
+            test_build_predicate_expression(&expr, &schema, &mut 
RequiredColumns::new());
         assert_eq!(predicate_expr.to_string(), expected_expr);
 
         Ok(())
@@ -1627,11 +1806,8 @@ mod tests {
         // DF doesn't support arithmetic on boolean columns so
         // this predicate will error when evaluated
         let expr = col("c1").lt(lit(true));
-        let predicate_expr = test_build_predicate_expression(
-            &expr,
-            &schema,
-            &mut RequiredStatColumns::new(),
-        );
+        let predicate_expr =
+            test_build_predicate_expression(&expr, &schema, &mut 
RequiredColumns::new());
         assert_eq!(predicate_expr.to_string(), expected_expr);
 
         Ok(())
@@ -1643,7 +1819,7 @@ mod tests {
             Field::new("c1", DataType::Int32, false),
             Field::new("c2", DataType::Int32, false),
         ]);
-        let mut required_columns = RequiredStatColumns::new();
+        let mut required_columns = RequiredColumns::new();
         // c1 < 1 and (c2 = 2 or c2 = 3)
         let expr = col("c1")
             .lt(lit(1))
@@ -1659,7 +1835,7 @@ mod tests {
             (
                 phys_expr::Column::new("c1", 0),
                 StatisticsType::Min,
-                c1_min_field
+                c1_min_field.with_nullable(true) // could be nullable if stats 
are not present
             )
         );
         // c2 = 2 should add c2_min and c2_max
@@ -1669,7 +1845,7 @@ mod tests {
             (
                 phys_expr::Column::new("c2", 1),
                 StatisticsType::Min,
-                c2_min_field
+                c2_min_field.with_nullable(true) // could be nullable if stats 
are not present
             )
         );
         let c2_max_field = Field::new("c2_max", DataType::Int32, false);
@@ -1678,7 +1854,7 @@ mod tests {
             (
                 phys_expr::Column::new("c2", 1),
                 StatisticsType::Max,
-                c2_max_field
+                c2_max_field.with_nullable(true) // could be nullable if stats 
are not present
             )
         );
         // c2 = 3 shouldn't add any new statistics fields
@@ -1700,11 +1876,8 @@ mod tests {
             false,
         ));
         let expected_expr = "c1_min@0 <= 1 AND 1 <= c1_max@1 OR c1_min@0 <= 2 
AND 2 <= c1_max@1 OR c1_min@0 <= 3 AND 3 <= c1_max@1";
-        let predicate_expr = test_build_predicate_expression(
-            &expr,
-            &schema,
-            &mut RequiredStatColumns::new(),
-        );
+        let predicate_expr =
+            test_build_predicate_expression(&expr, &schema, &mut 
RequiredColumns::new());
         assert_eq!(predicate_expr.to_string(), expected_expr);
 
         Ok(())
@@ -1719,11 +1892,8 @@ mod tests {
         // test c1 in()
         let expr = Expr::InList(InList::new(Box::new(col("c1")), vec![], 
false));
         let expected_expr = "true";
-        let predicate_expr = test_build_predicate_expression(
-            &expr,
-            &schema,
-            &mut RequiredStatColumns::new(),
-        );
+        let predicate_expr =
+            test_build_predicate_expression(&expr, &schema, &mut 
RequiredColumns::new());
         assert_eq!(predicate_expr.to_string(), expected_expr);
 
         Ok(())
@@ -1744,11 +1914,8 @@ mod tests {
         let expected_expr = "(c1_min@0 != 1 OR 1 != c1_max@1) \
         AND (c1_min@0 != 2 OR 2 != c1_max@1) \
         AND (c1_min@0 != 3 OR 3 != c1_max@1)";
-        let predicate_expr = test_build_predicate_expression(
-            &expr,
-            &schema,
-            &mut RequiredStatColumns::new(),
-        );
+        let predicate_expr =
+            test_build_predicate_expression(&expr, &schema, &mut 
RequiredColumns::new());
         assert_eq!(predicate_expr.to_string(), expected_expr);
 
         Ok(())
@@ -1762,20 +1929,14 @@ mod tests {
 
         // test column on the left
         let expr = cast(col("c1"), 
DataType::Int64).eq(lit(ScalarValue::Int64(Some(1))));
-        let predicate_expr = test_build_predicate_expression(
-            &expr,
-            &schema,
-            &mut RequiredStatColumns::new(),
-        );
+        let predicate_expr =
+            test_build_predicate_expression(&expr, &schema, &mut 
RequiredColumns::new());
         assert_eq!(predicate_expr.to_string(), expected_expr);
 
         // test column on the right
         let expr = lit(ScalarValue::Int64(Some(1))).eq(cast(col("c1"), 
DataType::Int64));
-        let predicate_expr = test_build_predicate_expression(
-            &expr,
-            &schema,
-            &mut RequiredStatColumns::new(),
-        );
+        let predicate_expr =
+            test_build_predicate_expression(&expr, &schema, &mut 
RequiredColumns::new());
         assert_eq!(predicate_expr.to_string(), expected_expr);
 
         let expected_expr = "TRY_CAST(c1_max@0 AS Int64) > 1";
@@ -1783,21 +1944,15 @@ mod tests {
         // test column on the left
         let expr =
             try_cast(col("c1"), 
DataType::Int64).gt(lit(ScalarValue::Int64(Some(1))));
-        let predicate_expr = test_build_predicate_expression(
-            &expr,
-            &schema,
-            &mut RequiredStatColumns::new(),
-        );
+        let predicate_expr =
+            test_build_predicate_expression(&expr, &schema, &mut 
RequiredColumns::new());
         assert_eq!(predicate_expr.to_string(), expected_expr);
 
         // test column on the right
         let expr =
             lit(ScalarValue::Int64(Some(1))).lt(try_cast(col("c1"), 
DataType::Int64));
-        let predicate_expr = test_build_predicate_expression(
-            &expr,
-            &schema,
-            &mut RequiredStatColumns::new(),
-        );
+        let predicate_expr =
+            test_build_predicate_expression(&expr, &schema, &mut 
RequiredColumns::new());
         assert_eq!(predicate_expr.to_string(), expected_expr);
 
         Ok(())
@@ -1817,11 +1972,8 @@ mod tests {
             false,
         ));
         let expected_expr = "CAST(c1_min@0 AS Int64) <= 1 AND 1 <= 
CAST(c1_max@1 AS Int64) OR CAST(c1_min@0 AS Int64) <= 2 AND 2 <= CAST(c1_max@1 
AS Int64) OR CAST(c1_min@0 AS Int64) <= 3 AND 3 <= CAST(c1_max@1 AS Int64)";
-        let predicate_expr = test_build_predicate_expression(
-            &expr,
-            &schema,
-            &mut RequiredStatColumns::new(),
-        );
+        let predicate_expr =
+            test_build_predicate_expression(&expr, &schema, &mut 
RequiredColumns::new());
         assert_eq!(predicate_expr.to_string(), expected_expr);
 
         let expr = Expr::InList(InList::new(
@@ -1837,11 +1989,8 @@ mod tests {
             "(CAST(c1_min@0 AS Int64) != 1 OR 1 != CAST(c1_max@1 AS Int64)) \
         AND (CAST(c1_min@0 AS Int64) != 2 OR 2 != CAST(c1_max@1 AS Int64)) \
         AND (CAST(c1_min@0 AS Int64) != 3 OR 3 != CAST(c1_max@1 AS Int64))";
-        let predicate_expr = test_build_predicate_expression(
-            &expr,
-            &schema,
-            &mut RequiredStatColumns::new(),
-        );
+        let predicate_expr =
+            test_build_predicate_expression(&expr, &schema, &mut 
RequiredColumns::new());
         assert_eq!(predicate_expr.to_string(), expected_expr);
 
         Ok(())
@@ -2484,10 +2633,464 @@ mod tests {
         // TODO: add other negative test for other case and op
     }
 
+    #[test]
+    fn prune_with_contained_one_column() {
+        let schema = Arc::new(Schema::new(vec![Field::new("s1", 
DataType::Utf8, true)]));
+
+        // Model having information like a bloom filter for s1
+        let statistics = TestStatistics::new()
+            .with_contained(
+                "s1",
+                [ScalarValue::from("foo")],
+                [
+                    // container 0 known to only contain "foo",
+                    Some(true),
+                    // container 1 known to not contain "foo"
+                    Some(false),
+                    // container 2 unknown about "foo"
+                    None,
+                    // container 3 known to only contain "foo"
+                    Some(true),
+                    // container 4 known to not contain "foo"
+                    Some(false),
+                    // container 5 unknown about "foo"
+                    None,
+                    // container 6 known to only contain "foo"
+                    Some(true),
+                    // container 7 known to not contain "foo"
+                    Some(false),
+                    // container 8 unknown about "foo"
+                    None,
+                ],
+            )
+            .with_contained(
+                "s1",
+                [ScalarValue::from("bar")],
+                [
+                    // containers 0,1,2 known to only contain "bar"
+                    Some(true),
+                    Some(true),
+                    Some(true),
+                    // container 3,4,5 known to not contain "bar"
+                    Some(false),
+                    Some(false),
+                    Some(false),
+                    // container 6,7,8 unknown about "bar"
+                    None,
+                    None,
+                    None,
+                ],
+            )
+            .with_contained(
+                // the way the tests are setup, this data is
+                // consulted if the "foo" and "bar" are being checked at the 
same time
+                "s1",
+                [ScalarValue::from("foo"), ScalarValue::from("bar")],
+                [
+                    // container 0,1,2 unknown about ("foo, "bar")
+                    None,
+                    None,
+                    None,
+                    // container 3,4,5 known to contain only either "foo" or 
"bar"
+                    Some(true),
+                    Some(true),
+                    Some(true),
+                    // container 6,7,8 known to contain neither "foo" nor 
"bar"
+                    Some(false),
+                    Some(false),
+                    Some(false),
+                ],
+            );
+
+        // s1 = 'foo'
+        prune_with_expr(
+            col("s1").eq(lit("foo")),
+            &schema,
+            &statistics,
+            // rule out containers (false) where we know foo is not present
+            vec![true, false, true, true, false, true, true, false, true],
+        );
+
+        // s1 = 'bar'
+        prune_with_expr(
+            col("s1").eq(lit("bar")),
+            &schema,
+            &statistics,
+            // rule out containers where we know bar is not present
+            vec![true, true, true, false, false, false, true, true, true],
+        );
+
+        // s1 = 'baz' (unknown value)
+        prune_with_expr(
+            col("s1").eq(lit("baz")),
+            &schema,
+            &statistics,
+            // can't rule out anything
+            vec![true, true, true, true, true, true, true, true, true],
+        );
+
+        // s1 = 'foo' AND s1 = 'bar'
+        prune_with_expr(
+            col("s1").eq(lit("foo")).and(col("s1").eq(lit("bar"))),
+            &schema,
+            &statistics,
+            // logically this predicate can't possibly be true (the column 
can't
+            // take on both values) but we could rule it out if the stats tell
+            // us that both values are not present
+            vec![true, true, true, true, true, true, true, true, true],
+        );
+
+        // s1 = 'foo' OR s1 = 'bar'
+        prune_with_expr(
+            col("s1").eq(lit("foo")).or(col("s1").eq(lit("bar"))),
+            &schema,
+            &statistics,
+            // can rule out containers that we know contain neither foo nor bar
+            vec![true, true, true, true, true, true, false, false, false],
+        );
+
+        // s1 = 'foo' OR s1 = 'baz'
+        prune_with_expr(
+            col("s1").eq(lit("foo")).or(col("s1").eq(lit("baz"))),
+            &schema,
+            &statistics,
+            // can't rule out any container
+            vec![true, true, true, true, true, true, true, true, true],
+        );
+
+        // s1 = 'foo' OR s1 = 'bar' OR s1 = 'baz'
+        prune_with_expr(
+            col("s1")
+                .eq(lit("foo"))
+                .or(col("s1").eq(lit("bar")))
+                .or(col("s1").eq(lit("baz"))),
+            &schema,
+            &statistics,
+            // can rule out any containers based on knowledge of s1 and `foo`,
+            // `bar` and (`foo`, `bar`)
+            vec![true, true, true, true, true, true, true, true, true],
+        );
+
+        // s1 != foo
+        prune_with_expr(
+            col("s1").not_eq(lit("foo")),
+            &schema,
+            &statistics,
+            // rule out containers we know for sure only contain foo
+            vec![false, true, true, false, true, true, false, true, true],
+        );
+
+        // s1 != bar
+        prune_with_expr(
+            col("s1").not_eq(lit("bar")),
+            &schema,
+            &statistics,
+            // rule out when we know for sure s1 has the value bar
+            vec![false, false, false, true, true, true, true, true, true],
+        );
+
+        // s1 != foo AND s1 != bar
+        prune_with_expr(
+            col("s1")
+                .not_eq(lit("foo"))
+                .and(col("s1").not_eq(lit("bar"))),
+            &schema,
+            &statistics,
+            // can rule out any container where we know s1 does not have 
either 'foo' or 'bar'
+            vec![true, true, true, false, false, false, true, true, true],
+        );
+
+        // s1 != foo AND s1 != bar AND s1 != baz
+        prune_with_expr(
+            col("s1")
+                .not_eq(lit("foo"))
+                .and(col("s1").not_eq(lit("bar")))
+                .and(col("s1").not_eq(lit("baz"))),
+            &schema,
+            &statistics,
+            // can't rule out any container based on knowledge of s1
+            vec![true, true, true, true, true, true, true, true, true],
+        );
+
+        // s1 != foo OR s1 != bar
+        prune_with_expr(
+            col("s1")
+                .not_eq(lit("foo"))
+                .or(col("s1").not_eq(lit("bar"))),
+            &schema,
+            &statistics,
+            // can't rule out anything based on contains information
+            vec![true, true, true, true, true, true, true, true, true],
+        );
+
+        // s1 != foo OR s1 != bar OR s1 != baz
+        prune_with_expr(
+            col("s1")
+                .not_eq(lit("foo"))
+                .or(col("s1").not_eq(lit("bar")))
+                .or(col("s1").not_eq(lit("baz"))),
+            &schema,
+            &statistics,
+            // can't rule out anything based on contains information
+            vec![true, true, true, true, true, true, true, true, true],
+        );
+    }
+
+    #[test]
+    fn prune_with_contained_two_columns() {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("s1", DataType::Utf8, true),
+            Field::new("s2", DataType::Utf8, true),
+        ]));
+
+        // Model having information like bloom filters for s1 and s2
+        let statistics = TestStatistics::new()
+            .with_contained(
+                "s1",
+                [ScalarValue::from("foo")],
+                [
+                    // container 0, s1 known to only contain "foo",
+                    Some(true),
+                    // container 1, s1 known to not contain "foo"
+                    Some(false),
+                    // container 2, s1 unknown about "foo"
+                    None,
+                    // container 3, s1 known to only contain "foo"
+                    Some(true),
+                    // container 4, s1 known to not contain "foo"
+                    Some(false),
+                    // container 5, s1 unknown about "foo"
+                    None,
+                    // container 6, s1 known to only contain "foo"
+                    Some(true),
+                    // container 7, s1 known to not contain "foo"
+                    Some(false),
+                    // container 8, s1 unknown about "foo"
+                    None,
+                ],
+            )
+            .with_contained(
+                "s2", // for column s2
+                [ScalarValue::from("bar")],
+                [
+                    // containers 0,1,2 s2 known to only contain "bar"
+                    Some(true),
+                    Some(true),
+                    Some(true),
+                    // container 3,4,5 s2 known to not contain "bar"
+                    Some(false),
+                    Some(false),
+                    Some(false),
+                    // container 6,7,8 s2 unknown about "bar"
+                    None,
+                    None,
+                    None,
+                ],
+            );
+
+        // s1 = 'foo'
+        prune_with_expr(
+            col("s1").eq(lit("foo")),
+            &schema,
+            &statistics,
+            // rule out containers where we know s1 is not present
+            vec![true, false, true, true, false, true, true, false, true],
+        );
+
+        // s1 = 'foo' OR s2 = 'bar'
+        let expr = col("s1").eq(lit("foo")).or(col("s2").eq(lit("bar")));
+        prune_with_expr(
+            expr,
+            &schema,
+            &statistics,
+            //  can't rule out any container (would need to prove that s1 != 
foo AND s2 != bar)
+            vec![true, true, true, true, true, true, true, true, true],
+        );
+
+        // s1 = 'foo' AND s2 != 'bar'
+        prune_with_expr(
+            col("s1").eq(lit("foo")).and(col("s2").not_eq(lit("bar"))),
+            &schema,
+            &statistics,
+            // can only rule out container where we know either:
+            // 1. s1 doesn't have the value 'foo` or
+            // 2. s2 has only the value of 'bar'
+            vec![false, false, false, true, false, true, true, false, true],
+        );
+
+        // s1 != 'foo' AND s2 != 'bar'
+        prune_with_expr(
+            col("s1")
+                .not_eq(lit("foo"))
+                .and(col("s2").not_eq(lit("bar"))),
+            &schema,
+            &statistics,
+            // Can rule out any container where we know either
+            // 1. s1 has only the value 'foo'
+            // 2. s2 has only the value 'bar'
+            vec![false, false, false, false, true, true, false, true, true],
+        );
+
+        // s1 != 'foo' AND (s2 = 'bar' OR s2 = 'baz')
+        prune_with_expr(
+            col("s1")
+                .not_eq(lit("foo"))
+                .and(col("s2").eq(lit("bar")).or(col("s2").eq(lit("baz")))),
+            &schema,
+            &statistics,
+            // Can rule out any container where we know s1 has only the value
+            // 'foo'. Can't use knowledge of s2 and bar to rule out anything
+            vec![false, true, true, false, true, true, false, true, true],
+        );
+
+        // s1 like '%foo%bar%'
+        prune_with_expr(
+            col("s1").like(lit("foo%bar%")),
+            &schema,
+            &statistics,
+            // can't rule out anything with information we know
+            vec![true, true, true, true, true, true, true, true, true],
+        );
+
+        // s1 like '%foo%bar%' AND s2 = 'bar'
+        prune_with_expr(
+            col("s1")
+                .like(lit("foo%bar%"))
+                .and(col("s2").eq(lit("bar"))),
+            &schema,
+            &statistics,
+            // can rule out any container where we know s2 does not have the 
value 'bar'
+            vec![true, true, true, false, false, false, true, true, true],
+        );
+
+        // s1 like '%foo%bar%' OR s2 = 'bar'
+        prune_with_expr(
+            col("s1").like(lit("foo%bar%")).or(col("s2").eq(lit("bar"))),
+            &schema,
+            &statistics,
+            // can't rule out anything (we would have to prove that both the
+            // like and the equality must be false)
+            vec![true, true, true, true, true, true, true, true, true],
+        );
+    }
+
+    #[test]
+    fn prune_with_range_and_contained() {
+        // Setup mimics range information for i, a bloom filter for s
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("i", DataType::Int32, true),
+            Field::new("s", DataType::Utf8, true),
+        ]));
+
+        let statistics = TestStatistics::new()
+            .with(
+                "i",
+                ContainerStats::new_i32(
+                    // Container 0, 3, 6: [-5 to 5]
+                    // Container 1, 4, 7: [10 to 20]
+                    // Container 2, 5, 8: unknown
+                    vec![
+                        Some(-5),
+                        Some(10),
+                        None,
+                        Some(-5),
+                        Some(10),
+                        None,
+                        Some(-5),
+                        Some(10),
+                        None,
+                    ], // min
+                    vec![
+                        Some(5),
+                        Some(20),
+                        None,
+                        Some(5),
+                        Some(20),
+                        None,
+                        Some(5),
+                        Some(20),
+                        None,
+                    ], // max
+                ),
+            )
+            // Add contained information about s and "foo"
+            .with_contained(
+                "s",
+                [ScalarValue::from("foo")],
+                [
+                    // container 0,1,2 known to only contain "foo"
+                    Some(true),
+                    Some(true),
+                    Some(true),
+                    // container 3,4,5 known to not contain "foo"
+                    Some(false),
+                    Some(false),
+                    Some(false),
+                    // container 6,7,8 unknown about "foo"
+                    None,
+                    None,
+                    None,
+                ],
+            );
+
+        // i = 0 and s = 'foo'
+        prune_with_expr(
+            col("i").eq(lit(0)).and(col("s").eq(lit("foo"))),
+            &schema,
+            &statistics,
+            // Can rule out container where we know that either:
+            // 1. 0 is outside the min/max range of i
+            // 2. s does not contain foo
+            // (range is false, and contained  is false)
+            vec![true, false, true, false, false, false, true, false, true],
+        );
+
+        // i = 0 and s != 'foo'
+        prune_with_expr(
+            col("i").eq(lit(0)).and(col("s").not_eq(lit("foo"))),
+            &schema,
+            &statistics,
+            // Can rule out containers where either:
+            // 1. 0 is outside the min/max range of i
+            // 2. s only contains foo
+            vec![false, false, false, true, false, true, true, false, true],
+        );
+
+        // i = 0 OR s = 'foo'
+        prune_with_expr(
+            col("i").eq(lit(0)).or(col("s").eq(lit("foo"))),
+            &schema,
+            &statistics,
+            // in theory could rule out containers if we had min/max values for
+            // s as well. But in this case we don't so we can't rule out 
anything
+            vec![true, true, true, true, true, true, true, true, true],
+        );
+    }
+
+    /// prunes the specified expr with the specified schema and statistics, and
+    /// ensures it returns expected.
+    ///
+    /// `expected` is a vector of bools, where true means the row group should
+    /// be kept, and false means it should be pruned.
+    ///
+    // TODO refactor other tests to use this to reduce boiler plate
+    fn prune_with_expr(
+        expr: Expr,
+        schema: &SchemaRef,
+        statistics: &TestStatistics,
+        expected: Vec<bool>,
+    ) {
+        println!("Pruning with expr: {}", expr);
+        let expr = logical2physical(&expr, schema);
+        let p = PruningPredicate::try_new(expr, schema.clone()).unwrap();
+        let result = p.prune(statistics).unwrap();
+        assert_eq!(result, expected);
+    }
+
     fn test_build_predicate_expression(
         expr: &Expr,
         schema: &Schema,
-        required_columns: &mut RequiredStatColumns,
+        required_columns: &mut RequiredColumns,
     ) -> Arc<dyn PhysicalExpr> {
         let expr = logical2physical(expr, schema);
         build_predicate_expression(&expr, schema, required_columns)

Reply via email to