This is an automated email from the ASF dual-hosted git repository.

berkay pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 95d296cd5a Support n-ary monotonic functions in ordering equivalence (#13841)
95d296cd5a is described below

commit 95d296cd5aa1c58baf18e7625afe50f2e8989b07
Author: Goksel Kabadayi <[email protected]>
AuthorDate: Fri Dec 20 11:32:56 2024 +0300

    Support n-ary monotonic functions in ordering equivalence (#13841)
    
    * Support n-ary monotonic functions in `discover_new_orderings`
    
    * Add tests for n-ary monotonic functions in `discover_new_orderings`
    
    * Fix tests
    
    * Fix non-monotonic test case
    
    * Fix unintended simplification
    
    * Minor comment changes
    
    * Fix tests
    
    * Add `preserves_lex_ordering` field
    
    * Use `preserves_lex_ordering` on `discover_new_orderings()`
    
    * Add `output_ordering` and `output_preserves_lex_ordering` implementations for `ConcatFunc`
    
    * Update tests
    
    * Move logic to UDF
    
    * Cargo fmt
    
    * Refactor
    
    * Cargo fmt
    
    * Simply use false value on default implementation
    
    * Remove unnecessary import
    
    * Clippy fix
    
    * Update Cargo.lock
    
    * Move dep to dev-dependencies
    
    * Rename output_preserves_lex_ordering to preserves_lex_ordering
    
    * minor
    
    ---------
    
    Co-authored-by: berkaysynnada <[email protected]>
---
 datafusion/expr-common/src/sort_properties.rs      |  21 +-
 datafusion/expr/src/udf.rs                         |  36 +++-
 datafusion/functions/src/string/concat.rs          |   5 +
 datafusion/physical-expr/Cargo.toml                |   1 +
 .../physical-expr/src/equivalence/properties.rs    | 232 ++++++++++++++++++---
 datafusion/physical-expr/src/expressions/binary.rs |   8 +
 .../physical-expr/src/expressions/literal.rs       |   1 +
 .../physical-expr/src/expressions/negative.rs      |   1 +
 datafusion/physical-expr/src/scalar_function.rs    |   2 +
 9 files changed, 269 insertions(+), 38 deletions(-)

diff --git a/datafusion/expr-common/src/sort_properties.rs b/datafusion/expr-common/src/sort_properties.rs
index 7778be2ecf..5d17a34a96 100644
--- a/datafusion/expr-common/src/sort_properties.rs
+++ b/datafusion/expr-common/src/sort_properties.rs
@@ -129,19 +129,30 @@ impl Neg for SortProperties {
     }
 }
 
-/// Represents the properties of a `PhysicalExpr`, including its sorting and range attributes.
+/// Represents the properties of a `PhysicalExpr`, including its sorting,
+/// range, and whether it preserves lexicographical ordering.
 #[derive(Debug, Clone)]
 pub struct ExprProperties {
+    /// Properties that describe the sorting behavior of the expression,
+    /// such as whether it is ordered, unordered, or a singleton value.
     pub sort_properties: SortProperties,
+    /// A closed interval representing the range of possible values for
+    /// the expression. Used to compute reliable bounds.
     pub range: Interval,
+    /// Indicates whether the expression preserves lexicographical ordering
+    /// of its inputs. For example, string concatenation preserves ordering,
+    /// while addition does not.
+    pub preserves_lex_ordering: bool,
 }
 
 impl ExprProperties {
-    /// Creates a new `ExprProperties` instance with unknown sort properties and unknown range.
+    /// Creates a new `ExprProperties` instance with unknown sort properties,
+    /// unknown range, and unknown lexicographical ordering preservation.
     pub fn new_unknown() -> Self {
         Self {
             sort_properties: SortProperties::default(),
             range: Interval::make_unbounded(&DataType::Null).unwrap(),
+            preserves_lex_ordering: false,
         }
     }
 
@@ -156,4 +167,10 @@ impl ExprProperties {
         self.range = range;
         self
     }
+
+    /// Sets whether the expression maintains lexicographical ordering and returns the modified instance.
+    pub fn with_preserves_lex_ordering(mut self, preserves_lex_ordering: bool) 
-> Self {
+        self.preserves_lex_ordering = preserves_lex_ordering;
+        self
+    }
 }
diff --git a/datafusion/expr/src/udf.rs b/datafusion/expr/src/udf.rs
index 809c78f30e..83200edfa2 100644
--- a/datafusion/expr/src/udf.rs
+++ b/datafusion/expr/src/udf.rs
@@ -303,6 +303,10 @@ impl ScalarUDF {
         self.inner.output_ordering(inputs)
     }
 
+    pub fn preserves_lex_ordering(&self, inputs: &[ExprProperties]) -> 
Result<bool> {
+        self.inner.preserves_lex_ordering(inputs)
+    }
+
     /// See [`ScalarUDFImpl::coerce_types`] for more details.
     pub fn coerce_types(&self, arg_types: &[DataType]) -> 
Result<Vec<DataType>> {
         self.inner.coerce_types(arg_types)
@@ -650,10 +654,30 @@ pub trait ScalarUDFImpl: Debug + Send + Sync {
         Ok(Some(vec![]))
     }
 
-    /// Calculates the [`SortProperties`] of this function based on its
-    /// children's properties.
-    fn output_ordering(&self, _inputs: &[ExprProperties]) -> 
Result<SortProperties> {
-        Ok(SortProperties::Unordered)
+    /// Calculates the [`SortProperties`] of this function based on its children's properties.
+    fn output_ordering(&self, inputs: &[ExprProperties]) -> 
Result<SortProperties> {
+        if !self.preserves_lex_ordering(inputs)? {
+            return Ok(SortProperties::Unordered);
+        }
+
+        let Some(first_order) = inputs.first().map(|p| &p.sort_properties) 
else {
+            return Ok(SortProperties::Singleton);
+        };
+
+        if inputs
+            .iter()
+            .skip(1)
+            .all(|input| &input.sort_properties == first_order)
+        {
+            Ok(*first_order)
+        } else {
+            Ok(SortProperties::Unordered)
+        }
+    }
+
+    /// Whether the function preserves lexicographical ordering based on the input ordering
+    fn preserves_lex_ordering(&self, _inputs: &[ExprProperties]) -> 
Result<bool> {
+        Ok(false)
     }
 
     /// Coerce arguments of a function call to types that the function can 
evaluate.
@@ -809,6 +833,10 @@ impl ScalarUDFImpl for AliasedScalarUDFImpl {
         self.inner.output_ordering(inputs)
     }
 
+    fn preserves_lex_ordering(&self, inputs: &[ExprProperties]) -> 
Result<bool> {
+        self.inner.preserves_lex_ordering(inputs)
+    }
+
     fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
         self.inner.coerce_types(arg_types)
     }
diff --git a/datafusion/functions/src/string/concat.rs b/datafusion/functions/src/string/concat.rs
index 895a7cdbf3..f4848c1315 100644
--- a/datafusion/functions/src/string/concat.rs
+++ b/datafusion/functions/src/string/concat.rs
@@ -17,6 +17,7 @@
 
 use arrow::array::{as_largestring_array, Array};
 use arrow::datatypes::DataType;
+use datafusion_expr::sort_properties::ExprProperties;
 use std::any::Any;
 use std::sync::{Arc, OnceLock};
 
@@ -265,6 +266,10 @@ impl ScalarUDFImpl for ConcatFunc {
     fn documentation(&self) -> Option<&Documentation> {
         Some(get_concat_doc())
     }
+
+    fn preserves_lex_ordering(&self, _inputs: &[ExprProperties]) -> 
Result<bool> {
+        Ok(true)
+    }
 }
 
 static DOCUMENTATION: OnceLock<Documentation> = OnceLock::new();
diff --git a/datafusion/physical-expr/Cargo.toml b/datafusion/physical-expr/Cargo.toml
index db3e0e10d8..d1de63a1e8 100644
--- a/datafusion/physical-expr/Cargo.toml
+++ b/datafusion/physical-expr/Cargo.toml
@@ -57,6 +57,7 @@ petgraph = "0.6.2"
 [dev-dependencies]
 arrow = { workspace = true, features = ["test_utils"] }
 criterion = "0.5"
+datafusion-functions = { workspace = true }
 rand = { workspace = true }
 rstest = { workspace = true }
 
diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs
old mode 100644
new mode 100755
index e37e6b8abd..d4814fb4d7
--- a/datafusion/physical-expr/src/equivalence/properties.rs
+++ b/datafusion/physical-expr/src/equivalence/properties.rs
@@ -346,40 +346,61 @@ impl EquivalenceProperties {
             .unwrap_or_else(|| vec![Arc::clone(&normalized_expr)]);
 
         let mut new_orderings: Vec<LexOrdering> = vec![];
-        for (ordering, next_expr) in self
-            .normalized_oeq_class()
-            .iter()
-            .filter(|ordering| ordering[0].expr.eq(&normalized_expr))
-            // First expression after leading ordering
-            .filter_map(|ordering| Some(ordering).zip(ordering.inner.get(1)))
-        {
-            let leading_ordering = ordering[0].options;
-            // Currently, we only handle expressions with a single child.
-            // TODO: It should be possible to handle expressions orderings like
-            //       f(a, b, c), a, b, c if f is monotonic in all arguments.
+        for ordering in self.normalized_oeq_class().iter() {
+            if !ordering[0].expr.eq(&normalized_expr) {
+                continue;
+            }
+
+            let leading_ordering_options = ordering[0].options;
+
             for equivalent_expr in &eq_class {
                 let children = equivalent_expr.children();
-                if children.len() == 1
-                    && children[0].eq(&next_expr.expr)
-                    && SortProperties::Ordered(leading_ordering)
-                        == equivalent_expr
-                            .get_properties(&[ExprProperties {
-                                sort_properties: SortProperties::Ordered(
-                                    leading_ordering,
-                                ),
-                                range: Interval::make_unbounded(
-                                    &equivalent_expr.data_type(&self.schema)?,
-                                )?,
-                            }])?
-                            .sort_properties
-                {
-                    // Assume existing ordering is [a ASC, b ASC]
-                    // When equality a = f(b) is given, If we know that given 
ordering `[b ASC]`, ordering `[f(b) ASC]` is valid,
-                    // then we can deduce that ordering `[b ASC]` is also 
valid.
-                    // Hence, ordering `[b ASC]` can be added to the state as 
valid ordering.
-                    // (e.g. existing ordering where leading ordering is 
removed)
-                    
new_orderings.push(LexOrdering::new(ordering[1..].to_vec()));
-                    break;
+                if children.is_empty() {
+                    continue;
+                }
+
+                // Check if all children match the next expressions in the 
ordering
+                let mut all_children_match = true;
+                let mut child_properties = vec![];
+
+                // Build properties for each child based on the next 
expressions
+                for (i, child) in children.iter().enumerate() {
+                    if let Some(next) = ordering.get(i + 1) {
+                        if !child.as_ref().eq(next.expr.as_ref()) {
+                            all_children_match = false;
+                            break;
+                        }
+                        child_properties.push(ExprProperties {
+                            sort_properties: 
SortProperties::Ordered(next.options),
+                            range: Interval::make_unbounded(
+                                &child.data_type(&self.schema)?,
+                            )?,
+                            preserves_lex_ordering: true,
+                        });
+                    } else {
+                        all_children_match = false;
+                        break;
+                    }
+                }
+
+                if all_children_match {
+                    // Check if the expression is monotonic in all arguments
+                    if let Ok(expr_properties) =
+                        equivalent_expr.get_properties(&child_properties)
+                    {
+                        if expr_properties.preserves_lex_ordering
+                            && 
SortProperties::Ordered(leading_ordering_options)
+                                == expr_properties.sort_properties
+                        {
+                            // Assume existing ordering is [c ASC, a ASC, b ASC]
+                            // When equality c = f(a,b) is given, if we know that given ordering `[a ASC, b ASC]`,
+                            // ordering `[f(a,b) ASC]` is valid, then we can deduce that ordering `[a ASC, b ASC]` is also valid.
+                            // Hence, ordering `[a ASC, b ASC]` can be added to the state as a valid ordering.
+                            // (e.g. existing ordering where leading ordering is removed)
+                            
new_orderings.push(LexOrdering::new(ordering[1..].to_vec()));
+                            break;
+                        }
+                    }
                 }
             }
         }
@@ -1428,16 +1449,19 @@ fn get_expr_properties(
         Ok(ExprProperties {
             sort_properties: SortProperties::Ordered(column_order.options),
             range: Interval::make_unbounded(&expr.data_type(schema)?)?,
+            preserves_lex_ordering: false,
         })
     } else if expr.as_any().downcast_ref::<Column>().is_some() {
         Ok(ExprProperties {
             sort_properties: SortProperties::Unordered,
             range: Interval::make_unbounded(&expr.data_type(schema)?)?,
+            preserves_lex_ordering: false,
         })
     } else if let Some(literal) = expr.as_any().downcast_ref::<Literal>() {
         Ok(ExprProperties {
             sort_properties: SortProperties::Singleton,
             range: Interval::try_new(literal.value().clone(), 
literal.value().clone())?,
+            preserves_lex_ordering: true,
         })
     } else {
         // Find orderings of its children
@@ -2143,11 +2167,14 @@ mod tests {
         create_test_params, create_test_schema, output_schema,
     };
     use crate::expressions::{col, BinaryExpr, Column};
+    use crate::ScalarFunctionExpr;
 
     use arrow::datatypes::{DataType, Field, Schema};
     use arrow_schema::{Fields, TimeUnit};
     use datafusion_expr::Operator;
 
+    use datafusion_functions::string::concat;
+
     #[test]
     fn project_equivalence_properties_test() -> Result<()> {
         let input_schema = Arc::new(Schema::new(vec![
@@ -3684,4 +3711,145 @@ mod tests {
 
         sort_expr
     }
+
+    #[test]
+    fn test_ordering_equivalence_with_lex_monotonic_concat() -> Result<()> {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Utf8, false),
+            Field::new("b", DataType::Utf8, false),
+            Field::new("c", DataType::Utf8, false),
+        ]));
+
+        let col_a = col("a", &schema)?;
+        let col_b = col("b", &schema)?;
+        let col_c = col("c", &schema)?;
+
+        let a_concat_b: Arc<dyn PhysicalExpr> = 
Arc::new(ScalarFunctionExpr::new(
+            "concat",
+            concat(),
+            vec![Arc::clone(&col_a), Arc::clone(&col_b)],
+            DataType::Utf8,
+        ));
+
+        // Assume existing ordering is [c ASC, a ASC, b ASC]
+        let mut eq_properties = 
EquivalenceProperties::new(Arc::clone(&schema));
+
+        eq_properties.add_new_ordering(LexOrdering::from(vec![
+            PhysicalSortExpr::new_default(Arc::clone(&col_c)).asc(),
+            PhysicalSortExpr::new_default(Arc::clone(&col_a)).asc(),
+            PhysicalSortExpr::new_default(Arc::clone(&col_b)).asc(),
+        ]));
+
+        // Add equality condition c = concat(a, b)
+        eq_properties.add_equal_conditions(&col_c, &a_concat_b)?;
+
+        let orderings = eq_properties.oeq_class().orderings.clone();
+
+        let expected_ordering1 =
+            LexOrdering::from(vec![
+                PhysicalSortExpr::new_default(Arc::clone(&col_c)).asc()
+            ]);
+        let expected_ordering2 = LexOrdering::from(vec![
+            PhysicalSortExpr::new_default(Arc::clone(&col_a)).asc(),
+            PhysicalSortExpr::new_default(Arc::clone(&col_b)).asc(),
+        ]);
+
+        // The ordering should be [c ASC] and [a ASC, b ASC]
+        assert_eq!(orderings.len(), 2);
+        assert!(orderings.contains(&expected_ordering1));
+        assert!(orderings.contains(&expected_ordering2));
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_ordering_equivalence_with_non_lex_monotonic_multiply() -> 
Result<()> {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int32, false),
+            Field::new("b", DataType::Int32, false),
+            Field::new("c", DataType::Int32, false),
+        ]));
+
+        let col_a = col("a", &schema)?;
+        let col_b = col("b", &schema)?;
+        let col_c = col("c", &schema)?;
+
+        let a_times_b: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
+            Arc::clone(&col_a),
+            Operator::Multiply,
+            Arc::clone(&col_b),
+        ));
+
+        // Assume existing ordering is [c ASC, a ASC, b ASC]
+        let mut eq_properties = 
EquivalenceProperties::new(Arc::clone(&schema));
+
+        let initial_ordering = LexOrdering::from(vec![
+            PhysicalSortExpr::new_default(Arc::clone(&col_c)).asc(),
+            PhysicalSortExpr::new_default(Arc::clone(&col_a)).asc(),
+            PhysicalSortExpr::new_default(Arc::clone(&col_b)).asc(),
+        ]);
+
+        eq_properties.add_new_ordering(initial_ordering.clone());
+
+        // Add equality condition c = a * b
+        eq_properties.add_equal_conditions(&col_c, &a_times_b)?;
+
+        let orderings = eq_properties.oeq_class().orderings.clone();
+
+        // The ordering should remain unchanged since multiplication is not 
lex-monotonic
+        assert_eq!(orderings.len(), 1);
+        assert!(orderings.contains(&initial_ordering));
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_ordering_equivalence_with_concat_equality() -> Result<()> {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Utf8, false),
+            Field::new("b", DataType::Utf8, false),
+            Field::new("c", DataType::Utf8, false),
+        ]));
+
+        let col_a = col("a", &schema)?;
+        let col_b = col("b", &schema)?;
+        let col_c = col("c", &schema)?;
+
+        let a_concat_b: Arc<dyn PhysicalExpr> = 
Arc::new(ScalarFunctionExpr::new(
+            "concat",
+            concat(),
+            vec![Arc::clone(&col_a), Arc::clone(&col_b)],
+            DataType::Utf8,
+        ));
+
+        // Assume existing ordering is [concat(a, b) ASC, a ASC, b ASC]
+        let mut eq_properties = 
EquivalenceProperties::new(Arc::clone(&schema));
+
+        eq_properties.add_new_ordering(LexOrdering::from(vec![
+            PhysicalSortExpr::new_default(Arc::clone(&a_concat_b)).asc(),
+            PhysicalSortExpr::new_default(Arc::clone(&col_a)).asc(),
+            PhysicalSortExpr::new_default(Arc::clone(&col_b)).asc(),
+        ]));
+
+        // Add equality condition c = concat(a, b)
+        eq_properties.add_equal_conditions(&col_c, &a_concat_b)?;
+
+        let orderings = eq_properties.oeq_class().orderings.clone();
+
+        let expected_ordering1 = 
LexOrdering::from(vec![PhysicalSortExpr::new_default(
+            Arc::clone(&a_concat_b),
+        )
+        .asc()]);
+        let expected_ordering2 = LexOrdering::from(vec![
+            PhysicalSortExpr::new_default(Arc::clone(&col_a)).asc(),
+            PhysicalSortExpr::new_default(Arc::clone(&col_b)).asc(),
+        ]);
+
+        // The ordering should be [concat(a, b) ASC] and [a ASC, b ASC]
+        assert_eq!(orderings.len(), 2);
+        assert!(orderings.contains(&expected_ordering1));
+        assert!(orderings.contains(&expected_ordering2));
+
+        Ok(())
+    }
 }
diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs
index ae2bfe5b0b..938d775a2a 100644
--- a/datafusion/physical-expr/src/expressions/binary.rs
+++ b/datafusion/physical-expr/src/expressions/binary.rs
@@ -503,34 +503,42 @@ impl PhysicalExpr for BinaryExpr {
             Operator::Plus => Ok(ExprProperties {
                 sort_properties: l_order.add(&r_order),
                 range: l_range.add(r_range)?,
+                preserves_lex_ordering: false,
             }),
             Operator::Minus => Ok(ExprProperties {
                 sort_properties: l_order.sub(&r_order),
                 range: l_range.sub(r_range)?,
+                preserves_lex_ordering: false,
             }),
             Operator::Gt => Ok(ExprProperties {
                 sort_properties: l_order.gt_or_gteq(&r_order),
                 range: l_range.gt(r_range)?,
+                preserves_lex_ordering: false,
             }),
             Operator::GtEq => Ok(ExprProperties {
                 sort_properties: l_order.gt_or_gteq(&r_order),
                 range: l_range.gt_eq(r_range)?,
+                preserves_lex_ordering: false,
             }),
             Operator::Lt => Ok(ExprProperties {
                 sort_properties: r_order.gt_or_gteq(&l_order),
                 range: l_range.lt(r_range)?,
+                preserves_lex_ordering: false,
             }),
             Operator::LtEq => Ok(ExprProperties {
                 sort_properties: r_order.gt_or_gteq(&l_order),
                 range: l_range.lt_eq(r_range)?,
+                preserves_lex_ordering: false,
             }),
             Operator::And => Ok(ExprProperties {
                 sort_properties: r_order.and_or(&l_order),
                 range: l_range.and(r_range)?,
+                preserves_lex_ordering: false,
             }),
             Operator::Or => Ok(ExprProperties {
                 sort_properties: r_order.and_or(&l_order),
                 range: l_range.or(r_range)?,
+                preserves_lex_ordering: false,
             }),
             _ => Ok(ExprProperties::new_unknown()),
         }
diff --git a/datafusion/physical-expr/src/expressions/literal.rs b/datafusion/physical-expr/src/expressions/literal.rs
index f0d02eb605..c594f039ff 100644
--- a/datafusion/physical-expr/src/expressions/literal.rs
+++ b/datafusion/physical-expr/src/expressions/literal.rs
@@ -90,6 +90,7 @@ impl PhysicalExpr for Literal {
         Ok(ExprProperties {
             sort_properties: SortProperties::Singleton,
             range: Interval::try_new(self.value().clone(), 
self.value().clone())?,
+            preserves_lex_ordering: true,
         })
     }
 }
diff --git a/datafusion/physical-expr/src/expressions/negative.rs b/datafusion/physical-expr/src/expressions/negative.rs
index 6235845fc0..03f2111aca 100644
--- a/datafusion/physical-expr/src/expressions/negative.rs
+++ b/datafusion/physical-expr/src/expressions/negative.rs
@@ -145,6 +145,7 @@ impl PhysicalExpr for NegativeExpr {
         Ok(ExprProperties {
             sort_properties: -children[0].sort_properties,
             range: children[0].range.clone().arithmetic_negate()?,
+            preserves_lex_ordering: false,
         })
     }
 }
diff --git a/datafusion/physical-expr/src/scalar_function.rs b/datafusion/physical-expr/src/scalar_function.rs
index e312d5de59..82c718cfac 100644
--- a/datafusion/physical-expr/src/scalar_function.rs
+++ b/datafusion/physical-expr/src/scalar_function.rs
@@ -202,6 +202,7 @@ impl PhysicalExpr for ScalarFunctionExpr {
 
     fn get_properties(&self, children: &[ExprProperties]) -> 
Result<ExprProperties> {
         let sort_properties = self.fun.output_ordering(children)?;
+        let preserves_lex_ordering = 
self.fun.preserves_lex_ordering(children)?;
         let children_range = children
             .iter()
             .map(|props| &props.range)
@@ -211,6 +212,7 @@ impl PhysicalExpr for ScalarFunctionExpr {
         Ok(ExprProperties {
             sort_properties,
             range,
+            preserves_lex_ordering,
         })
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to