This is an automated email from the ASF dual-hosted git repository.

akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 1cc01a4131 Ordering Equivalence Builder (#6452)
1cc01a4131 is described below

commit 1cc01a4131e587f00da1783e7ad2e5dc699c1847
Author: Mustafa Akur <[email protected]>
AuthorDate: Sat May 27 16:49:02 2023 +0300

    Ordering Equivalence Builder (#6452)
    
    * Convert ordering equivalence to vec (unit test)
    
    * compiles
    
    * Simplifications
    
    * simplifications
    
    * Remove unnecessary codes
    
    * simplifications
    
    * Add test cases
    
    * fix bug
    
    * simplifications
    
    * Resolve linter errors
    
    * remove unnecessary codes
    
    * simplifications
    
    * simplifications
    
    * Remove unnecessary codes
    
    * Add pruning to ordering_equivalence projection
    
    * Remove unnecessary clones
    
    * Convert get range to calculate compatible ranges
    
    * Simplifications
    
    * Update comments
    
    * Update comments
    
    * Use builder style for ordering equivalence creation
    
    * Minor comment changes
    
    * Address reviews
    
    ---------
    
    Co-authored-by: Mehmet Ozan Kabak <[email protected]>
---
 datafusion/core/src/physical_plan/windows/mod.rs | 58 ++++++------------
 datafusion/physical-expr/src/equivalence.rs      | 78 +++++++++++++++++++++++-
 2 files changed, 94 insertions(+), 42 deletions(-)

diff --git a/datafusion/core/src/physical_plan/windows/mod.rs b/datafusion/core/src/physical_plan/windows/mod.rs
index d4732cc1f6..d7eedf7f18 100644
--- a/datafusion/core/src/physical_plan/windows/mod.rs
+++ b/datafusion/core/src/physical_plan/windows/mod.rs
@@ -47,14 +47,14 @@ mod window_agg_exec;
 pub use bounded_window_agg_exec::BoundedWindowAggExec;
 pub use bounded_window_agg_exec::PartitionSearchMode;
 use datafusion_common::utils::longest_consecutive_prefix;
+use datafusion_physical_expr::equivalence::OrderingEquivalenceBuilder;
 use datafusion_physical_expr::expressions::Column;
 use datafusion_physical_expr::utils::{convert_to_expr, get_indices_of_matching_exprs};
 pub use datafusion_physical_expr::window::{
     BuiltInWindowExpr, PlainAggregateWindowExpr, WindowExpr,
 };
 use datafusion_physical_expr::{
-    normalize_expr_with_equivalence_properties, OrderedColumn,
-    OrderingEquivalenceProperties, PhysicalSortRequirement,
+    OrderedColumn, OrderingEquivalenceProperties, PhysicalSortRequirement,
 };
 pub use window_agg_exec::WindowAggExec;
 
@@ -254,30 +254,10 @@ pub(crate) fn window_ordering_equivalence(
 ) -> OrderingEquivalenceProperties {
     // We need to update the schema, so we can not directly use
     // `input.ordering_equivalence_properties()`.
-    let mut result = OrderingEquivalenceProperties::new(schema.clone());
-    result.extend(
-        input
-            .ordering_equivalence_properties()
-            .classes()
-            .iter()
-            .cloned(),
-    );
-    let mut normalized_out_ordering = vec![];
-    for item in input.output_ordering().unwrap_or(&[]) {
-        // To account for ordering equivalences, first normalize the expression:
-        let normalized = normalize_expr_with_equivalence_properties(
-            item.expr.clone(),
-            input.equivalence_properties().classes(),
-        );
-        // Currently we only support, ordering equivalences for `Column` expressions.
-        // TODO: Add support for ordering equivalence for all `PhysicalExpr`s
-        if let Some(column) = normalized.as_any().downcast_ref::<Column>() {
-            normalized_out_ordering
-                .push(OrderedColumn::new(column.clone(), item.options));
-        } else {
-            break;
-        }
-    }
+    let mut builder = OrderingEquivalenceBuilder::new(schema.clone())
+        .with_equivalences(input.equivalence_properties())
+        .with_existing_ordering(input.output_ordering().map(|elem| elem.to_vec()))
+        .extend(input.ordering_equivalence_properties());
     for expr in window_expr {
         if let Some(builtin_window_expr) =
             expr.as_any().downcast_ref::<BuiltInWindowExpr>()
@@ -289,25 +269,21 @@ pub(crate) fn window_ordering_equivalence(
                 .as_any()
                 .is::<RowNumber>()
             {
-                // If there is an existing ordering, add new ordering as an equivalence:
-                if !normalized_out_ordering.is_empty() {
-                    if let Some((idx, field)) =
-                        schema.column_with_name(expr.field().unwrap().name())
-                    {
-                        let column = Column::new(field.name(), idx);
-                        let options = SortOptions {
-                            descending: false,
-                            nulls_first: false,
-                        }; // ASC, NULLS LAST
-                        let rhs = OrderedColumn::new(column, options);
-                        result
-                            .add_equal_conditions((&normalized_out_ordering, &vec![rhs]));
-                    }
+                if let Some((idx, field)) =
+                    schema.column_with_name(expr.field().unwrap().name())
+                {
+                    let column = Column::new(field.name(), idx);
+                    let options = SortOptions {
+                        descending: false,
+                        nulls_first: false,
+                    }; // ASC, NULLS LAST
+                    let rhs = OrderedColumn::new(column, options);
+                    builder.add_equal_conditions(vec![rhs]);
                 }
             }
         }
     }
-    result
+    builder.build()
 }
 #[cfg(test)]
 mod tests {
diff --git a/datafusion/physical-expr/src/equivalence.rs b/datafusion/physical-expr/src/equivalence.rs
index c3a55437dd..6af72efc5a 100644
--- a/datafusion/physical-expr/src/equivalence.rs
+++ b/datafusion/physical-expr/src/equivalence.rs
@@ -16,7 +16,9 @@
 // under the License.
 
 use crate::expressions::Column;
-use crate::{PhysicalSortExpr, PhysicalSortRequirement};
+use crate::{
+    normalize_expr_with_equivalence_properties, PhysicalSortExpr, PhysicalSortRequirement,
+};
 
 use arrow::datatypes::SchemaRef;
 use arrow_schema::SortOptions;
@@ -269,6 +271,80 @@ impl OrderingEquivalentClass {
     }
 }
 
+/// This is a builder object facilitating incremental construction
+/// for ordering equivalences.
+pub struct OrderingEquivalenceBuilder {
+    eq_properties: EquivalenceProperties,
+    ordering_eq_properties: OrderingEquivalenceProperties,
+    existing_ordering: Vec<PhysicalSortExpr>,
+}
+
+impl OrderingEquivalenceBuilder {
+    pub fn new(schema: SchemaRef) -> Self {
+        let eq_properties = EquivalenceProperties::new(schema.clone());
+        let ordering_eq_properties = OrderingEquivalenceProperties::new(schema);
+        Self {
+            eq_properties,
+            ordering_eq_properties,
+            existing_ordering: vec![],
+        }
+    }
+
+    pub fn extend(
+        mut self,
+        new_ordering_eq_properties: OrderingEquivalenceProperties,
+    ) -> Self {
+        self.ordering_eq_properties
+            .extend(new_ordering_eq_properties.classes().iter().cloned());
+        self
+    }
+
+    pub fn with_existing_ordering(
+        mut self,
+        existing_ordering: Option<Vec<PhysicalSortExpr>>,
+    ) -> Self {
+        if let Some(existing_ordering) = existing_ordering {
+            self.existing_ordering = existing_ordering;
+        }
+        self
+    }
+
+    pub fn with_equivalences(mut self, new_eq_properties: EquivalenceProperties) -> Self {
+        self.eq_properties = new_eq_properties;
+        self
+    }
+
+    pub fn add_equal_conditions(&mut self, new_equivalent_ordering: Vec<OrderedColumn>) {
+        let mut normalized_out_ordering = vec![];
+        for item in &self.existing_ordering {
+            // To account for ordering equivalences, first normalize the expression:
+            let normalized = normalize_expr_with_equivalence_properties(
+                item.expr.clone(),
+                self.eq_properties.classes(),
+            );
+            // Currently we only support ordering equivalences for `Column` expressions.
+            // TODO: Add support for ordering equivalence for all `PhysicalExpr`s.
+            if let Some(column) = normalized.as_any().downcast_ref::<Column>() {
+                normalized_out_ordering
+                    .push(OrderedColumn::new(column.clone(), item.options));
+            } else {
+                break;
+            }
+        }
+        // If there is an existing ordering, add new ordering as an equivalence:
+        if !normalized_out_ordering.is_empty() {
+            self.ordering_eq_properties.add_equal_conditions((
+                &normalized_out_ordering,
+                &new_equivalent_ordering,
+            ));
+        }
+    }
+
+    pub fn build(self) -> OrderingEquivalenceProperties {
+        self.ordering_eq_properties
+    }
+}
+
 /// This function applies the given projection to the given equivalence
 /// properties to compute the resulting (projected) equivalence properties; e.g.
 /// 1) Adding an alias, which can introduce additional equivalence properties,

Reply via email to