waynexia commented on code in PR #8437:
URL: https://github.com/apache/arrow-datafusion/pull/8437#discussion_r1423675616


##########
datafusion/physical-expr/src/utils/guarantee.rs:
##########
@@ -0,0 +1,518 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`LiteralGuarantee`] predicate analysis to determine if a column is a
+//! constant.
+
+use crate::utils::split_disjunction;
+use crate::{split_conjunction, PhysicalExpr};
+use datafusion_common::{Column, ScalarValue};
+use datafusion_expr::Operator;
+use std::collections::{HashMap, HashSet};
+use std::sync::Arc;
+
+/// Represents a known guarantee that a column is either one of a set of values
+/// or not one of a set of values.
+///
+/// `LiteralGuarantee`s can be used to simplify expressions and skip data files
+/// (or row groups in parquet files). For example, if we know that `a = 1` then
+/// we can skip any partition that does not contain `a = 1`.
+///
+/// See [`LiteralGuarantee::analyze`] to extract literal guarantees from a
+/// predicate.
+///
+/// # Details
+/// A guarantee can be one of two forms:
+///
+/// 1. One of particular set of values. For example, `(a = 1)`, `(a = 1 OR a =
+/// 2) or `a IN (1, 2, 3)`
+///
+/// 2. NOT one of a particular set of values. For example, `(a != 1)`, `(a != 1
+/// AND a != 2)` or `a NOT IN (1, 2, 3)`
+///
+#[derive(Debug, Clone, PartialEq)]
+pub struct LiteralGuarantee {
+    pub column: Column,
+    pub guarantee: Guarantee,
+    pub literals: HashSet<ScalarValue>,
+}
+
+/// What is be guaranteed about the values?
+#[derive(Debug, Clone, PartialEq)]
+pub enum Guarantee {
+    /// `column` is one of a set of constant values
+    In,
+    /// `column` is NOT one of a set of constant values
+    NotIn,
+}
+
+impl LiteralGuarantee {
+    /// Create a new instance of the guarantee if the provided operator is
+    /// supported. Returns None otherwise. See [`LiteralGuarantee::analyze`] to
+    /// create these structures from an expression.
+    fn try_new<'a>(
+        column_name: impl Into<String>,
+        op: &Operator,
+        literals: impl IntoIterator<Item = &'a ScalarValue>,
+    ) -> Option<Self> {
+        let guarantee = match op {
+            Operator::Eq => Guarantee::In,
+            Operator::NotEq => Guarantee::NotIn,
+            _ => return None,
+        };
+
+        let literals: HashSet<_> = literals.into_iter().cloned().collect();
+
+        Some(Self {
+            column: Column::from_name(column_name),
+            guarantee,
+            literals,
+        })
+    }
+
+    /// Return a list of `LiteralGuarantees` for the specified predicate that
+    /// hold if the predicate (filter) expression evaluates to `true`.
+    ///
+    /// `expr` should be a boolean expression
+    ///
+    /// Note: this function does not simplify the expression prior to analysis.
+    pub fn analyze(expr: &Arc<dyn PhysicalExpr>) -> Vec<LiteralGuarantee> {
+        split_conjunction(expr)
+            .into_iter()
+            .fold(GuaranteeBuilder::new(), |builder, expr| {
+                if let Some(cel) = ColOpLit::try_new(expr) {
+                    return builder.aggregate_conjunct(cel);
+                } else {
+                    // look for (col <op> literal) OR (col <op> literal) ...
+                    let disjunctions = split_disjunction(expr);
+
+                    let terms = disjunctions
+                        .iter()
+                        .filter_map(|expr| ColOpLit::try_new(expr))
+                        .collect::<Vec<_>>();
+
+                    if terms.is_empty() {
+                        return builder;
+                    }
+
+                    // if not all terms are of the form (col <op> literal),
+                    // can't infer anything
+                    if terms.len() != disjunctions.len() {
+                        return builder;
+                    }

Review Comment:
   Why is this check necessary? It seems we could still infer a guarantee for
   an expression like the one below, since the condition still holds after
   dropping part of the OR-chain:
   ```
   ... AND (
     a != 1 OR
     b > 1 OR
     a != 2
   )
   ```



##########
datafusion/physical-expr/src/utils/guarantee.rs:
##########
@@ -0,0 +1,518 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`LiteralGuarantee`] predicate analysis to determine if a column is a
+//! constant.
+
+use crate::utils::split_disjunction;
+use crate::{split_conjunction, PhysicalExpr};
+use datafusion_common::{Column, ScalarValue};
+use datafusion_expr::Operator;
+use std::collections::{HashMap, HashSet};
+use std::sync::Arc;
+
+/// Represents a known guarantee that a column is either one of a set of values
+/// or not one of a set of values.
+///
+/// `LiteralGuarantee`s can be used to simplify expressions and skip data files
+/// (or row groups in parquet files). For example, if we know that `a = 1` then
+/// we can skip any partition that does not contain `a = 1`.
+///
+/// See [`LiteralGuarantee::analyze`] to extract literal guarantees from a
+/// predicate.
+///
+/// # Details
+/// A guarantee can be one of two forms:
+///
+/// 1. One of particular set of values. For example, `(a = 1)`, `(a = 1 OR a =
+/// 2) or `a IN (1, 2, 3)`
+///
+/// 2. NOT one of a particular set of values. For example, `(a != 1)`, `(a != 1
+/// AND a != 2)` or `a NOT IN (1, 2, 3)`
+///
+#[derive(Debug, Clone, PartialEq)]
+pub struct LiteralGuarantee {
+    pub column: Column,
+    pub guarantee: Guarantee,
+    pub literals: HashSet<ScalarValue>,
+}
+
+/// What is be guaranteed about the values?
+#[derive(Debug, Clone, PartialEq)]
+pub enum Guarantee {
+    /// `column` is one of a set of constant values
+    In,
+    /// `column` is NOT one of a set of constant values
+    NotIn,
+}
+
+impl LiteralGuarantee {
+    /// Create a new instance of the guarantee if the provided operator is
+    /// supported. Returns None otherwise. See [`LiteralGuarantee::analyze`] to
+    /// create these structures from an expression.
+    fn try_new<'a>(
+        column_name: impl Into<String>,
+        op: &Operator,
+        literals: impl IntoIterator<Item = &'a ScalarValue>,
+    ) -> Option<Self> {
+        let guarantee = match op {
+            Operator::Eq => Guarantee::In,
+            Operator::NotEq => Guarantee::NotIn,
+            _ => return None,
+        };
+
+        let literals: HashSet<_> = literals.into_iter().cloned().collect();
+
+        Some(Self {
+            column: Column::from_name(column_name),
+            guarantee,
+            literals,
+        })
+    }
+
+    /// Return a list of `LiteralGuarantees` for the specified predicate that
+    /// hold if the predicate (filter) expression evaluates to `true`.
+    ///
+    /// `expr` should be a boolean expression
+    ///
+    /// Note: this function does not simplify the expression prior to analysis.
+    pub fn analyze(expr: &Arc<dyn PhysicalExpr>) -> Vec<LiteralGuarantee> {
+        split_conjunction(expr)
+            .into_iter()
+            .fold(GuaranteeBuilder::new(), |builder, expr| {
+                if let Some(cel) = ColOpLit::try_new(expr) {
+                    return builder.aggregate_conjunct(cel);
+                } else {
+                    // look for (col <op> literal) OR (col <op> literal) ...
+                    let disjunctions = split_disjunction(expr);
+
+                    let terms = disjunctions
+                        .iter()
+                        .filter_map(|expr| ColOpLit::try_new(expr))
+                        .collect::<Vec<_>>();
+
+                    if terms.is_empty() {
+                        return builder;
+                    }
+
+                    // if not all terms are of the form (col <op> literal),
+                    // can't infer anything
+                    if terms.len() != disjunctions.len() {
+                        return builder;
+                    }
+
+                    // if all terms are 'col <op> literal' with the same column
+                    // and operation we can add a guarantee
+                    let first_term = &terms[0];
+                    if terms.iter().all(|term| {
+                        term.col.name() == first_term.col.name()
+                            && term.op == first_term.op
+                    }) {
+                        builder.aggregate_multi_conjunct(
+                            first_term.col,
+                            first_term.op,
+                            terms.iter().map(|term| term.lit.value()),
+                        )
+                    } else {
+                        // can't infer anything
+                        builder
+                    }
+                }
+            })
+            .build()
+    }
+}
+
+/// Combines conjuncts (aka terms `AND`ed together) into [`LiteralGuarantee`]s,
+/// preserving insert order
+#[derive(Debug, Default)]
+struct GuaranteeBuilder<'a> {
+    /// List of guarantees that have been created so far
+    /// if we have determined a subsequent conjunct invalidates a guarantee
+    /// e.g. `a = foo AND a = bar` then the relevant guarantee will be None
+    guarantees: Vec<Option<LiteralGuarantee>>,
+
+    /// Key is the (column name, operator type)
+    /// Value is the index into `guarantees`
+    map: HashMap<(&'a crate::expressions::Column, &'a Operator), usize>,
+}
+
+impl<'a> GuaranteeBuilder<'a> {
+    fn new() -> Self {
+        Default::default()
+    }
+
+    /// Aggregate a new single `AND col <op> literal` term to this builder
+    /// combining with existing guarantees if possible.
+    ///
+    /// # Examples
+    /// * `AND (a = 1)`: `a` is guaranteed to be 1
+    /// * `AND (a != 1)`: a is guaranteed to not be 1
+    fn aggregate_conjunct(self, col_op_lit: ColOpLit<'a>) -> Self {
+        self.aggregate_multi_conjunct(
+            col_op_lit.col,
+            col_op_lit.op,
+            [col_op_lit.lit.value()],
+        )
+    }
+
+    /// Aggregates a new single multi column term to ths builder
+    /// combining with existing guarantees if possible.
+    ///
+    /// # Examples
+    /// * (`AND (a = 1 OR a = 2 OR a = 3)`): a is guaranteed to be 1, 2, or 3
+    /// * `AND (a IN (1,2,3))`: a is guaranteed to be 1, 2, or 3
+    /// * `AND (a != 1 OR a != 2 OR a != 3)`: a is guaranteed to not be 1, 2, 
or 3
+    /// * `AND (a NOT IN (1,2,3))`: a is guaranteed to not be 1, 2, or 3
+    fn aggregate_multi_conjunct(
+        mut self,
+        col: &'a crate::expressions::Column,
+        op: &'a Operator,
+        new_values: impl IntoIterator<Item = &'a ScalarValue>,
+    ) -> Self {
+        let key = (col, op);
+        if let Some(index) = self.map.get(&key) {
+            // already have a guarantee for this column
+            let entry = &mut self.guarantees[*index];
+
+            let Some(existing) = entry else {
+                // guarantee has been previously invalidated, nothing to do
+                return self;
+            };
+
+            // can only combine conjuncts if we have `a != foo AND a != bar`.
+            // `a = foo AND a = bar` is not correct. Also, can't extend with 
more than one value.
+            match existing.guarantee {
+                Guarantee::NotIn => {
+                    // can extend if only single literal, otherwise invalidate
+                    let new_values: HashSet<_> = 
new_values.into_iter().collect();
+                    if new_values.len() == 1 {
+                        
existing.literals.extend(new_values.into_iter().cloned())
+                    } else {
+                        // this is like (a != foo AND (a != bar OR a != baz)).
+                        // We can't combine the (a!=bar OR a!=baz) part, but it
+                        // also doesn't invalidate a != foo guarantee.
+                    }
+                }
+                Guarantee::In => {
+                    // for an IN guarantee, it is ok if the value is the same
+                    // e.g. `a = foo AND a = foo` but not if the value is 
different
+                    // e.g. `a = foo AND a = bar`
+                    if new_values
+                        .into_iter()
+                        .all(|new_value| existing.literals.contains(new_value))
+                    {
+                        // all values are already in the set
+                    } else {
+                        // at least one was not, so invalidate the guarantee
+                        *entry = None;
+                    }
+                }
+            }
+        } else {
+            // This is a new guarantee
+            let new_values: HashSet<_> = new_values.into_iter().collect();
+
+            // new_values are combined with OR, so we can only create a
+            // multi-column guarantee for `=` (or a single value).
+            // (e.g. ignore `a != foo OR a != bar`)
+            if op == &Operator::Eq || new_values.len() == 1 {
+                if let Some(guarantee) =
+                    LiteralGuarantee::try_new(col.name(), op, new_values)
+                {
+                    // add it to the list of guarantees
+                    self.guarantees.push(Some(guarantee));
+                    self.map.insert(key, self.guarantees.len() - 1);
+                }
+            }
+        }
+
+        self
+    }
+
+    /// Return all guarantees that have been created so far
+    fn build(self) -> Vec<LiteralGuarantee> {
+        // filter out any guarantees that have been invalidated
+        self.guarantees.into_iter().flatten().collect()
+    }
+}
+
+/// Represents a single `col <op> literal` expression
+struct ColOpLit<'a> {
+    col: &'a crate::expressions::Column,
+    op: &'a Operator,
+    lit: &'a crate::expressions::Literal,
+}
+
+impl<'a> ColOpLit<'a> {
+    /// Returns Some(ColEqLit) if the expression is either:
+    /// 1. `col <op> literal`
+    /// 2. `literal <op> col`
+    ///
+    /// Returns None otherwise
+    fn try_new(expr: &'a Arc<dyn PhysicalExpr>) -> Option<Self> {
+        let binary_expr = expr
+            .as_any()
+            .downcast_ref::<crate::expressions::BinaryExpr>()?;
+
+        let (left, op, right) = (
+            binary_expr.left().as_any(),
+            binary_expr.op(),
+            binary_expr.right().as_any(),
+        );
+
+        // col <op> literal
+        if let (Some(col), Some(lit)) = (
+            left.downcast_ref::<crate::expressions::Column>(),
+            right.downcast_ref::<crate::expressions::Literal>(),
+        ) {
+            Some(Self { col, op, lit })

Review Comment:
   Should we call `Operator::swap()` here, since the operator is reversed?
   Admittedly, only `Eq` and `NotEq` will occur here, and both are symmetric,
   so swapping them is a no-op.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to