alamb commented on code in PR #8437:
URL: https://github.com/apache/arrow-datafusion/pull/8437#discussion_r1428361878


##########
datafusion/physical-expr/src/utils/guarantee.rs:
##########
@@ -0,0 +1,699 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`LiteralGuarantee`] predicate analysis to determine if a column is a
+//! constant.
+
+use crate::utils::split_disjunction;
+use crate::{split_conjunction, PhysicalExpr};
+use datafusion_common::{Column, ScalarValue};
+use datafusion_expr::Operator;
+use std::collections::{HashMap, HashSet};
+use std::sync::Arc;
+
+/// Represents a guarantee that must be true for a boolean expression to
+/// evaluate to `true`.
+///
+/// The guarantee takes the form of a column and a set of literal (constant)
+/// [`ScalarValue`]s. For the expression to evaluate to `true`, the column 
*must
+/// satisfy* the guarantee(s).
+///
+/// To satisfy the guarantee, depending on [`Guarantee`], the values in the
+/// column must either:
+///
+/// 1. be ONLY one of that set
+/// 2. NOT be ANY of that set
+///
+/// # Uses `LiteralGuarantee`s
+///
+/// `LiteralGuarantee`s can be used to simplify filter expressions and skip 
data
+/// files (e.g. row groups in parquet files) by proving expressions can not
+/// possibly evaluate to `true`. For example, if we have a guarantee that `a`
+/// must be in (`1`) for a filter to evaluate to `true`, then we can skip any
+/// partition where we know that `a` never has the value of `1`.
+///
+/// **Important**: If a `LiteralGuarantee` is not satisfied, the relevant
+/// expression is *guaranteed* to evaluate to `false` or `null`. **However**,
+/// the opposite does not hold. Even if all `LiteralGuarantee`s are satisfied,
+/// that does not guarantee that the predicate will actually evaluate to
+/// `true`: it may still evaluate to `true`, `false` or `null`.
+///
+/// # Creating `LiteralGuarantee`s
+///
+/// Use [`LiteralGuarantee::analyze`] to extract literal guarantees from a
+/// filter predicate.
+///
+/// # Details
+/// A guarantee can be one of two forms:
+///
+/// 1. The column must be one of a particular set of values for the predicate
+/// to be `true`. If the column takes on any other value, the predicate can not
+/// evaluate to `true`. For example,
+/// `(a = 1)`, `(a = 1 OR a = 2)` or `a IN (1, 2, 3)`
+///
+/// 2. The column must NOT be one of a particular set of values for the 
predicate to
+/// be `true`. If the column takes on any of the values, the predicate can not
+/// evaluate to `true`. For example,
+/// `(a != 1)`, `(a != 1 AND a != 2)` or `a NOT IN (1, 2, 3)`
+#[derive(Debug, Clone, PartialEq)]
+pub struct LiteralGuarantee {
+    pub column: Column,
+    pub guarantee: Guarantee,
+    pub literals: HashSet<ScalarValue>,
+}
+
+/// What is guaranteed about the values for a [`LiteralGuarantee`]?
+#[derive(Debug, Clone, PartialEq)]
+pub enum Guarantee {
+    /// The expression can only be `true` if `column` is one of the values. 
If
+    /// `column` is not one of the values, the expression can not be `true`.
+    In,
+    /// The expression can only be `true` if `column` is not ANY of the
+    /// values. If `column` is one of the values, the expression can not be
+    /// `true`.
+    NotIn,
+}
+
+impl LiteralGuarantee {
+    /// Create a new instance of the guarantee if the provided operator is
+    /// supported (`=` or `!=`). Returns None otherwise. See
+    /// [`LiteralGuarantee::analyze`] to create these structures from a
+    /// predicate (boolean expression).
+    fn try_new<'a>(
+        column_name: impl Into<String>,
+        op: &Operator,
+        literals: impl IntoIterator<Item = &'a ScalarValue>,
+    ) -> Option<Self> {
+        let guarantee = match op {
+            Operator::Eq => Guarantee::In,
+            Operator::NotEq => Guarantee::NotIn,
+            _ => return None,
+        };
+
+        let literals: HashSet<_> = literals.into_iter().cloned().collect();
+
+        Some(Self {
+            column: Column::from_name(column_name),
+            guarantee,
+            literals,
+        })
+    }
+
+    /// Return a list of [`LiteralGuarantee`]s that must be satisfied for 
`expr`
+    /// to evaluate to `true`.
+    ///
+    /// If more than one `LiteralGuarantee` is returned, they must **all** hold
+    /// for the expression to possibly be `true`. If any is not satisfied, the
+    /// expression is guaranteed to be `null` or `false`.
+    ///
+    /// # Notes:
+    /// 1. `expr` must be a boolean expression.
+    /// 2. `expr` is not simplified prior to analysis.
+    pub fn analyze(expr: &Arc<dyn PhysicalExpr>) -> Vec<LiteralGuarantee> {
+        // split conjunction: <expr> AND <expr> AND ...
+        split_conjunction(expr)
+            .into_iter()
+            // for an `AND` conjunction to be true, all terms individually 
must be true
+            .fold(GuaranteeBuilder::new(), |builder, expr| {
+                if let Some(cel) = ColOpLit::try_new(expr) {
+                    return builder.aggregate_conjunct(cel);
+                } else {
+                    // split disjunction: <expr> OR <expr> OR ...
+                    let disjunctions = split_disjunction(expr);
+
+                    // We are trying to add a guarantee that a column must be
+                    // in/not in a particular set of values for the expression
+                    // to evaluate to true.
+                    //
+                    // A disjunction is true if at least one of the terms is
+                    // true.
+                    //
+                    // Thus, we can infer a guarantee if all terms are of the
+                    // form `(col <op> literal) OR (col <op> literal) OR ...`.
+                    //
+                    // For example, we can infer that `a = 1 OR a = 2 OR a = 3`
+                    // is guaranteed to be true ONLY if a is in (`1`, `2` or 
`3`).
+                    //
+                    // However, for something like `a = 1 OR a = 2 OR a < 0` 
we
+                    // **can't** guarantee that the predicate is only true if a
+                    // is in (`1`, `2`), as it could also be true if `a` were 
less
+                    // than zero.
+                    let terms = disjunctions
+                        .iter()
+                        .filter_map(|expr| ColOpLit::try_new(expr))
+                        .collect::<Vec<_>>();
+
+                    if terms.is_empty() {
+                        return builder;
+                    }
+
+                    // if not all terms are of the form (col <op> literal),
+                    // can't infer any guarantees
+                    if terms.len() != disjunctions.len() {
+                        return builder;
+                    }
+
+                    // if all terms are 'col <op> literal' with the same column
+                    // and operation we can try to infer a guarantee
+                    let first_term = &terms[0];
+                    if terms.iter().all(|term| {
+                        term.col.name() == first_term.col.name()
+                            && term.op == first_term.op
+                    }) {
+                        builder.aggregate_multi_conjunct(
+                            first_term.col,
+                            first_term.op,
+                            terms.iter().map(|term| term.lit.value()),
+                        )
+                    } else {
+                        // mixed columns or operators: can't infer anything
+                        builder
+                    }
+                }
+            })
+            .build()
+    }
+}
+
+/// Combines conjuncts (aka terms `AND`ed together) into [`LiteralGuarantee`]s,
+/// preserving insert order
+#[derive(Debug, Default)]
+struct GuaranteeBuilder<'a> {
+    /// List of guarantees that have been created so far.
+    /// If we have determined a subsequent conjunct invalidates a guarantee,
+    /// e.g. `a = foo AND a = bar`, then the relevant guarantee will be None
+    guarantees: Vec<Option<LiteralGuarantee>>,
+
+    /// Key is the (column, operator) pair of the guarantee.
+    /// Value is the index into `guarantees`
+    map: HashMap<(&'a crate::expressions::Column, &'a Operator), usize>,
+}
+
+impl<'a> GuaranteeBuilder<'a> {
+    fn new() -> Self {
+        Default::default()
+    }
+
+    /// Aggregate a new single `AND col <op> literal` term to this builder
+    /// combining with existing guarantees if possible.
+    ///
+    /// # Examples
+    /// * `AND (a = 1)`: for the expression to be `true`, `a` must be 1
+    /// * `AND (a != 1)`: for the expression to be `true`, `a` must not be 1
+    fn aggregate_conjunct(self, col_op_lit: ColOpLit<'a>) -> Self {
+        self.aggregate_multi_conjunct(
+            col_op_lit.col,
+            col_op_lit.op,
+            [col_op_lit.lit.value()],
+        )
+    }
+
+    /// Aggregates a new single column, multi literal term to this builder
+    /// combining with previously known guarantees if possible.
+    ///
+    /// # Examples
+    /// For the following examples, the expression can only be `true` 
if:
+    /// * `AND (a = 1 OR a = 2 OR a = 3)`: a is in (1, 2, or 3)
+    /// * `AND (a IN (1,2,3))`: a is in (1, 2, or 3)
+    /// * `AND (a != 1 OR a != 2 OR a != 3)`: (no guarantee is added: `!=` terms combined with `OR` are ignored)
+    /// * `AND (a NOT IN (1,2,3))`: a is not in (1, 2, or 3)
+    fn aggregate_multi_conjunct(
+        mut self,
+        col: &'a crate::expressions::Column,
+        op: &'a Operator,
+        new_values: impl IntoIterator<Item = &'a ScalarValue>,
+    ) -> Self {
+        let key = (col, op);
+        if let Some(index) = self.map.get(&key) {
+            // already have a guarantee for this column and operator
+            let entry = &mut self.guarantees[*index];
+
+            let Some(existing) = entry else {
+                // determined the previous guarantee for this column has been
+                // invalidated, nothing to do
+                return self;
+            };
+
+            // Combine conjuncts if we have `a != foo AND a != bar`.
+            // `a = foo AND a = bar` doesn't make logical sense, so that guarantee is invalidated instead.
+            match existing.guarantee {
+                // knew that the column could not be a set of values
+                // For example, if we previously had `a != 5` and now we see 
another `a != 6` we know
+                // the column can be neither `5` nor `6`
+                Guarantee::NotIn => {
+                    // can extend if only single literal, otherwise leave the guarantee unchanged
+                    let new_values: HashSet<_> = 
new_values.into_iter().collect();
+                    if new_values.len() == 1 {
+                        
existing.literals.extend(new_values.into_iter().cloned())
+                    } else {
+                        // this is like (a != foo AND (a != bar OR a != baz)).
+                        // We can't combine the (a!=bar OR a!=baz) part, but it
+                        // also doesn't invalidate a != foo guarantee.
+                    }
+                }
+                Guarantee::In => {
+                    // for an IN guarantee, it is ok if the value is the same
+                    // e.g. `a = foo AND a = foo` but not if the value is 
different
+                    // e.g. `a = foo AND a = bar`
+                    if new_values
+                        .into_iter()
+                        .all(|new_value| existing.literals.contains(new_value))
+                    {
+                        // all values are already in the set
+                    } else {
+                        // at least one was not, so invalidate the guarantee
+                        *entry = None;
+                    }
+                }
+            }
+        } else {
+            // This is a new guarantee
+            let new_values: HashSet<_> = new_values.into_iter().collect();
+
+            // new_values are combined with OR, so we can only create a
+            // multi-value guarantee for `=` (or a single value).
+            // (e.g. ignore `a != foo OR a != bar`)
+            if op == &Operator::Eq || new_values.len() == 1 {
+                if let Some(guarantee) =
+                    LiteralGuarantee::try_new(col.name(), op, new_values)
+                {
+                    // add it to the list of guarantees
+                    self.guarantees.push(Some(guarantee));
+                    self.map.insert(key, self.guarantees.len() - 1);
+                }
+            }
+        }
+
+        self
+    }
+
+    /// Return all guarantees that have been created so far
+    fn build(self) -> Vec<LiteralGuarantee> {
+        // filter out any guarantees that have been invalidated
+        self.guarantees.into_iter().flatten().collect()
+    }
+}
+
+/// Represents a single `col <op> literal` (or `literal <op> col`) binary expression, borrowing its parts
+struct ColOpLit<'a> {
+    col: &'a crate::expressions::Column,
+    op: &'a Operator,
+    lit: &'a crate::expressions::Literal,
+}
+
+impl<'a> ColOpLit<'a> {
+    /// Returns Some(ColOpLit) if the expression is a binary expression of either form:
+    /// 1. `col <op> literal`
+    /// 2. `literal <op> col`
+    ///
+    /// Returns None otherwise. The operator is stored as written in both cases.
+    fn try_new(expr: &'a Arc<dyn PhysicalExpr>) -> Option<Self> {
+        let binary_expr = expr
+            .as_any()
+            .downcast_ref::<crate::expressions::BinaryExpr>()?;
+
+        let (left, op, right) = (
+            binary_expr.left().as_any(),
+            binary_expr.op(),
+            binary_expr.right().as_any(),
+        );
+
+        // col <op> literal
+        if let (Some(col), Some(lit)) = (
+            left.downcast_ref::<crate::expressions::Column>(),
+            right.downcast_ref::<crate::expressions::Literal>(),
+        ) {
+            Some(Self { col, op, lit })
+        }
+        // literal <op> col
+        else if let (Some(lit), Some(col)) = (
+            left.downcast_ref::<crate::expressions::Literal>(),
+            right.downcast_ref::<crate::expressions::Column>(),
+        ) {
+            Some(Self { col, op, lit })
+        } else {
+            None
+        }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use crate::create_physical_expr;
+    use crate::execution_props::ExecutionProps;
+    use arrow_schema::{DataType, Field, Schema, SchemaRef};
+    use datafusion_common::ToDFSchema;
+    use datafusion_expr::expr_fn::*;
+    use datafusion_expr::{lit, Expr};
+    use std::sync::OnceLock;
+
+    #[test]
+    fn test_literal() {
+        // a lone boolean literal references no column, so it offers no guarantee
+        test_analyze(lit(true), vec![])
+    }
+
+    #[test]
+    fn test_single() {
+        // a = "foo"
+        test_analyze(col("a").eq(lit("foo")), vec![in_guarantee("a", 
["foo"])]);
+        // "foo" = a
+        test_analyze(lit("foo").eq(col("a")), vec![in_guarantee("a", 
["foo"])]);
+        // a != "foo"
+        test_analyze(
+            col("a").not_eq(lit("foo")),
+            vec![not_in_guarantee("a", ["foo"])],
+        );
+        // "foo" != a
+        test_analyze(
+            lit("foo").not_eq(col("a")),
+            vec![not_in_guarantee("a", ["foo"])],
+        );
+    }
+
+    #[test]
+    fn test_conjunction_single_column() {
+        // b = 1 AND b = 2. This is impossible. Ideally this expression could 
be simplified to false
+        test_analyze(col("b").eq(lit(1)).and(col("b").eq(lit(2))), vec![]);
+        // b = 1 AND b != 2 . In theory, this could be simplified to `b = 1`.
+        test_analyze(
+            col("b").eq(lit(1)).and(col("b").not_eq(lit(2))),
+            vec![
+                // can only be true if b is 1 and b is not 2 (even though it 
is redundant)
+                in_guarantee("b", [1]),
+                not_in_guarantee("b", [2]),
+            ],
+        );
+        // b != 1 AND b = 2. In theory, this could be simplified to `b = 2`.
+        test_analyze(
+            col("b").not_eq(lit(1)).and(col("b").eq(lit(2))),
+            vec![
+                // can only be true if b is not 1 and b is 2 (even though 
it is redundant)
+                not_in_guarantee("b", [1]),
+                in_guarantee("b", [2]),
+            ],
+        );
+        // b != 1 AND b != 2
+        test_analyze(
+            col("b").not_eq(lit(1)).and(col("b").not_eq(lit(2))),
+            vec![not_in_guarantee("b", [1, 2])],
+        );
+        // b != 1 AND b != 2 and b != 3
+        test_analyze(
+            col("b")
+                .not_eq(lit(1))
+                .and(col("b").not_eq(lit(2)))
+                .and(col("b").not_eq(lit(3))),
+            vec![not_in_guarantee("b", [1, 2, 3])],
+        );
+        // b != 1 AND b = 2 and b != 3. Can only be true if b is 2 and b is 
not in (1, 3)
+        test_analyze(
+            col("b")
+                .not_eq(lit(1))
+                .and(col("b").eq(lit(2)))
+                .and(col("b").not_eq(lit(3))),
+            vec![not_in_guarantee("b", [1, 3]), in_guarantee("b", [2])],
+        );
+        // b != 1 AND b != 2 and b = 3 (in theory could determine b = 3)
+        test_analyze(
+            col("b")
+                .not_eq(lit(1))
+                .and(col("b").not_eq(lit(2)))
+                .and(col("b").eq(lit(3))),
+            vec![not_in_guarantee("b", [1, 2]), in_guarantee("b", [3])],
+        );
+        // b != 1 AND b != 2 and b < 3 (to be true, b can't be either 1 or 2)
+        test_analyze(
+            col("b")
+                .not_eq(lit(1))
+                .and(col("b").not_eq(lit(2)))
+                .and(col("b").lt(lit(3))),
+            vec![not_in_guarantee("b", [1, 2])],
+        );
+    }
+
+    #[test]
+    fn test_conjunction_multi_column() {
+        // a = "foo" AND b = 1
+        test_analyze(
+            col("a").eq(lit("foo")).and(col("b").eq(lit(1))),
+            vec![
+                // should find both column guarantees
+                in_guarantee("a", ["foo"]),
+                in_guarantee("b", [1]),
+            ],
+        );
+        // a != "foo" AND b != 1
+        test_analyze(
+            col("a").not_eq(lit("foo")).and(col("b").not_eq(lit(1))),
+            // should find both column guarantees
+            vec![not_in_guarantee("a", ["foo"]), not_in_guarantee("b", [1])],
+        );
+        // a = "foo" AND a = "bar"
+        test_analyze(
+            col("a").eq(lit("foo")).and(col("a").eq(lit("bar"))),
+            // this predicate is impossible (can't be both foo and bar),
+            vec![],
+        );
+        // a = "foo" AND a != "bar"
+        test_analyze(
+            col("a").eq(lit("foo")).and(col("a").not_eq(lit("bar"))),
+            vec![in_guarantee("a", ["foo"]), not_in_guarantee("a", ["bar"])],
+        );
+        // a != "foo" AND a != "bar"
+        test_analyze(
+            col("a").not_eq(lit("foo")).and(col("a").not_eq(lit("bar"))),
+            // know it isn't "foo" or "bar"
+            vec![not_in_guarantee("a", ["foo", "bar"])],
+        );
+        // a != "foo" AND a != "bar" and a != "baz"
+        test_analyze(
+            col("a")
+                .not_eq(lit("foo"))
+                .and(col("a").not_eq(lit("bar")))
+                .and(col("a").not_eq(lit("baz"))),
+            // know it isn't "foo" or "bar" or "baz"
+            vec![not_in_guarantee("a", ["foo", "bar", "baz"])],
+        );
+        // a = "foo" AND a = "foo"
+        let expr = col("a").eq(lit("foo"));
+        test_analyze(expr.clone().and(expr), vec![in_guarantee("a", ["foo"])]);
+        // b > 5 AND b = 10 (should get a b = 10 guarantee)
+        test_analyze(
+            col("b").gt(lit(5)).and(col("b").eq(lit(10))),
+            vec![in_guarantee("b", [10])],
+        );
+
+        // a != "foo" and (a != "bar" OR a != "baz")
+        test_analyze(
+            col("a")
+                .not_eq(lit("foo"))
+                
.and(col("a").not_eq(lit("bar")).or(col("a").not_eq(lit("baz")))),
+            // a is not foo (we can't represent other knowledge about a)
+            vec![not_in_guarantee("a", ["foo"])],
+        );
+    }
+
+    #[test]
+    fn test_conjunction_and_disjunction_single_column() {
+        // b != 1 AND (b > 2)
+        test_analyze(
+            col("b")
+                .not_eq(lit(1))
+                .and(col("b").gt(lit(2)).or(col("b").eq(lit(3)))),
+            vec![
+                // for the expression to be true, b can not be one
+                not_in_guarantee("b", [1]),

Review Comment:
   The idea for using this for bloom filters is to try and prove that this 
expression is not true:
   
   ```
   b != 1 AND (b > 2)
   ```
   
   In order for the expression to be true, *both* of these have to hold:
   1. `b != 1`
   2. `b > 2`
   
   Thus, if we can prove that either one does not hold, we can prove the expression is not true.
   
   Therefore, to prove via the `b != 1` guarantee that the expression can't possibly be true, we need to prove that `b` **ONLY** takes the value of `1`.
   
   But, as you point out, this is not what the documentation says at the moment:
   
   ```
   /// 2. The column must NOT be one of a particular set of values for the 
predicate to
   /// be `true`. If the column takes on any of the values, the predicate can 
not
   /// evaluate to `true`. For example,
   /// `(a != 1)`, `(a != 1 AND a != 2)` or `a NOT IN (1, 2, 3)`
   ```
   
   It should probably say:
   ```
   /// 2. The column must NOT be one of a particular set of values for the 
predicate to
   /// be `true`. If the column can ONLY take one of these values, the 
predicate can not
   /// evaluate to `true`. 
   ```
   
   
   🤔  I need to think about this more deeply



##########
datafusion/physical-expr/src/utils/guarantee.rs:
##########
@@ -0,0 +1,699 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`LiteralGuarantee`] predicate analysis to determine if a column is a
+//! constant.
+
+use crate::utils::split_disjunction;
+use crate::{split_conjunction, PhysicalExpr};
+use datafusion_common::{Column, ScalarValue};
+use datafusion_expr::Operator;
+use std::collections::{HashMap, HashSet};
+use std::sync::Arc;
+
+/// Represents a guarantee that must be true for a boolean expression to
+/// evaluate to `true`.
+///
+/// The guarantee takes the form of a column and a set of literal (constant)
+/// [`ScalarValue`]s. For the expression to evaluate to `true`, the column 
*must
+/// satisfy* the guarantee(s).
+///
+/// To satisfy the guarantee, depending on [`Guarantee`], the values in the
+/// column must either:
+///
+/// 1. be ONLY one of that set
+/// 2. NOT be ANY of that set
+///
+/// # Uses `LiteralGuarantee`s
+///
+/// `LiteralGuarantee`s can be used to simplify filter expressions and skip 
data
+/// files (e.g. row groups in parquet files) by proving expressions can not
+/// possibly evaluate to `true`. For example, if we have a guarantee that `a`
+/// must be in (`1`) for a filter to evaluate to `true`, then we can skip any
+/// partition where we know that `a` never has the value of `1`.
+///
+/// **Important**: If a `LiteralGuarantee` is not satisfied, the relevant
+/// expression is *guaranteed* to evaluate to `false` or `null`. **However**,
+/// the opposite does not hold. Even if all `LiteralGuarantee`s are satisfied,
+/// that does not guarantee that the predicate will actually evaluate to
+/// `true`: it may still evaluate to `true`, `false` or `null`.
+///
+/// # Creating `LiteralGuarantee`s
+///
+/// Use [`LiteralGuarantee::analyze`] to extract literal guarantees from a
+/// filter predicate.
+///
+/// # Details
+/// A guarantee can be one of two forms:
+///
+/// 1. The column must be one of a particular set of values for the predicate
+/// to be `true`. If the column takes on any other value, the predicate can not
+/// evaluate to `true`. For example,
+/// `(a = 1)`, `(a = 1 OR a = 2)` or `a IN (1, 2, 3)`
+///
+/// 2. The column must NOT be one of a particular set of values for the 
predicate to
+/// be `true`. If the column takes on any of the values, the predicate can not
+/// evaluate to `true`. For example,
+/// `(a != 1)`, `(a != 1 AND a != 2)` or `a NOT IN (1, 2, 3)`
+#[derive(Debug, Clone, PartialEq)]
+pub struct LiteralGuarantee {
+    pub column: Column,
+    pub guarantee: Guarantee,
+    pub literals: HashSet<ScalarValue>,
+}
+
+/// What is guaranteed about the values for a [`LiteralGuarantee`]?
+#[derive(Debug, Clone, PartialEq)]
+pub enum Guarantee {
+    /// The expression can only be `true` if `column` is one of the values. 
If
+    /// `column` is not one of the values, the expression can not be `true`.
+    In,
+    /// The expression can only be `true` if `column` is not ANY of the
+    /// values. If `column` is one of the values, the expression can not be
+    /// `true`.
+    NotIn,
+}
+
+impl LiteralGuarantee {
+    /// Create a new instance of the guarantee if the provided operator is
+    /// supported (`=` or `!=`). Returns None otherwise. See
+    /// [`LiteralGuarantee::analyze`] to create these structures from a
+    /// predicate (boolean expression).
+    fn try_new<'a>(
+        column_name: impl Into<String>,
+        op: &Operator,
+        literals: impl IntoIterator<Item = &'a ScalarValue>,
+    ) -> Option<Self> {
+        let guarantee = match op {
+            Operator::Eq => Guarantee::In,
+            Operator::NotEq => Guarantee::NotIn,
+            _ => return None,
+        };
+
+        let literals: HashSet<_> = literals.into_iter().cloned().collect();
+
+        Some(Self {
+            column: Column::from_name(column_name),
+            guarantee,
+            literals,
+        })
+    }
+
+    /// Return a list of [`LiteralGuarantee`]s that must be satisfied for 
`expr`
+    /// to evaluate to `true`.
+    ///
+    /// If more than one `LiteralGuarantee` is returned, they must **all** hold
+    /// for the expression to possibly be `true`. If any is not satisfied, the
+    /// expression is guaranteed to be `null` or `false`.
+    ///
+    /// # Notes:
+    /// 1. `expr` must be a boolean expression.
+    /// 2. `expr` is not simplified prior to analysis.
+    pub fn analyze(expr: &Arc<dyn PhysicalExpr>) -> Vec<LiteralGuarantee> {
+        // split conjunction: <expr> AND <expr> AND ...
+        split_conjunction(expr)
+            .into_iter()
+            // for an `AND` conjunction to be true, all terms individually 
must be true
+            .fold(GuaranteeBuilder::new(), |builder, expr| {
+                if let Some(cel) = ColOpLit::try_new(expr) {
+                    return builder.aggregate_conjunct(cel);
+                } else {
+                    // split disjunction: <expr> OR <expr> OR ...
+                    let disjunctions = split_disjunction(expr);
+
+                    // We are trying to add a guarantee that a column must be
+                    // in/not in a particular set of values for the expression
+                    // to evaluate to true.
+                    //
+                    // A disjunction is true if at least one of the terms is
+                    // true.
+                    //
+                    // Thus, we can infer a guarantee if all terms are of the
+                    // form `(col <op> literal) OR (col <op> literal) OR ...`.
+                    //
+                    // For example, we can infer that `a = 1 OR a = 2 OR a = 3`
+                    // is guaranteed to be true ONLY if a is in (`1`, `2` or 
`3`).
+                    //
+                    // However, for something like `a = 1 OR a = 2 OR a < 0` 
we
+                    // **can't** guarantee that the predicate is only true if a
+                    // is in (`1`, `2`), as it could also be true if `a` were 
less
+                    // than zero.
+                    let terms = disjunctions
+                        .iter()
+                        .filter_map(|expr| ColOpLit::try_new(expr))
+                        .collect::<Vec<_>>();
+
+                    if terms.is_empty() {
+                        return builder;
+                    }
+
+                    // if not all terms are of the form (col <op> literal),
+                    // can't infer any guarantees
+                    if terms.len() != disjunctions.len() {
+                        return builder;
+                    }
+
+                    // if all terms are 'col <op> literal' with the same column
+                    // and operation we can try to infer a guarantee
+                    let first_term = &terms[0];
+                    if terms.iter().all(|term| {
+                        term.col.name() == first_term.col.name()
+                            && term.op == first_term.op
+                    }) {
+                        builder.aggregate_multi_conjunct(
+                            first_term.col,
+                            first_term.op,
+                            terms.iter().map(|term| term.lit.value()),
+                        )
+                    } else {
+                        // mixed columns or operators: can't infer anything
+                        builder
+                    }
+                }
+            })
+            .build()
+    }
+}
+
+/// Combines conjuncts (aka terms `AND`ed together) into [`LiteralGuarantee`]s,
+/// preserving insert order
+#[derive(Debug, Default)]
+struct GuaranteeBuilder<'a> {
+    /// List of guarantees that have been created so far.
+    /// If we have determined a subsequent conjunct invalidates a guarantee,
+    /// e.g. `a = foo AND a = bar`, then the relevant guarantee will be None
+    guarantees: Vec<Option<LiteralGuarantee>>,
+
+    /// Key is the (column, operator) pair of the guarantee.
+    /// Value is the index into `guarantees`
+    map: HashMap<(&'a crate::expressions::Column, &'a Operator), usize>,
+}
+
+impl<'a> GuaranteeBuilder<'a> {
+    fn new() -> Self {
+        Default::default()
+    }
+
+    /// Aggregate a new single `AND col <op> literal` term to this builder
+    /// combining with existing guarantees if possible.
+    ///
+    /// # Examples
+    /// * `AND (a = 1)`: `a` is guaranteed to be 1
+    /// * `AND (a != 1)`: a is guaranteed to not be 1
+    fn aggregate_conjunct(self, col_op_lit: ColOpLit<'a>) -> Self {
+        self.aggregate_multi_conjunct(
+            col_op_lit.col,
+            col_op_lit.op,
+            [col_op_lit.lit.value()],
+        )
+    }
+
+    /// Aggregates a new single column, multi literal term to ths builder
+    /// combining with previously known guarantees if possible.
+    ///
+    /// # Examples
+    /// For the following examples, we can guarantee the expression is `true` 
if:
+    /// * `AND (a = 1 OR a = 2 OR a = 3)`: a is in (1, 2, or 3)
+    /// * `AND (a IN (1,2,3))`: a is in (1, 2, or 3)
+    /// * `AND (a != 1 OR a != 2 OR a != 3)`: a is not in (1, 2, or 3)
+    /// * `AND (a NOT IN (1,2,3))`: a is not in (1, 2, or 3)
+    fn aggregate_multi_conjunct(
+        mut self,
+        col: &'a crate::expressions::Column,
+        op: &'a Operator,
+        new_values: impl IntoIterator<Item = &'a ScalarValue>,
+    ) -> Self {
+        let key = (col, op);
+        if let Some(index) = self.map.get(&key) {
+            // already have a guarantee for this column
+            let entry = &mut self.guarantees[*index];
+
+            let Some(existing) = entry else {
+                // determined the previous guarantee for this column has been
+                // invalidated, nothing to do
+                return self;
+            };
+
+            // Combine conjuncts if we have `a != foo AND a != bar`.
+            // `a = foo AND a = bar` doesn't make logical sense, so the
+            // existing guarantee is invalidated in that case.
+            match existing.guarantee {
+                // Previously knew that the column could not be any of a set
+                // of values. For example, if we previously had `a != 5` and
+                // now we see another `a != 6`, we know the column can be
+                // neither 5 nor 6.
+                Guarantee::NotIn => {
+                    // can extend if only single literal, otherwise leave the
+                    // existing guarantee unchanged
+                    let new_values: HashSet<_> = 
new_values.into_iter().collect();
+                    if new_values.len() == 1 {
+                        
existing.literals.extend(new_values.into_iter().cloned())
+                    } else {
+                        // this is like (a != foo AND (a != bar OR a != baz)).
+                        // We can't combine the (a!=bar OR a!=baz) part, but it
+                        // also doesn't invalidate a != foo guarantee.
+                    }
+                }
+                Guarantee::In => {
+                    // for an IN guarantee, it is ok if the value is the same
+                    // e.g. `a = foo AND a = foo` but not if the value is 
different
+                    // e.g. `a = foo AND a = bar`
+                    if new_values
+                        .into_iter()
+                        .all(|new_value| existing.literals.contains(new_value))
+                    {
+                        // all values are already in the set
+                    } else {
+                        // at least one was not, so invalidate the guarantee
+                        *entry = None;
+                    }
+                }
+            }
+        } else {
+            // This is a new guarantee
+            let new_values: HashSet<_> = new_values.into_iter().collect();
+
+            // new_values are combined with OR, so we can only create a
+            // multi-value guarantee for `=` (or a single value).
+            // (e.g. ignore `a != foo OR a != bar`)
+            if op == &Operator::Eq || new_values.len() == 1 {
+                if let Some(guarantee) =
+                    LiteralGuarantee::try_new(col.name(), op, new_values)
+                {
+                    // add it to the list of guarantees
+                    self.guarantees.push(Some(guarantee));
+                    self.map.insert(key, self.guarantees.len() - 1);
+                }
+            }
+        }
+
+        self
+    }
+
+    /// Return all guarantees that have been created so far
+    fn build(self) -> Vec<LiteralGuarantee> {
+        // filter out any guarantees that have been invalidated
+        self.guarantees.into_iter().flatten().collect()
+    }
+}
+
+/// Represents a single `col <op> literal` expression
+struct ColOpLit<'a> {
+    col: &'a crate::expressions::Column,
+    op: &'a Operator,
+    lit: &'a crate::expressions::Literal,
+}
+
+impl<'a> ColOpLit<'a> {
+    /// Returns Some(ColOpLit) if the expression is either:
+    /// 1. `col <op> literal`
+    /// 2. `literal <op> col`
+    ///
+    /// Returns None otherwise
+    fn try_new(expr: &'a Arc<dyn PhysicalExpr>) -> Option<Self> {
+        let binary_expr = expr
+            .as_any()
+            .downcast_ref::<crate::expressions::BinaryExpr>()?;
+
+        let (left, op, right) = (
+            binary_expr.left().as_any(),
+            binary_expr.op(),
+            binary_expr.right().as_any(),
+        );
+
+        // col <op> literal
+        if let (Some(col), Some(lit)) = (
+            left.downcast_ref::<crate::expressions::Column>(),
+            right.downcast_ref::<crate::expressions::Literal>(),
+        ) {
+            Some(Self { col, op, lit })
+        }
+        // literal <op> col
+        else if let (Some(lit), Some(col)) = (
+            left.downcast_ref::<crate::expressions::Literal>(),
+            right.downcast_ref::<crate::expressions::Column>(),
+        ) {
+            Some(Self { col, op, lit })
+        } else {
+            None
+        }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use crate::create_physical_expr;
+    use crate::execution_props::ExecutionProps;
+    use arrow_schema::{DataType, Field, Schema, SchemaRef};
+    use datafusion_common::ToDFSchema;
+    use datafusion_expr::expr_fn::*;
+    use datafusion_expr::{lit, Expr};
+    use std::sync::OnceLock;
+
+    #[test]
+    fn test_literal() {
+        // a single literal offers no guarantee
+        test_analyze(lit(true), vec![])
+    }
+
+    #[test]
+    fn test_single() {
+        // a = "foo"
+        test_analyze(col("a").eq(lit("foo")), vec![in_guarantee("a", 
["foo"])]);
+        // "foo" = a
+        test_analyze(lit("foo").eq(col("a")), vec![in_guarantee("a", 
["foo"])]);
+        // a != "foo"
+        test_analyze(
+            col("a").not_eq(lit("foo")),
+            vec![not_in_guarantee("a", ["foo"])],
+        );
+        // "foo" != a
+        test_analyze(
+            lit("foo").not_eq(col("a")),
+            vec![not_in_guarantee("a", ["foo"])],
+        );
+    }
+
+    #[test]
+    fn test_conjunction_single_column() {
+        // b = 1 AND b = 2. This is impossible. Ideally this expression could 
be simplified to false
+        test_analyze(col("b").eq(lit(1)).and(col("b").eq(lit(2))), vec![]);
+        // b = 1 AND b != 2 . In theory, this could be simplified to `b = 1`.
+        test_analyze(
+            col("b").eq(lit(1)).and(col("b").not_eq(lit(2))),
+            vec![
+                // can only be true if b is 1 and b is not 2 (even though it
+                // is redundant)
+                in_guarantee("b", [1]),
+                not_in_guarantee("b", [2]),
+            ],
+        );
+        // b != 1 AND b = 2. In theory, this could be simplified to `b = 2`.
+        test_analyze(
+            col("b").not_eq(lit(1)).and(col("b").eq(lit(2))),
+            vec![
+                // can only be true if b is not 1 and b is 2 (even though
+                // it is redundant)
+                not_in_guarantee("b", [1]),
+                in_guarantee("b", [2]),
+            ],
+        );
+        // b != 1 AND b != 2
+        test_analyze(
+            col("b").not_eq(lit(1)).and(col("b").not_eq(lit(2))),
+            vec![not_in_guarantee("b", [1, 2])],
+        );
+        // b != 1 AND b != 2 and b != 3
+        test_analyze(
+            col("b")
+                .not_eq(lit(1))
+                .and(col("b").not_eq(lit(2)))
+                .and(col("b").not_eq(lit(3))),
+            vec![not_in_guarantee("b", [1, 2, 3])],
+        );
+        // b != 1 AND b = 2 and b != 3. Can only be true if b is 2 and b is 
not in (1, 3)
+        test_analyze(
+            col("b")
+                .not_eq(lit(1))
+                .and(col("b").eq(lit(2)))
+                .and(col("b").not_eq(lit(3))),
+            vec![not_in_guarantee("b", [1, 3]), in_guarantee("b", [2])],
+        );
+        // b != 1 AND b != 2 and b = 3 (in theory could determine b = 3)
+        test_analyze(
+            col("b")
+                .not_eq(lit(1))
+                .and(col("b").not_eq(lit(2)))
+                .and(col("b").eq(lit(3))),
+            vec![not_in_guarantee("b", [1, 2]), in_guarantee("b", [3])],
+        );
+        // b != 1 AND b != 2 and b < 3 (to be true, b can't be either 1 or 2)
+        test_analyze(
+            col("b")
+                .not_eq(lit(1))
+                .and(col("b").not_eq(lit(2)))
+                .and(col("b").lt(lit(3))),
+            vec![not_in_guarantee("b", [1, 2])],
+        );
+    }
+
+    #[test]
+    fn test_conjunction_multi_column() {
+        // a = "foo" AND b = 1
+        test_analyze(
+            col("a").eq(lit("foo")).and(col("b").eq(lit(1))),
+            vec![
+                // should find both column guarantees
+                in_guarantee("a", ["foo"]),
+                in_guarantee("b", [1]),
+            ],
+        );
+        // a != "foo" AND b != 1
+        test_analyze(
+            col("a").not_eq(lit("foo")).and(col("b").not_eq(lit(1))),
+            // should find both column guarantees
+            vec![not_in_guarantee("a", ["foo"]), not_in_guarantee("b", [1])],
+        );
+        // a = "foo" AND a = "bar"
+        test_analyze(
+            col("a").eq(lit("foo")).and(col("a").eq(lit("bar"))),
+            // this predicate is impossible ( can't be both foo and bar),
+            vec![],
+        );
+        // a = "foo" AND a != "bar"
+        test_analyze(
+            col("a").eq(lit("foo")).and(col("a").not_eq(lit("bar"))),
+            vec![in_guarantee("a", ["foo"]), not_in_guarantee("a", ["bar"])],
+        );
+        // a != "foo" AND a != "bar"
+        test_analyze(
+            col("a").not_eq(lit("foo")).and(col("a").not_eq(lit("bar"))),
+            // know it isn't "foo" or "bar"
+            vec![not_in_guarantee("a", ["foo", "bar"])],
+        );
+        // a != "foo" AND a != "bar" and a != "baz"
+        test_analyze(
+            col("a")
+                .not_eq(lit("foo"))
+                .and(col("a").not_eq(lit("bar")))
+                .and(col("a").not_eq(lit("baz"))),
+            // know it isn't "foo" or "bar" or "baz"
+            vec![not_in_guarantee("a", ["foo", "bar", "baz"])],
+        );
+        // a = "foo" AND a = "foo"
+        let expr = col("a").eq(lit("foo"));
+        test_analyze(expr.clone().and(expr), vec![in_guarantee("a", ["foo"])]);
+        // b > 5 AND b = 10 (should get an b = 10 guarantee)
+        test_analyze(
+            col("b").gt(lit(5)).and(col("b").eq(lit(10))),
+            vec![in_guarantee("b", [10])],
+        );
+
+        // a != "foo" and (a != "bar" OR a != "baz")
+        test_analyze(
+            col("a")
+                .not_eq(lit("foo"))
+                
.and(col("a").not_eq(lit("bar")).or(col("a").not_eq(lit("baz")))),
+            // a is not foo (we can't represent other knowledge about a)
+            vec![not_in_guarantee("a", ["foo"])],
+        );
+    }
+
+    #[test]
+    fn test_conjunction_and_disjunction_single_column() {
+        // b != 1 AND (b > 2 OR b = 3)
+        test_analyze(
+            col("b")
+                .not_eq(lit(1))
+                .and(col("b").gt(lit(2)).or(col("b").eq(lit(3)))),

Review Comment:
   Yes, good catch -- I have updated it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to