alamb commented on code in PR #7467:
URL: https://github.com/apache/arrow-datafusion/pull/7467#discussion_r1323513644


##########
datafusion/optimizer/src/simplify_expressions/guarantees.rs:
##########
@@ -0,0 +1,514 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Simplifier implementation for 
[ExprSimplifier::simplify_with_guarantees()][crate::simplify_expressions::expr_simplifier::ExprSimplifier::simplify_with_guarantees].
+use datafusion_common::{tree_node::TreeNodeRewriter, DataFusionError, Result};
+use datafusion_expr::{expr::InList, lit, Between, BinaryExpr, Expr, Operator};
+use std::collections::HashMap;
+
+use datafusion_physical_expr::intervals::{Interval, IntervalBound, 
NullableInterval};
+
+/// Rewrite expressions to incorporate guarantees.
+pub(crate) struct GuaranteeRewriter<'a> {
+    guarantees: HashMap<&'a Expr, &'a NullableInterval>,
+}
+
+impl<'a> GuaranteeRewriter<'a> {
+    pub fn new(
+        guarantees: impl IntoIterator<Item = &'a (Expr, NullableInterval)>,
+    ) -> Self {
+        Self {
+            guarantees: guarantees.into_iter().map(|(k, v)| (k, v)).collect(),
+        }
+    }
+}
+
+impl<'a> TreeNodeRewriter for GuaranteeRewriter<'a> {
+    type N = Expr;
+
+    fn mutate(&mut self, expr: Expr) -> Result<Expr> {
+        if self.guarantees.is_empty() {
+            return Ok(expr);
+        }
+
+        match &expr {
+            Expr::IsNull(inner) => match self.guarantees.get(inner.as_ref()) {
+                Some(NullableInterval::Null { .. }) => Ok(lit(true)),
+                Some(NullableInterval::NotNull { .. }) => Ok(lit(false)),
+                _ => Ok(expr),
+            },
+            Expr::IsNotNull(inner) => match 
self.guarantees.get(inner.as_ref()) {
+                Some(NullableInterval::Null { .. }) => Ok(lit(false)),
+                Some(NullableInterval::NotNull { .. }) => Ok(lit(true)),
+                _ => Ok(expr),
+            },
+            Expr::Between(Between {
+                expr: inner,
+                negated,
+                low,
+                high,
+            }) => {
+                if let (Some(interval), Expr::Literal(low), 
Expr::Literal(high)) = (
+                    self.guarantees.get(inner.as_ref()),
+                    low.as_ref(),
+                    high.as_ref(),
+                ) {
+                    let expr_interval = NullableInterval::NotNull {
+                        values: Interval::new(
+                            IntervalBound::new(low.clone(), false),
+                            IntervalBound::new(high.clone(), false),
+                        ),
+                    };
+
+                    let contains = expr_interval.contains(*interval)?;
+
+                    if contains.is_certainly_true() {
+                        Ok(lit(!negated))
+                    } else if contains.is_certainly_false() {
+                        Ok(lit(*negated))
+                    } else {
+                        Ok(expr)
+                    }
+                } else {
+                    Ok(expr)
+                }
+            }
+
+            Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
+                // Check if this is a comparison
+                match op {
+                    Operator::Eq
+                    | Operator::NotEq
+                    | Operator::Lt
+                    | Operator::LtEq
+                    | Operator::Gt
+                    | Operator::GtEq
+                    | Operator::IsDistinctFrom
+                    | Operator::IsNotDistinctFrom => {}
+                    _ => return Ok(expr),
+                };
+
+                // Check if this is a comparison between a column and literal
+                let (col, op, value) = match (left.as_ref(), right.as_ref()) {
+                    (Expr::Column(_), Expr::Literal(value)) => (left, *op, 
value),
+                    (Expr::Literal(value), Expr::Column(_)) => {
+                        // If we can swap the op, we can simplify the 
expression
+                        if let Some(op) = op.swap() {
+                            (right, op, value)
+                        } else {
+                            return Ok(expr);
+                        }
+                    }
+                    _ => return Ok(expr),
+                };
+
+                if let Some(col_interval) = self.guarantees.get(col.as_ref()) {
+                    let result = col_interval.apply_operator(&op, 
&value.into())?;
+                    if result.is_certainly_true() {
+                        Ok(lit(true))
+                    } else if result.is_certainly_false() {
+                        Ok(lit(false))
+                    } else {
+                        Ok(expr)
+                    }
+                } else {
+                    Ok(expr)
+                }
+            }
+
+            // Columns (if interval is collapsed to a single value)
+            Expr::Column(_) => {
+                if let Some(col_interval) = self.guarantees.get(&expr) {
+                    if let Some(value) = col_interval.single_value() {
+                        Ok(lit(value))
+                    } else {
+                        Ok(expr)
+                    }
+                } else {
+                    Ok(expr)
+                }
+            }
+
+            Expr::InList(InList {
+                expr: inner,
+                list,
+                negated,
+            }) => {
+                if let Some(interval) = self.guarantees.get(inner.as_ref()) {
+                    // Can remove items from the list that don't match the 
guarantee
+                    let new_list: Vec<Expr> = list
+                        .iter()
+                        .filter_map(|expr| {
+                            if let Expr::Literal(item) = expr {
+                                match 
interval.contains(&NullableInterval::from(item)) {
+                                    // If we know for certain the value isn't 
in the column's interval,
+                                    // we can skip checking it.
+                                    Ok(NullableInterval::NotNull { values })
+                                        if values == Interval::CERTAINLY_FALSE 
=>
+                                    {
+                                        None
+                                    }
+                                    Err(err) => Some(Err(err)),
+                                    _ => Some(Ok(expr.clone())),
+                                }

Review Comment:
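   Perhaps you can rewrite this using `is_certainly_false`, like the following
   (not tested; the `?` will likely need adapting, since the surrounding
   `filter_map` closure yields `Option<Result<_>>` rather than `Result<_>`)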
   
   ```suggestion
                                   if interval
                                       .contains(&NullableInterval::from(item))?
                                       // If we know for certain the value isn't in
                                       // the column's interval, we can skip checking it.
                                       .is_certainly_false()
                                   {
                                       None
                                   } else {
                                       Some(Ok(expr.clone()))
                                   }
   ```



##########
datafusion/optimizer/src/simplify_expressions/guarantees.rs:
##########
@@ -0,0 +1,514 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Simplifier implementation for 
[ExprSimplifier::simplify_with_guarantees()][crate::simplify_expressions::expr_simplifier::ExprSimplifier::simplify_with_guarantees].
+use datafusion_common::{tree_node::TreeNodeRewriter, DataFusionError, Result};
+use datafusion_expr::{expr::InList, lit, Between, BinaryExpr, Expr, Operator};
+use std::collections::HashMap;
+
+use datafusion_physical_expr::intervals::{Interval, IntervalBound, 
NullableInterval};
+
+/// Rewrite expressions to incorporate guarantees.
+pub(crate) struct GuaranteeRewriter<'a> {
+    guarantees: HashMap<&'a Expr, &'a NullableInterval>,
+}
+
+impl<'a> GuaranteeRewriter<'a> {
+    pub fn new(
+        guarantees: impl IntoIterator<Item = &'a (Expr, NullableInterval)>,
+    ) -> Self {
+        Self {
+            guarantees: guarantees.into_iter().map(|(k, v)| (k, v)).collect(),
+        }
+    }
+}
+
+impl<'a> TreeNodeRewriter for GuaranteeRewriter<'a> {
+    type N = Expr;
+
+    fn mutate(&mut self, expr: Expr) -> Result<Expr> {
+        if self.guarantees.is_empty() {
+            return Ok(expr);
+        }
+
+        match &expr {
+            Expr::IsNull(inner) => match self.guarantees.get(inner.as_ref()) {
+                Some(NullableInterval::Null { .. }) => Ok(lit(true)),
+                Some(NullableInterval::NotNull { .. }) => Ok(lit(false)),
+                _ => Ok(expr),
+            },
+            Expr::IsNotNull(inner) => match 
self.guarantees.get(inner.as_ref()) {
+                Some(NullableInterval::Null { .. }) => Ok(lit(false)),
+                Some(NullableInterval::NotNull { .. }) => Ok(lit(true)),
+                _ => Ok(expr),
+            },
+            Expr::Between(Between {
+                expr: inner,
+                negated,
+                low,
+                high,
+            }) => {
+                if let (Some(interval), Expr::Literal(low), 
Expr::Literal(high)) = (
+                    self.guarantees.get(inner.as_ref()),
+                    low.as_ref(),
+                    high.as_ref(),
+                ) {
+                    let expr_interval = NullableInterval::NotNull {
+                        values: Interval::new(
+                            IntervalBound::new(low.clone(), false),
+                            IntervalBound::new(high.clone(), false),
+                        ),
+                    };
+
+                    let contains = expr_interval.contains(*interval)?;
+
+                    if contains.is_certainly_true() {

Review Comment:
   I really like how easy this is to read now



##########
datafusion/physical-expr/src/intervals/interval_aritmetic.rs:
##########
@@ -686,6 +720,282 @@ fn calculate_cardinality_based_on_bounds(
     }
 }
 
+/// An [Interval] that also tracks null status using a boolean interval.
+///
+/// This represents values that may be in a particular range or be null.
+///
+/// # Examples
+///
+/// ```
+/// use datafusion_physical_expr::intervals::{Interval, NullableInterval};
+/// use datafusion_common::ScalarValue;
+///
+/// // [1, 2) U {NULL}
+/// NullableInterval::MaybeNull {
+///    values: Interval::make(Some(1), Some(2), (false, true)),
+/// };
+///
+/// // (0, ∞)
+/// NullableInterval::NotNull {
+///   values: Interval::make(Some(0), None, (true, true)),
+/// };
+///
+/// // {NULL}
+/// NullableInterval::Null;
+///
+/// // {4}
+/// NullableInterval::from(&ScalarValue::Int32(Some(4)));
+/// ```
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum NullableInterval {
+    /// The value is always null in this interval
+    ///
+    /// This is typed so it can be used in physical expressions, which don't do
+    /// type coercion.
+    Null { datatype: DataType },
+    /// The value may or may not be null in this interval. If it is non null 
its value is within
+    /// the specified values interval
+    MaybeNull { values: Interval },
+    /// The value is definitely not null in this interval and is within values
+    NotNull { values: Interval },
+}
+
+impl Default for NullableInterval {
+    fn default() -> Self {
+        NullableInterval::MaybeNull {
+            values: Interval::default(),
+        }
+    }
+}
+
+impl Display for NullableInterval {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        match self {
+            Self::Null { .. } => write!(f, "NullableInterval: {{NULL}}"),
+            Self::MaybeNull { values } => {
+                write!(f, "NullableInterval: {} U {{NULL}}", values)
+            }
+            Self::NotNull { values } => write!(f, "NullableInterval: {}", 
values),
+        }
+    }
+}
+
+impl From<&ScalarValue> for NullableInterval {
+    /// Create an interval that represents a single value.
+    fn from(value: &ScalarValue) -> Self {
+        if value.is_null() {
+            Self::Null {
+                datatype: value.get_datatype(),
+            }
+        } else {
+            Self::NotNull {
+                values: Interval::new(
+                    IntervalBound::new(value.clone(), false),
+                    IntervalBound::new(value.clone(), false),
+                ),
+            }
+        }
+    }
+}
+
+impl NullableInterval {
+    /// Get the values interval, or None if this interval is definitely null.
+    pub fn values(&self) -> Option<&Interval> {
+        match self {
+            Self::Null { .. } => None,
+            Self::MaybeNull { values } | Self::NotNull { values } => 
Some(values),
+        }
+    }
+
+    /// Get the data type
+    pub fn get_datatype(&self) -> Result<DataType> {
+        match self {
+            Self::Null { datatype } => Ok(datatype.clone()),
+            Self::MaybeNull { values } | Self::NotNull { values } => {
+                values.get_datatype()
+            }
+        }
+    }
+
+    /// Return true if the value is definitely true (and not null).
+    pub fn is_certainly_true(&self) -> bool {
+        match self {
+            Self::Null { .. } | Self::MaybeNull { .. } => false,
+            Self::NotNull { values } => values == &Interval::CERTAINLY_TRUE,
+        }
+    }
+
+    /// Return true if the value is definitely false (and not null).
+    pub fn is_certainly_false(&self) -> bool {
+        match self {
+            Self::Null { .. } => false,
+            Self::MaybeNull { .. } => false,
+            Self::NotNull { values } => values == &Interval::CERTAINLY_FALSE,
+        }
+    }
+
+    /// Perform logical negation on a boolean nullable interval.
+    fn not(&self) -> Result<Self> {
+        match self {
+            Self::Null { datatype } => Ok(Self::Null {
+                datatype: datatype.clone(),
+            }),
+            Self::MaybeNull { values } => Ok(Self::MaybeNull {
+                values: values.not()?,
+            }),
+            Self::NotNull { values } => Ok(Self::NotNull {
+                values: values.not()?,
+            }),
+        }
+    }
+
+    /// Apply the given operator to this interval and the given interval.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use datafusion_common::ScalarValue;
+    /// use datafusion_expr::Operator;
+    /// use datafusion_physical_expr::intervals::{Interval, NullableInterval};
+    ///
+    /// // 4 > 3 -> true
+    /// let lhs = NullableInterval::from(&ScalarValue::Int32(Some(4)));
+    /// let rhs = NullableInterval::from(&ScalarValue::Int32(Some(3)));
+    /// let result = lhs.apply_operator(&Operator::Gt, &rhs).unwrap();
+    /// assert_eq!(result, 
NullableInterval::from(&ScalarValue::Boolean(Some(true))));
+    ///
+    /// // [1, 3) > NULL -> NULL
+    /// let lhs = NullableInterval::NotNull {
+    ///     values: Interval::make(Some(1), Some(3), (false, true)),
+    /// };
+    /// let rhs = NullableInterval::from(&ScalarValue::Int32(None));
+    /// let result = lhs.apply_operator(&Operator::Gt, &rhs).unwrap();
+    /// assert_eq!(result.single_value(), Some(ScalarValue::Null));
+    ///
+    /// // [1, 3] > [2, 4] -> [false, true]
+    /// let lhs = NullableInterval::NotNull {
+    ///     values: Interval::make(Some(1), Some(3), (false, false)),
+    /// };
+    /// let rhs = NullableInterval::NotNull {
+    ///    values: Interval::make(Some(2), Some(4), (false, false)),
+    /// };
+    /// let result = lhs.apply_operator(&Operator::Gt, &rhs).unwrap();
+    /// // Both inputs are valid (non-null), so result must be non-null
+    /// assert_eq!(result, NullableInterval::NotNull {
+    ///    // Uncertain whether inequality is true or false
+    ///    values: Interval::UNCERTAIN,
+    /// });
+    ///
+    /// ```
+    pub fn apply_operator(&self, op: &Operator, rhs: &Self) -> Result<Self> {
+        match op {
+            Operator::IsDistinctFrom => {
+                let values = match (self, rhs) {
+                    // NULL is distinct from NULL -> False
+                    (Self::Null { .. }, Self::Null { .. }) => 
Interval::CERTAINLY_FALSE,
+                    // x is distinct from y -> x != y,
+                    // if at least one of them is never null.
+                    (Self::NotNull { .. }, _) | (_, Self::NotNull { .. }) => {
+                        let lhs_values = self.values();
+                        let rhs_values = rhs.values();
+                        match (lhs_values, rhs_values) {
+                            (Some(lhs_values), Some(rhs_values)) => {
+                                lhs_values.equal(rhs_values).not()?
+                            }
+                            (Some(_), None) | (None, Some(_)) => 
Interval::CERTAINLY_TRUE,
+                            (None, None) => unreachable!("Null case handled 
above"),
+                        }
+                    }
+                    _ => Interval::UNCERTAIN,
+                };
+                // IsDistinctFrom never returns null.
+                Ok(Self::NotNull { values })
+            }
+            Operator::IsNotDistinctFrom => self
+                .apply_operator(&Operator::IsDistinctFrom, rhs)
+                .map(|i| i.not())?,
+            _ => {
+                if let (Some(left_values), Some(right_values)) =
+                    (self.values(), rhs.values())
+                {
+                    let values = apply_operator(op, left_values, 
right_values)?;
+                    match (self, rhs) {
+                        (Self::NotNull { .. }, Self::NotNull { .. }) => {
+                            Ok(Self::NotNull { values })
+                        }
+                        _ => Ok(Self::MaybeNull { values }),
+                    }
+                } else if op.is_comparison_operator() {
+                    Ok(Self::Null {
+                        datatype: DataType::Boolean,
+                    })
+                } else {
+                    Ok(Self::Null {
+                        datatype: self.get_datatype()?,
+                    })
+                }
+            }
+        }
+    }
+
+    /// Determine if this interval contains the given interval. Returns a 
boolean
+    /// interval that is [true, true] if this interval is a superset of the
+    /// given interval, [false, false] if this interval is disjoint from the
+    /// given interval, and [false, true] otherwise.
+    pub fn contains<T: Borrow<Self>>(&self, other: T) -> Result<Self> {
+        let rhs = other.borrow();
+        if let (Some(left_values), Some(right_values)) = (self.values(), 
rhs.values()) {
+            let values = left_values.contains(right_values)?;
+            match (self, rhs) {
+                (Self::NotNull { .. }, Self::NotNull { .. }) => {
+                    Ok(Self::NotNull { values })
+                }
+                _ => Ok(Self::MaybeNull { values }),
+            }
+        } else {
+            Ok(Self::Null {
+                datatype: DataType::Boolean,
+            })
+        }
+    }
+
+    /// If the interval has collapsed to a single value, return that value.
+    ///
+    /// Otherwise returns None.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use datafusion_common::ScalarValue;
+    /// use datafusion_physical_expr::intervals::{Interval, NullableInterval};
+    ///
+    /// let interval = NullableInterval::from(&ScalarValue::Int32(Some(4)));
+    /// assert_eq!(interval.single_value(), Some(ScalarValue::Int32(Some(4))));
+    ///
+    /// let interval = NullableInterval::from(&ScalarValue::Int32(None));
+    /// assert_eq!(interval.single_value(), Some(ScalarValue::Null));
+    ///
+    /// let interval = NullableInterval::MaybeNull {
+    ///     values: Interval::make(Some(1), Some(4), (false, true)),
+    /// };
+    /// assert_eq!(interval.single_value(), None);
+    /// ```
+    pub fn single_value(&self) -> Option<ScalarValue> {

Review Comment:
   You can avoid a clone (especially for string values such as
   `ScalarValue::Utf8`) here by returning a reference, with something like
   
   ```suggestion
       pub fn single_value(&self) -> Option<&ScalarValue> {
   ```
   
   



##########
datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs:
##########
@@ -149,6 +160,65 @@ impl<S: SimplifyInfo> ExprSimplifier<S> {
 
         expr.rewrite(&mut expr_rewrite)
     }
+
+    /// Input guarantees about the values of columns.
+    ///
+    /// The guarantees can simplify expressions. For example, if a column `x` 
is
+    /// guaranteed to be `3`, then the expression `x > 1` can be replaced by 
the
+    /// literal `true`.
+    ///
+    /// The guarantees are provided as a `Vec<(Expr, NullableInterval)>`,
+    /// where the [Expr] is a column reference and the [NullableInterval]
+    /// is an interval representing the known possible values of that column.
+    ///
+    /// ```rust
+    /// use arrow::datatypes::{DataType, Field, Schema};
+    /// use datafusion_expr::{col, lit, Expr};
+    /// use datafusion_common::{Result, ScalarValue, ToDFSchema};
+    /// use datafusion_physical_expr::execution_props::ExecutionProps;
+    /// use datafusion_physical_expr::intervals::{Interval, NullableInterval};
+    /// use datafusion_optimizer::simplify_expressions::{
+    ///     ExprSimplifier, SimplifyContext};
+    ///
+    /// let schema = Schema::new(vec![
+    ///   Field::new("x", DataType::Int64, false),
+    ///   Field::new("y", DataType::UInt32, false),
+    ///   Field::new("z", DataType::Int64, false),
+    ///   ])
+    ///   .to_dfschema_ref().unwrap();
+    ///
+    /// // Create the simplifier
+    /// let props = ExecutionProps::new();
+    /// let context = SimplifyContext::new(&props)
+    ///    .with_schema(schema);
+    ///
+    /// // Expression: (x >= 3) AND (y + 2 < 10) AND (z > 5)
+    /// let expr_x = col("x").gt_eq(lit(3_i64));
+    /// let expr_y = (col("y") + lit(2_u32)).lt(lit(10_u32));
+    /// let expr_z = col("z").gt(lit(5_i64));
+    /// let expr = expr_x.and(expr_y).and(expr_z.clone());
+    ///
+    /// let guarantees = vec![
+    ///    // x ∈ [3, 5]
+    ///    (
+    ///        col("x"),
+    ///        NullableInterval::NotNull {
+    ///            values: Interval::make(Some(3_i64), Some(5_i64), (false, 
false)),
+    ///        }
+    ///    ),
+    ///    // y = 3
+    ///    (col("y"), NullableInterval::from(&ScalarValue::UInt32(Some(3)))),
+    /// ];
+    /// let simplifier = 
ExprSimplifier::new(context).with_guarantees(guarantees);
+    /// let output = simplifier.simplify(expr).unwrap();
+    /// // Expression becomes: true AND true AND (z > 5), which simplifies to
+    /// // z > 5.
+    /// assert_eq!(output, expr_z);
+    /// ```
+    pub fn with_guarantees(mut self, guarantees: Vec<(Expr, 
NullableInterval)>) -> Self {
+        self.guarantees = guarantees;

Review Comment:
   👍 



##########
datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs:
##########
@@ -149,6 +160,65 @@ impl<S: SimplifyInfo> ExprSimplifier<S> {
 
         expr.rewrite(&mut expr_rewrite)
     }
+
+    /// Input guarantees about the values of columns.
+    ///
+    /// The guarantees can simplify expressions. For example, if a column `x` 
is
+    /// guaranteed to be `3`, then the expression `x > 1` can be replaced by 
the
+    /// literal `true`.
+    ///
+    /// The guarantees are provided as a `Vec<(Expr, NullableInterval)>`,
+    /// where the [Expr] is a column reference and the [NullableInterval]
+    /// is an interval representing the known possible values of that column.
+    ///
+    /// ```rust
+    /// use arrow::datatypes::{DataType, Field, Schema};
+    /// use datafusion_expr::{col, lit, Expr};
+    /// use datafusion_common::{Result, ScalarValue, ToDFSchema};
+    /// use datafusion_physical_expr::execution_props::ExecutionProps;
+    /// use datafusion_physical_expr::intervals::{Interval, NullableInterval};
+    /// use datafusion_optimizer::simplify_expressions::{
+    ///     ExprSimplifier, SimplifyContext};
+    ///
+    /// let schema = Schema::new(vec![
+    ///   Field::new("x", DataType::Int64, false),
+    ///   Field::new("y", DataType::UInt32, false),
+    ///   Field::new("z", DataType::Int64, false),
+    ///   ])
+    ///   .to_dfschema_ref().unwrap();
+    ///
+    /// // Create the simplifier
+    /// let props = ExecutionProps::new();
+    /// let context = SimplifyContext::new(&props)
+    ///    .with_schema(schema);
+    ///
+    /// // Expression: (x >= 3) AND (y + 2 < 10) AND (z > 5)
+    /// let expr_x = col("x").gt_eq(lit(3_i64));
+    /// let expr_y = (col("y") + lit(2_u32)).lt(lit(10_u32));
+    /// let expr_z = col("z").gt(lit(5_i64));
+    /// let expr = expr_x.and(expr_y).and(expr_z.clone());
+    ///
+    /// let guarantees = vec![
+    ///    // x ∈ [3, 5]
+    ///    (
+    ///        col("x"),
+    ///        NullableInterval::NotNull {
+    ///            values: Interval::make(Some(3_i64), Some(5_i64), (false, 
false)),
+    ///        }
+    ///    ),
+    ///    // y = 3
+    ///    (col("y"), NullableInterval::from(&ScalarValue::UInt32(Some(3)))),
+    /// ];
+    /// let simplifier = 
ExprSimplifier::new(context).with_guarantees(guarantees);
+    /// let output = simplifier.simplify(expr).unwrap();
+    /// // Expression becomes: true AND true AND (z > 5), which simplifies to
+    /// // z > 5.
+    /// assert_eq!(output, expr_z);
+    /// ```
+    pub fn with_guarantees(mut self, guarantees: Vec<(Expr, 
NullableInterval)>) -> Self {
+        self.guarantees = guarantees;

Review Comment:
   👍 



##########
datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs:
##########
@@ -39,13 +39,19 @@ use datafusion_expr::{
     and, expr, lit, or, BinaryExpr, BuiltinScalarFunction, Case, 
ColumnarValue, Expr,
     Like, Volatility,
 };
-use datafusion_physical_expr::{create_physical_expr, 
execution_props::ExecutionProps};
+use datafusion_physical_expr::{
+    create_physical_expr, execution_props::ExecutionProps, 
intervals::NullableInterval,
+};
 
 use crate::simplify_expressions::SimplifyInfo;
 
+use crate::simplify_expressions::guarantees::GuaranteeRewriter;
+
 /// This structure handles API for expression simplification
 pub struct ExprSimplifier<S> {
     info: S,
+    ///

Review Comment:
   This doc comment appears to be empty: the `///` above has no text describing the field.



##########
datafusion/optimizer/src/simplify_expressions/guarantees.rs:
##########
@@ -0,0 +1,514 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Simplifier implementation for [ExprSimplifier::with_guarantees()][crate::simplify_expressions::expr_simplifier::ExprSimplifier::with_guarantees].
+use datafusion_common::{tree_node::TreeNodeRewriter, DataFusionError, Result};
+use datafusion_expr::{expr::InList, lit, Between, BinaryExpr, Expr, Operator};
+use std::collections::HashMap;
+
+use datafusion_physical_expr::intervals::{Interval, IntervalBound, NullableInterval};
+
+/// Rewrite expressions to incorporate guarantees.
+pub(crate) struct GuaranteeRewriter<'a> {
+    guarantees: HashMap<&'a Expr, &'a NullableInterval>,
+}
+
+impl<'a> GuaranteeRewriter<'a> {
+    pub fn new(
+        guarantees: impl IntoIterator<Item = &'a (Expr, NullableInterval)>,
+    ) -> Self {
+        Self {
+            guarantees: guarantees.into_iter().map(|(k, v)| (k, v)).collect(),
+        }
+    }
+}
+
+impl<'a> TreeNodeRewriter for GuaranteeRewriter<'a> {
+    type N = Expr;
+
+    fn mutate(&mut self, expr: Expr) -> Result<Expr> {
+        if self.guarantees.is_empty() {
+            return Ok(expr);
+        }
+
+        match &expr {
+            Expr::IsNull(inner) => match self.guarantees.get(inner.as_ref()) {
+                Some(NullableInterval::Null { .. }) => Ok(lit(true)),
+                Some(NullableInterval::NotNull { .. }) => Ok(lit(false)),
+                _ => Ok(expr),
+            },
+            Expr::IsNotNull(inner) => match self.guarantees.get(inner.as_ref()) {
+                Some(NullableInterval::Null { .. }) => Ok(lit(false)),
+                Some(NullableInterval::NotNull { .. }) => Ok(lit(true)),
+                _ => Ok(expr),
+            },
+            Expr::Between(Between {
+                expr: inner,
+                negated,
+                low,
+                high,
+            }) => {
+                if let (Some(interval), Expr::Literal(low), Expr::Literal(high)) = (
+                    self.guarantees.get(inner.as_ref()),
+                    low.as_ref(),
+                    high.as_ref(),
+                ) {
+                    let expr_interval = NullableInterval::NotNull {
+                        values: Interval::new(
+                            IntervalBound::new(low.clone(), false),
+                            IntervalBound::new(high.clone(), false),
+                        ),
+                    };
+
+                    let contains = expr_interval.contains(*interval)?;
+
+                    if contains.is_certainly_true() {
+                        Ok(lit(!negated))
+                    } else if contains.is_certainly_false() {
+                        Ok(lit(*negated))
+                    } else {
+                        Ok(expr)
+                    }
+                } else {
+                    Ok(expr)
+                }
+            }
+
+            Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
+                // Check if this is a comparison
+                match op {
+                    Operator::Eq
+                    | Operator::NotEq
+                    | Operator::Lt
+                    | Operator::LtEq
+                    | Operator::Gt
+                    | Operator::GtEq
+                    | Operator::IsDistinctFrom
+                    | Operator::IsNotDistinctFrom => {}
+                    _ => return Ok(expr),
+                };
+
+                // Check if this is a comparison between a column and literal
+                let (col, op, value) = match (left.as_ref(), right.as_ref()) {
+                    (Expr::Column(_), Expr::Literal(value)) => (left, *op, value),
+                    (Expr::Literal(value), Expr::Column(_)) => {
+                        // If we can swap the op, we can simplify the expression
+                        if let Some(op) = op.swap() {
+                            (right, op, value)
+                        } else {
+                            return Ok(expr);
+                        }
+                    }
+                    _ => return Ok(expr),
+                };
+
+                if let Some(col_interval) = self.guarantees.get(col.as_ref()) {
+                    let result = col_interval.apply_operator(&op, &value.into())?;
+                    if result.is_certainly_true() {
+                        Ok(lit(true))
+                    } else if result.is_certainly_false() {
+                        Ok(lit(false))
+                    } else {
+                        Ok(expr)
+                    }
+                } else {
+                    Ok(expr)
+                }
+            }
+
+            // Columns (if interval is collapsed to a single value)
+            Expr::Column(_) => {
+                if let Some(col_interval) = self.guarantees.get(&expr) {
+                    if let Some(value) = col_interval.single_value() {
+                        Ok(lit(value))
+                    } else {
+                        Ok(expr)
+                    }
+                } else {
+                    Ok(expr)
+                }
+            }
+
+            Expr::InList(InList {
+                expr: inner,
+                list,
+                negated,
+            }) => {
+                if let Some(interval) = self.guarantees.get(inner.as_ref()) {
+                    // Can remove items from the list that don't match the guarantee
+                    let new_list: Vec<Expr> = list
+                        .iter()
+                        .filter_map(|expr| {
+                            if let Expr::Literal(item) = expr {
+                                match interval.contains(&NullableInterval::from(item)) {
+                                    // If we know for certain the value isn't in the column's interval,
+                                    // we can skip checking it.
+                                    Ok(NullableInterval::NotNull { values })
+                                        if values == Interval::CERTAINLY_FALSE =>
+                                    {
+                                        None
+                                    }
+                                    Err(err) => Some(Err(err)),
+                                    _ => Some(Ok(expr.clone())),
+                                }
+                            } else {
+                                Some(Ok(expr.clone()))
+                            }
+                        })
+                        .collect::<Result<_, DataFusionError>>()?;
+
+                    Ok(Expr::InList(InList {
+                        expr: inner.clone(),
+                        list: new_list,
+                        negated: *negated,
+                    }))
+                } else {
+                    Ok(expr)
+                }
+            }
+
+            _ => Ok(expr),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    use arrow::datatypes::DataType;
+    use datafusion_common::{tree_node::TreeNode, ScalarValue};
+    use datafusion_expr::{col, lit};
+
+    #[test]
+    fn test_null_handling() {
+        // IsNull / IsNotNull can be rewritten to true / false
+        let guarantees = vec![
+            // Note: AlwaysNull case handled by test_column_single_value test,
+            // since it's a special case of a column with a single value.
+            (
+                col("x"),
+                NullableInterval::NotNull {
+                    values: Default::default(),
+                },
+            ),
+        ];
+        let mut rewriter = GuaranteeRewriter::new(guarantees.iter());
+
+        // x IS NULL => guaranteed false
+        let expr = col("x").is_null();
+        let output = expr.clone().rewrite(&mut rewriter).unwrap();
+        assert_eq!(output, lit(false));
+
+        // x IS NOT NULL => guaranteed true
+        let expr = col("x").is_not_null();
+        let output = expr.clone().rewrite(&mut rewriter).unwrap();
+        assert_eq!(output, lit(true));
+    }
+
+    fn validate_simplified_cases<T>(rewriter: &mut GuaranteeRewriter, cases: &[(Expr, T)])
+    where
+        ScalarValue: From<T>,
+        T: Clone,
+    {
+        for (expr, expected_value) in cases {
+            let output = expr.clone().rewrite(rewriter).unwrap();
+            let expected = lit(ScalarValue::from(expected_value.clone()));
+            assert_eq!(
+                output, expected,
+                "{} simplified to {}, but expected {}",
+                expr, output, expected
+            );
+        }
+    }
+
+    fn validate_unchanged_cases(rewriter: &mut GuaranteeRewriter, cases: &[Expr]) {
+        for expr in cases {
+            let output = expr.clone().rewrite(rewriter).unwrap();
+            assert_eq!(
+                &output, expr,
+                "{} was simplified to {}, but expected it to be unchanged",
+                expr, output
+            );
+        }
+    }
+
+    #[test]
+    fn test_inequalities_non_null_bounded() {
+        let guarantees = vec![
+            // x ∈ (1, 3] (not null)
+            (
+                col("x"),
+                NullableInterval::NotNull {
+                    values: Interval::make(Some(1_i32), Some(3_i32), (true, false)),
+                },
+            ),
+        ];
+
+        let mut rewriter = GuaranteeRewriter::new(guarantees.iter());
+
+        // (original_expr, expected_simplification)
+        let simplified_cases = &[
+            (col("x").lt_eq(lit(1)), false),
+            (col("x").lt_eq(lit(3)), true),
+            (col("x").gt(lit(3)), false),
+            (col("x").gt(lit(1)), true),
+            (col("x").eq(lit(0)), false),
+            (col("x").not_eq(lit(0)), true),
+            (col("x").between(lit(2), lit(5)), true),
+            (col("x").between(lit(5), lit(10)), false),

Review Comment:
   Shall we test between (2, 3) as well to check the boundary condition?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to