This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 1f4442e0fb Interval Arithmetic NegativeExpr Support (#7804)
1f4442e0fb is described below

commit 1f4442e0fb427b06a78854d39b07d68743b94d2a
Author: Berkay Şahin <[email protected]>
AuthorDate: Thu Oct 12 18:34:51 2023 +0300

    Interval Arithmetic NegativeExpr Support (#7804)
    
    * Intervals can propagate over NegativeExpr's.
    
    * Addressing doc string reviews
---
 datafusion/common/src/scalar.rs                    |  3 +-
 .../core/src/physical_optimizer/join_selection.rs  | 13 ++--
 .../src/physical_optimizer/pipeline_checker.rs     |  2 +-
 .../physical-expr/src/expressions/negative.rs      | 73 +++++++++++++++++++---
 .../src/intervals/interval_aritmetic.rs            | 20 ++++--
 datafusion/physical-expr/src/intervals/utils.rs    | 39 ++++++++----
 datafusion/physical-plan/src/filter.rs             |  2 +-
 7 files changed, 120 insertions(+), 32 deletions(-)

diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs
index ae9c29b5a1..242d784edc 100644
--- a/datafusion/common/src/scalar.rs
+++ b/datafusion/common/src/scalar.rs
@@ -1004,7 +1004,8 @@ impl ScalarValue {
             | ScalarValue::Int16(None)
             | ScalarValue::Int32(None)
             | ScalarValue::Int64(None)
-            | ScalarValue::Float32(None) => Ok(self.clone()),
+            | ScalarValue::Float32(None)
+            | ScalarValue::Float64(None) => Ok(self.clone()),
             ScalarValue::Float64(Some(v)) => 
Ok(ScalarValue::Float64(Some(-v))),
             ScalarValue::Float32(Some(v)) => 
Ok(ScalarValue::Float32(Some(-v))),
             ScalarValue::Int8(Some(v)) => Ok(ScalarValue::Int8(Some(-v))),
diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs 
b/datafusion/core/src/physical_optimizer/join_selection.rs
index 4cff4a8f6c..3d5c93d4b3 100644
--- a/datafusion/core/src/physical_optimizer/join_selection.rs
+++ b/datafusion/core/src/physical_optimizer/join_selection.rs
@@ -1173,6 +1173,7 @@ mod tests_statistical {
 
 #[cfg(test)]
 mod util_tests {
+    use arrow_schema::{DataType, Field, Schema};
     use datafusion_expr::Operator;
     use datafusion_physical_expr::expressions::{BinaryExpr, Column, 
NegativeExpr};
     use datafusion_physical_expr::intervals::utils::check_support;
@@ -1181,26 +1182,30 @@ mod util_tests {
 
     #[test]
     fn check_expr_supported() {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int32, false),
+            Field::new("b", DataType::Utf8, false),
+        ]));
         let supported_expr = Arc::new(BinaryExpr::new(
             Arc::new(Column::new("a", 0)),
             Operator::Plus,
             Arc::new(Column::new("a", 0)),
         )) as Arc<dyn PhysicalExpr>;
-        assert!(check_support(&supported_expr));
+        assert!(check_support(&supported_expr, &schema));
         let supported_expr_2 = Arc::new(Column::new("a", 0)) as Arc<dyn 
PhysicalExpr>;
-        assert!(check_support(&supported_expr_2));
+        assert!(check_support(&supported_expr_2, &schema));
         let unsupported_expr = Arc::new(BinaryExpr::new(
             Arc::new(Column::new("a", 0)),
             Operator::Or,
             Arc::new(Column::new("a", 0)),
         )) as Arc<dyn PhysicalExpr>;
-        assert!(!check_support(&unsupported_expr));
+        assert!(!check_support(&unsupported_expr, &schema));
         let unsupported_expr_2 = Arc::new(BinaryExpr::new(
             Arc::new(Column::new("a", 0)),
             Operator::Or,
             Arc::new(NegativeExpr::new(Arc::new(Column::new("a", 0)))),
         )) as Arc<dyn PhysicalExpr>;
-        assert!(!check_support(&unsupported_expr_2));
+        assert!(!check_support(&unsupported_expr_2, &schema));
     }
 }
 
diff --git a/datafusion/core/src/physical_optimizer/pipeline_checker.rs 
b/datafusion/core/src/physical_optimizer/pipeline_checker.rs
index 3b994d5f89..fbc9910179 100644
--- a/datafusion/core/src/physical_optimizer/pipeline_checker.rs
+++ b/datafusion/core/src/physical_optimizer/pipeline_checker.rs
@@ -163,7 +163,7 @@ pub fn check_finiteness_requirements(
 /// [`Operator`]: datafusion_expr::Operator
 fn is_prunable(join: &SymmetricHashJoinExec) -> bool {
     join.filter().map_or(false, |filter| {
-        check_support(filter.expression())
+        check_support(filter.expression(), &join.schema())
             && filter
                 .schema()
                 .fields()
diff --git a/datafusion/physical-expr/src/expressions/negative.rs 
b/datafusion/physical-expr/src/expressions/negative.rs
index 90430cb2bb..c66c91ef31 100644
--- a/datafusion/physical-expr/src/expressions/negative.rs
+++ b/datafusion/physical-expr/src/expressions/negative.rs
@@ -17,26 +17,25 @@
 
 //! Negation (-) expression
 
-use std::any::Any;
-use std::hash::{Hash, Hasher};
-use std::sync::Arc;
-
+use crate::intervals::Interval;
 use crate::physical_expr::down_cast_any_ref;
 use crate::sort_properties::SortProperties;
 use crate::PhysicalExpr;
-
 use arrow::{
     compute::kernels::numeric::neg_wrapping,
     datatypes::{DataType, Schema},
     record_batch::RecordBatch,
 };
-
 use datafusion_common::{internal_err, DataFusionError, Result};
 use datafusion_expr::{
     type_coercion::{is_interval, is_null, is_signed_numeric},
     ColumnarValue,
 };
 
+use std::any::Any;
+use std::hash::{Hash, Hasher};
+use std::sync::Arc;
+
 /// Negative expression
 #[derive(Debug, Hash)]
 pub struct NegativeExpr {
@@ -105,6 +104,30 @@ impl PhysicalExpr for NegativeExpr {
         self.hash(&mut s);
     }
 
+    /// Given the child interval of a NegativeExpr, it calculates the 
NegativeExpr's interval.
+    /// It negates both bounds and swaps them, so the negated upper bound becomes
+    /// the new lower bound and vice versa. Ex: `(a, b] => [-b, -a)`
+    fn evaluate_bounds(&self, children: &[&Interval]) -> Result<Interval> {
+        Ok(Interval::new(
+            children[0].upper.negate()?,
+            children[0].lower.negate()?,
+        ))
+    }
+
+    /// Returns the updated child interval of a NegativeExpr: the negation of the 
known `interval`
+    /// of this expression, intersected with the child's current interval `children[0]`.
+    fn propagate_constraints(
+        &self,
+        interval: &Interval,
+        children: &[&Interval],
+    ) -> Result<Vec<Option<Interval>>> {
+        let child_interval = children[0];
+        let negated_interval =
+            Interval::new(interval.upper.negate()?, interval.lower.negate()?);
+
+        Ok(vec![child_interval.intersect(negated_interval)?])
+    }
+
     /// The ordering of a [`NegativeExpr`] is simply the reverse of its child.
     fn get_ordering(&self, children: &[SortProperties]) -> SortProperties {
         -children[0]
@@ -144,7 +167,10 @@ pub fn negative(
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::expressions::col;
+    use crate::{
+        expressions::{col, Column},
+        intervals::Interval,
+    };
     #[allow(unused_imports)]
     use arrow::array::*;
     use arrow::datatypes::*;
@@ -187,4 +213,37 @@ mod tests {
         test_array_negative_op!(Float64, 23456.0f64, 12345.0f64);
         Ok(())
     }
+
+    #[test]
+    fn test_evaluate_bounds() -> Result<()> {
+        let negative_expr = NegativeExpr {
+            arg: Arc::new(Column::new("a", 0)),
+        };
+        let child_interval = Interval::make(Some(-2), Some(1), (true, false));
+        let negative_expr_interval = Interval::make(Some(-1), Some(2), (false, 
true));
+        assert_eq!(
+            negative_expr.evaluate_bounds(&[&child_interval])?,
+            negative_expr_interval
+        );
+        Ok(())
+    }
+
+    #[test]
+    fn test_propagate_constraints() -> Result<()> {
+        let negative_expr = NegativeExpr {
+            arg: Arc::new(Column::new("a", 0)),
+        };
+        let original_child_interval = Interval::make(Some(-2), Some(3), 
(false, false));
+        let negative_expr_interval = Interval::make(Some(0), Some(4), (true, 
false));
+        let after_propagation =
+            vec![Some(Interval::make(Some(-2), Some(0), (false, true)))];
+        assert_eq!(
+            negative_expr.propagate_constraints(
+                &negative_expr_interval,
+                &[&original_child_interval]
+            )?,
+            after_propagation
+        );
+        Ok(())
+    }
 }
diff --git a/datafusion/physical-expr/src/intervals/interval_aritmetic.rs 
b/datafusion/physical-expr/src/intervals/interval_aritmetic.rs
index 210db0ebe6..277124ea22 100644
--- a/datafusion/physical-expr/src/intervals/interval_aritmetic.rs
+++ b/datafusion/physical-expr/src/intervals/interval_aritmetic.rs
@@ -17,14 +17,8 @@
 
 //! Interval arithmetic library
 
-use std::borrow::Borrow;
-use std::fmt;
-use std::fmt::{Display, Formatter};
-use std::ops::{AddAssign, SubAssign};
-
 use crate::aggregate::min_max::{max, min};
 use crate::intervals::rounding::{alter_fp_rounding_mode, next_down, next_up};
-
 use arrow::compute::{cast_with_options, CastOptions};
 use arrow::datatypes::DataType;
 use arrow_array::ArrowNativeTypeOp;
@@ -32,6 +26,11 @@ use datafusion_common::{exec_err, internal_err, 
DataFusionError, Result, ScalarV
 use datafusion_expr::type_coercion::binary::get_result_type;
 use datafusion_expr::Operator;
 
+use std::borrow::Borrow;
+use std::fmt;
+use std::fmt::{Display, Formatter};
+use std::ops::{AddAssign, SubAssign};
+
 /// This type represents a single endpoint of an [`Interval`]. An
 /// endpoint can be open (does not include the endpoint) or closed
 /// (includes the endpoint).
@@ -87,6 +86,15 @@ impl IntervalBound {
             .map(|value| IntervalBound::new(value, self.open))
     }
 
+    /// Returns a new bound with a negated value, if any, and the same 
open/closed status.
+    /// For example, negating `[5` would return `[-5`, and negating `-1)` would 
return `1)`.
+    pub fn negate(&self) -> Result<IntervalBound> {
+        self.value.arithmetic_negate().map(|value| IntervalBound {
+            value,
+            open: self.open,
+        })
+    }
+
     /// This function adds the given `IntervalBound` to this `IntervalBound`.
     /// The result is unbounded if either is; otherwise, their values are
     /// added. The result is closed if both original bounds are closed, or open
diff --git a/datafusion/physical-expr/src/intervals/utils.rs 
b/datafusion/physical-expr/src/intervals/utils.rs
index be3b177713..2ddbca4073 100644
--- a/datafusion/physical-expr/src/intervals/utils.rs
+++ b/datafusion/physical-expr/src/intervals/utils.rs
@@ -17,18 +17,17 @@
 
 //! Utility functions for the interval arithmetic library
 
-use std::sync::Arc;
-
 use super::{Interval, IntervalBound};
 use crate::{
-    expressions::{BinaryExpr, CastExpr, Column, Literal},
+    expressions::{BinaryExpr, CastExpr, Column, Literal, NegativeExpr},
     PhysicalExpr,
 };
-
-use arrow_schema::DataType;
+use arrow_schema::{DataType, SchemaRef};
 use datafusion_common::{DataFusionError, Result, ScalarValue};
 use datafusion_expr::Operator;
 
+use std::sync::Arc;
+
 const MDN_DAY_MASK: i128 = 0xFFFF_FFFF_0000_0000_0000_0000;
 const MDN_NS_MASK: i128 = 0xFFFF_FFFF_FFFF_FFFF;
 const DT_MS_MASK: i64 = 0xFFFF_FFFF;
@@ -37,16 +36,32 @@ const DT_MS_MASK: i64 = 0xFFFF_FFFF;
 /// Currently, we do not support all [`PhysicalExpr`]s for interval 
calculations.
 /// We do not support every type of [`Operator`]s either. Over time, this check
 /// will relax as more types of `PhysicalExpr`s and `Operator`s are supported.
-/// Currently, [`CastExpr`], [`BinaryExpr`], [`Column`] and [`Literal`] are 
supported.
-pub fn check_support(expr: &Arc<dyn PhysicalExpr>) -> bool {
+/// Currently, [`CastExpr`], [`NegativeExpr`], [`BinaryExpr`], [`Column`] and 
[`Literal`] are supported.
+pub fn check_support(expr: &Arc<dyn PhysicalExpr>, schema: &SchemaRef) -> bool 
{
     let expr_any = expr.as_any();
-    let expr_supported = if let Some(binary_expr) = 
expr_any.downcast_ref::<BinaryExpr>()
-    {
+    if let Some(binary_expr) = expr_any.downcast_ref::<BinaryExpr>() {
         is_operator_supported(binary_expr.op())
+            && check_support(binary_expr.left(), schema)
+            && check_support(binary_expr.right(), schema)
+    } else if let Some(column) = expr_any.downcast_ref::<Column>() {
+        if let Ok(field) = schema.field_with_name(column.name()) {
+            is_datatype_supported(field.data_type())
+        } else {
+            return false;
+        }
+    } else if let Some(literal) = expr_any.downcast_ref::<Literal>() {
+        if let Ok(dt) = literal.data_type(schema) {
+            is_datatype_supported(&dt)
+        } else {
+            return false;
+        }
+    } else if let Some(cast) = expr_any.downcast_ref::<CastExpr>() {
+        check_support(cast.expr(), schema)
+    } else if let Some(negative) = expr_any.downcast_ref::<NegativeExpr>() {
+        check_support(negative.arg(), schema)
     } else {
-        expr_any.is::<Column>() || expr_any.is::<Literal>() || 
expr_any.is::<CastExpr>()
-    };
-    expr_supported && expr.children().iter().all(check_support)
+        false
+    }
 }
 
 // This function returns the inverse operator of the given operator.
diff --git a/datafusion/physical-plan/src/filter.rs 
b/datafusion/physical-plan/src/filter.rs
index 4a8b189144..ccd0bd525f 100644
--- a/datafusion/physical-plan/src/filter.rs
+++ b/datafusion/physical-plan/src/filter.rs
@@ -203,7 +203,7 @@ impl ExecutionPlan for FilterExec {
     fn statistics(&self) -> Statistics {
         let predicate = self.predicate();
 
-        if !check_support(predicate) {
+        if !check_support(predicate, &self.schema()) {
             return Statistics::default();
         }
 

Reply via email to