This is an automated email from the ASF dual-hosted git repository.

ozankabak pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 2fd704ca74 Projection Order Propagation (#7364)
2fd704ca74 is described below

commit 2fd704ca74be0504f4a051890af5cc1b7189de76
Author: Berkay Şahin <[email protected]>
AuthorDate: Thu Aug 24 01:01:07 2023 +0300

    Projection Order Propagation (#7364)
    
    * projection exec is updated, get_ordering method is added to physical 
expr's
    
    * fix after merge
    
    * simplifications
    
    * Refactor, normalization code
    
    * Simplifications
    
    * mustafa's simplifications
    
    * test source update
    
    * Comment edited
    
    * Simplifications
    
    * Code improvements, comment reviews
    
    * Comments are enriched
    
    * tests added, ExtendedSortOptions renamed
    
    * fix after merge
    
    * Minor change.
    
    * Address reviews
    
    * structural changes
    
    * Finalizing changes
    
    ---------
    
    Co-authored-by: Mustafa Akur <[email protected]>
    Co-authored-by: Mehmet Ozan Kabak <[email protected]>
    Co-authored-by: metesynnada <[email protected]>
---
 datafusion/core/src/physical_plan/joins/utils.rs   |  25 +-
 datafusion/core/src/physical_plan/projection.rs    | 102 ++++--
 datafusion/physical-expr/src/equivalence.rs        |  68 ++--
 datafusion/physical-expr/src/expressions/binary.rs |  50 +--
 datafusion/physical-expr/src/expressions/cast.rs   |   7 +
 .../physical-expr/src/expressions/literal.rs       |   5 +
 .../physical-expr/src/expressions/negative.rs      |  11 +-
 datafusion/physical-expr/src/lib.rs                |   6 +-
 datafusion/physical-expr/src/physical_expr.rs      |  14 +
 datafusion/physical-expr/src/sort_properties.rs    | 277 ++++++++++++++++
 datafusion/physical-expr/src/utils.rs              | 360 +++++++++++++++++++--
 datafusion/sqllogictest/test_files/order.slt       |  35 ++
 12 files changed, 837 insertions(+), 123 deletions(-)

diff --git a/datafusion/core/src/physical_plan/joins/utils.rs 
b/datafusion/core/src/physical_plan/joins/utils.rs
index 4f7b0023f4..3842d7d7ea 100644
--- a/datafusion/core/src/physical_plan/joins/utils.rs
+++ b/datafusion/core/src/physical_plan/joins/utils.rs
@@ -222,7 +222,7 @@ pub fn calculate_join_output_ordering(
                 );
                 merge_vectors(&right_ordering, left_ordering)
             } else {
-                right_ordering
+                right_ordering.to_vec()
             }
         }
         // Doesn't maintain ordering, output ordering is None.
@@ -310,17 +310,20 @@ pub fn cross_join_equivalence_properties(
     new_properties
 }
 
-/// Update right table ordering equivalences so that: (1) They point to valid
-/// indices at the output of the join schema, and (2) they are normalized 
w.r.t.
-/// given equivalence properties. To do so, we first increment column indices 
by
-/// the left table size when join schema consists of a combination of left and
-/// right schemas (Inner, Left, Full, Right joins).
-/// Then, we normalize the sort expressions of ordering equivalences one by 
one.
-/// We make sure that each expression in the ordering equivalence is either:
-/// - Is the head of an equivalent classes, or
+/// Update right table ordering equivalences so that:
+/// - They point to valid indices at the output of the join schema, and
+/// - They are normalized with respect to equivalence columns.
+///
+/// To do so, we increment column indices by the size of the left table when
+/// join schema consists of a combination of left and right schema (Inner,
+/// Left, Full, Right joins). Then, we normalize the sort expressions of
+/// ordering equivalences one by one. We make sure that each expression in the
+/// ordering equivalence is either:
+/// - The head of one of the equivalent classes, or
 /// - Doesn't have an equivalent column.
-/// This way, once we normalize an expression according to equivalence 
properties,
-/// then it can be safely used for ordering equivalence normalization.
+///
+/// This way, once we normalize an expression according to equivalence 
properties,
+/// it can thereafter safely be used for ordering equivalence normalization.
 fn get_updated_right_ordering_equivalence_properties(
     join_type: &JoinType,
     right_oeq_classes: &[OrderingEquivalentClass],
diff --git a/datafusion/core/src/physical_plan/projection.rs 
b/datafusion/core/src/physical_plan/projection.rs
index 86449e8ea4..f7f8b0f452 100644
--- a/datafusion/core/src/physical_plan/projection.rs
+++ b/datafusion/core/src/physical_plan/projection.rs
@@ -26,28 +26,28 @@ use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
 
+use super::expressions::{Column, PhysicalSortExpr};
+use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
+use super::{DisplayAs, RecordBatchStream, SendableRecordBatchStream, 
Statistics};
 use crate::physical_plan::{
     ColumnStatistics, DisplayFormatType, EquivalenceProperties, ExecutionPlan,
     Partitioning, PhysicalExpr,
 };
+
 use arrow::datatypes::{Field, Schema, SchemaRef};
 use arrow::record_batch::{RecordBatch, RecordBatchOptions};
 use datafusion_common::Result;
 use datafusion_execution::TaskContext;
-use futures::stream::{Stream, StreamExt};
-use log::trace;
-
-use super::expressions::{Column, PhysicalSortExpr};
-use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
-use super::{DisplayAs, RecordBatchStream, SendableRecordBatchStream, 
Statistics};
-
-use 
datafusion_physical_expr::equivalence::update_ordering_equivalence_with_cast;
-use datafusion_physical_expr::expressions::{CastExpr, Literal};
+use datafusion_physical_expr::expressions::{Literal, UnKnownColumn};
 use datafusion_physical_expr::{
-    normalize_out_expr_with_columns_map, project_equivalence_properties,
-    project_ordering_equivalence_properties, OrderingEquivalenceProperties,
+    find_orderings_of_exprs, normalize_out_expr_with_columns_map,
+    project_equivalence_properties, project_ordering_equivalence_properties,
+    OrderingEquivalenceProperties,
 };
 
+use futures::stream::{Stream, StreamExt};
+use log::trace;
+
 /// Execution plan for a projection
 #[derive(Debug)]
 pub struct ProjectionExec {
@@ -64,6 +64,10 @@ pub struct ProjectionExec {
     columns_map: HashMap<Column, Vec<Column>>,
     /// Execution metrics
     metrics: ExecutionPlanMetricsSet,
+    /// Expressions' normalized orderings (as given by the output ordering API
+    /// and normalized with respect to equivalence classes of input plan). The
+    /// projected expressions are mapped by their indices to this vector.
+    orderings: Vec<Option<PhysicalSortExpr>>,
 }
 
 impl ProjectionExec {
@@ -136,13 +140,24 @@ impl ProjectionExec {
             None => None,
         };
 
+        let orderings = find_orderings_of_exprs(
+            &expr,
+            input.output_ordering(),
+            input.equivalence_properties(),
+            input.ordering_equivalence_properties(),
+        )?;
+
+        let output_ordering =
+            validate_output_ordering(output_ordering, &orderings, &expr);
+
         Ok(Self {
             expr,
             schema,
-            input: input.clone(),
+            input,
             output_ordering,
             columns_map,
             metrics: ExecutionPlanMetricsSet::new(),
+            orderings,
         })
     }
 
@@ -251,17 +266,7 @@ impl ExecutionPlan for ProjectionExec {
             return new_properties;
         }
 
-        let mut input_oeq = self.input().ordering_equivalence_properties();
-        // Stores cast expression and its `Column` version in the output:
-        let mut cast_exprs: Vec<(CastExpr, Column)> = vec![];
-        for (idx, (expr, name)) in self.expr.iter().enumerate() {
-            if let Some(cast_expr) = expr.as_any().downcast_ref::<CastExpr>() {
-                let target_col = Column::new(name, idx);
-                cast_exprs.push((cast_expr.clone(), target_col));
-            }
-        }
-
-        update_ordering_equivalence_with_cast(&cast_exprs, &mut input_oeq);
+        let input_oeq = self.input().ordering_equivalence_properties();
 
         project_ordering_equivalence_properties(
             input_oeq,
@@ -269,6 +274,23 @@ impl ExecutionPlan for ProjectionExec {
             &mut new_properties,
         );
 
+        if let Some(leading_ordering) = self
+            .output_ordering
+            .as_ref()
+            .map(|output_ordering| &output_ordering[0])
+        {
+            for order in self.orderings.iter().flatten() {
+                if !order.eq(leading_ordering)
+                    && !new_properties.satisfies_leading_ordering(order)
+                {
+                    new_properties.add_equal_conditions((
+                        &vec![leading_ordering.clone()],
+                        &vec![order.clone()],
+                    ));
+                }
+            }
+        }
+
         new_properties
     }
 
@@ -318,6 +340,40 @@ impl ExecutionPlan for ProjectionExec {
     }
 }
 
+/// This function takes the current `output_ordering`, the `orderings` based 
on projected expressions,
+/// and the `expr` representing the projected expressions themselves. It aims 
to ensure that the output
+/// ordering is valid and correctly corresponds to the projected columns.
+///
+/// If the leading expression in the `output_ordering` is an 
[`UnKnownColumn`], it indicates that the column
+/// referenced in the ordering is not found among the projected expressions. 
In such cases, this function
+/// attempts to create a new output ordering by referring to valid columns 
from the leftmost side of the
+/// expressions that have an ordering specified.
+fn validate_output_ordering(
+    output_ordering: Option<Vec<PhysicalSortExpr>>,
+    orderings: &[Option<PhysicalSortExpr>],
+    expr: &[(Arc<dyn PhysicalExpr>, String)],
+) -> Option<Vec<PhysicalSortExpr>> {
+    output_ordering.and_then(|ordering| {
+        // If the leading expression is an invalid column, change output
+        // ordering of the projection so that it refers to valid columns if
+        // possible.
+        if ordering[0].expr.as_any().is::<UnKnownColumn>() {
+            for (idx, order) in orderings.iter().enumerate() {
+                if let Some(sort_expr) = order {
+                    let (_, col_name) = &expr[idx];
+                    return Some(vec![PhysicalSortExpr {
+                        expr: Arc::new(Column::new(col_name, idx)),
+                        options: sort_expr.options,
+                    }]);
+                }
+            }
+            None
+        } else {
+            Some(ordering)
+        }
+    })
+}
+
 /// If e is a direct column reference, returns the field level
 /// metadata for that field, if any. Otherwise returns None
 fn get_field_metadata(
diff --git a/datafusion/physical-expr/src/equivalence.rs 
b/datafusion/physical-expr/src/equivalence.rs
index a2477b0546..b8ca1acc1c 100644
--- a/datafusion/physical-expr/src/equivalence.rs
+++ b/datafusion/physical-expr/src/equivalence.rs
@@ -15,7 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::expressions::{CastExpr, Column};
+use crate::expressions::Column;
+use crate::utils::collect_columns;
 use crate::{
     normalize_expr_with_equivalence_properties, LexOrdering, PhysicalExpr,
     PhysicalSortExpr,
@@ -24,7 +25,6 @@ use crate::{
 use arrow::datatypes::SchemaRef;
 use arrow_schema::Fields;
 
-use crate::utils::collect_columns;
 use std::collections::{HashMap, HashSet};
 use std::hash::Hash;
 use std::sync::Arc;
@@ -133,6 +133,24 @@ impl<T: Eq + Clone + Hash> EquivalenceProperties<T> {
 /// and treat `a ASC` and `b DESC` as the same ordering requirement.
 pub type OrderingEquivalenceProperties = EquivalenceProperties<LexOrdering>;
 
+impl OrderingEquivalenceProperties {
+    /// Checks whether `leading_ordering` is contained in any of the ordering
+    /// equivalence classes.
+    pub fn satisfies_leading_ordering(
+        &self,
+        leading_ordering: &PhysicalSortExpr,
+    ) -> bool {
+        for cls in &self.classes {
+            for ordering in 
cls.others.iter().chain(std::iter::once(&cls.head)) {
+                if ordering[0].eq(leading_ordering) {
+                    return true;
+                }
+            }
+        }
+        false
+    }
+}
+
 /// EquivalentClass is a set of [`Column`]s or [`PhysicalSortExpr`]s that are 
known
 /// to have the same value in all tuples in a relation. 
`EquivalentClass<Column>`
 /// is generated by equality predicates, typically equijoin conditions and 
equality
@@ -414,7 +432,13 @@ pub fn project_equivalence_properties(
             class.remove(&column);
         }
     }
-    eq_classes.retain(|props| props.len() > 1);
+
+    eq_classes.retain(|props| {
+        props.len() > 1
+            &&
+            // A column should not give an equivalence with itself.
+             !(props.len() == 2 && 
props.head.eq(props.others().iter().next().unwrap()))
+    });
 
     output_eq.extend(eq_classes);
 }
@@ -446,7 +470,7 @@ pub fn project_ordering_equivalence_properties(
         class.update_with_aliases(&oeq_alias_map, fields);
     }
 
-    // Prune columns that no longer is in the schema from from the 
OrderingEquivalenceProperties.
+    // Prune columns that are no longer in the schema from the 
OrderingEquivalenceProperties.
     for class in eq_classes.iter_mut() {
         let sort_exprs_to_remove = class
             .iter()
@@ -471,42 +495,6 @@ pub fn project_ordering_equivalence_properties(
     output_eq.extend(eq_classes);
 }
 
-/// Update `ordering` if it contains cast expression with target column
-/// after projection, if there is no cast expression among `ordering` 
expressions,
-/// returns `None`.
-fn update_with_cast_exprs(
-    cast_exprs: &[(CastExpr, Column)],
-    mut ordering: LexOrdering,
-) -> Option<LexOrdering> {
-    let mut is_changed = false;
-    for sort_expr in ordering.iter_mut() {
-        for (cast_expr, target_col) in cast_exprs.iter() {
-            if sort_expr.expr.eq(cast_expr.expr()) {
-                sort_expr.expr = Arc::new(target_col.clone()) as _;
-                is_changed = true;
-            }
-        }
-    }
-    is_changed.then_some(ordering)
-}
-
-/// Update cast expressions inside ordering equivalence
-/// properties with its target column after projection
-pub fn update_ordering_equivalence_with_cast(
-    cast_exprs: &[(CastExpr, Column)],
-    input_oeq: &mut OrderingEquivalenceProperties,
-) {
-    for cls in input_oeq.classes.iter_mut() {
-        for ordering in
-            
std::iter::once(cls.head().clone()).chain(cls.others().clone().into_iter())
-        {
-            if let Some(updated_ordering) = update_with_cast_exprs(cast_exprs, 
ordering) {
-                cls.insert(updated_ordering);
-            }
-        }
-    }
-}
-
 /// Retrieves the ordering equivalence properties for a given schema and 
output ordering.
 pub fn ordering_equivalence_properties_helper(
     schema: SchemaRef,
diff --git a/datafusion/physical-expr/src/expressions/binary.rs 
b/datafusion/physical-expr/src/expressions/binary.rs
index ece5089ba3..2729d5e56d 100644
--- a/datafusion/physical-expr/src/expressions/binary.rs
+++ b/datafusion/physical-expr/src/expressions/binary.rs
@@ -22,6 +22,16 @@ mod kernels_arrow;
 use std::hash::{Hash, Hasher};
 use std::{any::Any, sync::Arc};
 
+use crate::array_expressions::{
+    array_append, array_concat, array_has_all, array_prepend,
+};
+use crate::intervals::cp_solver::{propagate_arithmetic, propagate_comparison};
+use crate::intervals::{apply_operator, Interval};
+use crate::physical_expr::down_cast_any_ref;
+use crate::sort_properties::SortProperties;
+use crate::PhysicalExpr;
+
+use adapter::{eq_dyn, gt_dyn, gt_eq_dyn, lt_dyn, lt_eq_dyn, neq_dyn};
 use arrow::array::*;
 use arrow::compute::cast;
 use arrow::compute::kernels::boolean::{and_kleene, not, or_kleene};
@@ -43,12 +53,14 @@ use arrow::compute::kernels::comparison::{
     eq_dyn_utf8_scalar, gt_dyn_utf8_scalar, gt_eq_dyn_utf8_scalar, 
lt_dyn_utf8_scalar,
     lt_eq_dyn_utf8_scalar, neq_dyn_utf8_scalar,
 };
+use arrow::compute::kernels::concat_elements::concat_elements_utf8;
 use arrow::datatypes::*;
 use arrow::record_batch::RecordBatch;
-
-use adapter::{eq_dyn, gt_dyn, gt_eq_dyn, lt_dyn, lt_eq_dyn, neq_dyn};
-use arrow::compute::kernels::concat_elements::concat_elements_utf8;
-
+use arrow_array::{Datum, Scalar};
+use datafusion_common::cast::as_boolean_array;
+use datafusion_common::{internal_err, DataFusionError, Result, ScalarValue};
+use datafusion_expr::type_coercion::binary::get_result_type;
+use datafusion_expr::{ColumnarValue, Operator};
 use kernels::{
     bitwise_and_dyn, bitwise_and_dyn_scalar, bitwise_or_dyn, 
bitwise_or_dyn_scalar,
     bitwise_shift_left_dyn, bitwise_shift_left_dyn_scalar, 
bitwise_shift_right_dyn,
@@ -63,22 +75,6 @@ use kernels_arrow::{
     is_not_distinct_from_utf8,
 };
 
-use crate::array_expressions::{
-    array_append, array_concat, array_has_all, array_prepend,
-};
-use crate::intervals::cp_solver::{propagate_arithmetic, propagate_comparison};
-use crate::intervals::{apply_operator, Interval};
-use crate::physical_expr::down_cast_any_ref;
-use crate::PhysicalExpr;
-use arrow_array::{Datum, Scalar};
-
-use datafusion_common::cast::as_boolean_array;
-use datafusion_common::internal_err;
-use datafusion_common::ScalarValue;
-use datafusion_common::{DataFusionError, Result};
-use datafusion_expr::type_coercion::binary::get_result_type;
-use datafusion_expr::{ColumnarValue, Operator};
-
 /// Binary expression
 #[derive(Debug, Hash, Clone)]
 pub struct BinaryExpr {
@@ -694,6 +690,20 @@ impl PhysicalExpr for BinaryExpr {
         let mut s = state;
         self.hash(&mut s);
     }
+
+    /// For each operator, [`BinaryExpr`] has distinct ordering rules.
+    /// TODO: There may be rules specific to some data types (such as division 
and multiplication on unsigned integers)
+    fn get_ordering(&self, children: &[SortProperties]) -> SortProperties {
+        let (left_child, right_child) = (&children[0], &children[1]);
+        match self.op() {
+            Operator::Plus => left_child.add(right_child),
+            Operator::Minus => left_child.sub(right_child),
+            Operator::Gt | Operator::GtEq => 
left_child.gt_or_gteq(right_child),
+            Operator::Lt | Operator::LtEq => 
right_child.gt_or_gteq(left_child),
+            Operator::And => left_child.and(right_child),
+            _ => SortProperties::Unordered,
+        }
+    }
 }
 
 impl PartialEq<dyn Any> for BinaryExpr {
diff --git a/datafusion/physical-expr/src/expressions/cast.rs 
b/datafusion/physical-expr/src/expressions/cast.rs
index b1046f88e9..cd70c6e274 100644
--- a/datafusion/physical-expr/src/expressions/cast.rs
+++ b/datafusion/physical-expr/src/expressions/cast.rs
@@ -22,7 +22,9 @@ use std::sync::Arc;
 
 use crate::intervals::Interval;
 use crate::physical_expr::down_cast_any_ref;
+use crate::sort_properties::SortProperties;
 use crate::PhysicalExpr;
+
 use arrow::compute;
 use arrow::compute::{kernels, CastOptions};
 use arrow::datatypes::{DataType, Schema};
@@ -138,6 +140,11 @@ impl PhysicalExpr for CastExpr {
         // Add `self.cast_options` when hash is available
         // https://github.com/apache/arrow-rs/pull/4395
     }
+
+    /// A [`CastExpr`] preserves the ordering of its child.
+    fn get_ordering(&self, children: &[SortProperties]) -> SortProperties {
+        children[0]
+    }
 }
 
 impl PartialEq<dyn Any> for CastExpr {
diff --git a/datafusion/physical-expr/src/expressions/literal.rs 
b/datafusion/physical-expr/src/expressions/literal.rs
index 8e86716123..517a0fe411 100644
--- a/datafusion/physical-expr/src/expressions/literal.rs
+++ b/datafusion/physical-expr/src/expressions/literal.rs
@@ -22,6 +22,7 @@ use std::hash::{Hash, Hasher};
 use std::sync::Arc;
 
 use crate::physical_expr::down_cast_any_ref;
+use crate::sort_properties::SortProperties;
 use crate::PhysicalExpr;
 
 use arrow::{
@@ -88,6 +89,10 @@ impl PhysicalExpr for Literal {
         let mut s = state;
         self.hash(&mut s);
     }
+
+    fn get_ordering(&self, _children: &[SortProperties]) -> SortProperties {
+        SortProperties::Singleton
+    }
 }
 
 impl PartialEq<dyn Any> for Literal {
diff --git a/datafusion/physical-expr/src/expressions/negative.rs 
b/datafusion/physical-expr/src/expressions/negative.rs
index 897f3b0d52..90430cb2bb 100644
--- a/datafusion/physical-expr/src/expressions/negative.rs
+++ b/datafusion/physical-expr/src/expressions/negative.rs
@@ -21,14 +21,16 @@ use std::any::Any;
 use std::hash::{Hash, Hasher};
 use std::sync::Arc;
 
+use crate::physical_expr::down_cast_any_ref;
+use crate::sort_properties::SortProperties;
+use crate::PhysicalExpr;
+
 use arrow::{
     compute::kernels::numeric::neg_wrapping,
     datatypes::{DataType, Schema},
     record_batch::RecordBatch,
 };
 
-use crate::physical_expr::down_cast_any_ref;
-use crate::PhysicalExpr;
 use datafusion_common::{internal_err, DataFusionError, Result};
 use datafusion_expr::{
     type_coercion::{is_interval, is_null, is_signed_numeric},
@@ -102,6 +104,11 @@ impl PhysicalExpr for NegativeExpr {
         let mut s = state;
         self.hash(&mut s);
     }
+
+    /// The ordering of a [`NegativeExpr`] is simply the reverse of its child.
+    fn get_ordering(&self, children: &[SortProperties]) -> SortProperties {
+        -children[0]
+    }
 }
 
 impl PartialEq<dyn Any> for NegativeExpr {
diff --git a/datafusion/physical-expr/src/lib.rs 
b/datafusion/physical-expr/src/lib.rs
index c61077f1a0..a8e49bfbd6 100644
--- a/datafusion/physical-expr/src/lib.rs
+++ b/datafusion/physical-expr/src/lib.rs
@@ -38,6 +38,7 @@ pub mod planner;
 pub mod regex_expressions;
 mod scalar_function;
 mod sort_expr;
+pub mod sort_properties;
 pub mod string_expressions;
 pub mod struct_expressions;
 pub mod tree_node;
@@ -53,12 +54,12 @@ pub use aggregate::groups_accumulator::{
 };
 pub use aggregate::AggregateExpr;
 pub use analysis::{analyze, AnalysisContext, ExprBoundaries};
-
 pub use equivalence::{
     ordering_equivalence_properties_helper, project_equivalence_properties,
     project_ordering_equivalence_properties, EquivalenceProperties, 
EquivalentClass,
     OrderingEquivalenceProperties, OrderingEquivalentClass,
 };
+
 pub use partitioning::{Distribution, Partitioning};
 pub use physical_expr::{PhysicalExpr, PhysicalExprRef};
 pub use planner::create_physical_expr;
@@ -67,8 +68,9 @@ pub use sort_expr::{
     LexOrdering, LexOrderingRef, LexOrderingReq, PhysicalSortExpr,
     PhysicalSortRequirement,
 };
+pub use sort_properties::update_ordering;
 pub use utils::{
-    expr_list_eq_any_order, expr_list_eq_strict_order,
+    expr_list_eq_any_order, expr_list_eq_strict_order, find_orderings_of_exprs,
     normalize_expr_with_equivalence_properties, 
normalize_ordering_equivalence_classes,
     normalize_out_expr_with_columns_map, reverse_order_bys, split_conjunction,
 };
diff --git a/datafusion/physical-expr/src/physical_expr.rs 
b/datafusion/physical-expr/src/physical_expr.rs
index 29d7a6320a..ce3b7b6cf4 100644
--- a/datafusion/physical-expr/src/physical_expr.rs
+++ b/datafusion/physical-expr/src/physical_expr.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 use crate::intervals::Interval;
+use crate::sort_properties::SortProperties;
 use crate::utils::scatter;
 
 use arrow::array::BooleanArray;
@@ -122,6 +123,19 @@ pub trait PhysicalExpr: Send + Sync + Display + Debug + 
PartialEq<dyn Any> {
     /// Note: [`PhysicalExpr`] is not constrained by [`Hash`]
     /// directly because it must remain object safe.
     fn dyn_hash(&self, _state: &mut dyn Hasher);
+
+    /// The order information of a PhysicalExpr can be estimated from its 
children.
+    /// This is especially helpful for projection expressions. If we can 
ensure that the
+    /// order of a PhysicalExpr to project matches with the order of SortExec, 
we can
+    /// eliminate that SortExec.
+    ///
+    /// By recursively calling this function, we can obtain the overall order
+    /// information of the PhysicalExpr. Since `SortOptions` cannot fully 
handle
+    /// the propagation of unordered columns and literals, the `SortProperties`
+    /// struct is used.
+    fn get_ordering(&self, _children: &[SortProperties]) -> SortProperties {
+        SortProperties::Unordered
+    }
 }
 
 impl Hash for dyn PhysicalExpr {
diff --git a/datafusion/physical-expr/src/sort_properties.rs 
b/datafusion/physical-expr/src/sort_properties.rs
new file mode 100644
index 0000000000..001b86e60a
--- /dev/null
+++ b/datafusion/physical-expr/src/sort_properties.rs
@@ -0,0 +1,277 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{ops::Neg, sync::Arc};
+
+use crate::expressions::Column;
+use crate::utils::get_indices_of_matching_sort_exprs_with_order_eq;
+use crate::{
+    EquivalenceProperties, OrderingEquivalenceProperties, PhysicalExpr, 
PhysicalSortExpr,
+};
+
+use arrow_schema::SortOptions;
+use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
+use datafusion_common::Result;
+
+use itertools::Itertools;
+
+/// To propagate [`SortOptions`] across the [`PhysicalExpr`], it is 
insufficient
+/// to simply use `Option<SortOptions>`: There must be a differentiation 
between
+/// unordered columns and literal values, since literals may not break the 
ordering
+/// when they are used as a child of some binary expression when the other 
child has
+/// some ordering. On the other hand, unordered columns cannot maintain 
ordering when
+/// they take part in such operations.
+///
+/// Example: ((a_ordered + b_unordered) + c_ordered) expression cannot end up 
with
+/// sorted data; however the ((a_ordered + 999) + c_ordered) expression can. 
Therefore,
+/// we need two different variants for literals and unordered columns as 
literals are
+/// often more ordering-friendly under most mathematical operations.
+#[derive(PartialEq, Debug, Clone, Copy)]
+pub enum SortProperties {
+    /// Use the ordinary [`SortOptions`] struct to represent ordered data:
+    Ordered(SortOptions),
+    // This alternative represents unordered data:
+    Unordered,
+    // Singleton is used for single-valued literal numbers:
+    Singleton,
+}
+
+impl SortProperties {
+    pub fn add(&self, rhs: &Self) -> Self {
+        match (self, rhs) {
+            (Self::Singleton, _) => *rhs,
+            (_, Self::Singleton) => *self,
+            (Self::Ordered(lhs), Self::Ordered(rhs))
+                if lhs.descending == rhs.descending =>
+            {
+                Self::Ordered(SortOptions {
+                    descending: lhs.descending,
+                    nulls_first: lhs.nulls_first || rhs.nulls_first,
+                })
+            }
+            _ => Self::Unordered,
+        }
+    }
+
+    pub fn sub(&self, rhs: &Self) -> Self {
+        match (self, rhs) {
+            (Self::Singleton, Self::Singleton) => Self::Singleton,
+            (Self::Singleton, Self::Ordered(rhs)) => Self::Ordered(SortOptions 
{
+                descending: !rhs.descending,
+                nulls_first: rhs.nulls_first,
+            }),
+            (_, Self::Singleton) => *self,
+            (Self::Ordered(lhs), Self::Ordered(rhs))
+                if lhs.descending != rhs.descending =>
+            {
+                Self::Ordered(SortOptions {
+                    descending: lhs.descending,
+                    nulls_first: lhs.nulls_first || rhs.nulls_first,
+                })
+            }
+            _ => Self::Unordered,
+        }
+    }
+
+    pub fn gt_or_gteq(&self, rhs: &Self) -> Self {
+        match (self, rhs) {
+            (Self::Singleton, Self::Ordered(rhs)) => Self::Ordered(SortOptions 
{
+                descending: !rhs.descending,
+                nulls_first: rhs.nulls_first,
+            }),
+            (_, Self::Singleton) => *self,
+            (Self::Ordered(lhs), Self::Ordered(rhs))
+                if lhs.descending != rhs.descending =>
+            {
+                *self
+            }
+            _ => Self::Unordered,
+        }
+    }
+
+    pub fn and(&self, rhs: &Self) -> Self {
+        match (self, rhs) {
+            (Self::Ordered(lhs), Self::Ordered(rhs))
+                if lhs.descending == rhs.descending =>
+            {
+                Self::Ordered(SortOptions {
+                    descending: lhs.descending,
+                    nulls_first: lhs.nulls_first || rhs.nulls_first,
+                })
+            }
+            (Self::Ordered(opt), Self::Singleton)
+            | (Self::Singleton, Self::Ordered(opt)) => 
Self::Ordered(SortOptions {
+                descending: opt.descending,
+                nulls_first: opt.nulls_first,
+            }),
+            (Self::Singleton, Self::Singleton) => Self::Singleton,
+            _ => Self::Unordered,
+        }
+    }
+}
+
+impl Neg for SortProperties {
+    type Output = Self;
+
+    fn neg(self) -> Self::Output {
+        match self {
+            SortProperties::Ordered(SortOptions {
+                descending,
+                nulls_first,
+            }) => SortProperties::Ordered(SortOptions {
+                descending: !descending,
+                nulls_first,
+            }),
+            SortProperties::Singleton => SortProperties::Singleton,
+            SortProperties::Unordered => SortProperties::Unordered,
+        }
+    }
+}
+
+/// The `ExprOrdering` struct is designed to aid in the determination of 
ordering (represented
+/// by [`SortProperties`]) for a given [`PhysicalExpr`]. When analyzing the 
orderings
+/// of a [`PhysicalExpr`], the process begins by assigning the ordering of its 
leaf nodes.
+/// By propagating these leaf node orderings upwards in the expression tree, 
the overall
+/// ordering of the entire [`PhysicalExpr`] can be derived.
+///
+/// This struct holds the necessary state information for each expression in 
the [`PhysicalExpr`].
+/// It encapsulates the orderings (`state`) associated with the expression 
(`expr`), and
+/// orderings of the children expressions (`children_states`). The 
[`ExprOrdering`] of a parent
+/// expression is determined based on the [`ExprOrdering`] states of its 
children expressions.
+#[derive(Debug)]
+pub struct ExprOrdering {
+    pub expr: Arc<dyn PhysicalExpr>,
+    pub state: Option<SortProperties>,
+    pub children_states: Option<Vec<SortProperties>>,
+}
+
+impl ExprOrdering {
+    /// Creates a node for `expr` with neither its own ordering (`state`) nor
+    /// its children orderings (`children_states`) assigned.
+    pub fn new(expr: Arc<dyn PhysicalExpr>) -> Self {
+        Self {
+            expr,
+            state: None,
+            children_states: None,
+        }
+    }
+
+    /// Wraps every child of `self.expr` in a fresh [`ExprOrdering`] node.
+    pub fn children(&self) -> Vec<ExprOrdering> {
+        self.expr
+            .children()
+            .into_iter()
+            .map(ExprOrdering::new)
+            .collect()
+    }
+
+    /// Creates a node for `parent_expr` whose children orderings are given by
+    /// `children_states`; the node's own `state` is left unset.
+    pub fn new_with_children(
+        children_states: Vec<SortProperties>,
+        parent_expr: Arc<dyn PhysicalExpr>,
+    ) -> Self {
+        Self {
+            expr: parent_expr,
+            state: None,
+            children_states: Some(children_states),
+        }
+    }
+}
+
+impl TreeNode for ExprOrdering {
+    /// Applies `op` to each child node in order. A `Skip` from `op` ends the
+    /// iteration and reports `Continue`; a `Stop` ends it and reports `Stop`.
+    fn apply_children<F>(&self, op: &mut F) -> Result<VisitRecursion>
+    where
+        F: FnMut(&Self) -> Result<VisitRecursion>,
+    {
+        for child in self.children() {
+            match op(&child)? {
+                VisitRecursion::Continue => continue,
+                VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
+                VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
+            }
+        }
+        Ok(VisitRecursion::Continue)
+    }
+
+    /// Transforms every child with `transform` and rebuilds this node from the
+    /// resulting children states. A child whose state is `None` after the
+    /// transformation is recorded as `SortProperties::Unordered`. A node
+    /// without children is returned unchanged.
+    fn map_children<F>(self, transform: F) -> Result<Self>
+    where
+        F: FnMut(Self) -> Result<Self>,
+    {
+        let children = self.children();
+        if children.is_empty() {
+            return Ok(self);
+        }
+        let children_states = children
+            .into_iter()
+            .map(transform)
+            .map(|child| {
+                child.map(|c| c.state.unwrap_or(SortProperties::Unordered))
+            })
+            .collect::<Result<Vec<_>>>()?;
+        Ok(ExprOrdering::new_with_children(children_states, self.expr))
+    }
+}
+
+/// Calculates the [`SortProperties`] of a given [`ExprOrdering`] node.
+/// The node is either a leaf node, or an intermediate node:
+/// - If it is a leaf node, the children states are `None`. We directly find
+/// the order of the node by looking at the given sort expression and 
equivalence
+/// properties if it is a `Column` leaf, or we mark it as unordered. In the 
case
+/// of a `Literal` leaf, we mark it as singleton so that it can cooperate with
+/// some ordered columns at the upper steps.
+/// - If it is an intermediate node, the children states matter. Each 
`PhysicalExpr`
+/// and operator has its own rules about how to propagate the children 
orderings.
+/// However, before the children order propagation, it is checked that whether
+/// the intermediate node can be directly matched with the sort expression. If 
there
+/// is a match, the sort expression emerges at that node immediately, 
discarding
+/// the order coming from the children.
+pub fn update_ordering(
+    mut node: ExprOrdering,
+    sort_expr: &PhysicalSortExpr,
+    equal_properties: &EquivalenceProperties,
+    ordering_equal_properties: &OrderingEquivalenceProperties,
+) -> Result<Transformed<ExprOrdering>> {
+    // If we can directly match a sort expr with the current node, we can set
+    // its state and return early.
+    // TODO: If there is a PhysicalExpr other than a Column at this node (e.g.
+    //       a BinaryExpr like a + b), and there is an ordering equivalence of
+    //       it (let's say like c + d), we actually can find it at this step.
+    if sort_expr.expr.eq(&node.expr) {
+        node.state = Some(SortProperties::Ordered(sort_expr.options));
+        return Ok(Transformed::Yes(node));
+    }
+
+    if let Some(children_sort_options) = &node.children_states {
+        // We have an intermediate (non-leaf) node, account for its children:
+        node.state = Some(node.expr.get_ordering(children_sort_options));
+    } else if let Some(column) = node.expr.as_any().downcast_ref::<Column>() {
+        // We have a Column, which is one of the two possible leaf node types:
+        node.state = get_indices_of_matching_sort_exprs_with_order_eq(
+            &[sort_expr.clone()],
+            &[column.clone()],
+            equal_properties,
+            ordering_equal_properties,
+        )
+        .map(|(sort_options, _)| {
+            SortProperties::Ordered(SortOptions {
+                descending: sort_options[0].descending,
+                nulls_first: sort_options[0].nulls_first,
+            })
+        });
+    } else {
+        // We have a Literal, which is the other possible leaf node type:
+        node.state = Some(node.expr.get_ordering(&[]));
+    }
+    Ok(Transformed::Yes(node))
+}
diff --git a/datafusion/physical-expr/src/utils.rs 
b/datafusion/physical-expr/src/utils.rs
index 1492d6008e..6d3a68d40c 100644
--- a/datafusion/physical-expr/src/utils.rs
+++ b/datafusion/physical-expr/src/utils.rs
@@ -15,11 +15,18 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::borrow::Borrow;
+use std::collections::{HashMap, HashSet};
+use std::ops::Range;
+use std::sync::Arc;
+
 use crate::equivalence::{
     EquivalenceProperties, EquivalentClass, OrderingEquivalenceProperties,
     OrderingEquivalentClass,
 };
 use crate::expressions::{BinaryExpr, Column, UnKnownColumn};
+use crate::sort_properties::{ExprOrdering, SortProperties};
+use crate::update_ordering;
 use crate::{
     LexOrdering, LexOrderingRef, PhysicalExpr, PhysicalSortExpr, 
PhysicalSortRequirement,
 };
@@ -27,19 +34,16 @@ use crate::{
 use arrow::array::{make_array, Array, ArrayRef, BooleanArray, 
MutableArrayData};
 use arrow::compute::{and_kleene, is_not_null, SlicesIterator};
 use arrow::datatypes::SchemaRef;
+use arrow_schema::SortOptions;
 use datafusion_common::tree_node::{
     Transformed, TreeNode, TreeNodeRewriter, VisitRecursion,
 };
+use datafusion_common::utils::longest_consecutive_prefix;
 use datafusion_common::Result;
 use datafusion_expr::Operator;
 
 use petgraph::graph::NodeIndex;
 use petgraph::stable_graph::StableGraph;
-use std::borrow::Borrow;
-use std::collections::HashMap;
-use std::collections::HashSet;
-use std::ops::Range;
-use std::sync::Arc;
 
 /// Compare the two expr lists are equal no matter the order.
 /// For example two InListExpr can be considered to be equals no matter the 
order:
@@ -155,10 +159,9 @@ pub fn normalize_expr_with_equivalence_properties(
         .unwrap_or(expr)
 }
 
-/// This function returns the normalized version of `sort_expr` with respect to
-/// `eq_properties`, if possible. Otherwise, it returns its first argument as 
is.
-/// Note that this simply means returning the head [`PhysicalSortExpr`] in the
-/// given equivalence set.
+/// This function normalizes `sort_expr` according to `eq_properties`. If the
+/// given sort expression doesn't belong to equivalence set `eq_properties`,
+/// it returns `sort_expr` as is.
 fn normalize_sort_expr_with_equivalence_properties(
     mut sort_expr: PhysicalSortExpr,
     eq_properties: &[EquivalentClass],
@@ -168,11 +171,9 @@ fn normalize_sort_expr_with_equivalence_properties(
     sort_expr
 }
 
-/// This function returns the normalized version of every [`PhysicalSortExpr`]
-/// in `sort_exprs` w.r.t. `eq_properties`, if possible. The 
[`PhysicalSortExpr`]s
-/// for which this is impossible are returned as is. Basically, this function
-/// applies [`normalize_sort_expr_with_equivalence_properties`] to multiple
-/// [`PhysicalSortExpr`]s at once.
+/// This function applies the [`normalize_sort_expr_with_equivalence_properties`]
+/// function for all sort expressions in `sort_exprs` and returns a vector of
+/// normalized sort expressions.
 pub fn normalize_sort_exprs_with_equivalence_properties(
     sort_exprs: LexOrderingRef,
     eq_properties: &EquivalenceProperties,
@@ -188,8 +189,9 @@ pub fn normalize_sort_exprs_with_equivalence_properties(
         .collect()
 }
 
-/// This function returns the head [`PhysicalSortRequirement`] of equivalence 
set of a [`PhysicalSortRequirement`],
-/// if there is any, otherwise; returns the same [`PhysicalSortRequirement`].
+/// This function normalizes `sort_requirement` according to `eq_properties`.
+/// If the given sort requirement doesn't belong to equivalence set
+/// `eq_properties`, it returns `sort_requirement` as is.
 fn normalize_sort_requirement_with_equivalence_properties(
     mut sort_requirement: PhysicalSortRequirement,
     eq_properties: &[EquivalentClass],
@@ -258,10 +260,9 @@ pub fn normalize_sort_exprs(
     let normalized_exprs = 
PhysicalSortRequirement::to_sort_exprs(normalized_exprs);
     collapse_vec(normalized_exprs)
 }
-
-/// This function "normalizes" its argument `oeq_classes` by making sure that
-/// it only refers to representative (i.e. head) entries in the given 
equivlance
-/// properties (`eq_properties`).
+/// This function normalizes `oeq_classes` expressions according to `eq_properties`.
+/// More explicitly, it makes sure that expressions in `oeq_classes` are head entries
+/// in `eq_properties`, replacing any non-head entries with head entries if necessary.
 pub fn normalize_ordering_equivalence_classes(
     oeq_classes: &[OrderingEquivalentClass],
     eq_properties: &EquivalenceProperties,
@@ -565,7 +566,7 @@ pub fn get_indices_of_matching_exprs<
 
 /// This function finds the indices of `targets` within `items` using strict
 /// equality.
-fn get_indices_of_exprs_strict<T: Borrow<Arc<dyn PhysicalExpr>>>(
+pub fn get_indices_of_exprs_strict<T: Borrow<Arc<dyn PhysicalExpr>>>(
     targets: impl IntoIterator<Item = T>,
     items: &[Arc<dyn PhysicalExpr>],
 ) -> Vec<usize> {
@@ -827,21 +828,172 @@ pub fn scatter(mask: &BooleanArray, truthy: &dyn Array) 
-> Result<ArrayRef> {
     Ok(make_array(data))
 }
 
+/// Return indices of each item in `required_exprs` inside `provided_exprs`.
+/// All the items should be found inside `provided_exprs`, and the found
+/// indices must be a permutation of the range 0, 1, ..., N-1, where N is the
+/// number of required expressions. For example, \[2,1,0\] is valid (\[0,1,2\]
+/// is consecutive), but \[3,1,0\] is not valid (\[0,1,3\] is not consecutive).
+///
+/// Returns `None` if `required_exprs` is empty, if any required expression is
+/// missing, or if the indices are not such a permutation.
+fn get_lexicographical_match_indices(
+    required_exprs: &[Arc<dyn PhysicalExpr>],
+    provided_exprs: &[Arc<dyn PhysicalExpr>],
+) -> Option<Vec<usize>> {
+    let indices_of_equality =
+        get_indices_of_exprs_strict(required_exprs, provided_exprs);
+    let n_match = indices_of_equality.len();
+    // Cheap checks first: there must be at least one requirement, and every
+    // requirement must have been found.
+    if n_match == 0 || n_match != required_exprs.len() {
+        return None;
+    }
+    // Sort a copy so the caller still receives indices in requirement order.
+    let mut ordered_indices = indices_of_equality.clone();
+    ordered_indices.sort_unstable();
+    let first_n = longest_consecutive_prefix(ordered_indices);
+    (first_n == n_match).then_some(indices_of_equality)
+}
+
+/// Attempts to find a full match between the required columns to be ordered
+/// (lexicographically), and the provided sort options (lexicographically),
+/// while considering equivalence properties.
+///
+/// It starts by normalizing members of both the required columns and the
+/// provided sort options. If a full match is found, returns the sort options
+/// and indices of the matches. If no full match is found, the function
+/// proceeds to check against ordering equivalence properties. If still no
+/// full match is found, the function returns `None`.
+///
+/// In the returned tuple, the i-th sort options and the i-th index both
+/// correspond to the i-th entry of `required_columns`.
+pub fn get_indices_of_matching_sort_exprs_with_order_eq(
+    provided_sorts: &[PhysicalSortExpr],
+    required_columns: &[Column],
+    eq_properties: &EquivalenceProperties,
+    order_eq_properties: &OrderingEquivalenceProperties,
+) -> Option<(Vec<SortOptions>, Vec<usize>)> {
+    // Create a vector of `PhysicalSortRequirement`s from the required columns:
+    let sort_requirement_on_requirements = required_columns
+        .iter()
+        .map(|required_column| PhysicalSortRequirement {
+            expr: Arc::new(required_column.clone()) as _,
+            options: None,
+        })
+        .collect::<Vec<_>>();
+
+    // Normalize both sides with respect to the equivalence classes only (no
+    // ordering equivalence classes are passed at this stage):
+    let normalized_required = normalize_sort_requirements(
+        &sort_requirement_on_requirements,
+        eq_properties.classes(),
+        &[],
+    );
+    let normalized_provided = normalize_sort_requirements(
+        &PhysicalSortRequirement::from_sort_exprs(provided_sorts.iter()),
+        eq_properties.classes(),
+        &[],
+    );
+
+    let provided_sorts = normalized_provided
+        .iter()
+        .map(|req| req.expr.clone())
+        .collect::<Vec<_>>();
+
+    let normalized_required_expr = normalized_required
+        .iter()
+        .map(|req| req.expr.clone())
+        .collect::<Vec<_>>();
+
+    if let Some(indices_of_equality) =
+        get_lexicographical_match_indices(&normalized_required_expr, &provided_sorts)
+    {
+        return Some((
+            indices_of_equality
+                .iter()
+                .filter_map(|index| normalized_provided[*index].options)
+                .collect(),
+            indices_of_equality,
+        ));
+    }
+
+    // We did not find all the expressions, consult ordering equivalence properties:
+    for class in order_eq_properties.classes() {
+        let head = class.head();
+        for ordering in class.others().iter().chain(std::iter::once(head)) {
+            let order_eq_class_exprs = convert_to_expr(ordering);
+            if let Some(indices_of_equality) = get_lexicographical_match_indices(
+                &normalized_required_expr,
+                &order_eq_class_exprs,
+            ) {
+                return Some((
+                    indices_of_equality
+                        .iter()
+                        .map(|index| ordering[*index].options)
+                        .collect(),
+                    indices_of_equality,
+                ));
+            }
+        }
+    }
+    // If no match found, return `None`:
+    None
+}
+
+/// Calculates the output orderings for a set of expressions within the 
context of a given
+/// execution plan. The resulting orderings are all in the type of [`Column`], 
since these
+/// expressions become [`Column`] after the projection step. The expressions 
having an alias
+/// are renamed with those aliases in the returned [`PhysicalSortExpr`]'s. If 
an expression
+/// is found to be unordered, the corresponding entry in the output vector is 
`None`.
+///
+/// # Arguments
+///
+/// * `expr` - A slice of tuples containing expressions and their 
corresponding aliases.
+///
+/// * `input` - A reference to an execution plan that provides output ordering 
and equivalence
+/// properties.
+///
+/// # Returns
+///
+/// A `Result` containing a vector of optional [`PhysicalSortExpr`]'s. Each 
element of the
+/// vector corresponds to an expression from the input slice. If an expression 
can be ordered,
+/// the corresponding entry is `Some(PhysicalSortExpr)`. If an expression 
cannot be ordered,
+/// the entry is `None`.
+pub fn find_orderings_of_exprs(
+    expr: &[(Arc<dyn PhysicalExpr>, String)],
+    input_output_ordering: Option<&[PhysicalSortExpr]>,
+    input_equal_properties: EquivalenceProperties,
+    input_ordering_equal_properties: OrderingEquivalenceProperties,
+) -> Result<Vec<Option<PhysicalSortExpr>>> {
+    let mut orderings: Vec<Option<PhysicalSortExpr>> = vec![];
+    if let Some(leading_ordering) =
+        input_output_ordering.map(|output_ordering| &output_ordering[0])
+    {
+        for (index, (expression, name)) in expr.iter().enumerate() {
+            let initial_expr = ExprOrdering::new(expression.clone());
+            let transformed = initial_expr.transform_up(&|expr| {
+                update_ordering(
+                    expr,
+                    leading_ordering,
+                    &input_equal_properties,
+                    &input_ordering_equal_properties,
+                )
+            })?;
+            if let Some(SortProperties::Ordered(sort_options)) = 
transformed.state {
+                orderings.push(Some(PhysicalSortExpr {
+                    expr: Arc::new(Column::new(name, index)),
+                    options: sort_options,
+                }));
+            } else {
+                orderings.push(None);
+            }
+        }
+    }
+    Ok(orderings)
+}
+
 #[cfg(test)]
 mod tests {
+    use std::fmt::{Display, Formatter};
+    use std::ops::Not;
+    use std::sync::Arc;
+
     use super::*;
+    use crate::equivalence::OrderingEquivalenceProperties;
     use crate::expressions::{binary, cast, col, in_list, lit, Column, Literal};
     use crate::PhysicalSortExpr;
+
     use arrow::compute::SortOptions;
     use arrow_array::Int32Array;
+    use arrow_schema::{DataType, Field, Schema};
     use datafusion_common::cast::{as_boolean_array, as_int32_array};
     use datafusion_common::{Result, ScalarValue};
-    use std::fmt::{Display, Formatter};
 
-    use crate::equivalence::OrderingEquivalenceProperties;
-    use arrow_schema::{DataType, Field, Schema};
     use petgraph::visit::Bfs;
-    use std::sync::Arc;
 
     #[derive(Clone)]
     struct DummyProperty {
@@ -1662,4 +1814,162 @@ mod tests {
         assert_eq!(&expected, result);
         Ok(())
     }
+
+    /// Checks `get_indices_of_matching_sort_exprs_with_order_eq` for a direct
+    /// match, a match through ordering equivalence classes, and a non-match.
+    #[test]
+    fn test_get_indices_of_matching_sort_exprs_with_order_eq() -> Result<()> {
+        let sort_options = SortOptions::default();
+        let sort_options_not = SortOptions::default().not();
+
+        // required columns match the provided sort expressions directly
+        let provided_sorts = [
+            PhysicalSortExpr {
+                expr: Arc::new(Column::new("b", 1)),
+                options: sort_options_not,
+            },
+            PhysicalSortExpr {
+                expr: Arc::new(Column::new("a", 0)),
+                options: sort_options,
+            },
+        ];
+        let required_columns = [Column::new("b", 1), Column::new("a", 0)];
+        let schema = Schema::new(vec![
+            Field::new("a", DataType::Int32, true),
+            Field::new("b", DataType::Int32, true),
+        ]);
+        let equal_properties = EquivalenceProperties::new(Arc::new(schema.clone()));
+        let ordering_equal_properties =
+            OrderingEquivalenceProperties::new(Arc::new(schema));
+        assert_eq!(
+            get_indices_of_matching_sort_exprs_with_order_eq(
+                &provided_sorts,
+                &required_columns,
+                &equal_properties,
+                &ordering_equal_properties,
+            ),
+            Some((vec![sort_options_not, sort_options], vec![0, 1]))
+        );
+
+        // required columns are provided in the equivalence classes
+        let provided_sorts = [PhysicalSortExpr {
+            expr: Arc::new(Column::new("c", 2)),
+            options: sort_options,
+        }];
+        let required_columns = [Column::new("b", 1), Column::new("a", 0)];
+        let schema = Schema::new(vec![
+            Field::new("a", DataType::Int32, true),
+            Field::new("b", DataType::Int32, true),
+            Field::new("c", DataType::Int32, true),
+        ]);
+        let equal_properties = EquivalenceProperties::new(Arc::new(schema.clone()));
+        let mut ordering_equal_properties =
+            OrderingEquivalenceProperties::new(Arc::new(schema));
+        ordering_equal_properties.add_equal_conditions((
+            &vec![PhysicalSortExpr {
+                expr: Arc::new(Column::new("c", 2)),
+                options: sort_options,
+            }],
+            &vec![
+                PhysicalSortExpr {
+                    expr: Arc::new(Column::new("b", 1)),
+                    options: sort_options_not,
+                },
+                PhysicalSortExpr {
+                    expr: Arc::new(Column::new("a", 0)),
+                    options: sort_options,
+                },
+            ],
+        ));
+        assert_eq!(
+            get_indices_of_matching_sort_exprs_with_order_eq(
+                &provided_sorts,
+                &required_columns,
+                &equal_properties,
+                &ordering_equal_properties,
+            ),
+            Some((vec![sort_options_not, sort_options], vec![0, 1]))
+        );
+
+        // not satisfied orders: `c` sits between `b` and `a` in the provided
+        // ordering, so the matched indices are not consecutive from zero
+        let provided_sorts = [
+            PhysicalSortExpr {
+                expr: Arc::new(Column::new("b", 1)),
+                options: sort_options_not,
+            },
+            PhysicalSortExpr {
+                expr: Arc::new(Column::new("c", 2)),
+                options: sort_options,
+            },
+            PhysicalSortExpr {
+                expr: Arc::new(Column::new("a", 0)),
+                options: sort_options,
+            },
+        ];
+        let required_columns = [Column::new("b", 1), Column::new("a", 0)];
+        let schema = Schema::new(vec![
+            Field::new("a", DataType::Int32, true),
+            Field::new("b", DataType::Int32, true),
+            Field::new("c", DataType::Int32, true),
+        ]);
+        let equal_properties = EquivalenceProperties::new(Arc::new(schema.clone()));
+        let ordering_equal_properties =
+            OrderingEquivalenceProperties::new(Arc::new(schema));
+        assert_eq!(
+            get_indices_of_matching_sort_exprs_with_order_eq(
+                &provided_sorts,
+                &required_columns,
+                &equal_properties,
+                &ordering_equal_properties,
+            ),
+            None
+        );
+
+        Ok(())
+    }
+
+    /// Checks that `normalize_ordering_equivalence_classes` rewrites a
+    /// non-head column (`c`) in an ordering equivalence class to the head of
+    /// its equivalence class (`a`).
+    #[test]
+    fn test_normalize_ordering_equivalence_classes() -> Result<()> {
+        let sort_options = SortOptions::default();
+
+        let schema = Schema::new(vec![
+            Field::new("a", DataType::Int32, true),
+            Field::new("b", DataType::Int32, true),
+            Field::new("c", DataType::Int32, true),
+        ]);
+        let mut equal_properties = EquivalenceProperties::new(Arc::new(schema.clone()));
+        let mut ordering_equal_properties =
+            OrderingEquivalenceProperties::new(Arc::new(schema.clone()));
+        let mut expected_oeq = OrderingEquivalenceProperties::new(Arc::new(schema));
+
+        equal_properties
+            .add_equal_conditions((&Column::new("a", 0), &Column::new("c", 2)));
+        ordering_equal_properties.add_equal_conditions((
+            &vec![PhysicalSortExpr {
+                expr: Arc::new(Column::new("b", 1)),
+                options: sort_options,
+            }],
+            &vec![PhysicalSortExpr {
+                expr: Arc::new(Column::new("c", 2)),
+                options: sort_options,
+            }],
+        ));
+        expected_oeq.add_equal_conditions((
+            &vec![PhysicalSortExpr {
+                expr: Arc::new(Column::new("b", 1)),
+                options: sort_options,
+            }],
+            &vec![PhysicalSortExpr {
+                expr: Arc::new(Column::new("a", 0)),
+                options: sort_options,
+            }],
+        ));
+
+        let normalized = normalize_ordering_equivalence_classes(
+            ordering_equal_properties.classes(),
+            &equal_properties,
+        );
+        // `zip` stops at the shorter side, so compare the class counts first;
+        // otherwise a missing class would pass unnoticed.
+        assert_eq!(normalized.len(), expected_oeq.classes().len());
+        assert!(!normalized
+            .iter()
+            .zip(expected_oeq.classes())
+            .any(|(a, b)| a.head().ne(b.head()) || a.others().ne(b.others())));
+
+        Ok(())
+    }
 }
diff --git a/datafusion/sqllogictest/test_files/order.slt 
b/datafusion/sqllogictest/test_files/order.slt
index 47c7acde32..dffc06eaf4 100644
--- a/datafusion/sqllogictest/test_files/order.slt
+++ b/datafusion/sqllogictest/test_files/order.slt
@@ -410,3 +410,38 @@ SELECT DISTINCT time as "first_seen" FROM t ORDER BY 1;
 ## Cleanup
 statement ok
 drop table t;
+
+# Create a table having 3 columns which are ordering equivalent by the source. In the next step,
+# we will expect to observe the removed SortExec by propagating the orders across projection.
+statement ok
+CREATE EXTERNAL TABLE multiple_ordered_table (
+  a0 INTEGER,
+  a INTEGER,
+  b INTEGER,
+  c INTEGER,
+  d INTEGER
+)
+STORED AS CSV
+WITH HEADER ROW
+WITH ORDER (a ASC)
+WITH ORDER (b ASC)
+WITH ORDER (c ASC)
+LOCATION '../core/tests/data/window_2.csv';
+
+query TT
+EXPLAIN SELECT (b+a+c) AS result 
+FROM multiple_ordered_table
+ORDER BY result;
+----
+logical_plan
+Sort: result ASC NULLS LAST
+--Projection: multiple_ordered_table.b + multiple_ordered_table.a + 
multiple_ordered_table.c AS result
+----TableScan: multiple_ordered_table projection=[a, b, c]
+physical_plan
+SortPreservingMergeExec: [result@0 ASC NULLS LAST]
+--ProjectionExec: expr=[b@1 + a@0 + c@2 as result]
+----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, 
c], output_ordering=[a@0 ASC NULLS LAST], has_header=true
+
+statement ok
+drop table multiple_ordered_table;
\ No newline at end of file

Reply via email to