This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 1f4442e0fb Interval Arithmetic NegativeExpr Support (#7804)
1f4442e0fb is described below
commit 1f4442e0fb427b06a78854d39b07d68743b94d2a
Author: Berkay Şahin <[email protected]>
AuthorDate: Thu Oct 12 18:34:51 2023 +0300
Interval Arithmetic NegativeExpr Support (#7804)
* Intervals can now propagate through `NegativeExpr`s.
* Address docstring review comments.
---
datafusion/common/src/scalar.rs | 3 +-
.../core/src/physical_optimizer/join_selection.rs | 13 ++--
.../src/physical_optimizer/pipeline_checker.rs | 2 +-
.../physical-expr/src/expressions/negative.rs | 73 +++++++++++++++++++---
.../src/intervals/interval_aritmetic.rs | 20 ++++--
datafusion/physical-expr/src/intervals/utils.rs | 39 ++++++++----
datafusion/physical-plan/src/filter.rs | 2 +-
7 files changed, 120 insertions(+), 32 deletions(-)
diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs
index ae9c29b5a1..242d784edc 100644
--- a/datafusion/common/src/scalar.rs
+++ b/datafusion/common/src/scalar.rs
@@ -1004,7 +1004,8 @@ impl ScalarValue {
| ScalarValue::Int16(None)
| ScalarValue::Int32(None)
| ScalarValue::Int64(None)
- | ScalarValue::Float32(None) => Ok(self.clone()),
+ | ScalarValue::Float32(None)
+ | ScalarValue::Float64(None) => Ok(self.clone()),
ScalarValue::Float64(Some(v)) =>
Ok(ScalarValue::Float64(Some(-v))),
ScalarValue::Float32(Some(v)) =>
Ok(ScalarValue::Float32(Some(-v))),
ScalarValue::Int8(Some(v)) => Ok(ScalarValue::Int8(Some(-v))),
diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs
b/datafusion/core/src/physical_optimizer/join_selection.rs
index 4cff4a8f6c..3d5c93d4b3 100644
--- a/datafusion/core/src/physical_optimizer/join_selection.rs
+++ b/datafusion/core/src/physical_optimizer/join_selection.rs
@@ -1173,6 +1173,7 @@ mod tests_statistical {
#[cfg(test)]
mod util_tests {
+ use arrow_schema::{DataType, Field, Schema};
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{BinaryExpr, Column,
NegativeExpr};
use datafusion_physical_expr::intervals::utils::check_support;
@@ -1181,26 +1182,30 @@ mod util_tests {
#[test]
fn check_expr_supported() {
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("a", DataType::Int32, false),
+ Field::new("b", DataType::Utf8, false),
+ ]));
let supported_expr = Arc::new(BinaryExpr::new(
Arc::new(Column::new("a", 0)),
Operator::Plus,
Arc::new(Column::new("a", 0)),
)) as Arc<dyn PhysicalExpr>;
- assert!(check_support(&supported_expr));
+ assert!(check_support(&supported_expr, &schema));
let supported_expr_2 = Arc::new(Column::new("a", 0)) as Arc<dyn
PhysicalExpr>;
- assert!(check_support(&supported_expr_2));
+ assert!(check_support(&supported_expr_2, &schema));
let unsupported_expr = Arc::new(BinaryExpr::new(
Arc::new(Column::new("a", 0)),
Operator::Or,
Arc::new(Column::new("a", 0)),
)) as Arc<dyn PhysicalExpr>;
- assert!(!check_support(&unsupported_expr));
+ assert!(!check_support(&unsupported_expr, &schema));
let unsupported_expr_2 = Arc::new(BinaryExpr::new(
Arc::new(Column::new("a", 0)),
Operator::Or,
Arc::new(NegativeExpr::new(Arc::new(Column::new("a", 0)))),
)) as Arc<dyn PhysicalExpr>;
- assert!(!check_support(&unsupported_expr_2));
+ assert!(!check_support(&unsupported_expr_2, &schema));
}
}
diff --git a/datafusion/core/src/physical_optimizer/pipeline_checker.rs
b/datafusion/core/src/physical_optimizer/pipeline_checker.rs
index 3b994d5f89..fbc9910179 100644
--- a/datafusion/core/src/physical_optimizer/pipeline_checker.rs
+++ b/datafusion/core/src/physical_optimizer/pipeline_checker.rs
@@ -163,7 +163,7 @@ pub fn check_finiteness_requirements(
/// [`Operator`]: datafusion_expr::Operator
fn is_prunable(join: &SymmetricHashJoinExec) -> bool {
join.filter().map_or(false, |filter| {
- check_support(filter.expression())
+ check_support(filter.expression(), &join.schema())
&& filter
.schema()
.fields()
diff --git a/datafusion/physical-expr/src/expressions/negative.rs
b/datafusion/physical-expr/src/expressions/negative.rs
index 90430cb2bb..c66c91ef31 100644
--- a/datafusion/physical-expr/src/expressions/negative.rs
+++ b/datafusion/physical-expr/src/expressions/negative.rs
@@ -17,26 +17,25 @@
//! Negation (-) expression
-use std::any::Any;
-use std::hash::{Hash, Hasher};
-use std::sync::Arc;
-
+use crate::intervals::Interval;
use crate::physical_expr::down_cast_any_ref;
use crate::sort_properties::SortProperties;
use crate::PhysicalExpr;
-
use arrow::{
compute::kernels::numeric::neg_wrapping,
datatypes::{DataType, Schema},
record_batch::RecordBatch,
};
-
use datafusion_common::{internal_err, DataFusionError, Result};
use datafusion_expr::{
type_coercion::{is_interval, is_null, is_signed_numeric},
ColumnarValue,
};
+use std::any::Any;
+use std::hash::{Hash, Hasher};
+use std::sync::Arc;
+
/// Negative expression
#[derive(Debug, Hash)]
pub struct NegativeExpr {
@@ -105,6 +104,30 @@ impl PhysicalExpr for NegativeExpr {
self.hash(&mut s);
}
+ /// Given the child interval of a NegativeExpr, calculates the NegativeExpr's interval.
+ /// The lower and upper bounds are swapped, and each one is negated (multiplied by -1).
+ /// Ex: `(a, b] => [-b, -a)`
+ fn evaluate_bounds(&self, children: &[&Interval]) -> Result<Interval> {
+ Ok(Interval::new(
+ children[0].upper.negate()?,
+ children[0].lower.negate()?,
+ ))
+ }
+
+ /// Given the NegativeExpr's target `interval` and the current child interval in `children`,
+ /// returns the child's [`Interval`] narrowed so that its negation lies within `interval`.
+ fn propagate_constraints(
+ &self,
+ interval: &Interval,
+ children: &[&Interval],
+ ) -> Result<Vec<Option<Interval>>> {
+ let child_interval = children[0];
+ let negated_interval =
+ Interval::new(interval.upper.negate()?, interval.lower.negate()?);
+
+ Ok(vec![child_interval.intersect(negated_interval)?])
+ }
+
/// The ordering of a [`NegativeExpr`] is simply the reverse of its child.
fn get_ordering(&self, children: &[SortProperties]) -> SortProperties {
-children[0]
@@ -144,7 +167,10 @@ pub fn negative(
#[cfg(test)]
mod tests {
use super::*;
- use crate::expressions::col;
+ use crate::{
+ expressions::{col, Column},
+ intervals::Interval,
+ };
#[allow(unused_imports)]
use arrow::array::*;
use arrow::datatypes::*;
@@ -187,4 +213,37 @@ mod tests {
test_array_negative_op!(Float64, 23456.0f64, 12345.0f64);
Ok(())
}
+
+ #[test]
+ fn test_evaluate_bounds() -> Result<()> {
+ let negative_expr = NegativeExpr {
+ arg: Arc::new(Column::new("a", 0)),
+ };
+ let child_interval = Interval::make(Some(-2), Some(1), (true, false));
+ let negative_expr_interval = Interval::make(Some(-1), Some(2), (false,
true));
+ assert_eq!(
+ negative_expr.evaluate_bounds(&[&child_interval])?,
+ negative_expr_interval
+ );
+ Ok(())
+ }
+
+ #[test]
+ fn test_propagate_constraints() -> Result<()> {
+ let negative_expr = NegativeExpr {
+ arg: Arc::new(Column::new("a", 0)),
+ };
+ let original_child_interval = Interval::make(Some(-2), Some(3),
(false, false));
+ let negative_expr_interval = Interval::make(Some(0), Some(4), (true,
false));
+ let after_propagation =
+ vec![Some(Interval::make(Some(-2), Some(0), (false, true)))];
+ assert_eq!(
+ negative_expr.propagate_constraints(
+ &negative_expr_interval,
+ &[&original_child_interval]
+ )?,
+ after_propagation
+ );
+ Ok(())
+ }
}
diff --git a/datafusion/physical-expr/src/intervals/interval_aritmetic.rs
b/datafusion/physical-expr/src/intervals/interval_aritmetic.rs
index 210db0ebe6..277124ea22 100644
--- a/datafusion/physical-expr/src/intervals/interval_aritmetic.rs
+++ b/datafusion/physical-expr/src/intervals/interval_aritmetic.rs
@@ -17,14 +17,8 @@
//! Interval arithmetic library
-use std::borrow::Borrow;
-use std::fmt;
-use std::fmt::{Display, Formatter};
-use std::ops::{AddAssign, SubAssign};
-
use crate::aggregate::min_max::{max, min};
use crate::intervals::rounding::{alter_fp_rounding_mode, next_down, next_up};
-
use arrow::compute::{cast_with_options, CastOptions};
use arrow::datatypes::DataType;
use arrow_array::ArrowNativeTypeOp;
@@ -32,6 +26,11 @@ use datafusion_common::{exec_err, internal_err,
DataFusionError, Result, ScalarV
use datafusion_expr::type_coercion::binary::get_result_type;
use datafusion_expr::Operator;
+use std::borrow::Borrow;
+use std::fmt;
+use std::fmt::{Display, Formatter};
+use std::ops::{AddAssign, SubAssign};
+
/// This type represents a single endpoint of an [`Interval`]. An
/// endpoint can be open (does not include the endpoint) or closed
/// (includes the endpoint).
@@ -87,6 +86,15 @@ impl IntervalBound {
.map(|value| IntervalBound::new(value, self.open))
}
+ /// Returns a new bound with a negated value, if any, and the same
open/closed.
+ /// For example negating `[5` would return `[-5`, or `-1)` would return
`1)`
+ pub fn negate(&self) -> Result<IntervalBound> {
+ self.value.arithmetic_negate().map(|value| IntervalBound {
+ value,
+ open: self.open,
+ })
+ }
+
/// This function adds the given `IntervalBound` to this `IntervalBound`.
/// The result is unbounded if either is; otherwise, their values are
/// added. The result is closed if both original bounds are closed, or open
diff --git a/datafusion/physical-expr/src/intervals/utils.rs
b/datafusion/physical-expr/src/intervals/utils.rs
index be3b177713..2ddbca4073 100644
--- a/datafusion/physical-expr/src/intervals/utils.rs
+++ b/datafusion/physical-expr/src/intervals/utils.rs
@@ -17,18 +17,17 @@
//! Utility functions for the interval arithmetic library
-use std::sync::Arc;
-
use super::{Interval, IntervalBound};
use crate::{
- expressions::{BinaryExpr, CastExpr, Column, Literal},
+ expressions::{BinaryExpr, CastExpr, Column, Literal, NegativeExpr},
PhysicalExpr,
};
-
-use arrow_schema::DataType;
+use arrow_schema::{DataType, SchemaRef};
use datafusion_common::{DataFusionError, Result, ScalarValue};
use datafusion_expr::Operator;
+use std::sync::Arc;
+
const MDN_DAY_MASK: i128 = 0xFFFF_FFFF_0000_0000_0000_0000;
const MDN_NS_MASK: i128 = 0xFFFF_FFFF_FFFF_FFFF;
const DT_MS_MASK: i64 = 0xFFFF_FFFF;
@@ -37,16 +36,32 @@ const DT_MS_MASK: i64 = 0xFFFF_FFFF;
/// Currently, we do not support all [`PhysicalExpr`]s for interval
calculations.
/// We do not support every type of [`Operator`]s either. Over time, this check
/// will relax as more types of `PhysicalExpr`s and `Operator`s are supported.
-/// Currently, [`CastExpr`], [`BinaryExpr`], [`Column`] and [`Literal`] are
supported.
-pub fn check_support(expr: &Arc<dyn PhysicalExpr>) -> bool {
+/// Currently, [`CastExpr`], [`NegativeExpr`], [`BinaryExpr`], [`Column`] and
[`Literal`] are supported.
+pub fn check_support(expr: &Arc<dyn PhysicalExpr>, schema: &SchemaRef) -> bool
{
let expr_any = expr.as_any();
- let expr_supported = if let Some(binary_expr) =
expr_any.downcast_ref::<BinaryExpr>()
- {
+ if let Some(binary_expr) = expr_any.downcast_ref::<BinaryExpr>() {
is_operator_supported(binary_expr.op())
+ && check_support(binary_expr.left(), schema)
+ && check_support(binary_expr.right(), schema)
+ } else if let Some(column) = expr_any.downcast_ref::<Column>() {
+ if let Ok(field) = schema.field_with_name(column.name()) {
+ is_datatype_supported(field.data_type())
+ } else {
+ return false;
+ }
+ } else if let Some(literal) = expr_any.downcast_ref::<Literal>() {
+ if let Ok(dt) = literal.data_type(schema) {
+ is_datatype_supported(&dt)
+ } else {
+ return false;
+ }
+ } else if let Some(cast) = expr_any.downcast_ref::<CastExpr>() {
+ check_support(cast.expr(), schema)
+ } else if let Some(negative) = expr_any.downcast_ref::<NegativeExpr>() {
+ check_support(negative.arg(), schema)
} else {
- expr_any.is::<Column>() || expr_any.is::<Literal>() ||
expr_any.is::<CastExpr>()
- };
- expr_supported && expr.children().iter().all(check_support)
+ false
+ }
}
// This function returns the inverse operator of the given operator.
diff --git a/datafusion/physical-plan/src/filter.rs
b/datafusion/physical-plan/src/filter.rs
index 4a8b189144..ccd0bd525f 100644
--- a/datafusion/physical-plan/src/filter.rs
+++ b/datafusion/physical-plan/src/filter.rs
@@ -203,7 +203,7 @@ impl ExecutionPlan for FilterExec {
fn statistics(&self) -> Statistics {
let predicate = self.predicate();
- if !check_support(predicate) {
+ if !check_support(predicate, &self.schema()) {
return Statistics::default();
}