This is an automated email from the ASF dual-hosted git repository.

akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 1015d98d9b Lexicographical Ordering Equivalence Support (#6431)
1015d98d9b is described below

commit 1015d98d9b59da67c257f70031abc4a359a7b828
Author: Mustafa Akur <[email protected]>
AuthorDate: Thu May 25 11:44:13 2023 +0300

    Lexicographical Ordering Equivalence Support (#6431)
    
    * Convert ordering equivalence to vec (unit test)
    
    * compiles
    
    * Simplifications
    
    * simplifications
    
    * Remove unnecessary codes
    
    * simplifications
    
    * Add test cases
    
    * fix bug
    
    * simplifications
    
    * Resolve linter errors
    
    * remove unnecessary codes
    
    * simplifications
    
    * simplifications
    
    * Remove unnecessary codes
    
    * Add pruning to ordering_equivalence projection
    
    * Remove unnecessary clones
    
    * Convert get range to calculate compatible ranges
    
    * Simplifications
    
    * Update comments
    
    * Update comments
    
    ---------
    
    Co-authored-by: Mehmet Ozan Kabak <[email protected]>
---
 .../core/src/physical_plan/aggregates/mod.rs       |   4 +-
 datafusion/core/src/physical_plan/windows/mod.rs   |  51 +-
 datafusion/physical-expr/src/equivalence.rs        | 135 +++--
 datafusion/physical-expr/src/utils.rs              | 600 +++++++++++----------
 4 files changed, 419 insertions(+), 371 deletions(-)

diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs 
b/datafusion/core/src/physical_plan/aggregates/mod.rs
index 5e6c5656c8..fc722e9ac3 100644
--- a/datafusion/core/src/physical_plan/aggregates/mod.rs
+++ b/datafusion/core/src/physical_plan/aggregates/mod.rs
@@ -1735,8 +1735,8 @@ mod tests {
         eq_properties.add_equal_conditions((&col_a, &col_b));
         let mut ordering_eq_properties = 
OrderingEquivalenceProperties::new(test_schema);
         ordering_eq_properties.add_equal_conditions((
-            &OrderedColumn::new(col_a.clone(), options1),
-            &OrderedColumn::new(col_c.clone(), options2),
+            &vec![OrderedColumn::new(col_a.clone(), options1)],
+            &vec![OrderedColumn::new(col_c.clone(), options2)],
         ));
 
         let order_by_exprs = vec![
diff --git a/datafusion/core/src/physical_plan/windows/mod.rs 
b/datafusion/core/src/physical_plan/windows/mod.rs
index f6fe3bcaee..d4732cc1f6 100644
--- a/datafusion/core/src/physical_plan/windows/mod.rs
+++ b/datafusion/core/src/physical_plan/windows/mod.rs
@@ -262,7 +262,22 @@ pub(crate) fn window_ordering_equivalence(
             .iter()
             .cloned(),
     );
-    let out_ordering = input.output_ordering().unwrap_or(&[]);
+    let mut normalized_out_ordering = vec![];
+    for item in input.output_ordering().unwrap_or(&[]) {
+        // To account for ordering equivalences, first normalize the 
expression:
+        let normalized = normalize_expr_with_equivalence_properties(
+            item.expr.clone(),
+            input.equivalence_properties().classes(),
+        );
+        // Currently we only support ordering equivalences for `Column` 
expressions.
+        // TODO: Add support for ordering equivalence for all `PhysicalExpr`s
+        if let Some(column) = normalized.as_any().downcast_ref::<Column>() {
+            normalized_out_ordering
+                .push(OrderedColumn::new(column.clone(), item.options));
+        } else {
+            break;
+        }
+    }
     for expr in window_expr {
         if let Some(builtin_window_expr) =
             expr.as_any().downcast_ref::<BuiltInWindowExpr>()
@@ -275,28 +290,18 @@ pub(crate) fn window_ordering_equivalence(
                 .is::<RowNumber>()
             {
                 // If there is an existing ordering, add new ordering as an 
equivalence:
-                if let Some(first) = out_ordering.first() {
-                    // Normalize expression, as we search for ordering 
equivalences
-                    // on normalized versions:
-                    let normalized = 
normalize_expr_with_equivalence_properties(
-                        first.expr.clone(),
-                        input.equivalence_properties().classes(),
-                    );
-                    if let Some(column) = 
normalized.as_any().downcast_ref::<Column>() {
-                        let column_info =
-                            
schema.column_with_name(expr.field().unwrap().name());
-                        if let Some((idx, field)) = column_info {
-                            let lhs = OrderedColumn::new(column.clone(), 
first.options);
-                            let options = SortOptions {
-                                descending: false,
-                                nulls_first: false,
-                            }; // ASC, NULLS LAST
-                            let rhs = OrderedColumn::new(
-                                Column::new(field.name(), idx),
-                                options,
-                            );
-                            result.add_equal_conditions((&lhs, &rhs));
-                        }
+                if !normalized_out_ordering.is_empty() {
+                    if let Some((idx, field)) =
+                        schema.column_with_name(expr.field().unwrap().name())
+                    {
+                        let column = Column::new(field.name(), idx);
+                        let options = SortOptions {
+                            descending: false,
+                            nulls_first: false,
+                        }; // ASC, NULLS LAST
+                        let rhs = OrderedColumn::new(column, options);
+                        result
+                            .add_equal_conditions((&normalized_out_ordering, 
&vec![rhs]));
                     }
                 }
             }
diff --git a/datafusion/physical-expr/src/equivalence.rs 
b/datafusion/physical-expr/src/equivalence.rs
index ab3443424b..a1f2df9208 100644
--- a/datafusion/physical-expr/src/equivalence.rs
+++ b/datafusion/physical-expr/src/equivalence.rs
@@ -16,12 +16,14 @@
 // under the License.
 
 use crate::expressions::Column;
+use crate::{PhysicalSortExpr, PhysicalSortRequirement};
 
 use arrow::datatypes::SchemaRef;
 use arrow_schema::SortOptions;
 
 use std::collections::{HashMap, HashSet};
 use std::hash::Hash;
+use std::sync::Arc;
 
 /// Equivalence Properties is a vec of EquivalentClass.
 #[derive(Debug, Clone)]
@@ -117,7 +119,7 @@ impl<T: Eq + Hash + Clone> EquivalenceProperties<T> {
 /// where both `a ASC` and `b DESC` can describe the table ordering. With
 /// `OrderingEquivalenceProperties`, we can keep track of these equivalences
 /// and treat `a ASC` and `b DESC` as the same ordering requirement.
-pub type OrderingEquivalenceProperties = EquivalenceProperties<OrderedColumn>;
+pub type OrderingEquivalenceProperties = 
EquivalenceProperties<Vec<OrderedColumn>>;
 
 /// EquivalentClass is a set of [`Column`]s or [`OrderedColumn`]s that are 
known
 /// to have the same value in all tuples in a relation. 
`EquivalentClass<Column>`
@@ -198,36 +200,61 @@ impl OrderedColumn {
     }
 }
 
-trait ColumnAccessor {
-    fn column(&self) -> &Column;
-}
-
-impl ColumnAccessor for Column {
-    fn column(&self) -> &Column {
-        self
+impl From<OrderedColumn> for PhysicalSortExpr {
+    fn from(value: OrderedColumn) -> Self {
+        PhysicalSortExpr {
+            expr: Arc::new(value.col) as _,
+            options: value.options,
+        }
     }
 }
 
-impl ColumnAccessor for OrderedColumn {
-    fn column(&self) -> &Column {
-        &self.col
+impl From<OrderedColumn> for PhysicalSortRequirement {
+    fn from(value: OrderedColumn) -> Self {
+        PhysicalSortRequirement {
+            expr: Arc::new(value.col) as _,
+            options: Some(value.options),
+        }
     }
 }
 
-pub type OrderingEquivalentClass = EquivalentClass<OrderedColumn>;
+/// `Vec<OrderedColumn>` stores the lexicographical ordering for a schema.
+/// OrderingEquivalentClass keeps track of different alternative orderings 
that can
+/// describe the schema.
+/// For instance, for the table below
+/// |a|b|c|d|
+/// |1|4|3|1|
+/// |2|3|3|2|
+/// |3|1|2|2|
+/// |3|2|1|3|
+/// both `vec![a ASC, b ASC]` and `vec![c DESC, d ASC]` describe the ordering 
of the table.
+/// For this case, we say that `vec![a ASC, b ASC]`, and `vec![c DESC, d ASC]` 
are ordering equivalent.
+pub type OrderingEquivalentClass = EquivalentClass<Vec<OrderedColumn>>;
 
 impl OrderingEquivalentClass {
-    /// Finds the matching column inside the `OrderingEquivalentClass`.
-    fn get_matching_column(&self, column: &Column) -> Option<OrderedColumn> {
-        if self.head.col.eq(column) {
-            Some(self.head.clone())
-        } else {
-            for item in &self.others {
-                if item.col.eq(column) {
-                    return Some(item.clone());
+    /// This function extends ordering equivalences with alias information.
+    /// For instance, assume column a and b are aliases,
+    /// and column (a ASC), (c DESC) are ordering equivalent. We append (b 
ASC) to ordering equivalence,
+    /// since b is an alias of column a. After this function (a ASC), (c DESC), (b 
ASC) would be ordering equivalent.
+    fn update_with_aliases(&mut self, columns_map: &HashMap<Column, 
Vec<Column>>) {
+        for (column, columns) in columns_map {
+            let mut to_insert = vec![];
+            for ordering in 
std::iter::once(&self.head).chain(self.others.iter()) {
+                for (idx, item) in ordering.iter().enumerate() {
+                    if item.col.eq(column) {
+                        for col in columns {
+                            let mut normalized = self.head.clone();
+                            // Change the corresponding entry in the head with 
the alias column:
+                            let entry = &mut normalized[idx];
+                            (entry.col, entry.options) = (col.clone(), 
item.options);
+                            to_insert.push(normalized);
+                        }
+                    }
                 }
             }
-            None
+            for items in to_insert {
+                self.insert(items);
+            }
         }
     }
 }
@@ -242,10 +269,10 @@ pub fn project_equivalence_properties(
     alias_map: &HashMap<Column, Vec<Column>>,
     output_eq: &mut EquivalenceProperties,
 ) {
-    let mut ec_classes = input_eq.classes().to_vec();
+    let mut eq_classes = input_eq.classes().to_vec();
     for (column, columns) in alias_map {
         let mut find_match = false;
-        for class in ec_classes.iter_mut() {
+        for class in eq_classes.iter_mut() {
             if class.contains(column) {
                 for col in columns {
                     class.insert(col.clone());
@@ -255,12 +282,29 @@ pub fn project_equivalence_properties(
             }
         }
         if !find_match {
-            ec_classes.push(EquivalentClass::new(column.clone(), 
columns.clone()));
+            eq_classes.push(EquivalentClass::new(column.clone(), 
columns.clone()));
+        }
+    }
+
+    // Prune columns that are no longer in the schema from equivalences.
+    let schema = output_eq.schema();
+    let fields = schema.fields();
+    for class in eq_classes.iter_mut() {
+        let columns_to_remove = class
+            .iter()
+            .filter(|column| {
+                let idx = column.index();
+                idx >= fields.len() || fields[idx].name() != column.name()
+            })
+            .cloned()
+            .collect::<Vec<_>>();
+        for column in columns_to_remove {
+            class.remove(&column);
         }
     }
+    eq_classes.retain(|props| props.len() > 1);
 
-    prune_columns_to_remove(output_eq, &mut ec_classes);
-    output_eq.extend(ec_classes);
+    output_eq.extend(eq_classes);
 }
 
 /// This function applies the given projection to the given ordering
@@ -275,39 +319,22 @@ pub fn project_ordering_equivalence_properties(
     columns_map: &HashMap<Column, Vec<Column>>,
     output_eq: &mut OrderingEquivalenceProperties,
 ) {
-    let mut ec_classes = input_eq.classes().to_vec();
-    for (column, columns) in columns_map {
-        for class in ec_classes.iter_mut() {
-            if let Some(OrderedColumn { options, .. }) = 
class.get_matching_column(column)
-            {
-                for col in columns {
-                    class.insert(OrderedColumn {
-                        col: col.clone(),
-                        options,
-                    });
-                }
-                break;
-            }
-        }
+    let mut eq_classes = input_eq.classes().to_vec();
+    for class in eq_classes.iter_mut() {
+        class.update_with_aliases(columns_map);
     }
 
-    prune_columns_to_remove(output_eq, &mut ec_classes);
-    output_eq.extend(ec_classes);
-}
-
-fn prune_columns_to_remove<T: Eq + Hash + Clone + ColumnAccessor>(
-    eq_properties: &EquivalenceProperties<T>,
-    eq_classes: &mut Vec<EquivalentClass<T>>,
-) {
-    let schema = eq_properties.schema();
+    // Prune columns that are no longer in the schema from the 
OrderingEquivalenceProperties.
+    let schema = output_eq.schema();
     let fields = schema.fields();
     for class in eq_classes.iter_mut() {
         let columns_to_remove = class
             .iter()
-            .filter(|elem| {
-                let column = elem.column();
-                let idx = column.index();
-                idx >= fields.len() || fields[idx].name() != column.name()
+            .filter(|columns| {
+                columns.iter().any(|column| {
+                    let idx = column.col.index();
+                    idx >= fields.len() || fields[idx].name() != 
column.col.name()
+                })
             })
             .cloned()
             .collect::<Vec<_>>();
@@ -316,6 +343,8 @@ fn prune_columns_to_remove<T: Eq + Hash + Clone + 
ColumnAccessor>(
         }
     }
     eq_classes.retain(|props| props.len() > 1);
+
+    output_eq.extend(eq_classes);
 }
 
 #[cfg(test)]
diff --git a/datafusion/physical-expr/src/utils.rs 
b/datafusion/physical-expr/src/utils.rs
index a8a0625ca0..54a18a1573 100644
--- a/datafusion/physical-expr/src/utils.rs
+++ b/datafusion/physical-expr/src/utils.rs
@@ -16,14 +16,13 @@
 // under the License.
 
 use crate::equivalence::{
-    EquivalenceProperties, EquivalentClass, OrderedColumn, 
OrderingEquivalenceProperties,
+    EquivalenceProperties, EquivalentClass, OrderingEquivalenceProperties,
     OrderingEquivalentClass,
 };
 use crate::expressions::{BinaryExpr, Column, UnKnownColumn};
 use crate::{PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement};
 
 use arrow::datatypes::SchemaRef;
-use arrow_schema::SortOptions;
 use datafusion_common::tree_node::{
     Transformed, TreeNode, TreeNodeRewriter, VisitRecursion,
 };
@@ -35,6 +34,7 @@ use petgraph::stable_graph::StableGraph;
 use std::borrow::Borrow;
 use std::collections::HashMap;
 use std::collections::HashSet;
+use std::ops::Range;
 use std::sync::Arc;
 
 /// Compare the two expr lists are equal no matter the order.
@@ -164,70 +164,6 @@ pub fn normalize_expr_with_equivalence_properties(
         .unwrap_or(expr)
 }
 
-fn normalize_expr_with_ordering_equivalence_properties(
-    expr: Arc<dyn PhysicalExpr>,
-    sort_options: SortOptions,
-    eq_properties: &[OrderingEquivalentClass],
-) -> (Arc<dyn PhysicalExpr>, SortOptions) {
-    let normalized_expr = expr
-        .clone()
-        .transform(&|expr| {
-            let normalized_form =
-                expr.as_any().downcast_ref::<Column>().and_then(|column| {
-                    for class in eq_properties {
-                        let ordered_column = OrderedColumn {
-                            col: column.clone(),
-                            options: sort_options,
-                        };
-                        if class.contains(&ordered_column) {
-                            return Some(class.head().clone());
-                        }
-                    }
-                    None
-                });
-            Ok(if let Some(normalized_form) = normalized_form {
-                Transformed::Yes(Arc::new(normalized_form.col) as _)
-            } else {
-                Transformed::No(expr)
-            })
-        })
-        .unwrap_or_else(|_| expr.clone());
-    if expr.ne(&normalized_expr) {
-        if let Some(col) = normalized_expr.as_any().downcast_ref::<Column>() {
-            for eq_class in eq_properties.iter() {
-                let head = eq_class.head();
-                if head.col.eq(col) {
-                    // Use options of the normalized version:
-                    return (normalized_expr, head.options);
-                }
-            }
-        }
-    }
-    (expr, sort_options)
-}
-
-fn normalize_sort_expr_with_equivalence_properties(
-    mut sort_expr: PhysicalSortExpr,
-    eq_properties: &[EquivalentClass],
-) -> PhysicalSortExpr {
-    sort_expr.expr =
-        normalize_expr_with_equivalence_properties(sort_expr.expr, 
eq_properties);
-    sort_expr
-}
-
-fn normalize_sort_expr_with_ordering_equivalence_properties(
-    mut sort_expr: PhysicalSortExpr,
-    eq_properties: &[OrderingEquivalentClass],
-) -> PhysicalSortExpr {
-    (sort_expr.expr, sort_expr.options) =
-        normalize_expr_with_ordering_equivalence_properties(
-            sort_expr.expr.clone(),
-            sort_expr.options,
-            eq_properties,
-        );
-    sort_expr
-}
-
 fn normalize_sort_requirement_with_equivalence_properties(
     mut sort_requirement: PhysicalSortRequirement,
     eq_properties: &[EquivalentClass],
@@ -237,47 +173,121 @@ fn 
normalize_sort_requirement_with_equivalence_properties(
     sort_requirement
 }
 
-fn normalize_sort_requirement_with_ordering_equivalence_properties(
-    mut sort_requirement: PhysicalSortRequirement,
-    eq_properties: &[OrderingEquivalentClass],
-) -> PhysicalSortRequirement {
-    if let Some(options) = &mut sort_requirement.options {
-        (sort_requirement.expr, *options) =
-            normalize_expr_with_ordering_equivalence_properties(
-                sort_requirement.expr,
-                *options,
-                eq_properties,
-            );
+/// This function searches for the slice `section` inside the slice `given`.
+/// It returns each range where `section` is compatible with the corresponding
+/// slice in `given`.
+fn get_compatible_ranges(
+    given: &[PhysicalSortRequirement],
+    section: &[PhysicalSortRequirement],
+) -> Vec<Range<usize>> {
+    let n_section = section.len();
+    let n_end = if given.len() >= n_section {
+        given.len() - n_section + 1
+    } else {
+        0
+    };
+    (0..n_end)
+        .filter_map(|idx| {
+            let end = idx + n_section;
+            given[idx..end]
+                .iter()
+                .zip(section)
+                .all(|(req, given)| given.compatible(req))
+                .then_some(Range { start: idx, end })
+        })
+        .collect()
+}
+
+/// This function constructs a duplicate-free vector by filtering out duplicate
+/// entries inside the given vector `input`.
+fn collapse_vec<T: PartialEq>(input: Vec<T>) -> Vec<T> {
+    let mut output = vec![];
+    for item in input {
+        if !output.contains(&item) {
+            output.push(item);
+        }
     }
-    sort_requirement
+    output
 }
 
-pub fn normalize_sort_expr(
-    sort_expr: PhysicalSortExpr,
+/// Transform `sort_exprs` vector, to standardized version using 
`eq_properties` and `ordering_eq_properties`
+/// Assume `eq_properties` states that `Column a` and `Column b` are aliases.
+/// Also assume `ordering_eq_properties` states that ordering `vec![d ASC]` 
and `vec![a ASC, c ASC]` are
+/// ordering equivalent (in the sense that both describe the ordering of the 
table).
+/// If the `sort_exprs` input to this function were `vec![b ASC, c ASC]`,
+/// This function converts `sort_exprs` `vec![b ASC, c ASC]` to first `vec![a 
ASC, c ASC]` after considering `eq_properties`
+/// Then converts `vec![a ASC, c ASC]` to `vec![d ASC]` after considering 
`ordering_eq_properties`.
+/// Standardized version `vec![d ASC]` is used in subsequent operations.
+pub fn normalize_sort_exprs(
+    sort_exprs: &[PhysicalSortExpr],
     eq_properties: &[EquivalentClass],
     ordering_eq_properties: &[OrderingEquivalentClass],
-) -> PhysicalSortExpr {
-    let normalized =
-        normalize_sort_expr_with_equivalence_properties(sort_expr, 
eq_properties);
-    normalize_sort_expr_with_ordering_equivalence_properties(
-        normalized,
+) -> Vec<PhysicalSortExpr> {
+    let sort_requirements = 
PhysicalSortRequirement::from_sort_exprs(sort_exprs.iter());
+    let normalized_exprs = normalize_sort_requirements(
+        &sort_requirements,
+        eq_properties,
         ordering_eq_properties,
-    )
+    );
+    let normalized_exprs = 
PhysicalSortRequirement::to_sort_exprs(normalized_exprs);
+    collapse_vec(normalized_exprs)
 }
 
-pub fn normalize_sort_requirement(
-    sort_requirement: PhysicalSortRequirement,
+/// Transform `sort_reqs` vector, to standardized version using 
`eq_properties` and `ordering_eq_properties`
+/// Assume `eq_properties` states that `Column a` and `Column b` are aliases.
+/// Also assume `ordering_eq_properties` states that ordering `vec![d ASC]` 
and `vec![a ASC, c ASC]` are
+/// ordering equivalent (in the sense that both describe the ordering of the 
table).
+/// If the `sort_reqs` input to this function were `vec![b Some(ASC), c None]`,
+/// This function converts `sort_reqs` `vec![b Some(ASC), c None]` to first 
`vec![a Some(ASC), c None]` after considering `eq_properties`
+/// Then converts `vec![a Some(ASC), c None]` to `vec![d Some(ASC)]` after 
considering `ordering_eq_properties`.
+/// Standardized version `vec![d Some(ASC)]` is used in subsequent operations.
+pub fn normalize_sort_requirements(
+    sort_reqs: &[PhysicalSortRequirement],
     eq_properties: &[EquivalentClass],
     ordering_eq_properties: &[OrderingEquivalentClass],
-) -> PhysicalSortRequirement {
-    let normalized = normalize_sort_requirement_with_equivalence_properties(
-        sort_requirement,
-        eq_properties,
-    );
-    normalize_sort_requirement_with_ordering_equivalence_properties(
-        normalized,
-        ordering_eq_properties,
-    )
+) -> Vec<PhysicalSortRequirement> {
+    let mut normalized_exprs = sort_reqs
+        .iter()
+        .map(|sort_req| {
+            normalize_sort_requirement_with_equivalence_properties(
+                sort_req.clone(),
+                eq_properties,
+            )
+        })
+        .collect::<Vec<_>>();
+    for ordering_eq_class in ordering_eq_properties {
+        for item in ordering_eq_class.others() {
+            let item = item
+                .clone()
+                .into_iter()
+                .map(|elem| elem.into())
+                .collect::<Vec<_>>();
+            let ranges = get_compatible_ranges(&normalized_exprs, &item);
+            let mut offset: i64 = 0;
+            for Range { start, end } in ranges {
+                let mut head = ordering_eq_class
+                    .head()
+                    .clone()
+                    .into_iter()
+                    .map(|elem| elem.into())
+                    .collect::<Vec<PhysicalSortRequirement>>();
+                let updated_start = (start as i64 + offset) as usize;
+                let updated_end = (end as i64 + offset) as usize;
+                let range = end - start;
+                offset += head.len() as i64 - range as i64;
+                let all_none = normalized_exprs[updated_start..updated_end]
+                    .iter()
+                    .all(|req| req.options.is_none());
+                if all_none {
+                    for req in head.iter_mut() {
+                        req.options = None;
+                    }
+                }
+                normalized_exprs.splice(updated_start..updated_end, head);
+            }
+        }
+    }
+    collapse_vec(normalized_exprs)
 }
 
 /// Checks whether given ordering requirements are satisfied by provided 
[PhysicalSortExpr]s.
@@ -317,17 +327,10 @@ pub fn ordering_satisfy_concrete<
     let ordering_eq_classes = oeq_properties.classes();
     let eq_properties = equal_properties();
     let eq_classes = eq_properties.classes();
-    let mut required_normalized = Vec::new();
-    for expr in required {
-        let item = normalize_sort_expr(expr.clone(), eq_classes, 
ordering_eq_classes);
-        if !required_normalized.contains(&item) {
-            required_normalized.push(item);
-        }
-    }
-    let provided_normalized = provided
-        .iter()
-        .map(|e| normalize_sort_expr(e.clone(), eq_classes, 
ordering_eq_classes))
-        .collect::<Vec<_>>();
+    let required_normalized =
+        normalize_sort_exprs(required, eq_classes, ordering_eq_classes);
+    let provided_normalized =
+        normalize_sort_exprs(provided, eq_classes, ordering_eq_classes);
     if required_normalized.len() > provided_normalized.len() {
         return false;
     }
@@ -375,18 +378,10 @@ pub fn ordering_satisfy_requirement_concrete<
     let ordering_eq_classes = oeq_properties.classes();
     let eq_properties = equal_properties();
     let eq_classes = eq_properties.classes();
-    let mut required_normalized = Vec::new();
-    for req in required {
-        let item =
-            normalize_sort_requirement(req.clone(), eq_classes, 
ordering_eq_classes);
-        if !required_normalized.contains(&item) {
-            required_normalized.push(item);
-        }
-    }
-    let provided_normalized = provided
-        .iter()
-        .map(|e| normalize_sort_expr(e.clone(), eq_classes, 
ordering_eq_classes))
-        .collect::<Vec<_>>();
+    let required_normalized =
+        normalize_sort_requirements(required, eq_classes, ordering_eq_classes);
+    let provided_normalized =
+        normalize_sort_exprs(provided, eq_classes, ordering_eq_classes);
     if required_normalized.len() > provided_normalized.len() {
         return false;
     }
@@ -434,18 +429,11 @@ fn requirements_compatible_concrete<
     let ordering_eq_classes = oeq_properties.classes();
     let eq_properties = equal_properties();
     let eq_classes = eq_properties.classes();
-    let mut required_normalized = Vec::new();
-    for req in required {
-        let item =
-            normalize_sort_requirement(req.clone(), eq_classes, 
ordering_eq_classes);
-        if !required_normalized.contains(&item) {
-            required_normalized.push(item);
-        }
-    }
-    let provided_normalized = provided
-        .iter()
-        .map(|e| normalize_sort_requirement(e.clone(), eq_classes, 
ordering_eq_classes))
-        .collect::<Vec<_>>();
+
+    let required_normalized =
+        normalize_sort_requirements(required, eq_classes, ordering_eq_classes);
+    let provided_normalized =
+        normalize_sort_requirements(provided, eq_classes, ordering_eq_classes);
     if required_normalized.len() > provided_normalized.len() {
         return false;
     }
@@ -708,11 +696,12 @@ pub fn reassign_predicate_columns(
 mod tests {
     use super::*;
     use crate::expressions::{binary, cast, col, in_list, lit, Column, Literal};
-    use crate::PhysicalSortExpr;
+    use crate::{OrderedColumn, PhysicalSortExpr};
     use arrow::compute::SortOptions;
     use datafusion_common::{Result, ScalarValue};
     use std::fmt::{Display, Formatter};
 
+    use crate::equivalence::OrderingEquivalenceProperties;
     use arrow_schema::{DataType, Field, Schema};
     use petgraph::visit::Bfs;
     use std::sync::Arc;
@@ -774,10 +763,10 @@ mod tests {
         OrderingEquivalenceProperties,
     )> {
         // Assume schema satisfies ordering a ASC NULLS LAST
-        // and d ASC NULLS LAST and e DESC NULLS FIRST
+        // and d ASC NULLS LAST, b ASC NULLS LAST and e DESC NULLS FIRST, b 
ASC NULLS LAST
         // Assume that column a and c are aliases.
         let col_a = &Column::new("a", 0);
-        let _col_b = &Column::new("b", 1);
+        let col_b = &Column::new("b", 1);
         let col_c = &Column::new("c", 2);
         let col_d = &Column::new("d", 3);
         let col_e = &Column::new("e", 4);
@@ -795,12 +784,18 @@ mod tests {
         let mut ordering_eq_properties =
             OrderingEquivalenceProperties::new(test_schema.clone());
         ordering_eq_properties.add_equal_conditions((
-            &OrderedColumn::new(col_a.clone(), option1),
-            &OrderedColumn::new(col_d.clone(), option1),
+            &vec![OrderedColumn::new(col_a.clone(), option1)],
+            &vec![
+                OrderedColumn::new(col_d.clone(), option1),
+                OrderedColumn::new(col_b.clone(), option1),
+            ],
         ));
         ordering_eq_properties.add_equal_conditions((
-            &OrderedColumn::new(col_a.clone(), option1),
-            &OrderedColumn::new(col_e.clone(), option2),
+            &vec![OrderedColumn::new(col_a.clone(), option1)],
+            &vec![
+                OrderedColumn::new(col_e.clone(), option2),
+                OrderedColumn::new(col_b.clone(), option1),
+            ],
         ));
         Ok((test_schema, eq_properties, ordering_eq_properties))
     }
@@ -1125,10 +1120,47 @@ mod tests {
             (vec![(col_c, option1)], true),
             (vec![(col_c, option2)], false),
             // Test whether ordering equivalence works as expected
-            (vec![(col_d, option1)], true),
-            (vec![(col_d, option2)], false),
-            (vec![(col_e, option2)], true),
-            (vec![(col_e, option1)], false),
+            (vec![(col_d, option1)], false),
+            (vec![(col_d, option1), (col_b, option1)], true),
+            (vec![(col_d, option2), (col_b, option1)], false),
+            (vec![(col_e, option2), (col_b, option1)], true),
+            (vec![(col_e, option1), (col_b, option1)], false),
+            (
+                vec![
+                    (col_d, option1),
+                    (col_b, option1),
+                    (col_d, option1),
+                    (col_b, option1),
+                ],
+                true,
+            ),
+            (
+                vec![
+                    (col_d, option1),
+                    (col_b, option1),
+                    (col_e, option2),
+                    (col_b, option1),
+                ],
+                true,
+            ),
+            (
+                vec![
+                    (col_d, option1),
+                    (col_b, option1),
+                    (col_d, option2),
+                    (col_b, option1),
+                ],
+                false,
+            ),
+            (
+                vec![
+                    (col_d, option1),
+                    (col_b, option1),
+                    (col_e, option1),
+                    (col_b, option1),
+                ],
+                false,
+            ),
         ];
         for (cols, expected) in requirements {
             let err_msg = format!("Error in test case:{cols:?}");
@@ -1155,6 +1187,71 @@ mod tests {
         Ok(())
     }
 
+    fn convert_to_requirement(
+        in_data: &[(&Column, Option<SortOptions>)],
+    ) -> Vec<PhysicalSortRequirement> {
+        in_data
+            .iter()
+            .map(|(col, options)| {
+                PhysicalSortRequirement::new(Arc::new((*col).clone()) as _, 
*options)
+            })
+            .collect::<Vec<_>>()
+    }
+
+    #[test]
+    fn test_normalize_sort_reqs() -> Result<()> {
+        let col_a = &Column::new("a", 0);
+        let col_b = &Column::new("b", 1);
+        let col_c = &Column::new("c", 2);
+        let col_d = &Column::new("d", 3);
+        let col_e = &Column::new("e", 4);
+        let option1 = SortOptions {
+            descending: false,
+            nulls_first: false,
+        };
+        let option2 = SortOptions {
+            descending: true,
+            nulls_first: true,
+        };
+        // First element in the tuple is the vector of requirements, second 
element is the expected requirement after normalization
+        let requirements = vec![
+            (vec![(col_a, Some(option1))], vec![(col_a, Some(option1))]),
+            (vec![(col_a, None)], vec![(col_a, None)]),
+            // Test whether equivalence works as expected
+            (vec![(col_c, Some(option1))], vec![(col_a, Some(option1))]),
+            (vec![(col_c, None)], vec![(col_a, None)]),
+            // Test whether ordering equivalence works as expected
+            (
+                vec![(col_d, Some(option1)), (col_b, Some(option1))],
+                vec![(col_a, Some(option1))],
+            ),
+            (vec![(col_d, None), (col_b, None)], vec![(col_a, None)]),
+            (
+                vec![(col_e, Some(option2)), (col_b, Some(option1))],
+                vec![(col_a, Some(option1))],
+            ),
+            // We should also be able to normalize compatible requirements 
(ones that are not exactly equal)
+            (
+                vec![(col_e, Some(option2)), (col_b, None)],
+                vec![(col_a, Some(option1))],
+            ),
+            (vec![(col_e, None), (col_b, None)], vec![(col_a, None)]),
+        ];
+        let (_test_schema, eq_properties, ordering_eq_properties) = 
create_test_params()?;
+        let eq_classes = eq_properties.classes();
+        let ordering_eq_classes = ordering_eq_properties.classes();
+        for (reqs, expected_normalized) in requirements.into_iter() {
+            let req = convert_to_requirement(&reqs);
+            let expected_normalized = 
convert_to_requirement(&expected_normalized);
+
+            assert_eq!(
+                normalize_sort_requirements(&req, eq_classes, 
ordering_eq_classes),
+                expected_normalized
+            );
+        }
+        Ok(())
+    }
+
     #[test]
     fn test_reassign_predicate_columns_in_list() {
         let int_field = Field::new("should_not_matter", DataType::Int64, true);
@@ -1195,30 +1292,26 @@ mod tests {
     #[test]
     fn test_normalize_expr_with_equivalence() -> Result<()> {
         let col_a = &Column::new("a", 0);
-        let _col_b = &Column::new("b", 1);
+        let col_b = &Column::new("b", 1);
         let col_c = &Column::new("c", 2);
-        let col_d = &Column::new("d", 3);
-        let col_e = &Column::new("e", 4);
-        let option1 = SortOptions {
-            descending: false,
-            nulls_first: false,
-        };
-        let option2 = SortOptions {
-            descending: true,
-            nulls_first: true,
-        };
-        // Assume schema satisfies ordering a ASC NULLS LAST
-        // and d ASC NULLS LAST and e DESC NULLS FIRST
+        let _col_d = &Column::new("d", 3);
+        let _col_e = &Column::new("e", 4);
         // Assume that column a and c are aliases.
-        let (_test_schema, eq_properties, ordering_eq_properties) = 
create_test_params()?;
+        let (_test_schema, eq_properties, _ordering_eq_properties) =
+            create_test_params()?;
 
         let col_a_expr = Arc::new(col_a.clone()) as Arc<dyn PhysicalExpr>;
+        let col_b_expr = Arc::new(col_b.clone()) as Arc<dyn PhysicalExpr>;
         let col_c_expr = Arc::new(col_c.clone()) as Arc<dyn PhysicalExpr>;
-        let col_d_expr = Arc::new(col_d.clone()) as Arc<dyn PhysicalExpr>;
-        let col_e_expr = Arc::new(col_e.clone()) as Arc<dyn PhysicalExpr>;
         // Test cases for equivalence normalization,
         // First entry in the tuple is argument, second entry is expected 
result after normalization.
-        let expressions = vec![(&col_a_expr, &col_a_expr), (&col_c_expr, 
&col_a_expr)];
+        let expressions = vec![
+            // Normalized versions of columns a and c should both be a (since 
a is the head)
+            (&col_a_expr, &col_a_expr),
+            (&col_c_expr, &col_a_expr),
+            // Cannot normalize column b
+            (&col_b_expr, &col_b_expr),
+        ];
         for (expr, expected_eq) in expressions {
             assert!(
                 expected_eq.eq(&normalize_expr_with_equivalence_properties(
@@ -1229,102 +1322,6 @@ mod tests {
             );
         }
 
-        // Test cases for ordering equivalence normalization
-        // First entry in the tuple is PhysicalExpr, second entry is its 
ordering, third entry is result after normalization.
-        let expressions = vec![
-            (&col_d_expr, option1, option1, &col_a_expr),
-            (&col_e_expr, option2, option1, &col_a_expr),
-            // Cannot normalize, hence should return itself.
-            (&col_e_expr, option1, option1, &col_e_expr),
-        ];
-        for (expr, sort_options, expected_options, expected_ordering_eq) in 
expressions {
-            let (normalized_expr, options) =
-                normalize_expr_with_ordering_equivalence_properties(
-                    expr.clone(),
-                    sort_options,
-                    ordering_eq_properties.classes(),
-                );
-            assert!(
-                normalized_expr.eq(expected_ordering_eq) && (expected_options 
== options),
-                "error in test: expr: {expr:?}, sort_options: {sort_options:?}"
-            );
-        }
-        Ok(())
-    }
-
-    #[test]
-    fn test_normalize_sort_expr_with_equivalence() -> Result<()> {
-        let col_a = &Column::new("a", 0);
-        let _col_b = &Column::new("b", 1);
-        let col_c = &Column::new("c", 2);
-        let col_d = &Column::new("d", 3);
-        let col_e = &Column::new("e", 4);
-        let option1 = SortOptions {
-            descending: false,
-            nulls_first: false,
-        };
-        let option2 = SortOptions {
-            descending: true,
-            nulls_first: true,
-        };
-        // Assume schema satisfies ordering a ASC NULLS LAST
-        // and d ASC NULLS LAST and e DESC NULLS FIRST
-        // Assume that column a and c are aliases.
-        let (_test_schema, eq_properties, ordering_eq_properties) = 
create_test_params()?;
-
-        // Test cases for equivalence normalization
-        // First entry in the tuple is PhysicalExpr, second entry is its 
ordering, third entry is result after normalization.
-        let expressions = vec![
-            (&col_a, option1, &col_a, option1),
-            (&col_c, option1, &col_a, option1),
-            // Cannot normalize column d, since it is not in equivalence 
properties.
-            (&col_d, option1, &col_d, option1),
-        ];
-        for (expr, sort_options, expected_col, expected_options) in
-            expressions.into_iter()
-        {
-            let expected = PhysicalSortExpr {
-                expr: Arc::new((*expected_col).clone()) as _,
-                options: expected_options,
-            };
-            let arg = PhysicalSortExpr {
-                expr: Arc::new((*expr).clone()) as _,
-                options: sort_options,
-            };
-            assert!(
-                expected.eq(&normalize_sort_expr_with_equivalence_properties(
-                    arg.clone(),
-                    eq_properties.classes()
-                )),
-                "error in test: expr: {expr:?}, sort_options: {sort_options:?}"
-            );
-        }
-
-        // Test cases for ordering equivalence normalization
-        // First entry in the tuple is PhysicalExpr, second entry is its 
ordering, third entry is result after normalization.
-        let expressions = vec![
-            (&col_d, option1, &col_a, option1),
-            (&col_e, option2, &col_a, option1),
-        ];
-        for (expr, sort_options, expected_col, expected_options) in
-            expressions.into_iter()
-        {
-            let expected = PhysicalSortExpr {
-                expr: Arc::new((*expected_col).clone()) as _,
-                options: expected_options,
-            };
-            let arg = PhysicalSortExpr {
-                expr: Arc::new((*expr).clone()) as _,
-                options: sort_options,
-            };
-            assert!(
-                
expected.eq(&normalize_sort_expr_with_ordering_equivalence_properties(
-                    arg.clone(),
-                    ordering_eq_properties.classes()
-                )),
-                "error in test: expr: {expr:?}, sort_options: {sort_options:?}"
-            );
-        }
         Ok(())
     }
 
@@ -1334,19 +1331,14 @@ mod tests {
         let _col_b = &Column::new("b", 1);
         let col_c = &Column::new("c", 2);
         let col_d = &Column::new("d", 3);
-        let col_e = &Column::new("e", 4);
+        let _col_e = &Column::new("e", 4);
         let option1 = SortOptions {
             descending: false,
             nulls_first: false,
         };
-        let option2 = SortOptions {
-            descending: true,
-            nulls_first: true,
-        };
-        // Assume schema satisfies ordering a ASC NULLS LAST
-        // and d ASC NULLS LAST and e DESC NULLS FIRST
         // Assume that column a and c are aliases.
-        let (_test_schema, eq_properties, ordering_eq_properties) = 
create_test_params()?;
+        let (_test_schema, eq_properties, _ordering_eq_properties) =
+            create_test_params()?;
 
         // Test cases for equivalence normalization
         // First entry in the tuple is PhysicalExpr, second entry is its 
ordering, third entry is result after normalization.
@@ -1377,33 +1369,6 @@ mod tests {
             );
         }
 
-        // Test cases for ordering equivalence normalization
-        // First entry in the tuple is PhysicalExpr, second entry is its 
ordering, third entry is result after normalization.
-        let expressions = vec![
-            (&col_d, Some(option1), &col_a, Some(option1)),
-            (&col_e, Some(option2), &col_a, Some(option1)),
-        ];
-        for (expr, sort_options, expected_col, expected_options) in
-            expressions.into_iter()
-        {
-            let expected = PhysicalSortRequirement::new(
-                Arc::new((*expected_col).clone()) as _,
-                expected_options,
-            );
-            let arg = PhysicalSortRequirement::new(
-                Arc::new((*expr).clone()) as _,
-                sort_options,
-            );
-            assert!(
-                expected.eq(
-                    
&normalize_sort_requirement_with_ordering_equivalence_properties(
-                        arg.clone(),
-                        ordering_eq_properties.classes()
-                    )
-                ),
-                "error in test: expr: {expr:?}, sort_options: {sort_options:?}"
-            );
-        }
         Ok(())
     }
 
@@ -1426,8 +1391,8 @@ mod tests {
         // Column a and e are ordering equivalent (e.g global ordering of the 
table can be described both as a ASC and e ASC.)
         let mut ordering_eq_properties = 
OrderingEquivalenceProperties::new(test_schema);
         ordering_eq_properties.add_equal_conditions((
-            &OrderedColumn::new(col_a.clone(), option1),
-            &OrderedColumn::new(col_e.clone(), option1),
+            &vec![OrderedColumn::new(col_a.clone(), option1)],
+            &vec![OrderedColumn::new(col_e.clone(), option1)],
         ));
         let sort_req_a = PhysicalSortExpr {
             expr: Arc::new((col_a).clone()) as _,
@@ -1491,4 +1456,53 @@ mod tests {
 
         Ok(())
     }
+
+    #[test]
+    fn test_get_compatible_ranges() -> Result<()> {
+        let col_a = &Column::new("a", 0);
+        let col_b = &Column::new("b", 1);
+        let option1 = SortOptions {
+            descending: false,
+            nulls_first: false,
+        };
+        let test_data = vec![
+            (
+                vec![(col_a, Some(option1)), (col_b, Some(option1))],
+                vec![(col_a, Some(option1))],
+                vec![(0, 1)],
+            ),
+            (
+                vec![(col_a, None), (col_b, Some(option1))],
+                vec![(col_a, Some(option1))],
+                vec![(0, 1)],
+            ),
+            (
+                vec![
+                    (col_a, None),
+                    (col_b, Some(option1)),
+                    (col_a, Some(option1)),
+                ],
+                vec![(col_a, Some(option1))],
+                vec![(0, 1), (2, 3)],
+            ),
+        ];
+        for (searched, to_search, expected) in test_data {
+            let searched = convert_to_requirement(&searched);
+            let to_search = convert_to_requirement(&to_search);
+            let expected = expected
+                .into_iter()
+                .map(|(start, end)| Range { start, end })
+                .collect::<Vec<_>>();
+            assert_eq!(get_compatible_ranges(&searched, &to_search), expected);
+        }
+        Ok(())
+    }
+
+    #[test]
+    fn test_collapse_vec() -> Result<()> {
+        assert_eq!(collapse_vec(vec![1, 2, 3]), vec![1, 2, 3]);
+        assert_eq!(collapse_vec(vec![1, 2, 3, 2, 3]), vec![1, 2, 3]);
+        assert_eq!(collapse_vec(vec![3, 1, 2, 3, 2, 3]), vec![3, 1, 2]);
+        Ok(())
+    }
 }


Reply via email to