This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 93a4941b05 Add Support for Ordering Equivalence (#6160)
93a4941b05 is described below

commit 93a4941b05c2b468f886dc50e799b2d6a80a46c6
Author: Mustafa Akur <[email protected]>
AuthorDate: Wed May 3 13:38:24 2023 +0300

    Add Support for Ordering Equivalence (#6160)
    
    * Initial commit
    
    * All functionality
    
    * minor changes
    
    * simplifications
    
    * simplifications
    
    * simplifications
    
    * tmp
    
    * simplifications
    
    * simplifications
    
    * fix errors
    
    * simplifications
    
    * simplifications
    
    * simplifications
    
    * resolve merge error
    
    * fix index of matching input column
    
    * simplifications, update comments. move tests to the window.slt
    
    * separate equivalent and ordering equivalent expressions
    
    * Change order of the arguments
    
    * simplifications
    
    * Minor changes, update comments
    
    * Simplifications and improved comments
    
    * Address reviews
    
    * Add unit tests
    
    * simplifications
    
    * Code simplifications, comments improvements
    
    * Update comment, change test to make intent clearer
    
    ---------
    
    Co-authored-by: Mehmet Ozan Kabak <[email protected]>
---
 .../src/physical_optimizer/sort_enforcement.rs     |   2 +
 .../core/src/physical_optimizer/sort_pushdown.rs   |  36 +-
 datafusion/core/src/physical_optimizer/utils.rs    |   9 +-
 datafusion/core/src/physical_plan/mod.rs           |   6 +
 datafusion/core/src/physical_plan/projection.rs    |  34 +-
 .../windows/bounded_window_agg_exec.rs             |  10 +-
 datafusion/core/src/physical_plan/windows/mod.rs   |  64 ++-
 .../src/physical_plan/windows/window_agg_exec.rs   |   9 +-
 .../core/tests/sqllogictests/test_files/window.slt | 165 +++++-
 datafusion/physical-expr/src/equivalence.rs        | 191 +++++--
 datafusion/physical-expr/src/lib.rs                |  10 +-
 datafusion/physical-expr/src/utils.rs              | 609 ++++++++++++++++++---
 12 files changed, 1004 insertions(+), 141 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs 
b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
index 8bb71445a4..1032a16c7f 100644
--- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
@@ -473,6 +473,7 @@ fn ensure_sorting(
                     physical_ordering,
                     &required_ordering,
                     || child.equivalence_properties(),
+                    || child.ordering_equivalence_properties(),
                 ) {
                     // Make sure we preserve the ordering requirements:
                     update_child_to_remove_unnecessary_sort(child, 
sort_onwards, &plan)?;
@@ -539,6 +540,7 @@ fn analyze_immediate_sort_removal(
             sort_input.output_ordering(),
             sort_exec.output_ordering(),
             || sort_input.equivalence_properties(),
+            || sort_input.ordering_equivalence_properties(),
         ) {
             // Since we know that a `SortExec` has exactly one child,
             // we can use the zero index safely:
diff --git a/datafusion/core/src/physical_optimizer/sort_pushdown.rs 
b/datafusion/core/src/physical_optimizer/sort_pushdown.rs
index f9ca976299..a0490fa64e 100644
--- a/datafusion/core/src/physical_optimizer/sort_pushdown.rs
+++ b/datafusion/core/src/physical_optimizer/sort_pushdown.rs
@@ -125,9 +125,12 @@ pub(crate) fn pushdown_sorts(
     let err = || DataFusionError::Plan(ERR_MSG.to_string());
     if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
         let mut new_plan = plan.clone();
-        if !ordering_satisfy_requirement(plan.output_ordering(), 
parent_required, || {
-            plan.equivalence_properties()
-        }) {
+        if !ordering_satisfy_requirement(
+            plan.output_ordering(),
+            parent_required,
+            || plan.equivalence_properties(),
+            || plan.ordering_equivalence_properties(),
+        ) {
             // If the current plan is a SortExec, modify it to satisfy parent 
requirements:
             let parent_required_expr = PhysicalSortRequirement::to_sort_exprs(
                 parent_required.ok_or_else(err)?.iter().cloned(),
@@ -155,9 +158,12 @@ pub(crate) fn pushdown_sorts(
         }
     } else {
         // Executors other than SortExec
-        if ordering_satisfy_requirement(plan.output_ordering(), 
parent_required, || {
-            plan.equivalence_properties()
-        }) {
+        if ordering_satisfy_requirement(
+            plan.output_ordering(),
+            parent_required,
+            || plan.equivalence_properties(),
+            || plan.ordering_equivalence_properties(),
+        ) {
             // Satisfies parent requirements, immediately return.
             return Ok(Transformed::Yes(SortPushDown {
                 required_ordering: None,
@@ -280,14 +286,20 @@ fn determine_children_requirement(
     request_child: Option<&[PhysicalSortRequirement]>,
     child_plan: Arc<dyn ExecutionPlan>,
 ) -> RequirementsCompatibility {
-    if requirements_compatible(request_child, parent_required, || {
-        child_plan.equivalence_properties()
-    }) {
+    if requirements_compatible(
+        request_child,
+        parent_required,
+        || child_plan.ordering_equivalence_properties(),
+        || child_plan.equivalence_properties(),
+    ) {
         // request child requirements are more specific, no need to push down 
the parent requirements
         RequirementsCompatibility::Satisfy
-    } else if requirements_compatible(parent_required, request_child, || {
-        child_plan.equivalence_properties()
-    }) {
+    } else if requirements_compatible(
+        parent_required,
+        request_child,
+        || child_plan.ordering_equivalence_properties(),
+        || child_plan.equivalence_properties(),
+    ) {
         // parent requirements are more specific, adjust the request child 
requirements and push down the new requirements
         let adjusted = parent_required.map(|r| r.to_vec());
         RequirementsCompatibility::Compatible(adjusted)
diff --git a/datafusion/core/src/physical_optimizer/utils.rs 
b/datafusion/core/src/physical_optimizer/utils.rs
index b323e13679..68efa06c3f 100644
--- a/datafusion/core/src/physical_optimizer/utils.rs
+++ b/datafusion/core/src/physical_optimizer/utils.rs
@@ -41,9 +41,12 @@ pub fn add_sort_above(
     sort_expr: Vec<PhysicalSortExpr>,
 ) -> Result<()> {
     // If the ordering requirement is already satisfied, do not add a sort.
-    if !ordering_satisfy(node.output_ordering(), Some(&sort_expr), || {
-        node.equivalence_properties()
-    }) {
+    if !ordering_satisfy(
+        node.output_ordering(),
+        Some(&sort_expr),
+        || node.equivalence_properties(),
+        || node.ordering_equivalence_properties(),
+    ) {
         let new_sort = SortExec::new(sort_expr, node.clone());
 
         *node = Arc::new(if node.output_partitioning().partition_count() > 1 {
diff --git a/datafusion/core/src/physical_plan/mod.rs 
b/datafusion/core/src/physical_plan/mod.rs
index 01ece80aca..00850f3237 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -32,6 +32,7 @@ use arrow::record_batch::RecordBatch;
 pub use datafusion_expr::Accumulator;
 pub use datafusion_expr::ColumnarValue;
 pub use datafusion_physical_expr::aggregate::row_accumulator::RowAccumulator;
+use datafusion_physical_expr::equivalence::OrderingEquivalenceProperties;
 pub use display::DisplayFormatType;
 use futures::stream::{Stream, TryStreamExt};
 use std::fmt;
@@ -187,6 +188,11 @@ pub trait ExecutionPlan: Debug + Send + Sync {
         EquivalenceProperties::new(self.schema())
     }
 
+    /// Get the OrderingEquivalenceProperties within the plan
+    fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties 
{
+        OrderingEquivalenceProperties::new(self.schema())
+    }
+
     /// Get a list of child execution plans that provide the input for this 
plan. The returned list
     /// will be empty for leaf nodes, will contain a single value for unary 
nodes, or two
     /// values for binary nodes (such as joins).
diff --git a/datafusion/core/src/physical_plan/projection.rs 
b/datafusion/core/src/physical_plan/projection.rs
index 49c429f94d..e70e4faebd 100644
--- a/datafusion/core/src/physical_plan/projection.rs
+++ b/datafusion/core/src/physical_plan/projection.rs
@@ -27,22 +27,24 @@ use std::sync::Arc;
 use std::task::{Context, Poll};
 
 use crate::error::Result;
+use crate::execution::context::TaskContext;
 use crate::physical_plan::{
     ColumnStatistics, DisplayFormatType, EquivalenceProperties, ExecutionPlan,
     Partitioning, PhysicalExpr,
 };
 use arrow::datatypes::{Field, Schema, SchemaRef};
 use arrow::record_batch::{RecordBatch, RecordBatchOptions};
+use futures::stream::{Stream, StreamExt};
 use log::debug;
 
 use super::expressions::{Column, PhysicalSortExpr};
 use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
 use super::{RecordBatchStream, SendableRecordBatchStream, Statistics};
-use crate::execution::context::TaskContext;
-use datafusion_physical_expr::equivalence::project_equivalence_properties;
-use datafusion_physical_expr::normalize_out_expr_with_columns_map;
-use futures::stream::Stream;
-use futures::stream::StreamExt;
+
+use datafusion_physical_expr::{
+    normalize_out_expr_with_columns_map, project_equivalence_properties,
+    project_ordering_equivalence_properties, OrderingEquivalenceProperties,
+};
 
 /// Execution plan for a projection
 #[derive(Debug)]
@@ -95,8 +97,18 @@ impl ProjectionExec {
         let mut columns_map: HashMap<Column, Vec<Column>> = HashMap::new();
         for (expression, name) in expr.iter() {
             if let Some(column) = expression.as_any().downcast_ref::<Column>() 
{
+                // For some executors, logical and physical plan schema fields
+                // are not the same. The information in a `Column` comes from
+                // the logical plan schema. Therefore, to produce correct 
results
+                // we use the field in the input schema with the same index. 
This
+                // corresponds to the physical plan `Column`.
+                let idx = column.index();
+                let matching_input_field = input_schema.field(idx);
+                let matching_input_column = 
Column::new(matching_input_field.name(), idx);
                 let new_col_idx = schema.index_of(name)?;
-                let entry = 
columns_map.entry(column.clone()).or_insert_with(Vec::new);
+                let entry = columns_map
+                    .entry(matching_input_column)
+                    .or_insert_with(Vec::new);
                 entry.push(Column::new(name, new_col_idx));
             };
         }
@@ -204,6 +216,16 @@ impl ExecutionPlan for ProjectionExec {
         new_properties
     }
 
+    fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties 
{
+        let mut new_properties = 
OrderingEquivalenceProperties::new(self.schema());
+        project_ordering_equivalence_properties(
+            self.input.ordering_equivalence_properties(),
+            &self.columns_map,
+            &mut new_properties,
+        );
+        new_properties
+    }
+
     fn with_new_children(
         self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
diff --git 
a/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs 
b/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs
index ac84a6a395..95b482ef24 100644
--- a/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs
+++ b/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs
@@ -27,7 +27,7 @@ use crate::physical_plan::metrics::{
     BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet,
 };
 use crate::physical_plan::windows::{
-    calc_requirements, get_ordered_partition_by_indices,
+    calc_requirements, get_ordered_partition_by_indices, 
window_ordering_equivalence,
 };
 use crate::physical_plan::{
     ColumnStatistics, DisplayFormatType, Distribution, ExecutionPlan, 
Partitioning,
@@ -66,7 +66,8 @@ use datafusion_physical_expr::window::{
     WindowAggState, WindowState,
 };
 use datafusion_physical_expr::{
-    EquivalenceProperties, PhysicalExpr, PhysicalSortRequirement,
+    EquivalenceProperties, OrderingEquivalenceProperties, PhysicalExpr,
+    PhysicalSortRequirement,
 };
 
 #[derive(Debug, Clone, PartialEq)]
@@ -259,6 +260,11 @@ impl ExecutionPlan for BoundedWindowAggExec {
         self.input().equivalence_properties()
     }
 
+    /// Get the OrderingEquivalenceProperties within the plan
+    fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties 
{
+        window_ordering_equivalence(&self.schema, &self.input, 
&self.window_expr)
+    }
+
     fn maintains_input_order(&self) -> Vec<bool> {
         vec![true]
     }
diff --git a/datafusion/core/src/physical_plan/windows/mod.rs 
b/datafusion/core/src/physical_plan/windows/mod.rs
index 65d7729e18..40ef8a3e6a 100644
--- a/datafusion/core/src/physical_plan/windows/mod.rs
+++ b/datafusion/core/src/physical_plan/windows/mod.rs
@@ -29,6 +29,7 @@ use crate::physical_plan::{
 };
 use crate::scalar::ScalarValue;
 use arrow::datatypes::Schema;
+use arrow_schema::{SchemaRef, SortOptions};
 use datafusion_expr::{
     window_function::{signature_for_built_in, BuiltInWindowFunction, 
WindowFunction},
     WindowFrame,
@@ -46,11 +47,15 @@ mod window_agg_exec;
 pub use bounded_window_agg_exec::BoundedWindowAggExec;
 pub use bounded_window_agg_exec::PartitionSearchMode;
 use datafusion_common::utils::longest_consecutive_prefix;
+use datafusion_physical_expr::expressions::Column;
 use datafusion_physical_expr::utils::{convert_to_expr, 
get_indices_of_matching_exprs};
 pub use datafusion_physical_expr::window::{
     BuiltInWindowExpr, PlainAggregateWindowExpr, WindowExpr,
 };
-use datafusion_physical_expr::PhysicalSortRequirement;
+use datafusion_physical_expr::{
+    normalize_expr_with_equivalence_properties, OrderedColumn,
+    OrderingEquivalenceProperties, PhysicalSortRequirement,
+};
 pub use window_agg_exec::WindowAggExec;
 
 /// Create a physical expression for window function
@@ -242,6 +247,63 @@ pub(crate) fn get_ordered_partition_by_indices(
     input_places[0..first_n].to_vec()
 }
 
+pub(crate) fn window_ordering_equivalence(
+    schema: &SchemaRef,
+    input: &Arc<dyn ExecutionPlan>,
+    window_expr: &[Arc<dyn WindowExpr>],
+) -> OrderingEquivalenceProperties {
+    // We need to update the schema, so we can not directly use
+    // `input.ordering_equivalence_properties()`.
+    let mut result = OrderingEquivalenceProperties::new(schema.clone());
+    result.extend(
+        input
+            .ordering_equivalence_properties()
+            .classes()
+            .iter()
+            .cloned(),
+    );
+    let out_ordering = input.output_ordering().unwrap_or(&[]);
+    for expr in window_expr {
+        if let Some(builtin_window_expr) =
+            expr.as_any().downcast_ref::<BuiltInWindowExpr>()
+        {
+            // Only the built-in `RowNumber` window function introduces a new
+            // ordering:
+            if builtin_window_expr
+                .get_built_in_func_expr()
+                .as_any()
+                .is::<RowNumber>()
+            {
+                // If there is an existing ordering, add new ordering as an 
equivalence:
+                if let Some(first) = out_ordering.first() {
+                    // Normalize expression, as we search for ordering 
equivalences
+                    // on normalized versions:
+                    let normalized = 
normalize_expr_with_equivalence_properties(
+                        first.expr.clone(),
+                        input.equivalence_properties().classes(),
+                    );
+                    if let Some(column) = 
normalized.as_any().downcast_ref::<Column>() {
+                        let column_info =
+                            
schema.column_with_name(expr.field().unwrap().name());
+                        if let Some((idx, field)) = column_info {
+                            let lhs = OrderedColumn::new(column.clone(), 
first.options);
+                            let options = SortOptions {
+                                descending: false,
+                                nulls_first: false,
+                            }; // ASC, NULLS LAST
+                            let rhs = OrderedColumn::new(
+                                Column::new(field.name(), idx),
+                                options,
+                            );
+                            result.add_equal_conditions((&lhs, &rhs));
+                        }
+                    }
+                }
+            }
+        }
+    }
+    result
+}
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs 
b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
index 7ca95954ce..dc0302d77b 100644
--- a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
+++ b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
@@ -25,7 +25,7 @@ use crate::physical_plan::metrics::{
     BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet,
 };
 use crate::physical_plan::windows::{
-    calc_requirements, get_ordered_partition_by_indices,
+    calc_requirements, get_ordered_partition_by_indices, 
window_ordering_equivalence,
 };
 use crate::physical_plan::{
     ColumnStatistics, DisplayFormatType, Distribution, EquivalenceProperties,
@@ -42,7 +42,7 @@ use arrow::{
 };
 use datafusion_common::utils::{evaluate_partition_ranges, get_at_indices};
 use datafusion_common::DataFusionError;
-use datafusion_physical_expr::PhysicalSortRequirement;
+use datafusion_physical_expr::{OrderingEquivalenceProperties, 
PhysicalSortRequirement};
 use futures::stream::Stream;
 use futures::{ready, StreamExt};
 use std::any::Any;
@@ -191,6 +191,11 @@ impl ExecutionPlan for WindowAggExec {
         self.input().equivalence_properties()
     }
 
+    /// Get the OrderingEquivalenceProperties within the plan
+    fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties 
{
+        window_ordering_equivalence(&self.schema, &self.input, 
&self.window_expr)
+    }
+
     fn with_new_children(
         self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
diff --git a/datafusion/core/tests/sqllogictests/test_files/window.slt 
b/datafusion/core/tests/sqllogictests/test_files/window.slt
index 496d2b50db..5ba3644923 100644
--- a/datafusion/core/tests/sqllogictests/test_files/window.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/window.slt
@@ -2050,8 +2050,8 @@ SELECT
 15673 -1899175111
 
 # test_window_agg_partition_by_set
-# These tests checks for whether BoundedWindowAggExec and WindowAggExec treats 
partition by expressions as set.
-# Physical plan shouldn't have any SortExec in between Window Executors.
+# These tests check whether BoundedWindowAggExec and WindowAggExec treat 
PARTITION BY expressions as a set.
+# Physical plan shouldn't have any SortExec in between window executors.
 
 statement ok
 set datafusion.execution.target_partitions = 1;
@@ -2204,3 +2204,164 @@ SELECT SUM(c12) OVER(ORDER BY c1, c2 GROUPS BETWEEN 1 
PRECEDING AND 1 FOLLOWING)
 2.994840293343 NULL
 9.674390599321 NULL
 7.728066219895 NULL
+
+# test_c9_rn_ordering_alias
+# These tests check whether Datafusion is aware of the ordering generated by 
the ROW_NUMBER() window function.
+# Physical plan shouldn't have a SortExec after the BoundedWindowAggExec since 
the table after BoundedWindowAggExec is already ordered by rn1 ASC and c9 DESC.
+query TT
+EXPLAIN SELECT c9, rn1 FROM (SELECT c9,
+                   ROW_NUMBER() OVER(ORDER BY c9 ASC) as rn1
+                   FROM aggregate_test_100
+                   ORDER BY c9 ASC)
+   ORDER BY rn1
+   LIMIT 5
+----
+logical_plan
+Limit: skip=0, fetch=5
+  Sort: rn1 ASC NULLS LAST, fetch=5
+    Sort: aggregate_test_100.c9 ASC NULLS LAST
+      Projection: aggregate_test_100.c9, ROW_NUMBER() ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW AS rn1
+        WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [aggregate_test_100.c9 
ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+          TableScan: aggregate_test_100 projection=[c9]
+physical_plan
+GlobalLimitExec: skip=0, fetch=5
+  ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW@1 as rn1]
+    BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", 
data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, 
metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
+      SortExec: expr=[c9@0 ASC NULLS LAST]
+        CsvExec: files={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, 
limit=None, projection=[c9]
+
+query II
+SELECT c9, rn1 FROM (SELECT c9,
+                   ROW_NUMBER() OVER(ORDER BY c9 ASC) as rn1
+                   FROM aggregate_test_100
+                   ORDER BY c9 ASC)
+   ORDER BY rn1
+   LIMIT 5
+----
+28774375 1
+63044568 2
+141047417 3
+141680161 4
+145294611 5
+
+# test_c9_rn_ordering_alias_opposite_direction
+# These tests check whether Datafusion is aware of the ordering generated by 
the ROW_NUMBER() window function.
+# Physical plan shouldn't have a SortExec after the BoundedWindowAggExec since 
the table after BoundedWindowAggExec is already ordered by rn1 ASC and c9 DESC.
+query TT
+EXPLAIN SELECT c9, rn1 FROM (SELECT c9,
+                   ROW_NUMBER() OVER(ORDER BY c9 DESC) as rn1
+                   FROM aggregate_test_100
+                   ORDER BY c9 DESC)
+   ORDER BY rn1
+   LIMIT 5
+----
+logical_plan
+Limit: skip=0, fetch=5
+  Sort: rn1 ASC NULLS LAST, fetch=5
+    Sort: aggregate_test_100.c9 DESC NULLS FIRST
+      Projection: aggregate_test_100.c9, ROW_NUMBER() ORDER BY 
[aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW AS rn1
+        WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [aggregate_test_100.c9 
DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+          TableScan: aggregate_test_100 projection=[c9]
+physical_plan
+GlobalLimitExec: skip=0, fetch=5
+  ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY 
[aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW@1 as rn1]
+    BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", 
data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, 
metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
+      SortExec: expr=[c9@0 DESC]
+        CsvExec: files={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, 
limit=None, projection=[c9]
+
+query II
+SELECT c9, rn1 FROM (SELECT c9,
+                   ROW_NUMBER() OVER(ORDER BY c9 DESC) as rn1
+                   FROM aggregate_test_100
+                   ORDER BY c9 DESC)
+   ORDER BY rn1
+   LIMIT 5
+----
+4268716378 1
+4229654142 2
+4216440507 3
+4144173353 4
+4076864659 5
+
+# test_c9_rn_ordering_alias_opposite_direction2
+# These tests check whether Datafusion is aware of the ordering generated by 
the ROW_NUMBER() window function.
+# Physical plan _should_ have a SortExec after BoundedWindowAggExec since the 
table after BoundedWindowAggExec is ordered by rn1 ASC and c9 DESC, which is 
conflicting with the requirement rn1 DESC.
+query TT
+EXPLAIN SELECT c9, rn1 FROM (SELECT c9,
+                   ROW_NUMBER() OVER(ORDER BY c9 DESC) as rn1
+                   FROM aggregate_test_100
+                   ORDER BY c9 DESC)
+   ORDER BY rn1 DESC
+   LIMIT 5
+----
+logical_plan
+Limit: skip=0, fetch=5
+  Sort: rn1 DESC NULLS FIRST, fetch=5
+    Sort: aggregate_test_100.c9 DESC NULLS FIRST
+      Projection: aggregate_test_100.c9, ROW_NUMBER() ORDER BY 
[aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW AS rn1
+        WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [aggregate_test_100.c9 
DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+          TableScan: aggregate_test_100 projection=[c9]
+physical_plan
+GlobalLimitExec: skip=0, fetch=5
+  SortExec: fetch=5, expr=[rn1@1 DESC]
+    ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY 
[aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW@1 as rn1]
+      BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: 
"ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
+        SortExec: expr=[c9@0 DESC]
+          CsvExec: files={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, 
limit=None, projection=[c9]
+
+query II
+SELECT c9, rn1 FROM (SELECT c9,
+               ROW_NUMBER() OVER(ORDER BY c9 DESC) as rn1
+               FROM aggregate_test_100
+               ORDER BY c9 DESC)
+   ORDER BY rn1 DESC
+   LIMIT 5
+----
+28774375 100
+63044568 99
+141047417 98
+141680161 97
+145294611 96
+
+# test_c9_rn_ordering_alias_opposite_direction3
+# These test check for whether datafusion is aware of the ordering of the 
column generated by ROW_NUMBER() window function.
+# Physical plan should have a SortExec after BoundedWindowAggExec.
+# The reason is that ordering of the table after BoundedWindowAggExec can be 
described as rn1 ASC, and also c9 DESC.
+# However, the requirement is rn1 ASC, c9 ASC (lexicographical order). Hence 
existing ordering cannot satisfy requirement
+# (Requirement is finer than existing ordering)
+query TT
+EXPLAIN SELECT c9, rn1 FROM (SELECT c9,
+                       ROW_NUMBER() OVER(ORDER BY c9 DESC) as rn1
+                       FROM aggregate_test_100
+                       ORDER BY c9 DESC)
+       ORDER BY rn1, c9 ASC
+       LIMIT 5
+----
+logical_plan
+Limit: skip=0, fetch=5
+  Sort: rn1 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST, fetch=5
+    Sort: aggregate_test_100.c9 DESC NULLS FIRST
+      Projection: aggregate_test_100.c9, ROW_NUMBER() ORDER BY 
[aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW AS rn1
+        WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [aggregate_test_100.c9 
DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+          TableScan: aggregate_test_100 projection=[c9]
+physical_plan
+GlobalLimitExec: skip=0, fetch=5
+  SortExec: fetch=5, expr=[rn1@1 ASC NULLS LAST,c9@0 ASC NULLS LAST]
+    ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY 
[aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW@1 as rn1]
+      BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: 
"ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
+        SortExec: expr=[c9@0 DESC]
+          CsvExec: files={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, 
limit=None, projection=[c9]
+
+query II
+SELECT c9, rn1 FROM (SELECT c9,
+                   ROW_NUMBER() OVER(ORDER BY c9 DESC) as rn1
+                   FROM aggregate_test_100
+                   ORDER BY c9 DESC)
+   ORDER BY rn1 ASC, c9 DESC
+   LIMIT 5
+----
+4268716378 1
+4229654142 2
+4216440507 3
+4144173353 4
+4076864659 5
diff --git a/datafusion/physical-expr/src/equivalence.rs 
b/datafusion/physical-expr/src/equivalence.rs
index 4651c4054b..d46f56e46a 100644
--- a/datafusion/physical-expr/src/equivalence.rs
+++ b/datafusion/physical-expr/src/equivalence.rs
@@ -18,18 +18,19 @@
 use crate::expressions::Column;
 
 use arrow::datatypes::SchemaRef;
+use arrow_schema::SortOptions;
 
-use std::collections::HashMap;
-use std::collections::HashSet;
+use std::collections::{HashMap, HashSet};
+use std::hash::Hash;
 
 /// Equivalence Properties is a vec of EquivalentClass.
 #[derive(Debug, Clone)]
-pub struct EquivalenceProperties {
-    classes: Vec<EquivalentClass>,
+pub struct EquivalenceProperties<T = Column> {
+    classes: Vec<EquivalentClass<T>>,
     schema: SchemaRef,
 }
 
-impl EquivalenceProperties {
+impl<T: Eq + Hash + Clone> EquivalenceProperties<T> {
     pub fn new(schema: SchemaRef) -> Self {
         EquivalenceProperties {
             classes: vec![],
@@ -37,7 +38,7 @@ impl EquivalenceProperties {
         }
     }
 
-    pub fn classes(&self) -> &[EquivalentClass] {
+    pub fn classes(&self) -> &[EquivalentClass<T>] {
         &self.classes
     }
 
@@ -45,26 +46,15 @@ impl EquivalenceProperties {
         self.schema.clone()
     }
 
-    pub fn extend<I: IntoIterator<Item = EquivalentClass>>(&mut self, iter: I) 
{
+    pub fn extend<I: IntoIterator<Item = EquivalentClass<T>>>(&mut self, iter: 
I) {
         for ec in iter {
-            for column in ec.iter() {
-                assert_eq!(column.name(), 
self.schema.fields()[column.index()].name());
-            }
             self.classes.push(ec)
         }
     }
 
     /// Add new equal conditions into the EquivalenceProperties, the new equal 
conditions are usually comming from the
     /// equality predicates in Join or Filter
-    pub fn add_equal_conditions(&mut self, new_conditions: (&Column, &Column)) 
{
-        assert_eq!(
-            new_conditions.0.name(),
-            self.schema.fields()[new_conditions.0.index()].name()
-        );
-        assert_eq!(
-            new_conditions.1.name(),
-            self.schema.fields()[new_conditions.1.index()].name()
-        );
+    pub fn add_equal_conditions(&mut self, new_conditions: (&T, &T)) {
         let mut idx1: Option<usize> = None;
         let mut idx2: Option<usize> = None;
         for (idx, class) in self.classes.iter_mut().enumerate() {
@@ -102,7 +92,7 @@ impl EquivalenceProperties {
             }
             (None, None) => {
                 // adding new pairs
-                self.classes.push(EquivalentClass::new(
+                self.classes.push(EquivalentClass::<T>::new(
                     new_conditions.0.clone(),
                     vec![new_conditions.1.clone()],
                 ));
@@ -112,41 +102,61 @@ impl EquivalenceProperties {
     }
 }
 
-/// Equivalent Class is a set of Columns that are known to have the same value 
in all tuples in a relation
-/// Equivalent Class is generated by equality predicates, typically equijoin 
conditions and equality conditions in filters.
+/// `OrderingEquivalenceProperties` keeps track of columns that describe the
+/// global ordering of the schema. These columns are not necessarily same; e.g.
+/// ```text
+/// ┌-------┐
+/// | a | b |
+/// |---|---|
+/// | 1 | 9 |
+/// | 2 | 8 |
+/// | 3 | 7 |
+/// | 5 | 5 |
+/// └---┴---┘
+/// ```
+/// where both `a ASC` and `b DESC` can describe the table ordering. With
+/// `OrderingEquivalenceProperties`, we can keep track of these equivalences
+/// and treat `a ASC` and `b DESC` as the same ordering requirement.
+pub type OrderingEquivalenceProperties = EquivalenceProperties<OrderedColumn>;
+
+/// EquivalentClass is a set of [`Column`]s or [`OrderedColumn`]s that are 
known
+/// to have the same value in all tuples in a relation. 
`EquivalentClass<Column>`
+/// is generated by equality predicates, typically equijoin conditions and 
equality
+/// conditions in filters. `EquivalentClass<OrderedColumn>` is generated by the
+/// `ROW_NUMBER` window function.
 #[derive(Debug, Clone)]
-pub struct EquivalentClass {
+pub struct EquivalentClass<T = Column> {
     /// First element in the EquivalentClass
-    head: Column,
+    head: T,
     /// Other equal columns
-    others: HashSet<Column>,
+    others: HashSet<T>,
 }
 
-impl EquivalentClass {
-    pub fn new(head: Column, others: Vec<Column>) -> Self {
+impl<T: Eq + Hash + Clone> EquivalentClass<T> {
+    pub fn new(head: T, others: Vec<T>) -> EquivalentClass<T> {
         EquivalentClass {
             head,
             others: HashSet::from_iter(others),
         }
     }
 
-    pub fn head(&self) -> &Column {
+    pub fn head(&self) -> &T {
         &self.head
     }
 
-    pub fn others(&self) -> &HashSet<Column> {
+    pub fn others(&self) -> &HashSet<T> {
         &self.others
     }
 
-    pub fn contains(&self, col: &Column) -> bool {
+    pub fn contains(&self, col: &T) -> bool {
         self.head == *col || self.others.contains(col)
     }
 
-    pub fn insert(&mut self, col: Column) -> bool {
+    pub fn insert(&mut self, col: T) -> bool {
         self.others.insert(col)
     }
 
-    pub fn remove(&mut self, col: &Column) -> bool {
+    pub fn remove(&mut self, col: &T) -> bool {
         let removed = self.others.remove(col);
         if !removed && *col == self.head {
             let one_col = self.others.iter().next().cloned();
@@ -162,7 +172,7 @@ impl EquivalentClass {
         }
     }
 
-    pub fn iter(&self) -> impl Iterator<Item = &'_ Column> {
+    pub fn iter(&self) -> impl Iterator<Item = &'_ T> {
         std::iter::once(&self.head).chain(self.others.iter())
     }
 
@@ -175,10 +185,58 @@ impl EquivalentClass {
     }
 }
 
-/// Project Equivalence Properties.
-/// 1) Add Alias, Alias can introduce additional equivalence properties,
-///    For example:  Projection(a, a as a1, a as a2)
-/// 2) Truncate the EquivalentClasses that are not in the output schema
+/// This object represents a [`Column`] with a definite ordering.
+#[derive(Debug, Hash, PartialEq, Eq, Clone)]
+pub struct OrderedColumn {
+    pub col: Column,
+    pub options: SortOptions,
+}
+
+impl OrderedColumn {
+    pub fn new(col: Column, options: SortOptions) -> Self {
+        Self { col, options }
+    }
+}
+
+trait ColumnAccessor {
+    fn column(&self) -> &Column;
+}
+
+impl ColumnAccessor for Column {
+    fn column(&self) -> &Column {
+        self
+    }
+}
+
+impl ColumnAccessor for OrderedColumn {
+    fn column(&self) -> &Column {
+        &self.col
+    }
+}
+
+pub type OrderingEquivalentClass = EquivalentClass<OrderedColumn>;
+
+impl OrderingEquivalentClass {
+    /// Finds the matching column inside the `OrderingEquivalentClass`.
+    fn get_matching_column(&self, column: &Column) -> Option<OrderedColumn> {
+        if self.head.col.eq(column) {
+            Some(self.head.clone())
+        } else {
+            for item in &self.others {
+                if item.col.eq(column) {
+                    return Some(item.clone());
+                }
+            }
+            None
+        }
+    }
+}
+
+/// This function applies the given projection to the given equivalence
+/// properties to compute the resulting (projected) equivalence properties; 
e.g.
+/// 1) Adding an alias, which can introduce additional equivalence properties,
+///    as in Projection(a, a as a1, a as a2).
+/// 2) Truncate the [`EquivalentClass`]es that are not in the output schema.
 pub fn project_equivalence_properties(
     input_eq: EquivalenceProperties,
     alias_map: &HashMap<Column, Vec<Column>>,
@@ -201,22 +259,63 @@ pub fn project_equivalence_properties(
         }
     }
 
-    let schema = output_eq.schema();
-    for class in ec_classes.iter_mut() {
-        let mut columns_to_remove = vec![];
-        for column in class.iter() {
-            if column.index() >= schema.fields().len()
-                || schema.fields()[column.index()].name() != column.name()
+    prune_columns_to_remove(output_eq, &mut ec_classes);
+    output_eq.extend(ec_classes);
+}
+
+/// This function applies the given projection to the given ordering
+/// equivalence properties to compute the resulting (projected) ordering
+/// equivalence properties; e.g.
+/// 1) Adding an alias, which can introduce additional ordering equivalence
+///    properties, as in Projection(a, a as a1, a as a2) extends global 
ordering
+///    of a to a1 and a2.
+/// 2) Truncate the [`OrderingEquivalentClass`]es that are not in the output 
schema.
+pub fn project_ordering_equivalence_properties(
+    input_eq: OrderingEquivalenceProperties,
+    columns_map: &HashMap<Column, Vec<Column>>,
+    output_eq: &mut OrderingEquivalenceProperties,
+) {
+    let mut ec_classes = input_eq.classes().to_vec();
+    for (column, columns) in columns_map {
+        for class in ec_classes.iter_mut() {
+            if let Some(OrderedColumn { options, .. }) = 
class.get_matching_column(column)
             {
-                columns_to_remove.push(column.clone());
+                for col in columns {
+                    class.insert(OrderedColumn {
+                        col: col.clone(),
+                        options,
+                    });
+                }
+                break;
             }
         }
+    }
+
+    prune_columns_to_remove(output_eq, &mut ec_classes);
+    output_eq.extend(ec_classes);
+}
+
+fn prune_columns_to_remove<T: Eq + Hash + Clone + ColumnAccessor>(
+    eq_properties: &EquivalenceProperties<T>,
+    eq_classes: &mut Vec<EquivalentClass<T>>,
+) {
+    let schema = eq_properties.schema();
+    let fields = schema.fields();
+    for class in eq_classes.iter_mut() {
+        let columns_to_remove = class
+            .iter()
+            .filter(|elem| {
+                let column = elem.column();
+                let idx = column.index();
+                idx >= fields.len() || fields[idx].name() != column.name()
+            })
+            .cloned()
+            .collect::<Vec<_>>();
         for column in columns_to_remove {
             class.remove(&column);
         }
     }
-    ec_classes.retain(|props| props.len() > 1);
-    output_eq.extend(ec_classes);
+    eq_classes.retain(|props| props.len() > 1);
 }
 
 #[cfg(test)]
diff --git a/datafusion/physical-expr/src/lib.rs 
b/datafusion/physical-expr/src/lib.rs
index 5cbd40cc8c..b65a4e892f 100644
--- a/datafusion/physical-expr/src/lib.rs
+++ b/datafusion/physical-expr/src/lib.rs
@@ -48,8 +48,11 @@ pub mod window;
 // reexport this to maintain compatibility with anything that used from_slice 
previously
 pub use aggregate::AggregateExpr;
 pub use datafusion_common::from_slice;
-pub use equivalence::EquivalenceProperties;
-pub use equivalence::EquivalentClass;
+pub use equivalence::{
+    project_equivalence_properties, project_ordering_equivalence_properties,
+    EquivalenceProperties, EquivalentClass, OrderedColumn, 
OrderingEquivalenceProperties,
+    OrderingEquivalentClass,
+};
 pub use physical_expr::{AnalysisContext, ExprBoundaries, PhysicalExpr, 
PhysicalExprRef};
 pub use planner::create_physical_expr;
 pub use scalar_function::ScalarFunctionExpr;
@@ -57,6 +60,5 @@ pub use sort_expr::{PhysicalSortExpr, 
PhysicalSortRequirement};
 pub use utils::{
     expr_list_eq_any_order, expr_list_eq_strict_order,
     normalize_expr_with_equivalence_properties, 
normalize_out_expr_with_columns_map,
-    normalize_sort_expr_with_equivalence_properties, 
sort_expr_list_eq_strict_order,
-    split_conjunction,
+    sort_expr_list_eq_strict_order, split_conjunction,
 };
diff --git a/datafusion/physical-expr/src/utils.rs 
b/datafusion/physical-expr/src/utils.rs
index 70297bce78..c842f8521d 100644
--- a/datafusion/physical-expr/src/utils.rs
+++ b/datafusion/physical-expr/src/utils.rs
@@ -15,18 +15,21 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::equivalence::EquivalentClass;
-use crate::expressions::{BinaryExpr, Column, UnKnownColumn};
-use crate::{
-    EquivalenceProperties, PhysicalExpr, PhysicalSortExpr, 
PhysicalSortRequirement,
+use crate::equivalence::{
+    EquivalenceProperties, EquivalentClass, OrderedColumn, 
OrderingEquivalenceProperties,
+    OrderingEquivalentClass,
 };
-use arrow::datatypes::SchemaRef;
-use datafusion_common::Result;
-use datafusion_expr::Operator;
+use crate::expressions::{BinaryExpr, Column, UnKnownColumn};
+use crate::{PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement};
 
+use arrow::datatypes::SchemaRef;
+use arrow_schema::SortOptions;
 use datafusion_common::tree_node::{
     Transformed, TreeNode, TreeNodeRewriter, VisitRecursion,
 };
+use datafusion_common::Result;
+use datafusion_expr::Operator;
+
 use petgraph::graph::NodeIndex;
 use petgraph::stable_graph::StableGraph;
 use std::borrow::Borrow;
@@ -164,7 +167,7 @@ pub fn normalize_expr_with_equivalence_properties(
         .unwrap_or(expr)
 }
 
-pub fn normalize_sort_expr_with_equivalence_properties(
+fn normalize_sort_expr_with_equivalence_properties(
     sort_expr: PhysicalSortExpr,
     eq_properties: &[EquivalentClass],
 ) -> PhysicalSortExpr {
@@ -181,7 +184,7 @@ pub fn normalize_sort_expr_with_equivalence_properties(
     }
 }
 
-pub fn normalize_sort_requirement_with_equivalence_properties(
+fn normalize_sort_requirement_with_equivalence_properties(
     sort_requirement: PhysicalSortRequirement,
     eq_properties: &[EquivalentClass],
 ) -> PhysicalSortRequirement {
@@ -196,27 +199,137 @@ pub fn 
normalize_sort_requirement_with_equivalence_properties(
     }
 }
 
+fn normalize_expr_with_ordering_equivalence_properties(
+    expr: Arc<dyn PhysicalExpr>,
+    sort_options: Option<SortOptions>,
+    eq_properties: &[OrderingEquivalentClass],
+) -> Arc<dyn PhysicalExpr> {
+    expr.clone()
+        .transform(&|expr| {
+            let normalized_form =
+                expr.as_any().downcast_ref::<Column>().and_then(|column| {
+                    if let Some(options) = sort_options {
+                        for class in eq_properties {
+                            let ordered_column = OrderedColumn {
+                                col: column.clone(),
+                                options,
+                            };
+                            if class.contains(&ordered_column) {
+                                return Some(class.head().clone());
+                            }
+                        }
+                    }
+                    None
+                });
+            Ok(if let Some(normalized_form) = normalized_form {
+                Transformed::Yes(Arc::new(normalized_form.col) as _)
+            } else {
+                Transformed::No(expr)
+            })
+        })
+        .unwrap_or(expr)
+}
+
+fn normalize_sort_expr_with_ordering_equivalence_properties(
+    sort_expr: PhysicalSortExpr,
+    eq_properties: &[OrderingEquivalentClass],
+) -> PhysicalSortExpr {
+    let requirement = 
normalize_sort_requirement_with_ordering_equivalence_properties(
+        PhysicalSortRequirement::from(sort_expr),
+        eq_properties,
+    );
+    requirement.into_sort_expr()
+}
+
+fn normalize_sort_requirement_with_ordering_equivalence_properties(
+    sort_requirement: PhysicalSortRequirement,
+    eq_properties: &[OrderingEquivalentClass],
+) -> PhysicalSortRequirement {
+    let normalized_expr = normalize_expr_with_ordering_equivalence_properties(
+        sort_requirement.expr().clone(),
+        sort_requirement.options(),
+        eq_properties,
+    );
+    if sort_requirement.expr().eq(&normalized_expr) {
+        sort_requirement
+    } else {
+        let mut options = sort_requirement.options();
+        if let Some(col) = normalized_expr.as_any().downcast_ref::<Column>() {
+            for eq_class in eq_properties.iter() {
+                let head = eq_class.head();
+                if head.col.eq(col) {
+                    // If there is a requirement, update it with the 
requirement of its normalized version.
+                    if let Some(options) = &mut options {
+                        *options = head.options;
+                    }
+                    break;
+                }
+            }
+        }
+        PhysicalSortRequirement::new(normalized_expr, options)
+    }
+}
+
+pub fn normalize_sort_expr(
+    sort_expr: PhysicalSortExpr,
+    eq_properties: &[EquivalentClass],
+    ordering_eq_properties: &[OrderingEquivalentClass],
+) -> PhysicalSortExpr {
+    let normalized =
+        normalize_sort_expr_with_equivalence_properties(sort_expr, 
eq_properties);
+    normalize_sort_expr_with_ordering_equivalence_properties(
+        normalized,
+        ordering_eq_properties,
+    )
+}
+
+pub fn normalize_sort_requirement(
+    sort_requirement: PhysicalSortRequirement,
+    eq_classes: &[EquivalentClass],
+    ordering_eq_classes: &[OrderingEquivalentClass],
+) -> PhysicalSortRequirement {
+    let normalized = normalize_sort_requirement_with_equivalence_properties(
+        sort_requirement,
+        eq_classes,
+    );
+    normalize_sort_requirement_with_ordering_equivalence_properties(
+        normalized,
+        ordering_eq_classes,
+    )
+}
+
 /// Checks whether given ordering requirements are satisfied by provided 
[PhysicalSortExpr]s.
-pub fn ordering_satisfy<F: FnOnce() -> EquivalenceProperties>(
+pub fn ordering_satisfy<
+    F: FnOnce() -> EquivalenceProperties,
+    F2: FnOnce() -> OrderingEquivalenceProperties,
+>(
     provided: Option<&[PhysicalSortExpr]>,
     required: Option<&[PhysicalSortExpr]>,
     equal_properties: F,
+    ordering_equal_properties: F2,
 ) -> bool {
     match (provided, required) {
         (_, None) => true,
         (None, Some(_)) => false,
-        (Some(provided), Some(required)) => {
-            ordering_satisfy_concrete(provided, required, equal_properties)
-        }
+        (Some(provided), Some(required)) => ordering_satisfy_concrete(
+            provided,
+            required,
+            equal_properties,
+            ordering_equal_properties,
+        ),
     }
 }
 
 /// Checks whether the required [`PhysicalSortExpr`]s are satisfied by the
 /// provided [`PhysicalSortExpr`]s.
-fn ordering_satisfy_concrete<F: FnOnce() -> EquivalenceProperties>(
+fn ordering_satisfy_concrete<
+    F: FnOnce() -> EquivalenceProperties,
+    F2: FnOnce() -> OrderingEquivalenceProperties,
+>(
     provided: &[PhysicalSortExpr],
     required: &[PhysicalSortExpr],
     equal_properties: F,
+    ordering_equal_properties: F2,
 ) -> bool {
     if required.len() > provided.len() {
         false
@@ -226,43 +339,56 @@ fn ordering_satisfy_concrete<F: FnOnce() -> 
EquivalenceProperties>(
         .all(|(req, given)| req.eq(given))
     {
         true
-    } else if let eq_classes @ [_, ..] = equal_properties().classes() {
+    } else {
+        let oeq_properties = ordering_equal_properties();
+        let ordering_eq_classes = oeq_properties.classes();
+        let eq_properties = equal_properties();
+        let eq_classes = eq_properties.classes();
         required
             .iter()
-            .map(|e| {
-                normalize_sort_expr_with_equivalence_properties(e.clone(), 
eq_classes)
-            })
-            .zip(provided.iter().map(|e| {
-                normalize_sort_expr_with_equivalence_properties(e.clone(), 
eq_classes)
-            }))
-            .all(|(req, given)| req.eq(&given))
-    } else {
-        false
+            .map(|e| normalize_sort_expr(e.clone(), eq_classes, 
ordering_eq_classes))
+            .zip(
+                provided.iter().map(|e| {
+                    normalize_sort_expr(e.clone(), eq_classes, 
ordering_eq_classes)
+                }),
+            )
+            .all(|(req, given)| req == given)
     }
 }
 
 /// Checks whether the given [`PhysicalSortRequirement`]s are satisfied by the
 /// provided [`PhysicalSortExpr`]s.
-pub fn ordering_satisfy_requirement<F: FnOnce() -> EquivalenceProperties>(
+pub fn ordering_satisfy_requirement<
+    F: FnOnce() -> EquivalenceProperties,
+    F2: FnOnce() -> OrderingEquivalenceProperties,
+>(
     provided: Option<&[PhysicalSortExpr]>,
     required: Option<&[PhysicalSortRequirement]>,
     equal_properties: F,
+    ordering_equal_properties: F2,
 ) -> bool {
     match (provided, required) {
         (_, None) => true,
         (None, Some(_)) => false,
-        (Some(provided), Some(required)) => {
-            ordering_satisfy_requirement_concrete(provided, required, 
equal_properties)
-        }
+        (Some(provided), Some(required)) => 
ordering_satisfy_requirement_concrete(
+            provided,
+            required,
+            equal_properties,
+            ordering_equal_properties,
+        ),
     }
 }
 
 /// Checks whether the given [`PhysicalSortRequirement`]s are satisfied by the
 /// provided [`PhysicalSortExpr`]s.
-pub fn ordering_satisfy_requirement_concrete<F: FnOnce() -> 
EquivalenceProperties>(
+pub fn ordering_satisfy_requirement_concrete<
+    F: FnOnce() -> EquivalenceProperties,
+    F2: FnOnce() -> OrderingEquivalenceProperties,
+>(
     provided: &[PhysicalSortExpr],
     required: &[PhysicalSortRequirement],
     equal_properties: F,
+    ordering_equal_properties: F2,
 ) -> bool {
     if required.len() > provided.len() {
         false
@@ -272,46 +398,58 @@ pub fn ordering_satisfy_requirement_concrete<F: FnOnce() 
-> EquivalencePropertie
         .all(|(req, given)| given.satisfy(req))
     {
         true
-    } else if let eq_classes @ [_, ..] = equal_properties().classes() {
+    } else {
+        let oeq_properties = ordering_equal_properties();
+        let ordering_eq_classes = oeq_properties.classes();
+        let eq_properties = equal_properties();
+        let eq_classes = eq_properties.classes();
         required
             .iter()
             .map(|e| {
-                normalize_sort_requirement_with_equivalence_properties(
-                    e.clone(),
-                    eq_classes,
-                )
+                normalize_sort_requirement(e.clone(), eq_classes, 
ordering_eq_classes)
             })
-            .zip(provided.iter().map(|e| {
-                normalize_sort_expr_with_equivalence_properties(e.clone(), 
eq_classes)
-            }))
+            .zip(
+                provided.iter().map(|e| {
+                    normalize_sort_expr(e.clone(), eq_classes, 
ordering_eq_classes)
+                }),
+            )
             .all(|(req, given)| given.satisfy(&req))
-    } else {
-        false
     }
 }
 
 /// Checks whether the given [`PhysicalSortRequirement`]s are equal or more
 /// specific than the provided [`PhysicalSortRequirement`]s.
-pub fn requirements_compatible<F: FnOnce() -> EquivalenceProperties>(
+pub fn requirements_compatible<
+    F: FnOnce() -> OrderingEquivalenceProperties,
+    F2: FnOnce() -> EquivalenceProperties,
+>(
     provided: Option<&[PhysicalSortRequirement]>,
     required: Option<&[PhysicalSortRequirement]>,
-    equal_properties: F,
+    ordering_equal_properties: F,
+    equal_properties: F2,
 ) -> bool {
     match (provided, required) {
         (_, None) => true,
         (None, Some(_)) => false,
-        (Some(provided), Some(required)) => {
-            requirements_compatible_concrete(provided, required, 
equal_properties)
-        }
+        (Some(provided), Some(required)) => requirements_compatible_concrete(
+            provided,
+            required,
+            ordering_equal_properties,
+            equal_properties,
+        ),
     }
 }
 
 /// Checks whether the given [`PhysicalSortRequirement`]s are equal or more
 /// specific than the provided [`PhysicalSortRequirement`]s.
-fn requirements_compatible_concrete<F: FnOnce() -> EquivalenceProperties>(
+fn requirements_compatible_concrete<
+    F: FnOnce() -> OrderingEquivalenceProperties,
+    F2: FnOnce() -> EquivalenceProperties,
+>(
     provided: &[PhysicalSortRequirement],
     required: &[PhysicalSortRequirement],
-    equal_properties: F,
+    ordering_equal_properties: F,
+    equal_properties: F2,
 ) -> bool {
     if required.len() > provided.len() {
         false
@@ -321,24 +459,20 @@ fn requirements_compatible_concrete<F: FnOnce() -> 
EquivalenceProperties>(
         .all(|(req, given)| given.compatible(req))
     {
         true
-    } else if let eq_classes @ [_, ..] = equal_properties().classes() {
+    } else {
+        let oeq_properties = ordering_equal_properties();
+        let ordering_eq_classes = oeq_properties.classes();
+        let eq_properties = equal_properties();
+        let eq_classes = eq_properties.classes();
         required
             .iter()
             .map(|e| {
-                normalize_sort_requirement_with_equivalence_properties(
-                    e.clone(),
-                    eq_classes,
-                )
+                normalize_sort_requirement(e.clone(), eq_classes, 
ordering_eq_classes)
             })
             .zip(provided.iter().map(|e| {
-                normalize_sort_requirement_with_equivalence_properties(
-                    e.clone(),
-                    eq_classes,
-                )
+                normalize_sort_requirement(e.clone(), eq_classes, 
ordering_eq_classes)
             }))
             .all(|(req, given)| given.compatible(&req))
-    } else {
-        false
     }
 }
 
@@ -643,6 +777,55 @@ mod tests {
         }
     }
 
+    // Generate a schema which consists of 5 columns (a, b, c, d, e)
+    fn create_test_schema() -> Result<SchemaRef> {
+        let a = Field::new("a", DataType::Int32, true);
+        let b = Field::new("b", DataType::Int32, true);
+        let c = Field::new("c", DataType::Int32, true);
+        let d = Field::new("d", DataType::Int32, true);
+        let e = Field::new("e", DataType::Int32, true);
+        let schema = Arc::new(Schema::new(vec![a, b, c, d, e]));
+
+        Ok(schema)
+    }
+
+    fn create_test_params() -> Result<(
+        SchemaRef,
+        EquivalenceProperties,
+        OrderingEquivalenceProperties,
+    )> {
+        // Assume schema satisfies ordering a ASC NULLS LAST
+        // and d ASC NULLS LAST and e DESC NULLS FIRST
+        // Assume that column a and c are aliases.
+        let col_a = &Column::new("a", 0);
+        let _col_b = &Column::new("b", 1);
+        let col_c = &Column::new("c", 2);
+        let col_d = &Column::new("d", 3);
+        let col_e = &Column::new("e", 4);
+        let option1 = SortOptions {
+            descending: false,
+            nulls_first: false,
+        };
+        let option2 = SortOptions {
+            descending: true,
+            nulls_first: true,
+        };
+        let test_schema = create_test_schema()?;
+        let mut eq_properties = 
EquivalenceProperties::new(test_schema.clone());
+        eq_properties.add_equal_conditions((col_a, col_c));
+        let mut ordering_eq_properties =
+            OrderingEquivalenceProperties::new(test_schema.clone());
+        ordering_eq_properties.add_equal_conditions((
+            &OrderedColumn::new(col_a.clone(), option1),
+            &OrderedColumn::new(col_d.clone(), option1),
+        ));
+        ordering_eq_properties.add_equal_conditions((
+            &OrderedColumn::new(col_a.clone(), option1),
+            &OrderedColumn::new(col_e.clone(), option2),
+        ));
+        Ok((test_schema, eq_properties, ordering_eq_properties))
+    }
+
     #[test]
     fn test_build_dag() -> Result<()> {
         let schema = Schema::new(vec![
@@ -911,12 +1094,86 @@ mod tests {
         ];
         let finer = Some(&finer[..]);
         let empty_schema = &Arc::new(Schema::empty());
-        assert!(ordering_satisfy(finer, crude, || {
-            EquivalenceProperties::new(empty_schema.clone())
-        }));
-        assert!(!ordering_satisfy(crude, finer, || {
-            EquivalenceProperties::new(empty_schema.clone())
-        }));
+        assert!(ordering_satisfy(
+            finer,
+            crude,
+            || { EquivalenceProperties::new(empty_schema.clone()) },
+            || { OrderingEquivalenceProperties::new(empty_schema.clone()) },
+        ));
+        assert!(!ordering_satisfy(
+            crude,
+            finer,
+            || { EquivalenceProperties::new(empty_schema.clone()) },
+            || { OrderingEquivalenceProperties::new(empty_schema.clone()) },
+        ));
+        Ok(())
+    }
+
+    #[test]
+    fn test_ordering_satisfy_with_equivalence() -> Result<()> {
+        let col_a = &Column::new("a", 0);
+        let col_b = &Column::new("b", 1);
+        let col_c = &Column::new("c", 2);
+        let col_d = &Column::new("d", 3);
+        let col_e = &Column::new("e", 4);
+        let option1 = SortOptions {
+            descending: false,
+            nulls_first: false,
+        };
+        let option2 = SortOptions {
+            descending: true,
+            nulls_first: true,
+        };
+        // The schema is ordered by a ASC NULLS LAST, b ASC NULLS LAST
+        let provided = vec![
+            PhysicalSortExpr {
+                expr: Arc::new(col_a.clone()),
+                options: option1,
+            },
+            PhysicalSortExpr {
+                expr: Arc::new(col_b.clone()),
+                options: option1,
+            },
+        ];
+        let provided = Some(&provided[..]);
+        let (_test_schema, eq_properties, ordering_eq_properties) = 
create_test_params()?;
+        // First element in the tuple stores vector of requirement, second 
element is the expected return value for ordering_satisfy function
+        let requirements = vec![
+            // `a ASC NULLS LAST`, expects `ordering_satisfy` to be `true`, 
since existing ordering `a ASC NULLS LAST, b ASC NULLS LAST` satisfies it
+            (vec![(col_a, option1)], true),
+            (vec![(col_a, option2)], false),
+            // Test whether equivalence works as expected
+            (vec![(col_c, option1)], true),
+            (vec![(col_c, option2)], false),
+            // Test whether ordering equivalence works as expected
+            (vec![(col_d, option1)], true),
+            (vec![(col_d, option2)], false),
+            (vec![(col_e, option2)], true),
+            (vec![(col_e, option1)], false),
+        ];
+        for (cols, expected) in requirements {
+            let err_msg = format!("Error in test case:{:?}", cols);
+            let required = cols
+                .into_iter()
+                .map(|(col, options)| PhysicalSortExpr {
+                    expr: Arc::new(col.clone()),
+                    options,
+                })
+                .collect::<Vec<_>>();
+
+            let required = Some(&required[..]);
+            assert_eq!(
+                ordering_satisfy(
+                    provided,
+                    required,
+                    || eq_properties.clone(),
+                    || ordering_eq_properties.clone(),
+                ),
+                expected,
+                "{}",
+                err_msg
+            );
+        }
         Ok(())
     }
 
@@ -956,4 +1213,230 @@ mod tests {
 
         assert_eq!(actual.as_ref(), expected.as_any());
     }
+
+    #[test]
+    fn test_normalize_expr_with_equivalence() -> Result<()> {
+        let col_a = &Column::new("a", 0);
+        let _col_b = &Column::new("b", 1);
+        let col_c = &Column::new("c", 2);
+        let col_d = &Column::new("d", 3);
+        let col_e = &Column::new("e", 4);
+        let option1 = SortOptions {
+            descending: false,
+            nulls_first: false,
+        };
+        let option2 = SortOptions {
+            descending: true,
+            nulls_first: true,
+        };
+        // Assume schema satisfies ordering a ASC NULLS LAST
+        // and d ASC NULLS LAST and e DESC NULLS FIRST
+        // Assume that column a and c are aliases.
+        let (_test_schema, eq_properties, ordering_eq_properties) = 
create_test_params()?;
+
+        let col_a_expr = Arc::new(col_a.clone()) as Arc<dyn PhysicalExpr>;
+        let col_c_expr = Arc::new(col_c.clone()) as Arc<dyn PhysicalExpr>;
+        let col_d_expr = Arc::new(col_d.clone()) as Arc<dyn PhysicalExpr>;
+        let col_e_expr = Arc::new(col_e.clone()) as Arc<dyn PhysicalExpr>;
+        // Test cases for equivalence normalization,
+        // First entry in the tuple is argument, second entry is expected 
result after normalization.
+        let expressions = vec![(&col_a_expr, &col_a_expr), (&col_c_expr, 
&col_a_expr)];
+        for (expr, expected_eq) in expressions {
+            assert!(
+                expected_eq.eq(&normalize_expr_with_equivalence_properties(
+                    expr.clone(),
+                    eq_properties.classes()
+                )),
+                "error in test: expr: {:?}",
+                expr
+            );
+        }
+
+        // Test cases for ordering equivalence normalization
+        // First entry in the tuple is PhysicalExpr, second entry is its 
ordering, third entry is result after normalization.
+        let expressions = vec![
+            (&col_d_expr, option1, &col_a_expr),
+            (&col_e_expr, option2, &col_a_expr),
+            // Cannot normalize, hence should return itself.
+            (&col_e_expr, option1, &col_e_expr),
+        ];
+        for (expr, sort_options, expected_ordering_eq) in expressions {
+            assert!(
+                expected_ordering_eq.eq(
+                    &normalize_expr_with_ordering_equivalence_properties(
+                        expr.clone(),
+                        Some(sort_options),
+                        ordering_eq_properties.classes()
+                    )
+                ),
+                "error in test: expr: {:?}, sort_options: {:?}",
+                expr,
+                sort_options
+            );
+        }
+        Ok(())
+    }
+
+    #[test]
+    fn test_normalize_sort_expr_with_equivalence() -> Result<()> {
+        let col_a = &Column::new("a", 0);
+        let _col_b = &Column::new("b", 1);
+        let col_c = &Column::new("c", 2);
+        let col_d = &Column::new("d", 3);
+        let col_e = &Column::new("e", 4);
+        let option1 = SortOptions {
+            descending: false,
+            nulls_first: false,
+        };
+        let option2 = SortOptions {
+            descending: true,
+            nulls_first: true,
+        };
+        // Assume schema satisfies ordering a ASC NULLS LAST
+        // and d ASC NULLS LAST and e DESC NULLS FIRST
+        // Assume that column a and c are aliases.
+        let (_test_schema, eq_properties, ordering_eq_properties) = 
create_test_params()?;
+
+        // Test cases for equivalence normalization
+        // First entry in the tuple is PhysicalExpr, second entry is its 
ordering, third entry is result after normalization.
+        let expressions = vec![
+            (&col_a, option1, &col_a, option1),
+            (&col_c, option1, &col_a, option1),
+            // Cannot normalize column d, since it is not in equivalence 
properties.
+            (&col_d, option1, &col_d, option1),
+        ];
+        for (expr, sort_options, expected_col, expected_options) in
+            expressions.into_iter()
+        {
+            let expected = PhysicalSortExpr {
+                expr: Arc::new((*expected_col).clone()) as _,
+                options: expected_options,
+            };
+            let arg = PhysicalSortExpr {
+                expr: Arc::new((*expr).clone()) as _,
+                options: sort_options,
+            };
+            assert!(
+                expected.eq(&normalize_sort_expr_with_equivalence_properties(
+                    arg.clone(),
+                    eq_properties.classes()
+                )),
+                "error in test: expr: {:?}, sort_options: {:?}",
+                expr,
+                sort_options
+            );
+        }
+
+        // Test cases for ordering equivalence normalization
+        // First entry in the tuple is PhysicalExpr, second entry is its 
ordering, third entry is result after normalization.
+        let expressions = vec![
+            (&col_d, option1, &col_a, option1),
+            (&col_e, option2, &col_a, option1),
+        ];
+        for (expr, sort_options, expected_col, expected_options) in
+            expressions.into_iter()
+        {
+            let expected = PhysicalSortExpr {
+                expr: Arc::new((*expected_col).clone()) as _,
+                options: expected_options,
+            };
+            let arg = PhysicalSortExpr {
+                expr: Arc::new((*expr).clone()) as _,
+                options: sort_options,
+            };
+            assert!(
+                
expected.eq(&normalize_sort_expr_with_ordering_equivalence_properties(
+                    arg.clone(),
+                    ordering_eq_properties.classes()
+                )),
+                "error in test: expr: {:?}, sort_options: {:?}",
+                expr,
+                sort_options
+            );
+        }
+        Ok(())
+    }
+
+    #[test]
+    fn test_normalize_sort_requirement_with_equivalence() -> Result<()> {
+        let col_a = &Column::new("a", 0);
+        let _col_b = &Column::new("b", 1);
+        let col_c = &Column::new("c", 2);
+        let col_d = &Column::new("d", 3);
+        let col_e = &Column::new("e", 4);
+        let option1 = SortOptions {
+            descending: false,
+            nulls_first: false,
+        };
+        let option2 = SortOptions {
+            descending: true,
+            nulls_first: true,
+        };
+        // Assume schema satisfies ordering a ASC NULLS LAST
+        // and d ASC NULLS LAST and e DESC NULLS FIRST
+        // Assume that column a and c are aliases.
+        let (_test_schema, eq_properties, ordering_eq_properties) = 
create_test_params()?;
+
+        // Test cases for equivalence normalization
+        // First entry in the tuple is PhysicalExpr, second entry is its 
ordering, third entry is result after normalization.
+        let expressions = vec![
+            (&col_a, Some(option1), &col_a, Some(option1)),
+            (&col_c, Some(option1), &col_a, Some(option1)),
+            (&col_c, None, &col_a, None),
+            // Cannot normalize column d, since it is not in equivalence 
properties.
+            (&col_d, Some(option1), &col_d, Some(option1)),
+        ];
+        for (expr, sort_options, expected_col, expected_options) in
+            expressions.into_iter()
+        {
+            let expected = PhysicalSortRequirement::new(
+                Arc::new((*expected_col).clone()) as _,
+                expected_options,
+            );
+            let arg = PhysicalSortRequirement::new(
+                Arc::new((*expr).clone()) as _,
+                sort_options,
+            );
+            assert!(
+                
expected.eq(&normalize_sort_requirement_with_equivalence_properties(
+                    arg.clone(),
+                    eq_properties.classes()
+                )),
+                "error in test: expr: {:?}, sort_options: {:?}",
+                expr,
+                sort_options
+            );
+        }
+
+        // Test cases for ordering equivalence normalization
+        // First entry in the tuple is PhysicalExpr, second entry is its 
ordering, third entry is result after normalization.
+        let expressions = vec![
+            (&col_d, Some(option1), &col_a, Some(option1)),
+            (&col_e, Some(option2), &col_a, Some(option1)),
+        ];
+        for (expr, sort_options, expected_col, expected_options) in
+            expressions.into_iter()
+        {
+            let expected = PhysicalSortRequirement::new(
+                Arc::new((*expected_col).clone()) as _,
+                expected_options,
+            );
+            let arg = PhysicalSortRequirement::new(
+                Arc::new((*expr).clone()) as _,
+                sort_options,
+            );
+            assert!(
+                expected.eq(
+                    
&normalize_sort_requirement_with_ordering_equivalence_properties(
+                        arg.clone(),
+                        ordering_eq_properties.classes()
+                    )
+                ),
+                "error in test: expr: {:?}, sort_options: {:?}",
+                expr,
+                sort_options
+            );
+        }
+        Ok(())
+    }
 }

Reply via email to