This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 5fc91cc8fb Top down `EnforceSorting`, Extended testbench for `EnforceSorting` rule to prepare for refactors, additional functionality such as pushdowns over unions (#5661)
5fc91cc8fb is described below

commit 5fc91cc8fbb56b6d2a32e66f8b327a871f7d15ac
Author: Mustafa Akur <106137913+mustafasr...@users.noreply.github.com>
AuthorDate: Tue Apr 4 17:14:54 2023 +0300

    Top down `EnforceSorting`, Extended testbench for `EnforceSorting` rule to prepare for refactors, additional functionality such as pushdowns over unions (#5661)
    
    * Top Down Sort Enforcer
    
    * Add support to optimize parallel sorting
    
    * fix UT
    
    * add more UTs to sort_enforcement2.rs
    
    * refine codebase
    
    * Fix SortMergeJoin case
    
    * Fix reverse window sort requirements
    
    * fix test comments
    
    * add determine_children_requirement() method
    
    * Convert Topdown to pushdown then unify
    
    * reorganize to decrease diff, remove ignore
    
    * Update test
    
    * retract dist enforcement
    
    * tmp
    
    * reorganize files
    
    * reorganize tests
    
    * simplify sort pushdown
    
    * remove global sort print
    
    * remove unnecessary parameter
    
    * Updates
    
    * simplifications
    
    * Refactors and simplifications part 1
    
    * Refactors and simplifications part 2
    
    * simplifications
    
    * remove_sort_keys parameters from window
    
    * Update window multi_path test
    
    * consider existing ordering during Coalesce
    
    * retract assertion in planner
    
    * remove todo.
    
    * remove unnecessary repartition from plan
    
    * update comments
    
    * Remove commented out code
    
    * Address reviews
    
    * update comments
    
    * address reviews
    
    ---------
    
    Co-authored-by: mingmw...@ebay.com <mingmw...@ebay.com>
    Co-authored-by: Mehmet Ozan Kabak <ozanka...@gmail.com>
---
 .../src/physical_optimizer/dist_enforcement.rs     |   54 +-
 .../physical_optimizer/global_sort_selection.rs    |    6 +-
 datafusion/core/src/physical_optimizer/mod.rs      |    1 +
 .../core/src/physical_optimizer/repartition.rs     |   45 +-
 .../src/physical_optimizer/sort_enforcement.rs     | 1004 +++++++++++++++-----
 .../core/src/physical_optimizer/sort_pushdown.rs   |  416 ++++++++
 datafusion/core/src/physical_optimizer/utils.rs    |   43 +
 .../src/physical_plan/joins/sort_merge_join.rs     |   17 +-
 datafusion/core/src/physical_plan/planner.rs       |    1 -
 .../src/physical_plan/windows/window_agg_exec.rs   |    3 -
 datafusion/physical-expr/src/utils.rs              |  134 ++-
 datafusion/sql/src/statement.rs                    |    8 +-
 12 files changed, 1424 insertions(+), 308 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/dist_enforcement.rs b/datafusion/core/src/physical_optimizer/dist_enforcement.rs
index ef1b6a576d..d3e99945e9 100644
--- a/datafusion/core/src/physical_optimizer/dist_enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/dist_enforcement.rs
@@ -38,11 +38,11 @@ use datafusion_expr::logical_plan::JoinType;
 use datafusion_physical_expr::equivalence::EquivalenceProperties;
 use datafusion_physical_expr::expressions::Column;
 use datafusion_physical_expr::expressions::NoOp;
+use datafusion_physical_expr::utils::map_columns_before_projection;
 use datafusion_physical_expr::{
     expr_list_eq_strict_order, normalize_expr_with_equivalence_properties, AggregateExpr,
     PhysicalExpr,
 };
-use std::collections::HashMap;
 use std::sync::Arc;
 
 /// The EnforceDistribution rule ensures that distribution requirements are met
@@ -487,30 +487,6 @@ fn reorder_aggregate_keys(
     }
 }
 
-fn map_columns_before_projection(
-    parent_required: &[Arc<dyn PhysicalExpr>],
-    proj_exprs: &[(Arc<dyn PhysicalExpr>, String)],
-) -> Vec<Arc<dyn PhysicalExpr>> {
-    let mut column_mapping = HashMap::new();
-    for (expression, name) in proj_exprs.iter() {
-        if let Some(column) = expression.as_any().downcast_ref::<Column>() {
-            column_mapping.insert(name.clone(), column.clone());
-        };
-    }
-    let new_required: Vec<Arc<dyn PhysicalExpr>> = parent_required
-        .iter()
-        .filter_map(|r| {
-            if let Some(column) = r.as_any().downcast_ref::<Column>() {
-                column_mapping.get(column.name())
-            } else {
-                None
-            }
-        })
-        .map(|e| Arc::new(e.clone()) as Arc<dyn PhysicalExpr>)
-        .collect::<Vec<_>>();
-    new_required
-}
-
 fn shift_right_required(
     parent_required: &[Arc<dyn PhysicalExpr>],
     left_columns_len: usize,
@@ -1026,6 +1002,30 @@ mod tests {
         ))
     }
 
+    // Creates a sorted parquet exec with multiple files
+    fn parquet_exec_multiple_sorted(
+        output_ordering: Option<Vec<PhysicalSortExpr>>,
+    ) -> Arc<ParquetExec> {
+        Arc::new(ParquetExec::new(
+            FileScanConfig {
+                object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
+                file_schema: schema(),
+                file_groups: vec![
+                    vec![PartitionedFile::new("x".to_string(), 100)],
+                    vec![PartitionedFile::new("y".to_string(), 100)],
+                ],
+                statistics: Statistics::default(),
+                projection: None,
+                limit: None,
+                table_partition_cols: vec![],
+                output_ordering,
+                infinite_source: false,
+            },
+            None,
+            None,
+        ))
+    }
+
     fn projection_exec_with_alias(
         input: Arc<dyn ExecutionPlan>,
         alias_pairs: Vec<(String, String)>,
@@ -2108,7 +2108,7 @@ mod tests {
         }];
 
         // Scan some sorted parquet files
-        let exec = parquet_exec_with_sort(Some(sort_key.clone()));
+        let exec = parquet_exec_multiple_sorted(Some(sort_key.clone()));
 
         // CoalesceBatchesExec to mimic behavior after a filter
         let exec = Arc::new(CoalesceBatchesExec::new(exec, 4096));
@@ -2121,7 +2121,7 @@ mod tests {
         let expected = &[
             "SortPreservingMergeExec: [a@0 ASC]",
             "CoalesceBatchesExec: target_batch_size=4096",
-            "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
output_ordering=[a@0 ASC], projection=[a, b, c, d, e]",
+            "ParquetExec: limit=None, partitions={2 groups: [[x], [y]]}, 
output_ordering=[a@0 ASC], projection=[a, b, c, d, e]",
         ];
         assert_optimized!(expected, exec);
         Ok(())
diff --git a/datafusion/core/src/physical_optimizer/global_sort_selection.rs b/datafusion/core/src/physical_optimizer/global_sort_selection.rs
index 9342298633..e29735e741 100644
--- a/datafusion/core/src/physical_optimizer/global_sort_selection.rs
+++ b/datafusion/core/src/physical_optimizer/global_sort_selection.rs
@@ -48,7 +48,7 @@ impl PhysicalOptimizerRule for GlobalSortSelection {
     fn optimize(
         &self,
         plan: Arc<dyn ExecutionPlan>,
-        _config: &ConfigOptions,
+        config: &ConfigOptions,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         plan.transform_up(&|plan| {
             let transformed =
@@ -56,10 +56,10 @@ impl PhysicalOptimizerRule for GlobalSortSelection {
                     .downcast_ref::<SortExec>()
                     .and_then(|sort_exec| {
                         if sort_exec.input().output_partitioning().partition_count() > 1
-                        && sort_exec.fetch().is_some()
                         // It's already preserving the partitioning so that it can be regarded as a local sort
                         && !sort_exec.preserve_partitioning()
-                        {
+                        && (sort_exec.fetch().is_some() || config.optimizer.repartition_sorts)
+                    {
                             let sort = SortExec::new_with_partitioning(
                                 sort_exec.expr().to_vec(),
                                 sort_exec.input().clone(),
diff --git a/datafusion/core/src/physical_optimizer/mod.rs b/datafusion/core/src/physical_optimizer/mod.rs
index 3958a546a9..5111e55292 100644
--- a/datafusion/core/src/physical_optimizer/mod.rs
+++ b/datafusion/core/src/physical_optimizer/mod.rs
@@ -28,6 +28,7 @@ pub mod pipeline_checker;
 pub mod pruning;
 pub mod repartition;
 pub mod sort_enforcement;
+mod sort_pushdown;
 mod utils;
 
 pub mod pipeline_fixer;
diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs
index 71274d177d..6c2d5c9348 100644
--- a/datafusion/core/src/physical_optimizer/repartition.rs
+++ b/datafusion/core/src/physical_optimizer/repartition.rs
@@ -321,9 +321,6 @@ fn init() {
 mod tests {
     use arrow::compute::SortOptions;
     use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
-    use datafusion_physical_expr::{
-        make_sort_requirements_from_exprs, PhysicalSortRequirement,
-    };
 
     use super::*;
     use crate::datasource::listing::PartitionedFile;
@@ -342,6 +339,9 @@ mod tests {
     use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
     use crate::physical_plan::union::UnionExec;
     use crate::physical_plan::{displayable, DisplayFormatType, Statistics};
+    use datafusion_physical_expr::{
+        make_sort_requirements_from_exprs, PhysicalSortRequirement,
+    };
 
     fn schema() -> SchemaRef {
         Arc::new(Schema::new(vec![Field::new("c1", DataType::Boolean, true)]))
@@ -412,6 +412,33 @@ mod tests {
         ))
     }
 
+    // Creates a sorted parquet exec with multiple files
+    fn parquet_exec_multiple_sorted() -> Arc<ParquetExec> {
+        let sort_exprs = vec![PhysicalSortExpr {
+            expr: col("c1", &schema()).unwrap(),
+            options: SortOptions::default(),
+        }];
+
+        Arc::new(ParquetExec::new(
+            FileScanConfig {
+                object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
+                file_schema: schema(),
+                file_groups: vec![
+                    vec![PartitionedFile::new("x".to_string(), 100)],
+                    vec![PartitionedFile::new("y".to_string(), 100)],
+                ],
+                statistics: Statistics::default(),
+                projection: None,
+                limit: None,
+                table_partition_cols: vec![],
+                output_ordering: Some(sort_exprs),
+                infinite_source: false,
+            },
+            None,
+            None,
+        ))
+    }
+
     fn sort_preserving_merge_exec(
         input: Arc<dyn ExecutionPlan>,
     ) -> Arc<dyn ExecutionPlan> {
@@ -737,12 +764,12 @@ mod tests {
     #[test]
     fn repartition_ignores_sort_preserving_merge() -> Result<()> {
         // sort preserving merge already sorted input,
-        let plan = sort_preserving_merge_exec(parquet_exec_sorted());
+        let plan = sort_preserving_merge_exec(parquet_exec_multiple_sorted());
 
         // should not repartition / sort (as the data was already sorted)
         let expected = &[
             "SortPreservingMergeExec: [c1@0 ASC]",
-            "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
output_ordering=[c1@0 ASC], projection=[c1]",
+            "ParquetExec: limit=None, partitions={2 groups: [[x], [y]]}, 
output_ordering=[c1@0 ASC], projection=[c1]",
         ];
 
         assert_optimized!(expected, plan);
@@ -838,13 +865,14 @@ mod tests {
     #[test]
     fn repartition_ignores_transitively_with_projection() -> Result<()> {
         // sorted input
-        let plan = sort_preserving_merge_exec(projection_exec(parquet_exec_sorted()));
+        let plan =
+            sort_preserving_merge_exec(projection_exec(parquet_exec_multiple_sorted()));
 
         // data should not be repartitioned / resorted
         let expected = &[
             "SortPreservingMergeExec: [c1@0 ASC]",
             "ProjectionExec: expr=[c1@0 as c1]",
-            "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
output_ordering=[c1@0 ASC], projection=[c1]",
+            "ParquetExec: limit=None, partitions={2 groups: [[x], [y]]}, 
output_ordering=[c1@0 ASC], projection=[c1]",
         ];
 
         assert_optimized!(expected, plan);
@@ -857,7 +885,6 @@
             sort_preserving_merge_exec(sort_exec(projection_exec(parquet_exec()), true));
 
         let expected = &[
-            "SortPreservingMergeExec: [c1@0 ASC]",
             "SortExec: expr=[c1@0 ASC]",
             "ProjectionExec: expr=[c1@0 as c1]",
             "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[c1]",
@@ -1040,7 +1067,6 @@ mod tests {
 
         // parallelization potentially could break sort order
         let expected = &[
-            "SortPreservingMergeExec: [c1@0 ASC]",
             "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
output_ordering=[c1@0 ASC], projection=[c1]",
         ];
 
@@ -1090,7 +1116,6 @@ mod tests {
 
         // data should not be repartitioned / resorted
         let expected = &[
-            "SortPreservingMergeExec: [c1@0 ASC]",
             "ProjectionExec: expr=[c1@0 as c1]",
             "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
output_ordering=[c1@0 ASC], projection=[c1]",
         ];
diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
index 265c86cdf1..7428c339dc 100644
--- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
@@ -35,13 +35,15 @@
 //! by another SortExec. Therefore, this rule removes it from the physical plan.
 use crate::config::ConfigOptions;
 use crate::error::Result;
-use crate::physical_optimizer::utils::add_sort_above;
+use crate::physical_optimizer::sort_pushdown::{pushdown_sorts, SortPushDown};
+use crate::physical_optimizer::utils::{
+    add_sort_above, is_coalesce_partitions, is_limit, is_repartition, is_sort,
+    is_sort_preserving_merge, is_union, is_window,
+};
 use crate::physical_optimizer::PhysicalOptimizerRule;
 use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
-use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
 use crate::physical_plan::sorts::sort::SortExec;
 use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
-use crate::physical_plan::union::UnionExec;
 use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
 use crate::physical_plan::{with_new_children_if_necessary, Distribution, ExecutionPlan};
 use arrow::datatypes::SchemaRef;
@@ -104,8 +106,8 @@ impl ExecTree {
     }
 }
 
-/// This object is used within the [EnforceSorting] rule to track the closest
-/// `SortExec` descendant(s) for every child of a plan.
+/// This object is used within the [`EnforceSorting`] rule to track the closest
+/// [`SortExec`] descendant(s) for every child of a plan.
 #[derive(Debug, Clone)]
 struct PlanWithCorrespondingSort {
     plan: Arc<dyn ExecutionPlan>,
@@ -149,19 +151,11 @@ impl PlanWithCorrespondingSort {
                     return None;
                 }
                 let is_spm = is_sort_preserving_merge(plan);
-                let is_union = plan.as_any().is::<UnionExec>();
-                // If the executor is a `UnionExec`, and it has an output ordering;
-                // then it at least partially maintains some child's output ordering.
-                // Therefore, we propagate this information upwards.
-                let partially_maintains = is_union && plan.output_ordering().is_some();
                 let required_orderings = plan.required_input_ordering();
                 let flags = plan.maintains_input_order();
                 let children = izip!(flags, item.sort_onwards, required_orderings)
                     .filter_map(|(maintains, element, required_ordering)| {
-                        if (required_ordering.is_none()
-                            && (maintains || partially_maintains))
-                            || is_spm
-                        {
+                        if (required_ordering.is_none() && maintains) || is_spm {
                             element
                         } else {
                             None
@@ -227,7 +221,7 @@ impl TreeNode for PlanWithCorrespondingSort {
 }
 
 /// This object is used within the [EnforceSorting] rule to track the closest
-/// `CoalescePartitionsExec` descendant(s) for every child of a plan.
+/// [`CoalescePartitionsExec`] descendant(s) for every child of a plan.
 #[derive(Debug, Clone)]
 struct PlanWithCorrespondingCoalescePartitions {
     plan: Arc<dyn ExecutionPlan>,
@@ -235,7 +229,7 @@ struct PlanWithCorrespondingCoalescePartitions {
     // child until the `CoalescePartitionsExec`(s) -- could be multiple for
     // n-ary plans like Union -- that affect the output partitioning of the
     // child. If the child has no connection to any `CoalescePartitionsExec`,
-    // simplify store None (and not a subtree).
+    // simply store None (and not a subtree).
     coalesce_onwards: Vec<Option<ExecTree>>,
 }
 
@@ -265,11 +259,11 @@ impl PlanWithCorrespondingCoalescePartitions {
                 // maintain a single partition. If we just saw a `CoalescePartitionsExec`
                 // operator, we reset the tree and start accumulating.
                 let plan = item.plan;
-                if plan.as_any().is::<CoalescePartitionsExec>() {
-                    Some(ExecTree::new(plan, idx, vec![]))
-                } else if plan.children().is_empty() {
+                if plan.children().is_empty() {
                     // Plan has no children, there is nothing to propagate.
                     None
+                } else if is_coalesce_partitions(&plan) {
+                    Some(ExecTree::new(plan, idx, vec![]))
                 } else {
                     let children = item
                         .coalesce_onwards
@@ -356,16 +350,23 @@ impl PhysicalOptimizerRule for EnforceSorting {
         config: &ConfigOptions,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let plan_requirements = PlanWithCorrespondingSort::new(plan);
+        // Execute a bottom-up traversal to enforce sorting requirements,
+        // remove unnecessary sorts, and optimize sort-sensitive operators:
         let adjusted = plan_requirements.transform_up(&ensure_sorting)?;
-        if config.optimizer.repartition_sorts {
+        let new_plan = if config.optimizer.repartition_sorts {
             let plan_with_coalesce_partitions =
                 PlanWithCorrespondingCoalescePartitions::new(adjusted.plan);
             let parallel =
                 plan_with_coalesce_partitions.transform_up(&parallelize_sorts)?;
-            Ok(parallel.plan)
+            parallel.plan
         } else {
-            Ok(adjusted.plan)
-        }
+            adjusted.plan
+        };
+        // Execute a top-down traversal to exploit sort push-down opportunities
+        // missed by the bottom-up traversal:
+        let sort_pushdown = SortPushDown::init(new_plan);
+        let adjusted = sort_pushdown.transform_down(&pushdown_sorts)?;
+        Ok(adjusted.plan)
     }
 
     fn name(&self) -> &str {
@@ -378,77 +379,63 @@ impl PhysicalOptimizerRule for EnforceSorting {
 }
 
 /// This function turns plans of the form
-///      "SortExec: expr=[a@0 ASC]",
+///      "SortExec: expr=\[a@0 ASC\]",
 ///      "  CoalescePartitionsExec",
 ///      "    RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
 /// to
-///      "SortPreservingMergeExec: [a@0 ASC]",
-///      "  SortExec: expr=[a@0 ASC]",
+///      "SortPreservingMergeExec: \[a@0 ASC\]",
+///      "  SortExec: expr=\[a@0 ASC\]",
 ///      "    RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-/// by following connections from `CoalescePartitionsExec`s to `SortExec`s.
+/// by following connections from [`CoalescePartitionsExec`]s to [`SortExec`]s.
 /// By performing sorting in parallel, we can increase performance in some scenarios.
 fn parallelize_sorts(
     requirements: PlanWithCorrespondingCoalescePartitions,
 ) -> Result<Transformed<PlanWithCorrespondingCoalescePartitions>> {
-    if requirements.plan.children().is_empty() {
-        return Ok(Transformed::No(requirements));
-    }
     let plan = requirements.plan;
     let mut coalesce_onwards = requirements.coalesce_onwards;
-    // We know that `plan` has children, so `coalesce_onwards` is non-empty.
-    if coalesce_onwards[0].is_some() {
-        if (is_sort(&plan) || is_sort_preserving_merge(&plan))
-            // Make sure that Sort is actually global sort
-            && plan.output_partitioning().partition_count() == 1
-        {
-            // If there is a connection between a `CoalescePartitionsExec` and a
-            // Global Sort that satisfy the requirements (i.e. intermediate
-            // executors  don't require single partition), then we can
-            // replace the `CoalescePartitionsExec`+ GlobalSort cascade with
-            // the `SortExec` + `SortPreservingMergeExec`
-            // cascade to parallelize sorting.
-            let mut prev_layer = plan.clone();
-            update_child_to_remove_coalesce(&mut prev_layer, &mut coalesce_onwards[0])?;
-            let sort_exprs = get_sort_exprs(&plan)?;
-            add_sort_above(&mut prev_layer, sort_exprs.to_vec())?;
-            let spm = SortPreservingMergeExec::new(sort_exprs.to_vec(), prev_layer);
-            return Ok(Transformed::Yes(PlanWithCorrespondingCoalescePartitions {
-                plan: Arc::new(spm),
-                coalesce_onwards: vec![None],
-            }));
-        } else if plan.as_any().is::<CoalescePartitionsExec>() {
-            // There is an unnecessary `CoalescePartitionExec` in the plan.
-            let mut prev_layer = plan.clone();
-            update_child_to_remove_coalesce(&mut prev_layer, &mut coalesce_onwards[0])?;
-            let new_plan = plan.with_new_children(vec![prev_layer])?;
-            return Ok(Transformed::Yes(PlanWithCorrespondingCoalescePartitions {
-                plan: new_plan,
-                coalesce_onwards: vec![None],
-            }));
-        }
+    if plan.children().is_empty() || coalesce_onwards[0].is_none() {
+        // We only take an action when the plan is either a SortExec, a
+        // SortPreservingMergeExec or a CoalescePartitionsExec, and they
+        // all have a single child. Therefore, if the first child is `None`,
+        // we can return immediately.
+        return Ok(Transformed::No(PlanWithCorrespondingCoalescePartitions {
+            plan,
+            coalesce_onwards,
+        }));
+    } else if (is_sort(&plan) || is_sort_preserving_merge(&plan))
+        && plan.output_partitioning().partition_count() <= 1
+    {
+        // If there is a connection between a CoalescePartitionsExec and a
+        // global sort that satisfy the requirements (i.e. intermediate
+        // executors don't require single partition), then we can replace
+        // the CoalescePartitionsExec + Sort cascade with a SortExec +
+        // SortPreservingMergeExec cascade to parallelize sorting.
+        let mut prev_layer = plan.clone();
+        update_child_to_remove_coalesce(&mut prev_layer, &mut coalesce_onwards[0])?;
+        let sort_exprs = get_sort_exprs(&plan)?;
+        add_sort_above(&mut prev_layer, sort_exprs.to_vec())?;
+        let spm = SortPreservingMergeExec::new(sort_exprs.to_vec(), prev_layer);
+        return Ok(Transformed::Yes(PlanWithCorrespondingCoalescePartitions {
+            plan: Arc::new(spm),
+            coalesce_onwards: vec![None],
+        }));
+    } else if is_coalesce_partitions(&plan) {
+        // There is an unnecessary `CoalescePartitionExec` in the plan.
+        let mut prev_layer = plan.clone();
+        update_child_to_remove_coalesce(&mut prev_layer, &mut coalesce_onwards[0])?;
+        let new_plan = plan.with_new_children(vec![prev_layer])?;
+        return Ok(Transformed::Yes(PlanWithCorrespondingCoalescePartitions {
+            plan: new_plan,
+            coalesce_onwards: vec![None],
+        }));
     }
+
     Ok(Transformed::Yes(PlanWithCorrespondingCoalescePartitions {
         plan,
         coalesce_onwards,
     }))
 }
 
-/// Checks whether the given executor is a limit;
-/// i.e. either a `LocalLimitExec` or a `GlobalLimitExec`.
-fn is_limit(plan: &Arc<dyn ExecutionPlan>) -> bool {
-    plan.as_any().is::<GlobalLimitExec>() || plan.as_any().is::<LocalLimitExec>()
-}
-
-/// Checks whether the given executor is a `SortExec`.
-fn is_sort(plan: &Arc<dyn ExecutionPlan>) -> bool {
-    plan.as_any().is::<SortExec>()
-}
-
-/// Checks whether the given executor is a `SortPreservingMergeExec`.
-fn is_sort_preserving_merge(plan: &Arc<dyn ExecutionPlan>) -> bool {
-    plan.as_any().is::<SortPreservingMergeExec>()
-}
-
 /// This function enforces sorting requirements and makes optimizations without
 /// violating these requirements whenever possible.
 fn ensure_sorting(
@@ -489,17 +476,6 @@ fn ensure_sorting(
                         *sort_onwards = None;
                     }
                 }
-                if let Some(tree) = sort_onwards {
-                    // For window expressions, we can remove some sorts when we can
-                    // calculate the result in reverse:
-                    if plan.as_any().is::<WindowAggExec>()
-                        || plan.as_any().is::<BoundedWindowAggExec>()
-                    {
-                        if let Some(result) = analyze_window_sort_removal(tree, &plan)? {
-                            return Ok(Transformed::Yes(result));
-                        }
-                    }
-                }
             }
             (Some(required), None) => {
                 // Ordering requirement is not met, we should add a `SortExec` to the plan.
@@ -509,31 +485,40 @@ fn ensure_sorting(
             }
             (None, Some(_)) => {
                 // We have a `SortExec` whose effect may be neutralized by
-                // another order-imposing operator. Remove or update this sort:
-                if !plan.maintains_input_order()[idx] {
-                    let count = plan.output_ordering().map_or(0, |e| e.len());
-                    if (count > 0) && !is_sort(&plan) {
-                        update_child_to_change_finer_sort(child, sort_onwards, count)?;
-                    } else {
-                        update_child_to_remove_unnecessary_sort(
-                            child,
-                            sort_onwards,
-                            &plan,
-                        )?;
-                    }
+                // another order-imposing operator. Remove this sort.
+                if !plan.maintains_input_order()[idx] || is_union(&plan) {
+                    update_child_to_remove_unnecessary_sort(child, sort_onwards, &plan)?;
                 }
             }
             (None, None) => {}
         }
     }
+    // For window expressions, we can remove some sorts when we can
+    // calculate the result in reverse:
+    if is_window(&plan) {
+        if let Some(tree) = &mut sort_onwards[0] {
+            if let Some(result) = analyze_window_sort_removal(tree, &plan)? {
+                return Ok(Transformed::Yes(result));
+            }
+        }
+    } else if is_sort_preserving_merge(&plan)
+        && children[0].output_partitioning().partition_count() <= 1
+    {
+        // This SortPreservingMergeExec is unnecessary, input already has a
+        // single partition.
+        return Ok(Transformed::Yes(PlanWithCorrespondingSort {
+            plan: children[0].clone(),
+            sort_onwards: vec![sort_onwards[0].clone()],
+        }));
+    }
     Ok(Transformed::Yes(PlanWithCorrespondingSort {
         plan: plan.with_new_children(children)?,
         sort_onwards,
     }))
 }
 
-/// Analyzes a given `SortExec` (`plan`) to determine whether its input already
-/// has a finer ordering than this `SortExec` enforces.
+/// Analyzes a given [`SortExec`] (`plan`) to determine whether its input
+/// already has a finer ordering than it enforces.
 fn analyze_immediate_sort_removal(
     plan: &Arc<dyn ExecutionPlan>,
     sort_onwards: &[Option<ExecTree>],
@@ -580,8 +565,8 @@ fn analyze_immediate_sort_removal(
     None
 }
 
-/// Analyzes a [WindowAggExec] or a [BoundedWindowAggExec] to determine whether
-/// it may allow removing a sort.
+/// Analyzes a [`WindowAggExec`] or a [`BoundedWindowAggExec`] to determine
+/// whether it may allow removing a sort.
 fn analyze_window_sort_removal(
     sort_tree: &mut ExecTree,
     window_exec: &Arc<dyn ExecutionPlan>,
@@ -597,6 +582,10 @@ fn analyze_window_sort_removal(
             "Expects to receive either WindowAggExec of 
BoundedWindowAggExec".to_string(),
         ));
     };
+    let n_req = window_exec.required_input_ordering()[0]
+        .as_ref()
+        .map(|elem| elem.len())
+        .unwrap_or(0);
 
     let mut first_should_reverse = None;
     for sort_any in sort_tree.get_leaves() {
@@ -606,14 +595,12 @@ fn analyze_window_sort_removal(
         // Therefore, we can use the 0th index without loss of generality.
         let sort_input = sort_any.children()[0].clone();
         let physical_ordering = sort_input.output_ordering();
-        // TODO: Once we can ensure that required ordering information propagates with
-        //       the necessary lineage information, compare `physical_ordering` and the
-        //       ordering required by the window executor instead of `sort_output_ordering`.
-        //       This will enable us to handle cases such as (a,b) -> Sort -> (a,b,c) -> Required(a,b).
-        //       Currently, we can not remove such sorts.
-        let required_ordering = sort_output_ordering.ok_or_else(|| {
+        let sort_output_ordering = sort_output_ordering.ok_or_else(|| {
             DataFusionError::Plan("A SortExec should have output 
ordering".to_string())
         })?;
+        // It is enough to check whether the first n_req elements of the sort output
+        // satisfy the window_exec requirement, because that requirement has length n_req.
+        let required_ordering = &sort_output_ordering[0..n_req];
         if let Some(physical_ordering) = physical_ordering {
             let (can_skip_sorting, should_reverse) = can_skip_sort(
                 window_expr[0].partition_by(),
@@ -623,8 +610,7 @@ fn analyze_window_sort_removal(
             )?;
             if !can_skip_sorting {
                 return Ok(None);
-            }
-            if let Some(first_should_reverse) = first_should_reverse {
+            } else if let Some(first_should_reverse) = first_should_reverse {
                 if first_should_reverse != should_reverse {
                     return Ok(None);
                 }
@@ -685,7 +671,7 @@ fn update_child_to_remove_coalesce(
     coalesce_onwards: &mut Option<ExecTree>,
 ) -> Result<()> {
     if let Some(coalesce_onwards) = coalesce_onwards {
-        *child = remove_corresponding_coalesce_in_sub_plan(coalesce_onwards)?;
+        *child = remove_corresponding_coalesce_in_sub_plan(coalesce_onwards, child)?;
     }
     Ok(())
 }
@@ -693,27 +679,29 @@ fn update_child_to_remove_coalesce(
 /// Removes the `CoalescePartitions` from the plan in `coalesce_onwards`.
 fn remove_corresponding_coalesce_in_sub_plan(
     coalesce_onwards: &mut ExecTree,
+    parent: &Arc<dyn ExecutionPlan>,
 ) -> Result<Arc<dyn ExecutionPlan>> {
-    Ok(
-        if coalesce_onwards
-            .plan
-            .as_any()
-            .is::<CoalescePartitionsExec>()
+    Ok(if is_coalesce_partitions(&coalesce_onwards.plan) {
+        // We can safely use the 0th index since we have a `CoalescePartitionsExec`.
+        let mut new_plan = coalesce_onwards.plan.children()[0].clone();
+        while new_plan.output_partitioning() == parent.output_partitioning()
+            && is_repartition(&new_plan)
+            && is_repartition(parent)
         {
-            // We can safely use the 0th index since we have a `CoalescePartitionsExec`.
-            coalesce_onwards.plan.children()[0].clone()
-        } else {
-            let plan = coalesce_onwards.plan.clone();
-            let mut children = plan.children();
-            for item in &mut coalesce_onwards.children {
-                children[item.idx] = remove_corresponding_coalesce_in_sub_plan(item)?;
-            }
-            plan.with_new_children(children)?
-        },
-    )
+            new_plan = new_plan.children()[0].clone()
+        }
+        new_plan
+    } else {
+        let plan = coalesce_onwards.plan.clone();
+        let mut children = plan.children();
+        for item in &mut coalesce_onwards.children {
+            children[item.idx] = remove_corresponding_coalesce_in_sub_plan(item, &plan)?;
+        }
+        plan.with_new_children(children)?
+    })
 }
 
-/// Updates child to remove the unnecessary sorting below it.
+/// Updates child to remove the unnecessary sort below it.
 fn update_child_to_remove_unnecessary_sort(
     child: &mut Arc<dyn ExecutionPlan>,
     sort_onwards: &mut Option<ExecTree>,
@@ -739,8 +727,8 @@ fn remove_corresponding_sort_from_sub_plan(
     requires_single_partition: bool,
 ) -> Result<Arc<dyn ExecutionPlan>> {
     // A `SortExec` is always at the bottom of the tree.
-    if is_sort(&sort_onwards.plan) {
-        Ok(sort_onwards.plan.children()[0].clone())
+    let mut updated_plan = if is_sort(&sort_onwards.plan) {
+        sort_onwards.plan.children()[0].clone()
     } else {
         let plan = &sort_onwards.plan;
         let mut children = plan.children();
@@ -753,62 +741,28 @@ fn remove_corresponding_sort_from_sub_plan(
                 remove_corresponding_sort_from_sub_plan(item, requires_single_partition)?;
         }
         if is_sort_preserving_merge(plan) {
-            let child = &children[0];
-            if requires_single_partition
-                && child.output_partitioning().partition_count() > 1
-            {
-                Ok(Arc::new(CoalescePartitionsExec::new(child.clone())))
-            } else {
-                Ok(child.clone())
-            }
+            children[0].clone()
         } else {
-            plan.clone().with_new_children(children)
-        }
-    }
-}
-
-/// Updates child to modify the unnecessarily fine sorting below it.
-fn update_child_to_change_finer_sort(
-    child: &mut Arc<dyn ExecutionPlan>,
-    sort_onwards: &mut Option<ExecTree>,
-    n_sort_expr: usize,
-) -> Result<()> {
-    if let Some(sort_onwards) = sort_onwards {
-        *child = change_finer_sort_in_sub_plan(sort_onwards, n_sort_expr)?;
-    }
-    Ok(())
-}
-
-/// Change the unnecessarily fine sort in `sort_onwards`.
-fn change_finer_sort_in_sub_plan(
-    sort_onwards: &mut ExecTree,
-    n_sort_expr: usize,
-) -> Result<Arc<dyn ExecutionPlan>> {
-    let plan = &sort_onwards.plan;
-    // A `SortExec` is always at the bottom of the tree.
-    if is_sort(plan) {
-        let mut prev_layer = plan.children()[0].clone();
-        let new_sort_expr = get_sort_exprs(plan)?[0..n_sort_expr].to_vec();
-        add_sort_above(&mut prev_layer, new_sort_expr)?;
-        *sort_onwards = ExecTree::new(prev_layer.clone(), sort_onwards.idx, vec![]);
-        Ok(prev_layer)
-    } else {
-        let mut children = plan.children();
-        for item in &mut sort_onwards.children {
-            children[item.idx] = change_finer_sort_in_sub_plan(item, n_sort_expr)?;
+            plan.clone().with_new_children(children)?
         }
-        if is_sort_preserving_merge(plan) {
-            let new_sort_expr = get_sort_exprs(plan)?[0..n_sort_expr].to_vec();
-            let updated_plan = Arc::new(SortPreservingMergeExec::new(
-                new_sort_expr,
-                children[0].clone(),
-            )) as Arc<dyn ExecutionPlan>;
-            sort_onwards.plan = updated_plan.clone();
-            Ok(updated_plan)
+    };
+    // Deleting a merging sort may invalidate distribution requirements.
+    // Ensure that we stay compliant with such requirements:
+    if requires_single_partition
+        && updated_plan.output_partitioning().partition_count() > 1
+    {
+        // If there is existing ordering, to preserve ordering use SortPreservingMergeExec
+        // instead of CoalescePartitionsExec.
+        if let Some(ordering) = updated_plan.output_ordering() {
+            updated_plan = Arc::new(SortPreservingMergeExec::new(
+                ordering.to_vec(),
+                updated_plan,
+            ));
         } else {
-            plan.clone().with_new_children(children)
+            updated_plan = Arc::new(CoalescePartitionsExec::new(updated_plan.clone()));
         }
     }
+    Ok(updated_plan)
 }
 
 /// Converts an [ExecutionPlan] trait object to a [PhysicalSortExpr] slice when possible.
@@ -925,6 +879,9 @@ mod tests {
     use crate::physical_plan::aggregates::{AggregateExec, AggregateMode};
     use crate::physical_plan::file_format::{FileScanConfig, ParquetExec};
     use crate::physical_plan::filter::FilterExec;
+    use crate::physical_plan::joins::utils::JoinOn;
+    use crate::physical_plan::joins::SortMergeJoinExec;
+    use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
     use crate::physical_plan::memory::MemoryExec;
     use crate::physical_plan::repartition::RepartitionExec;
     use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
@@ -935,7 +892,9 @@ mod tests {
     use arrow::compute::SortOptions;
     use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
     use datafusion_common::{Result, Statistics};
+    use datafusion_expr::JoinType;
     use datafusion_expr::{AggregateFunction, WindowFrame, WindowFunction};
+    use datafusion_physical_expr::expressions::Column;
     use datafusion_physical_expr::expressions::{col, NotExpr};
     use datafusion_physical_expr::PhysicalSortExpr;
     use std::sync::Arc;
@@ -948,6 +907,13 @@ mod tests {
         Ok(schema)
     }
 
+    fn create_test_schema2() -> Result<SchemaRef> {
+        let col_a = Field::new("col_a", DataType::Int32, true);
+        let col_b = Field::new("col_b", DataType::Int32, true);
+        let schema = Arc::new(Schema::new(vec![col_a, col_b]));
+        Ok(schema)
+    }
+
     // Util function to get string representation of a physical plan
     fn get_plan_string(plan: &Arc<dyn ExecutionPlan>) -> Vec<String> {
         let formatted = displayable(plan.as_ref()).indent().to_string();
@@ -1066,6 +1032,7 @@ mod tests {
             // Run the actual optimizer
             let optimized_physical_plan =
                 EnforceSorting::new().optimize(physical_plan, state.config_options())?;
+
             // Get string representation of the plan
             let actual = get_plan_string(&optimized_physical_plan);
             assert_eq!(
@@ -1111,7 +1078,7 @@ mod tests {
         )];
         let sort = sort_exec(sort_exprs.clone(), source);
 
-        let window_agg = window_exec("non_nullable_col", sort_exprs, sort);
+        let window_agg = bounded_window_exec("non_nullable_col", sort_exprs, 
sort);
 
         let sort_exprs = vec![sort_expr_options(
             "non_nullable_col",
@@ -1132,14 +1099,13 @@ mod tests {
             sort,
         );
 
-        // let filter_exec = sort_exec;
-        let physical_plan = window_exec("non_nullable_col", sort_exprs, 
filter);
+        let physical_plan = bounded_window_exec("non_nullable_col", 
sort_exprs, filter);
 
         let expected_input = vec![
-            "WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: 
Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), 
frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: 
CurrentRow }]",
+            "BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", 
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), 
end_bound: CurrentRow }]",
             "  FilterExec: NOT non_nullable_col@1",
             "    SortExec: expr=[non_nullable_col@1 ASC NULLS LAST]",
-            "      WindowAggExec: wdw=[count: Ok(Field { name: \"count\", 
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), 
end_bound: CurrentRow }]",
+            "      BoundedWindowAggExec: wdw=[count: Ok(Field { name: 
\"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(NULL), end_bound: CurrentRow }]",
             "        SortExec: expr=[non_nullable_col@1 DESC]",
             "          MemoryExec: partitions=0, partition_sizes=[]",
         ];
@@ -1147,7 +1113,7 @@ mod tests {
         let expected_optimized = vec![
             "WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: 
Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), 
frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: 
Following(NULL) }]",
             "  FilterExec: NOT non_nullable_col@1",
-            "    WindowAggExec: wdw=[count: Ok(Field { name: \"count\", 
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), 
end_bound: CurrentRow }]",
+            "    BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", 
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), 
end_bound: CurrentRow }]",
             "      SortExec: expr=[non_nullable_col@1 DESC]",
             "        MemoryExec: partitions=0, partition_sizes=[]",
         ];
@@ -1169,9 +1135,8 @@ mod tests {
             "  MemoryExec: partitions=0, partition_sizes=[]",
         ];
         let expected_optimized = vec![
-            "SortPreservingMergeExec: [nullable_col@0 ASC]",
-            "  SortExec: expr=[nullable_col@0 ASC]",
-            "    MemoryExec: partitions=0, partition_sizes=[]",
+            "SortExec: expr=[nullable_col@0 ASC]",
+            "  MemoryExec: partitions=0, partition_sizes=[]",
         ];
         assert_optimized!(expected_input, expected_optimized, physical_plan);
         Ok(())
@@ -1196,10 +1161,8 @@ mod tests {
             "        MemoryExec: partitions=0, partition_sizes=[]",
         ];
         let expected_optimized = vec![
-            "SortPreservingMergeExec: [nullable_col@0 ASC]",
-            "  SortPreservingMergeExec: [nullable_col@0 ASC]",
-            "    SortExec: expr=[nullable_col@0 ASC]",
-            "      MemoryExec: partitions=0, partition_sizes=[]",
+            "SortExec: expr=[nullable_col@0 ASC]",
+            "  MemoryExec: partitions=0, partition_sizes=[]",
         ];
         assert_optimized!(expected_input, expected_optimized, physical_plan);
         Ok(())
@@ -1257,7 +1220,12 @@ mod tests {
             sort_expr("non_nullable_col", &schema),
         ];
         let repartition_exec = repartition_exec(spm);
-        let sort2 = sort_exec(sort_exprs.clone(), repartition_exec);
+        let sort2 = Arc::new(SortExec::new_with_partitioning(
+            sort_exprs.clone(),
+            repartition_exec,
+            true,
+            None,
+        )) as _;
         let spm2 = sort_preserving_merge_exec(sort_exprs, sort2);
 
         let physical_plan = aggregate_exec(spm2);
@@ -1285,6 +1253,95 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_remove_unnecessary_sort4() -> Result<()> {
+        let schema = create_test_schema()?;
+        let source1 = repartition_exec(memory_exec(&schema));
+
+        let source2 = repartition_exec(memory_exec(&schema));
+        let union = union_exec(vec![source1, source2]);
+
+        let sort_exprs = vec![sort_expr("non_nullable_col", &schema)];
+        // let sort = sort_exec(sort_exprs.clone(), union);
+        let sort = Arc::new(SortExec::new_with_partitioning(
+            sort_exprs.clone(),
+            union,
+            true,
+            None,
+        )) as _;
+        let spm = sort_preserving_merge_exec(sort_exprs, sort);
+
+        let filter = filter_exec(
+            Arc::new(NotExpr::new(
+                col("non_nullable_col", schema.as_ref()).unwrap(),
+            )),
+            spm,
+        );
+
+        let sort_exprs = vec![
+            sort_expr("nullable_col", &schema),
+            sort_expr("non_nullable_col", &schema),
+        ];
+        let physical_plan = sort_exec(sort_exprs, filter);
+
+        // When removing a `SortPreservingMergeExec`, make sure that partitioning
+        // requirements are not violated. In some cases, we may need to replace
+        // it with a `CoalescePartitionsExec` instead of directly removing it.
+        let expected_input = vec![
+            "SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "  FilterExec: NOT non_nullable_col@1",
+            "    SortPreservingMergeExec: [non_nullable_col@1 ASC]",
+            "      SortExec: expr=[non_nullable_col@1 ASC]",
+            "        UnionExec",
+            "          RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=0",
+            "            MemoryExec: partitions=0, partition_sizes=[]",
+            "          RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=0",
+            "            MemoryExec: partitions=0, partition_sizes=[]",
+        ];
+
+        let expected_optimized = vec![
+            "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 
ASC]",
+            "  SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "    FilterExec: NOT non_nullable_col@1",
+            "      UnionExec",
+            "        RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=0",
+            "          MemoryExec: partitions=0, partition_sizes=[]",
+            "        RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=0",
+            "          MemoryExec: partitions=0, partition_sizes=[]",
+        ];
+        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_remove_unnecessary_spm1() -> Result<()> {
+        let schema = create_test_schema()?;
+        let source = memory_exec(&schema);
+        let input = sort_preserving_merge_exec(
+            vec![sort_expr("non_nullable_col", &schema)],
+            source,
+        );
+        let input2 = sort_preserving_merge_exec(
+            vec![sort_expr("non_nullable_col", &schema)],
+            input,
+        );
+        let physical_plan =
+            sort_preserving_merge_exec(vec![sort_expr("nullable_col", 
&schema)], input2);
+
+        let expected_input = vec![
+            "SortPreservingMergeExec: [nullable_col@0 ASC]",
+            "  SortPreservingMergeExec: [non_nullable_col@1 ASC]",
+            "    SortPreservingMergeExec: [non_nullable_col@1 ASC]",
+            "      MemoryExec: partitions=0, partition_sizes=[]",
+        ];
+        let expected_optimized = vec![
+            "SortExec: expr=[nullable_col@0 ASC]",
+            "  MemoryExec: partitions=0, partition_sizes=[]",
+        ];
+        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        Ok(())
+    }
+
     #[tokio::test]
     async fn test_do_not_remove_sort_with_limit() -> Result<()> {
         let schema = create_test_schema()?;
@@ -1295,8 +1352,7 @@ mod tests {
             sort_expr("non_nullable_col", &schema),
         ];
         let sort = sort_exec(sort_exprs.clone(), source1);
-        let limit = local_limit_exec(sort);
-        let limit = global_limit_exec(limit);
+        let limit = limit_exec(sort);
 
         let parquet_sort_exprs = vec![sort_expr("nullable_col", &schema)];
         let source2 = parquet_exec_sorted(&schema, parquet_sort_exprs);
@@ -1348,9 +1404,35 @@ mod tests {
             "    MemoryExec: partitions=0, partition_sizes=[]",
         ];
         let expected_optimized = vec![
-            "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 
ASC]",
-            "  SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
-            "    MemoryExec: partitions=0, partition_sizes=[]",
+            "SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "  MemoryExec: partitions=0, partition_sizes=[]",
+        ];
+        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_change_wrong_sorting2() -> Result<()> {
+        let schema = create_test_schema()?;
+        let source = memory_exec(&schema);
+        let sort_exprs = vec![
+            sort_expr("nullable_col", &schema),
+            sort_expr("non_nullable_col", &schema),
+        ];
+        let spm1 = sort_preserving_merge_exec(sort_exprs.clone(), source);
+        let sort2 = sort_exec(vec![sort_exprs[0].clone()], spm1);
+        let physical_plan =
+            sort_preserving_merge_exec(vec![sort_exprs[1].clone()], sort2);
+
+        let expected_input = vec![
+            "SortPreservingMergeExec: [non_nullable_col@1 ASC]",
+            "  SortExec: expr=[nullable_col@0 ASC]",
+            "    SortPreservingMergeExec: [nullable_col@0 
ASC,non_nullable_col@1 ASC]",
+            "      MemoryExec: partitions=0, partition_sizes=[]",
+        ];
+        let expected_optimized = vec![
+            "SortExec: expr=[non_nullable_col@1 ASC]",
+            "  MemoryExec: partitions=0, partition_sizes=[]",
         ];
         assert_optimized!(expected_input, expected_optimized, physical_plan);
         Ok(())
@@ -1432,9 +1514,8 @@ mod tests {
         let physical_plan = sort_preserving_merge_exec(sort_exprs, union);
 
         // Input is an invalid plan. In this case rule should add required sorting in appropriate places.
-        // First ParquetExec has output ordering(nullable_col@0 ASC). However, it doesn't satisfy required ordering
-        // of SortPreservingMergeExec. Hence rule should remove unnecessary sort for second child of the UnionExec
-        // and put a sort above Union to satisfy required ordering.
+        // First ParquetExec has output ordering(nullable_col@0 ASC). However, it doesn't satisfy the
+        // required ordering of SortPreservingMergeExec.
         let expected_input = vec![
             "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 
ASC]",
             "  UnionExec",
@@ -1442,12 +1523,13 @@ mod tests {
             "    SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
             "      ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
         ];
-        // should remove unnecessary sorting from below and move it to top
+
         let expected_optimized = vec![
             "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 
ASC]",
-            "  SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
-            "    UnionExec",
+            "  UnionExec",
+            "    SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
             "      ParquetExec: limit=None, partitions={1 group: [[x]]}, 
output_ordering=[nullable_col@0 ASC], projection=[nullable_col, 
non_nullable_col]",
+            "    SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
             "      ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
         ];
         assert_optimized!(expected_input, expected_optimized, physical_plan);
@@ -1532,10 +1614,12 @@ mod tests {
         ];
         let expected_optimized = vec![
             "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 
ASC]",
-            "  SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
-            "    UnionExec",
+            "  UnionExec",
+            "    SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
             "      ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
+            "    SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
             "      ParquetExec: limit=None, partitions={1 group: [[x]]}, 
output_ordering=[nullable_col@0 ASC], projection=[nullable_col, 
non_nullable_col]",
+            "    SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
             "      ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
         ];
         assert_optimized!(expected_input, expected_optimized, physical_plan);
@@ -1636,10 +1720,99 @@ mod tests {
             "    SortExec: expr=[nullable_col@0 ASC]",
             "      ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
             "    ParquetExec: limit=None, partitions={1 group: [[x]]}, 
output_ordering=[nullable_col@0 ASC], projection=[nullable_col, 
non_nullable_col]",
-            "    SortPreservingMergeExec: [nullable_col@0 ASC]",
-            "      SortExec: expr=[nullable_col@0 ASC]",
-            "        RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-            "          ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
+            "    SortExec: expr=[nullable_col@0 ASC]",
+            "      RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+            "        ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
+        ];
+        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_union_inputs_different_sorted7() -> Result<()> {
+        let schema = create_test_schema()?;
+
+        let source1 = parquet_exec(&schema);
+        let sort_exprs1 = vec![
+            sort_expr("nullable_col", &schema),
+            sort_expr("non_nullable_col", &schema),
+        ];
+        let sort_exprs3 = vec![sort_expr("nullable_col", &schema)];
+        let sort1 = sort_exec(sort_exprs1.clone(), source1.clone());
+        let sort2 = sort_exec(sort_exprs1, source1);
+
+        let union = union_exec(vec![sort1, sort2]);
+        let physical_plan = sort_preserving_merge_exec(sort_exprs3, union);
+
+        // Union has unnecessarily fine ordering below it. We should be able to replace them with absolutely necessary ordering.
+        let expected_input = vec![
+            "SortPreservingMergeExec: [nullable_col@0 ASC]",
+            "  UnionExec",
+            "    SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
+            "    SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
+        ];
+        // Union preserves the inputs' ordering and we should not change any of the SortExecs under UnionExec
+        let expected_output = vec![
+            "SortPreservingMergeExec: [nullable_col@0 ASC]",
+            "  UnionExec",
+            "    SortExec: expr=[nullable_col@0 ASC]",
+            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
+            "    SortExec: expr=[nullable_col@0 ASC]",
+            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
+        ];
+        assert_optimized!(expected_input, expected_output, physical_plan);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_union_inputs_different_sorted8() -> Result<()> {
+        let schema = create_test_schema()?;
+
+        let source1 = parquet_exec(&schema);
+        let sort_exprs1 = vec![
+            sort_expr("nullable_col", &schema),
+            sort_expr("non_nullable_col", &schema),
+        ];
+        let sort_exprs2 = vec![
+            sort_expr_options(
+                "nullable_col",
+                &schema,
+                SortOptions {
+                    descending: true,
+                    nulls_first: false,
+                },
+            ),
+            sort_expr_options(
+                "non_nullable_col",
+                &schema,
+                SortOptions {
+                    descending: true,
+                    nulls_first: false,
+                },
+            ),
+        ];
+        let sort1 = sort_exec(sort_exprs1, source1.clone());
+        let sort2 = sort_exec(sort_exprs2, source1);
+
+        let physical_plan = union_exec(vec![sort1, sort2]);
+
+        // The `UnionExec` doesn't preserve any of its input orderings in the
+        // example below.
+        let expected_input = vec![
+            "UnionExec",
+            "  SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "    ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
+            "  SortExec: expr=[nullable_col@0 DESC NULLS 
LAST,non_nullable_col@1 DESC NULLS LAST]",
+            "    ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
+        ];
+        // Since `UnionExec` doesn't preserve ordering in the plan above,
+        // we shouldn't keep the SortExecs in the plan.
+        let expected_optimized = vec![
+            "UnionExec",
+            "  ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
+            "  ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
         ];
         assert_optimized!(expected_input, expected_optimized, physical_plan);
         Ok(())
@@ -1669,25 +1842,378 @@ mod tests {
         let sort2 = sort_exec(sort_exprs3.clone(), source2);
 
         let union = union_exec(vec![sort1, sort2]);
-        let physical_plan = window_exec("nullable_col", sort_exprs3, union);
+        let spm = sort_preserving_merge_exec(sort_exprs3.clone(), union);
+        let physical_plan = bounded_window_exec("nullable_col", sort_exprs3, 
spm);
 
         // The window executor gets its sorting from multiple children jointly.
         // During the removal of `SortExec`s, it should be able to remove the
         // corresponding SortExecs together. Also, the inputs of these 
`SortExec`s
         // need not be identical for this removal to take place.
         let expected_input = vec![
-            "WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: 
Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), 
frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: 
CurrentRow }]",
-            "  UnionExec",
-            "    SortExec: expr=[nullable_col@0 DESC NULLS LAST]",
+            "BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", 
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), 
end_bound: CurrentRow }]",
+            "  SortPreservingMergeExec: [nullable_col@0 DESC NULLS LAST]",
+            "    UnionExec",
+            "      SortExec: expr=[nullable_col@0 DESC NULLS LAST]",
+            "        ParquetExec: limit=None, partitions={1 group: [[x]]}, 
output_ordering=[nullable_col@0 ASC, non_nullable_col@1 ASC], 
projection=[nullable_col, non_nullable_col]",
+            "      SortExec: expr=[nullable_col@0 DESC NULLS LAST]",
+            "        ParquetExec: limit=None, partitions={1 group: [[x]]}, 
output_ordering=[nullable_col@0 ASC], projection=[nullable_col, 
non_nullable_col]",
+        ];
+        let expected_optimized = vec![
+            "WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: 
Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), 
frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: 
Following(NULL) }]",
+            "  SortPreservingMergeExec: [nullable_col@0 ASC]",
+            "    UnionExec",
             "      ParquetExec: limit=None, partitions={1 group: [[x]]}, 
output_ordering=[nullable_col@0 ASC, non_nullable_col@1 ASC], 
projection=[nullable_col, non_nullable_col]",
-            "    SortExec: expr=[nullable_col@0 DESC NULLS LAST]",
             "      ParquetExec: limit=None, partitions={1 group: [[x]]}, 
output_ordering=[nullable_col@0 ASC], projection=[nullable_col, 
non_nullable_col]",
         ];
+        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_window_multi_path_sort2() -> Result<()> {
+        let schema = create_test_schema()?;
+
+        let sort_exprs1 = vec![
+            sort_expr("nullable_col", &schema),
+            sort_expr("non_nullable_col", &schema),
+        ];
+        let sort_exprs2 = vec![sort_expr("nullable_col", &schema)];
+        let source1 = parquet_exec_sorted(&schema, sort_exprs2.clone());
+        let source2 = parquet_exec_sorted(&schema, sort_exprs2.clone());
+        let sort1 = sort_exec(sort_exprs1.clone(), source1);
+        let sort2 = sort_exec(sort_exprs1.clone(), source2);
+
+        let union = union_exec(vec![sort1, sort2]);
+        let spm = Arc::new(SortPreservingMergeExec::new(sort_exprs1, union)) 
as _;
+        let physical_plan = bounded_window_exec("nullable_col", sort_exprs2, 
spm);
+
+        // The `BoundedWindowAggExec` can get its required sorting from the 
leaf nodes directly.
+        // The unnecessary SortExecs should be removed.
+        let expected_input = vec![
+            "BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", 
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), 
end_bound: CurrentRow }]",
+            "  SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 
ASC]",
+            "    UnionExec",
+            "      SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "        ParquetExec: limit=None, partitions={1 group: [[x]]}, 
output_ordering=[nullable_col@0 ASC], projection=[nullable_col, 
non_nullable_col]",
+            "      SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "        ParquetExec: limit=None, partitions={1 group: [[x]]}, 
output_ordering=[nullable_col@0 ASC], projection=[nullable_col, 
non_nullable_col]",
+        ];
         let expected_optimized = vec![
-            "WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: 
Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), 
frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: 
Following(NULL) }]",
+            "BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", 
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), 
end_bound: CurrentRow }]",
+            "  SortPreservingMergeExec: [nullable_col@0 ASC]",
+            "    UnionExec",
+            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, 
output_ordering=[nullable_col@0 ASC], projection=[nullable_col, 
non_nullable_col]",
+            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, 
output_ordering=[nullable_col@0 ASC], projection=[nullable_col, 
non_nullable_col]",
+        ];
+        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_union_inputs_different_sorted_with_limit() -> Result<()> {
+        let schema = create_test_schema()?;
+
+        let source1 = parquet_exec(&schema);
+        let sort_exprs1 = vec![
+            sort_expr("nullable_col", &schema),
+            sort_expr("non_nullable_col", &schema),
+        ];
+        let sort_exprs2 = vec![
+            sort_expr("nullable_col", &schema),
+            sort_expr_options(
+                "non_nullable_col",
+                &schema,
+                SortOptions {
+                    descending: true,
+                    nulls_first: false,
+                },
+            ),
+        ];
+        let sort_exprs3 = vec![sort_expr("nullable_col", &schema)];
+        let sort1 = sort_exec(sort_exprs1, source1.clone());
+
+        let sort2 = sort_exec(sort_exprs2, source1);
+        let limit = local_limit_exec(sort2);
+        let limit = global_limit_exec(limit);
+
+        let union = union_exec(vec![sort1, limit]);
+        let physical_plan = sort_preserving_merge_exec(sort_exprs3, union);
+
+        // Should not change the unnecessarily fine `SortExec` under the 
limit, since the `LimitExec`s depend on that exact ordering; only the sort 
on the other union input is coarsened
+        let expected_input = vec![
+            "SortPreservingMergeExec: [nullable_col@0 ASC]",
             "  UnionExec",
-            "    ParquetExec: limit=None, partitions={1 group: [[x]]}, 
output_ordering=[nullable_col@0 ASC, non_nullable_col@1 ASC], 
projection=[nullable_col, non_nullable_col]",
-            "    ParquetExec: limit=None, partitions={1 group: [[x]]}, 
output_ordering=[nullable_col@0 ASC], projection=[nullable_col, 
non_nullable_col]",
+            "    SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
+            "    GlobalLimitExec: skip=0, fetch=100",
+            "      LocalLimitExec: fetch=100",
+            "        SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 
DESC NULLS LAST]",
+            "          ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
+        ];
+        let expected_optimized = vec![
+            "SortPreservingMergeExec: [nullable_col@0 ASC]",
+            "  UnionExec",
+            "    SortExec: expr=[nullable_col@0 ASC]",
+            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
+            "    GlobalLimitExec: skip=0, fetch=100",
+            "      LocalLimitExec: fetch=100",
+            "        SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 
DESC NULLS LAST]",
+            "          ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
+        ];
+        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_sort_merge_join_order_by_left() -> Result<()> {
+        let left_schema = create_test_schema()?;
+        let right_schema = create_test_schema2()?;
+
+        let left = parquet_exec(&left_schema);
+        let right = parquet_exec(&right_schema);
+
+        // Join on (nullable_col == col_a)
+        let join_on = vec![(
+            Column::new_with_schema("nullable_col", &left.schema()).unwrap(),
+            Column::new_with_schema("col_a", &right.schema()).unwrap(),
+        )];
+
+        let join_types = vec![
+            JoinType::Inner,
+            JoinType::Left,
+            JoinType::Right,
+            JoinType::Full,
+            JoinType::LeftSemi,
+            JoinType::LeftAnti,
+        ];
+        for join_type in join_types {
+            let join =
+                sort_merge_join_exec(left.clone(), right.clone(), &join_on, 
&join_type);
+            let sort_exprs = vec![
+                sort_expr("nullable_col", &join.schema()),
+                sort_expr("non_nullable_col", &join.schema()),
+            ];
+            let physical_plan = sort_preserving_merge_exec(sort_exprs.clone(), 
join);
+
+            let join_plan =
+                format!("SortMergeJoin: join_type={join_type}, on=[(Column {{ 
name: \"nullable_col\", index: 0 }}, Column {{ name: \"col_a\", index: 0 }})]");
+            let join_plan2 =
+                format!("  SortMergeJoin: join_type={join_type}, on=[(Column 
{{ name: \"nullable_col\", index: 0 }}, Column {{ name: \"col_a\", index: 0 
}})]");
+            let expected_input = vec![
+                "SortPreservingMergeExec: [nullable_col@0 
ASC,non_nullable_col@1 ASC]",
+                join_plan2.as_str(),
+                "    ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
+                "    ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[col_a, col_b]",
+            ];
+            let expected_optimized = match join_type {
+                JoinType::Inner
+                | JoinType::Left
+                | JoinType::LeftSemi
+                | JoinType::LeftAnti => {
+                    // can push down the sort requirements and save 1 SortExec
+                    vec![
+                        join_plan.as_str(),
+                        "  SortExec: expr=[nullable_col@0 
ASC,non_nullable_col@1 ASC]",
+                        "    ParquetExec: limit=None, partitions={1 group: 
[[x]]}, projection=[nullable_col, non_nullable_col]",
+                        "  SortExec: expr=[col_a@0 ASC]",
+                        "    ParquetExec: limit=None, partitions={1 group: 
[[x]]}, projection=[col_a, col_b]",
+                    ]
+                }
+                _ => {
+                    // can not push down the sort requirements
+                    vec![
+                        "SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 
ASC]",
+                        join_plan2.as_str(),
+                        "    SortExec: expr=[nullable_col@0 ASC]",
+                        "      ParquetExec: limit=None, partitions={1 group: 
[[x]]}, projection=[nullable_col, non_nullable_col]",
+                        "    SortExec: expr=[col_a@0 ASC]",
+                        "      ParquetExec: limit=None, partitions={1 group: 
[[x]]}, projection=[col_a, col_b]",
+                    ]
+                }
+            };
+            assert_optimized!(expected_input, expected_optimized, 
physical_plan);
+        }
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_sort_merge_join_order_by_right() -> Result<()> {
+        let left_schema = create_test_schema()?;
+        let right_schema = create_test_schema2()?;
+
+        let left = parquet_exec(&left_schema);
+        let right = parquet_exec(&right_schema);
+
+        // Join on (nullable_col == col_a)
+        let join_on = vec![(
+            Column::new_with_schema("nullable_col", &left.schema()).unwrap(),
+            Column::new_with_schema("col_a", &right.schema()).unwrap(),
+        )];
+
+        let join_types = vec![
+            JoinType::Inner,
+            JoinType::Left,
+            JoinType::Right,
+            JoinType::Full,
+            JoinType::RightAnti,
+        ];
+        for join_type in join_types {
+            let join =
+                sort_merge_join_exec(left.clone(), right.clone(), &join_on, 
&join_type);
+            let sort_exprs = vec![
+                sort_expr("col_a", &join.schema()),
+                sort_expr("col_b", &join.schema()),
+            ];
+            let physical_plan = sort_preserving_merge_exec(sort_exprs, join);
+
+            let join_plan =
+                format!("SortMergeJoin: join_type={join_type}, on=[(Column {{ 
name: \"nullable_col\", index: 0 }}, Column {{ name: \"col_a\", index: 0 }})]");
+            let spm_plan = match join_type {
+                JoinType::RightAnti => {
+                    "SortPreservingMergeExec: [col_a@0 ASC,col_b@1 ASC]"
+                }
+                _ => "SortPreservingMergeExec: [col_a@2 ASC,col_b@3 ASC]",
+            };
+            let join_plan2 =
+                format!("  SortMergeJoin: join_type={join_type}, on=[(Column 
{{ name: \"nullable_col\", index: 0 }}, Column {{ name: \"col_a\", index: 0 
}})]");
+            let expected_input = vec![
+                spm_plan,
+                join_plan2.as_str(),
+                "    ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
+                "    ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[col_a, col_b]",
+            ];
+            let expected_optimized = match join_type {
+                JoinType::Inner | JoinType::Right | JoinType::RightAnti => {
+                    // can push down the sort requirements and save 1 SortExec
+                    vec![
+                        join_plan.as_str(),
+                        "  SortExec: expr=[nullable_col@0 ASC]",
+                        "    ParquetExec: limit=None, partitions={1 group: 
[[x]]}, projection=[nullable_col, non_nullable_col]",
+                        "  SortExec: expr=[col_a@0 ASC,col_b@1 ASC]",
+                        "    ParquetExec: limit=None, partitions={1 group: 
[[x]]}, projection=[col_a, col_b]",
+                    ]
+                }
+                _ => {
+                    // can not push down the sort requirements for Left and 
Full joins.
+                    vec![
+                        "SortExec: expr=[col_a@2 ASC,col_b@3 ASC]",
+                        join_plan2.as_str(),
+                        "    SortExec: expr=[nullable_col@0 ASC]",
+                        "      ParquetExec: limit=None, partitions={1 group: 
[[x]]}, projection=[nullable_col, non_nullable_col]",
+                        "    SortExec: expr=[col_a@0 ASC]",
+                        "      ParquetExec: limit=None, partitions={1 group: 
[[x]]}, projection=[col_a, col_b]",
+                    ]
+                }
+            };
+            assert_optimized!(expected_input, expected_optimized, 
physical_plan);
+        }
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_sort_merge_join_complex_order_by() -> Result<()> {
+        let left_schema = create_test_schema()?;
+        let right_schema = create_test_schema2()?;
+
+        let left = parquet_exec(&left_schema);
+        let right = parquet_exec(&right_schema);
+
+        // Join on (nullable_col == col_a)
+        let join_on = vec![(
+            Column::new_with_schema("nullable_col", &left.schema()).unwrap(),
+            Column::new_with_schema("col_a", &right.schema()).unwrap(),
+        )];
+
+        let join = sort_merge_join_exec(left, right, &join_on, 
&JoinType::Inner);
+
+        // order by (col_b, col_a)
+        let sort_exprs1 = vec![
+            sort_expr("col_b", &join.schema()),
+            sort_expr("col_a", &join.schema()),
+        ];
+        let physical_plan = sort_preserving_merge_exec(sort_exprs1, 
join.clone());
+
+        let expected_input = vec![
+            "SortPreservingMergeExec: [col_b@3 ASC,col_a@2 ASC]",
+            "  SortMergeJoin: join_type=Inner, on=[(Column { name: 
\"nullable_col\", index: 0 }, Column { name: \"col_a\", index: 0 })]",
+            "    ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
+            "    ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[col_a, col_b]",
+        ];
+
+        // can not push down the sort requirements, need to add SortExec
+        let expected_optimized = vec![
+            "SortExec: expr=[col_b@3 ASC,col_a@2 ASC]",
+            "  SortMergeJoin: join_type=Inner, on=[(Column { name: 
\"nullable_col\", index: 0 }, Column { name: \"col_a\", index: 0 })]",
+            "    SortExec: expr=[nullable_col@0 ASC]",
+            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
+            "    SortExec: expr=[col_a@0 ASC]",
+            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[col_a, col_b]",
+        ];
+        assert_optimized!(expected_input, expected_optimized, physical_plan);
+
+        // order by (nullable_col, col_b, col_a)
+        let sort_exprs2 = vec![
+            sort_expr("nullable_col", &join.schema()),
+            sort_expr("col_b", &join.schema()),
+            sort_expr("col_a", &join.schema()),
+        ];
+        let physical_plan = sort_preserving_merge_exec(sort_exprs2, join);
+
+        let expected_input = vec![
+            "SortPreservingMergeExec: [nullable_col@0 ASC,col_b@3 ASC,col_a@2 
ASC]",
+            "  SortMergeJoin: join_type=Inner, on=[(Column { name: 
\"nullable_col\", index: 0 }, Column { name: \"col_a\", index: 0 })]",
+            "    ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
+            "    ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[col_a, col_b]",
+        ];
+
+        // can not push down the sort requirements, need to add SortExec
+        let expected_optimized = vec![
+            "SortExec: expr=[nullable_col@0 ASC,col_b@3 ASC,col_a@2 ASC]",
+            "  SortMergeJoin: join_type=Inner, on=[(Column { name: 
\"nullable_col\", index: 0 }, Column { name: \"col_a\", index: 0 })]",
+            "    SortExec: expr=[nullable_col@0 ASC]",
+            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
+            "    SortExec: expr=[col_a@0 ASC]",
+            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[col_a, col_b]",
+        ];
+        assert_optimized!(expected_input, expected_optimized, physical_plan);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_multiple_sort_window_exec() -> Result<()> {
+        let schema = create_test_schema()?;
+        let source = memory_exec(&schema);
+
+        let sort_exprs1 = vec![sort_expr("nullable_col", &schema)];
+        let sort_exprs2 = vec![
+            sort_expr("nullable_col", &schema),
+            sort_expr("non_nullable_col", &schema),
+        ];
+
+        let sort1 = sort_exec(sort_exprs1.clone(), source);
+        let window_agg1 =
+            bounded_window_exec("non_nullable_col", sort_exprs1.clone(), 
sort1);
+        let window_agg2 =
+            bounded_window_exec("non_nullable_col", sort_exprs2, window_agg1);
+        let physical_plan =
+            bounded_window_exec("non_nullable_col", sort_exprs1, window_agg2);
+
+        let expected_input = vec![
+            "BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", 
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), 
end_bound: CurrentRow }]",
+            "  BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", 
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), 
end_bound: CurrentRow }]",
+            "    BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", 
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), 
end_bound: CurrentRow }]",
+            "      SortExec: expr=[nullable_col@0 ASC]",
+            "        MemoryExec: partitions=0, partition_sizes=[]",
+        ];
+
+        let expected_optimized = vec![
+            "BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", 
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), 
end_bound: CurrentRow }]",
+            "  BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", 
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), 
end_bound: CurrentRow }]",
+            "    BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", 
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), 
end_bound: CurrentRow }]",
+            "      SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "        MemoryExec: partitions=0, partition_sizes=[]",
         ];
         assert_optimized!(expected_input, expected_optimized, physical_plan);
         Ok(())
@@ -1745,7 +2271,7 @@ mod tests {
 
         let memory_exec = memory_exec(&schema);
         let sort_exprs = vec![sort_expr("nullable_col", &schema)];
-        let window = window_exec("nullable_col", sort_exprs.clone(), 
memory_exec);
+        let window = bounded_window_exec("nullable_col", sort_exprs.clone(), 
memory_exec);
         let repartition = repartition_exec(window);
 
         let orig_plan = Arc::new(SortExec::new_with_partitioning(
@@ -1813,9 +2339,8 @@ mod tests {
         let expected_optimized = vec![
             "SortPreservingMergeExec: [nullable_col@0 ASC]",
             "  SortExec: expr=[nullable_col@0 ASC]",
-            "    RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=10",
-            "      RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=0",
-            "        MemoryExec: partitions=0, partition_sizes=[]",
+            "    RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=0",
+            "      MemoryExec: partitions=0, partition_sizes=[]",
         ];
         assert_optimized!(expected_input, expected_optimized, physical_plan);
         Ok(())
@@ -1865,7 +2390,7 @@ mod tests {
         Arc::new(FilterExec::try_new(predicate, input).unwrap())
     }
 
-    fn window_exec(
+    fn bounded_window_exec(
         col_name: &str,
         sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
         input: Arc<dyn ExecutionPlan>,
@@ -1874,7 +2399,7 @@ mod tests {
         let schema = input.schema();
 
         Arc::new(
-            WindowAggExec::try_new(
+            BoundedWindowAggExec::try_new(
                 vec![create_window_expr(
                     
&WindowFunction::AggregateFunction(AggregateFunction::Count),
                     "count".to_owned(),
@@ -1940,6 +2465,10 @@ mod tests {
         Arc::new(UnionExec::new(input))
     }
 
+    fn limit_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
+        global_limit_exec(local_limit_exec(input))
+    }
+
     fn local_limit_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn 
ExecutionPlan> {
         Arc::new(LocalLimitExec::new(input, 100))
     }
@@ -1967,4 +2496,23 @@ mod tests {
             .unwrap(),
         )
     }
+
+    fn sort_merge_join_exec(
+        left: Arc<dyn ExecutionPlan>,
+        right: Arc<dyn ExecutionPlan>,
+        join_on: &JoinOn,
+        join_type: &JoinType,
+    ) -> Arc<dyn ExecutionPlan> {
+        Arc::new(
+            SortMergeJoinExec::try_new(
+                left,
+                right,
+                join_on.clone(),
+                *join_type,
+                vec![SortOptions::default(); join_on.len()],
+                false,
+            )
+            .unwrap(),
+        )
+    }
 }
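
All of the tests above funnel through the `assert_optimized!` macro, which
renders the plan to its indented string form, applies the rule under test,
and compares the rendered result. A minimal sketch of such a macro, assuming
the `displayable` helper and the `EnforceSorting` rule's `optimize` entry
point (the real macro in sort_enforcement.rs may differ in detail):

    macro_rules! assert_optimized {
        ($EXPECTED_INPUT:expr, $EXPECTED_OPTIMIZED:expr, $PLAN:expr) => {
            // Render the plan before optimization, line by line.
            let actual_input = format!("{}", displayable($PLAN.as_ref()).indent())
                .trim()
                .lines()
                .map(|s| s.to_string())
                .collect::<Vec<_>>();
            assert_eq!($EXPECTED_INPUT, actual_input);
            // Run the rule under test, then compare the optimized plan.
            let session_ctx = SessionContext::new();
            let optimized = EnforceSorting::new()
                .optimize($PLAN.clone(), session_ctx.state().config_options())?;
            let actual_optimized =
                format!("{}", displayable(optimized.as_ref()).indent())
                    .trim()
                    .lines()
                    .map(|s| s.to_string())
                    .collect::<Vec<_>>();
            assert_eq!($EXPECTED_OPTIMIZED, actual_optimized);
        };
    }
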
diff --git a/datafusion/core/src/physical_optimizer/sort_pushdown.rs 
b/datafusion/core/src/physical_optimizer/sort_pushdown.rs
new file mode 100644
index 0000000000..07d0002548
--- /dev/null
+++ b/datafusion/core/src/physical_optimizer/sort_pushdown.rs
@@ -0,0 +1,416 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+use crate::physical_optimizer::utils::{add_sort_above, is_limit, is_union, 
is_window};
+use crate::physical_plan::filter::FilterExec;
+use crate::physical_plan::joins::utils::JoinSide;
+use crate::physical_plan::joins::SortMergeJoinExec;
+use crate::physical_plan::projection::ProjectionExec;
+use crate::physical_plan::repartition::RepartitionExec;
+use crate::physical_plan::sorts::sort::SortExec;
+use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
+use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
+use datafusion_common::{DataFusionError, Result};
+use datafusion_expr::JoinType;
+use datafusion_physical_expr::expressions::Column;
+use datafusion_physical_expr::utils::{
+    make_sort_exprs_from_requirements, ordering_satisfy_requirement,
+    requirements_compatible,
+};
+use datafusion_physical_expr::{
+    make_sort_requirements_from_exprs, PhysicalSortExpr, 
PhysicalSortRequirement,
+};
+use itertools::izip;
+use std::ops::Deref;
+use std::sync::Arc;
+
+/// This is a "data class" we use within the [`EnforceSorting`] rule to push
+/// down [`SortExec`] in the plan. In some cases, we can reduce the total
+/// computational cost by pushing down `SortExec`s through some executors.
+#[derive(Debug, Clone)]
+pub(crate) struct SortPushDown {
+    /// Current plan
+    pub plan: Arc<dyn ExecutionPlan>,
+    /// Parent required sort ordering
+    required_ordering: Option<Vec<PhysicalSortRequirement>>,
+    /// The adjusted sort ordering requested of the children.
+    /// By default this equals the plan's required input ordering, but it 
can be adjusted based on the parent's required sort ordering.
+    adjusted_request_ordering: Vec<Option<Vec<PhysicalSortRequirement>>>,
+}
+
+impl SortPushDown {
+    pub fn init(plan: Arc<dyn ExecutionPlan>) -> Self {
+        let request_ordering = plan.required_input_ordering();
+        SortPushDown {
+            plan,
+            required_ordering: None,
+            adjusted_request_ordering: request_ordering,
+        }
+    }
+
+    pub fn children(&self) -> Vec<SortPushDown> {
+        izip!(
+            self.plan.children().into_iter(),
+            self.adjusted_request_ordering.clone().into_iter(),
+        )
+        .map(|(child, from_parent)| {
+            let child_request_ordering = child.required_input_ordering();
+            SortPushDown {
+                plan: child,
+                required_ordering: from_parent,
+                adjusted_request_ordering: child_request_ordering,
+            }
+        })
+        .collect()
+    }
+}
+
+impl TreeNode for SortPushDown {
+    fn apply_children<F>(&self, op: &mut F) -> Result<VisitRecursion>
+    where
+        F: FnMut(&Self) -> Result<VisitRecursion>,
+    {
+        let children = self.children();
+        for child in children {
+            match op(&child)? {
+                VisitRecursion::Continue => {}
+                VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
+                VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
+            }
+        }
+
+        Ok(VisitRecursion::Continue)
+    }
+
+    fn map_children<F>(mut self, transform: F) -> Result<Self>
+    where
+        F: FnMut(Self) -> Result<Self>,
+    {
+        let children = self.children();
+        if !children.is_empty() {
+            let children_plans = children
+                .into_iter()
+                .map(transform)
+                .map(|r| r.map(|s| s.plan))
+                .collect::<Result<Vec<_>>>()?;
+
+            match with_new_children_if_necessary(self.plan, children_plans)? {
+                Transformed::Yes(plan) | Transformed::No(plan) => {
+                    self.plan = plan;
+                }
+            }
+        };
+        Ok(self)
+    }
+}
+
+pub(crate) fn pushdown_sorts(
+    requirements: SortPushDown,
+) -> Result<Transformed<SortPushDown>> {
+    let plan = &requirements.plan;
+    let parent_required = requirements.required_ordering.as_deref();
+    const ERR_MSG: &str = "Expects parent requirement to contain something";
+    let err = || DataFusionError::Plan(ERR_MSG.to_string());
+    if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
+        let mut new_plan = plan.clone();
+        if !ordering_satisfy_requirement(plan.output_ordering(), 
parent_required, || {
+            plan.equivalence_properties()
+        }) {
+            // If the current plan is a SortExec, modify it to satisfy parent 
requirements:
+            let parent_required_expr =
+                
make_sort_exprs_from_requirements(parent_required.ok_or_else(err)?);
+            new_plan = sort_exec.input.clone();
+            add_sort_above(&mut new_plan, parent_required_expr)?;
+        };
+        let required_ordering = new_plan
+            .output_ordering()
+            .map(make_sort_requirements_from_exprs);
+        // Since new_plan is a SortExec, we can safely get the 0th index.
+        let child = &new_plan.children()[0];
+        if let Some(adjusted) =
+            pushdown_requirement_to_children(child, 
required_ordering.as_deref())?
+        {
+            // Can push down requirements
+            Ok(Transformed::Yes(SortPushDown {
+                plan: child.clone(),
+                required_ordering: None,
+                adjusted_request_ordering: adjusted,
+            }))
+        } else {
+            // Can not push down requirements
+            Ok(Transformed::Yes(SortPushDown::init(new_plan)))
+        }
+    } else {
+        // Executors other than SortExec
+        if ordering_satisfy_requirement(plan.output_ordering(), 
parent_required, || {
+            plan.equivalence_properties()
+        }) {
+            // Satisfies parent requirements, immediately return.
+            return Ok(Transformed::Yes(SortPushDown {
+                required_ordering: None,
+                ..requirements
+            }));
+        }
+        // Can not satisfy the parent requirements, check whether the 
requirements can be pushed down:
+        if let Some(adjusted) = pushdown_requirement_to_children(plan, 
parent_required)? {
+            Ok(Transformed::Yes(SortPushDown {
+                plan: plan.clone(),
+                required_ordering: None,
+                adjusted_request_ordering: adjusted,
+            }))
+        } else {
+            // Can not push down requirements, add new SortExec:
+            let parent_required_expr =
+                
make_sort_exprs_from_requirements(parent_required.ok_or_else(err)?);
+            let mut new_plan = plan.clone();
+            add_sort_above(&mut new_plan, parent_required_expr)?;
+            Ok(Transformed::Yes(SortPushDown::init(new_plan)))
+        }
+    }
+}
+
+fn pushdown_requirement_to_children(
+    plan: &Arc<dyn ExecutionPlan>,
+    parent_required: Option<&[PhysicalSortRequirement]>,
+) -> Result<Option<Vec<Option<Vec<PhysicalSortRequirement>>>>> {
+    const ERR_MSG: &str = "Expects parent requirement to contain something";
+    let err = || DataFusionError::Plan(ERR_MSG.to_string());
+    let maintains_input_order = plan.maintains_input_order();
+    if is_window(plan) {
+        let required_input_ordering = plan.required_input_ordering();
+        let request_child = required_input_ordering[0].as_deref();
+        let child_plan = plan.children()[0].clone();
+        match determine_children_requirement(parent_required, request_child, 
child_plan) {
+            RequirementsCompatibility::Satisfy => {
+                Ok(Some(vec![request_child.map(|r| r.to_vec())]))
+            }
+            RequirementsCompatibility::Compatible(adjusted) => 
Ok(Some(vec![adjusted])),
+            RequirementsCompatibility::NonCompatible => Ok(None),
+        }
+    } else if is_union(plan) {
+        // UnionExec does not impose real sort requirements on its inputs. 
Here we set the adjusted_request_ordering to the parent's requirement and
+        // propagate it down to every child to correct any unnecessarily 
fine descendant SortExecs under the UnionExec
+        Ok(Some(vec![
+            parent_required.map(|elem| elem.to_vec());
+            plan.children().len()
+        ]))
+    } else if let Some(smj) = 
plan.as_any().downcast_ref::<SortMergeJoinExec>() {
+        // If the current plan is SortMergeJoinExec
+        let left_columns_len = smj.left.schema().fields().len();
+        let parent_required_expr =
+            
make_sort_exprs_from_requirements(parent_required.ok_or_else(err)?);
+        let expr_source_side =
+            expr_source_sides(&parent_required_expr, smj.join_type, 
left_columns_len);
+        match expr_source_side {
+            Some(JoinSide::Left) if maintains_input_order[0] => {
+                try_pushdown_requirements_to_join(
+                    plan,
+                    parent_required,
+                    parent_required_expr,
+                    JoinSide::Left,
+                )
+            }
+            Some(JoinSide::Right) if maintains_input_order[1] => {
+                let new_right_required = match smj.join_type {
+                    JoinType::Inner | JoinType::Right => shift_right_required(
+                        parent_required.ok_or_else(err)?,
+                        left_columns_len,
+                    )?,
+                    JoinType::RightSemi | JoinType::RightAnti => {
+                        parent_required.ok_or_else(err)?.to_vec()
+                    }
+                    _ => Err(DataFusionError::Plan(
+                        "Unexpected SortMergeJoin type here".to_string(),
+                    ))?,
+                };
+                try_pushdown_requirements_to_join(
+                    plan,
+                    Some(new_right_required.deref()),
+                    parent_required_expr,
+                    JoinSide::Right,
+                )
+            }
+            _ => {
+                // Can not decide the expr side for SortMergeJoinExec, can not 
push down
+                Ok(None)
+            }
+        }
+    } else if maintains_input_order.is_empty()
+        || !maintains_input_order.iter().any(|o| *o)
+        || plan.as_any().is::<RepartitionExec>()
+        || plan.as_any().is::<FilterExec>()
+        // TODO: Add support for Projection push down
+        || plan.as_any().is::<ProjectionExec>()
+        || is_limit(plan)
+    {
+        // If the current plan is a leaf node, or it can not maintain any of 
its input orderings, the requirements can not be pushed down.
+        // For RepartitionExec, we always choose not to push down the sort 
requirements, even though RepartitionExec(input_partitions=1) could maintain 
input ordering.
+        // Pushing down is not beneficial in that case
+        Ok(None)
+    } else {
+        Ok(Some(vec![
+            parent_required.map(|elem| elem.to_vec());
+            plan.children().len()
+        ]))
+    }
+}
+
+/// Determine the children requirements
+/// If the children requirements are more specific, do not push down the 
parent requirements.
+/// If the parent requirements are more specific, push down the parent 
requirements.
+/// If they are not compatible, a new Sort needs to be added.
+fn determine_children_requirement(
+    parent_required: Option<&[PhysicalSortRequirement]>,
+    request_child: Option<&[PhysicalSortRequirement]>,
+    child_plan: Arc<dyn ExecutionPlan>,
+) -> RequirementsCompatibility {
+    if requirements_compatible(request_child, parent_required, || {
+        child_plan.equivalence_properties()
+    }) {
+        // request child requirements are more specific, no need to push down 
the parent requirements
+        RequirementsCompatibility::Satisfy
+    } else if requirements_compatible(parent_required, request_child, || {
+        child_plan.equivalence_properties()
+    }) {
+        // parent requirements are more specific, adjust the request child 
requirements and push down the new requirements
+        let adjusted = parent_required.map(|r| r.to_vec());
+        RequirementsCompatibility::Compatible(adjusted)
+    } else {
+        RequirementsCompatibility::NonCompatible
+    }
+}
+
+fn try_pushdown_requirements_to_join(
+    plan: &Arc<dyn ExecutionPlan>,
+    parent_required: Option<&[PhysicalSortRequirement]>,
+    sort_expr: Vec<PhysicalSortExpr>,
+    push_side: JoinSide,
+) -> Result<Option<Vec<Option<Vec<PhysicalSortRequirement>>>>> {
+    let child_idx = match push_side {
+        JoinSide::Left => 0,
+        JoinSide::Right => 1,
+    };
+    let required_input_ordering = plan.required_input_ordering();
+    let request_child = required_input_ordering[child_idx].as_deref();
+    let child_plan = plan.children()[child_idx].clone();
+    match determine_children_requirement(parent_required, request_child, 
child_plan) {
+        RequirementsCompatibility::Satisfy => Ok(None),
+        RequirementsCompatibility::Compatible(adjusted) => {
+            let new_adjusted = match push_side {
+                JoinSide::Left => {
+                    vec![adjusted, required_input_ordering[1].clone()]
+                }
+                JoinSide::Right => {
+                    vec![required_input_ordering[0].clone(), adjusted]
+                }
+            };
+            Ok(Some(new_adjusted))
+        }
+        RequirementsCompatibility::NonCompatible => {
+            // Can not push down, add new SortExec
+            add_sort_above(&mut plan.clone(), sort_expr)?;
+            Ok(None)
+        }
+    }
+}
+
+fn expr_source_sides(
+    required_exprs: &[PhysicalSortExpr],
+    join_type: JoinType,
+    left_columns_len: usize,
+) -> Option<JoinSide> {
+    match join_type {
+        JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => 
{
+            let all_column_sides = required_exprs
+                .iter()
+                .filter_map(|r| {
+                    r.expr.as_any().downcast_ref::<Column>().map(|col| {
+                        if col.index() < left_columns_len {
+                            JoinSide::Left
+                        } else {
+                            JoinSide::Right
+                        }
+                    })
+                })
+                .collect::<Vec<_>>();
+
+            // If the exprs are all coming from one side, the requirements can 
be pushed down
+            if all_column_sides.len() != required_exprs.len() {
+                None
+            } else if all_column_sides
+                .iter()
+                .all(|side| matches!(side, JoinSide::Left))
+            {
+                Some(JoinSide::Left)
+            } else if all_column_sides
+                .iter()
+                .all(|side| matches!(side, JoinSide::Right))
+            {
+                Some(JoinSide::Right)
+            } else {
+                None
+            }
+        }
+        JoinType::LeftSemi | JoinType::LeftAnti => required_exprs
+            .iter()
+            .all(|e| e.expr.as_any().downcast_ref::<Column>().is_some())
+            .then_some(JoinSide::Left),
+        JoinType::RightSemi | JoinType::RightAnti => required_exprs
+            .iter()
+            .all(|e| e.expr.as_any().downcast_ref::<Column>().is_some())
+            .then_some(JoinSide::Right),
+    }
+}
+
+fn shift_right_required(
+    parent_required: &[PhysicalSortRequirement],
+    left_columns_len: usize,
+) -> Result<Vec<PhysicalSortRequirement>> {
+    let new_right_required: Vec<PhysicalSortRequirement> = parent_required
+        .iter()
+        .filter_map(|r| {
+            r.expr.as_any().downcast_ref::<Column>().and_then(|col| {
+                (col.index() >= 
left_columns_len).then_some(PhysicalSortRequirement {
+                    expr: Arc::new(Column::new(
+                        col.name(),
+                        col.index() - left_columns_len,
+                    )) as _,
+                    options: r.options,
+                })
+            })
+        })
+        .collect::<Vec<_>>();
+    if new_right_required.len() == parent_required.len() {
+        Ok(new_right_required)
+    } else {
+        Err(DataFusionError::Plan(
+            "Expect to shift all the parent required column indexes for 
SortMergeJoin"
+                .to_string(),
+        ))
+    }
+}
+
+/// Describes the possible outcomes of comparing two sets of sort requirements
+#[derive(Debug)]
+enum RequirementsCompatibility {
+    /// The existing requirements already satisfy the new requirements
+    Satisfy,
+    /// The requirements are compatible after the given adjustment
+    Compatible(Option<Vec<PhysicalSortRequirement>>),
+    /// The requirements are not compatible
+    NonCompatible,
+}
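
To see how this data class is meant to be driven: `SortPushDown` carries the
parent's requirement down the tree while `TreeNode::transform_down` re-applies
`pushdown_sorts` at every node. A hypothetical sketch of the wiring inside the
`EnforceSorting` rule (the actual rule lives in sort_enforcement.rs and may be
organized differently):

    fn apply_sort_pushdown(
        plan: Arc<dyn ExecutionPlan>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // The root has no parent, so it starts with no required ordering.
        let requirements = SortPushDown::init(plan);
        // At each node, `pushdown_sorts` either absorbs the requirement,
        // forwards it to the children via `adjusted_request_ordering`, or
        // adds a new SortExec when pushdown is impossible.
        let adjusted = requirements.transform_down(&pushdown_sorts)?;
        Ok(adjusted.plan)
    }
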
diff --git a/datafusion/core/src/physical_optimizer/utils.rs 
b/datafusion/core/src/physical_optimizer/utils.rs
index 30b8243e46..2fa833bb7e 100644
--- a/datafusion/core/src/physical_optimizer/utils.rs
+++ b/datafusion/core/src/physical_optimizer/utils.rs
@@ -21,7 +21,13 @@ use super::optimizer::PhysicalOptimizerRule;
 
 use crate::config::ConfigOptions;
 use crate::error::Result;
+use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
+use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
+use crate::physical_plan::repartition::RepartitionExec;
 use crate::physical_plan::sorts::sort::SortExec;
+use 
crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
+use crate::physical_plan::union::UnionExec;
+use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
 use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
 use datafusion_common::tree_node::Transformed;
 use datafusion_physical_expr::utils::ordering_satisfy;
@@ -68,3 +74,40 @@ pub fn add_sort_above(
     }
     Ok(())
 }
+
+/// Checks whether the given operator is a limit;
+/// i.e. either a [`LocalLimitExec`] or a [`GlobalLimitExec`].
+pub fn is_limit(plan: &Arc<dyn ExecutionPlan>) -> bool {
+    plan.as_any().is::<GlobalLimitExec>() || 
plan.as_any().is::<LocalLimitExec>()
+}
+
+/// Checks whether the given operator is a window;
+/// i.e. either a [`WindowAggExec`] or a [`BoundedWindowAggExec`].
+pub fn is_window(plan: &Arc<dyn ExecutionPlan>) -> bool {
+    plan.as_any().is::<WindowAggExec>() || 
plan.as_any().is::<BoundedWindowAggExec>()
+}
+
+/// Checks whether the given operator is a [`SortExec`].
+pub fn is_sort(plan: &Arc<dyn ExecutionPlan>) -> bool {
+    plan.as_any().is::<SortExec>()
+}
+
+/// Checks whether the given operator is a [`SortPreservingMergeExec`].
+pub fn is_sort_preserving_merge(plan: &Arc<dyn ExecutionPlan>) -> bool {
+    plan.as_any().is::<SortPreservingMergeExec>()
+}
+
+/// Checks whether the given operator is a [`CoalescePartitionsExec`].
+pub fn is_coalesce_partitions(plan: &Arc<dyn ExecutionPlan>) -> bool {
+    plan.as_any().is::<CoalescePartitionsExec>()
+}
+
+/// Checks whether the given operator is a [`UnionExec`].
+pub fn is_union(plan: &Arc<dyn ExecutionPlan>) -> bool {
+    plan.as_any().is::<UnionExec>()
+}
+
+/// Checks whether the given operator is a [`RepartitionExec`].
+pub fn is_repartition(plan: &Arc<dyn ExecutionPlan>) -> bool {
+    plan.as_any().is::<RepartitionExec>()
+}
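
These predicates let optimizer code test node types without repeating
`as_any().downcast_ref` chains at every call site. A small illustrative
helper built on them (hypothetical, not part of the commit):

    // Count the SortExecs in a plan tree, e.g. to assert that an
    // optimization removed the sorts it was supposed to remove:
    fn count_sorts(plan: &Arc<dyn ExecutionPlan>) -> usize {
        usize::from(is_sort(plan))
            + plan.children().iter().map(count_sorts).sum::<usize>()
    }
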
diff --git a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs 
b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
index 623bafe093..94f5c9e5ef 100644
--- a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
+++ b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
@@ -35,6 +35,9 @@ use arrow::compute::{concat_batches, take, SortOptions};
 use arrow::datatypes::{DataType, SchemaRef, TimeUnit};
 use arrow::error::ArrowError;
 use arrow::record_batch::RecordBatch;
+use datafusion_physical_expr::{
+    make_sort_requirements_from_exprs, PhysicalSortRequirement,
+};
 use futures::{Stream, StreamExt};
 
 use crate::error::DataFusionError;
@@ -55,9 +58,6 @@ use crate::physical_plan::{
 };
 
 use datafusion_common::tree_node::{Transformed, TreeNode};
-use datafusion_physical_expr::{
-    make_sort_requirements_from_exprs, PhysicalSortRequirement,
-};
 
 /// join execution plan executes partitions in parallel and combines them into 
a set of
 /// partitions.
@@ -249,6 +249,17 @@ impl ExecutionPlan for SortMergeJoinExec {
         self.output_ordering.as_deref()
     }
 
+    fn maintains_input_order(&self) -> Vec<bool> {
+        match self.join_type {
+            JoinType::Inner => vec![true, true],
+            JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti => 
vec![true, false],
+            JoinType::Right | JoinType::RightSemi | JoinType::RightAnti => {
+                vec![false, true]
+            }
+            _ => vec![false, false],
+        }
+    }
+
     fn equivalence_properties(&self) -> EquivalenceProperties {
         let left_columns_len = self.left.schema().fields.len();
         combine_join_equivalence_properties(
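
The new `maintains_input_order` override encodes which side's ordering
survives the merge: an inner join streams both sides in order, left-ish
joins preserve only the left side, right-ish joins only the right, and a
full join preserves neither. A hypothetical check of that contract,
assuming `left`, `right`, and `join_on` as in the tests earlier:

    let smj = sort_merge_join_exec(left, right, &join_on, &JoinType::Left);
    // For a left join only the left side's ordering is maintained:
    assert_eq!(smj.maintains_input_order(), vec![true, false]);
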
diff --git a/datafusion/core/src/physical_plan/planner.rs 
b/datafusion/core/src/physical_plan/planner.rs
index aa6f8361dc..f9a9209454 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -576,7 +576,6 @@ impl DefaultPhysicalPlanner {
                     }
 
                     let logical_input_schema = input.schema();
-
                     let physical_input_schema = input_exec.schema();
                     let window_expr = window_expr
                         .iter()
diff --git a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs 
b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
index 29d5c4dc8f..75598f1d52 100644
--- a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
+++ b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
@@ -43,7 +43,6 @@ use datafusion_common::DataFusionError;
 use datafusion_physical_expr::PhysicalSortRequirement;
 use futures::stream::Stream;
 use futures::{ready, StreamExt};
-use log::debug;
 use std::any::Any;
 use std::ops::Range;
 use std::pin::Pin;
@@ -179,10 +178,8 @@ impl ExecutionPlan for WindowAggExec {
 
     fn required_input_distribution(&self) -> Vec<Distribution> {
         if self.partition_keys.is_empty() {
-            debug!("No partition defined for WindowAggExec!!!");
             vec![Distribution::SinglePartition]
         } else {
-            //TODO support PartitionCollections if there is no common 
partition columns in the window_expr
             vec![Distribution::HashPartitioned(self.partition_keys.clone())]
         }
     }
diff --git a/datafusion/physical-expr/src/utils.rs 
b/datafusion/physical-expr/src/utils.rs
index cd4ac6ff3d..5c30d523d8 100644
--- a/datafusion/physical-expr/src/utils.rs
+++ b/datafusion/physical-expr/src/utils.rs
@@ -120,32 +120,24 @@ pub fn normalize_out_expr_with_alias_schema(
     alias_map: &HashMap<Column, Vec<Column>>,
     schema: &SchemaRef,
 ) -> Arc<dyn PhysicalExpr> {
-    let expr_clone = expr.clone();
-    expr_clone
+    expr.clone()
         .transform(&|expr| {
-            let normalized_form: Option<Arc<dyn PhysicalExpr>> =
-                match expr.as_any().downcast_ref::<Column>() {
-                    Some(column) => {
-                        let out = alias_map
-                            .get(column)
-                            .map(|c| {
-                                let out_col: Arc<dyn PhysicalExpr> =
-                                    Arc::new(c[0].clone());
-                                out_col
-                            })
-                            .or_else(|| match schema.index_of(column.name()) {
-                                // Exactly matching, return None, no need to 
do the transform
-                                Ok(idx) if column.index() == idx => None,
-                                _ => {
-                                    let out_col: Arc<dyn PhysicalExpr> =
-                                        
Arc::new(UnKnownColumn::new(column.name()));
-                                    Some(out_col)
-                                }
-                            });
-                        out
-                    }
-                    None => None,
-                };
+            let normalized_form: Option<Arc<dyn PhysicalExpr>> = match expr
+                .as_any()
+                .downcast_ref::<Column>()
+            {
+                Some(column) => {
+                    alias_map
+                        .get(column)
+                        .map(|c| Arc::new(c[0].clone()) as _)
+                        .or_else(|| match schema.index_of(column.name()) {
+                            // Exactly matching, return None, no need to do 
the transform
+                            Ok(idx) if column.index() == idx => None,
+                            _ => 
Some(Arc::new(UnKnownColumn::new(column.name())) as _),
+                        })
+                }
+                None => None,
+            };
             Ok(if let Some(normalized_form) = normalized_form {
                 Transformed::Yes(normalized_form)
             } else {
@@ -159,8 +151,7 @@ pub fn normalize_expr_with_equivalence_properties(
     expr: Arc<dyn PhysicalExpr>,
     eq_properties: &[EquivalentClass],
 ) -> Arc<dyn PhysicalExpr> {
-    let expr_clone = expr.clone();
-    expr_clone
+    expr.clone()
         .transform(&|expr| {
             let normalized_form: Option<Arc<dyn PhysicalExpr>> =
                 match expr.as_any().downcast_ref::<Column>() {
@@ -235,7 +226,9 @@ pub fn ordering_satisfy<F: FnOnce() -> 
EquivalenceProperties>(
     }
 }
 
-pub fn ordering_satisfy_concrete<F: FnOnce() -> EquivalenceProperties>(
+/// Checks whether the required [`PhysicalSortExpr`]s are satisfied by the
+/// provided [`PhysicalSortExpr`]s.
+fn ordering_satisfy_concrete<F: FnOnce() -> EquivalenceProperties>(
     provided: &[PhysicalSortExpr],
     required: &[PhysicalSortExpr],
     equal_properties: F,
@@ -312,6 +305,91 @@ pub fn ordering_satisfy_requirement_concrete<F: FnOnce() 
-> EquivalencePropertie
     }
 }
 
+/// Checks whether the provided [`PhysicalSortRequirement`]s are equal to or
+/// more specific than the required [`PhysicalSortRequirement`]s.
+pub fn requirements_compatible<F: FnOnce() -> EquivalenceProperties>(
+    provided: Option<&[PhysicalSortRequirement]>,
+    required: Option<&[PhysicalSortRequirement]>,
+    equal_properties: F,
+) -> bool {
+    match (provided, required) {
+        (_, None) => true,
+        (None, Some(_)) => false,
+        (Some(provided), Some(required)) => {
+            requirements_compatible_concrete(provided, required, 
equal_properties)
+        }
+    }
+}
+
+/// Checks whether the provided [`PhysicalSortRequirement`]s are equal to or
+/// more specific than the required [`PhysicalSortRequirement`]s.
+fn requirements_compatible_concrete<F: FnOnce() -> EquivalenceProperties>(
+    provided: &[PhysicalSortRequirement],
+    required: &[PhysicalSortRequirement],
+    equal_properties: F,
+) -> bool {
+    if required.len() > provided.len() {
+        false
+    } else if required
+        .iter()
+        .zip(provided.iter())
+        .all(|(req, given)| given.compatible(req))
+    {
+        true
+    } else if let eq_classes @ [_, ..] = equal_properties().classes() {
+        required
+            .iter()
+            .map(|e| {
+                normalize_sort_requirement_with_equivalence_properties(
+                    e.clone(),
+                    eq_classes,
+                )
+            })
+            .zip(provided.iter().map(|e| {
+                normalize_sort_requirement_with_equivalence_properties(
+                    e.clone(),
+                    eq_classes,
+                )
+            }))
+            .all(|(req, given)| given.compatible(&req))
+    } else {
+        false
+    }
+}
+
+/// This function maps a requirement stated in terms of a ProjectionExec's
+/// schema back to the schema of the executor feeding that projection.
+// Specifically, `ProjectionExec` changes the indices of `Column`s in the 
schema of its input executor.
+// This function rewrites a requirement given in terms of the ProjectionExec 
schema into the equivalent
+// requirement in terms of the schema of the ProjectionExec's input.
+// For instance, Column{"a", 0} may turn into Column{"a", 1}. Please note 
that this function assumes
+// column names are unique. Given a requirement containing both Column{"a", 
0} and Column{"a", 1},
+// this function will produce an incorrect result (it will only emit a 
single Column as a result).
+pub fn map_columns_before_projection(
+    parent_required: &[Arc<dyn PhysicalExpr>],
+    proj_exprs: &[(Arc<dyn PhysicalExpr>, String)],
+) -> Vec<Arc<dyn PhysicalExpr>> {
+    let column_mapping = proj_exprs
+        .iter()
+        .filter_map(|(expr, name)| {
+            expr.as_any()
+                .downcast_ref::<Column>()
+                .map(|column| (name.clone(), column.clone()))
+        })
+        .collect::<HashMap<_, _>>();
+    parent_required
+        .iter()
+        .filter_map(|r| {
+            if let Some(column) = r.as_any().downcast_ref::<Column>() {
+                column_mapping.get(column.name())
+            } else {
+                None
+            }
+        })
+        .map(|e| Arc::new(e.clone()) as _)
+        .collect()
+}
+
 /// This function converts `PhysicalSortRequirement` to `PhysicalSortExpr`
 /// for each entry in the input. If the required ordering is `None` for an
 /// entry, the default ordering `ASC, NULLS LAST` is used.
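
To make the projection mapping concrete, here is a hypothetical use of
`map_columns_before_projection` (the column names and indices are
illustrative): a requirement on the projection's output column `a` at
index 1 maps back to the input's column `a` at index 0.

    let proj_exprs: Vec<(Arc<dyn PhysicalExpr>, String)> = vec![
        (Arc::new(Column::new("b", 1)) as _, "b".to_string()),
        (Arc::new(Column::new("a", 0)) as _, "a".to_string()),
    ];
    let parent_required: Vec<Arc<dyn PhysicalExpr>> =
        vec![Arc::new(Column::new("a", 1)) as _];
    let mapped = map_columns_before_projection(&parent_required, &proj_exprs);
    // `mapped` now contains a single Column { name: "a", index: 0 }.
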
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index 6f9babefe8..b396976186 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -824,11 +824,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         let mut assign_map = assignments
             .iter()
             .map(|assign| {
-                let col_name: &Ident = assign
-                    .id
-                    .iter()
-                    .last()
-                    .ok_or(DataFusionError::Plan("Empty column 
id".to_string()))?;
+                let col_name: &Ident = assign.id.iter().last().ok_or_else(|| {
+                    DataFusionError::Plan("Empty column id".to_string())
+                })?;
                 // Validate that the assignment target column exists
                 table_schema.field_with_unqualified_name(&col_name.value)?;
                 Ok((col_name.value.clone(), assign.value.clone()))
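
The `ok_or` to `ok_or_else` change above is a small but idiomatic win: with
`ok_or` the error value (and its `String` allocation) is built eagerly even
when the `Option` is `Some`, whereas `ok_or_else` only runs its closure on
the `None` path. A minimal illustration (names are hypothetical):

    fn empty_column_error() -> String {
        "Empty column id".to_string() // allocates a String
    }

    let id: Option<i32> = Some(1);
    let _ = id.ok_or(empty_column_error());    // allocates even for Some
    let _ = id.ok_or_else(empty_column_error); // allocates only on None
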
