This is an automated email from the ASF dual-hosted git repository.
berkay pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 95d296cd5a Support n-ary monotonic functions in ordering equivalence
(#13841)
95d296cd5a is described below
commit 95d296cd5aa1c58baf18e7625afe50f2e8989b07
Author: Goksel Kabadayi <[email protected]>
AuthorDate: Fri Dec 20 11:32:56 2024 +0300
Support n-ary monotonic functions in ordering equivalence (#13841)
* Support n-ary monotonic functions in `discover_new_orderings`
* Add tests for n-ary monotonic functions in `discover_new_orderings`
* Fix tests
* Fix non-monotonic test case
* Fix unintended simplification
* Minor comment changes
* Fix tests
* Add `preserves_lex_ordering` field
* Use `preserves_lex_ordering` on `discover_new_orderings()`
* Add `output_ordering` and `output_preserves_lex_ordering` implementations
for `ConcatFunc`
* Update tests
* Move logic to UDF
* Cargo fmt
* Refactor
* Cargo fmt
* Simply use false value on default implementation
* Remove unnecessary import
* Clippy fix
* Update Cargo.lock
* Move dep to dev-dependencies
* Rename output_preserves_lex_ordering to preserves_lex_ordering
* minor
---------
Co-authored-by: berkaysynnada <[email protected]>
---
datafusion/expr-common/src/sort_properties.rs | 21 +-
datafusion/expr/src/udf.rs | 36 +++-
datafusion/functions/src/string/concat.rs | 5 +
datafusion/physical-expr/Cargo.toml | 1 +
.../physical-expr/src/equivalence/properties.rs | 232 ++++++++++++++++++---
datafusion/physical-expr/src/expressions/binary.rs | 8 +
.../physical-expr/src/expressions/literal.rs | 1 +
.../physical-expr/src/expressions/negative.rs | 1 +
datafusion/physical-expr/src/scalar_function.rs | 2 +
9 files changed, 269 insertions(+), 38 deletions(-)
diff --git a/datafusion/expr-common/src/sort_properties.rs
b/datafusion/expr-common/src/sort_properties.rs
index 7778be2ecf..5d17a34a96 100644
--- a/datafusion/expr-common/src/sort_properties.rs
+++ b/datafusion/expr-common/src/sort_properties.rs
@@ -129,19 +129,30 @@ impl Neg for SortProperties {
}
}
-/// Represents the properties of a `PhysicalExpr`, including its sorting and
range attributes.
+/// Represents the properties of a `PhysicalExpr`, including its sorting,
+/// range, and whether it preserves lexicographical ordering.
#[derive(Debug, Clone)]
pub struct ExprProperties {
+ /// Properties that describe the sorting behavior of the expression,
+ /// such as whether it is ordered, unordered, or a singleton value.
pub sort_properties: SortProperties,
+ /// A closed interval representing the range of possible values for
+ /// the expression. Used to compute reliable bounds.
pub range: Interval,
+ /// Indicates whether the expression preserves lexicographical ordering
+ /// of its inputs. For example, string concatenation preserves ordering,
+ /// while addition does not.
+ pub preserves_lex_ordering: bool,
}
impl ExprProperties {
- /// Creates a new `ExprProperties` instance with unknown sort properties
and unknown range.
+ /// Creates a new `ExprProperties` instance with unknown sort properties,
+ /// unknown range, and unknown lexicographical ordering preservation.
pub fn new_unknown() -> Self {
Self {
sort_properties: SortProperties::default(),
range: Interval::make_unbounded(&DataType::Null).unwrap(),
+ preserves_lex_ordering: false,
}
}
@@ -156,4 +167,10 @@ impl ExprProperties {
self.range = range;
self
}
+
+ /// Sets whether the expression maintains lexicographical ordering and
returns the modified instance.
+ pub fn with_preserves_lex_ordering(mut self, preserves_lex_ordering: bool)
-> Self {
+ self.preserves_lex_ordering = preserves_lex_ordering;
+ self
+ }
}
diff --git a/datafusion/expr/src/udf.rs b/datafusion/expr/src/udf.rs
index 809c78f30e..83200edfa2 100644
--- a/datafusion/expr/src/udf.rs
+++ b/datafusion/expr/src/udf.rs
@@ -303,6 +303,10 @@ impl ScalarUDF {
self.inner.output_ordering(inputs)
}
+ pub fn preserves_lex_ordering(&self, inputs: &[ExprProperties]) ->
Result<bool> {
+ self.inner.preserves_lex_ordering(inputs)
+ }
+
/// See [`ScalarUDFImpl::coerce_types`] for more details.
pub fn coerce_types(&self, arg_types: &[DataType]) ->
Result<Vec<DataType>> {
self.inner.coerce_types(arg_types)
@@ -650,10 +654,30 @@ pub trait ScalarUDFImpl: Debug + Send + Sync {
Ok(Some(vec![]))
}
- /// Calculates the [`SortProperties`] of this function based on its
- /// children's properties.
- fn output_ordering(&self, _inputs: &[ExprProperties]) ->
Result<SortProperties> {
- Ok(SortProperties::Unordered)
+ /// Calculates the [`SortProperties`] of this function based on its
children's properties.
+ fn output_ordering(&self, inputs: &[ExprProperties]) ->
Result<SortProperties> {
+ if !self.preserves_lex_ordering(inputs)? {
+ return Ok(SortProperties::Unordered);
+ }
+
+ let Some(first_order) = inputs.first().map(|p| &p.sort_properties)
else {
+ return Ok(SortProperties::Singleton);
+ };
+
+ if inputs
+ .iter()
+ .skip(1)
+ .all(|input| &input.sort_properties == first_order)
+ {
+ Ok(*first_order)
+ } else {
+ Ok(SortProperties::Unordered)
+ }
+ }
+
+ /// Whether the function preserves lexicographical ordering based on the
input ordering
+ fn preserves_lex_ordering(&self, _inputs: &[ExprProperties]) ->
Result<bool> {
+ Ok(false)
}
/// Coerce arguments of a function call to types that the function can
evaluate.
@@ -809,6 +833,10 @@ impl ScalarUDFImpl for AliasedScalarUDFImpl {
self.inner.output_ordering(inputs)
}
+ fn preserves_lex_ordering(&self, inputs: &[ExprProperties]) ->
Result<bool> {
+ self.inner.preserves_lex_ordering(inputs)
+ }
+
fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
self.inner.coerce_types(arg_types)
}
diff --git a/datafusion/functions/src/string/concat.rs
b/datafusion/functions/src/string/concat.rs
index 895a7cdbf3..f4848c1315 100644
--- a/datafusion/functions/src/string/concat.rs
+++ b/datafusion/functions/src/string/concat.rs
@@ -17,6 +17,7 @@
use arrow::array::{as_largestring_array, Array};
use arrow::datatypes::DataType;
+use datafusion_expr::sort_properties::ExprProperties;
use std::any::Any;
use std::sync::{Arc, OnceLock};
@@ -265,6 +266,10 @@ impl ScalarUDFImpl for ConcatFunc {
fn documentation(&self) -> Option<&Documentation> {
Some(get_concat_doc())
}
+
+ fn preserves_lex_ordering(&self, _inputs: &[ExprProperties]) ->
Result<bool> {
+ Ok(true)
+ }
}
static DOCUMENTATION: OnceLock<Documentation> = OnceLock::new();
diff --git a/datafusion/physical-expr/Cargo.toml
b/datafusion/physical-expr/Cargo.toml
index db3e0e10d8..d1de63a1e8 100644
--- a/datafusion/physical-expr/Cargo.toml
+++ b/datafusion/physical-expr/Cargo.toml
@@ -57,6 +57,7 @@ petgraph = "0.6.2"
[dev-dependencies]
arrow = { workspace = true, features = ["test_utils"] }
criterion = "0.5"
+datafusion-functions = { workspace = true }
rand = { workspace = true }
rstest = { workspace = true }
diff --git a/datafusion/physical-expr/src/equivalence/properties.rs
b/datafusion/physical-expr/src/equivalence/properties.rs
old mode 100644
new mode 100755
index e37e6b8abd..d4814fb4d7
--- a/datafusion/physical-expr/src/equivalence/properties.rs
+++ b/datafusion/physical-expr/src/equivalence/properties.rs
@@ -346,40 +346,61 @@ impl EquivalenceProperties {
.unwrap_or_else(|| vec![Arc::clone(&normalized_expr)]);
let mut new_orderings: Vec<LexOrdering> = vec![];
- for (ordering, next_expr) in self
- .normalized_oeq_class()
- .iter()
- .filter(|ordering| ordering[0].expr.eq(&normalized_expr))
- // First expression after leading ordering
- .filter_map(|ordering| Some(ordering).zip(ordering.inner.get(1)))
- {
- let leading_ordering = ordering[0].options;
- // Currently, we only handle expressions with a single child.
- // TODO: It should be possible to handle expressions orderings like
- // f(a, b, c), a, b, c if f is monotonic in all arguments.
+ for ordering in self.normalized_oeq_class().iter() {
+ if !ordering[0].expr.eq(&normalized_expr) {
+ continue;
+ }
+
+ let leading_ordering_options = ordering[0].options;
+
for equivalent_expr in &eq_class {
let children = equivalent_expr.children();
- if children.len() == 1
- && children[0].eq(&next_expr.expr)
- && SortProperties::Ordered(leading_ordering)
- == equivalent_expr
- .get_properties(&[ExprProperties {
- sort_properties: SortProperties::Ordered(
- leading_ordering,
- ),
- range: Interval::make_unbounded(
- &equivalent_expr.data_type(&self.schema)?,
- )?,
- }])?
- .sort_properties
- {
- // Assume existing ordering is [a ASC, b ASC]
- // When equality a = f(b) is given, If we know that given
ordering `[b ASC]`, ordering `[f(b) ASC]` is valid,
- // then we can deduce that ordering `[b ASC]` is also
valid.
- // Hence, ordering `[b ASC]` can be added to the state as
valid ordering.
- // (e.g. existing ordering where leading ordering is
removed)
-
new_orderings.push(LexOrdering::new(ordering[1..].to_vec()));
- break;
+ if children.is_empty() {
+ continue;
+ }
+
+ // Check if all children match the next expressions in the
ordering
+ let mut all_children_match = true;
+ let mut child_properties = vec![];
+
+ // Build properties for each child based on the next
expressions
+ for (i, child) in children.iter().enumerate() {
+ if let Some(next) = ordering.get(i + 1) {
+ if !child.as_ref().eq(next.expr.as_ref()) {
+ all_children_match = false;
+ break;
+ }
+ child_properties.push(ExprProperties {
+ sort_properties:
SortProperties::Ordered(next.options),
+ range: Interval::make_unbounded(
+ &child.data_type(&self.schema)?,
+ )?,
+ preserves_lex_ordering: true,
+ });
+ } else {
+ all_children_match = false;
+ break;
+ }
+ }
+
+ if all_children_match {
+ // Check if the expression is monotonic in all arguments
+ if let Ok(expr_properties) =
+ equivalent_expr.get_properties(&child_properties)
+ {
+ if expr_properties.preserves_lex_ordering
+ &&
SortProperties::Ordered(leading_ordering_options)
+ == expr_properties.sort_properties
+ {
+ // Assume existing ordering is [c ASC, a ASC, b
ASC]
+ // When equality c = f(a,b) is given, if we know
that given ordering `[a ASC, b ASC]`,
+ // ordering `[f(a,b) ASC]` is valid, then we can
deduce that ordering `[a ASC, b ASC]` is also valid.
+ // Hence, ordering `[a ASC, b ASC]` can be added
to the state as a valid ordering.
+ // (i.e. existing ordering where leading ordering
is removed)
+
new_orderings.push(LexOrdering::new(ordering[1..].to_vec()));
+ break;
+ }
+ }
}
}
}
@@ -1428,16 +1449,19 @@ fn get_expr_properties(
Ok(ExprProperties {
sort_properties: SortProperties::Ordered(column_order.options),
range: Interval::make_unbounded(&expr.data_type(schema)?)?,
+ preserves_lex_ordering: false,
})
} else if expr.as_any().downcast_ref::<Column>().is_some() {
Ok(ExprProperties {
sort_properties: SortProperties::Unordered,
range: Interval::make_unbounded(&expr.data_type(schema)?)?,
+ preserves_lex_ordering: false,
})
} else if let Some(literal) = expr.as_any().downcast_ref::<Literal>() {
Ok(ExprProperties {
sort_properties: SortProperties::Singleton,
range: Interval::try_new(literal.value().clone(),
literal.value().clone())?,
+ preserves_lex_ordering: true,
})
} else {
// Find orderings of its children
@@ -2143,11 +2167,14 @@ mod tests {
create_test_params, create_test_schema, output_schema,
};
use crate::expressions::{col, BinaryExpr, Column};
+ use crate::ScalarFunctionExpr;
use arrow::datatypes::{DataType, Field, Schema};
use arrow_schema::{Fields, TimeUnit};
use datafusion_expr::Operator;
+ use datafusion_functions::string::concat;
+
#[test]
fn project_equivalence_properties_test() -> Result<()> {
let input_schema = Arc::new(Schema::new(vec![
@@ -3684,4 +3711,145 @@ mod tests {
sort_expr
}
+
+ #[test]
+ fn test_ordering_equivalence_with_lex_monotonic_concat() -> Result<()> {
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("a", DataType::Utf8, false),
+ Field::new("b", DataType::Utf8, false),
+ Field::new("c", DataType::Utf8, false),
+ ]));
+
+ let col_a = col("a", &schema)?;
+ let col_b = col("b", &schema)?;
+ let col_c = col("c", &schema)?;
+
+ let a_concat_b: Arc<dyn PhysicalExpr> =
Arc::new(ScalarFunctionExpr::new(
+ "concat",
+ concat(),
+ vec![Arc::clone(&col_a), Arc::clone(&col_b)],
+ DataType::Utf8,
+ ));
+
+ // Assume existing ordering is [c ASC, a ASC, b ASC]
+ let mut eq_properties =
EquivalenceProperties::new(Arc::clone(&schema));
+
+ eq_properties.add_new_ordering(LexOrdering::from(vec![
+ PhysicalSortExpr::new_default(Arc::clone(&col_c)).asc(),
+ PhysicalSortExpr::new_default(Arc::clone(&col_a)).asc(),
+ PhysicalSortExpr::new_default(Arc::clone(&col_b)).asc(),
+ ]));
+
+ // Add equality condition c = concat(a, b)
+ eq_properties.add_equal_conditions(&col_c, &a_concat_b)?;
+
+ let orderings = eq_properties.oeq_class().orderings.clone();
+
+ let expected_ordering1 =
+ LexOrdering::from(vec![
+ PhysicalSortExpr::new_default(Arc::clone(&col_c)).asc()
+ ]);
+ let expected_ordering2 = LexOrdering::from(vec![
+ PhysicalSortExpr::new_default(Arc::clone(&col_a)).asc(),
+ PhysicalSortExpr::new_default(Arc::clone(&col_b)).asc(),
+ ]);
+
+ // The ordering should be [c ASC] and [a ASC, b ASC]
+ assert_eq!(orderings.len(), 2);
+ assert!(orderings.contains(&expected_ordering1));
+ assert!(orderings.contains(&expected_ordering2));
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_ordering_equivalence_with_non_lex_monotonic_multiply() ->
Result<()> {
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("a", DataType::Int32, false),
+ Field::new("b", DataType::Int32, false),
+ Field::new("c", DataType::Int32, false),
+ ]));
+
+ let col_a = col("a", &schema)?;
+ let col_b = col("b", &schema)?;
+ let col_c = col("c", &schema)?;
+
+ let a_times_b: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
+ Arc::clone(&col_a),
+ Operator::Multiply,
+ Arc::clone(&col_b),
+ ));
+
+ // Assume existing ordering is [c ASC, a ASC, b ASC]
+ let mut eq_properties =
EquivalenceProperties::new(Arc::clone(&schema));
+
+ let initial_ordering = LexOrdering::from(vec![
+ PhysicalSortExpr::new_default(Arc::clone(&col_c)).asc(),
+ PhysicalSortExpr::new_default(Arc::clone(&col_a)).asc(),
+ PhysicalSortExpr::new_default(Arc::clone(&col_b)).asc(),
+ ]);
+
+ eq_properties.add_new_ordering(initial_ordering.clone());
+
+ // Add equality condition c = a * b
+ eq_properties.add_equal_conditions(&col_c, &a_times_b)?;
+
+ let orderings = eq_properties.oeq_class().orderings.clone();
+
+ // The ordering should remain unchanged since multiplication is not
lex-monotonic
+ assert_eq!(orderings.len(), 1);
+ assert!(orderings.contains(&initial_ordering));
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_ordering_equivalence_with_concat_equality() -> Result<()> {
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("a", DataType::Utf8, false),
+ Field::new("b", DataType::Utf8, false),
+ Field::new("c", DataType::Utf8, false),
+ ]));
+
+ let col_a = col("a", &schema)?;
+ let col_b = col("b", &schema)?;
+ let col_c = col("c", &schema)?;
+
+ let a_concat_b: Arc<dyn PhysicalExpr> =
Arc::new(ScalarFunctionExpr::new(
+ "concat",
+ concat(),
+ vec![Arc::clone(&col_a), Arc::clone(&col_b)],
+ DataType::Utf8,
+ ));
+
+ // Assume existing ordering is [concat(a, b) ASC, a ASC, b ASC]
+ let mut eq_properties =
EquivalenceProperties::new(Arc::clone(&schema));
+
+ eq_properties.add_new_ordering(LexOrdering::from(vec![
+ PhysicalSortExpr::new_default(Arc::clone(&a_concat_b)).asc(),
+ PhysicalSortExpr::new_default(Arc::clone(&col_a)).asc(),
+ PhysicalSortExpr::new_default(Arc::clone(&col_b)).asc(),
+ ]));
+
+ // Add equality condition c = concat(a, b)
+ eq_properties.add_equal_conditions(&col_c, &a_concat_b)?;
+
+ let orderings = eq_properties.oeq_class().orderings.clone();
+
+ let expected_ordering1 =
LexOrdering::from(vec![PhysicalSortExpr::new_default(
+ Arc::clone(&a_concat_b),
+ )
+ .asc()]);
+ let expected_ordering2 = LexOrdering::from(vec![
+ PhysicalSortExpr::new_default(Arc::clone(&col_a)).asc(),
+ PhysicalSortExpr::new_default(Arc::clone(&col_b)).asc(),
+ ]);
+
+ // The ordering should be [concat(a, b) ASC] and [a ASC, b ASC]
+ assert_eq!(orderings.len(), 2);
+ assert!(orderings.contains(&expected_ordering1));
+ assert!(orderings.contains(&expected_ordering2));
+
+ Ok(())
+ }
}
diff --git a/datafusion/physical-expr/src/expressions/binary.rs
b/datafusion/physical-expr/src/expressions/binary.rs
index ae2bfe5b0b..938d775a2a 100644
--- a/datafusion/physical-expr/src/expressions/binary.rs
+++ b/datafusion/physical-expr/src/expressions/binary.rs
@@ -503,34 +503,42 @@ impl PhysicalExpr for BinaryExpr {
Operator::Plus => Ok(ExprProperties {
sort_properties: l_order.add(&r_order),
range: l_range.add(r_range)?,
+ preserves_lex_ordering: false,
}),
Operator::Minus => Ok(ExprProperties {
sort_properties: l_order.sub(&r_order),
range: l_range.sub(r_range)?,
+ preserves_lex_ordering: false,
}),
Operator::Gt => Ok(ExprProperties {
sort_properties: l_order.gt_or_gteq(&r_order),
range: l_range.gt(r_range)?,
+ preserves_lex_ordering: false,
}),
Operator::GtEq => Ok(ExprProperties {
sort_properties: l_order.gt_or_gteq(&r_order),
range: l_range.gt_eq(r_range)?,
+ preserves_lex_ordering: false,
}),
Operator::Lt => Ok(ExprProperties {
sort_properties: r_order.gt_or_gteq(&l_order),
range: l_range.lt(r_range)?,
+ preserves_lex_ordering: false,
}),
Operator::LtEq => Ok(ExprProperties {
sort_properties: r_order.gt_or_gteq(&l_order),
range: l_range.lt_eq(r_range)?,
+ preserves_lex_ordering: false,
}),
Operator::And => Ok(ExprProperties {
sort_properties: r_order.and_or(&l_order),
range: l_range.and(r_range)?,
+ preserves_lex_ordering: false,
}),
Operator::Or => Ok(ExprProperties {
sort_properties: r_order.and_or(&l_order),
range: l_range.or(r_range)?,
+ preserves_lex_ordering: false,
}),
_ => Ok(ExprProperties::new_unknown()),
}
diff --git a/datafusion/physical-expr/src/expressions/literal.rs
b/datafusion/physical-expr/src/expressions/literal.rs
index f0d02eb605..c594f039ff 100644
--- a/datafusion/physical-expr/src/expressions/literal.rs
+++ b/datafusion/physical-expr/src/expressions/literal.rs
@@ -90,6 +90,7 @@ impl PhysicalExpr for Literal {
Ok(ExprProperties {
sort_properties: SortProperties::Singleton,
range: Interval::try_new(self.value().clone(),
self.value().clone())?,
+ preserves_lex_ordering: true,
})
}
}
diff --git a/datafusion/physical-expr/src/expressions/negative.rs
b/datafusion/physical-expr/src/expressions/negative.rs
index 6235845fc0..03f2111aca 100644
--- a/datafusion/physical-expr/src/expressions/negative.rs
+++ b/datafusion/physical-expr/src/expressions/negative.rs
@@ -145,6 +145,7 @@ impl PhysicalExpr for NegativeExpr {
Ok(ExprProperties {
sort_properties: -children[0].sort_properties,
range: children[0].range.clone().arithmetic_negate()?,
+ preserves_lex_ordering: false,
})
}
}
diff --git a/datafusion/physical-expr/src/scalar_function.rs
b/datafusion/physical-expr/src/scalar_function.rs
index e312d5de59..82c718cfac 100644
--- a/datafusion/physical-expr/src/scalar_function.rs
+++ b/datafusion/physical-expr/src/scalar_function.rs
@@ -202,6 +202,7 @@ impl PhysicalExpr for ScalarFunctionExpr {
fn get_properties(&self, children: &[ExprProperties]) ->
Result<ExprProperties> {
let sort_properties = self.fun.output_ordering(children)?;
+ let preserves_lex_ordering =
self.fun.preserves_lex_ordering(children)?;
let children_range = children
.iter()
.map(|props| &props.range)
@@ -211,6 +212,7 @@ impl PhysicalExpr for ScalarFunctionExpr {
Ok(ExprProperties {
sort_properties,
range,
+ preserves_lex_ordering,
})
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]