This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new b5e94a688e Add `LiteralGuarantee` on columns to extract conditions
required for `PhysicalExpr` expressions to evaluate to true (#8437)
b5e94a688e is described below
commit b5e94a688e3a66325cc6ed9b2e35b44cf6cd9ba8
Author: Andrew Lamb <[email protected]>
AuthorDate: Mon Dec 18 14:08:33 2023 -0500
Add `LiteralGuarantee` on columns to extract conditions required for
`PhysicalExpr` expressions to evaluate to true (#8437)
* Introduce LiteralGurantee to find col=const
* Improve comments
* Improve documentation
* Add more documentation and tests
* refine documentation and tests
* Apply suggestions from code review
Co-authored-by: Nga Tran <[email protected]>
* Fix half comment
* swap operators before analysis
* More tests
* cmt
* Apply suggestions from code review
Co-authored-by: Ruihang Xia <[email protected]>
* refine comments more
---------
Co-authored-by: Nga Tran <[email protected]>
Co-authored-by: Ruihang Xia <[email protected]>
---
datafusion/physical-expr/src/utils/guarantee.rs | 709 +++++++++++++++++++++
.../physical-expr/src/{utils.rs => utils/mod.rs} | 33 +-
2 files changed, 729 insertions(+), 13 deletions(-)
diff --git a/datafusion/physical-expr/src/utils/guarantee.rs
b/datafusion/physical-expr/src/utils/guarantee.rs
new file mode 100644
index 0000000000..59ec255754
--- /dev/null
+++ b/datafusion/physical-expr/src/utils/guarantee.rs
@@ -0,0 +1,709 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`LiteralGuarantee`] predicate analysis to determine if a column is a
+//! constant.
+
+use crate::utils::split_disjunction;
+use crate::{split_conjunction, PhysicalExpr};
+use datafusion_common::{Column, ScalarValue};
+use datafusion_expr::Operator;
+use std::collections::{HashMap, HashSet};
+use std::sync::Arc;
+
+/// Represents a guarantee that must be true for a boolean expression to
+/// evaluate to `true`.
+///
+/// The guarantee takes the form of a column and a set of literal (constant)
+/// [`ScalarValue`]s. For the expression to evaluate to `true`, the column
*must
+/// satisfy* the guarantee(s).
+///
+/// To satisfy the guarantee, depending on [`Guarantee`], the values in the
+/// column must either:
+///
+/// 1. be ONLY one of that set
+/// 2. NOT be ANY of that set
+///
+/// # Uses `LiteralGuarantee`s
+///
+/// `LiteralGuarantee`s can be used to simplify filter expressions and skip
data
+/// files (e.g. row groups in parquet files) by proving expressions can not
+/// possibly evaluate to `true`. For example, if we have a guarantee that `a`
+/// must be in (`1`) for a filter to evaluate to `true`, then we can skip any
+/// partition where we know that `a` never has the value of `1`.
+///
+/// **Important**: If a `LiteralGuarantee` is not satisfied, the relevant
+/// expression is *guaranteed* to evaluate to `false` or `null`. **However**,
+/// the opposite does not hold. Even if all `LiteralGuarantee`s are satisfied,
+/// that does **not** guarantee that the predicate will actually evaluate to
+/// `true`: it may still evaluate to `true`, `false` or `null`.
+///
+/// # Creating `LiteralGuarantee`s
+///
+/// Use [`LiteralGuarantee::analyze`] to extract literal guarantees from a
+/// filter predicate.
+///
+/// # Details
+/// A guarantee can be one of two forms:
+///
+/// 1. The column must be one the values for the predicate to be `true`. If the
+/// column takes on any other value, the predicate can not evaluate to `true`.
+/// For example,
+/// `(a = 1)`, `(a = 1 OR a = 2) or `a IN (1, 2, 3)`
+///
+/// 2. The column must NOT be one of the values for the predicate to be `true`.
+/// If the column can ONLY take one of these values, the predicate can not
+/// evaluate to `true`. For example,
+/// `(a != 1)`, `(a != 1 AND a != 2)` or `a NOT IN (1, 2, 3)`
+#[derive(Debug, Clone, PartialEq)]
+pub struct LiteralGuarantee {
+ pub column: Column,
+ pub guarantee: Guarantee,
+ pub literals: HashSet<ScalarValue>,
+}
+
+/// What is guaranteed about the values for a [`LiteralGuarantee`]?
+#[derive(Debug, Clone, PartialEq)]
+pub enum Guarantee {
+ /// Guarantee that the expression is `true` if `column` is one of the
values. If
+ /// `column` is not one of the values, the expression can not be `true`.
+ In,
+ /// Guarantee that the expression is `true` if `column` is not ANY of the
+ /// values. If `column` only takes one of these values, the expression can
+ /// not be `true`.
+ NotIn,
+}
+
+impl LiteralGuarantee {
+ /// Create a new instance of the guarantee if the provided operator is
+ /// supported. Returns None otherwise. See [`LiteralGuarantee::analyze`] to
+ /// create these structures from an predicate (boolean expression).
+ fn try_new<'a>(
+ column_name: impl Into<String>,
+ op: Operator,
+ literals: impl IntoIterator<Item = &'a ScalarValue>,
+ ) -> Option<Self> {
+ let guarantee = match op {
+ Operator::Eq => Guarantee::In,
+ Operator::NotEq => Guarantee::NotIn,
+ _ => return None,
+ };
+
+ let literals: HashSet<_> = literals.into_iter().cloned().collect();
+
+ Some(Self {
+ column: Column::from_name(column_name),
+ guarantee,
+ literals,
+ })
+ }
+
+ /// Return a list of [`LiteralGuarantee`]s that must be satisfied for
`expr`
+ /// to evaluate to `true`.
+ ///
+ /// If more than one `LiteralGuarantee` is returned, they must **all** hold
+ /// for the expression to possibly be `true`. If any is not satisfied, the
+ /// expression is guaranteed to be `null` or `false`.
+ ///
+ /// # Notes:
+ /// 1. `expr` must be a boolean expression.
+ /// 2. `expr` is not simplified prior to analysis.
+ pub fn analyze(expr: &Arc<dyn PhysicalExpr>) -> Vec<LiteralGuarantee> {
+ // split conjunction: <expr> AND <expr> AND ...
+ split_conjunction(expr)
+ .into_iter()
+ // for an `AND` conjunction to be true, all terms individually
must be true
+ .fold(GuaranteeBuilder::new(), |builder, expr| {
+ if let Some(cel) = ColOpLit::try_new(expr) {
+ return builder.aggregate_conjunct(cel);
+ } else {
+ // split disjunction: <expr> OR <expr> OR ...
+ let disjunctions = split_disjunction(expr);
+
+ // We are trying to add a guarantee that a column must be
+ // in/not in a particular set of values for the expression
+ // to evaluate to true.
+ //
+ // A disjunction is true, if at least one of the terms is
be
+ // true.
+ //
+ // Thus, we can infer a guarantee if all terms are of the
+ // form `(col <op> literal) OR (col <op> literal) OR ...`.
+ //
+ // For example, we can infer that `a = 1 OR a = 2 OR a = 3`
+ // is guaranteed to be true ONLY if a is in (`1`, `2` or
`3`).
+ //
+ // However, for something like `a = 1 OR a = 2 OR a < 0`
we
+ // **can't** guarantee that the predicate is only true if a
+ // is in (`1`, `2`), as it could also be true if `a` were
less
+ // than zero.
+ let terms = disjunctions
+ .iter()
+ .filter_map(|expr| ColOpLit::try_new(expr))
+ .collect::<Vec<_>>();
+
+ if terms.is_empty() {
+ return builder;
+ }
+
+ // if not all terms are of the form (col <op> literal),
+ // can't infer any guarantees
+ if terms.len() != disjunctions.len() {
+ return builder;
+ }
+
+ // if all terms are 'col <op> literal' with the same column
+ // and operation we can infer any guarantees
+ let first_term = &terms[0];
+ if terms.iter().all(|term| {
+ term.col.name() == first_term.col.name()
+ && term.op == first_term.op
+ }) {
+ builder.aggregate_multi_conjunct(
+ first_term.col,
+ first_term.op,
+ terms.iter().map(|term| term.lit.value()),
+ )
+ } else {
+ // can't infer anything
+ builder
+ }
+ }
+ })
+ .build()
+ }
+}
+
+/// Combines conjuncts (aka terms `AND`ed together) into [`LiteralGuarantee`]s,
+/// preserving insert order
+#[derive(Debug, Default)]
+struct GuaranteeBuilder<'a> {
+ /// List of guarantees that have been created so far
+ /// if we have determined a subsequent conjunct invalidates a guarantee
+ /// e.g. `a = foo AND a = bar` then the relevant guarantee will be None
+ guarantees: Vec<Option<LiteralGuarantee>>,
+
+ /// Key is the (column name, operator type)
+ /// Value is the index into `guarantees`
+ map: HashMap<(&'a crate::expressions::Column, Operator), usize>,
+}
+
+impl<'a> GuaranteeBuilder<'a> {
+ fn new() -> Self {
+ Default::default()
+ }
+
+ /// Aggregate a new single `AND col <op> literal` term to this builder
+ /// combining with existing guarantees if possible.
+ ///
+ /// # Examples
+ /// * `AND (a = 1)`: `a` is guaranteed to be 1
+ /// * `AND (a != 1)`: a is guaranteed to not be 1
+ fn aggregate_conjunct(self, col_op_lit: ColOpLit<'a>) -> Self {
+ self.aggregate_multi_conjunct(
+ col_op_lit.col,
+ col_op_lit.op,
+ [col_op_lit.lit.value()],
+ )
+ }
+
+ /// Aggregates a new single column, multi literal term to ths builder
+ /// combining with previously known guarantees if possible.
+ ///
+ /// # Examples
+ /// For the following examples, we can guarantee the expression is `true`
if:
+ /// * `AND (a = 1 OR a = 2 OR a = 3)`: a is in (1, 2, or 3)
+ /// * `AND (a IN (1,2,3))`: a is in (1, 2, or 3)
+ /// * `AND (a != 1 OR a != 2 OR a != 3)`: a is not in (1, 2, or 3)
+ /// * `AND (a NOT IN (1,2,3))`: a is not in (1, 2, or 3)
+ fn aggregate_multi_conjunct(
+ mut self,
+ col: &'a crate::expressions::Column,
+ op: Operator,
+ new_values: impl IntoIterator<Item = &'a ScalarValue>,
+ ) -> Self {
+ let key = (col, op);
+ if let Some(index) = self.map.get(&key) {
+ // already have a guarantee for this column
+ let entry = &mut self.guarantees[*index];
+
+ let Some(existing) = entry else {
+ // determined the previous guarantee for this column has been
+ // invalidated, nothing to do
+ return self;
+ };
+
+ // Combine conjuncts if we have `a != foo AND a != bar`. `a = foo
+ // AND a = bar` doesn't make logical sense so we don't optimize
this
+ // case
+ match existing.guarantee {
+ // knew that the column could not be a set of values
+ //
+ // For example, if we previously had `a != 5` and now we see
+ // another `AND a != 6` we know that a must not be either 5 or
6
+ // for the expression to be true
+ Guarantee::NotIn => {
+ // can extend if only single literal, otherwise invalidate
+ let new_values: HashSet<_> =
new_values.into_iter().collect();
+ if new_values.len() == 1 {
+
existing.literals.extend(new_values.into_iter().cloned())
+ } else {
+ // this is like (a != foo AND (a != bar OR a != baz)).
+ // We can't combine the (a != bar OR a != baz) part,
but
+ // it also doesn't invalidate our knowledge that a !=
+ // foo is required for the expression to be true
+ }
+ }
+ Guarantee::In => {
+ // for an IN guarantee, it is ok if the value is the same
+ // e.g. `a = foo AND a = foo` but not if the value is
different
+ // e.g. `a = foo AND a = bar`
+ if new_values
+ .into_iter()
+ .all(|new_value| existing.literals.contains(new_value))
+ {
+ // all values are already in the set
+ } else {
+ // at least one was not, so invalidate the guarantee
+ *entry = None;
+ }
+ }
+ }
+ } else {
+ // This is a new guarantee
+ let new_values: HashSet<_> = new_values.into_iter().collect();
+
+ // new_values are combined with OR, so we can only create a
+ // multi-column guarantee for `=` (or a single value).
+ // (e.g. ignore `a != foo OR a != bar`)
+ if op == Operator::Eq || new_values.len() == 1 {
+ if let Some(guarantee) =
+ LiteralGuarantee::try_new(col.name(), op, new_values)
+ {
+ // add it to the list of guarantees
+ self.guarantees.push(Some(guarantee));
+ self.map.insert(key, self.guarantees.len() - 1);
+ }
+ }
+ }
+
+ self
+ }
+
+ /// Return all guarantees that have been created so far
+ fn build(self) -> Vec<LiteralGuarantee> {
+ // filter out any guarantees that have been invalidated
+ self.guarantees.into_iter().flatten().collect()
+ }
+}
+
+/// Represents a single `col <op> literal` expression
+struct ColOpLit<'a> {
+ col: &'a crate::expressions::Column,
+ op: Operator,
+ lit: &'a crate::expressions::Literal,
+}
+
+impl<'a> ColOpLit<'a> {
+ /// Returns Some(ColEqLit) if the expression is either:
+ /// 1. `col <op> literal`
+ /// 2. `literal <op> col`
+ ///
+ /// Returns None otherwise
+ fn try_new(expr: &'a Arc<dyn PhysicalExpr>) -> Option<Self> {
+ let binary_expr = expr
+ .as_any()
+ .downcast_ref::<crate::expressions::BinaryExpr>()?;
+
+ let (left, op, right) = (
+ binary_expr.left().as_any(),
+ binary_expr.op(),
+ binary_expr.right().as_any(),
+ );
+
+ // col <op> literal
+ if let (Some(col), Some(lit)) = (
+ left.downcast_ref::<crate::expressions::Column>(),
+ right.downcast_ref::<crate::expressions::Literal>(),
+ ) {
+ Some(Self { col, op: *op, lit })
+ }
+ // literal <op> col
+ else if let (Some(lit), Some(col)) = (
+ left.downcast_ref::<crate::expressions::Literal>(),
+ right.downcast_ref::<crate::expressions::Column>(),
+ ) {
+ // Used swapped operator operator, if possible
+ op.swap().map(|op| Self { col, op, lit })
+ } else {
+ None
+ }
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use super::*;
+ use crate::create_physical_expr;
+ use crate::execution_props::ExecutionProps;
+ use arrow_schema::{DataType, Field, Schema, SchemaRef};
+ use datafusion_common::ToDFSchema;
+ use datafusion_expr::expr_fn::*;
+ use datafusion_expr::{lit, Expr};
+ use std::sync::OnceLock;
+
+ #[test]
+ fn test_literal() {
+ // a single literal offers no guarantee
+ test_analyze(lit(true), vec![])
+ }
+
+ #[test]
+ fn test_single() {
+ // a = "foo"
+ test_analyze(col("a").eq(lit("foo")), vec![in_guarantee("a",
["foo"])]);
+ // "foo" = a
+ test_analyze(lit("foo").eq(col("a")), vec![in_guarantee("a",
["foo"])]);
+ // a != "foo"
+ test_analyze(
+ col("a").not_eq(lit("foo")),
+ vec![not_in_guarantee("a", ["foo"])],
+ );
+ // "foo" != a
+ test_analyze(
+ lit("foo").not_eq(col("a")),
+ vec![not_in_guarantee("a", ["foo"])],
+ );
+ }
+
+ #[test]
+ fn test_conjunction_single_column() {
+ // b = 1 AND b = 2. This is impossible. Ideally this expression could
be simplified to false
+ test_analyze(col("b").eq(lit(1)).and(col("b").eq(lit(2))), vec![]);
+ // b = 1 AND b != 2 . In theory, this could be simplified to `b = 1`.
+ test_analyze(
+ col("b").eq(lit(1)).and(col("b").not_eq(lit(2))),
+ vec![
+ // can only be true of b is 1 and b is not 2 (even though it
is redundant)
+ in_guarantee("b", [1]),
+ not_in_guarantee("b", [2]),
+ ],
+ );
+ // b != 1 AND b = 2. In theory, this could be simplified to `b = 2`.
+ test_analyze(
+ col("b").not_eq(lit(1)).and(col("b").eq(lit(2))),
+ vec![
+ // can only be true of b is not 1 and b is is 2 (even though
it is redundant)
+ not_in_guarantee("b", [1]),
+ in_guarantee("b", [2]),
+ ],
+ );
+ // b != 1 AND b != 2
+ test_analyze(
+ col("b").not_eq(lit(1)).and(col("b").not_eq(lit(2))),
+ vec![not_in_guarantee("b", [1, 2])],
+ );
+ // b != 1 AND b != 2 and b != 3
+ test_analyze(
+ col("b")
+ .not_eq(lit(1))
+ .and(col("b").not_eq(lit(2)))
+ .and(col("b").not_eq(lit(3))),
+ vec![not_in_guarantee("b", [1, 2, 3])],
+ );
+ // b != 1 AND b = 2 and b != 3. Can only be true if b is 2 and b is
not in (1, 3)
+ test_analyze(
+ col("b")
+ .not_eq(lit(1))
+ .and(col("b").eq(lit(2)))
+ .and(col("b").not_eq(lit(3))),
+ vec![not_in_guarantee("b", [1, 3]), in_guarantee("b", [2])],
+ );
+ // b != 1 AND b != 2 and b = 3 (in theory could determine b = 3)
+ test_analyze(
+ col("b")
+ .not_eq(lit(1))
+ .and(col("b").not_eq(lit(2)))
+ .and(col("b").eq(lit(3))),
+ vec![not_in_guarantee("b", [1, 2]), in_guarantee("b", [3])],
+ );
+ // b != 1 AND b != 2 and b > 3 (to be true, b can't be either 1 or 2
+ test_analyze(
+ col("b")
+ .not_eq(lit(1))
+ .and(col("b").not_eq(lit(2)))
+ .and(col("b").gt(lit(3))),
+ vec![not_in_guarantee("b", [1, 2])],
+ );
+ }
+
+ #[test]
+ fn test_conjunction_multi_column() {
+ // a = "foo" AND b = 1
+ test_analyze(
+ col("a").eq(lit("foo")).and(col("b").eq(lit(1))),
+ vec![
+ // should find both column guarantees
+ in_guarantee("a", ["foo"]),
+ in_guarantee("b", [1]),
+ ],
+ );
+ // a != "foo" AND b != 1
+ test_analyze(
+ col("a").not_eq(lit("foo")).and(col("b").not_eq(lit(1))),
+ // should find both column guarantees
+ vec![not_in_guarantee("a", ["foo"]), not_in_guarantee("b", [1])],
+ );
+ // a = "foo" AND a = "bar"
+ test_analyze(
+ col("a").eq(lit("foo")).and(col("a").eq(lit("bar"))),
+ // this predicate is impossible ( can't be both foo and bar),
+ vec![],
+ );
+ // a = "foo" AND b != "bar"
+ test_analyze(
+ col("a").eq(lit("foo")).and(col("a").not_eq(lit("bar"))),
+ vec![in_guarantee("a", ["foo"]), not_in_guarantee("a", ["bar"])],
+ );
+ // a != "foo" AND a != "bar"
+ test_analyze(
+ col("a").not_eq(lit("foo")).and(col("a").not_eq(lit("bar"))),
+ // know it isn't "foo" or "bar"
+ vec![not_in_guarantee("a", ["foo", "bar"])],
+ );
+ // a != "foo" AND a != "bar" and a != "baz"
+ test_analyze(
+ col("a")
+ .not_eq(lit("foo"))
+ .and(col("a").not_eq(lit("bar")))
+ .and(col("a").not_eq(lit("baz"))),
+ // know it isn't "foo" or "bar" or "baz"
+ vec![not_in_guarantee("a", ["foo", "bar", "baz"])],
+ );
+ // a = "foo" AND a = "foo"
+ let expr = col("a").eq(lit("foo"));
+ test_analyze(expr.clone().and(expr), vec![in_guarantee("a", ["foo"])]);
+ // b > 5 AND b = 10 (should get an b = 10 guarantee)
+ test_analyze(
+ col("b").gt(lit(5)).and(col("b").eq(lit(10))),
+ vec![in_guarantee("b", [10])],
+ );
+ // b > 10 AND b = 10 (this is impossible)
+ test_analyze(
+ col("b").gt(lit(10)).and(col("b").eq(lit(10))),
+ vec![
+ // if b isn't 10, it can not be true (though the expression
actually can never be true)
+ in_guarantee("b", [10]),
+ ],
+ );
+ // a != "foo" and (a != "bar" OR a != "baz")
+ test_analyze(
+ col("a")
+ .not_eq(lit("foo"))
+
.and(col("a").not_eq(lit("bar")).or(col("a").not_eq(lit("baz")))),
+ // a is not foo (we can't represent other knowledge about a)
+ vec![not_in_guarantee("a", ["foo"])],
+ );
+ }
+
+ #[test]
+ fn test_conjunction_and_disjunction_single_column() {
+ // b != 1 AND (b > 2)
+ test_analyze(
+ col("b").not_eq(lit(1)).and(col("b").gt(lit(2))),
+ vec![
+ // for the expression to be true, b can not be one
+ not_in_guarantee("b", [1]),
+ ],
+ );
+
+ // b = 1 AND (b = 2 OR b = 3). Could be simplified to false.
+ test_analyze(
+ col("b")
+ .eq(lit(1))
+ .and(col("b").eq(lit(2)).or(col("b").eq(lit(3)))),
+ vec![
+ // in theory, b must be 1 and one of 2,3 for this expression
to be true
+ // which is a logical contradiction
+ ],
+ );
+ }
+
+ #[test]
+ fn test_disjunction_single_column() {
+ // b = 1 OR b = 2
+ test_analyze(
+ col("b").eq(lit(1)).or(col("b").eq(lit(2))),
+ vec![in_guarantee("b", [1, 2])],
+ );
+ // b != 1 OR b = 2
+ test_analyze(col("b").not_eq(lit(1)).or(col("b").eq(lit(2))), vec![]);
+ // b = 1 OR b != 2
+ test_analyze(col("b").eq(lit(1)).or(col("b").not_eq(lit(2))), vec![]);
+ // b != 1 OR b != 2
+ test_analyze(col("b").not_eq(lit(1)).or(col("b").not_eq(lit(2))),
vec![]);
+ // b != 1 OR b != 2 OR b = 3 -- in theory could guarantee that b = 3
+ test_analyze(
+ col("b")
+ .not_eq(lit(1))
+ .or(col("b").not_eq(lit(2)))
+ .or(lit("b").eq(lit(3))),
+ vec![],
+ );
+ // b = 1 OR b = 2 OR b = 3
+ test_analyze(
+ col("b")
+ .eq(lit(1))
+ .or(col("b").eq(lit(2)))
+ .or(col("b").eq(lit(3))),
+ vec![in_guarantee("b", [1, 2, 3])],
+ );
+ // b = 1 OR b = 2 OR b > 3 -- can't guarantee that the expression is
only true if a is in (1, 2)
+ test_analyze(
+ col("b")
+ .eq(lit(1))
+ .or(col("b").eq(lit(2)))
+ .or(lit("b").eq(lit(3))),
+ vec![],
+ );
+ }
+
+ #[test]
+ fn test_disjunction_multi_column() {
+ // a = "foo" OR b = 1
+ test_analyze(
+ col("a").eq(lit("foo")).or(col("b").eq(lit(1))),
+ // no can't have a single column guarantee (if a = "foo" then b !=
1) etc
+ vec![],
+ );
+ // a != "foo" OR b != 1
+ test_analyze(
+ col("a").not_eq(lit("foo")).or(col("b").not_eq(lit(1))),
+ // No single column guarantee
+ vec![],
+ );
+ // a = "foo" OR a = "bar"
+ test_analyze(
+ col("a").eq(lit("foo")).or(col("a").eq(lit("bar"))),
+ vec![in_guarantee("a", ["foo", "bar"])],
+ );
+ // a = "foo" OR a = "foo"
+ test_analyze(
+ col("a").eq(lit("foo")).or(col("a").eq(lit("foo"))),
+ vec![in_guarantee("a", ["foo"])],
+ );
+ // a != "foo" OR a != "bar"
+ test_analyze(
+ col("a").not_eq(lit("foo")).or(col("a").not_eq(lit("bar"))),
+ // can't represent knowledge about a in this case
+ vec![],
+ );
+ // a = "foo" OR a = "bar" OR a = "baz"
+ test_analyze(
+ col("a")
+ .eq(lit("foo"))
+ .or(col("a").eq(lit("bar")))
+ .or(col("a").eq(lit("baz"))),
+ vec![in_guarantee("a", ["foo", "bar", "baz"])],
+ );
+ // (a = "foo" OR a = "bar") AND (a = "baz)"
+ test_analyze(
+ (col("a").eq(lit("foo")).or(col("a").eq(lit("bar"))))
+ .and(col("a").eq(lit("baz"))),
+ // this could potentially be represented as 2 constraints with a
more
+ // sophisticated analysis
+ vec![],
+ );
+ // (a = "foo" OR a = "bar") AND (b = 1)
+ test_analyze(
+ (col("a").eq(lit("foo")).or(col("a").eq(lit("bar"))))
+ .and(col("b").eq(lit(1))),
+ vec![in_guarantee("a", ["foo", "bar"]), in_guarantee("b", [1])],
+ );
+ // (a = "foo" OR a = "bar") OR (b = 1)
+ test_analyze(
+ col("a")
+ .eq(lit("foo"))
+ .or(col("a").eq(lit("bar")))
+ .or(col("b").eq(lit(1))),
+ // can't represent knowledge about a or b in this case
+ vec![],
+ );
+ }
+
+ // TODO https://github.com/apache/arrow-datafusion/issues/8436
+ // a IN (...)
+ // b NOT IN (...)
+
+ /// Tests that analyzing expr results in the expected guarantees
+ fn test_analyze(expr: Expr, expected: Vec<LiteralGuarantee>) {
+ println!("Begin analyze of {expr}");
+ let schema = schema();
+ let physical_expr = logical2physical(&expr, &schema);
+
+ let actual = LiteralGuarantee::analyze(&physical_expr);
+ assert_eq!(
+ expected, actual,
+ "expr: {expr}\
+ \n\nexpected: {expected:#?}\
+ \n\nactual: {actual:#?}\
+ \n\nexpr: {expr:#?}\
+ \n\nphysical_expr: {physical_expr:#?}"
+ );
+ }
+
+ /// Guarantee that the expression is true if the column is one of the
specified values
+ fn in_guarantee<'a, I, S>(column: &str, literals: I) -> LiteralGuarantee
+ where
+ I: IntoIterator<Item = S>,
+ S: Into<ScalarValue> + 'a,
+ {
+ let literals: Vec<_> = literals.into_iter().map(|s|
s.into()).collect();
+ LiteralGuarantee::try_new(column, Operator::Eq,
literals.iter()).unwrap()
+ }
+
+ /// Guarantee that the expression is true if the column is NOT any of the
specified values
+ fn not_in_guarantee<'a, I, S>(column: &str, literals: I) ->
LiteralGuarantee
+ where
+ I: IntoIterator<Item = S>,
+ S: Into<ScalarValue> + 'a,
+ {
+ let literals: Vec<_> = literals.into_iter().map(|s|
s.into()).collect();
+ LiteralGuarantee::try_new(column, Operator::NotEq,
literals.iter()).unwrap()
+ }
+
+ /// Convert a logical expression to a physical expression (without any
simplification, etc)
+ fn logical2physical(expr: &Expr, schema: &Schema) -> Arc<dyn PhysicalExpr>
{
+ let df_schema = schema.clone().to_dfschema().unwrap();
+ let execution_props = ExecutionProps::new();
+ create_physical_expr(expr, &df_schema, schema,
&execution_props).unwrap()
+ }
+
+ // Schema for testing
+ fn schema() -> SchemaRef {
+ SCHEMA
+ .get_or_init(|| {
+ Arc::new(Schema::new(vec![
+ Field::new("a", DataType::Utf8, false),
+ Field::new("b", DataType::Int32, false),
+ ]))
+ })
+ .clone()
+ }
+
+ static SCHEMA: OnceLock<SchemaRef> = OnceLock::new();
+}
diff --git a/datafusion/physical-expr/src/utils.rs
b/datafusion/physical-expr/src/utils/mod.rs
similarity index 96%
rename from datafusion/physical-expr/src/utils.rs
rename to datafusion/physical-expr/src/utils/mod.rs
index 71a7ff5fb7..87ef36558b 100644
--- a/datafusion/physical-expr/src/utils.rs
+++ b/datafusion/physical-expr/src/utils/mod.rs
@@ -15,6 +15,9 @@
// specific language governing permissions and limitations
// under the License.
+mod guarantee;
+pub use guarantee::{Guarantee, LiteralGuarantee};
+
use std::borrow::Borrow;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
@@ -41,25 +44,29 @@ use petgraph::stable_graph::StableGraph;
pub fn split_conjunction(
predicate: &Arc<dyn PhysicalExpr>,
) -> Vec<&Arc<dyn PhysicalExpr>> {
- split_conjunction_impl(predicate, vec![])
+ split_impl(Operator::And, predicate, vec![])
}
-fn split_conjunction_impl<'a>(
+/// Assume the predicate is in the form of DNF, split the predicate to a Vec
of PhysicalExprs.
+///
+/// For example, split "a1 = a2 OR b1 <= b2 OR c1 != c2" into ["a1 = a2", "b1
<= b2", "c1 != c2"]
+pub fn split_disjunction(
+ predicate: &Arc<dyn PhysicalExpr>,
+) -> Vec<&Arc<dyn PhysicalExpr>> {
+ split_impl(Operator::Or, predicate, vec![])
+}
+
+fn split_impl<'a>(
+ operator: Operator,
predicate: &'a Arc<dyn PhysicalExpr>,
mut exprs: Vec<&'a Arc<dyn PhysicalExpr>>,
) -> Vec<&'a Arc<dyn PhysicalExpr>> {
match predicate.as_any().downcast_ref::<BinaryExpr>() {
- Some(binary) => match binary.op() {
- Operator::And => {
- let exprs = split_conjunction_impl(binary.left(), exprs);
- split_conjunction_impl(binary.right(), exprs)
- }
- _ => {
- exprs.push(predicate);
- exprs
- }
- },
- None => {
+ Some(binary) if binary.op() == &operator => {
+ let exprs = split_impl(operator, binary.left(), exprs);
+ split_impl(operator, binary.right(), exprs)
+ }
+ Some(_) | None => {
exprs.push(predicate);
exprs
}