This is an automated email from the ASF dual-hosted git repository.

akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new b0d24912ac Extend Ordering Equivalence Support (#6956)
b0d24912ac is described below

commit b0d24912acd9d86ca6ce3af26b4d845ec250048d
Author: Mustafa Akur <[email protected]>
AuthorDate: Mon Jul 17 15:41:09 2023 +0300

    Extend Ordering Equivalence Support (#6956)
    
    * initial implementation
    
    * minor changes
    
    * Fix bug, ordering equivalence random head
    
    * minor changes
    
    * Add ordering equivalence for sort merge join
    
    * Add ordering equivalence for sort merge join
    
    * Update comment
    
    * Add ordering equivalence support for hash join
    
    * Code enhancements/comment improvements
    
    * Add projection cast handling
    
    * Fix output ordering for sort merge join
    
    * projection bug fix
    
    * Minor changes
    
    * minor changes
    
    * simplify sort_merge_join
    
    * Update equivalence implementation
    
    * Update cast implementation
    
    * More idiomatic code
    
    * simplifications
    
    * remove unnecessary code
    
    * Comment improvements
    
    * Fix formatting
    
    * Address reviews
    
    ---------
    
    Co-authored-by: Mehmet Ozan Kabak <[email protected]>
---
 .../core/src/physical_plan/coalesce_batches.rs     |  12 +-
 .../core/src/physical_plan/coalesce_partitions.rs  |   9 +-
 datafusion/core/src/physical_plan/filter.rs        |  15 +-
 .../core/src/physical_plan/joins/hash_join.rs      | 101 ++++++----
 .../src/physical_plan/joins/sort_merge_join.rs     | 163 ++++++++++++----
 datafusion/core/src/physical_plan/joins/utils.rs   |  98 ++++++++--
 datafusion/core/src/physical_plan/limit.rs         |  29 +--
 datafusion/core/src/physical_plan/projection.rs    |  22 ++-
 .../core/src/physical_plan/repartition/mod.rs      |  25 ++-
 .../physical_plan/sorts/sort_preserving_merge.rs   |  21 ++-
 .../core/tests/sqllogictests/test_files/joins.slt  | 206 +++++++++++++++++++++
 .../core/tests/sqllogictests/test_files/window.slt |  50 +++++
 datafusion/physical-expr/src/equivalence.rs        | 201 ++++++++++++--------
 datafusion/physical-expr/src/expressions/cast.rs   |   2 +-
 datafusion/physical-expr/src/lib.rs                |   3 +-
 datafusion/physical-expr/src/sort_expr.rs          |   9 +-
 datafusion/physical-expr/src/utils.rs              |  20 ++
 17 files changed, 769 insertions(+), 217 deletions(-)

diff --git a/datafusion/core/src/physical_plan/coalesce_batches.rs 
b/datafusion/core/src/physical_plan/coalesce_batches.rs
index 9454a36af7..994f75ce4c 100644
--- a/datafusion/core/src/physical_plan/coalesce_batches.rs
+++ b/datafusion/core/src/physical_plan/coalesce_batches.rs
@@ -27,20 +27,22 @@ use crate::physical_plan::{
     DisplayFormatType, EquivalenceProperties, ExecutionPlan, Partitioning,
     RecordBatchStream, SendableRecordBatchStream,
 };
-use datafusion_common::Result;
 
 use arrow::datatypes::SchemaRef;
 use arrow::error::Result as ArrowResult;
 use arrow::record_batch::RecordBatch;
+use datafusion_common::Result;
 use datafusion_execution::TaskContext;
-use futures::stream::{Stream, StreamExt};
-use log::trace;
+use datafusion_physical_expr::OrderingEquivalenceProperties;
 
 use super::expressions::PhysicalSortExpr;
 use super::metrics::{BaselineMetrics, MetricsSet};
 use super::DisplayAs;
 use super::{metrics::ExecutionPlanMetricsSet, Statistics};
 
+use futures::stream::{Stream, StreamExt};
+use log::trace;
+
 /// CoalesceBatchesExec combines small batches into larger batches for more 
efficient use of
 /// vectorized processing by upstream operators.
 #[derive(Debug)]
@@ -134,6 +136,10 @@ impl ExecutionPlan for CoalesceBatchesExec {
         self.input.equivalence_properties()
     }
 
+    fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties 
{
+        self.input.ordering_equivalence_properties()
+    }
+
     fn with_new_children(
         self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
diff --git a/datafusion/core/src/physical_plan/coalesce_partitions.rs 
b/datafusion/core/src/physical_plan/coalesce_partitions.rs
index 82f74a62bb..bc48b5f5e1 100644
--- a/datafusion/core/src/physical_plan/coalesce_partitions.rs
+++ b/datafusion/core/src/physical_plan/coalesce_partitions.rs
@@ -21,18 +21,17 @@
 use std::any::Any;
 use std::sync::Arc;
 
-use arrow::datatypes::SchemaRef;
-
 use super::expressions::PhysicalSortExpr;
 use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
 use super::stream::{ObservedStream, RecordBatchReceiverStream};
-use super::{DisplayAs, Statistics};
+use super::{DisplayAs, SendableRecordBatchStream, Statistics};
+
 use crate::physical_plan::{
     DisplayFormatType, EquivalenceProperties, ExecutionPlan, Partitioning,
 };
-use datafusion_common::{DataFusionError, Result};
 
-use super::SendableRecordBatchStream;
+use arrow::datatypes::SchemaRef;
+use datafusion_common::{DataFusionError, Result};
 use datafusion_execution::TaskContext;
 
 /// Merge execution plan executes partitions in parallel and combines them 
into a single
diff --git a/datafusion/core/src/physical_plan/filter.rs 
b/datafusion/core/src/physical_plan/filter.rs
index d2cfc3ce8a..6cb2490ee4 100644
--- a/datafusion/core/src/physical_plan/filter.rs
+++ b/datafusion/core/src/physical_plan/filter.rs
@@ -27,24 +27,27 @@ use super::expressions::PhysicalSortExpr;
 use super::{
     ColumnStatistics, DisplayAs, RecordBatchStream, SendableRecordBatchStream, 
Statistics,
 };
+
 use crate::physical_plan::{
     metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
     Column, DisplayFormatType, EquivalenceProperties, ExecutionPlan, 
Partitioning,
     PhysicalExpr,
 };
+
 use arrow::compute::filter_record_batch;
 use arrow::datatypes::{DataType, SchemaRef};
 use arrow::record_batch::RecordBatch;
 use datafusion_common::cast::as_boolean_array;
 use datafusion_common::{DataFusionError, Result};
+use datafusion_execution::TaskContext;
 use datafusion_expr::Operator;
 use datafusion_physical_expr::expressions::BinaryExpr;
-use datafusion_physical_expr::{split_conjunction, AnalysisContext};
-
-use log::trace;
+use datafusion_physical_expr::{
+    split_conjunction, AnalysisContext, OrderingEquivalenceProperties,
+};
 
-use datafusion_execution::TaskContext;
 use futures::stream::{Stream, StreamExt};
+use log::trace;
 
 /// FilterExec evaluates a boolean predicate against all input batches to 
determine which rows to
 /// include in its output batches.
@@ -148,6 +151,10 @@ impl ExecutionPlan for FilterExec {
         input_properties
     }
 
+    fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties 
{
+        self.input.ordering_equivalence_properties()
+    }
+
     fn with_new_children(
         self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs 
b/datafusion/core/src/physical_plan/joins/hash_join.rs
index 47403a70f4..ce1d6dbcc0 100644
--- a/datafusion/core/src/physical_plan/joins/hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/hash_join.rs
@@ -18,46 +18,15 @@
 //! Defines the join plan for executing partitions in parallel and then 
joining the results
 //! into a set of partitions.
 
-use ahash::RandomState;
-use arrow::array::Array;
-use arrow::array::{
-    Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array,
-    StringArray, TimestampNanosecondArray, UInt16Array, UInt32Array, 
UInt64Array,
-    UInt8Array,
-};
-use arrow::buffer::BooleanBuffer;
-use arrow::compute::{and, eq_dyn, is_null, or_kleene, take, FilterBuilder};
-use arrow::datatypes::{ArrowNativeType, DataType};
-use arrow::datatypes::{Schema, SchemaRef};
-use arrow::record_batch::RecordBatch;
-use arrow::{
-    array::{
-        ArrayRef, BooleanArray, Date32Array, Date64Array, Decimal128Array,
-        DictionaryArray, FixedSizeBinaryArray, LargeStringArray, 
PrimitiveArray,
-        Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray,
-        Time64NanosecondArray, TimestampMicrosecondArray, 
TimestampMillisecondArray,
-        TimestampSecondArray, UInt32BufferBuilder, UInt64BufferBuilder,
-    },
-    datatypes::{
-        Int16Type, Int32Type, Int64Type, Int8Type, UInt16Type, UInt32Type, 
UInt64Type,
-        UInt8Type,
-    },
-    util::bit_util,
-};
-use arrow_array::cast::downcast_array;
-use arrow_schema::ArrowError;
-use futures::{ready, Stream, StreamExt, TryStreamExt};
 use std::fmt;
 use std::mem::size_of;
 use std::sync::Arc;
 use std::task::Poll;
 use std::{any::Any, usize, vec};
 
-use datafusion_common::cast::{as_dictionary_array, as_string_array};
-use datafusion_execution::memory_pool::MemoryReservation;
-
 use crate::physical_plan::joins::utils::{
-    adjust_indices_by_join_type, apply_join_filter_to_indices, 
build_batch_from_indices,
+    add_offset_to_ordering_equivalence_classes, adjust_indices_by_join_type,
+    apply_join_filter_to_indices, build_batch_from_indices,
     calculate_hash_join_output_order, get_final_indices_from_bit_map,
     need_produce_result_in_final, JoinSide,
 };
@@ -68,6 +37,7 @@ use crate::physical_plan::{
     expressions::Column,
     expressions::PhysicalSortExpr,
     hash_utils::create_hashes,
+    joins::hash_join_utils::JoinHashMap,
     joins::utils::{
         adjust_right_output_partitioning, build_join_schema, 
check_join_is_valid,
         combine_join_equivalence_properties, estimate_join_statistics,
@@ -78,17 +48,42 @@ use crate::physical_plan::{
     DisplayFormatType, Distribution, EquivalenceProperties, ExecutionPlan, 
Partitioning,
     PhysicalExpr, RecordBatchStream, SendableRecordBatchStream, Statistics,
 };
-use arrow::array::BooleanBufferBuilder;
-use arrow::datatypes::TimeUnit;
-use datafusion_common::JoinType;
-use datafusion_common::{DataFusionError, Result};
-use datafusion_execution::{memory_pool::MemoryConsumer, TaskContext};
 
 use super::{
     utils::{OnceAsync, OnceFut},
     PartitionMode,
 };
-use crate::physical_plan::joins::hash_join_utils::JoinHashMap;
+
+use arrow::buffer::BooleanBuffer;
+use arrow::compute::{and, eq_dyn, is_null, or_kleene, take, FilterBuilder};
+use arrow::record_batch::RecordBatch;
+use arrow::{
+    array::{
+        Array, ArrayRef, BooleanArray, BooleanBufferBuilder, Date32Array, 
Date64Array,
+        Decimal128Array, DictionaryArray, FixedSizeBinaryArray, Float32Array,
+        Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, 
LargeStringArray,
+        PrimitiveArray, StringArray, Time32MillisecondArray, Time32SecondArray,
+        Time64MicrosecondArray, Time64NanosecondArray, 
TimestampMicrosecondArray,
+        TimestampMillisecondArray, TimestampNanosecondArray, 
TimestampSecondArray,
+        UInt16Array, UInt32Array, UInt32BufferBuilder, UInt64Array, 
UInt64BufferBuilder,
+        UInt8Array,
+    },
+    datatypes::{
+        ArrowNativeType, DataType, Int16Type, Int32Type, Int64Type, Int8Type, 
Schema,
+        SchemaRef, TimeUnit, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
+    },
+    util::bit_util,
+};
+use arrow_array::cast::downcast_array;
+use arrow_schema::ArrowError;
+use datafusion_common::cast::{as_dictionary_array, as_string_array};
+use datafusion_common::{DataFusionError, JoinType, Result};
+use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
+use datafusion_execution::TaskContext;
+use datafusion_physical_expr::OrderingEquivalenceProperties;
+
+use ahash::RandomState;
+use futures::{ready, Stream, StreamExt, TryStreamExt};
 
 type JoinLeftData = (JoinHashMap, RecordBatch, MemoryReservation);
 
@@ -381,6 +376,34 @@ impl ExecutionPlan for HashJoinExec {
         )
     }
 
+    fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties 
{
+        let mut new_properties = 
OrderingEquivalenceProperties::new(self.schema());
+        let left_columns_len = self.left.schema().fields.len();
+        let right_oeq_properties = 
self.right.ordering_equivalence_properties();
+        match self.join_type {
+            JoinType::RightAnti | JoinType::RightSemi => {
+                // For `RightAnti` and `RightSemi` joins, the right table 
schema remains valid.
+                // Hence, its ordering equivalence properties can be used as 
is.
+                
new_properties.extend(right_oeq_properties.classes().iter().cloned());
+            }
+            JoinType::Inner => {
+                // For `Inner` joins, the right table schema is no longer 
valid.
+                // Size of the left table is added as an offset to the right 
table
+                // columns when constructing the join output schema.
+                let updated_right_classes = 
add_offset_to_ordering_equivalence_classes(
+                    right_oeq_properties.classes(),
+                    left_columns_len,
+                )
+                .unwrap();
+                new_properties.extend(updated_right_classes);
+            }
+            // In other cases, we cannot propagate ordering equivalences as
+            // the output ordering is not preserved.
+            _ => {}
+        }
+        new_properties
+    }
+
     fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
         vec![self.left.clone(), self.right.clone()]
     }
diff --git a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs 
b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
index 2a628a6146..9b8e9e85cd 100644
--- a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
+++ b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
@@ -30,17 +30,10 @@ use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
 
-use arrow::array::*;
-use arrow::compute::{concat_batches, take, SortOptions};
-use arrow::datatypes::{DataType, SchemaRef, TimeUnit};
-use arrow::error::ArrowError;
-use arrow::record_batch::RecordBatch;
-use datafusion_physical_expr::PhysicalSortRequirement;
-use futures::{Stream, StreamExt};
-
 use crate::physical_plan::expressions::Column;
 use crate::physical_plan::expressions::PhysicalSortExpr;
 use crate::physical_plan::joins::utils::{
+    add_offset_to_lex_ordering, add_offset_to_ordering_equivalence_classes,
     build_join_schema, check_join_is_valid, 
combine_join_equivalence_properties,
     estimate_join_statistics, partitioned_join_output_partitioning, JoinOn,
 };
@@ -50,13 +43,20 @@ use crate::physical_plan::{
     ExecutionPlan, Partitioning, PhysicalExpr, RecordBatchStream,
     SendableRecordBatchStream, Statistics,
 };
-use datafusion_common::DataFusionError;
-use datafusion_common::JoinType;
-use datafusion_common::Result;
+
+use arrow::array::*;
+use arrow::compute::{concat_batches, take, SortOptions};
+use arrow::datatypes::{DataType, SchemaRef, TimeUnit};
+use arrow::error::ArrowError;
+use arrow::record_batch::RecordBatch;
+use datafusion_common::{DataFusionError, JoinType, Result};
 use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
 use datafusion_execution::TaskContext;
+use datafusion_physical_expr::utils::normalize_sort_exprs;
+use datafusion_physical_expr::{OrderingEquivalenceProperties, 
PhysicalSortRequirement};
 
-use datafusion_common::tree_node::{Transformed, TreeNode};
+use futures::{Stream, StreamExt};
+use itertools::Itertools;
 
 /// join execution plan executes partitions in parallel and combines them into 
a set of
 /// partitions.
@@ -86,6 +86,38 @@ pub struct SortMergeJoinExec {
     pub(crate) null_equals_null: bool,
 }
 
+/// Replaces the right column (first index in each `on_columns` tuple) with
+/// the left column (zeroth index in the tuple) inside `right_ordering`.
+fn replace_on_columns_of_right_ordering(
+    on_columns: &[(Column, Column)],
+    right_ordering: &mut [PhysicalSortExpr],
+    left_columns_len: usize,
+) {
+    for (left_col, right_col) in on_columns {
+        let right_col =
+            Column::new(right_col.name(), right_col.index() + 
left_columns_len);
+        for item in right_ordering.iter_mut() {
+            if let Some(col) = item.expr.as_any().downcast_ref::<Column>() {
+                if right_col.eq(col) {
+                    item.expr = Arc::new(left_col.clone()) as _;
+                }
+            }
+        }
+    }
+}
+
+/// Merge left and right sort expressions, checking for duplicates.
+fn merge_vectors(
+    left: &[PhysicalSortExpr],
+    right: &[PhysicalSortExpr],
+) -> Vec<PhysicalSortExpr> {
+    left.iter()
+        .cloned()
+        .chain(right.iter().cloned())
+        .unique()
+        .collect()
+}
+
 impl SortMergeJoinExec {
     /// Tries to create a new [SortMergeJoinExec].
     /// The inputs are sorted using `sort_options` are applied to the columns 
in the `on`
@@ -134,10 +166,26 @@ impl SortMergeJoinExec {
             .unzip();
 
         let output_ordering = match join_type {
-            JoinType::Inner
-            | JoinType::Left
-            | JoinType::LeftSemi
-            | JoinType::LeftAnti => {
+            JoinType::Inner => {
+                match (left.output_ordering(), right.output_ordering()) {
+                    // If both sides have orderings, ordering of the right 
hand side
+                    // can be appended to the left side ordering for inner 
joins.
+                    (Some(left_ordering), Some(right_ordering)) => {
+                        let left_columns_len = left.schema().fields.len();
+                        let mut right_ordering =
+                            add_offset_to_lex_ordering(right_ordering, 
left_columns_len)?;
+                        replace_on_columns_of_right_ordering(
+                            &on,
+                            &mut right_ordering,
+                            left_columns_len,
+                        );
+                        Some(merge_vectors(left_ordering, &right_ordering))
+                    }
+                    (Some(left_ordering), _) => Some(left_ordering.to_vec()),
+                    _ => None,
+                }
+            }
+            JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti => {
                 left.output_ordering().map(|sort_exprs| sort_exprs.to_vec())
             }
             JoinType::RightSemi | JoinType::RightAnti => right
@@ -148,29 +196,7 @@ impl SortMergeJoinExec {
                 right
                     .output_ordering()
                     .map(|sort_exprs| {
-                        let new_sort_exprs: Result<Vec<PhysicalSortExpr>> = 
sort_exprs
-                            .iter()
-                            .map(|e| {
-                                let new_expr =
-                                    e.expr.clone().transform_down(&|e| match e
-                                        .as_any()
-                                        .downcast_ref::<Column>(
-                                    ) {
-                                        Some(col) => {
-                                            
Ok(Transformed::Yes(Arc::new(Column::new(
-                                                col.name(),
-                                                left_columns_len + col.index(),
-                                            ))))
-                                        }
-                                        None => Ok(Transformed::No(e)),
-                                    });
-                                Ok(PhysicalSortExpr {
-                                    expr: new_expr?,
-                                    options: e.options,
-                                })
-                            })
-                            .collect();
-                        new_sort_exprs
+                        add_offset_to_lex_ordering(sort_exprs, 
left_columns_len)
                     })
                     .map_or(Ok(None), |v| v.map(Some))?
             }
@@ -295,6 +321,65 @@ impl ExecutionPlan for SortMergeJoinExec {
         )
     }
 
+    fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties 
{
+        let mut new_properties = 
OrderingEquivalenceProperties::new(self.schema());
+        let left_columns_len = self.left.schema().fields.len();
+        let left_oeq_properties = self.left.ordering_equivalence_properties();
+        let right_oeq_properties = 
self.right.ordering_equivalence_properties();
+        match self.join_type {
+            JoinType::Inner => {
+                // Since left side is the stream side for this `SortMergeJoin` 
implementation,
+                // global ordering of the left table is preserved at the 
output. Hence, left
+                // side ordering equivalences are still valid.
+                
new_properties.extend(left_oeq_properties.classes().iter().cloned());
+                if let Some(output_ordering) = &self.output_ordering {
+                    // Update right table ordering equivalence expression 
indices; i.e.
+                    // add left table size as an offset.
+                    let updated_right_oeq_classes =
+                        add_offset_to_ordering_equivalence_classes(
+                            right_oeq_properties.classes(),
+                            left_columns_len,
+                        )
+                        .unwrap();
+                    let left_output_ordering = 
self.left.output_ordering().unwrap_or(&[]);
+                    // Right side ordering equivalence properties should be 
prepended with
+                    // those of the left side while constructing output 
ordering equivalence
+                    // properties for `SortMergeJoin`. As an example;
+                    //
+                    // If the right table ordering equivalences contain `b 
ASC`, and the output
+                    // ordering of the left table is `a ASC`, then the 
ordering equivalence `b ASC`
+                    // for the right table should be converted to `a ASC, b 
ASC` before it is added
+                    // to the ordering equivalences of `SortMergeJoinExec`.
+                    for oeq_class in updated_right_oeq_classes {
+                        for ordering in oeq_class.others() {
+                            // Entries inside ordering equivalence should be 
normalized before insertion.
+                            let normalized_ordering = normalize_sort_exprs(
+                                ordering,
+                                self.equivalence_properties().classes(),
+                                &[],
+                            );
+                            let new_oeq_ordering =
+                                merge_vectors(left_output_ordering, 
&normalized_ordering);
+                            new_properties.add_equal_conditions((
+                                output_ordering,
+                                &new_oeq_ordering,
+                            ));
+                        }
+                    }
+                }
+            }
+            JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti => {
+                
new_properties.extend(left_oeq_properties.classes().iter().cloned());
+            }
+            JoinType::Right | JoinType::RightSemi | JoinType::RightAnti => {
+                
new_properties.extend(right_oeq_properties.classes().iter().cloned());
+            }
+            // All ordering equivalences from left and/or right sides are 
invalidated.
+            _ => {}
+        }
+        new_properties
+    }
+
     fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
         vec![self.left.clone(), self.right.clone()]
     }
diff --git a/datafusion/core/src/physical_plan/joins/utils.rs 
b/datafusion/core/src/physical_plan/joins/utils.rs
index d4be2cfe10..ea9c11a3ed 100644
--- a/datafusion/core/src/physical_plan/joins/utils.rs
+++ b/datafusion/core/src/physical_plan/joins/utils.rs
@@ -17,17 +17,6 @@
 
 //! Join related functionality used both on logical and physical plans
 
-use arrow::array::{
-    downcast_array, new_null_array, Array, BooleanBufferBuilder, UInt32Array,
-    UInt32Builder, UInt64Array,
-};
-use arrow::compute;
-use arrow::datatypes::{Field, Schema, SchemaBuilder};
-use arrow::record_batch::{RecordBatch, RecordBatchOptions};
-use datafusion_physical_expr::expressions::Column;
-use futures::future::{BoxFuture, Shared};
-use futures::{ready, FutureExt};
-use parking_lot::Mutex;
 use std::cmp::max;
 use std::collections::HashSet;
 use std::fmt::{Display, Formatter};
@@ -36,21 +25,32 @@ use std::sync::Arc;
 use std::task::{Context, Poll};
 use std::usize;
 
-use datafusion_common::cast::as_boolean_array;
-use datafusion_common::{ScalarValue, SharedResult};
-
-use datafusion_common::tree_node::{Transformed, TreeNode};
-use datafusion_physical_expr::{EquivalentClass, PhysicalExpr, 
PhysicalSortExpr};
-
-use datafusion_common::JoinType;
-use datafusion_common::{DataFusionError, Result};
-
 use crate::physical_plan::metrics::{self, ExecutionPlanMetricsSet, 
MetricBuilder};
 use crate::physical_plan::SchemaRef;
 use crate::physical_plan::{
     ColumnStatistics, EquivalenceProperties, ExecutionPlan, Partitioning, 
Statistics,
 };
 
+use arrow::array::{
+    downcast_array, new_null_array, Array, BooleanBufferBuilder, UInt32Array,
+    UInt32Builder, UInt64Array,
+};
+use arrow::compute;
+use arrow::datatypes::{Field, Schema, SchemaBuilder};
+use arrow::record_batch::{RecordBatch, RecordBatchOptions};
+use datafusion_common::cast::as_boolean_array;
+use datafusion_common::tree_node::{Transformed, TreeNode};
+use datafusion_common::{DataFusionError, JoinType, Result, ScalarValue, 
SharedResult};
+use datafusion_physical_expr::expressions::Column;
+use datafusion_physical_expr::{
+    EquivalentClass, LexOrdering, LexOrderingRef, OrderingEquivalentClass, 
PhysicalExpr,
+    PhysicalSortExpr,
+};
+
+use futures::future::{BoxFuture, Shared};
+use futures::{ready, FutureExt};
+use parking_lot::Mutex;
+
 /// The on clause of the join, as vector of (left, right) columns.
 pub type JoinOn = Vec<(Column, Column)>;
 /// Reference for JoinOn.
@@ -283,6 +283,64 @@ pub fn cross_join_equivalence_properties(
     new_properties
 }
 
+/// Adds the `offset` value to `Column` indices inside `expr`. This function is
+/// generally used during the update of the right table schema in join 
operations.
+pub(crate) fn add_offset_to_expr(
+    expr: Arc<dyn PhysicalExpr>,
+    offset: usize,
+) -> Result<Arc<dyn PhysicalExpr>> {
+    expr.transform_down(&|e| match e.as_any().downcast_ref::<Column>() {
+        Some(col) => Ok(Transformed::Yes(Arc::new(Column::new(
+            col.name(),
+            offset + col.index(),
+        )))),
+        None => Ok(Transformed::No(e)),
+    })
+}
+
+/// Adds the `offset` value to `Column` indices inside `sort_expr.expr`.
+pub(crate) fn add_offset_to_sort_expr(
+    sort_expr: &PhysicalSortExpr,
+    offset: usize,
+) -> Result<PhysicalSortExpr> {
+    Ok(PhysicalSortExpr {
+        expr: add_offset_to_expr(sort_expr.expr.clone(), offset)?,
+        options: sort_expr.options,
+    })
+}
+
+/// Adds the `offset` value to `Column` indices for each `sort_expr.expr`
+/// inside `sort_exprs`.
+pub(crate) fn add_offset_to_lex_ordering(
+    sort_exprs: LexOrderingRef,
+    offset: usize,
+) -> Result<LexOrdering> {
+    sort_exprs
+        .iter()
+        .map(|sort_expr| add_offset_to_sort_expr(sort_expr, offset))
+        .collect()
+}
+
+/// Adds the `offset` value to `Column` indices for all expressions inside the
+/// given `OrderingEquivalentClass`es.
+pub(crate) fn add_offset_to_ordering_equivalence_classes(
+    oeq_classes: &[OrderingEquivalentClass],
+    offset: usize,
+) -> Result<Vec<OrderingEquivalentClass>> {
+    oeq_classes
+        .iter()
+        .map(|prop| {
+            let new_head = add_offset_to_lex_ordering(prop.head(), offset)?;
+            let new_others = prop
+                .others()
+                .iter()
+                .map(|ordering| add_offset_to_lex_ordering(ordering, offset))
+                .collect::<Result<Vec<_>>>()?;
+            Ok(OrderingEquivalentClass::new(new_head, new_others))
+        })
+        .collect()
+}
+
 impl Display for JoinSide {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         match self {
diff --git a/datafusion/core/src/physical_plan/limit.rs 
b/datafusion/core/src/physical_plan/limit.rs
index 572cea006c..93f6cd7c2c 100644
--- a/datafusion/core/src/physical_plan/limit.rs
+++ b/datafusion/core/src/physical_plan/limit.rs
@@ -17,9 +17,6 @@
 
 //! Defines the LIMIT plan
 
-use futures::stream::Stream;
-use futures::stream::StreamExt;
-use log::trace;
 use std::any::Any;
 use std::pin::Pin;
 use std::sync::Arc;
@@ -28,19 +25,21 @@ use std::task::{Context, Poll};
 use crate::physical_plan::{
     DisplayFormatType, Distribution, EquivalenceProperties, ExecutionPlan, 
Partitioning,
 };
+
+use super::expressions::PhysicalSortExpr;
+use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
+use super::{DisplayAs, RecordBatchStream, SendableRecordBatchStream, 
Statistics};
+
 use arrow::array::ArrayRef;
 use arrow::datatypes::SchemaRef;
 use arrow::record_batch::{RecordBatch, RecordBatchOptions};
 use datafusion_common::{DataFusionError, Result};
-
-use super::expressions::PhysicalSortExpr;
-use super::DisplayAs;
-use super::{
-    metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
-    RecordBatchStream, SendableRecordBatchStream, Statistics,
-};
-
 use datafusion_execution::TaskContext;
+use datafusion_physical_expr::OrderingEquivalenceProperties;
+
+use futures::stream::Stream;
+use futures::stream::StreamExt;
+use log::trace;
 
 /// Limit execution plan
 #[derive(Debug)]
@@ -140,6 +139,10 @@ impl ExecutionPlan for GlobalLimitExec {
         self.input.equivalence_properties()
     }
 
+    fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties 
{
+        self.input.ordering_equivalence_properties()
+    }
+
     fn with_new_children(
         self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
@@ -317,6 +320,10 @@ impl ExecutionPlan for LocalLimitExec {
         self.input.equivalence_properties()
     }
 
+    fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties 
{
+        self.input.ordering_equivalence_properties()
+    }
+
     fn with_new_children(
         self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
diff --git a/datafusion/core/src/physical_plan/projection.rs 
b/datafusion/core/src/physical_plan/projection.rs
index c6845c7afe..dac5227503 100644
--- a/datafusion/core/src/physical_plan/projection.rs
+++ b/datafusion/core/src/physical_plan/projection.rs
@@ -41,6 +41,8 @@ use super::expressions::{Column, PhysicalSortExpr};
 use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
 use super::{DisplayAs, RecordBatchStream, SendableRecordBatchStream, 
Statistics};
 
+use 
datafusion_physical_expr::equivalence::update_ordering_equivalence_with_cast;
+use datafusion_physical_expr::expressions::CastExpr;
 use datafusion_physical_expr::{
     normalize_out_expr_with_columns_map, project_equivalence_properties,
     project_ordering_equivalence_properties, OrderingEquivalenceProperties,
@@ -245,11 +247,29 @@ impl ExecutionPlan for ProjectionExec {
 
     fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties 
{
         let mut new_properties = 
OrderingEquivalenceProperties::new(self.schema());
+        if self.output_ordering.is_none() {
+            // If there is no output ordering, return an "empty" equivalence 
set:
+            return new_properties;
+        }
+
+        let mut input_oeq = self.input().ordering_equivalence_properties();
+        // Stores cast expression and its `Column` version in the output:
+        let mut cast_exprs: Vec<(CastExpr, Column)> = vec![];
+        for (idx, (expr, name)) in self.expr.iter().enumerate() {
+            if let Some(cast_expr) = expr.as_any().downcast_ref::<CastExpr>() {
+                let target_col = Column::new(name, idx);
+                cast_exprs.push((cast_expr.clone(), target_col));
+            }
+        }
+
+        update_ordering_equivalence_with_cast(&cast_exprs, &mut input_oeq);
+
         project_ordering_equivalence_properties(
-            self.input.ordering_equivalence_properties(),
+            input_oeq,
             &self.columns_map,
             &mut new_properties,
         );
+
         new_properties
     }
 
diff --git a/datafusion/core/src/physical_plan/repartition/mod.rs 
b/datafusion/core/src/physical_plan/repartition/mod.rs
index 6175c19576..c5b8b2da42 100644
--- a/datafusion/core/src/physical_plan/repartition/mod.rs
+++ b/datafusion/core/src/physical_plan/repartition/mod.rs
@@ -24,19 +24,16 @@ use std::sync::Arc;
 use std::task::{Context, Poll};
 use std::{any::Any, vec};
 
+use crate::physical_plan::common::transpose;
 use crate::physical_plan::hash_utils::create_hashes;
+use crate::physical_plan::metrics::BaselineMetrics;
 use crate::physical_plan::repartition::distributor_channels::{
     channels, partition_aware_channels,
 };
+use crate::physical_plan::sorts::streaming_merge;
 use crate::physical_plan::{
     DisplayFormatType, EquivalenceProperties, ExecutionPlan, Partitioning, 
Statistics,
 };
-use arrow::array::{ArrayRef, UInt64Builder};
-use arrow::datatypes::SchemaRef;
-use arrow::record_batch::RecordBatch;
-use datafusion_common::{DataFusionError, Result};
-use datafusion_execution::memory_pool::MemoryConsumer;
-use log::trace;
 
 use self::distributor_channels::{DistributionReceiver, DistributionSender};
 
@@ -45,14 +42,18 @@ use super::expressions::PhysicalSortExpr;
 use super::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};
 use super::{DisplayAs, RecordBatchStream, SendableRecordBatchStream};
 
-use crate::physical_plan::common::transpose;
-use crate::physical_plan::metrics::BaselineMetrics;
-use crate::physical_plan::sorts::streaming_merge;
+use arrow::array::{ArrayRef, UInt64Builder};
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+use datafusion_common::{DataFusionError, Result};
+use datafusion_execution::memory_pool::MemoryConsumer;
 use datafusion_execution::TaskContext;
-use datafusion_physical_expr::PhysicalExpr;
+use datafusion_physical_expr::{OrderingEquivalenceProperties, PhysicalExpr};
+
 use futures::stream::Stream;
 use futures::{FutureExt, StreamExt};
 use hashbrown::HashMap;
+use log::trace;
 use parking_lot::Mutex;
 use tokio::task::JoinHandle;
 
@@ -399,6 +400,10 @@ impl ExecutionPlan for RepartitionExec {
         self.input.equivalence_properties()
     }
 
+    fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties 
{
+        self.input.ordering_equivalence_properties()
+    }
+
     fn execute(
         &self,
         partition: usize,
diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs 
b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
index 8262a18f23..0dad1d30dd 100644
--- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
@@ -20,22 +20,25 @@
 use std::any::Any;
 use std::sync::Arc;
 
-use arrow::datatypes::SchemaRef;
-use log::{debug, trace};
-
 use crate::physical_plan::common::spawn_buffered;
+use crate::physical_plan::expressions::PhysicalSortExpr;
 use crate::physical_plan::metrics::{
     BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet,
 };
 use crate::physical_plan::sorts::streaming_merge;
-use crate::physical_plan::DisplayAs;
 use crate::physical_plan::{
-    expressions::PhysicalSortExpr, DisplayFormatType, Distribution, 
ExecutionPlan,
-    Partitioning, SendableRecordBatchStream, Statistics,
+    DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
+    SendableRecordBatchStream, Statistics,
 };
+
+use arrow::datatypes::SchemaRef;
 use datafusion_common::{DataFusionError, Result};
 use datafusion_execution::TaskContext;
-use datafusion_physical_expr::{EquivalenceProperties, PhysicalSortRequirement};
+use datafusion_physical_expr::{
+    EquivalenceProperties, OrderingEquivalenceProperties, 
PhysicalSortRequirement,
+};
+
+use log::{debug, trace};
 
 /// Sort preserving merge execution plan
 ///
@@ -163,6 +166,10 @@ impl ExecutionPlan for SortPreservingMergeExec {
         self.input.equivalence_properties()
     }
 
+    fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties 
{
+        self.input.ordering_equivalence_properties()
+    }
+
     fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
         vec![self.input.clone()]
     }
diff --git a/datafusion/core/tests/sqllogictests/test_files/joins.slt 
b/datafusion/core/tests/sqllogictests/test_files/joins.slt
index c2517ead0f..fdb76064ab 100644
--- a/datafusion/core/tests/sqllogictests/test_files/joins.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/joins.slt
@@ -3060,3 +3060,209 @@ set datafusion.execution.target_partitions = 2;
 
 statement ok
 set datafusion.execution.batch_size = 4096;
+
+
+####
+# Config setup
+####
+statement ok
+set datafusion.explain.logical_plan_only = false;
+
+statement ok
+set datafusion.optimizer.prefer_hash_join = false;
+
+statement ok
+set datafusion.execution.target_partitions = 2;
+
+statement ok
+CREATE EXTERNAL TABLE annotated_data (
+  a0 INTEGER,
+  a INTEGER,
+  b INTEGER,
+  c INTEGER,
+  d INTEGER
+)
+STORED AS CSV
+WITH HEADER ROW
+WITH ORDER (a ASC NULLS FIRST, b ASC, c ASC)
+LOCATION 'tests/data/window_2.csv';
+
+# sort merge join should propagate ordering equivalence of the left side
+# for inner join. Hence final requirement rn1 ASC is already satisfied at
+# the end of SortMergeJoinExec.
+query TT
+EXPLAIN SELECT *
+  FROM (SELECT *, ROW_NUMBER() OVER() as rn1
+       FROM annotated_data ) as l_table
+  JOIN annotated_data as r_table
+  ON l_table.a = r_table.a
+  ORDER BY l_table.rn1
+----
+logical_plan
+Sort: l_table.rn1 ASC NULLS LAST
+--Inner Join: l_table.a = r_table.a
+----SubqueryAlias: l_table
+------Projection: annotated_data.a0, annotated_data.a, annotated_data.b, 
annotated_data.c, annotated_data.d, ROW_NUMBER() ROWS BETWEEN UNBOUNDED 
PRECEDING AND UNBOUNDED FOLLOWING AS rn1
+--------WindowAggr: windowExpr=[[ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING 
AND UNBOUNDED FOLLOWING]]
+----------TableScan: annotated_data projection=[a0, a, b, c, d]
+----SubqueryAlias: r_table
+------TableScan: annotated_data projection=[a0, a, b, c, d]
+physical_plan
+SortPreservingMergeExec: [rn1@5 ASC NULLS LAST]
+--SortMergeJoin: join_type=Inner, on=[(a@1, a@1)]
+----CoalesceBatchesExec: target_batch_size=4096
+------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=1
+--------ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as 
d, ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@5 as 
rn1]
+----------BoundedWindowAggExec: wdw=[ROW_NUMBER() ROWS BETWEEN UNBOUNDED 
PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "ROW_NUMBER() ROWS BETWEEN 
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: UInt64, nullable: 
false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame 
{ units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: 
Following(UInt64(NULL)) }], mode=[Sorted]
+------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, 
b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], 
has_header=true
+----CoalesceBatchesExec: target_batch_size=4096
+------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=1
+--------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, 
b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], 
has_header=true
+
+# sort merge join should propagate ordering equivalence of the right side
+# for right join. Hence final requirement rn1 ASC is already satisfied at
+# the end of SortMergeJoinExec.
+query TT
+EXPLAIN SELECT *
+  FROM annotated_data as l_table
+  RIGHT JOIN (SELECT *, ROW_NUMBER() OVER() as rn1
+       FROM annotated_data ) as r_table
+  ON l_table.a = r_table.a
+  ORDER BY r_table.rn1
+----
+logical_plan
+Sort: r_table.rn1 ASC NULLS LAST
+--Right Join: l_table.a = r_table.a
+----SubqueryAlias: l_table
+------TableScan: annotated_data projection=[a0, a, b, c, d]
+----SubqueryAlias: r_table
+------Projection: annotated_data.a0, annotated_data.a, annotated_data.b, 
annotated_data.c, annotated_data.d, ROW_NUMBER() ROWS BETWEEN UNBOUNDED 
PRECEDING AND UNBOUNDED FOLLOWING AS rn1
+--------WindowAggr: windowExpr=[[ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING 
AND UNBOUNDED FOLLOWING]]
+----------TableScan: annotated_data projection=[a0, a, b, c, d]
+physical_plan
+SortPreservingMergeExec: [rn1@10 ASC NULLS LAST]
+--SortMergeJoin: join_type=Right, on=[(a@1, a@1)]
+----CoalesceBatchesExec: target_batch_size=4096
+------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=1
+--------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, 
b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], 
has_header=true
+----CoalesceBatchesExec: target_batch_size=4096
+------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=1
+--------ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as 
d, ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@5 as 
rn1]
+----------BoundedWindowAggExec: wdw=[ROW_NUMBER() ROWS BETWEEN UNBOUNDED 
PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "ROW_NUMBER() ROWS BETWEEN 
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: UInt64, nullable: 
false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame 
{ units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: 
Following(UInt64(NULL)) }], mode=[Sorted]
+------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, 
b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], 
has_header=true
+
+# SortMergeJoin should append the ordering equivalences of the right table
+# lexicographically to the global ordering. The query below shouldn't add
+# any SortExec for the ORDER BY clause, since its requirement is already
+# satisfied at the output of SortMergeJoinExec.
+query TT
+EXPLAIN SELECT *
+  FROM (SELECT *, ROW_NUMBER() OVER() as rn1
+       FROM annotated_data ) as l_table
+  JOIN (SELECT *, ROW_NUMBER() OVER() as rn1
+       FROM annotated_data ) as r_table
+  ON l_table.a = r_table.a
+  ORDER BY l_table.a ASC NULLS FIRST, l_table.b, l_table.c, r_table.rn1
+----
+logical_plan
+Sort: l_table.a ASC NULLS FIRST, l_table.b ASC NULLS LAST, l_table.c ASC NULLS 
LAST, r_table.rn1 ASC NULLS LAST
+--Inner Join: l_table.a = r_table.a
+----SubqueryAlias: l_table
+------Projection: annotated_data.a0, annotated_data.a, annotated_data.b, 
annotated_data.c, annotated_data.d, ROW_NUMBER() ROWS BETWEEN UNBOUNDED 
PRECEDING AND UNBOUNDED FOLLOWING AS rn1
+--------WindowAggr: windowExpr=[[ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING 
AND UNBOUNDED FOLLOWING]]
+----------TableScan: annotated_data projection=[a0, a, b, c, d]
+----SubqueryAlias: r_table
+------Projection: annotated_data.a0, annotated_data.a, annotated_data.b, 
annotated_data.c, annotated_data.d, ROW_NUMBER() ROWS BETWEEN UNBOUNDED 
PRECEDING AND UNBOUNDED FOLLOWING AS rn1
+--------WindowAggr: windowExpr=[[ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING 
AND UNBOUNDED FOLLOWING]]
+----------TableScan: annotated_data projection=[a0, a, b, c, d]
+physical_plan
+SortPreservingMergeExec: [a@1 ASC,b@2 ASC NULLS LAST,c@3 ASC NULLS LAST,rn1@11 
ASC NULLS LAST]
+--SortMergeJoin: join_type=Inner, on=[(a@1, a@1)]
+----CoalesceBatchesExec: target_batch_size=4096
+------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=1
+--------ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as 
d, ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@5 as 
rn1]
+----------BoundedWindowAggExec: wdw=[ROW_NUMBER() ROWS BETWEEN UNBOUNDED 
PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "ROW_NUMBER() ROWS BETWEEN 
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: UInt64, nullable: 
false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame 
{ units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: 
Following(UInt64(NULL)) }], mode=[Sorted]
+------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, 
b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], 
has_header=true
+----CoalesceBatchesExec: target_batch_size=4096
+------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=1
+--------ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as 
d, ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@5 as 
rn1]
+----------BoundedWindowAggExec: wdw=[ROW_NUMBER() ROWS BETWEEN UNBOUNDED 
PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "ROW_NUMBER() ROWS BETWEEN 
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: UInt64, nullable: 
false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame 
{ units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: 
Following(UInt64(NULL)) }], mode=[Sorted]
+------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, 
b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], 
has_header=true
+
+statement ok
+set datafusion.optimizer.prefer_hash_join = true;
+
+# To preserve ordering up to the hash join, set target partitions to 1.
+# Otherwise, the inserted RepartitionExecs may break the ordering.
+statement ok
+set datafusion.execution.target_partitions = 1;
+
+# hash join should propagate ordering equivalence of the right side for INNER 
join.
+# Hence final requirement rn1 ASC is already satisfied at the end of 
HashJoinExec.
+query TT
+EXPLAIN SELECT *
+  FROM annotated_data as l_table
+  JOIN (SELECT *, ROW_NUMBER() OVER() as rn1
+              FROM annotated_data) as r_table
+  ON l_table.a = r_table.a
+  ORDER BY r_table.rn1
+----
+logical_plan
+Sort: r_table.rn1 ASC NULLS LAST
+--Inner Join: l_table.a = r_table.a
+----SubqueryAlias: l_table
+------TableScan: annotated_data projection=[a0, a, b, c, d]
+----SubqueryAlias: r_table
+------Projection: annotated_data.a0, annotated_data.a, annotated_data.b, 
annotated_data.c, annotated_data.d, ROW_NUMBER() ROWS BETWEEN UNBOUNDED 
PRECEDING AND UNBOUNDED FOLLOWING AS rn1
+--------WindowAggr: windowExpr=[[ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING 
AND UNBOUNDED FOLLOWING]]
+----------TableScan: annotated_data projection=[a0, a, b, c, d]
+physical_plan
+CoalesceBatchesExec: target_batch_size=4096
+--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@1, a@1)]
+----CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, 
b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], 
has_header=true
+----ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, 
ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@5 as rn1]
+------BoundedWindowAggExec: wdw=[ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING 
AND UNBOUNDED FOLLOWING: Ok(Field { name: "ROW_NUMBER() ROWS BETWEEN UNBOUNDED 
PRECEDING AND UNBOUNDED FOLLOWING", data_type: UInt64, nullable: false, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: 
Following(UInt64(NULL)) }], mode=[Sorted]
+--------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, 
b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], 
has_header=true
+
+# hash join should propagate ordering equivalence of the right side for RIGHT 
ANTI join.
+# Hence final requirement rn1 ASC is already satisfied at the end of 
HashJoinExec.
+query TT
+EXPLAIN SELECT *
+  FROM annotated_data as l_table
+  RIGHT ANTI JOIN (SELECT *, ROW_NUMBER() OVER() as rn1
+              FROM annotated_data) as r_table
+  ON l_table.a = r_table.a
+  ORDER BY r_table.rn1
+----
+logical_plan
+Sort: r_table.rn1 ASC NULLS LAST
+--RightAnti Join: l_table.a = r_table.a
+----SubqueryAlias: l_table
+------TableScan: annotated_data projection=[a]
+----SubqueryAlias: r_table
+------Projection: annotated_data.a0, annotated_data.a, annotated_data.b, 
annotated_data.c, annotated_data.d, ROW_NUMBER() ROWS BETWEEN UNBOUNDED 
PRECEDING AND UNBOUNDED FOLLOWING AS rn1
+--------WindowAggr: windowExpr=[[ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING 
AND UNBOUNDED FOLLOWING]]
+----------TableScan: annotated_data projection=[a0, a, b, c, d]
+physical_plan
+CoalesceBatchesExec: target_batch_size=4096
+--HashJoinExec: mode=CollectLeft, join_type=RightAnti, on=[(a@0, a@1)]
+----CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a], 
output_ordering=[a@0 ASC], has_header=true
+----ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, 
ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@5 as rn1]
+------BoundedWindowAggExec: wdw=[ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING 
AND UNBOUNDED FOLLOWING: Ok(Field { name: "ROW_NUMBER() ROWS BETWEEN UNBOUNDED 
PRECEDING AND UNBOUNDED FOLLOWING", data_type: UInt64, nullable: false, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: 
Following(UInt64(NULL)) }], mode=[Sorted]
+--------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, 
b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], 
has_header=true
+
+####
+# Config teardown
+####
+
+statement ok
+set datafusion.explain.logical_plan_only = true;
+
+statement ok
+set datafusion.optimizer.prefer_hash_join = true;
+
+statement ok
+set datafusion.execution.target_partitions = 2;
+
+statement ok
+drop table annotated_data;
diff --git a/datafusion/core/tests/sqllogictests/test_files/window.slt 
b/datafusion/core/tests/sqllogictests/test_files/window.slt
index 21f76eb9cb..5d97b311d1 100644
--- a/datafusion/core/tests/sqllogictests/test_files/window.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/window.slt
@@ -2429,6 +2429,29 @@ GlobalLimitExec: skip=0, fetch=5
 ------SortExec: expr=[CAST(c9@1 AS Int32) + c5@0 DESC]
 --------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c5, 
c9], has_header=true
 
+# Ordering equivalence should be preserved through cast expressions
+query TT
+EXPLAIN SELECT c9, rn1 FROM (SELECT c9,
+                       CAST(ROW_NUMBER() OVER(ORDER BY c9 DESC) as BIGINT) as 
rn1
+                       FROM aggregate_test_100
+                       ORDER BY c9 DESC)
+       ORDER BY rn1 ASC
+       LIMIT 5
+----
+logical_plan
+Limit: skip=0, fetch=5
+--Sort: rn1 ASC NULLS LAST, fetch=5
+----Sort: aggregate_test_100.c9 DESC NULLS FIRST
+------Projection: aggregate_test_100.c9, CAST(ROW_NUMBER() ORDER BY 
[aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW AS Int64) AS rn1
+--------WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [aggregate_test_100.c9 
DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+----------TableScan: aggregate_test_100 projection=[c9]
+physical_plan
+GlobalLimitExec: skip=0, fetch=5
+--ProjectionExec: expr=[c9@0 as c9, CAST(ROW_NUMBER() ORDER BY 
[aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW@1 AS Int64) as rn1]
+----BoundedWindowAggExec: wdw=[ROW_NUMBER() ORDER BY [aggregate_test_100.c9 
DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { 
name: "ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: 
false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame 
{ units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], 
mode=[Sorted]
+------SortExec: expr=[c9@0 DESC]
+--------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], 
has_header=true
+
 # The following query has type error. We should test the error could be 
detected
 # from either the logical plan (when `skip_failed_rules` is set to `false`) or
 # the physical plan (when `skip_failed_rules` is set to `true`).
@@ -3055,6 +3078,33 @@ statement error DataFusion error: Error during planning: 
Aggregate ORDER BY is n
 EXPLAIN SELECT a, b, LAST_VALUE(c ORDER BY a ASC) OVER (order by a ASC) as 
last_c
        FROM annotated_data_infinite2
 
+# Ordering equivalence information should propagate through
+# FilterExec, LimitExec, CoalesceBatchesExec, etc.
+# The query below should work without breaking the pipeline.
+query TT
+EXPLAIN SELECT * FROM (SELECT *, ROW_NUMBER() OVER(ORDER BY a ASC) as rn1
+  FROM annotated_data_infinite2
+  ORDER BY rn1 ASC
+  LIMIT 5)
+  WHERE rn1 < 50
+  ORDER BY rn1 ASC
+----
+logical_plan
+Sort: rn1 ASC NULLS LAST
+--Filter: rn1 < UInt64(50)
+----Limit: skip=0, fetch=5
+------Sort: rn1 ASC NULLS LAST, fetch=5
+--------Projection: annotated_data_infinite2.a0, annotated_data_infinite2.a, 
annotated_data_infinite2.b, annotated_data_infinite2.c, 
annotated_data_infinite2.d, ROW_NUMBER() ORDER BY [annotated_data_infinite2.a 
ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn1
+----------WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY 
[annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW]]
+------------TableScan: annotated_data_infinite2 projection=[a0, a, b, c, d]
+physical_plan
+CoalesceBatchesExec: target_batch_size=4096
+--FilterExec: rn1@5 < 50
+----GlobalLimitExec: skip=0, fetch=5
+------ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as 
d, ROW_NUMBER() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as rn1]
+--------BoundedWindowAggExec: wdw=[ROW_NUMBER() ORDER BY 
[annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW: Ok(Field { name: "ROW_NUMBER() ORDER BY 
[annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW", data_type: UInt64, nullable: false, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int32(NULL)), end_bound: CurrentRow }], mode=[Sorted]
+----------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, 
b, c, d], infinite_source=true, output_ordering=[a@1 ASC NULLS LAST, b@2 ASC 
NULLS LAST, c@3 ASC NULLS LAST], has_header=true
+
 statement ok
 drop table annotated_data_finite2
 
diff --git a/datafusion/physical-expr/src/equivalence.rs 
b/datafusion/physical-expr/src/equivalence.rs
index 4253917927..f458f4c709 100644
--- a/datafusion/physical-expr/src/equivalence.rs
+++ b/datafusion/physical-expr/src/equivalence.rs
@@ -15,14 +15,16 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::expressions::{BinaryExpr, Column};
+use crate::expressions::{CastExpr, Column};
 use crate::{
     normalize_expr_with_equivalence_properties, LexOrdering, PhysicalExpr,
     PhysicalSortExpr,
 };
 
 use arrow::datatypes::SchemaRef;
+use arrow_schema::Fields;
 
+use crate::utils::collect_columns;
 use std::collections::{HashMap, HashSet};
 use std::hash::Hash;
 use std::sync::Arc;
@@ -114,26 +116,6 @@ impl<T: Eq + Clone + Hash> EquivalenceProperties<T> {
     }
 }
 
-// Helper function to calculate column info recursively
-fn get_column_indices_helper(
-    indices: &mut Vec<(usize, String)>,
-    expr: &Arc<dyn PhysicalExpr>,
-) {
-    if let Some(col) = expr.as_any().downcast_ref::<Column>() {
-        indices.push((col.index(), col.name().to_string()))
-    } else if let Some(binary_expr) = 
expr.as_any().downcast_ref::<BinaryExpr>() {
-        get_column_indices_helper(indices, binary_expr.left());
-        get_column_indices_helper(indices, binary_expr.right());
-    };
-}
-
-/// Get index and name of each column that is in the expression (Can return 
multiple entries for `BinaryExpr`s)
-fn get_column_indices(expr: &Arc<dyn PhysicalExpr>) -> Vec<(usize, String)> {
-    let mut result = vec![];
-    get_column_indices_helper(&mut result, expr);
-    result
-}
-
 /// `OrderingEquivalenceProperties` keeps track of columns that describe the
 /// global ordering of the schema. These columns are not necessarily same; e.g.
 /// ```text
@@ -232,34 +214,54 @@ impl<T: Eq + Hash + Clone> EquivalentClass<T> {
 /// For this case, we say that `vec![a ASC, b ASC]`, and `vec![c DESC, d ASC]` 
are ordering equivalent.
 pub type OrderingEquivalentClass = EquivalentClass<LexOrdering>;
 
-impl OrderingEquivalentClass {
-    /// This function extends ordering equivalences with alias information.
-    /// For instance, assume column a and b are aliases,
-    /// and column (a ASC), (c DESC) are ordering equivalent. We append (b 
ASC) to ordering equivalence,
-    /// since b is alias of colum a. After this function (a ASC), (c DESC), (b 
ASC) would be ordering equivalent.
-    fn update_with_aliases(&mut self, columns_map: &HashMap<Column, 
Vec<Column>>) {
-        for (column, columns) in columns_map {
-            let col_expr = Arc::new(column.clone()) as Arc<dyn PhysicalExpr>;
-            let mut to_insert = vec![];
-            for ordering in 
std::iter::once(&self.head).chain(self.others.iter()) {
-                for (idx, item) in ordering.iter().enumerate() {
-                    if item.expr.eq(&col_expr) {
-                        for col in columns {
-                            let col_expr = Arc::new(col.clone()) as Arc<dyn 
PhysicalExpr>;
-                            let mut normalized = self.head.clone();
-                            // Change the corresponding entry in the head with 
the alias column:
-                            let entry = &mut normalized[idx];
-                            (entry.expr, entry.options) = (col_expr, 
item.options);
-                            to_insert.push(normalized);
-                        }
-                    }
-                }
-            }
-            for items in to_insert {
-                self.insert(items);
+/// Update each expression in `ordering` with alias expressions. Assume
+/// `ordering` is `a ASC, b ASC` and `c` is alias of `b`. Then, the result
+/// will be `a ASC, c ASC`.
+fn update_with_alias(
+    mut ordering: LexOrdering,
+    oeq_alias_map: &[(Column, Column)],
+) -> LexOrdering {
+    for (source_col, target_col) in oeq_alias_map {
+        let source_col: Arc<dyn PhysicalExpr> = Arc::new(source_col.clone());
+        // Replace each invalidated column with its alias in the ordering 
expression.
+        let target_col: Arc<dyn PhysicalExpr> = Arc::new(target_col.clone());
+        for item in ordering.iter_mut() {
+            if item.expr.eq(&source_col) {
+                // Change the corresponding entry with alias expression
+                item.expr = target_col.clone();
             }
         }
     }
+    ordering
+}
+
+impl OrderingEquivalentClass {
+    /// This function updates ordering equivalences with alias information.
+    /// For instance, assume columns `a` and `b` are aliases (a as b), and
+    /// orderings `a ASC` and `c DESC` are equivalent. Here, we replace column
+    /// `a` with `b` in ordering equivalence expressions. After this function,
+    /// `a ASC`, `c DESC` will be converted to `b ASC`, `c DESC`.
+    fn update_with_aliases(
+        &mut self,
+        oeq_alias_map: &[(Column, Column)],
+        fields: &Fields,
+    ) {
+        let is_head_invalid = self.head.iter().any(|sort_expr| {
+            collect_columns(&sort_expr.expr)
+                .iter()
+                .any(|col| is_column_invalid_in_new_schema(col, fields))
+        });
+        // If head is invalidated, update head with alias expressions
+        if is_head_invalid {
+            self.head = update_with_alias(self.head.clone(), oeq_alias_map);
+        } else {
+            let new_oeq_expr = update_with_alias(self.head.clone(), 
oeq_alias_map);
+            self.insert(new_oeq_expr);
+        }
+        for ordering in self.others.clone().into_iter() {
+            self.insert(update_with_alias(ordering, oeq_alias_map));
+        }
+    }
 }
 
 /// This is a builder object facilitating incremental construction
@@ -342,6 +344,22 @@ impl OrderingEquivalenceBuilder {
     }
 }
 
+/// Returns `true` if `column` is no longer valid in the schema after projection.
+fn is_column_invalid_in_new_schema(column: &Column, fields: &Fields) -> bool {
+    let idx = column.index();
+    idx >= fields.len() || fields[idx].name() != column.name()
+}
+
+/// Gets first aliased version of `col` found in `alias_map`.
+fn get_alias_column(
+    col: &Column,
+    alias_map: &HashMap<Column, Vec<Column>>,
+) -> Option<Column> {
+    alias_map
+        .iter()
+        .find_map(|(column, columns)| column.eq(col).then(|| 
columns[0].clone()))
+}
+
 /// This function applies the given projection to the given equivalence
 /// properties to compute the resulting (projected) equivalence properties; 
e.g.
 /// 1) Adding an alias, which can introduce additional equivalence properties,
@@ -352,10 +370,21 @@ pub fn project_equivalence_properties(
     alias_map: &HashMap<Column, Vec<Column>>,
     output_eq: &mut EquivalenceProperties,
 ) {
+    // Get schema and fields of projection output
+    let schema = output_eq.schema();
+    let fields = schema.fields();
+
     let mut eq_classes = input_eq.classes().to_vec();
     for (column, columns) in alias_map {
         let mut find_match = false;
         for class in eq_classes.iter_mut() {
+            // If `class.head` is invalidated in the new schema, replace it with
+            // its alias; this way, the head is not randomly reassigned to one of 
the entries from `class.others`.
+            if is_column_invalid_in_new_schema(&class.head, fields) {
+                if let Some(alias_col) = get_alias_column(&class.head, 
alias_map) {
+                    class.head = alias_col;
+                }
+            }
             if class.contains(column) {
                 for col in columns {
                     class.insert(col.clone());
@@ -370,15 +399,10 @@ pub fn project_equivalence_properties(
     }
 
     // Prune columns that are no longer in the schema from equivalences.
-    let schema = output_eq.schema();
-    let fields = schema.fields();
     for class in eq_classes.iter_mut() {
         let columns_to_remove = class
             .iter()
-            .filter(|column| {
-                let idx = column.index();
-                idx >= fields.len() || fields[idx].name() != column.name()
-            })
+            .filter(|column| is_column_invalid_in_new_schema(column, fields))
             .cloned()
             .collect::<Vec<_>>();
         for column in columns_to_remove {
@@ -402,25 +426,33 @@ pub fn project_ordering_equivalence_properties(
     columns_map: &HashMap<Column, Vec<Column>>,
     output_eq: &mut OrderingEquivalenceProperties,
 ) {
+    // Get schema and fields of projection output
+    let schema = output_eq.schema();
+    let fields = schema.fields();
+
     let mut eq_classes = input_eq.classes().to_vec();
+    let mut oeq_alias_map = vec![];
+    for (column, columns) in columns_map {
+        if is_column_invalid_in_new_schema(column, fields) {
+            oeq_alias_map.push((column.clone(), columns[0].clone()));
+        }
+    }
     for class in eq_classes.iter_mut() {
-        class.update_with_aliases(columns_map);
+        class.update_with_aliases(&oeq_alias_map, fields);
     }
 
     // Prune columns that are no longer in the schema from the 
OrderingEquivalenceProperties.
-    let schema = output_eq.schema();
-    let fields = schema.fields();
     for class in eq_classes.iter_mut() {
         let sort_exprs_to_remove = class
             .iter()
             .filter(|sort_exprs| {
                 sort_exprs.iter().any(|sort_expr| {
-                    let col_infos = get_column_indices(&sort_expr.expr);
+                    let cols_in_expr = collect_columns(&sort_expr.expr);
                     // If any of the columns used in the expression is invalid,
                     // remove the expression from ordering equivalences
-                    col_infos.into_iter().any(|(idx, name)| {
-                        idx >= fields.len() || fields[idx].name() != &name
-                    })
+                    cols_in_expr
+                        .iter()
+                        .any(|col| is_column_invalid_in_new_schema(col, 
fields))
                 })
             })
             .cloned()
@@ -434,6 +466,42 @@ pub fn project_ordering_equivalence_properties(
     output_eq.extend(eq_classes);
 }
 
+/// Replaces every sort expression in `ordering` that matches the input
+/// expression of a cast in `cast_exprs` with that cast's target column after
+/// projection. Returns `None` if no sort expression is replaced.
+fn update_with_cast_exprs(
+    cast_exprs: &[(CastExpr, Column)],
+    mut ordering: LexOrdering,
+) -> Option<LexOrdering> {
+    let mut is_changed = false;
+    for sort_expr in ordering.iter_mut() {
+        for (cast_expr, target_col) in cast_exprs.iter() {
+            if sort_expr.expr.eq(cast_expr.expr()) {
+                sort_expr.expr = Arc::new(target_col.clone()) as _;
+                is_changed = true;
+            }
+        }
+    }
+    is_changed.then_some(ordering)
+}
+
+/// Update cast expressions inside ordering equivalence
+/// properties with their target columns after projection
+pub fn update_ordering_equivalence_with_cast(
+    cast_exprs: &[(CastExpr, Column)],
+    input_oeq: &mut OrderingEquivalenceProperties,
+) {
+    for cls in input_oeq.classes.iter_mut() {
+        for ordering in
+            
std::iter::once(cls.head().clone()).chain(cls.others().clone().into_iter())
+        {
+            if let Some(updated_ordering) = update_with_cast_exprs(cast_exprs, 
ordering) {
+                cls.insert(updated_ordering);
+            }
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -441,7 +509,6 @@ mod tests {
     use arrow::datatypes::{DataType, Field, Schema};
     use datafusion_common::Result;
 
-    use datafusion_expr::Operator;
     use std::sync::Arc;
 
     #[test]
@@ -534,18 +601,4 @@ mod tests {
 
         Ok(())
     }
-
-    #[test]
-    fn test_get_column_infos() -> Result<()> {
-        let expr1 = Arc::new(Column::new("col1", 2)) as _;
-        assert_eq!(get_column_indices(&expr1), vec![(2, "col1".to_string())]);
-        let expr2 = Arc::new(Column::new("col2", 5)) as _;
-        assert_eq!(get_column_indices(&expr2), vec![(5, "col2".to_string())]);
-        let expr3 = Arc::new(BinaryExpr::new(expr1, Operator::Plus, expr2)) as 
_;
-        assert_eq!(
-            get_column_indices(&expr3),
-            vec![(2, "col1".to_string()), (5, "col2".to_string())]
-        );
-        Ok(())
-    }
 }
diff --git a/datafusion/physical-expr/src/expressions/cast.rs 
b/datafusion/physical-expr/src/expressions/cast.rs
index b5916def86..b6c3536a1e 100644
--- a/datafusion/physical-expr/src/expressions/cast.rs
+++ b/datafusion/physical-expr/src/expressions/cast.rs
@@ -41,7 +41,7 @@ fn default_cast_options() -> CastOptions<'static> {
 }
 
 /// CAST expression casts an expression to a specific data type and returns a 
runtime error on invalid cast
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct CastExpr {
     /// The expression to cast
     expr: Arc<dyn PhysicalExpr>,
diff --git a/datafusion/physical-expr/src/lib.rs 
b/datafusion/physical-expr/src/lib.rs
index b695ee169e..165faba442 100644
--- a/datafusion/physical-expr/src/lib.rs
+++ b/datafusion/physical-expr/src/lib.rs
@@ -59,7 +59,8 @@ pub use physical_expr::{AnalysisContext, ExprBoundaries, 
PhysicalExpr, PhysicalE
 pub use planner::create_physical_expr;
 pub use scalar_function::ScalarFunctionExpr;
 pub use sort_expr::{
-    LexOrdering, LexOrderingReq, PhysicalSortExpr, PhysicalSortRequirement,
+    LexOrdering, LexOrderingRef, LexOrderingReq, PhysicalSortExpr,
+    PhysicalSortRequirement,
 };
 pub use utils::{
     expr_list_eq_any_order, expr_list_eq_strict_order,
diff --git a/datafusion/physical-expr/src/sort_expr.rs 
b/datafusion/physical-expr/src/sort_expr.rs
index 659e8c85e8..0ba4627bc7 100644
--- a/datafusion/physical-expr/src/sort_expr.rs
+++ b/datafusion/physical-expr/src/sort_expr.rs
@@ -17,13 +17,15 @@
 
 //! Sort expressions
 
+use std::hash::{Hash, Hasher};
+use std::sync::Arc;
+
 use crate::PhysicalExpr;
+
 use arrow::compute::kernels::sort::{SortColumn, SortOptions};
 use arrow::record_batch::RecordBatch;
 use datafusion_common::{DataFusionError, Result};
 use datafusion_expr::ColumnarValue;
-use std::hash::{Hash, Hasher};
-use std::sync::Arc;
 
 /// Represents Sort operation for a column in a RecordBatch
 #[derive(Clone, Debug)]
@@ -227,5 +229,8 @@ fn to_str(options: &SortOptions) -> &str {
 /// `LexOrdering` is a type alias for lexicographical ordering 
definition `Vec<PhysicalSortExpr>`
 pub type LexOrdering = Vec<PhysicalSortExpr>;
 
+/// `LexOrderingRef` is a type alias for lexicographical ordering reference 
`&[PhysicalSortExpr]`
+pub type LexOrderingRef<'a> = &'a [PhysicalSortExpr];
+
 /// `LexOrderingReq` is a type alias for lexicographical ordering requirement 
definition `Vec<PhysicalSortRequirement>`
 pub type LexOrderingReq = Vec<PhysicalSortRequirement>;
diff --git a/datafusion/physical-expr/src/utils.rs 
b/datafusion/physical-expr/src/utils.rs
index f95ec032eb..3fbb6ab9f9 100644
--- a/datafusion/physical-expr/src/utils.rs
+++ b/datafusion/physical-expr/src/utils.rs
@@ -1464,4 +1464,24 @@ mod tests {
         assert_eq!(collapse_vec(vec![3, 1, 2, 3, 2, 3]), vec![3, 1, 2]);
         Ok(())
     }
+
+    #[test]
+    fn test_collect_columns() -> Result<()> {
+        let expr1 = Arc::new(Column::new("col1", 2)) as _;
+        let mut expected = HashSet::new();
+        expected.insert(Column::new("col1", 2));
+        assert_eq!(collect_columns(&expr1), expected);
+
+        let expr2 = Arc::new(Column::new("col2", 5)) as _;
+        let mut expected = HashSet::new();
+        expected.insert(Column::new("col2", 5));
+        assert_eq!(collect_columns(&expr2), expected);
+
+        let expr3 = Arc::new(BinaryExpr::new(expr1, Operator::Plus, expr2)) as 
_;
+        let mut expected = HashSet::new();
+        expected.insert(Column::new("col1", 2));
+        expected.insert(Column::new("col2", 5));
+        assert_eq!(collect_columns(&expr3), expected);
+        Ok(())
+    }
 }

Reply via email to