This is an automated email from the ASF dual-hosted git repository.

akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new c72b98e414 Enhance/Refactor Ordering Equivalence Properties (#7566)
c72b98e414 is described below

commit c72b98e41489c09d02cdb5e335c547cfdc5319c4
Author: Mustafa Akur <[email protected]>
AuthorDate: Mon Sep 18 13:23:41 2023 +0300

    Enhance/Refactor Ordering Equivalence Properties (#7566)
    
    * separate implementation of oeq properties
    
    * Simplifications
    
    * Move utils to methods
    
    * Remove unnecessary code
    
    * Address todo
    
    * Buggy is_aggressive mode to be added
    
    * start implementing aggressive mode
    
    * all tests pass
    
    * minor changes
    
    * All tests pass
    
    * Minor changes
    
    * All tests pass
    
    * minor changes
    
    * all tests pass
    
    * Simplifications
    
    * minor changes
    
    * Resolve linter error
    
    * Minor changes
    
    * minor changes
    
    * Update plan
    
    * Simplifications, update comments
    
    * Update comments, Use existing stats to find constants
    
    * Simplifications
    
    * Unknown input stats are handled
    
    * Address reviews
    
    * Simplifications
    
    * Simplifications
    
    * Address reviews
    
    * Fix subdirectories
    
    ---------
    
    Co-authored-by: berkaysynnada <[email protected]>
---
 datafusion/common/src/stats.rs                     |  24 +
 .../src/physical_optimizer/enforce_distribution.rs |  24 +-
 datafusion/physical-expr/src/analysis.rs           |   5 +-
 datafusion/physical-expr/src/equivalence.rs        | 604 +++++++++++++++++++--
 datafusion/physical-expr/src/lib.rs                |  10 +-
 datafusion/physical-expr/src/partitioning.rs       |  19 +-
 datafusion/physical-expr/src/utils.rs              | 446 +++++----------
 datafusion/physical-plan/src/filter.rs             |  26 +-
 datafusion/physical-plan/src/joins/utils.rs        | 228 +++-----
 datafusion/physical-plan/src/memory.rs             |   6 +-
 datafusion/physical-plan/src/projection.rs         |   6 +-
 datafusion/sqllogictest/test_files/select.slt      | 106 ++++
 datafusion/sqllogictest/test_files/subquery.slt    |  25 +-
 datafusion/sqllogictest/test_files/window.slt      |   9 +-
 14 files changed, 952 insertions(+), 586 deletions(-)

diff --git a/datafusion/common/src/stats.rs b/datafusion/common/src/stats.rs
index db788efef7..ca76e14cb8 100644
--- a/datafusion/common/src/stats.rs
+++ b/datafusion/common/src/stats.rs
@@ -19,6 +19,8 @@
 
 use std::fmt::Display;
 
+use arrow::datatypes::DataType;
+
 use crate::ScalarValue;
 
 /// Statistics for a relation
@@ -70,3 +72,25 @@ pub struct ColumnStatistics {
     /// Number of distinct values
     pub distinct_count: Option<usize>,
 }
+
+impl ColumnStatistics {
+    /// Column contains a single non-null value (e.g. a constant).
+    pub fn is_singleton(&self) -> bool {
+        match (&self.min_value, &self.max_value) {
+            // Min and max values are the same and not infinity.
+            (Some(min), Some(max)) => !min.is_null() && !max.is_null() && (min 
== max),
+            (_, _) => false,
+        }
+    }
+
+    /// Returns the [`ColumnStatistics`] corresponding to the given datatype 
by assigning infinite bounds.
+    pub fn new_with_unbounded_column(dt: &DataType) -> ColumnStatistics {
+        let null = ScalarValue::try_from(dt.clone()).ok();
+        ColumnStatistics {
+            null_count: None,
+            max_value: null.clone(),
+            min_value: null,
+            distinct_count: None,
+        }
+    }
+}
diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs 
b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index 77d6e7d712..565f76affa 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -54,8 +54,7 @@ use datafusion_physical_expr::utils::{
     map_columns_before_projection, ordering_satisfy_requirement_concrete,
 };
 use datafusion_physical_expr::{
-    expr_list_eq_strict_order, normalize_expr_with_equivalence_properties, 
PhysicalExpr,
-    PhysicalSortRequirement,
+    expr_list_eq_strict_order, PhysicalExpr, PhysicalSortRequirement,
 };
 
 use datafusion_common::internal_err;
@@ -807,36 +806,21 @@ fn try_reorder(
     } else if !equivalence_properties.classes().is_empty() {
         normalized_expected = expected
             .iter()
-            .map(|e| {
-                normalize_expr_with_equivalence_properties(
-                    e.clone(),
-                    equivalence_properties.classes(),
-                )
-            })
+            .map(|e| equivalence_properties.normalize_expr(e.clone()))
             .collect::<Vec<_>>();
         assert_eq!(normalized_expected.len(), expected.len());
 
         normalized_left_keys = join_keys
             .left_keys
             .iter()
-            .map(|e| {
-                normalize_expr_with_equivalence_properties(
-                    e.clone(),
-                    equivalence_properties.classes(),
-                )
-            })
+            .map(|e| equivalence_properties.normalize_expr(e.clone()))
             .collect::<Vec<_>>();
         assert_eq!(join_keys.left_keys.len(), normalized_left_keys.len());
 
         normalized_right_keys = join_keys
             .right_keys
             .iter()
-            .map(|e| {
-                normalize_expr_with_equivalence_properties(
-                    e.clone(),
-                    equivalence_properties.classes(),
-                )
-            })
+            .map(|e| equivalence_properties.normalize_expr(e.clone()))
             .collect::<Vec<_>>();
         assert_eq!(join_keys.right_keys.len(), normalized_right_keys.len());
 
diff --git a/datafusion/physical-expr/src/analysis.rs 
b/datafusion/physical-expr/src/analysis.rs
index d3fcdc11ad..990c643c6b 100644
--- a/datafusion/physical-expr/src/analysis.rs
+++ b/datafusion/physical-expr/src/analysis.rs
@@ -189,12 +189,15 @@ fn shrink_boundaries(
     })?;
     let final_result = graph.get_interval(*root_index);
 
+    // If we encounter an error during selectivity calculation, use 1.0 as the
+    // cardinality estimate, which is the safest (e.g. largest possible) value.
     let selectivity = calculate_selectivity(
         &final_result.lower.value,
         &final_result.upper.value,
         &target_boundaries,
         &initial_boundaries,
-    )?;
+    )
+    .unwrap_or(1.0);
 
     if !(0.0..=1.0).contains(&selectivity) {
         return internal_err!("Selectivity is out of limit: {}", selectivity);
diff --git a/datafusion/physical-expr/src/equivalence.rs 
b/datafusion/physical-expr/src/equivalence.rs
index b8ca1acc1c..369c139aa3 100644
--- a/datafusion/physical-expr/src/equivalence.rs
+++ b/datafusion/physical-expr/src/equivalence.rs
@@ -15,34 +15,37 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::expressions::Column;
-use crate::utils::collect_columns;
+use crate::expressions::{CastExpr, Column};
+use crate::utils::{collect_columns, merge_vectors};
 use crate::{
-    normalize_expr_with_equivalence_properties, LexOrdering, PhysicalExpr,
-    PhysicalSortExpr,
+    LexOrdering, LexOrderingRef, LexOrderingReq, PhysicalExpr, 
PhysicalSortExpr,
+    PhysicalSortRequirement,
 };
 
 use arrow::datatypes::SchemaRef;
 use arrow_schema::Fields;
 
+use datafusion_common::tree_node::{Transformed, TreeNode};
+use datafusion_common::Result;
+use itertools::izip;
 use std::collections::{HashMap, HashSet};
 use std::hash::Hash;
+use std::ops::Range;
 use std::sync::Arc;
 
 /// Represents a collection of [`EquivalentClass`] (equivalences
 /// between columns in relations)
 ///
-/// This is used to represent both:
+/// This is used to represent:
 ///
 /// 1. Equality conditions (like `A=B`), when `T` = [`Column`]
-/// 2. Ordering (like `A ASC = B ASC`), when `T` = [`PhysicalSortExpr`]
 #[derive(Debug, Clone)]
-pub struct EquivalenceProperties<T = Column> {
-    classes: Vec<EquivalentClass<T>>,
+pub struct EquivalenceProperties {
+    classes: Vec<EquivalentClass<Column>>,
     schema: SchemaRef,
 }
 
-impl<T: Eq + Clone + Hash> EquivalenceProperties<T> {
+impl EquivalenceProperties {
     pub fn new(schema: SchemaRef) -> Self {
         EquivalenceProperties {
             classes: vec![],
@@ -51,7 +54,7 @@ impl<T: Eq + Clone + Hash> EquivalenceProperties<T> {
     }
 
     /// return the set of equivalences
-    pub fn classes(&self) -> &[EquivalentClass<T>] {
+    pub fn classes(&self) -> &[EquivalentClass<Column>] {
         &self.classes
     }
 
@@ -60,7 +63,7 @@ impl<T: Eq + Clone + Hash> EquivalenceProperties<T> {
     }
 
     /// Add the [`EquivalentClass`] from `iter` to this list
-    pub fn extend<I: IntoIterator<Item = EquivalentClass<T>>>(&mut self, iter: 
I) {
+    pub fn extend<I: IntoIterator<Item = EquivalentClass<Column>>>(&mut self, 
iter: I) {
         for ec in iter {
             self.classes.push(ec)
         }
@@ -68,7 +71,7 @@ impl<T: Eq + Clone + Hash> EquivalenceProperties<T> {
 
     /// Adds new equal conditions into the EquivalenceProperties. New equal
     /// conditions usually come from equality predicates in a join/filter.
-    pub fn add_equal_conditions(&mut self, new_conditions: (&T, &T)) {
+    pub fn add_equal_conditions(&mut self, new_conditions: (&Column, &Column)) 
{
         let mut idx1: Option<usize> = None;
         let mut idx2: Option<usize> = None;
         for (idx, class) in self.classes.iter_mut().enumerate() {
@@ -106,7 +109,7 @@ impl<T: Eq + Clone + Hash> EquivalenceProperties<T> {
             }
             (None, None) => {
                 // adding new pairs
-                self.classes.push(EquivalentClass::<T>::new(
+                self.classes.push(EquivalentClass::<Column>::new(
                     new_conditions.0.clone(),
                     vec![new_conditions.1.clone()],
                 ));
@@ -114,6 +117,81 @@ impl<T: Eq + Clone + Hash> EquivalenceProperties<T> {
             _ => {}
         }
     }
+
+    /// Normalizes physical expression according to `EquivalentClass`es inside 
`self.classes`.
+    /// expression is replaced with `EquivalentClass::head` expression if it 
is among `EquivalentClass::others`.
+    pub fn normalize_expr(&self, expr: Arc<dyn PhysicalExpr>) -> Arc<dyn 
PhysicalExpr> {
+        expr.clone()
+            .transform(&|expr| {
+                let normalized_form =
+                    expr.as_any().downcast_ref::<Column>().and_then(|column| {
+                        for class in &self.classes {
+                            if class.contains(column) {
+                                return Some(Arc::new(class.head().clone()) as 
_);
+                            }
+                        }
+                        None
+                    });
+                Ok(if let Some(normalized_form) = normalized_form {
+                    Transformed::Yes(normalized_form)
+                } else {
+                    Transformed::No(expr)
+                })
+            })
+            .unwrap_or(expr)
+    }
+
+    /// This function applies the \[`normalize_expr`]
+    /// function for all expression in `exprs` and returns a vector of
+    /// normalized physical expressions.
+    pub fn normalize_exprs(
+        &self,
+        exprs: &[Arc<dyn PhysicalExpr>],
+    ) -> Vec<Arc<dyn PhysicalExpr>> {
+        exprs
+            .iter()
+            .map(|expr| self.normalize_expr(expr.clone()))
+            .collect::<Vec<_>>()
+    }
+
+    /// This function normalizes `sort_requirement` according to 
`EquivalenceClasses` in the `self`.
+    /// If the given sort requirement doesn't belong to equivalence set inside
+    /// `self`, it returns `sort_requirement` as is.
+    pub fn normalize_sort_requirement(
+        &self,
+        mut sort_requirement: PhysicalSortRequirement,
+    ) -> PhysicalSortRequirement {
+        sort_requirement.expr = self.normalize_expr(sort_requirement.expr);
+        sort_requirement
+    }
+
+    /// This function applies the \[`normalize_sort_requirement`]
+    /// function for all sort requirements in `sort_reqs` and returns a vector 
of
+    /// normalized sort expressions.
+    pub fn normalize_sort_requirements(
+        &self,
+        sort_reqs: &[PhysicalSortRequirement],
+    ) -> Vec<PhysicalSortRequirement> {
+        let normalized_sort_reqs = sort_reqs
+            .iter()
+            .map(|sort_req| self.normalize_sort_requirement(sort_req.clone()))
+            .collect::<Vec<_>>();
+        collapse_vec(normalized_sort_reqs)
+    }
+
+    /// Similar to the \[`normalize_sort_requirements`] this function 
normalizes
+    /// sort expressions in `sort_exprs` and returns a vector of
+    /// normalized sort expressions.
+    pub fn normalize_sort_exprs(
+        &self,
+        sort_exprs: &[PhysicalSortExpr],
+    ) -> Vec<PhysicalSortExpr> {
+        let sort_requirements =
+            PhysicalSortRequirement::from_sort_exprs(sort_exprs.iter());
+        let normalized_sort_requirement =
+            self.normalize_sort_requirements(&sort_requirements);
+        PhysicalSortRequirement::to_sort_exprs(normalized_sort_requirement)
+    }
 }
 
 /// `OrderingEquivalenceProperties` keeps track of columns that describe the
@@ -131,17 +209,120 @@ impl<T: Eq + Clone + Hash> EquivalenceProperties<T> {
 /// where both `a ASC` and `b DESC` can describe the table ordering. With
 /// `OrderingEquivalenceProperties`, we can keep track of these equivalences
 /// and treat `a ASC` and `b DESC` as the same ordering requirement.
-pub type OrderingEquivalenceProperties = EquivalenceProperties<LexOrdering>;
+#[derive(Debug, Clone)]
+pub struct OrderingEquivalenceProperties {
+    oeq_class: Option<OrderingEquivalentClass>,
+    /// Keeps track of expressions that have constant value.
+    constants: Vec<Arc<dyn PhysicalExpr>>,
+    schema: SchemaRef,
+}
 
 impl OrderingEquivalenceProperties {
+    /// Create an empty `OrderingEquivalenceProperties`
+    pub fn new(schema: SchemaRef) -> Self {
+        Self {
+            oeq_class: None,
+            constants: vec![],
+            schema,
+        }
+    }
+
+    /// Extends `OrderingEquivalenceProperties` by adding ordering inside the 
`other`
+    /// to the `self.oeq_class`.
+    pub fn extend(&mut self, other: Option<OrderingEquivalentClass>) {
+        if let Some(other) = other {
+            if let Some(class) = &mut self.oeq_class {
+                class.others.insert(other.head);
+                class.others.extend(other.others);
+            } else {
+                self.oeq_class = Some(other);
+            }
+        }
+    }
+
+    pub fn oeq_class(&self) -> Option<&OrderingEquivalentClass> {
+        self.oeq_class.as_ref()
+    }
+
+    /// Adds new equal conditions into the EquivalenceProperties. New equal
+    /// conditions usually come from equality predicates in a join/filter.
+    pub fn add_equal_conditions(&mut self, new_conditions: (&LexOrdering, 
&LexOrdering)) {
+        if let Some(class) = &mut self.oeq_class {
+            class.insert(new_conditions.0.clone());
+            class.insert(new_conditions.1.clone());
+        } else {
+            let head = new_conditions.0.clone();
+            let others = vec![new_conditions.1.clone()];
+            self.oeq_class = Some(OrderingEquivalentClass::new(head, others))
+        }
+    }
+
+    /// Add physical expression that have constant value to the 
`self.constants`
+    pub fn with_constants(mut self, constants: Vec<Arc<dyn PhysicalExpr>>) -> 
Self {
+        constants.into_iter().for_each(|constant| {
+            if !physical_exprs_contains(&self.constants, &constant) {
+                self.constants.push(constant);
+            }
+        });
+        self
+    }
+
+    pub fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    /// This function normalizes `sort_reqs` by
+    /// - removing expressions that have constant value from requirement
+    /// - replacing sections that are in the `self.oeq_class.others` with 
`self.oeq_class.head`
+    /// - removing sections that satisfies global ordering that are in the 
post fix of requirement
+    pub fn normalize_sort_requirements(
+        &self,
+        sort_reqs: &[PhysicalSortRequirement],
+    ) -> Vec<PhysicalSortRequirement> {
+        let normalized_sort_reqs =
+            prune_sort_reqs_with_constants(sort_reqs, &self.constants);
+        let mut normalized_sort_reqs = collapse_lex_req(normalized_sort_reqs);
+        if let Some(oeq_class) = &self.oeq_class {
+            for item in oeq_class.others() {
+                let item = PhysicalSortRequirement::from_sort_exprs(item);
+                let item = prune_sort_reqs_with_constants(&item, 
&self.constants);
+                let ranges = get_compatible_ranges(&normalized_sort_reqs, 
&item);
+                let mut offset: i64 = 0;
+                for Range { start, end } in ranges {
+                    let head = 
PhysicalSortRequirement::from_sort_exprs(oeq_class.head());
+                    let mut head = prune_sort_reqs_with_constants(&head, 
&self.constants);
+                    let updated_start = (start as i64 + offset) as usize;
+                    let updated_end = (end as i64 + offset) as usize;
+                    let range = end - start;
+                    offset += head.len() as i64 - range as i64;
+                    let all_none = 
normalized_sort_reqs[updated_start..updated_end]
+                        .iter()
+                        .all(|req| req.options.is_none());
+                    if all_none {
+                        for req in head.iter_mut() {
+                            req.options = None;
+                        }
+                    }
+                    normalized_sort_reqs.splice(updated_start..updated_end, 
head);
+                }
+            }
+            normalized_sort_reqs = simplify_lex_req(normalized_sort_reqs, 
oeq_class);
+        }
+        collapse_lex_req(normalized_sort_reqs)
+    }
+
     /// Checks whether `leading_ordering` is contained in any of the ordering
     /// equivalence classes.
     pub fn satisfies_leading_ordering(
         &self,
         leading_ordering: &PhysicalSortExpr,
     ) -> bool {
-        for cls in &self.classes {
-            for ordering in 
cls.others.iter().chain(std::iter::once(&cls.head)) {
+        if let Some(oeq_class) = &self.oeq_class {
+            for ordering in oeq_class
+                .others
+                .iter()
+                .chain(std::iter::once(&oeq_class.head))
+            {
                 if ordering[0].eq(leading_ordering) {
                     return true;
                 }
@@ -280,6 +461,55 @@ impl OrderingEquivalentClass {
             self.insert(update_with_alias(ordering, oeq_alias_map));
         }
     }
+
+    /// Adds `offset` value to the index of each expression inside `self.head` 
and `self.others`.
+    pub fn add_offset(&self, offset: usize) -> Result<OrderingEquivalentClass> 
{
+        let head = add_offset_to_lex_ordering(self.head(), offset)?;
+        let others = self
+            .others()
+            .iter()
+            .map(|ordering| add_offset_to_lex_ordering(ordering, offset))
+            .collect::<Result<Vec<_>>>()?;
+        Ok(OrderingEquivalentClass::new(head, others))
+    }
+
+    /// This function normalizes `OrderingEquivalenceProperties` according to 
`eq_properties`.
+    /// More explicitly, it makes sure that expressions in `oeq_class` are 
head entries
+    /// in `eq_properties`, replacing any non-head entries with head entries 
if necessary.
+    pub fn normalize_with_equivalence_properties(
+        &self,
+        eq_properties: &EquivalenceProperties,
+    ) -> OrderingEquivalentClass {
+        let head = eq_properties.normalize_sort_exprs(self.head());
+
+        let others = self
+            .others()
+            .iter()
+            .map(|other| eq_properties.normalize_sort_exprs(other))
+            .collect();
+
+        EquivalentClass::new(head, others)
+    }
+
+    /// Prefix with existing ordering.
+    pub fn prefix_ordering_equivalent_class_with_existing_ordering(
+        &self,
+        existing_ordering: &[PhysicalSortExpr],
+        eq_properties: &EquivalenceProperties,
+    ) -> OrderingEquivalentClass {
+        let existing_ordering = 
eq_properties.normalize_sort_exprs(existing_ordering);
+        let normalized_head = eq_properties.normalize_sort_exprs(self.head());
+        let updated_head = merge_vectors(&existing_ordering, &normalized_head);
+        let updated_others = self
+            .others()
+            .iter()
+            .map(|ordering| {
+                let normalized_ordering = 
eq_properties.normalize_sort_exprs(ordering);
+                merge_vectors(&existing_ordering, &normalized_ordering)
+            })
+            .collect();
+        OrderingEquivalentClass::new(updated_head, updated_others)
+    }
 }
 
 /// This is a builder object facilitating incremental construction
@@ -308,7 +538,7 @@ impl OrderingEquivalenceBuilder {
         new_ordering_eq_properties: OrderingEquivalenceProperties,
     ) -> Self {
         self.ordering_eq_properties
-            .extend(new_ordering_eq_properties.classes().iter().cloned());
+            .extend(new_ordering_eq_properties.oeq_class().cloned());
         self
     }
 
@@ -334,10 +564,7 @@ impl OrderingEquivalenceBuilder {
         let mut normalized_out_ordering = vec![];
         for item in &self.existing_ordering {
             // To account for ordering equivalences, first normalize the 
expression:
-            let normalized = normalize_expr_with_equivalence_properties(
-                item.expr.clone(),
-                self.eq_properties.classes(),
-            );
+            let normalized = 
self.eq_properties.normalize_expr(item.expr.clone());
             normalized_out_ordering.push(PhysicalSortExpr {
                 expr: normalized,
                 options: item.options,
@@ -459,40 +686,77 @@ pub fn project_ordering_equivalence_properties(
     let schema = output_eq.schema();
     let fields = schema.fields();
 
-    let mut eq_classes = input_eq.classes().to_vec();
+    let oeq_class = input_eq.oeq_class();
+    let mut oeq_class = if let Some(oeq_class) = oeq_class {
+        oeq_class.clone()
+    } else {
+        return;
+    };
     let mut oeq_alias_map = vec![];
     for (column, columns) in columns_map {
         if is_column_invalid_in_new_schema(column, fields) {
             oeq_alias_map.push((column.clone(), columns[0].clone()));
         }
     }
-    for class in eq_classes.iter_mut() {
-        class.update_with_aliases(&oeq_alias_map, fields);
-    }
+    oeq_class.update_with_aliases(&oeq_alias_map, fields);
 
-    // Prune columns that are no longer in the schema from the 
OrderingEquivalenceProperties.
-    for class in eq_classes.iter_mut() {
-        let sort_exprs_to_remove = class
-            .iter()
-            .filter(|sort_exprs| {
-                sort_exprs.iter().any(|sort_expr| {
-                    let cols_in_expr = collect_columns(&sort_expr.expr);
-                    // If any one of the columns, used in Expression is 
invalid, remove expression
-                    // from ordering equivalences
-                    cols_in_expr
-                        .iter()
-                        .any(|col| is_column_invalid_in_new_schema(col, 
fields))
-                })
+    // Prune columns that are no longer in the schema from the 
OrderingEquivalenceProperties.
+    let sort_exprs_to_remove = oeq_class
+        .iter()
+        .filter(|sort_exprs| {
+            sort_exprs.iter().any(|sort_expr| {
+                let cols_in_expr = collect_columns(&sort_expr.expr);
+                // If any one of the columns, used in Expression is invalid, 
remove expression
+                // from ordering equivalences
+                cols_in_expr
+                    .iter()
+                    .any(|col| is_column_invalid_in_new_schema(col, fields))
             })
-            .cloned()
-            .collect::<Vec<_>>();
-        for sort_exprs in sort_exprs_to_remove {
-            class.remove(&sort_exprs);
+        })
+        .cloned()
+        .collect::<Vec<_>>();
+    for sort_exprs in sort_exprs_to_remove {
+        oeq_class.remove(&sort_exprs);
+    }
+    if oeq_class.len() > 1 {
+        output_eq.extend(Some(oeq_class));
+    }
+}
+
+/// Update `ordering` if it contains cast expression with target column
+/// after projection, if there is no cast expression among `ordering` 
expressions,
+/// returns `None`.
+fn update_with_cast_exprs(
+    cast_exprs: &[(CastExpr, Column)],
+    mut ordering: LexOrdering,
+) -> Option<LexOrdering> {
+    let mut is_changed = false;
+    for sort_expr in ordering.iter_mut() {
+        for (cast_expr, target_col) in cast_exprs.iter() {
+            if sort_expr.expr.eq(cast_expr.expr()) {
+                sort_expr.expr = Arc::new(target_col.clone()) as _;
+                is_changed = true;
+            }
         }
     }
-    eq_classes.retain(|props| props.len() > 1);
+    is_changed.then_some(ordering)
+}
 
-    output_eq.extend(eq_classes);
+/// Update cast expressions inside ordering equivalence
+/// properties with its target column after projection
+pub fn update_ordering_equivalence_with_cast(
+    cast_exprs: &[(CastExpr, Column)],
+    input_oeq: &mut OrderingEquivalenceProperties,
+) {
+    if let Some(cls) = &mut input_oeq.oeq_class {
+        for ordering in
+            
std::iter::once(cls.head().clone()).chain(cls.others().clone().into_iter())
+        {
+            if let Some(updated_ordering) = update_with_cast_exprs(cast_exprs, 
ordering) {
+                cls.insert(updated_ordering);
+            }
+        }
+    }
 }
 
 /// Retrieves the ordering equivalence properties for a given schema and 
output ordering.
@@ -516,6 +780,197 @@ pub fn ordering_equivalence_properties_helper(
     oep
 }
 
+/// This function constructs a duplicate-free vector by filtering out duplicate
+/// entries inside the given vector `input`.
+fn collapse_vec<T: PartialEq>(input: Vec<T>) -> Vec<T> {
+    let mut output = vec![];
+    for item in input {
+        if !output.contains(&item) {
+            output.push(item);
+        }
+    }
+    output
+}
+
+/// This function constructs a duplicate-free `LexOrderingReq` by filtering 
out duplicate
+/// entries that have same physical expression inside the given vector `input`.
+/// `vec![a Some(Asc), a Some(Desc)]` is collapsed to the `vec![a Some(Asc)]`. 
Since
+/// when same expression is already seen before, following expressions are 
redundant.
+fn collapse_lex_req(input: LexOrderingReq) -> LexOrderingReq {
+    let mut output = vec![];
+    for item in input {
+        if !lex_req_contains(&output, &item) {
+            output.push(item);
+        }
+    }
+    output
+}
+
+/// Check whether `sort_req.expr` is among the expressions of `lex_req`.
+fn lex_req_contains(
+    lex_req: &[PhysicalSortRequirement],
+    sort_req: &PhysicalSortRequirement,
+) -> bool {
+    for constant in lex_req {
+        if constant.expr.eq(&sort_req.expr) {
+            return true;
+        }
+    }
+    false
+}
+
+/// This function simplifies lexicographical ordering requirement
+/// inside `input` by removing postfix lexicographical requirements
+/// that satisfy global ordering (occurs inside the ordering equivalent class)
+fn simplify_lex_req(
+    input: LexOrderingReq,
+    oeq_class: &OrderingEquivalentClass,
+) -> LexOrderingReq {
+    let mut section = &input[..];
+    loop {
+        let n_prune = prune_last_n_that_is_in_oeq(section, oeq_class);
+        // Cannot prune entries from the end of requirement
+        if n_prune == 0 {
+            break;
+        }
+        section = &section[0..section.len() - n_prune];
+    }
+    if section.is_empty() {
+        PhysicalSortRequirement::from_sort_exprs(oeq_class.head())
+    } else {
+        section.to_vec()
+    }
+}
+
+/// Determines how many entries from the end can be deleted.
+/// Last n entry satisfies global ordering, hence having them
+/// as postfix in the lexicographical requirement is unnecessary.
+/// Assume requirement is [a ASC, b ASC, c ASC], also assume that
+/// existing ordering is [c ASC, d ASC]. In this case, since [c ASC]
+/// is satisfied by the existing ordering (i.e. the corresponding section is a global 
ordering),
+/// [c ASC] can be pruned from the requirement: [a ASC, b ASC, c ASC]. In this 
case,
+/// this function will return 1, to indicate last element can be removed from 
the requirement
+fn prune_last_n_that_is_in_oeq(
+    input: &[PhysicalSortRequirement],
+    oeq_class: &OrderingEquivalentClass,
+) -> usize {
+    let input_len = input.len();
+    for ordering in 
std::iter::once(oeq_class.head()).chain(oeq_class.others().iter()) {
+        let mut search_range = std::cmp::min(ordering.len(), input_len);
+        while search_range > 0 {
+            let req_section = &input[input_len - search_range..];
+            // let given_section = &ordering[0..search_range];
+            if req_satisfied(ordering, req_section) {
+                return search_range;
+            } else {
+                search_range -= 1;
+            }
+        }
+    }
+    0
+}
+
+/// Checks whether given section satisfies req.
+fn req_satisfied(given: LexOrderingRef, req: &[PhysicalSortRequirement]) -> 
bool {
+    for (given, req) in izip!(given.iter(), req.iter()) {
+        let PhysicalSortRequirement { expr, options } = req;
+        if let Some(options) = options {
+            if options != &given.options || !expr.eq(&given.expr) {
+                return false;
+            }
+        } else if !expr.eq(&given.expr) {
+            return false;
+        }
+    }
+    true
+}
+
+/// This function searches for the slice `section` inside the slice `given`.
+/// It returns each range where `section` is compatible with the corresponding
+/// slice in `given`.
+fn get_compatible_ranges(
+    given: &[PhysicalSortRequirement],
+    section: &[PhysicalSortRequirement],
+) -> Vec<Range<usize>> {
+    let n_section = section.len();
+    let n_end = if given.len() >= n_section {
+        given.len() - n_section + 1
+    } else {
+        0
+    };
+    (0..n_end)
+        .filter_map(|idx| {
+            let end = idx + n_section;
+            given[idx..end]
+                .iter()
+                .zip(section)
+                .all(|(req, given)| given.compatible(req))
+                .then_some(Range { start: idx, end })
+        })
+        .collect()
+}
+
+/// It is similar to contains method of vector.
+/// Finds whether `expr` is among `physical_exprs`.
+pub fn physical_exprs_contains(
+    physical_exprs: &[Arc<dyn PhysicalExpr>],
+    expr: &Arc<dyn PhysicalExpr>,
+) -> bool {
+    physical_exprs
+        .iter()
+        .any(|physical_expr| physical_expr.eq(expr))
+}
+
+/// Remove ordering requirements that have constant value
+fn prune_sort_reqs_with_constants(
+    ordering: &[PhysicalSortRequirement],
+    constants: &[Arc<dyn PhysicalExpr>],
+) -> Vec<PhysicalSortRequirement> {
+    ordering
+        .iter()
+        .filter(|&order| !physical_exprs_contains(constants, &order.expr))
+        .cloned()
+        .collect()
+}
+
+/// Adds the `offset` value to `Column` indices inside `expr`. This function is
+/// generally used during the update of the right table schema in join 
operations.
+pub(crate) fn add_offset_to_expr(
+    expr: Arc<dyn PhysicalExpr>,
+    offset: usize,
+) -> Result<Arc<dyn PhysicalExpr>> {
+    expr.transform_down(&|e| match e.as_any().downcast_ref::<Column>() {
+        Some(col) => Ok(Transformed::Yes(Arc::new(Column::new(
+            col.name(),
+            offset + col.index(),
+        )))),
+        None => Ok(Transformed::No(e)),
+    })
+}
+
+/// Adds the `offset` value to `Column` indices inside `sort_expr.expr`.
+pub(crate) fn add_offset_to_sort_expr(
+    sort_expr: &PhysicalSortExpr,
+    offset: usize,
+) -> Result<PhysicalSortExpr> {
+    Ok(PhysicalSortExpr {
+        expr: add_offset_to_expr(sort_expr.expr.clone(), offset)?,
+        options: sort_expr.options,
+    })
+}
+
+/// Adds the `offset` value to `Column` indices for each `sort_expr.expr`
+/// inside `sort_exprs`.
+pub fn add_offset_to_lex_ordering(
+    sort_exprs: LexOrderingRef,
+    offset: usize,
+) -> Result<LexOrdering> {
+    sort_exprs
+        .iter()
+        .map(|sort_expr| add_offset_to_sort_expr(sort_expr, offset))
+        .collect()
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -523,8 +978,20 @@ mod tests {
     use arrow::datatypes::{DataType, Field, Schema};
     use datafusion_common::Result;
 
+    use arrow_schema::SortOptions;
     use std::sync::Arc;
 
+    fn convert_to_requirement(
+        in_data: &[(&Column, Option<SortOptions>)],
+    ) -> Vec<PhysicalSortRequirement> {
+        in_data
+            .iter()
+            .map(|(col, options)| {
+                PhysicalSortRequirement::new(Arc::new((*col).clone()) as _, *options)
+            })
+            .collect::<Vec<_>>()
+    }
+
     #[test]
     fn add_equal_conditions_test() -> Result<()> {
         let schema = Arc::new(Schema::new(vec![
@@ -615,4 +1082,53 @@ mod tests {
 
         Ok(())
     }
+
+    #[test]
+    fn test_collapse_vec() -> Result<()> {
+        assert_eq!(collapse_vec(vec![1, 2, 3]), vec![1, 2, 3]);
+        assert_eq!(collapse_vec(vec![1, 2, 3, 2, 3]), vec![1, 2, 3]);
+        assert_eq!(collapse_vec(vec![3, 1, 2, 3, 2, 3]), vec![3, 1, 2]);
+        Ok(())
+    }
+
+    #[test]
+    fn test_get_compatible_ranges() -> Result<()> {
+        let col_a = &Column::new("a", 0);
+        let col_b = &Column::new("b", 1);
+        let option1 = SortOptions {
+            descending: false,
+            nulls_first: false,
+        };
+        let test_data = vec![
+            (
+                vec![(col_a, Some(option1)), (col_b, Some(option1))],
+                vec![(col_a, Some(option1))],
+                vec![(0, 1)],
+            ),
+            (
+                vec![(col_a, None), (col_b, Some(option1))],
+                vec![(col_a, Some(option1))],
+                vec![(0, 1)],
+            ),
+            (
+                vec![
+                    (col_a, None),
+                    (col_b, Some(option1)),
+                    (col_a, Some(option1)),
+                ],
+                vec![(col_a, Some(option1))],
+                vec![(0, 1), (2, 3)],
+            ),
+        ];
+        for (searched, to_search, expected) in test_data {
+            let searched = convert_to_requirement(&searched);
+            let to_search = convert_to_requirement(&to_search);
+            let expected = expected
+                .into_iter()
+                .map(|(start, end)| Range { start, end })
+                .collect::<Vec<_>>();
+            assert_eq!(get_compatible_ranges(&searched, &to_search), expected);
+        }
+        Ok(())
+    }
 }
diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs
index 85081c24c3..e83dee2e6c 100644
--- a/datafusion/physical-expr/src/lib.rs
+++ b/datafusion/physical-expr/src/lib.rs
@@ -55,9 +55,10 @@ pub use aggregate::groups_accumulator::{
 pub use aggregate::AggregateExpr;
 pub use analysis::{analyze, AnalysisContext, ExprBoundaries};
 pub use equivalence::{
-    ordering_equivalence_properties_helper, project_equivalence_properties,
-    project_ordering_equivalence_properties, EquivalenceProperties, 
EquivalentClass,
-    OrderingEquivalenceProperties, OrderingEquivalentClass,
+    add_offset_to_lex_ordering, ordering_equivalence_properties_helper,
+    project_equivalence_properties, project_ordering_equivalence_properties,
+    EquivalenceProperties, EquivalentClass, OrderingEquivalenceProperties,
+    OrderingEquivalentClass,
 };
 
 pub use partitioning::{Distribution, Partitioning};
@@ -70,7 +71,6 @@ pub use sort_expr::{
 };
 pub use sort_properties::update_ordering;
 pub use utils::{
-    expr_list_eq_any_order, expr_list_eq_strict_order, find_orderings_of_exprs,
-    normalize_expr_with_equivalence_properties, 
normalize_ordering_equivalence_classes,
+    expr_list_eq_any_order, expr_list_eq_strict_order,
     normalize_out_expr_with_columns_map, reverse_order_bys, split_conjunction,
 };
diff --git a/datafusion/physical-expr/src/partitioning.rs b/datafusion/physical-expr/src/partitioning.rs
index 76567c8050..773eac40dc 100644
--- a/datafusion/physical-expr/src/partitioning.rs
+++ b/datafusion/physical-expr/src/partitioning.rs
@@ -20,10 +20,7 @@
 use std::fmt;
 use std::sync::Arc;
 
-use crate::{
-    expr_list_eq_strict_order, normalize_expr_with_equivalence_properties,
-    EquivalenceProperties, PhysicalExpr,
-};
+use crate::{expr_list_eq_strict_order, EquivalenceProperties, PhysicalExpr};
 
 /// Partitioning schemes supported by operators.
 #[derive(Debug, Clone)]
@@ -90,21 +87,11 @@ impl Partitioning {
                             if !eq_classes.is_empty() {
                                 let normalized_required_exprs = required_exprs
                                     .iter()
-                                    .map(|e| {
-                                        
normalize_expr_with_equivalence_properties(
-                                            e.clone(),
-                                            eq_classes,
-                                        )
-                                    })
+                                    .map(|e| 
eq_properties.normalize_expr(e.clone()))
                                     .collect::<Vec<_>>();
                                 let normalized_partition_exprs = 
partition_exprs
                                     .iter()
-                                    .map(|e| {
-                                        
normalize_expr_with_equivalence_properties(
-                                            e.clone(),
-                                            eq_classes,
-                                        )
-                                    })
+                                    .map(|e| 
eq_properties.normalize_expr(e.clone()))
                                     .collect::<Vec<_>>();
                                 expr_list_eq_strict_order(
                                     &normalized_required_exprs,
diff --git a/datafusion/physical-expr/src/utils.rs b/datafusion/physical-expr/src/utils.rs
index 2d3a395728..b2a6bb5ca6 100644
--- a/datafusion/physical-expr/src/utils.rs
+++ b/datafusion/physical-expr/src/utils.rs
@@ -15,21 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::borrow::Borrow;
-use std::collections::{HashMap, HashSet};
-use std::ops::Range;
-use std::sync::Arc;
-
-use crate::equivalence::{
-    EquivalenceProperties, EquivalentClass, OrderingEquivalenceProperties,
-    OrderingEquivalentClass,
-};
+use crate::equivalence::{EquivalenceProperties, OrderingEquivalenceProperties};
 use crate::expressions::{BinaryExpr, Column, UnKnownColumn};
 use crate::sort_properties::{ExprOrdering, SortProperties};
 use crate::update_ordering;
-use crate::{
-    LexOrdering, LexOrderingRef, PhysicalExpr, PhysicalSortExpr, 
PhysicalSortRequirement,
-};
+use crate::{PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement};
 
 use arrow::array::{make_array, Array, ArrayRef, BooleanArray, MutableArrayData};
 use arrow::compute::{and_kleene, is_not_null, SlicesIterator};
@@ -42,8 +32,13 @@ use datafusion_common::utils::longest_consecutive_prefix;
 use datafusion_common::Result;
 use datafusion_expr::Operator;
 
+use itertools::Itertools;
 use petgraph::graph::NodeIndex;
 use petgraph::stable_graph::StableGraph;
+use std::borrow::Borrow;
+use std::collections::HashMap;
+use std::collections::HashSet;
+use std::sync::Arc;
 
 /// Compare the two expr lists are equal no matter the order.
 /// For example two InListExpr can be considered to be equals no matter the 
order:
@@ -135,109 +130,6 @@ pub fn normalize_out_expr_with_columns_map(
         .unwrap_or(expr)
 }
 
-pub fn normalize_expr_with_equivalence_properties(
-    expr: Arc<dyn PhysicalExpr>,
-    eq_properties: &[EquivalentClass],
-) -> Arc<dyn PhysicalExpr> {
-    expr.clone()
-        .transform(&|expr| {
-            let normalized_form =
-                expr.as_any().downcast_ref::<Column>().and_then(|column| {
-                    for class in eq_properties {
-                        if class.contains(column) {
-                            return Some(Arc::new(class.head().clone()) as _);
-                        }
-                    }
-                    None
-                });
-            Ok(if let Some(normalized_form) = normalized_form {
-                Transformed::Yes(normalized_form)
-            } else {
-                Transformed::No(expr)
-            })
-        })
-        .unwrap_or(expr)
-}
-
-/// This function normalizes `sort_expr` according to `eq_properties`. If the
-/// given sort expression doesn't belong to equivalence set `eq_properties`,
-/// it returns `sort_expr` as is.
-fn normalize_sort_expr_with_equivalence_properties(
-    mut sort_expr: PhysicalSortExpr,
-    eq_properties: &[EquivalentClass],
-) -> PhysicalSortExpr {
-    sort_expr.expr =
-        normalize_expr_with_equivalence_properties(sort_expr.expr, 
eq_properties);
-    sort_expr
-}
-
-/// This function applies the 
[`normalize_sort_expr_with_equivalence_properties`]
-/// function for all sort expressions in `sort_exprs` and returns a vector of
-/// normalized sort expressions.
-pub fn normalize_sort_exprs_with_equivalence_properties(
-    sort_exprs: LexOrderingRef,
-    eq_properties: &EquivalenceProperties,
-) -> LexOrdering {
-    sort_exprs
-        .iter()
-        .map(|expr| {
-            normalize_sort_expr_with_equivalence_properties(
-                expr.clone(),
-                eq_properties.classes(),
-            )
-        })
-        .collect()
-}
-
-/// This function normalizes `sort_requirement` according to `eq_properties`.
-/// If the given sort requirement doesn't belong to equivalence set
-/// `eq_properties`, it returns `sort_requirement` as is.
-fn normalize_sort_requirement_with_equivalence_properties(
-    mut sort_requirement: PhysicalSortRequirement,
-    eq_properties: &[EquivalentClass],
-) -> PhysicalSortRequirement {
-    sort_requirement.expr =
-        normalize_expr_with_equivalence_properties(sort_requirement.expr, 
eq_properties);
-    sort_requirement
-}
-
-/// This function searches for the slice `section` inside the slice `given`.
-/// It returns each range where `section` is compatible with the corresponding
-/// slice in `given`.
-fn get_compatible_ranges(
-    given: &[PhysicalSortRequirement],
-    section: &[PhysicalSortRequirement],
-) -> Vec<Range<usize>> {
-    let n_section = section.len();
-    let n_end = if given.len() >= n_section {
-        given.len() - n_section + 1
-    } else {
-        0
-    };
-    (0..n_end)
-        .filter_map(|idx| {
-            let end = idx + n_section;
-            given[idx..end]
-                .iter()
-                .zip(section)
-                .all(|(req, given)| given.compatible(req))
-                .then_some(Range { start: idx, end })
-        })
-        .collect()
-}
-
-/// This function constructs a duplicate-free vector by filtering out duplicate
-/// entries inside the given vector `input`.
-fn collapse_vec<T: PartialEq>(input: Vec<T>) -> Vec<T> {
-    let mut output = vec![];
-    for item in input {
-        if !output.contains(&item) {
-            output.push(item);
-        }
-    }
-    output
-}
-
 /// Transform `sort_exprs` vector, to standardized version using 
`eq_properties` and `ordering_eq_properties`
 /// Assume `eq_properties` states that `Column a` and `Column b` are aliases.
 /// Also assume `ordering_eq_properties` states that ordering `vec![d ASC]` 
and `vec![a ASC, c ASC]` are
@@ -246,10 +138,10 @@ fn collapse_vec<T: PartialEq>(input: Vec<T>) -> Vec<T> {
 /// This function converts `sort_exprs` `vec![b ASC, c ASC]` to first `vec![a 
ASC, c ASC]` after considering `eq_properties`
 /// Then converts `vec![a ASC, c ASC]` to `vec![d ASC]` after considering 
`ordering_eq_properties`.
 /// Standardized version `vec![d ASC]` is used in subsequent operations.
-pub fn normalize_sort_exprs(
+fn normalize_sort_exprs(
     sort_exprs: &[PhysicalSortExpr],
-    eq_properties: &[EquivalentClass],
-    ordering_eq_properties: &[OrderingEquivalentClass],
+    eq_properties: &EquivalenceProperties,
+    ordering_eq_properties: &OrderingEquivalenceProperties,
 ) -> Vec<PhysicalSortExpr> {
     let sort_requirements = 
PhysicalSortRequirement::from_sort_exprs(sort_exprs.iter());
     let normalized_exprs = normalize_sort_requirements(
@@ -257,35 +149,7 @@ pub fn normalize_sort_exprs(
         eq_properties,
         ordering_eq_properties,
     );
-    let normalized_exprs = 
PhysicalSortRequirement::to_sort_exprs(normalized_exprs);
-    collapse_vec(normalized_exprs)
-}
-/// This function normalizes `oeq_classes` expressions according to 
`eq_properties`.
-/// More explicitly, it makes sure that expressions in `oeq_classes` are head 
entries
-/// in `eq_properties`, replacing any non-head entries with head entries if 
necessary.
-pub fn normalize_ordering_equivalence_classes(
-    oeq_classes: &[OrderingEquivalentClass],
-    eq_properties: &EquivalenceProperties,
-) -> Vec<OrderingEquivalentClass> {
-    oeq_classes
-        .iter()
-        .map(|class| {
-            let head = normalize_sort_exprs_with_equivalence_properties(
-                class.head(),
-                eq_properties,
-            );
-
-            let others = class
-                .others()
-                .iter()
-                .map(|other| {
-                    normalize_sort_exprs_with_equivalence_properties(other, 
eq_properties)
-                })
-                .collect();
-
-            EquivalentClass::new(head, others)
-        })
-        .collect()
+    PhysicalSortRequirement::to_sort_exprs(normalized_exprs)
 }
 
 /// Transform `sort_reqs` vector, to standardized version using 
`eq_properties` and `ordering_eq_properties`
@@ -296,53 +160,13 @@ pub fn normalize_ordering_equivalence_classes(
 /// This function converts `sort_exprs` `vec![b Some(ASC), c None]` to first 
`vec![a Some(ASC), c None]` after considering `eq_properties`
 /// Then converts `vec![a Some(ASC), c None]` to `vec![d Some(ASC)]` after 
considering `ordering_eq_properties`.
 /// Standardized version `vec![d Some(ASC)]` is used in subsequent operations.
-pub fn normalize_sort_requirements(
+fn normalize_sort_requirements(
     sort_reqs: &[PhysicalSortRequirement],
-    eq_properties: &[EquivalentClass],
-    ordering_eq_properties: &[OrderingEquivalentClass],
+    eq_properties: &EquivalenceProperties,
+    ordering_eq_properties: &OrderingEquivalenceProperties,
 ) -> Vec<PhysicalSortRequirement> {
-    let mut normalized_exprs = sort_reqs
-        .iter()
-        .map(|sort_req| {
-            normalize_sort_requirement_with_equivalence_properties(
-                sort_req.clone(),
-                eq_properties,
-            )
-        })
-        .collect::<Vec<_>>();
-    for ordering_eq_class in ordering_eq_properties {
-        for item in ordering_eq_class.others() {
-            let item = item
-                .clone()
-                .into_iter()
-                .map(|elem| elem.into())
-                .collect::<Vec<_>>();
-            let ranges = get_compatible_ranges(&normalized_exprs, &item);
-            let mut offset: i64 = 0;
-            for Range { start, end } in ranges {
-                let mut head = ordering_eq_class
-                    .head()
-                    .clone()
-                    .into_iter()
-                    .map(|elem| elem.into())
-                    .collect::<Vec<PhysicalSortRequirement>>();
-                let updated_start = (start as i64 + offset) as usize;
-                let updated_end = (end as i64 + offset) as usize;
-                let range = end - start;
-                offset += head.len() as i64 - range as i64;
-                let all_none = normalized_exprs[updated_start..updated_end]
-                    .iter()
-                    .all(|req| req.options.is_none());
-                if all_none {
-                    for req in head.iter_mut() {
-                        req.options = None;
-                    }
-                }
-                normalized_exprs.splice(updated_start..updated_end, head);
-            }
-        }
-    }
-    collapse_vec(normalized_exprs)
+    let normalized_sort_reqs = 
eq_properties.normalize_sort_requirements(sort_reqs);
+    ordering_eq_properties.normalize_sort_requirements(&normalized_sort_reqs)
 }
 
 /// Checks whether given ordering requirements are satisfied by provided 
[PhysicalSortExpr]s.
@@ -379,13 +203,11 @@ pub fn ordering_satisfy_concrete<
     ordering_equal_properties: F2,
 ) -> bool {
     let oeq_properties = ordering_equal_properties();
-    let ordering_eq_classes = oeq_properties.classes();
     let eq_properties = equal_properties();
-    let eq_classes = eq_properties.classes();
     let required_normalized =
-        normalize_sort_exprs(required, eq_classes, ordering_eq_classes);
+        normalize_sort_exprs(required, &eq_properties, &oeq_properties);
     let provided_normalized =
-        normalize_sort_exprs(provided, eq_classes, ordering_eq_classes);
+        normalize_sort_exprs(provided, &eq_properties, &oeq_properties);
     if required_normalized.len() > provided_normalized.len() {
         return false;
     }
@@ -430,13 +252,11 @@ pub fn ordering_satisfy_requirement_concrete<
     ordering_equal_properties: F2,
 ) -> bool {
     let oeq_properties = ordering_equal_properties();
-    let ordering_eq_classes = oeq_properties.classes();
     let eq_properties = equal_properties();
-    let eq_classes = eq_properties.classes();
     let required_normalized =
-        normalize_sort_requirements(required, eq_classes, ordering_eq_classes);
+        normalize_sort_requirements(required, &eq_properties, &oeq_properties);
     let provided_normalized =
-        normalize_sort_exprs(provided, eq_classes, ordering_eq_classes);
+        normalize_sort_exprs(provided, &eq_properties, &oeq_properties);
     if required_normalized.len() > provided_normalized.len() {
         return false;
     }
@@ -481,14 +301,12 @@ fn requirements_compatible_concrete<
     equal_properties: F2,
 ) -> bool {
     let oeq_properties = ordering_equal_properties();
-    let ordering_eq_classes = oeq_properties.classes();
     let eq_properties = equal_properties();
-    let eq_classes = eq_properties.classes();
 
     let required_normalized =
-        normalize_sort_requirements(required, eq_classes, ordering_eq_classes);
+        normalize_sort_requirements(required, &eq_properties, &oeq_properties);
     let provided_normalized =
-        normalize_sort_requirements(provided, eq_classes, ordering_eq_classes);
+        normalize_sort_requirements(provided, &eq_properties, &oeq_properties);
     if required_normalized.len() > provided_normalized.len() {
         return false;
     }
@@ -542,26 +360,15 @@ pub fn convert_to_expr<T: Borrow<PhysicalSortExpr>>(
 
 /// This function finds the indices of `targets` within `items`, taking into
 /// account equivalences according to `equal_properties`.
-pub fn get_indices_of_matching_exprs<
-    T: Borrow<Arc<dyn PhysicalExpr>>,
-    F: FnOnce() -> EquivalenceProperties,
->(
-    targets: impl IntoIterator<Item = T>,
+pub fn get_indices_of_matching_exprs<F: FnOnce() -> EquivalenceProperties>(
+    targets: &[Arc<dyn PhysicalExpr>],
     items: &[Arc<dyn PhysicalExpr>],
     equal_properties: F,
 ) -> Vec<usize> {
-    if let eq_classes @ [_, ..] = equal_properties().classes() {
-        let normalized_targets = targets.into_iter().map(|e| {
-            normalize_expr_with_equivalence_properties(e.borrow().clone(), 
eq_classes)
-        });
-        let normalized_items = items
-            .iter()
-            .map(|e| normalize_expr_with_equivalence_properties(e.clone(), 
eq_classes))
-            .collect::<Vec<_>>();
-        get_indices_of_exprs_strict(normalized_targets, &normalized_items)
-    } else {
-        get_indices_of_exprs_strict(targets, items)
-    }
+    let eq_properties = equal_properties();
+    let normalized_items = eq_properties.normalize_exprs(items);
+    let normalized_targets = eq_properties.normalize_exprs(targets);
+    get_indices_of_exprs_strict(normalized_targets, &normalized_items)
 }
 
 /// This function finds the indices of `targets` within `items` using strict
@@ -870,13 +677,13 @@ pub fn get_indices_of_matching_sort_exprs_with_order_eq(
 
     let normalized_required = normalize_sort_requirements(
         &sort_requirement_on_requirements,
-        eq_properties.classes(),
-        &[],
+        eq_properties,
+        &OrderingEquivalenceProperties::new(order_eq_properties.schema()),
     );
     let normalized_provided = normalize_sort_requirements(
         &PhysicalSortRequirement::from_sort_exprs(provided_sorts.iter()),
-        eq_properties.classes(),
-        &[],
+        eq_properties,
+        &OrderingEquivalenceProperties::new(order_eq_properties.schema()),
     );
 
     let provided_sorts = normalized_provided
@@ -902,9 +709,9 @@ pub fn get_indices_of_matching_sort_exprs_with_order_eq(
     }
 
     // We did not find all the expressions, consult ordering equivalence 
properties:
-    for class in order_eq_properties.classes() {
-        let head = class.head();
-        for ordering in class.others().iter().chain(std::iter::once(head)) {
+    if let Some(oeq_class) = order_eq_properties.oeq_class() {
+        let head = oeq_class.head();
+        for ordering in oeq_class.others().iter().chain(std::iter::once(head)) 
{
             let order_eq_class_exprs = convert_to_expr(ordering);
             if let Some(indices_of_equality) = 
get_lexicographical_match_indices(
                 &normalized_required_expr,
@@ -981,6 +788,18 @@ pub fn find_orderings_of_exprs(
     Ok(orderings)
 }
 
+/// Merge left and right sort expressions, checking for duplicates.
+pub fn merge_vectors(
+    left: &[PhysicalSortExpr],
+    right: &[PhysicalSortExpr],
+) -> Vec<PhysicalSortExpr> {
+    left.iter()
+        .cloned()
+        .chain(right.iter().cloned())
+        .unique()
+        .collect()
+}
+
 #[cfg(test)]
 mod tests {
     use std::fmt::{Display, Formatter};
@@ -990,7 +809,7 @@ mod tests {
     use super::*;
     use crate::equivalence::OrderingEquivalenceProperties;
     use crate::expressions::{binary, cast, col, in_list, lit, Column, Literal};
-    use crate::PhysicalSortExpr;
+    use crate::{OrderingEquivalentClass, PhysicalSortExpr};
 
     use arrow::compute::SortOptions;
     use arrow_array::Int32Array;
@@ -1046,7 +865,8 @@ mod tests {
         let c = Field::new("c", DataType::Int32, true);
         let d = Field::new("d", DataType::Int32, true);
         let e = Field::new("e", DataType::Int32, true);
-        let schema = Arc::new(Schema::new(vec![a, b, c, d, e]));
+        let f = Field::new("f", DataType::Int32, true);
+        let schema = Arc::new(Schema::new(vec![a, b, c, d, e, f]));
 
         Ok(schema)
     }
@@ -1057,13 +877,15 @@ mod tests {
         OrderingEquivalenceProperties,
     )> {
         // Assume schema satisfies ordering a ASC NULLS LAST
-        // and d ASC NULLS LAST, b ASC NULLS LAST and e DESC NULLS FIRST, b 
ASC NULLS LAST
+        // and d ASC NULLS LAST, b ASC NULLS LAST and e DESC NULLS FIRST, f 
ASC NULLS LAST, g ASC NULLS LAST
         // Assume that column a and c are aliases.
         let col_a = &Column::new("a", 0);
         let col_b = &Column::new("b", 1);
         let col_c = &Column::new("c", 2);
         let col_d = &Column::new("d", 3);
         let col_e = &Column::new("e", 4);
+        let col_f = &Column::new("f", 5);
+        let col_g = &Column::new("g", 6);
         let option1 = SortOptions {
             descending: false,
             nulls_first: false,
@@ -1104,7 +926,11 @@ mod tests {
                     options: option2,
                 },
                 PhysicalSortExpr {
-                    expr: Arc::new(col_b.clone()),
+                    expr: Arc::new(col_f.clone()),
+                    options: option1,
+                },
+                PhysicalSortExpr {
+                    expr: Arc::new(col_g.clone()),
                     options: option1,
                 },
             ],
@@ -1312,6 +1138,8 @@ mod tests {
         let col_c = &Column::new("c", 2);
         let col_d = &Column::new("d", 3);
         let col_e = &Column::new("e", 4);
+        let col_f = &Column::new("f", 5);
+        let col_g = &Column::new("g", 6);
         let option1 = SortOptions {
             descending: false,
             nulls_first: false,
@@ -1342,10 +1170,16 @@ mod tests {
             (vec![(col_c, option1)], true),
             (vec![(col_c, option2)], false),
             // Test whether ordering equivalence works as expected
-            (vec![(col_d, option1)], false),
+            (vec![(col_d, option1)], true),
             (vec![(col_d, option1), (col_b, option1)], true),
             (vec![(col_d, option2), (col_b, option1)], false),
-            (vec![(col_e, option2), (col_b, option1)], true),
+            (
+                vec![(col_e, option2), (col_f, option1), (col_g, option1)],
+                true,
+            ),
+            (vec![(col_e, option2), (col_f, option1)], true),
+            (vec![(col_e, option1), (col_f, option1)], false),
+            (vec![(col_e, option2), (col_b, option1)], false),
             (vec![(col_e, option1), (col_b, option1)], false),
             (
                 vec![
@@ -1356,6 +1190,15 @@ mod tests {
                 ],
                 true,
             ),
+            (
+                vec![
+                    (col_d, option1),
+                    (col_b, option1),
+                    (col_e, option2),
+                    (col_f, option1),
+                ],
+                true,
+            ),
             (
                 vec![
                     (col_d, option1),
@@ -1372,6 +1215,15 @@ mod tests {
                     (col_d, option2),
                     (col_b, option1),
                 ],
+                true,
+            ),
+            (
+                vec![
+                    (col_d, option1),
+                    (col_b, option1),
+                    (col_e, option1),
+                    (col_f, option1),
+                ],
                 false,
             ),
             (
@@ -1383,7 +1235,9 @@ mod tests {
                 ],
                 false,
             ),
+            (vec![(col_d, option1), (col_e, option2)], true),
         ];
+
         for (cols, expected) in requirements {
             let err_msg = format!("Error in test case:{cols:?}");
             let required = cols
@@ -1427,6 +1281,7 @@ mod tests {
         let col_c = &Column::new("c", 2);
         let col_d = &Column::new("d", 3);
         let col_e = &Column::new("e", 4);
+        let col_f = &Column::new("f", 5);
         let option1 = SortOptions {
             descending: false,
             nulls_first: false,
@@ -1438,36 +1293,46 @@ mod tests {
         // First element in the tuple stores vector of requirement, second 
element is the expected return value for ordering_satisfy function
         let requirements = vec![
             (vec![(col_a, Some(option1))], vec![(col_a, Some(option1))]),
-            (vec![(col_a, None)], vec![(col_a, None)]),
+            (vec![(col_a, Some(option2))], vec![(col_a, Some(option2))]),
+            (vec![(col_a, None)], vec![(col_a, Some(option1))]),
             // Test whether equivalence works as expected
             (vec![(col_c, Some(option1))], vec![(col_a, Some(option1))]),
-            (vec![(col_c, None)], vec![(col_a, None)]),
+            (vec![(col_c, None)], vec![(col_a, Some(option1))]),
             // Test whether ordering equivalence works as expected
             (
                 vec![(col_d, Some(option1)), (col_b, Some(option1))],
                 vec![(col_a, Some(option1))],
             ),
-            (vec![(col_d, None), (col_b, None)], vec![(col_a, None)]),
             (
-                vec![(col_e, Some(option2)), (col_b, Some(option1))],
+                vec![(col_d, None), (col_b, None)],
+                vec![(col_a, Some(option1))],
+            ),
+            (
+                vec![(col_e, Some(option2)), (col_f, Some(option1))],
                 vec![(col_a, Some(option1))],
             ),
             // We should be able to normalize in compatible requirements also 
(not exactly equal)
             (
-                vec![(col_e, Some(option2)), (col_b, None)],
+                vec![(col_e, Some(option2)), (col_f, None)],
+                vec![(col_a, Some(option1))],
+            ),
+            (
+                vec![(col_e, None), (col_f, None)],
                 vec![(col_a, Some(option1))],
             ),
-            (vec![(col_e, None), (col_b, None)], vec![(col_a, None)]),
         ];
+
         let (_test_schema, eq_properties, ordering_eq_properties) = 
create_test_params()?;
-        let eq_classes = eq_properties.classes();
-        let ordering_eq_classes = ordering_eq_properties.classes();
         for (reqs, expected_normalized) in requirements.into_iter() {
             let req = convert_to_requirement(&reqs);
             let expected_normalized = 
convert_to_requirement(&expected_normalized);
 
             assert_eq!(
-                normalize_sort_requirements(&req, eq_classes, 
ordering_eq_classes),
+                normalize_sort_requirements(
+                    &req,
+                    &eq_properties,
+                    &ordering_eq_properties,
+                ),
                 expected_normalized
             );
         }
@@ -1536,10 +1401,7 @@ mod tests {
         ];
         for (expr, expected_eq) in expressions {
             assert!(
-                expected_eq.eq(&normalize_expr_with_equivalence_properties(
-                    expr.clone(),
-                    eq_properties.classes()
-                )),
+                expected_eq.eq(&eq_properties.normalize_expr(expr.clone())),
                 "error in test: expr: {expr:?}"
             );
         }
@@ -1583,10 +1445,7 @@ mod tests {
                 sort_options,
             );
             assert!(
-                
expected.eq(&normalize_sort_requirement_with_equivalence_properties(
-                    arg.clone(),
-                    eq_properties.classes()
-                )),
+                
expected.eq(&eq_properties.normalize_sort_requirement(arg.clone())),
                 "error in test: expr: {expr:?}, sort_options: {sort_options:?}"
             );
         }
@@ -1685,55 +1544,6 @@ mod tests {
         Ok(())
     }
 
-    #[test]
-    fn test_get_compatible_ranges() -> Result<()> {
-        let col_a = &Column::new("a", 0);
-        let col_b = &Column::new("b", 1);
-        let option1 = SortOptions {
-            descending: false,
-            nulls_first: false,
-        };
-        let test_data = vec![
-            (
-                vec![(col_a, Some(option1)), (col_b, Some(option1))],
-                vec![(col_a, Some(option1))],
-                vec![(0, 1)],
-            ),
-            (
-                vec![(col_a, None), (col_b, Some(option1))],
-                vec![(col_a, Some(option1))],
-                vec![(0, 1)],
-            ),
-            (
-                vec![
-                    (col_a, None),
-                    (col_b, Some(option1)),
-                    (col_a, Some(option1)),
-                ],
-                vec![(col_a, Some(option1))],
-                vec![(0, 1), (2, 3)],
-            ),
-        ];
-        for (searched, to_search, expected) in test_data {
-            let searched = convert_to_requirement(&searched);
-            let to_search = convert_to_requirement(&to_search);
-            let expected = expected
-                .into_iter()
-                .map(|(start, end)| Range { start, end })
-                .collect::<Vec<_>>();
-            assert_eq!(get_compatible_ranges(&searched, &to_search), expected);
-        }
-        Ok(())
-    }
-
-    #[test]
-    fn test_collapse_vec() -> Result<()> {
-        assert_eq!(collapse_vec(vec![1, 2, 3]), vec![1, 2, 3]);
-        assert_eq!(collapse_vec(vec![1, 2, 3, 2, 3]), vec![1, 2, 3]);
-        assert_eq!(collapse_vec(vec![3, 1, 2, 3, 2, 3]), vec![3, 1, 2]);
-        Ok(())
-    }
-
     #[test]
     fn test_collect_columns() -> Result<()> {
         let expr1 = Arc::new(Column::new("col1", 2)) as _;
@@ -1940,22 +1750,20 @@ mod tests {
             Field::new("c", DataType::Int32, true),
         ]);
         let mut equal_properties = 
EquivalenceProperties::new(Arc::new(schema.clone()));
-        let mut ordering_equal_properties =
-            OrderingEquivalenceProperties::new(Arc::new(schema.clone()));
         let mut expected_oeq = 
OrderingEquivalenceProperties::new(Arc::new(schema));
 
         equal_properties
             .add_equal_conditions((&Column::new("a", 0), &Column::new("c", 
2)));
-        ordering_equal_properties.add_equal_conditions((
-            &vec![PhysicalSortExpr {
-                expr: Arc::new(Column::new("b", 1)),
-                options: sort_options,
-            }],
-            &vec![PhysicalSortExpr {
-                expr: Arc::new(Column::new("c", 2)),
-                options: sort_options,
-            }],
-        ));
+        let head = vec![PhysicalSortExpr {
+            expr: Arc::new(Column::new("b", 1)),
+            options: sort_options,
+        }];
+        let others = vec![vec![PhysicalSortExpr {
+            expr: Arc::new(Column::new("c", 2)),
+            options: sort_options,
+        }]];
+        let oeq_class = OrderingEquivalentClass::new(head, others);
+
         expected_oeq.add_equal_conditions((
             &vec![PhysicalSortExpr {
                 expr: Arc::new(Column::new("b", 1)),
@@ -1967,13 +1775,13 @@ mod tests {
             }],
         ));
 
-        assert!(!normalize_ordering_equivalence_classes(
-            ordering_equal_properties.classes(),
-            &equal_properties,
-        )
-        .iter()
-        .zip(expected_oeq.classes())
-        .any(|(a, b)| a.head().ne(b.head()) || a.others().ne(b.others())));
+        let normalized_oeq_class =
+            oeq_class.normalize_with_equivalence_properties(&equal_properties);
+        let expected = expected_oeq.oeq_class().unwrap();
+        assert!(
+            normalized_oeq_class.head().eq(expected.head())
+                && normalized_oeq_class.others().eq(expected.others())
+        );
 
         Ok(())
     }
diff --git a/datafusion/physical-plan/src/filter.rs 
b/datafusion/physical-plan/src/filter.rs
index 15208fd082..4a8b189144 100644
--- a/datafusion/physical-plan/src/filter.rs
+++ b/datafusion/physical-plan/src/filter.rs
@@ -41,12 +41,13 @@ use datafusion_common::{plan_err, DataFusionError, Result};
 use datafusion_execution::TaskContext;
 use datafusion_expr::Operator;
 use datafusion_physical_expr::expressions::BinaryExpr;
-use datafusion_physical_expr::intervals::utils::check_support;
 use datafusion_physical_expr::{
     analyze, split_conjunction, AnalysisContext, ExprBoundaries,
     OrderingEquivalenceProperties, PhysicalExpr,
 };
 
+use datafusion_physical_expr::intervals::utils::check_support;
+use datafusion_physical_expr::utils::collect_columns;
 use futures::stream::{Stream, StreamExt};
 use log::trace;
 
@@ -153,7 +154,19 @@ impl ExecutionPlan for FilterExec {
     }
 
     fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties 
{
-        self.input.ordering_equivalence_properties()
+        let stats = self.statistics();
+        // Add the columns that have only one value (singleton) after 
filtering to constants.
+        if let Some(col_stats) = stats.column_statistics {
+            let constants = collect_columns(self.predicate())
+                .into_iter()
+                .filter(|column| col_stats[column.index()].is_singleton())
+                .map(|column| Arc::new(column) as Arc<dyn PhysicalExpr>)
+                .collect::<Vec<_>>();
+            let filter_oeq = self.input.ordering_equivalence_properties();
+            filter_oeq.with_constants(constants)
+        } else {
+            self.input.ordering_equivalence_properties()
+        }
     }
 
     fn with_new_children(
@@ -197,7 +210,14 @@ impl ExecutionPlan for FilterExec {
         let input_stats = self.input.statistics();
         let input_column_stats = match input_stats.column_statistics {
             Some(stats) => stats,
-            None => return Statistics::default(),
+            None => self
+                .schema()
+                .fields
+                .iter()
+                .map(|field| {
+                    
ColumnStatistics::new_with_unbounded_column(field.data_type())
+                })
+                .collect::<Vec<_>>(),
         };
 
         let starter_ctx =
diff --git a/datafusion/physical-plan/src/joins/utils.rs 
b/datafusion/physical-plan/src/joins/utils.rs
index e33de001df..67f60e57d7 100644
--- a/datafusion/physical-plan/src/joins/utils.rs
+++ b/datafusion/physical-plan/src/joins/utils.rs
@@ -44,17 +44,15 @@ use datafusion_common::{
     exec_err, plan_err, DataFusionError, JoinType, Result, ScalarValue, 
SharedResult,
 };
 use datafusion_physical_expr::expressions::Column;
-use datafusion_physical_expr::utils::{
-    normalize_ordering_equivalence_classes, normalize_sort_exprs,
-};
 use datafusion_physical_expr::{
-    EquivalentClass, LexOrdering, LexOrderingRef, 
OrderingEquivalenceProperties,
-    OrderingEquivalentClass, PhysicalExpr, PhysicalSortExpr,
+    add_offset_to_lex_ordering, EquivalentClass, LexOrdering, LexOrderingRef,
+    OrderingEquivalenceProperties, OrderingEquivalentClass, PhysicalExpr,
+    PhysicalSortExpr,
 };
 
+use datafusion_physical_expr::utils::merge_vectors;
 use futures::future::{BoxFuture, Shared};
 use futures::{ready, FutureExt};
-use itertools::Itertools;
 use parking_lot::Mutex;
 
 /// The on clause of the join, as vector of (left, right) columns.
@@ -324,66 +322,24 @@ pub fn cross_join_equivalence_properties(
 ///
 /// This way; once we normalize an expression according to equivalence 
properties,
 /// it can thereafter safely be used for ordering equivalence normalization.
-fn get_updated_right_ordering_equivalence_properties(
+fn get_updated_right_ordering_equivalent_class(
     join_type: &JoinType,
-    right_oeq_classes: &[OrderingEquivalentClass],
+    right_oeq_class: &OrderingEquivalentClass,
     left_columns_len: usize,
     join_eq_properties: &EquivalenceProperties,
-) -> Result<Vec<OrderingEquivalentClass>> {
-    let updated_oeqs = match join_type {
+) -> Result<OrderingEquivalentClass> {
+    match join_type {
         // In these modes, indices of the right schema should be offset by
         // the left table size.
         JoinType::Inner | JoinType::Left | JoinType::Full | JoinType::Right => 
{
-            add_offset_to_ordering_equivalence_classes(
-                right_oeq_classes,
-                left_columns_len,
-            )?
+            let right_oeq_class = 
right_oeq_class.add_offset(left_columns_len)?;
+            return Ok(
+                
right_oeq_class.normalize_with_equivalence_properties(join_eq_properties)
+            );
         }
-        _ => right_oeq_classes.to_vec(),
+        _ => {}
     };
-
-    Ok(normalize_ordering_equivalence_classes(
-        &updated_oeqs,
-        join_eq_properties,
-    ))
-}
-
-/// Merge left and right sort expressions, checking for duplicates.
-fn merge_vectors(
-    left: &[PhysicalSortExpr],
-    right: &[PhysicalSortExpr],
-) -> Vec<PhysicalSortExpr> {
-    left.iter()
-        .cloned()
-        .chain(right.iter().cloned())
-        .unique()
-        .collect()
-}
-
-/// Prefix with existing ordering.
-fn prefix_ordering_equivalence_with_existing_ordering(
-    existing_ordering: &[PhysicalSortExpr],
-    oeq_classes: &[OrderingEquivalentClass],
-    eq_classes: &[EquivalentClass],
-) -> Vec<OrderingEquivalentClass> {
-    let existing_ordering = normalize_sort_exprs(existing_ordering, 
eq_classes, &[]);
-    oeq_classes
-        .iter()
-        .map(|oeq_class| {
-            let normalized_head = normalize_sort_exprs(oeq_class.head(), 
eq_classes, &[]);
-            let updated_head = merge_vectors(&existing_ordering, 
&normalized_head);
-            let updated_others = oeq_class
-                .others()
-                .iter()
-                .map(|ordering| {
-                    let normalized_ordering =
-                        normalize_sort_exprs(ordering, eq_classes, &[]);
-                    merge_vectors(&existing_ordering, &normalized_ordering)
-                })
-                .collect();
-            OrderingEquivalentClass::new(updated_head, updated_others)
-        })
-        .collect()
+    
Ok(right_oeq_class.normalize_with_equivalence_properties(join_eq_properties))
 }
 
 /// Calculate ordering equivalence properties for the given join operation.
@@ -411,20 +367,29 @@ pub fn combine_join_ordering_equivalence_properties(
             ))
         }
         (true, false) => {
-            
new_properties.extend(left_oeq_properties.classes().iter().cloned());
+            new_properties.extend(left_oeq_properties.oeq_class().cloned());
             // In this special case, right side ordering can be prefixed with 
left side ordering.
-            if probe_side == Some(JoinSide::Left)
-                && right.output_ordering().is_some()
-                && *join_type == JoinType::Inner
-            {
-                let right_oeq_classes =
-                    get_updated_right_ordering_equivalence_properties(
-                        join_type,
-                        right_oeq_properties.classes(),
-                        left_columns_len,
-                        &join_eq_properties,
-                    )?;
+            if let (
+                Some(JoinSide::Left),
+                // right side has an ordering
+                Some(_),
+                JoinType::Inner,
+                Some(oeq_class),
+            ) = (
+                probe_side,
+                right.output_ordering(),
+                join_type,
+                right_oeq_properties.oeq_class(),
+            ) {
                 let left_output_ordering = 
left.output_ordering().unwrap_or(&[]);
+
+                let updated_right_oeq = 
get_updated_right_ordering_equivalent_class(
+                    join_type,
+                    oeq_class,
+                    left_columns_len,
+                    &join_eq_properties,
+                )?;
+
                 // Right side ordering equivalence properties should be 
prepended with
                 // those of the left side while constructing output ordering 
equivalence
                 // properties since stream side is the left side.
@@ -433,32 +398,44 @@ pub fn combine_join_ordering_equivalence_properties(
                 // ordering of the left table is `a ASC`, then the ordering 
equivalence `b ASC`
                 // for the right table should be converted to `a ASC, b ASC` 
before it is added
                 // to the ordering equivalences of the join.
-                let updated_right_oeq_classes =
-                    prefix_ordering_equivalence_with_existing_ordering(
+                let updated_right_oeq_class = updated_right_oeq
+                    .prefix_ordering_equivalent_class_with_existing_ordering(
                         left_output_ordering,
-                        &right_oeq_classes,
-                        join_eq_properties.classes(),
+                        &join_eq_properties,
                     );
-                new_properties.extend(updated_right_oeq_classes);
+                new_properties.extend(Some(updated_right_oeq_class));
             }
         }
         (false, true) => {
-            let right_oeq_classes = 
get_updated_right_ordering_equivalence_properties(
-                join_type,
-                right_oeq_properties.classes(),
-                left_columns_len,
-                &join_eq_properties,
-            )?;
-            new_properties.extend(right_oeq_classes);
+            let updated_right_oeq = right_oeq_properties
+                .oeq_class()
+                .map(|right_oeq_class| {
+                    get_updated_right_ordering_equivalent_class(
+                        join_type,
+                        right_oeq_class,
+                        left_columns_len,
+                        &join_eq_properties,
+                    )
+                })
+                .transpose()?;
+            new_properties.extend(updated_right_oeq);
             // In this special case, left side ordering can be prefixed with 
right side ordering.
-            if probe_side == Some(JoinSide::Right)
-                && left.output_ordering().is_some()
-                && *join_type == JoinType::Inner
-            {
-                let left_oeq_classes = left_oeq_properties.classes();
+            if let (
+                Some(JoinSide::Right),
+                // left side has an ordering
+                Some(_),
+                JoinType::Inner,
+                Some(left_oeq_class),
+            ) = (
+                probe_side,
+                left.output_ordering(),
+                join_type,
+                left_oeq_properties.oeq_class(),
+            ) {
                 let right_output_ordering = 
right.output_ordering().unwrap_or(&[]);
                 let right_output_ordering =
                     add_offset_to_lex_ordering(right_output_ordering, 
left_columns_len)?;
+
                 // Left side ordering equivalence properties should be 
prepended with
                 // those of the right side while constructing output ordering 
equivalence
                 // properties since stream side is the right side.
@@ -467,13 +444,12 @@ pub fn combine_join_ordering_equivalence_properties(
                 // ordering of the left table is `a ASC`, then the ordering 
equivalence `b ASC`
                 // for the right table should be converted to `a ASC, b ASC` 
before it is added
                 // to the ordering equivalences of the join.
-                let updated_left_oeq_classes =
-                    prefix_ordering_equivalence_with_existing_ordering(
+                let updated_left_oeq_class = left_oeq_class
+                    .prefix_ordering_equivalent_class_with_existing_ordering(
                         &right_output_ordering,
-                        left_oeq_classes,
-                        join_eq_properties.classes(),
+                        &join_eq_properties,
                     );
-                new_properties.extend(updated_left_oeq_classes);
+                new_properties.extend(Some(updated_left_oeq_class));
             }
         }
         (false, false) => {}
@@ -481,64 +457,6 @@ pub fn combine_join_ordering_equivalence_properties(
     Ok(new_properties)
 }
 
-/// Adds the `offset` value to `Column` indices inside `expr`. This function is
-/// generally used during the update of the right table schema in join 
operations.
-pub(crate) fn add_offset_to_expr(
-    expr: Arc<dyn PhysicalExpr>,
-    offset: usize,
-) -> Result<Arc<dyn PhysicalExpr>> {
-    expr.transform_down(&|e| match e.as_any().downcast_ref::<Column>() {
-        Some(col) => Ok(Transformed::Yes(Arc::new(Column::new(
-            col.name(),
-            offset + col.index(),
-        )))),
-        None => Ok(Transformed::No(e)),
-    })
-}
-
-/// Adds the `offset` value to `Column` indices inside `sort_expr.expr`.
-pub(crate) fn add_offset_to_sort_expr(
-    sort_expr: &PhysicalSortExpr,
-    offset: usize,
-) -> Result<PhysicalSortExpr> {
-    Ok(PhysicalSortExpr {
-        expr: add_offset_to_expr(sort_expr.expr.clone(), offset)?,
-        options: sort_expr.options,
-    })
-}
-
-/// Adds the `offset` value to `Column` indices for each `sort_expr.expr`
-/// inside `sort_exprs`.
-pub(crate) fn add_offset_to_lex_ordering(
-    sort_exprs: LexOrderingRef,
-    offset: usize,
-) -> Result<LexOrdering> {
-    sort_exprs
-        .iter()
-        .map(|sort_expr| add_offset_to_sort_expr(sort_expr, offset))
-        .collect()
-}
-
-/// Adds the `offset` value to `Column` indices for all expressions inside the
-/// given `OrderingEquivalentClass`es.
-pub(crate) fn add_offset_to_ordering_equivalence_classes(
-    oeq_classes: &[OrderingEquivalentClass],
-    offset: usize,
-) -> Result<Vec<OrderingEquivalentClass>> {
-    oeq_classes
-        .iter()
-        .map(|prop| {
-            let new_head = add_offset_to_lex_ordering(prop.head(), offset)?;
-            let new_others = prop
-                .others()
-                .iter()
-                .map(|ordering| add_offset_to_lex_ordering(ordering, offset))
-                .collect::<Result<Vec<_>>>()?;
-            Ok(OrderingEquivalentClass::new(new_head, new_others))
-        })
-        .collect()
-}
-
 impl Display for JoinSide {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         match self {
@@ -1920,7 +1838,7 @@ mod tests {
         let join_type = JoinType::Inner;
 
         let options = SortOptions::default();
-        let right_oeq_classes = OrderingEquivalentClass::new(
+        let right_oeq_class = OrderingEquivalentClass::new(
             vec![
                 PhysicalSortExpr {
                     expr: Arc::new(Column::new("x", 0)),
@@ -1957,9 +1875,9 @@ mod tests {
         join_eq_properties
             .add_equal_conditions((&Column::new("d", 3), &Column::new("w", 
7)));
 
-        let result = get_updated_right_ordering_equivalence_properties(
+        let result = get_updated_right_ordering_equivalent_class(
             &join_type,
-            &[right_oeq_classes],
+            &right_oeq_class,
             left_columns_len,
             &join_eq_properties,
         )?;
@@ -1987,8 +1905,8 @@ mod tests {
             ]],
         );
 
-        assert_eq!(result[0].head(), expected.head());
-        assert_eq!(result[0].others(), expected.others());
+        assert_eq!(result.head(), expected.head());
+        assert_eq!(result.others(), expected.others());
 
         Ok(())
     }
diff --git a/datafusion/physical-plan/src/memory.rs 
b/datafusion/physical-plan/src/memory.rs
index d36d93d29e..b29c8e9c7b 100644
--- a/datafusion/physical-plan/src/memory.rs
+++ b/datafusion/physical-plan/src/memory.rs
@@ -296,9 +296,9 @@ mod tests {
         assert_eq!(mem_exec.output_ordering().unwrap(), expected_output_order);
         let order_eq = mem_exec.ordering_equivalence_properties();
         assert!(order_eq
-            .classes()
-            .iter()
-            .any(|class| class.contains(&expected_order_eq)));
+            .oeq_class()
+            .map(|class| class.contains(&expected_order_eq))
+            .unwrap_or(false));
         Ok(())
     }
 }
diff --git a/datafusion/physical-plan/src/projection.rs 
b/datafusion/physical-plan/src/projection.rs
index f1ec0a68a6..4fc48e971c 100644
--- a/datafusion/physical-plan/src/projection.rs
+++ b/datafusion/physical-plan/src/projection.rs
@@ -40,11 +40,11 @@ use datafusion_common::Result;
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::expressions::{Literal, UnKnownColumn};
 use datafusion_physical_expr::{
-    find_orderings_of_exprs, normalize_out_expr_with_columns_map,
-    project_equivalence_properties, project_ordering_equivalence_properties,
-    OrderingEquivalenceProperties,
+    normalize_out_expr_with_columns_map, project_equivalence_properties,
+    project_ordering_equivalence_properties, OrderingEquivalenceProperties,
 };
 
+use datafusion_physical_expr::utils::find_orderings_of_exprs;
 use futures::stream::{Stream, StreamExt};
 use log::trace;
 
diff --git a/datafusion/sqllogictest/test_files/select.slt 
b/datafusion/sqllogictest/test_files/select.slt
index a7ed2bf5c7..b099107358 100644
--- a/datafusion/sqllogictest/test_files/select.slt
+++ b/datafusion/sqllogictest/test_files/select.slt
@@ -888,6 +888,112 @@ physical_plan
 ProjectionExec: expr=[a@0 as a, b@1 as b, 2 as Int64(2)]
 --CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], 
output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], has_header=true
 
+# source is ordered by a,b,c
+# when filter result is constant for column a
+# ordering b, c is still satisfied. Final plan shouldn't have
+# SortExec.
+query TT
+EXPLAIN SELECT *
+FROM annotated_data_finite2
+WHERE a=0
+ORDER BY b, c;
+----
+logical_plan
+Sort: annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC 
NULLS LAST
+--Filter: annotated_data_finite2.a = Int32(0)
+----TableScan: annotated_data_finite2 projection=[a0, a, b, c, d], 
partial_filters=[annotated_data_finite2.a = Int32(0)]
+physical_plan
+SortPreservingMergeExec: [b@2 ASC NULLS LAST,c@3 ASC NULLS LAST]
+--CoalesceBatchesExec: target_batch_size=8192
+----FilterExec: a@1 = 0
+------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+--------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, 
b, c, d], output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC 
NULLS LAST], has_header=true
+
+# source is ordered by a,b,c
+# when filter result is constant for column a and b
+# ordering c is still satisfied. Final plan shouldn't have
+# SortExec.
+query TT
+EXPLAIN SELECT *
+FROM annotated_data_finite2
+WHERE a=0 and b=0
+ORDER BY c;
+----
+logical_plan
+Sort: annotated_data_finite2.c ASC NULLS LAST
+--Filter: annotated_data_finite2.a = Int32(0) AND annotated_data_finite2.b = 
Int32(0)
+----TableScan: annotated_data_finite2 projection=[a0, a, b, c, d], 
partial_filters=[annotated_data_finite2.a = Int32(0), annotated_data_finite2.b 
= Int32(0)]
+physical_plan
+SortPreservingMergeExec: [c@3 ASC NULLS LAST]
+--CoalesceBatchesExec: target_batch_size=8192
+----FilterExec: a@1 = 0 AND b@2 = 0
+------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+--------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, 
b, c, d], output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC 
NULLS LAST], has_header=true
+
+# source is ordered by a,b,c
+# when filter result is constant for column a and b
+# ordering b, c is still satisfied. Final plan shouldn't have
+# SortExec.
+query TT
+EXPLAIN SELECT *
+FROM annotated_data_finite2
+WHERE a=0 and b=0
+ORDER BY b, c;
+----
+logical_plan
+Sort: annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC 
NULLS LAST
+--Filter: annotated_data_finite2.a = Int32(0) AND annotated_data_finite2.b = 
Int32(0)
+----TableScan: annotated_data_finite2 projection=[a0, a, b, c, d], 
partial_filters=[annotated_data_finite2.a = Int32(0), annotated_data_finite2.b 
= Int32(0)]
+physical_plan
+SortPreservingMergeExec: [b@2 ASC NULLS LAST,c@3 ASC NULLS LAST]
+--CoalesceBatchesExec: target_batch_size=8192
+----FilterExec: a@1 = 0 AND b@2 = 0
+------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+--------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, 
b, c, d], output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC 
NULLS LAST], has_header=true
+
+# source is ordered by a,b,c
+# when filter result is constant for column a and b
+# ordering a, b, c is still satisfied. Final plan shouldn't have
+# SortExec.
+query TT
+EXPLAIN SELECT *
+FROM annotated_data_finite2
+WHERE a=0 and b=0
+ORDER BY a, b, c;
+----
+logical_plan
+Sort: annotated_data_finite2.a ASC NULLS LAST, annotated_data_finite2.b ASC 
NULLS LAST, annotated_data_finite2.c ASC NULLS LAST
+--Filter: annotated_data_finite2.a = Int32(0) AND annotated_data_finite2.b = 
Int32(0)
+----TableScan: annotated_data_finite2 projection=[a0, a, b, c, d], 
partial_filters=[annotated_data_finite2.a = Int32(0), annotated_data_finite2.b 
= Int32(0)]
+physical_plan
+SortPreservingMergeExec: [a@1 ASC NULLS LAST,b@2 ASC NULLS LAST,c@3 ASC NULLS 
LAST]
+--CoalesceBatchesExec: target_batch_size=8192
+----FilterExec: a@1 = 0 AND b@2 = 0
+------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+--------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, 
b, c, d], output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC 
NULLS LAST], has_header=true
+
+# source is ordered by a,b,c
+# when the filter contains an OR, columns a and b
+# may not be constant. Hence the final plan
+# should contain a SortExec
+query TT
+EXPLAIN SELECT *
+FROM annotated_data_finite2
+WHERE a=0 or b=0
+ORDER BY c;
+----
+logical_plan
+Sort: annotated_data_finite2.c ASC NULLS LAST
+--Filter: annotated_data_finite2.a = Int32(0) OR annotated_data_finite2.b = 
Int32(0)
+----TableScan: annotated_data_finite2 projection=[a0, a, b, c, d], 
partial_filters=[annotated_data_finite2.a = Int32(0) OR 
annotated_data_finite2.b = Int32(0)]
+physical_plan
+SortPreservingMergeExec: [c@3 ASC NULLS LAST]
+--SortExec: expr=[c@3 ASC NULLS LAST]
+----CoalesceBatchesExec: target_batch_size=8192
+------FilterExec: a@1 = 0 OR b@2 = 0
+--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+----------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, 
b, c, d], output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC 
NULLS LAST], has_header=true
+
 statement ok
 drop table annotated_data_finite2;
 
diff --git a/datafusion/sqllogictest/test_files/subquery.slt 
b/datafusion/sqllogictest/test_files/subquery.slt
index fe074da1bb..2eccb60aad 100644
--- a/datafusion/sqllogictest/test_files/subquery.slt
+++ b/datafusion/sqllogictest/test_files/subquery.slt
@@ -284,19 +284,20 @@ Projection: t1.t1_id, __scalar_sq_1.SUM(t2.t2_int) AS 
t2_sum
 ------------TableScan: t2 projection=[t2_id, t2_int]
 physical_plan
 ProjectionExec: expr=[t1_id@0 as t1_id, SUM(t2.t2_int)@1 as t2_sum]
---CoalesceBatchesExec: target_batch_size=8192
-----HashJoinExec: mode=Partitioned, join_type=Left, on=[(t1_id@0, t2_id@1)]
-------CoalesceBatchesExec: target_batch_size=8192
---------RepartitionExec: partitioning=Hash([t1_id@0], 4), input_partitions=4
-----------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
-------ProjectionExec: expr=[SUM(t2.t2_int)@1 as SUM(t2.t2_int), t2_id@0 as 
t2_id]
+--ProjectionExec: expr=[t1_id@2 as t1_id, SUM(t2.t2_int)@0 as SUM(t2.t2_int), 
t2_id@1 as t2_id]
+----CoalesceBatchesExec: target_batch_size=8192
+------HashJoinExec: mode=Partitioned, join_type=Right, on=[(t2_id@1, t1_id@0)]
+--------ProjectionExec: expr=[SUM(t2.t2_int)@1 as SUM(t2.t2_int), t2_id@0 as 
t2_id]
+----------CoalesceBatchesExec: target_batch_size=8192
+------------FilterExec: SUM(t2.t2_int)@1 < 3
+--------------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id], 
aggr=[SUM(t2.t2_int)]
+----------------CoalesceBatchesExec: target_batch_size=8192
+------------------RepartitionExec: partitioning=Hash([t2_id@0], 4), 
input_partitions=4
+--------------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id], 
aggr=[SUM(t2.t2_int)]
+----------------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
 --------CoalesceBatchesExec: target_batch_size=8192
-----------FilterExec: SUM(t2.t2_int)@1 < 3
-------------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id], 
aggr=[SUM(t2.t2_int)]
---------------CoalesceBatchesExec: target_batch_size=8192
-----------------RepartitionExec: partitioning=Hash([t2_id@0], 4), 
input_partitions=4
-------------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id], 
aggr=[SUM(t2.t2_int)]
---------------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+----------RepartitionExec: partitioning=Hash([t1_id@0], 4), input_partitions=4
+------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
 
 query II rowsort
 SELECT t1_id, (SELECT sum(t2_int) FROM t2 WHERE t2.t2_id = t1.t1_id having 
sum(t2_int) < 3) as t2_sum from t1
diff --git a/datafusion/sqllogictest/test_files/window.slt 
b/datafusion/sqllogictest/test_files/window.slt
index f8f8f30ade..3d9f7511be 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -2342,11 +2342,10 @@ Limit: skip=0, fetch=5
 ----------TableScan: aggregate_test_100 projection=[c9]
 physical_plan
 GlobalLimitExec: skip=0, fetch=5
---SortExec: fetch=5, expr=[rn1@1 ASC NULLS LAST,c9@0 ASC NULLS LAST]
-----ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY 
[aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW@1 as rn1]
-------BoundedWindowAggExec: wdw=[ROW_NUMBER() ORDER BY [aggregate_test_100.c9 
DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { 
name: "ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: 
false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame 
{ units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], 
mode=[Sorted]
---------SortExec: expr=[c9@0 DESC]
-----------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], 
has_header=true
+--ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY 
[aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW@1 as rn1]
+----BoundedWindowAggExec: wdw=[ROW_NUMBER() ORDER BY [aggregate_test_100.c9 
DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { 
name: "ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: 
false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame 
{ units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], 
mode=[Sorted]
+------SortExec: expr=[c9@0 DESC]
+--------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], 
has_header=true
 
 query II
 SELECT c9, rn1 FROM (SELECT c9,


Reply via email to