This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new dee9fd7d2 Make EnforceSorting global sort aware, fix sort 
mis-optimizations involving unions, support parallel sort + merge 
transformations (#5171)
dee9fd7d2 is described below

commit dee9fd7d2b9a3dbe57fb88fb9cbe9572f6117ab2
Author: Mustafa Akur <[email protected]>
AuthorDate: Thu Feb 9 01:14:29 2023 +0300

    Make EnforceSorting global sort aware, fix sort mis-optimizations involving 
unions, support parallel sort + merge transformations (#5171)
    
    * Make EnforceSorting global sort aware, fix union bug, parallelize CP+Sort 
cascades
    
    * Remove unnecessary cloning
    
    * Convert sort_onward to tree to support multipath
    
    * Add limit bug handling
    
    * minor changes
    
    * Coalesce Partitions converted to tree
    
    * Simplifications
    
    * Add new test
    
    * Simplifications
    
    * Simplifications and refactors
    
    * Update todos
    
    * minor changes
    
    * Add test for union doesn't maintain any of its child ordering
    
    * Add new test
    
    * use corresponding idx instead of 0th index
    
    * Add global limit test
    
    * Add SortPreservingMerge handling
    
    * add finer sorting change Sort and SortPreserve test
    
    * Update test
    
    * simplifications
    
    * Simplifications
    
    * Improved comments and naming
    
    * Remove explicit union check
    
    * make parallelize sort flag config parameter
    
    * go back to first commit, add test
    
    * update config mg
    
    * Use repartition_sorts in the API instead of parallelize_sorts
    
    * Update/format configs.md
    
    * Use maintains_input_order instead of output_ordering comparison
    
    * Move logic under map_children to init method
    
    * Remove two unnecessary clones
    
    ---------
    
    Co-authored-by: Mehmet Ozan Kabak <[email protected]>
---
 datafusion/common/src/config.rs                    |   12 +
 datafusion/core/src/execution/context.rs           |   12 +
 .../src/physical_optimizer/dist_enforcement.rs     |    2 +-
 .../src/physical_optimizer/sort_enforcement.rs     | 1341 ++++++++++++++++----
 .../core/src/physical_optimizer/test_utils.rs      |    7 +-
 datafusion/core/tests/sql/explain_analyze.rs       |    5 -
 datafusion/core/tests/sql/joins.rs                 |   32 +-
 datafusion/core/tests/sql/window.rs                |  188 ++-
 .../test_files/information_schema.slt              |    1 +
 docs/source/user-guide/configs.md                  |   73 +-
 10 files changed, 1386 insertions(+), 287 deletions(-)

diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index 112e9f3e2..6ce3f64ee 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -290,6 +290,18 @@ config_namespace! {
         /// functions in parallel using the provided `target_partitions` level"
         pub repartition_windows: bool, default = true
 
+        /// Should DataFusion execute sorts in a per-partition fashion and 
merge
+        /// afterwards instead of coalescing first and sorting globally
+        /// With this flag is enabled, plans in the form below
+        ///      "SortExec: [a@0 ASC]",
+        ///      "  CoalescePartitionsExec",
+        ///      "    RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
+        /// would turn into the plan below which performs better in 
multithreaded environments
+        ///      "SortPreservingMergeExec: [a@0 ASC]",
+        ///      "  SortExec: [a@0 ASC]",
+        ///      "    RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
+        pub repartition_sorts: bool, default = true
+
         /// When set to true, the logical plan optimizer will produce warning
         /// messages if any optimization rules produce errors and then proceed 
to the next
         /// rule. When set to false, any rules that produce errors will cause 
the query to fail
diff --git a/datafusion/core/src/execution/context.rs 
b/datafusion/core/src/execution/context.rs
index 132d5711a..a053f640f 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -1232,6 +1232,12 @@ impl SessionConfig {
         self.options.optimizer.repartition_windows
     }
 
+    /// Do we execute sorts in a per-partition fashion and merge afterwards,
+    /// or do we coalesce partitions first and sort globally?
+    pub fn repartition_sorts(&self) -> bool {
+        self.options.optimizer.repartition_sorts
+    }
+
     /// Are statistics collected during execution?
     pub fn collect_statistics(&self) -> bool {
         self.options.execution.collect_statistics
@@ -1290,6 +1296,12 @@ impl SessionConfig {
         self
     }
 
+    /// Enables or disables the use of per-partition sorting to improve 
parallelism
+    pub fn with_repartition_sorts(mut self, enabled: bool) -> Self {
+        self.options.optimizer.repartition_sorts = enabled;
+        self
+    }
+
     /// Enables or disables the use of pruning predicate for parquet readers 
to skip row groups
     pub fn with_parquet_pruning(mut self, enabled: bool) -> Self {
         self.options.execution.parquet.pruning = enabled;
diff --git a/datafusion/core/src/physical_optimizer/dist_enforcement.rs 
b/datafusion/core/src/physical_optimizer/dist_enforcement.rs
index c6c2bd40e..0dbba2c31 100644
--- a/datafusion/core/src/physical_optimizer/dist_enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/dist_enforcement.rs
@@ -1134,7 +1134,7 @@ mod tests {
             //       `EnforceSorting` and `EnfoceDistribution`.
             // TODO: Orthogonalize the tests here just to verify 
`EnforceDistribution` and create
             //       new tests for the cascade.
-            let optimizer = EnforceSorting {};
+            let optimizer = EnforceSorting::new();
             let optimized = optimizer.optimize(optimized, &config)?;
 
             // Now format correctly
diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs 
b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
index c9a3c8fec..0eadfb7c2 100644
--- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
@@ -33,16 +33,19 @@ use crate::config::ConfigOptions;
 use crate::error::Result;
 use crate::physical_optimizer::utils::add_sort_above_child;
 use crate::physical_optimizer::PhysicalOptimizerRule;
+use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
+use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
 use crate::physical_plan::rewrite::TreeNodeRewritable;
 use crate::physical_plan::sorts::sort::SortExec;
+use 
crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
+use crate::physical_plan::union::UnionExec;
 use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
-use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
+use crate::physical_plan::{with_new_children_if_necessary, Distribution, 
ExecutionPlan};
 use arrow::datatypes::SchemaRef;
 use datafusion_common::{reverse_sort_options, DataFusionError};
 use datafusion_physical_expr::utils::{ordering_satisfy, 
ordering_satisfy_concrete};
-use datafusion_physical_expr::window::WindowExpr;
 use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
-use itertools::izip;
+use itertools::{concat, izip};
 use std::iter::zip;
 use std::sync::Arc;
 
@@ -58,15 +61,39 @@ impl EnforceSorting {
     }
 }
 
-/// This is a "data class" we use within the [EnforceSorting] rule that
-/// tracks the closest `SortExec` descendant for every child of a plan.
+/// This object implements a tree that we use while keeping track of paths
+/// leading to `SortExec`s.
+#[derive(Debug, Clone)]
+struct ExecTree {
+    /// Child index of the plan in its parent
+    pub idx: usize,
+    /// Children of the plan that would need updating if we remove leaf 
executors
+    pub children: Vec<ExecTree>,
+    /// The `ExecutionPlan` associated with this node
+    pub plan: Arc<dyn ExecutionPlan>,
+}
+
+impl ExecTree {
+    /// This function returns the executors at the leaves of the tree.
+    fn get_leaves(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        if self.children.is_empty() {
+            vec![self.plan.clone()]
+        } else {
+            concat(self.children.iter().map(|e| e.get_leaves()))
+        }
+    }
+}
+
+/// This object is used within the [EnforceSorting] rule to track the closest
+/// `SortExec` descendant(s) for every child of a plan.
 #[derive(Debug, Clone)]
 struct PlanWithCorrespondingSort {
     plan: Arc<dyn ExecutionPlan>,
-    // For every child, keep a vector of `ExecutionPlan`s starting from the
-    // closest `SortExec` till the current plan. The first index of the tuple 
is
-    // the child index of the plan -- we need this information as we make 
updates.
-    sort_onwards: Vec<Vec<(usize, Arc<dyn ExecutionPlan>)>>,
+    // For every child, keep a subtree of `ExecutionPlan`s starting from the
+    // child until the `SortExec`(s) -- could be multiple for n-ary plans like
+    // Union -- that determine the output ordering of the child. If the child
+    // has no connection to any sort, simpliy store None (and not a subtree).
+    sort_onwards: Vec<Option<ExecTree>>,
 }
 
 impl PlanWithCorrespondingSort {
@@ -74,10 +101,76 @@ impl PlanWithCorrespondingSort {
         let length = plan.children().len();
         PlanWithCorrespondingSort {
             plan,
-            sort_onwards: vec![vec![]; length],
+            sort_onwards: vec![None; length],
         }
     }
 
+    pub fn new_from_children_nodes(
+        children_nodes: Vec<PlanWithCorrespondingSort>,
+        parent_plan: Arc<dyn ExecutionPlan>,
+    ) -> Result<Self> {
+        let children_plans = children_nodes
+            .iter()
+            .map(|item| item.plan.clone())
+            .collect::<Vec<_>>();
+        let sort_onwards = children_nodes
+            .into_iter()
+            .enumerate()
+            .map(|(idx, item)| {
+                let plan = &item.plan;
+                // Leaves of `sort_onwards` are `SortExec` operators, which 
impose
+                // an ordering. This tree collects all the intermediate 
executors
+                // that maintain this ordering. If we just saw a order imposing
+                // operator, we reset the tree and start accumulating.
+                if is_sort(plan) {
+                    return Some(ExecTree {
+                        idx,
+                        plan: item.plan,
+                        children: vec![],
+                    });
+                } else if is_limit(plan) {
+                    // There is no sort linkage for this path, it starts at a 
limit.
+                    return None;
+                }
+                let is_spm = is_sort_preserving_merge(plan);
+                let is_union = plan.as_any().is::<UnionExec>();
+                // If the executor is a `UnionExec`, and it has an output 
ordering;
+                // then it at least partially maintains some child's output 
ordering.
+                // Therefore, we propagate this information upwards.
+                let partially_maintains = is_union && 
plan.output_ordering().is_some();
+                let required_orderings = plan.required_input_ordering();
+                let flags = plan.maintains_input_order();
+                let children = izip!(flags, item.sort_onwards, 
required_orderings)
+                    .filter_map(|(maintains, element, required_ordering)| {
+                        if (required_ordering.is_none()
+                            && (maintains || partially_maintains))
+                            || is_spm
+                        {
+                            element
+                        } else {
+                            None
+                        }
+                    })
+                    .collect::<Vec<ExecTree>>();
+                if !children.is_empty() {
+                    // Add parent node to the tree if there is at least one
+                    // child with a subtree:
+                    Some(ExecTree {
+                        idx,
+                        plan: item.plan,
+                        children,
+                    })
+                } else {
+                    // There is no sort linkage for this child, do nothing.
+                    None
+                }
+            })
+            .collect();
+
+        let plan = with_new_children_if_necessary(parent_plan, 
children_plans)?;
+        Ok(PlanWithCorrespondingSort { plan, sort_onwards })
+    }
+
     pub fn children(&self) -> Vec<PlanWithCorrespondingSort> {
         self.plan
             .children()
@@ -96,50 +189,147 @@ impl TreeNodeRewritable for PlanWithCorrespondingSort {
         if children.is_empty() {
             Ok(self)
         } else {
-            let children_requirements = children
+            let children_nodes = children
                 .into_iter()
                 .map(transform)
                 .collect::<Result<Vec<_>>>()?;
-            let children_plans = children_requirements
-                .iter()
-                .map(|elem| elem.plan.clone())
-                .collect::<Vec<_>>();
-            let sort_onwards = children_requirements
-                .iter()
-                .map(|item| {
-                    let onwards = &item.sort_onwards;
-                    if !onwards.is_empty() {
-                        let flags = item.plan.maintains_input_order();
-                        // `onwards` starts from sort introducing executor(e.g 
`SortExec`, `SortPreservingMergeExec`) till the current executor
-                        // if the executors in between maintain input 
ordering. If we are at
-                        // the beginning both `SortExec` and 
`SortPreservingMergeExec` doesn't maintain ordering(they introduce ordering).
-                        // However, we want to propagate them above anyway.
-                        for (maintains, element) in 
flags.into_iter().zip(onwards.iter())
-                        {
-                            if (maintains || is_sort(&item.plan)) && 
!element.is_empty() {
-                                return element.clone();
-                            }
-                        }
+            PlanWithCorrespondingSort::new_from_children_nodes(children_nodes, 
self.plan)
+        }
+    }
+}
+
+/// This object is used within the [EnforceSorting] rule to track the closest
+/// `CoalescePartitionsExec` descendant(s) for every child of a plan.
+#[derive(Debug, Clone)]
+struct PlanWithCorrespondingCoalescePartitions {
+    plan: Arc<dyn ExecutionPlan>,
+    // For every child, keep a subtree of `ExecutionPlan`s starting from the
+    // child until the `CoalescePartitionsExec`(s) -- could be multiple for
+    // n-ary plans like Union -- that affect the output partitioning of the
+    // child. If the child has no connection to any `CoalescePartitionsExec`,
+    // simplify store None (and not a subtree).
+    coalesce_onwards: Vec<Option<ExecTree>>,
+}
+
+impl PlanWithCorrespondingCoalescePartitions {
+    pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
+        let length = plan.children().len();
+        PlanWithCorrespondingCoalescePartitions {
+            plan,
+            coalesce_onwards: vec![None; length],
+        }
+    }
+
+    pub fn new_from_children_nodes(
+        children_nodes: Vec<PlanWithCorrespondingCoalescePartitions>,
+        parent_plan: Arc<dyn ExecutionPlan>,
+    ) -> Result<Self> {
+        let children_plans = children_nodes
+            .iter()
+            .map(|item| item.plan.clone())
+            .collect();
+        let coalesce_onwards = children_nodes
+            .into_iter()
+            .enumerate()
+            .map(|(idx, item)| {
+                // Leaves of the `coalesce_onwards` tree are 
`CoalescePartitionsExec`
+                // operators. This tree collects all the intermediate 
executors that
+                // maintain a single partition. If we just saw a 
`CoalescePartitionsExec`
+                // operator, we reset the tree and start accumulating.
+                let plan = item.plan;
+                if plan.as_any().is::<CoalescePartitionsExec>() {
+                    Some(ExecTree {
+                        idx,
+                        plan,
+                        children: vec![],
+                    })
+                } else if plan.children().is_empty() {
+                    // Plan has no children, there is nothing to propagate.
+                    None
+                } else {
+                    let children = item
+                        .coalesce_onwards
+                        .into_iter()
+                        .flatten()
+                        .filter(|item| {
+                            // Only consider operators that don't require a
+                            // single partition.
+                            !matches!(
+                                plan.required_input_distribution()[item.idx],
+                                Distribution::SinglePartition
+                            )
+                        })
+                        .collect::<Vec<_>>();
+                    if children.is_empty() {
+                        None
+                    } else {
+                        Some(ExecTree {
+                            idx,
+                            plan,
+                            children,
+                        })
                     }
-                    vec![]
-                })
-                .collect::<Vec<_>>();
-            let plan = with_new_children_if_necessary(self.plan, 
children_plans)?;
-            Ok(PlanWithCorrespondingSort { plan, sort_onwards })
+                }
+            })
+            .collect();
+        let plan = with_new_children_if_necessary(parent_plan, 
children_plans)?;
+        Ok(PlanWithCorrespondingCoalescePartitions {
+            plan,
+            coalesce_onwards,
+        })
+    }
+
+    pub fn children(&self) -> Vec<PlanWithCorrespondingCoalescePartitions> {
+        self.plan
+            .children()
+            .into_iter()
+            .map(|child| PlanWithCorrespondingCoalescePartitions::new(child))
+            .collect()
+    }
+}
+
+impl TreeNodeRewritable for PlanWithCorrespondingCoalescePartitions {
+    fn map_children<F>(self, transform: F) -> Result<Self>
+    where
+        F: FnMut(Self) -> Result<Self>,
+    {
+        let children = self.children();
+        if children.is_empty() {
+            Ok(self)
+        } else {
+            let children_nodes = children
+                .into_iter()
+                .map(transform)
+                .collect::<Result<Vec<_>>>()?;
+            PlanWithCorrespondingCoalescePartitions::new_from_children_nodes(
+                children_nodes,
+                self.plan,
+            )
         }
     }
 }
 
+/// The boolean flag `repartition_sorts` defined in the config indicates
+/// whether we elect to transform CoalescePartitionsExec + SortExec cascades
+/// into SortExec + SortPreservingMergeExec cascades, which enables us to
+/// perform sorting in parallel.
 impl PhysicalOptimizerRule for EnforceSorting {
     fn optimize(
         &self,
         plan: Arc<dyn ExecutionPlan>,
-        _config: &ConfigOptions,
+        config: &ConfigOptions,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        // Execute a post-order traversal to adjust input key ordering:
         let plan_requirements = PlanWithCorrespondingSort::new(plan);
         let adjusted = plan_requirements.transform_up(&ensure_sorting)?;
-        Ok(adjusted.plan)
+        if config.optimizer.repartition_sorts {
+            let plan_with_coalesce_partitions =
+                PlanWithCorrespondingCoalescePartitions::new(adjusted.plan);
+            let parallel =
+                
plan_with_coalesce_partitions.transform_up(&parallelize_sorts)?;
+            Ok(parallel.plan)
+        } else {
+            Ok(adjusted.plan)
+        }
     }
 
     fn name(&self) -> &str {
@@ -151,25 +341,98 @@ impl PhysicalOptimizerRule for EnforceSorting {
     }
 }
 
-// Checks whether executor is Sort
-// TODO: Add support for SortPreservingMergeExec also.
+/// This function turns plans of the form
+///      "SortExec: [a@0 ASC]",
+///      "  CoalescePartitionsExec",
+///      "    RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
+/// to
+///      "SortPreservingMergeExec: [a@0 ASC]",
+///      "  SortExec: [a@0 ASC]",
+///      "    RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
+/// by following connections from `CoalescePartitionsExec`s to `SortExec`s.
+/// By performing sorting in parallel, we can increase performance in some 
scenarios.
+fn parallelize_sorts(
+    requirements: PlanWithCorrespondingCoalescePartitions,
+) -> Result<Option<PlanWithCorrespondingCoalescePartitions>> {
+    let plan = requirements.plan;
+    if plan.children().is_empty() {
+        return Ok(None);
+    }
+    let mut coalesce_onwards = requirements.coalesce_onwards;
+    // We know that `plan` has children, so `coalesce_onwards` is non-empty.
+    if coalesce_onwards[0].is_some() {
+        if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
+            // If there is a connection between a `CoalescePartitionsExec` and 
a
+            // `SortExec` that satisfy the requirements (i.e. they don't 
require a
+            // single partition), then we can replace the 
`CoalescePartitionsExec`
+            // + `SortExec` cascade with a `SortExec` + 
`SortPreservingMergeExec`
+            // cascade to parallelize sorting.
+            let mut prev_layer = plan.clone();
+            update_child_to_change_coalesce(
+                &mut prev_layer,
+                &mut coalesce_onwards[0],
+                Some(sort_exec),
+            )?;
+            let spm = SortPreservingMergeExec::new(sort_exec.expr().to_vec(), 
prev_layer);
+            return Ok(Some(PlanWithCorrespondingCoalescePartitions {
+                plan: Arc::new(spm),
+                coalesce_onwards: vec![None],
+            }));
+        } else if plan.as_any().is::<CoalescePartitionsExec>() {
+            // There is an unnecessary `CoalescePartitionExec` in the plan.
+            let mut prev_layer = plan.clone();
+            update_child_to_change_coalesce(
+                &mut prev_layer,
+                &mut coalesce_onwards[0],
+                None,
+            )?;
+            let new_plan = plan.with_new_children(vec![prev_layer])?;
+            return Ok(Some(PlanWithCorrespondingCoalescePartitions {
+                plan: new_plan,
+                coalesce_onwards: vec![None],
+            }));
+        }
+    }
+    Ok(Some(PlanWithCorrespondingCoalescePartitions {
+        plan,
+        coalesce_onwards,
+    }))
+}
+
+/// Checks whether the given executor is a limit;
+/// i.e. either a `LocalLimitExec` or a `GlobalLimitExec`.
+fn is_limit(plan: &Arc<dyn ExecutionPlan>) -> bool {
+    plan.as_any().is::<GlobalLimitExec>() || 
plan.as_any().is::<LocalLimitExec>()
+}
+
+/// Checks whether the given executor is a `SortExec`.
 fn is_sort(plan: &Arc<dyn ExecutionPlan>) -> bool {
     plan.as_any().is::<SortExec>()
 }
 
+/// Checks whether the given executor is a `SortPreservingMergeExec`.
+fn is_sort_preserving_merge(plan: &Arc<dyn ExecutionPlan>) -> bool {
+    plan.as_any().is::<SortPreservingMergeExec>()
+}
+
+/// This function enforces sorting requirements and makes optimizations without
+/// violating these requirements whenever possible.
 fn ensure_sorting(
     requirements: PlanWithCorrespondingSort,
 ) -> Result<Option<PlanWithCorrespondingSort>> {
     // Perform naive analysis at the beginning -- remove already-satisfied 
sorts:
-    if let Some(result) = analyze_immediate_sort_removal(&requirements)? {
+    let plan = requirements.plan;
+    let mut children = plan.children();
+    if children.is_empty() {
+        return Ok(None);
+    }
+    let mut sort_onwards = requirements.sort_onwards;
+    if let Some(result) = analyze_immediate_sort_removal(&plan, &sort_onwards) 
{
         return Ok(Some(result));
     }
-    let plan = &requirements.plan;
-    let mut new_children = plan.children().clone();
-    let mut new_onwards = requirements.sort_onwards.clone();
     for (idx, (child, sort_onwards, required_ordering)) in izip!(
-        new_children.iter_mut(),
-        new_onwards.iter_mut(),
+        children.iter_mut(),
+        sort_onwards.iter_mut(),
         plan.required_input_ordering()
     )
     .enumerate()
@@ -184,225 +447,380 @@ fn ensure_sorting(
                 );
                 if !is_ordering_satisfied {
                     // Make sure we preserve the ordering requirements:
-                    update_child_to_remove_unnecessary_sort(child, 
sort_onwards)?;
+                    update_child_to_remove_unnecessary_sort(child, 
sort_onwards, &plan)?;
                     let sort_expr = required_ordering.to_vec();
                     *child = add_sort_above_child(child, sort_expr)?;
-                    sort_onwards.push((idx, child.clone()))
+                    *sort_onwards = Some(ExecTree {
+                        idx,
+                        plan: child.clone(),
+                        children: vec![],
+                    })
                 }
-                if let [first, ..] = sort_onwards.as_slice() {
-                    // The ordering requirement is met, we can analyze if 
there is an unnecessary sort:
-                    let sort_any = first.1.clone();
-                    let sort_exec = convert_to_sort_exec(&sort_any)?;
-                    let sort_output_ordering = sort_exec.output_ordering();
-                    let sort_input_ordering = 
sort_exec.input().output_ordering();
-                    // Simple analysis: Does the input of the sort in question 
already satisfy the ordering requirements?
-                    if ordering_satisfy(sort_input_ordering, 
sort_output_ordering, || {
-                        sort_exec.input().equivalence_properties()
-                    }) {
-                        update_child_to_remove_unnecessary_sort(child, 
sort_onwards)?;
-                    }
+                if let Some(tree) = sort_onwards {
                     // For window expressions, we can remove some sorts when 
we can
                     // calculate the result in reverse:
-                    else if let Some(exec) =
-                        
requirements.plan.as_any().downcast_ref::<WindowAggExec>()
-                    {
-                        if let Some(result) = analyze_window_sort_removal(
-                            exec.window_expr(),
-                            &exec.partition_keys,
-                            sort_exec,
-                            sort_onwards,
-                        )? {
-                            return Ok(Some(result));
-                        }
-                    } else if let Some(exec) = requirements
-                        .plan
-                        .as_any()
-                        .downcast_ref::<BoundedWindowAggExec>()
+                    if plan.as_any().is::<WindowAggExec>()
+                        || plan.as_any().is::<BoundedWindowAggExec>()
                     {
-                        if let Some(result) = analyze_window_sort_removal(
-                            exec.window_expr(),
-                            &exec.partition_keys,
-                            sort_exec,
-                            sort_onwards,
-                        )? {
+                        if let Some(result) = 
analyze_window_sort_removal(tree, &plan)? {
                             return Ok(Some(result));
                         }
                     }
-                    // TODO: Once we can ensure that required ordering 
information propagates with
-                    //       necessary lineage information, compare 
`sort_input_ordering` and `required_ordering`.
-                    //       This will enable us to handle cases such as (a,b) 
-> Sort -> (a,b,c) -> Required(a,b).
-                    //       Currently, we can not remove such sorts.
                 }
             }
             (Some(required), None) => {
-                // Ordering requirement is not met, we should add a SortExec 
to the plan.
-                let sort_expr = required.to_vec();
-                *child = add_sort_above_child(child, sort_expr)?;
-                *sort_onwards = vec![(idx, child.clone())];
+                // Ordering requirement is not met, we should add a `SortExec` 
to the plan.
+                *child = add_sort_above_child(child, required.to_vec())?;
+                *sort_onwards = Some(ExecTree {
+                    idx,
+                    plan: child.clone(),
+                    children: vec![],
+                })
             }
             (None, Some(_)) => {
-                // We have a SortExec whose effect may be neutralized by a 
order-imposing
-                // operator. In this case, remove this sort:
-                if !requirements.plan.maintains_input_order()[idx] {
-                    update_child_to_remove_unnecessary_sort(child, 
sort_onwards)?;
+                // We have a `SortExec` whose effect may be neutralized by
+                // another order-imposing operator. Remove or update this sort:
+                if !plan.maintains_input_order()[idx] {
+                    let count = plan.output_ordering().map_or(0, |e| e.len());
+                    if (count > 0) && !is_sort(&plan) {
+                        update_child_to_change_finer_sort(child, sort_onwards, 
count)?;
+                    } else {
+                        update_child_to_remove_unnecessary_sort(
+                            child,
+                            sort_onwards,
+                            &plan,
+                        )?;
+                    }
                 }
             }
             (None, None) => {}
         }
     }
-    if plan.children().is_empty() {
-        Ok(Some(requirements))
-    } else {
-        let new_plan = requirements.plan.with_new_children(new_children)?;
-        for (idx, (trace, required_ordering)) in new_onwards
-            .iter_mut()
-            .zip(new_plan.required_input_ordering())
-            .enumerate()
-            .take(new_plan.children().len())
-        {
-            if new_plan.maintains_input_order()[idx]
-                && required_ordering.is_none()
-                && !trace.is_empty()
-            {
-                trace.push((idx, new_plan.clone()));
-            } else {
-                trace.clear();
-                if is_sort(&new_plan) {
-                    trace.push((idx, new_plan.clone()));
-                }
-            }
-        }
-        Ok(Some(PlanWithCorrespondingSort {
-            plan: new_plan,
-            sort_onwards: new_onwards,
-        }))
-    }
+    Ok(Some(PlanWithCorrespondingSort {
+        plan: plan.with_new_children(children)?,
+        sort_onwards,
+    }))
 }
 
-/// Analyzes a given `SortExec` to determine whether its input already has
-/// a finer ordering than this `SortExec` enforces.
+/// Analyzes a given `SortExec` (`plan`) to determine whether its input already
+/// has a finer ordering than this `SortExec` enforces.
 fn analyze_immediate_sort_removal(
-    requirements: &PlanWithCorrespondingSort,
-) -> Result<Option<PlanWithCorrespondingSort>> {
-    if let Some(sort_exec) = 
requirements.plan.as_any().downcast_ref::<SortExec>() {
+    plan: &Arc<dyn ExecutionPlan>,
+    sort_onwards: &[Option<ExecTree>],
+) -> Option<PlanWithCorrespondingSort> {
+    if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
+        let sort_input = sort_exec.input().clone();
         // If this sort is unnecessary, we should remove it:
         if ordering_satisfy(
-            sort_exec.input().output_ordering(),
+            sort_input.output_ordering(),
             sort_exec.output_ordering(),
-            || sort_exec.input().equivalence_properties(),
+            || sort_input.equivalence_properties(),
         ) {
             // Since we know that a `SortExec` has exactly one child,
             // we can use the zero index safely:
-            let mut new_onwards = requirements.sort_onwards[0].to_vec();
-            if !new_onwards.is_empty() {
-                new_onwards.pop();
-            }
-            return Ok(Some(PlanWithCorrespondingSort {
-                plan: sort_exec.input().clone(),
-                sort_onwards: vec![new_onwards],
-            }));
+            return Some(
+                if !sort_exec.preserve_partitioning()
+                    && sort_input.output_partitioning().partition_count() > 1
+                {
+                    // Replace the sort with a sort-preserving merge:
+                    let new_plan: Arc<dyn ExecutionPlan> =
+                        Arc::new(SortPreservingMergeExec::new(
+                            sort_exec.expr().to_vec(),
+                            sort_input,
+                        ));
+                    let new_tree = ExecTree {
+                        idx: 0,
+                        plan: new_plan.clone(),
+                        children: sort_onwards.iter().flat_map(|e| 
e.clone()).collect(),
+                    };
+                    PlanWithCorrespondingSort {
+                        plan: new_plan,
+                        sort_onwards: vec![Some(new_tree)],
+                    }
+                } else {
+                    // Remove the sort:
+                    PlanWithCorrespondingSort {
+                        plan: sort_input,
+                        sort_onwards: sort_onwards.to_vec(),
+                    }
+                },
+            );
         }
     }
-    Ok(None)
+    None
 }
 
 /// Analyzes a [WindowAggExec] or a [BoundedWindowAggExec] to determine whether
 /// it may allow removing a sort.
 fn analyze_window_sort_removal(
-    window_expr: &[Arc<dyn WindowExpr>],
-    partition_keys: &[Arc<dyn PhysicalExpr>],
-    sort_exec: &SortExec,
-    sort_onward: &mut Vec<(usize, Arc<dyn ExecutionPlan>)>,
+    sort_tree: &mut ExecTree,
+    window_exec: &Arc<dyn ExecutionPlan>,
 ) -> Result<Option<PlanWithCorrespondingSort>> {
-    let required_ordering = sort_exec.output_ordering().ok_or_else(|| {
-        DataFusionError::Plan("A SortExec should have output 
ordering".to_string())
-    })?;
-    let physical_ordering = sort_exec.input().output_ordering();
-    let physical_ordering = if let Some(physical_ordering) = physical_ordering 
{
-        physical_ordering
+    let (window_expr, partition_keys) = if let Some(exec) =
+        window_exec.as_any().downcast_ref::<BoundedWindowAggExec>()
+    {
+        (exec.window_expr(), &exec.partition_keys)
+    } else if let Some(exec) = 
window_exec.as_any().downcast_ref::<WindowAggExec>() {
+        (exec.window_expr(), &exec.partition_keys)
     } else {
-        // If there is no physical ordering, there is no way to remove a sort 
-- immediately return:
-        return Ok(None);
+        return Err(DataFusionError::Plan(
+            "Expects to receive either WindowAggExec of 
BoundedWindowAggExec".to_string(),
+        ));
     };
-    let (can_skip_sorting, should_reverse) = can_skip_sort(
-        window_expr[0].partition_by(),
-        required_ordering,
-        &sort_exec.input().schema(),
-        physical_ordering,
-    )?;
-    if can_skip_sorting {
-        let new_window_expr = if should_reverse {
-            window_expr
-                .iter()
-                .map(|e| e.get_reverse_expr())
-                .collect::<Option<Vec<_>>>()
-        } else {
-            Some(window_expr.to_vec())
-        };
-        if let Some(window_expr) = new_window_expr {
-            let new_child = 
remove_corresponding_sort_from_sub_plan(sort_onward)?;
-            let new_schema = new_child.schema();
-
-            let uses_bounded_memory = window_expr.iter().all(|e| 
e.uses_bounded_memory());
-            // If all window exprs can run with bounded memory choose bounded 
window variant
-            let new_plan = if uses_bounded_memory {
-                Arc::new(BoundedWindowAggExec::try_new(
-                    window_expr,
-                    new_child,
-                    new_schema,
-                    partition_keys.to_vec(),
-                    Some(physical_ordering.to_vec()),
-                )?) as _
+
+    let mut first_should_reverse = None;
+    let mut physical_ordering_common = vec![];
+    for sort_any in sort_tree.get_leaves() {
+        let sort_output_ordering = sort_any.output_ordering();
+        // Variable `sort_any` will either be a `SortExec` or a
+        // `SortPreservingMergeExec`, and both have a single child.
+        // Therefore, we can use the 0th index without loss of generality.
+        let sort_input = sort_any.children()[0].clone();
+        let physical_ordering = sort_input.output_ordering();
+        // TODO: Once we can ensure that required ordering information 
propagates with
+        //       the necessary lineage information, compare 
`physical_ordering` and the
+        //       ordering required by the window executor instead of 
`sort_output_ordering`.
+        //       This will enable us to handle cases such as (a,b) -> Sort -> 
(a,b,c) -> Required(a,b).
+        //       Currently, we can not remove such sorts.
+        let required_ordering = sort_output_ordering.ok_or_else(|| {
+            DataFusionError::Plan("A SortExec should have output 
ordering".to_string())
+        })?;
+        if let Some(physical_ordering) = physical_ordering {
+            if physical_ordering_common.is_empty()
+                || physical_ordering.len() < physical_ordering_common.len()
+            {
+                physical_ordering_common = physical_ordering.to_vec();
+            }
+            let (can_skip_sorting, should_reverse) = can_skip_sort(
+                window_expr[0].partition_by(),
+                required_ordering,
+                &sort_input.schema(),
+                physical_ordering,
+            )?;
+            if !can_skip_sorting {
+                return Ok(None);
+            }
+            if let Some(first_should_reverse) = first_should_reverse {
+                if first_should_reverse != should_reverse {
+                    return Ok(None);
+                }
             } else {
-                Arc::new(WindowAggExec::try_new(
-                    window_expr,
-                    new_child,
-                    new_schema,
-                    partition_keys.to_vec(),
-                    Some(physical_ordering.to_vec()),
-                )?) as _
-            };
-            return Ok(Some(PlanWithCorrespondingSort::new(new_plan)));
+                first_should_reverse = Some(should_reverse);
+            }
+        } else {
+            // If there is no physical ordering, there is no way to remove a
+            // sort, so immediately return.
+            return Ok(None);
         }
     }
+    let new_window_expr = if first_should_reverse.unwrap() {
+        window_expr
+            .iter()
+            .map(|e| e.get_reverse_expr())
+            .collect::<Option<Vec<_>>>()
+    } else {
+        Some(window_expr.to_vec())
+    };
+    if let Some(window_expr) = new_window_expr {
+        let requires_single_partition = matches!(
+            window_exec.required_input_distribution()[sort_tree.idx],
+            Distribution::SinglePartition
+        );
+        let new_child = remove_corresponding_sort_from_sub_plan(
+            sort_tree,
+            requires_single_partition,
+        )?;
+        let new_schema = new_child.schema();
+
+        let uses_bounded_memory = window_expr.iter().all(|e| 
e.uses_bounded_memory());
+        // If all window expressions can run with bounded memory, choose the
+        // bounded window variant:
+        let new_plan = if uses_bounded_memory {
+            Arc::new(BoundedWindowAggExec::try_new(
+                window_expr,
+                new_child,
+                new_schema,
+                partition_keys.to_vec(),
+                Some(physical_ordering_common),
+            )?) as _
+        } else {
+            Arc::new(WindowAggExec::try_new(
+                window_expr,
+                new_child,
+                new_schema,
+                partition_keys.to_vec(),
+                Some(physical_ordering_common),
+            )?) as _
+        };
+        return Ok(Some(PlanWithCorrespondingSort::new(new_plan)));
+    }
     Ok(None)
 }
 
-/// Updates child to remove the unnecessary sorting below it.
-fn update_child_to_remove_unnecessary_sort(
+/// Updates child to remove the unnecessary `CoalescePartitions` below it.
+fn update_child_to_change_coalesce(
     child: &mut Arc<dyn ExecutionPlan>,
-    sort_onwards: &mut Vec<(usize, Arc<dyn ExecutionPlan>)>,
+    coalesce_onwards: &mut Option<ExecTree>,
+    sort_exec: Option<&SortExec>,
 ) -> Result<()> {
-    if !sort_onwards.is_empty() {
-        *child = remove_corresponding_sort_from_sub_plan(sort_onwards)?;
+    if let Some(coalesce_onwards) = coalesce_onwards {
+        *child = change_corresponding_coalesce_in_sub_plan(coalesce_onwards, 
sort_exec)?;
     }
     Ok(())
 }
 
-/// Converts an [ExecutionPlan] trait object to a [SortExec] when possible.
-fn convert_to_sort_exec(sort_any: &Arc<dyn ExecutionPlan>) -> 
Result<&SortExec> {
-    sort_any.as_any().downcast_ref::<SortExec>().ok_or_else(|| {
-        DataFusionError::Plan("Given ExecutionPlan is not a 
SortExec".to_string())
-    })
+/// Removes the `CoalescePartitions` from the plan in `coalesce_onwards`.
+fn change_corresponding_coalesce_in_sub_plan(
+    coalesce_onwards: &mut ExecTree,
+    sort_exec: Option<&SortExec>,
+) -> Result<Arc<dyn ExecutionPlan>> {
+    Ok(
+        if coalesce_onwards
+            .plan
+            .as_any()
+            .is::<CoalescePartitionsExec>()
+        {
+            // We can safely use the 0th index since we have a 
`CoalescePartitionsExec`.
+            let coalesce_input = coalesce_onwards.plan.children()[0].clone();
+            if let Some(sort_exec) = sort_exec {
+                let sort_expr = sort_exec.expr();
+                if !ordering_satisfy(
+                    coalesce_input.output_ordering(),
+                    Some(sort_expr),
+                    || coalesce_input.equivalence_properties(),
+                ) {
+                    return add_sort_above_child(&coalesce_input, 
sort_expr.to_vec());
+                }
+            }
+            coalesce_input
+        } else {
+            let plan = coalesce_onwards.plan.clone();
+            let mut children = plan.children();
+            for item in &mut coalesce_onwards.children {
+                children[item.idx] =
+                    change_corresponding_coalesce_in_sub_plan(item, 
sort_exec)?;
+            }
+            plan.with_new_children(children)?
+        },
+    )
+}
+
+/// Updates child to remove the unnecessary sorting below it.
+fn update_child_to_remove_unnecessary_sort(
+    child: &mut Arc<dyn ExecutionPlan>,
+    sort_onwards: &mut Option<ExecTree>,
+    parent: &Arc<dyn ExecutionPlan>,
+) -> Result<()> {
+    if let Some(sort_onwards) = sort_onwards {
+        let requires_single_partition = matches!(
+            parent.required_input_distribution()[sort_onwards.idx],
+            Distribution::SinglePartition
+        );
+        *child = remove_corresponding_sort_from_sub_plan(
+            sort_onwards,
+            requires_single_partition,
+        )?;
+    }
+    *sort_onwards = None;
+    Ok(())
 }
 
 /// Removes the sort from the plan in `sort_onwards`.
 fn remove_corresponding_sort_from_sub_plan(
-    sort_onwards: &mut Vec<(usize, Arc<dyn ExecutionPlan>)>,
+    sort_onwards: &mut ExecTree,
+    requires_single_partition: bool,
+) -> Result<Arc<dyn ExecutionPlan>> {
+    // A `SortExec` is always at the bottom of the tree.
+    if is_sort(&sort_onwards.plan) {
+        Ok(sort_onwards.plan.children()[0].clone())
+    } else {
+        let plan = &sort_onwards.plan;
+        let mut children = plan.children();
+        for item in &mut sort_onwards.children {
+            let requires_single_partition = matches!(
+                plan.required_input_distribution()[item.idx],
+                Distribution::SinglePartition
+            );
+            children[item.idx] =
+                remove_corresponding_sort_from_sub_plan(item, 
requires_single_partition)?;
+        }
+        if is_sort_preserving_merge(plan) {
+            let child = &children[0];
+            if requires_single_partition
+                && child.output_partitioning().partition_count() > 1
+            {
+                Ok(Arc::new(CoalescePartitionsExec::new(child.clone())))
+            } else {
+                Ok(child.clone())
+            }
+        } else {
+            plan.clone().with_new_children(children)
+        }
+    }
+}
+
+/// Updates child to modify the unnecessarily fine sorting below it.
+fn update_child_to_change_finer_sort(
+    child: &mut Arc<dyn ExecutionPlan>,
+    sort_onwards: &mut Option<ExecTree>,
+    n_sort_expr: usize,
+) -> Result<()> {
+    if let Some(sort_onwards) = sort_onwards {
+        *child = change_finer_sort_in_sub_plan(sort_onwards, n_sort_expr)?;
+    }
+    Ok(())
+}
+
+/// Change the unnecessarily fine sort in `sort_onwards`.
+fn change_finer_sort_in_sub_plan(
+    sort_onwards: &mut ExecTree,
+    n_sort_expr: usize,
 ) -> Result<Arc<dyn ExecutionPlan>> {
-    let (_, sort_any) = sort_onwards[0].clone();
-    let sort_exec = convert_to_sort_exec(&sort_any)?;
-    let mut prev_layer = sort_exec.input().clone();
-    // In the loop below, se start from 1 as the first one is a SortExec
-    // and we are removing it from the plan.
-    for (child_idx, layer) in sort_onwards.iter().skip(1) {
-        let mut children = layer.children();
-        children[*child_idx] = prev_layer;
-        prev_layer = layer.clone().with_new_children(children)?;
-    }
-    // We have removed the sort, hence empty the sort_onwards:
-    sort_onwards.clear();
-    Ok(prev_layer)
+    let plan = &sort_onwards.plan;
+    // A `SortExec` is always at the bottom of the tree.
+    if is_sort(plan) {
+        let prev_layer = plan.children()[0].clone();
+        let new_sort_expr = get_sort_exprs(plan)?[0..n_sort_expr].to_vec();
+        let updated_plan = add_sort_above_child(&prev_layer, new_sort_expr)?;
+        *sort_onwards = ExecTree {
+            idx: sort_onwards.idx,
+            children: vec![],
+            plan: updated_plan.clone(),
+        };
+        Ok(updated_plan)
+    } else {
+        let mut children = plan.children();
+        for item in &mut sort_onwards.children {
+            children[item.idx] = change_finer_sort_in_sub_plan(item, 
n_sort_expr)?;
+        }
+        if is_sort_preserving_merge(plan) {
+            let new_sort_expr = get_sort_exprs(plan)?[0..n_sort_expr].to_vec();
+            let updated_plan = Arc::new(SortPreservingMergeExec::new(
+                new_sort_expr,
+                children[0].clone(),
+            )) as Arc<dyn ExecutionPlan>;
+            sort_onwards.plan = updated_plan.clone();
+            Ok(updated_plan)
+        } else {
+            plan.clone().with_new_children(children)
+        }
+    }
+}
+
+/// Converts an [ExecutionPlan] trait object to a [PhysicalSortExpr] slice 
when possible.
+fn get_sort_exprs(sort_any: &Arc<dyn ExecutionPlan>) -> 
Result<&[PhysicalSortExpr]> {
+    if let Some(sort_exec) = sort_any.as_any().downcast_ref::<SortExec>() {
+        Ok(sort_exec.expr())
+    } else if let Some(sort_preserving_merge_exec) =
+        sort_any.as_any().downcast_ref::<SortPreservingMergeExec>()
+    {
+        Ok(sort_preserving_merge_exec.expr())
+    } else {
+        Err(DataFusionError::Plan(
+            "Given ExecutionPlan is not a SortExec or a 
SortPreservingMergeExec"
+                .to_string(),
+        ))
+    }
 }
 
 #[derive(Debug)]
@@ -498,13 +916,17 @@ mod tests {
     use super::*;
     use crate::datasource::listing::PartitionedFile;
     use crate::datasource::object_store::ObjectStoreUrl;
-    use crate::physical_plan::displayable;
+    use crate::physical_optimizer::dist_enforcement::EnforceDistribution;
+    use crate::physical_plan::aggregates::PhysicalGroupBy;
+    use crate::physical_plan::aggregates::{AggregateExec, AggregateMode};
     use crate::physical_plan::file_format::{FileScanConfig, ParquetExec};
     use crate::physical_plan::filter::FilterExec;
     use crate::physical_plan::memory::MemoryExec;
+    use crate::physical_plan::repartition::RepartitionExec;
     use 
crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
     use crate::physical_plan::union::UnionExec;
     use crate::physical_plan::windows::create_window_expr;
+    use crate::physical_plan::{displayable, Partitioning};
     use crate::prelude::SessionContext;
     use arrow::compute::SortOptions;
     use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
@@ -522,6 +944,13 @@ mod tests {
         Ok(schema)
     }
 
+    // Util function to get string representation of a physical plan
+    fn get_plan_string(plan: &Arc<dyn ExecutionPlan>) -> Vec<String> {
+        let formatted = displayable(plan.as_ref()).indent().to_string();
+        let actual: Vec<&str> = formatted.trim().lines().collect();
+        actual.iter().map(|elem| elem.to_string()).collect()
+    }
+
     #[tokio::test]
     async fn test_is_column_aligned_nullable() -> Result<()> {
         let schema = create_test_schema()?;
@@ -633,11 +1062,8 @@ mod tests {
             // Run the actual optimizer
             let optimized_physical_plan =
                 EnforceSorting::new().optimize(physical_plan, 
state.config_options())?;
-
-            let formatted = displayable(optimized_physical_plan.as_ref())
-                .indent()
-                .to_string();
-            let actual: Vec<&str> = formatted.trim().lines().collect();
+            // Get string representation of the plan
+            let actual = get_plan_string(&optimized_physical_plan);
             assert_eq!(
                 expected_optimized_lines, actual,
                 "\n**Optimized Plan 
Mismatch\n\nexpected:\n\n{expected_optimized_lines:#?}\nactual:\n\n{actual:#?}\n\n"
@@ -775,6 +1201,133 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_remove_unnecessary_sort2() -> Result<()> {
+        let schema = create_test_schema()?;
+        let source = memory_exec(&schema);
+        let sort_exprs = vec![sort_expr("non_nullable_col", &schema)];
+        let sort = sort_exec(sort_exprs.clone(), source);
+        let spm = sort_preserving_merge_exec(sort_exprs, sort);
+
+        let sort_exprs = vec![
+            sort_expr("nullable_col", &schema),
+            sort_expr("non_nullable_col", &schema),
+        ];
+        let sort2 = sort_exec(sort_exprs.clone(), spm);
+        let spm2 = sort_preserving_merge_exec(sort_exprs, sort2);
+
+        let sort_exprs = vec![sort_expr("nullable_col", &schema)];
+        let sort3 = sort_exec(sort_exprs, spm2);
+        let physical_plan = repartition_exec(repartition_exec(sort3));
+
+        let expected_input = vec![
+            "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=10",
+            "  RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+            "    SortExec: [nullable_col@0 ASC]",
+            "      SortPreservingMergeExec: [nullable_col@0 
ASC,non_nullable_col@1 ASC]",
+            "        SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "          SortPreservingMergeExec: [non_nullable_col@1 ASC]",
+            "            SortExec: [non_nullable_col@1 ASC]",
+            "              MemoryExec: partitions=0, partition_sizes=[]",
+        ];
+
+        let expected_optimized = vec![
+            "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=10",
+            "  RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=0",
+            "    MemoryExec: partitions=0, partition_sizes=[]",
+        ];
+        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_remove_unnecessary_sort3() -> Result<()> {
+        let schema = create_test_schema()?;
+        let source = memory_exec(&schema);
+        let sort_exprs = vec![sort_expr("non_nullable_col", &schema)];
+        let sort = sort_exec(sort_exprs.clone(), source);
+        let spm = sort_preserving_merge_exec(sort_exprs, sort);
+
+        let sort_exprs = vec![
+            sort_expr("nullable_col", &schema),
+            sort_expr("non_nullable_col", &schema),
+        ];
+        let repartition_exec = repartition_exec(spm);
+        let sort2 = sort_exec(sort_exprs.clone(), repartition_exec);
+        let spm2 = sort_preserving_merge_exec(sort_exprs, sort2);
+
+        let physical_plan = aggregate_exec(spm2);
+
+        // When removing a `SortPreservingMergeExec`, make sure that 
partitioning
+        // requirements are not violated. In some cases, we may need to replace
+        // it with a `CoalescePartitionsExec` instead of directly removing it.
+        let expected_input = vec![
+            "AggregateExec: mode=Final, gby=[], aggr=[]",
+            "  SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 
ASC]",
+            "    SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "      RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+            "        SortPreservingMergeExec: [non_nullable_col@1 ASC]",
+            "          SortExec: [non_nullable_col@1 ASC]",
+            "            MemoryExec: partitions=0, partition_sizes=[]",
+        ];
+
+        let expected_optimized = vec![
+            "AggregateExec: mode=Final, gby=[], aggr=[]",
+            "  CoalescePartitionsExec",
+            "    RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=0",
+            "      MemoryExec: partitions=0, partition_sizes=[]",
+        ];
+        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_do_not_remove_sort_with_limit() -> Result<()> {
+        let schema = create_test_schema()?;
+
+        let source1 = parquet_exec(&schema);
+        let sort_exprs = vec![
+            sort_expr("nullable_col", &schema),
+            sort_expr("non_nullable_col", &schema),
+        ];
+        let sort = sort_exec(sort_exprs.clone(), source1);
+        let limit = local_limit_exec(sort);
+        let limit = global_limit_exec(limit);
+
+        let parquet_sort_exprs = vec![sort_expr("nullable_col", &schema)];
+        let source2 = parquet_exec_sorted(&schema, parquet_sort_exprs);
+
+        let union = union_exec(vec![source2, limit]);
+        let repartition = repartition_exec(union);
+        let physical_plan = sort_preserving_merge_exec(sort_exprs, 
repartition);
+
+        let expected_input = vec![
+            "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 
ASC]",
+            "  RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=2",
+            "    UnionExec",
+            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, 
output_ordering=[nullable_col@0 ASC], projection=[nullable_col, 
non_nullable_col]",
+            "      GlobalLimitExec: skip=0, fetch=100",
+            "        LocalLimitExec: fetch=100",
+            "          SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "            ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
+        ];
+
+        // We should keep the bottom `SortExec`.
+        let expected_optimized = vec![
+            "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 
ASC]",
+            "  SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "    RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=2",
+            "      UnionExec",
+            "        ParquetExec: limit=None, partitions={1 group: [[x]]}, 
output_ordering=[nullable_col@0 ASC], projection=[nullable_col, 
non_nullable_col]",
+            "        GlobalLimitExec: skip=0, fetch=100",
+            "          LocalLimitExec: fetch=100",
+            "            SortExec: [nullable_col@0 ASC,non_nullable_col@1 
ASC]",
+            "              ParquetExec: limit=None, partitions={1 group: 
[[x]]}, projection=[nullable_col, non_nullable_col]",
+        ];
+        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        Ok(())
+    }
+
     #[tokio::test]
     async fn test_change_wrong_sorting() -> Result<()> {
         let schema = create_test_schema()?;
@@ -897,6 +1450,332 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_union_inputs_different_sorted3() -> Result<()> {
+        let schema = create_test_schema()?;
+
+        let source1 = parquet_exec(&schema);
+        let sort_exprs1 = vec![
+            sort_expr("nullable_col", &schema),
+            sort_expr("non_nullable_col", &schema),
+        ];
+        let sort1 = sort_exec(sort_exprs1, source1.clone());
+        let sort_exprs2 = vec![sort_expr("nullable_col", &schema)];
+        let sort2 = sort_exec(sort_exprs2, source1);
+
+        let parquet_sort_exprs = vec![sort_expr("nullable_col", &schema)];
+        let source2 = parquet_exec_sorted(&schema, parquet_sort_exprs.clone());
+
+        let union = union_exec(vec![sort1, source2, sort2]);
+        let physical_plan = sort_preserving_merge_exec(parquet_sort_exprs, 
union);
+
+        // First input to the union is not Sorted (SortExec is finer than 
required ordering by the SortPreservingMergeExec above).
+        // Second input to the union is already Sorted (matches with the 
required ordering by the SortPreservingMergeExec above).
+        // Third input to the union is not Sorted (SortExec is matches 
required ordering by the SortPreservingMergeExec above).
+        let expected_input = vec![
+            "SortPreservingMergeExec: [nullable_col@0 ASC]",
+            "  UnionExec",
+            "    SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
+            "    ParquetExec: limit=None, partitions={1 group: [[x]]}, 
output_ordering=[nullable_col@0 ASC], projection=[nullable_col, 
non_nullable_col]",
+            "    SortExec: [nullable_col@0 ASC]",
+            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
+        ];
+        // should adjust sorting in the first input of the union such that it 
is not unnecessarily fine
+        let expected_optimized = vec![
+            "SortPreservingMergeExec: [nullable_col@0 ASC]",
+            "  UnionExec",
+            "    SortExec: [nullable_col@0 ASC]",
+            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
+            "    ParquetExec: limit=None, partitions={1 group: [[x]]}, 
output_ordering=[nullable_col@0 ASC], projection=[nullable_col, 
non_nullable_col]",
+            "    SortExec: [nullable_col@0 ASC]",
+            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
+        ];
+        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_union_inputs_different_sorted4() -> Result<()> {
+        let schema = create_test_schema()?;
+
+        let source1 = parquet_exec(&schema);
+        let sort_exprs1 = vec![
+            sort_expr("nullable_col", &schema),
+            sort_expr("non_nullable_col", &schema),
+        ];
+        let sort_exprs2 = vec![sort_expr("nullable_col", &schema)];
+        let sort1 = sort_exec(sort_exprs2.clone(), source1.clone());
+        let sort2 = sort_exec(sort_exprs2.clone(), source1);
+
+        let source2 = parquet_exec_sorted(&schema, sort_exprs2);
+
+        let union = union_exec(vec![sort1, source2, sort2]);
+        let physical_plan = sort_preserving_merge_exec(sort_exprs1, union);
+
+        // Ordering requirement of the `SortPreservingMergeExec` is not met.
+        // Should modify the plan to ensure that all three inputs to the
+        // `UnionExec` satisfy the ordering, OR add a single sort after
+        // the `UnionExec` (both of which are equally good for this example).
+        let expected_input = vec![
+            "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 
ASC]",
+            "  UnionExec",
+            "    SortExec: [nullable_col@0 ASC]",
+            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
+            "    ParquetExec: limit=None, partitions={1 group: [[x]]}, 
output_ordering=[nullable_col@0 ASC], projection=[nullable_col, 
non_nullable_col]",
+            "    SortExec: [nullable_col@0 ASC]",
+            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
+        ];
+        let expected_optimized = vec![
+            "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 
ASC]",
+            "  SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "    UnionExec",
+            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
+            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, 
output_ordering=[nullable_col@0 ASC], projection=[nullable_col, 
non_nullable_col]",
+            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
+        ];
+        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_union_inputs_different_sorted5() -> Result<()> {
+        let schema = create_test_schema()?;
+
+        let source1 = parquet_exec(&schema);
+        let sort_exprs1 = vec![
+            sort_expr("nullable_col", &schema),
+            sort_expr("non_nullable_col", &schema),
+        ];
+        let sort_exprs2 = vec![
+            sort_expr("nullable_col", &schema),
+            sort_expr_options(
+                "non_nullable_col",
+                &schema,
+                SortOptions {
+                    descending: true,
+                    nulls_first: false,
+                },
+            ),
+        ];
+        let sort_exprs3 = vec![sort_expr("nullable_col", &schema)];
+        let sort1 = sort_exec(sort_exprs1, source1.clone());
+        let sort2 = sort_exec(sort_exprs2, source1);
+
+        let union = union_exec(vec![sort1, sort2]);
+        let physical_plan = sort_preserving_merge_exec(sort_exprs3, union);
+
+        // The `UnionExec` doesn't preserve any of the inputs ordering in the
+        // example below. However, we should be able to change the 
unnecessarily
+        // fine `SortExec`s below with required `SortExec`s that are 
absolutely necessary.
+        let expected_input = vec![
+            "SortPreservingMergeExec: [nullable_col@0 ASC]",
+            "  UnionExec",
+            "    SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
+            "    SortExec: [nullable_col@0 ASC,non_nullable_col@1 DESC NULLS 
LAST]",
+            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
+        ];
+        let expected_optimized = vec![
+            "SortPreservingMergeExec: [nullable_col@0 ASC]",
+            "  UnionExec",
+            "    SortExec: [nullable_col@0 ASC]",
+            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
+            "    SortExec: [nullable_col@0 ASC]",
+            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
+        ];
+        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_union_inputs_different_sorted6() -> Result<()> {
+        let schema = create_test_schema()?;
+
+        let source1 = parquet_exec(&schema);
+        let sort_exprs1 = vec![sort_expr("nullable_col", &schema)];
+        let sort1 = sort_exec(sort_exprs1, source1.clone());
+        let sort_exprs2 = vec![
+            sort_expr("nullable_col", &schema),
+            sort_expr("non_nullable_col", &schema),
+        ];
+        let repartition = repartition_exec(source1);
+        let spm = sort_preserving_merge_exec(sort_exprs2, repartition);
+
+        let parquet_sort_exprs = vec![sort_expr("nullable_col", &schema)];
+        let source2 = parquet_exec_sorted(&schema, parquet_sort_exprs.clone());
+
+        let union = union_exec(vec![sort1, source2, spm]);
+        let physical_plan = sort_preserving_merge_exec(parquet_sort_exprs, 
union);
+
+        // The plan is not valid as it is -- the input ordering requirement
+        // of the `SortPreservingMergeExec` under the third child of the
+        // `UnionExec` is not met. We should add a `SortExec` below it.
+        // At the same time, this ordering requirement is unnecessarily fine.
+        // The final plan should be valid AND the ordering of the third child
+        // shouldn't be finer than necessary.
+        let expected_input = vec![
+            "SortPreservingMergeExec: [nullable_col@0 ASC]",
+            "  UnionExec",
+            "    SortExec: [nullable_col@0 ASC]",
+            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
+            "    ParquetExec: limit=None, partitions={1 group: [[x]]}, 
output_ordering=[nullable_col@0 ASC], projection=[nullable_col, 
non_nullable_col]",
+            "    SortPreservingMergeExec: [nullable_col@0 
ASC,non_nullable_col@1 ASC]",
+            "      RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+            "        ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
+        ];
+        // Should adjust the requirement in the third input of the union so
+        // that it is not unnecessarily fine.
+        let expected_optimized = vec![
+            "SortPreservingMergeExec: [nullable_col@0 ASC]",
+            "  UnionExec",
+            "    SortExec: [nullable_col@0 ASC]",
+            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
+            "    ParquetExec: limit=None, partitions={1 group: [[x]]}, 
output_ordering=[nullable_col@0 ASC], projection=[nullable_col, 
non_nullable_col]",
+            "    SortPreservingMergeExec: [nullable_col@0 ASC]",
+            "      SortExec: [nullable_col@0 ASC]",
+            "        RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+            "          ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
+        ];
+        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_window_multi_path_sort() -> Result<()> {
+        let schema = create_test_schema()?;
+
+        let sort_exprs1 = vec![
+            sort_expr("nullable_col", &schema),
+            sort_expr("non_nullable_col", &schema),
+        ];
+        let sort_exprs2 = vec![sort_expr("nullable_col", &schema)];
+        // reverse sorting of sort_exprs2
+        let sort_exprs3 = vec![sort_expr_options(
+            "nullable_col",
+            &schema,
+            SortOptions {
+                descending: true,
+                nulls_first: false,
+            },
+        )];
+        let source1 = parquet_exec_sorted(&schema, sort_exprs1);
+        let source2 = parquet_exec_sorted(&schema, sort_exprs2);
+        let sort1 = sort_exec(sort_exprs3.clone(), source1);
+        let sort2 = sort_exec(sort_exprs3.clone(), source2);
+
+        let union = union_exec(vec![sort1, sort2]);
+        let physical_plan = window_exec("nullable_col", sort_exprs3, union);
+
+        // The `WindowAggExec` gets its sorting from multiple children jointly.
+        // During the removal of `SortExec`s, it should be able to remove the
+        // corresponding SortExecs together. Also, the inputs of these 
`SortExec`s
+        // are not necessarily the same to be able to remove them.
+        let expected_input = vec![
+            "WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: 
Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), 
frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: 
CurrentRow }]",
+            "  UnionExec",
+            "    SortExec: [nullable_col@0 DESC NULLS LAST]",
+            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, 
output_ordering=[nullable_col@0 ASC, non_nullable_col@1 ASC], 
projection=[nullable_col, non_nullable_col]",
+            "    SortExec: [nullable_col@0 DESC NULLS LAST]",
+            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, 
output_ordering=[nullable_col@0 ASC], projection=[nullable_col, 
non_nullable_col]",
+        ];
+        let expected_optimized = vec![
+            "WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: 
Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), 
frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: 
Following(NULL) }]",
+            "  UnionExec",
+            "    ParquetExec: limit=None, partitions={1 group: [[x]]}, 
output_ordering=[nullable_col@0 ASC, non_nullable_col@1 ASC], 
projection=[nullable_col, non_nullable_col]",
+            "    ParquetExec: limit=None, partitions={1 group: [[x]]}, 
output_ordering=[nullable_col@0 ASC], projection=[nullable_col, 
non_nullable_col]",
+        ];
+        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_multilayer_coalesce_partitions() -> Result<()> {
+        let schema = create_test_schema()?;
+
+        let source1 = parquet_exec(&schema);
+        let repartition = repartition_exec(source1);
+        let coalesce = Arc::new(CoalescePartitionsExec::new(repartition)) as _;
+        // Add dummy layer propagating Sort above, to test whether sort can be 
removed from multi layer before
+        let filter = filter_exec(
+            Arc::new(NotExpr::new(
+                col("non_nullable_col", schema.as_ref()).unwrap(),
+            )),
+            coalesce,
+        );
+        let sort_exprs = vec![sort_expr("nullable_col", &schema)];
+        let physical_plan = sort_exec(sort_exprs, filter);
+
+        // CoalescePartitionsExec and SortExec are not directly consecutive. 
In this case
+        // we should be able to parallelize Sorting also (given that executors 
in between don't require)
+        // single partition.
+        let expected_input = vec![
+            "SortExec: [nullable_col@0 ASC]",
+            "  FilterExec: NOT non_nullable_col@1",
+            "    CoalescePartitionsExec",
+            "      RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+            "        ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
+        ];
+        let expected_optimized = vec![
+            "SortPreservingMergeExec: [nullable_col@0 ASC]",
+            "  FilterExec: NOT non_nullable_col@1",
+            "    SortExec: [nullable_col@0 ASC]",
+            "      RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+            "        ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
+        ];
+        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        Ok(())
+    }
+
+    #[tokio::test]
+    // With new change in SortEnforcement 
EnforceSorting->EnforceDistribution->EnforceSorting
+    // should produce same result with EnforceDistribution+EnforceSorting
+    // This enables us to use EnforceSorting possibly before 
EnforceDistribution
+    // Given that it will be called at least once after last 
EnforceDistribution. The reason is that
+    // EnforceDistribution may invalidate ordering invariant.
+    async fn test_commutativity() -> Result<()> {
+        let schema = create_test_schema()?;
+
+        let session_ctx = SessionContext::new();
+        let state = session_ctx.state();
+
+        let memory_exec = memory_exec(&schema);
+        let sort_exprs = vec![sort_expr("nullable_col", &schema)];
+        let window = window_exec("nullable_col", sort_exprs.clone(), 
memory_exec);
+        let repartition = repartition_exec(window);
+
+        let orig_plan = Arc::new(SortExec::new_with_partitioning(
+            sort_exprs,
+            repartition,
+            false,
+            None,
+        )) as Arc<dyn ExecutionPlan>;
+
+        let mut plan = orig_plan.clone();
+        let rules = vec![
+            Arc::new(EnforceDistribution::new()) as Arc<dyn 
PhysicalOptimizerRule>,
+            Arc::new(EnforceSorting::new()) as Arc<dyn PhysicalOptimizerRule>,
+        ];
+        for rule in rules {
+            plan = rule.optimize(plan, state.config_options())?;
+        }
+        let first_plan = plan.clone();
+
+        let mut plan = orig_plan.clone();
+        let rules = vec![
+            Arc::new(EnforceSorting::new()) as Arc<dyn PhysicalOptimizerRule>,
+            Arc::new(EnforceDistribution::new()) as Arc<dyn 
PhysicalOptimizerRule>,
+            Arc::new(EnforceSorting::new()) as Arc<dyn PhysicalOptimizerRule>,
+        ];
+        for rule in rules {
+            plan = rule.optimize(plan, state.config_options())?;
+        }
+        let second_plan = plan.clone();
+
+        assert_eq!(get_plan_string(&first_plan), 
get_plan_string(&second_plan));
+        Ok(())
+    }
+
     /// make PhysicalSortExpr with default options
     fn sort_expr(name: &str, schema: &Schema) -> PhysicalSortExpr {
         sort_expr_options(name, schema, SortOptions::default())
@@ -1016,4 +1895,32 @@ mod tests {
     fn union_exec(input: Vec<Arc<dyn ExecutionPlan>>) -> Arc<dyn 
ExecutionPlan> {
         Arc::new(UnionExec::new(input))
     }
+
+    fn local_limit_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn 
ExecutionPlan> {
+        Arc::new(LocalLimitExec::new(input, 100))
+    }
+
+    fn global_limit_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn 
ExecutionPlan> {
+        Arc::new(GlobalLimitExec::new(input, 0, Some(100)))
+    }
+
+    fn repartition_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn 
ExecutionPlan> {
+        Arc::new(
+            RepartitionExec::try_new(input, 
Partitioning::RoundRobinBatch(10)).unwrap(),
+        )
+    }
+
+    fn aggregate_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> 
{
+        let schema = input.schema();
+        Arc::new(
+            AggregateExec::try_new(
+                AggregateMode::Final,
+                PhysicalGroupBy::default(),
+                vec![],
+                input,
+                schema,
+            )
+            .unwrap(),
+        )
+    }
 }
diff --git a/datafusion/core/src/physical_optimizer/test_utils.rs 
b/datafusion/core/src/physical_optimizer/test_utils.rs
index 1404dfa20..8689b016b 100644
--- a/datafusion/core/src/physical_optimizer/test_utils.rs
+++ b/datafusion/core/src/physical_optimizer/test_utils.rs
@@ -118,7 +118,12 @@ impl QueryCase {
         if error.is_some() {
             let plan_error = plan.unwrap_err();
             let initial = error.unwrap().to_string();
-            assert!(plan_error.to_string().contains(initial.as_str()));
+            assert!(
+                plan_error.to_string().contains(initial.as_str()),
+                "plan_error: {:?} doesn't contain message: {:?}",
+                plan_error,
+                initial.as_str()
+            );
         } else {
             assert!(plan.is_ok())
         }
diff --git a/datafusion/core/tests/sql/explain_analyze.rs 
b/datafusion/core/tests/sql/explain_analyze.rs
index 01bd94e8e..5fc877a2c 100644
--- a/datafusion/core/tests/sql/explain_analyze.rs
+++ b/datafusion/core/tests/sql/explain_analyze.rs
@@ -86,11 +86,6 @@ async fn explain_analyze_baseline_metrics() {
         "CoalesceBatchesExec: target_batch_size=4096",
         "metrics=[output_rows=5, elapsed_compute"
     );
-    assert_metrics!(
-        &formatted,
-        "CoalescePartitionsExec",
-        "metrics=[output_rows=5, elapsed_compute="
-    );
     assert_metrics!(
         &formatted,
         "UnionExec",
diff --git a/datafusion/core/tests/sql/joins.rs 
b/datafusion/core/tests/sql/joins.rs
index e0bd1a523..37b662a28 100644
--- a/datafusion/core/tests/sql/joins.rs
+++ b/datafusion/core/tests/sql/joins.rs
@@ -1980,8 +1980,8 @@ async fn left_semi_join() -> Result<()> {
         let physical_plan = dataframe.create_physical_plan().await?;
         let expected = if repartition_joins {
             vec![
-                "SortExec: [t1_id@0 ASC NULLS LAST]",
-                "  CoalescePartitionsExec",
+                "SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]",
+                "  SortExec: [t1_id@0 ASC NULLS LAST]",
                 "    ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as 
t1_name]",
                 "      CoalesceBatchesExec: target_batch_size=4096",
                 "        HashJoinExec: mode=Partitioned, join_type=LeftSemi, 
on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2_id\", index: 0 
})]",
@@ -1997,8 +1997,8 @@ async fn left_semi_join() -> Result<()> {
             ]
         } else {
             vec![
-                "SortExec: [t1_id@0 ASC NULLS LAST]",
-                "  CoalescePartitionsExec",
+                "SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]",
+                "  SortExec: [t1_id@0 ASC NULLS LAST]",
                 "    ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as 
t1_name]",
                 "      CoalesceBatchesExec: target_batch_size=4096",
                 "        HashJoinExec: mode=CollectLeft, join_type=LeftSemi, 
on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2_id\", index: 0 
})]",
@@ -2062,8 +2062,8 @@ async fn left_semi_join() -> Result<()> {
         let physical_plan = dataframe.create_physical_plan().await?;
         let expected = if repartition_joins {
             vec![
-                "SortExec: [t1_id@0 ASC NULLS LAST]",
-                "  CoalescePartitionsExec",
+                "SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]",
+                "  SortExec: [t1_id@0 ASC NULLS LAST]",
                 "    ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as 
t1_name]",
                 "      CoalesceBatchesExec: target_batch_size=4096",
                 "        HashJoinExec: mode=Partitioned, join_type=LeftSemi, 
on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2_id\", index: 0 
})]",
@@ -2078,8 +2078,8 @@ async fn left_semi_join() -> Result<()> {
             ]
         } else {
             vec![
-                "SortExec: [t1_id@0 ASC NULLS LAST]",
-                "  CoalescePartitionsExec",
+                "SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]",
+                "  SortExec: [t1_id@0 ASC NULLS LAST]",
                 "    ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as 
t1_name]",
                 "      RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1",
                 "        CoalesceBatchesExec: target_batch_size=4096",
@@ -2259,8 +2259,8 @@ async fn right_semi_join() -> Result<()> {
         let dataframe = ctx.sql(sql).await.expect(&msg);
         let physical_plan = dataframe.create_physical_plan().await?;
         let expected = if repartition_joins {
-            vec![ "SortExec: [t1_id@0 ASC NULLS LAST]",
-                  "  CoalescePartitionsExec",
+            vec![ "SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]",
+                  "  SortExec: [t1_id@0 ASC NULLS LAST]",
                   "    ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as 
t1_name, t1_int@2 as t1_int]",
                   "      CoalesceBatchesExec: target_batch_size=4096",
                   "        HashJoinExec: mode=Partitioned, 
join_type=RightSemi, on=[(Column { name: \"t2_id\", index: 0 }, Column { name: 
\"t1_id\", index: 0 })], filter=BinaryExpr { left: Column { name: \"t2_name\", 
index: 1 }, op: NotEq, right: Column { name: \"t1_name\", index: 0 } }",
@@ -2275,8 +2275,8 @@ async fn right_semi_join() -> Result<()> {
             ]
         } else {
             vec![
-                "SortExec: [t1_id@0 ASC NULLS LAST]",
-                "  CoalescePartitionsExec",
+                "SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]",
+                "  SortExec: [t1_id@0 ASC NULLS LAST]",
                 "    ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as 
t1_name, t1_int@2 as t1_int]",
                 "      RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1",
                 "        CoalesceBatchesExec: target_batch_size=4096",
@@ -2307,8 +2307,8 @@ async fn right_semi_join() -> Result<()> {
         let dataframe = ctx.sql(sql).await.expect(&msg);
         let physical_plan = dataframe.create_physical_plan().await?;
         let expected = if repartition_joins {
-            vec![ "SortExec: [t1_id@0 ASC NULLS LAST]",
-                  "  CoalescePartitionsExec",
+            vec![ "SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]",
+                  "  SortExec: [t1_id@0 ASC NULLS LAST]",
                   "    ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as 
t1_name, t1_int@2 as t1_int]",
                   "      CoalesceBatchesExec: target_batch_size=4096",
                   "        HashJoinExec: mode=Partitioned, 
join_type=RightSemi, on=[(Column { name: \"t2_id\", index: 0 }, Column { name: 
\"t1_id\", index: 0 })], filter=BinaryExpr { left: Column { name: \"t2_name\", 
index: 0 }, op: NotEq, right: Column { name: \"t1_name\", index: 1 } }",
@@ -2323,8 +2323,8 @@ async fn right_semi_join() -> Result<()> {
             ]
         } else {
             vec![
-                "SortExec: [t1_id@0 ASC NULLS LAST]",
-                "  CoalescePartitionsExec",
+                "SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]",
+                "  SortExec: [t1_id@0 ASC NULLS LAST]",
                 "    ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as 
t1_name, t1_int@2 as t1_int]",
                 "      RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1",
                 "        CoalesceBatchesExec: target_batch_size=4096",
diff --git a/datafusion/core/tests/sql/window.rs 
b/datafusion/core/tests/sql/window.rs
index da682abb8..21a6062b8 100644
--- a/datafusion/core/tests/sql/window.rs
+++ b/datafusion/core/tests/sql/window.rs
@@ -1747,8 +1747,8 @@ async fn 
over_order_by_sort_keys_sorting_global_order_compacting() -> Result<()>
     // 3 SortExec are added
     let expected = {
         vec![
-            "SortExec: [c2@0 ASC NULLS LAST]",
-            "  CoalescePartitionsExec",
+            "SortPreservingMergeExec: [c2@0 ASC NULLS LAST]",
+            "  SortExec: [c2@0 ASC NULLS LAST]",
             "    ProjectionExec: expr=[c2@0 as c2, MAX(aggregate_test_100.c9) 
ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as 
MAX(aggregate_test_100.c9), SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED 
PRECEDING AND UNBOUNDED FOLLOWING@4 as SUM(aggregate_test_100.c9), 
MIN(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST, 
aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN U [...]
             "      RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1",
             "        WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field 
{ name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: 
Following(UInt64(NULL)) }]",
@@ -2298,15 +2298,14 @@ async fn test_remove_unnecessary_sort_in_sub_query() -> 
Result<()> {
             "  AggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))]",
             "    CoalescePartitionsExec",
             "      AggregateExec: mode=Partial, gby=[], 
aggr=[COUNT(UInt8(1))]",
-            "        RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-            "          CoalescePartitionsExec",
-            "            AggregateExec: mode=FinalPartitioned, gby=[c1@0 as 
c1], aggr=[COUNT(UInt8(1))]",
-            "              CoalesceBatchesExec: target_batch_size=4096",
-            "                RepartitionExec: partitioning=Hash([Column { 
name: \"c1\", index: 0 }], 8), input_partitions=8",
-            "                  AggregateExec: mode=Partial, gby=[c1@0 as c1], 
aggr=[COUNT(UInt8(1))]",
-            "                    CoalesceBatchesExec: target_batch_size=4096",
-            "                      FilterExec: c13@1 != 
C2GT5KVyOPZpgKVl110TyZO0NcJ434",
-            "                        RepartitionExec: 
partitioning=RoundRobinBatch(8), input_partitions=1",
+            "        RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=8",
+            "          AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], 
aggr=[COUNT(UInt8(1))]",
+            "            CoalesceBatchesExec: target_batch_size=4096",
+            "              RepartitionExec: partitioning=Hash([Column { name: 
\"c1\", index: 0 }], 8), input_partitions=8",
+            "                AggregateExec: mode=Partial, gby=[c1@0 as c1], 
aggr=[COUNT(UInt8(1))]",
+            "                  CoalesceBatchesExec: target_batch_size=4096",
+            "                    FilterExec: c13@1 != 
C2GT5KVyOPZpgKVl110TyZO0NcJ434",
+            "                      RepartitionExec: 
partitioning=RoundRobinBatch(8), input_partitions=1",
         ]
     };
 
@@ -2385,6 +2384,173 @@ async fn 
test_window_agg_sort_orderby_reversed_partitionby_reversed_plan() -> Re
     Ok(())
 }
 
+#[tokio::test]
+async fn test_window_agg_global_sort() -> Result<()> {
+    let config = SessionConfig::new()
+        .with_repartition_windows(true)
+        .with_target_partitions(2)
+        .with_repartition_sorts(true);
+    let ctx = SessionContext::with_config(config);
+    register_aggregate_csv(&ctx).await?;
+    let sql = "SELECT c1, ROW_NUMBER() OVER (PARTITION BY c1) as rn1 FROM 
aggregate_test_100 ORDER BY c1 ASC";
+
+    let msg = format!("Creating logical plan for '{sql}'");
+    let dataframe = ctx.sql(sql).await.expect(&msg);
+    let physical_plan = dataframe.create_physical_plan().await?;
+    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
+    // Only 1 SortExec was added
+    let expected = {
+        vec![
+            "SortPreservingMergeExec: [c1@0 ASC NULLS LAST]",
+            "  ProjectionExec: expr=[c1@0 as c1, ROW_NUMBER() PARTITION BY 
[aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED 
FOLLOWING@1 as rn1]",
+            "    BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: 
\"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]",
+            "      SortExec: [c1@0 ASC NULLS LAST]",
+            "        CoalesceBatchesExec: target_batch_size=8192",
+            "          RepartitionExec: partitioning=Hash([Column { name: 
\"c1\", index: 0 }], 2), input_partitions=2",
+            "            RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1",
+        ]
+    };
+
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    let actual_len = actual.len();
+    let actual_trim_last = &actual[..actual_len - 1];
+    assert_eq!(
+        expected, actual_trim_last,
+        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+    );
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_window_agg_global_sort_parallelize_sort_disabled() -> Result<()> 
{
+    let config = SessionConfig::new()
+        .with_repartition_windows(true)
+        .with_target_partitions(2)
+        .with_repartition_sorts(false);
+    let ctx = SessionContext::with_config(config);
+    register_aggregate_csv(&ctx).await?;
+    let sql = "SELECT c1, ROW_NUMBER() OVER (PARTITION BY c1) as rn1 FROM 
aggregate_test_100 ORDER BY c1 ASC";
+
+    let msg = format!("Creating logical plan for '{sql}'");
+    let dataframe = ctx.sql(sql).await.expect(&msg);
+    let physical_plan = dataframe.create_physical_plan().await?;
+    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
+    // Only 1 SortExec was added
+    let expected = {
+        vec![
+            "SortExec: [c1@0 ASC NULLS LAST]",
+            "  CoalescePartitionsExec",
+            "    ProjectionExec: expr=[c1@0 as c1, ROW_NUMBER() PARTITION BY 
[aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED 
FOLLOWING@1 as rn1]",
+            "      BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: 
\"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]",
+            "        SortExec: [c1@0 ASC NULLS LAST]",
+            "          CoalesceBatchesExec: target_batch_size=8192",
+            "            RepartitionExec: partitioning=Hash([Column { name: 
\"c1\", index: 0 }], 2), input_partitions=2",
+            "              RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1",
+        ]
+    };
+
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    let actual_len = actual.len();
+    let actual_trim_last = &actual[..actual_len - 1];
+    assert_eq!(
+        expected, actual_trim_last,
+        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+    );
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_window_agg_global_sort_intermediate_parallel_sort() -> 
Result<()> {
+    let config = SessionConfig::new()
+        .with_repartition_windows(true)
+        .with_target_partitions(2)
+        .with_repartition_sorts(true);
+    let ctx = SessionContext::with_config(config);
+    register_aggregate_csv(&ctx).await?;
+    let sql = "SELECT c1, \
+    SUM(C9) OVER (PARTITION BY C1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 
3 FOLLOWING) as sum1, \
+    SUM(C9) OVER (ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as 
sum2 \
+    FROM aggregate_test_100 ORDER BY c1 ASC";
+
+    let msg = format!("Creating logical plan for '{sql}'");
+    let dataframe = ctx.sql(sql).await.expect(&msg);
+    let physical_plan = dataframe.create_physical_plan().await?;
+    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
+    // Only 1 SortExec was added
+    let expected = {
+        vec![
+            "SortExec: [c1@0 ASC NULLS LAST]",
+            "  ProjectionExec: expr=[c1@0 as c1, SUM(aggregate_test_100.c9) 
PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS 
LAST] ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING@2 as sum1, 
SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 5 FOLLOWING@3 as sum2]",
+            "    BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): 
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) 
}]",
+            "      SortPreservingMergeExec: [c9@1 ASC NULLS LAST]",
+            "        SortExec: [c9@1 ASC NULLS LAST]",
+            "          BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): 
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(3)) 
}]",
+            "            SortExec: [c1@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]",
+            "              CoalesceBatchesExec: target_batch_size=8192",
+            "                RepartitionExec: partitioning=Hash([Column { 
name: \"c1\", index: 0 }], 2), input_partitions=2",
+            "                  RepartitionExec: 
partitioning=RoundRobinBatch(2), input_partitions=1",
+        ]
+    };
+
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    let actual_len = actual.len();
+    let actual_trim_last = &actual[..actual_len - 1];
+    assert_eq!(
+        expected, actual_trim_last,
+        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+    );
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_window_agg_with_global_limit() -> Result<()> {
+    let config = SessionConfig::new()
+        .with_repartition_windows(false)
+        .with_target_partitions(1);
+    let ctx = SessionContext::with_config(config);
+    register_aggregate_csv(&ctx).await?;
+    let sql = "SELECT ARRAY_AGG(c13) as array_agg1 FROM (SELECT * FROM 
aggregate_test_100 ORDER BY c13 LIMIT 1)";
+
+    let msg = format!("Creating logical plan for '{sql}'");
+    let dataframe = ctx.sql(sql).await.expect(&msg);
+    let physical_plan = dataframe.create_physical_plan().await?;
+    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
+    // Only 1 SortExec was added
+    let expected = {
+        vec![
+            "ProjectionExec: expr=[ARRAYAGG(aggregate_test_100.c13)@0 as 
array_agg1]",
+            "  AggregateExec: mode=Final, gby=[], 
aggr=[ARRAYAGG(aggregate_test_100.c13)]",
+            "    AggregateExec: mode=Partial, gby=[], 
aggr=[ARRAYAGG(aggregate_test_100.c13)]",
+            "      GlobalLimitExec: skip=0, fetch=1",
+            "        SortExec: [c13@0 ASC NULLS LAST]",
+            "          ProjectionExec: expr=[c13@0 as c13]",
+        ]
+    };
+
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    let actual_len = actual.len();
+    let actual_trim_last = &actual[..actual_len - 1];
+    assert_eq!(
+        expected, actual_trim_last,
+        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+    );
+
+    let actual = execute_to_batches(&ctx, sql).await;
+    let expected = vec![
+        "+----------------------------------+",
+        "| array_agg1                       |",
+        "+----------------------------------+",
+        "| [0VVIHzxWtNOFLtnhjHEKjXaJOSLJfm] |",
+        "+----------------------------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+
+    Ok(())
+}
+
 #[tokio::test]
 async fn test_window_agg_low_cardinality() -> Result<()> {
     let config = SessionConfig::new().with_target_partitions(32);
diff --git 
a/datafusion/core/tests/sqllogictests/test_files/information_schema.slt 
b/datafusion/core/tests/sqllogictests/test_files/information_schema.slt
index 4b1c0bc0e..75eca2ddf 100644
--- a/datafusion/core/tests/sqllogictests/test_files/information_schema.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/information_schema.slt
@@ -134,6 +134,7 @@ datafusion.optimizer.repartition_aggregations true
 datafusion.optimizer.repartition_file_min_size 10485760
 datafusion.optimizer.repartition_file_scans false
 datafusion.optimizer.repartition_joins true
+datafusion.optimizer.repartition_sorts true
 datafusion.optimizer.repartition_windows true
 datafusion.optimizer.skip_failed_rules true
 datafusion.optimizer.top_down_join_key_reordering true
diff --git a/docs/source/user-guide/configs.md 
b/docs/source/user-guide/configs.md
index bd5b69467..ddfbc4e6e 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -35,39 +35,40 @@ Values are parsed according to the [same rules used in 
casts from Utf8](https://
 If the value in the environment variable cannot be cast to the type of the 
configuration option, the default value will be used instead and a warning 
emitted.
 Environment variables are read during `SessionConfig` initialisation so they 
must be set beforehand and will not affect running sessions.
 
-| key                                                       | default    | 
description                                                                     
                                                                                
                                                                                
                                                              |
-| --------------------------------------------------------- | ---------- | 
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 |
-| datafusion.catalog.create_default_catalog_and_schema      | true       | 
Whether the default catalog and schema should be created automatically.         
                                                                                
                                                                                
                                                              |
-| datafusion.catalog.default_catalog                        | datafusion | The 
default catalog name - this impacts what SQL queries use if not specified       
                                                                                
                                                                                
                                                          |
-| datafusion.catalog.default_schema                         | public     | The 
default schema name - this impacts what SQL queries use if not specified        
                                                                                
                                                                                
                                                          |
-| datafusion.catalog.information_schema                     | false      | 
Should DataFusion provide access to `information_schema` virtual tables for 
displaying schema information                                                   
                                                                                
                                                                  |
-| datafusion.catalog.location                               | NULL       | 
Location scanned to load tables for `default` schema                            
                                                                                
                                                                                
                                                              |
-| datafusion.catalog.format                                 | NULL       | 
Type of `TableProvider` to use when loading `default` schema                    
                                                                                
                                                                                
                                                              |
-| datafusion.catalog.has_header                             | false      | If 
the file has a header                                                           
                                                                                
                                                                                
                                                           |
-| datafusion.execution.batch_size                           | 8192       | 
Default batch size while creating new batches, it's especially useful for 
buffer-in-memory batches since creating tiny batches would results in too much 
metadata memory consumption                                                     
                                                                     |
-| datafusion.execution.coalesce_batches                     | true       | 
When set to true, record batches will be examined between each operator and 
small batches will be coalesced into larger batches. This is helpful when there 
are highly selective filters or joins that could produce tiny output batches. 
The target batch size is determined by the configuration setting    |
-| datafusion.execution.collect_statistics                   | false      | 
Should DataFusion collect statistics after listing files                        
                                                                                
                                                                                
                                                              |
-| datafusion.execution.target_partitions                    | 0          | 
Number of partitions for query execution. Increasing partitions can increase 
concurrency. Defaults to the number of cpu cores on the system                  
                                                                                
                                                                 |
-| datafusion.execution.time_zone                            | +00:00     | The 
default time zone Some functions, e.g. EXTRACT(HOUR from SOME_TIME), shift the 
underlying datetime according to this time zone, and then extract the hour      
                                                                                
                                                           |
-| datafusion.execution.parquet.enable_page_index            | false      | If 
true, uses parquet data page level metadata (Page Index) statistics to reduce 
the number of rows decoded.                                                     
                                                                                
                                                             |
-| datafusion.execution.parquet.pruning                      | true       | If 
true, the parquet reader attempts to skip entire row groups based on the 
predicate in the query and the metadata (min/max values) stored in the parquet 
file                                                                            
                                                                   |
-| datafusion.execution.parquet.skip_metadata                | true       | If 
true, the parquet reader skip the optional embedded metadata that may be in the 
file Schema. This setting can help avoid schema conflicts when querying 
multiple parquet files with schemas containing compatible types but different 
metadata                                                             |
-| datafusion.execution.parquet.metadata_size_hint           | NULL       | If 
specified, the parquet reader will try and fetch the last `size_hint` bytes of 
the parquet file optimistically. If not specified, two read are required: One 
read to fetch the 8-byte parquet footer and another to fetch the metadata 
length encoded in the footer                                        |
-| datafusion.execution.parquet.pushdown_filters             | false      | If 
true, filter expressions are be applied during the parquet decoding operation 
to reduce the number of rows decoded                                            
                                                                                
                                                             |
-| datafusion.execution.parquet.reorder_filters              | false      | If 
true, filter expressions evaluated during the parquet decoding operation will 
be reordered heuristically to minimize the cost of evaluation. If false, the 
filters are applied in the same order as written in the query                   
                                                                |
-| datafusion.optimizer.enable_round_robin_repartition       | true       | 
When set to true, the physical plan optimizer will try to add round robin 
repartition to increase parallelism to leverage more CPU cores                  
                                                                                
                                                                    |
-| datafusion.optimizer.filter_null_join_keys                | false      | 
When set to true, the optimizer will insert filters before a join between a 
nullable and non-nullable column to filter out nulls on the nullable side. This 
filter can add additional overhead when the file format does not fully support 
predicate push down.                                               |
-| datafusion.optimizer.repartition_aggregations             | true       | 
Should DataFusion repartition data using the aggregate keys to execute 
aggregates in parallel using the provided `target_partitions` level"            
                                                                                
                                                                       |
-| datafusion.optimizer.repartition_file_min_size            | 10485760   | 
Minimum total files size in bytes to perform file scan repartitioning.          
                                                                                
                                                                                
                                                              |
-| datafusion.optimizer.repartition_joins                    | true       | 
Should DataFusion repartition data using the join keys to execute joins in 
parallel using the provided `target_partitions` level"                          
                                                                                
                                                                   |
-| datafusion.optimizer.repartition_file_scans               | false      | 
When set to true, file groups will be repartitioned to achieve maximum 
parallelism. Currently supported only for Parquet format in which case multiple 
row groups from the same file may be read concurrently. If false then each row 
group is read serially, though different files may be read in parallel. |
-| datafusion.optimizer.repartition_windows                  | true       | 
Should DataFusion repartition data using the partitions keys to execute window 
functions in parallel using the provided `target_partitions` level"             
                                                                                
                                                               |
-| datafusion.optimizer.skip_failed_rules                    | true       | 
When set to true, the logical plan optimizer will produce warning messages if 
any optimization rules produce errors and then proceed to the next rule. When 
set to false, any rules that produce errors will cause the query to fail        
                                                                  |
-| datafusion.optimizer.max_passes                           | 3          | 
Number of times that the optimizer will attempt to optimize the plan            
                                                                                
                                                                                
                                                              |
-| datafusion.optimizer.top_down_join_key_reordering         | true       | 
When set to true, the physical plan optimizer will run a top down process to 
reorder the join keys                                                           
                                                                                
                                                                 |
-| datafusion.optimizer.prefer_hash_join                     | true       | 
When set to true, the physical plan optimizer will prefer HashJoin over 
SortMergeJoin. HashJoin can work more efficiently than SortMergeJoin but 
consumes more memory                                                            
                                                                             |
-| datafusion.optimizer.hash_join_single_partition_threshold | 1048576    | The 
maximum estimated size in bytes for one input side of a HashJoin will be 
collected into a single partition                                               
                                                                                
                                                                 |
-| datafusion.explain.logical_plan_only                      | false      | 
When set to true, the explain statement will only print logical plans           
                                                                                
                                                                                
                                                              |
-| datafusion.explain.physical_plan_only                     | false      | 
When set to true, the explain statement will only print physical plans          
                                                                                
                                                                                
                                                              |
-| datafusion.sql_parser.parse_float_as_decimal              | false      | 
When set to true, sql parser will parse float as decimal type                   
                                                                                
                                                                                
                                                              |
-| datafusion.sql_parser.enable_ident_normalization          | true       | 
When set to true, sql parser will normalize ident(convert ident to lowercase 
when not quoted)                                                                
                                                                                
                                                                 |
+| key                                                       | default    | 
description                                                                     
                                                                                
                                                                                
                                                                                
                                                                                
                  [...]
+| --------------------------------------------------------- | ---------- | 
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
+| datafusion.catalog.create_default_catalog_and_schema      | true       | 
Whether the default catalog and schema should be created automatically.         
                                                                                
                                                                                
                                                                                
                                                                                
                  [...]
+| datafusion.catalog.default_catalog                        | datafusion | The 
default catalog name - this impacts what SQL queries use if not specified       
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| datafusion.catalog.default_schema                         | public     | The 
default schema name - this impacts what SQL queries use if not specified        
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| datafusion.catalog.information_schema                     | false      | 
Should DataFusion provide access to `information_schema` virtual tables for 
displaying schema information                                                   
                                                                                
                                                                                
                                                                                
                      [...]
+| datafusion.catalog.location                               | NULL       | 
Location scanned to load tables for `default` schema                            
                                                                                
                                                                                
                                                                                
                                                                                
                  [...]
+| datafusion.catalog.format                                 | NULL       | 
Type of `TableProvider` to use when loading `default` schema                    
                                                                                
                                                                                
                                                                                
                                                                                
                  [...]
+| datafusion.catalog.has_header                             | false      | If 
the file has a header                                                           
                                                                                
                                                                                
                                                                                
                                                                                
               [...]
+| datafusion.execution.batch_size                           | 8192       | 
Default batch size while creating new batches, it's especially useful for 
buffer-in-memory batches since creating tiny batches would results in too much 
metadata memory consumption                                                     
                                                                                
                                                                                
                         [...]
+| datafusion.execution.coalesce_batches                     | true       | 
When set to true, record batches will be examined between each operator and 
small batches will be coalesced into larger batches. This is helpful when there 
are highly selective filters or joins that could produce tiny output batches. 
The target batch size is determined by the configuration setting                
                                                                                
                        [...]
+| datafusion.execution.collect_statistics                   | false      | 
Should DataFusion collect statistics after listing files                        
                                                                                
                                                                                
                                                                                
                                                                                
                  [...]
+| datafusion.execution.target_partitions                    | 0          | 
Number of partitions for query execution. Increasing partitions can increase 
concurrency. Defaults to the number of cpu cores on the system                  
                                                                                
                                                                                
                                                                                
                     [...]
+| datafusion.execution.time_zone                            | +00:00     | The 
default time zone Some functions, e.g. EXTRACT(HOUR from SOME_TIME), shift the 
underlying datetime according to this time zone, and then extract the hour      
                                                                                
                                                                                
                                                                                
               [...]
+| datafusion.execution.parquet.enable_page_index            | false      | If 
true, uses parquet data page level metadata (Page Index) statistics to reduce 
the number of rows decoded.                                                     
                                                                                
                                                                                
                                                                                
                 [...]
+| datafusion.execution.parquet.pruning                      | true       | If 
true, the parquet reader attempts to skip entire row groups based on the 
predicate in the query and the metadata (min/max values) stored in the parquet 
file                                                                            
                                                                                
                                                                                
                       [...]
+| datafusion.execution.parquet.skip_metadata                | true       | If 
true, the parquet reader skip the optional embedded metadata that may be in the 
file Schema. This setting can help avoid schema conflicts when querying 
multiple parquet files with schemas containing compatible types but different 
metadata                                                                        
                                                                                
                         [...]
+| datafusion.execution.parquet.metadata_size_hint           | NULL       | If 
specified, the parquet reader will try and fetch the last `size_hint` bytes of 
the parquet file optimistically. If not specified, two read are required: One 
read to fetch the 8-byte parquet footer and another to fetch the metadata 
length encoded in the footer                                                    
                                                                                
                        [...]
+| datafusion.execution.parquet.pushdown_filters             | false      | If 
true, filter expressions are be applied during the parquet decoding operation 
to reduce the number of rows decoded                                            
                                                                                
                                                                                
                                                                                
                 [...]
+| datafusion.execution.parquet.reorder_filters              | false      | If 
true, filter expressions evaluated during the parquet decoding operation will 
be reordered heuristically to minimize the cost of evaluation. If false, the 
filters are applied in the same order as written in the query                   
                                                                                
                                                                                
                    [...]
+| datafusion.optimizer.enable_round_robin_repartition       | true       | 
When set to true, the physical plan optimizer will try to add round robin 
repartition to increase parallelism to leverage more CPU cores                  
                                                                                
                                                                                
                                                                                
                        [...]
+| datafusion.optimizer.filter_null_join_keys                | false      | 
When set to true, the optimizer will insert filters before a join between a 
nullable and non-nullable column to filter out nulls on the nullable side. This 
filter can add additional overhead when the file format does not fully support 
predicate push down.                                                            
                                                                                
                       [...]
+| datafusion.optimizer.repartition_aggregations             | true       | 
Should DataFusion repartition data using the aggregate keys to execute 
aggregates in parallel using the provided `target_partitions` level"            
                                                                                
                                                                                
                                                                                
                           [...]
+| datafusion.optimizer.repartition_file_min_size            | 10485760   | 
Minimum total files size in bytes to perform file scan repartitioning.          
                                                                                
                                                                                
                                                                                
                                                                                
                  [...]
+| datafusion.optimizer.repartition_joins                    | true       | 
Should DataFusion repartition data using the join keys to execute joins in 
parallel using the provided `target_partitions` level"                          
                                                                                
                                                                                
                                                                                
                       [...]
+| datafusion.optimizer.repartition_file_scans               | false      | 
When set to true, file groups will be repartitioned to achieve maximum 
parallelism. Currently supported only for Parquet format in which case multiple 
row groups from the same file may be read concurrently. If false then each row 
group is read serially, though different files may be read in parallel.         
                                                                                
                            [...]
+| datafusion.optimizer.repartition_windows                  | true       | 
Should DataFusion repartition data using the partitions keys to execute window 
functions in parallel using the provided `target_partitions` level"             
                                                                                
                                                                                
                                                                                
                   [...]
+| datafusion.optimizer.repartition_sorts                    | true       | 
Should DataFusion execute sorts in a per-partition fashion and merge afterwards 
instead of coalescing first and sorting globally With this flag is enabled, 
plans in the form below "SortExec: [a@0 ASC]", " CoalescePartitionsExec", " 
RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", would 
turn into the plan below which performs better in multithreaded environments 
"SortPreservingMergeExec: [a@0  [...]
+| datafusion.optimizer.skip_failed_rules                    | true       | 
When set to true, the logical plan optimizer will produce warning messages if 
any optimization rules produce errors and then proceed to the next rule. When 
set to false, any rules that produce errors will cause the query to fail        
                                                                                
                                                                                
                      [...]
+| datafusion.optimizer.max_passes                           | 3          | 
Number of times that the optimizer will attempt to optimize the plan            
                                                                                
                                                                                
                                                                                
                                                                                
                  [...]
+| datafusion.optimizer.top_down_join_key_reordering         | true       | 
When set to true, the physical plan optimizer will run a top down process to 
reorder the join keys                                                           
                                                                                
                                                                                
                                                                                
                     [...]
+| datafusion.optimizer.prefer_hash_join                     | true       | 
When set to true, the physical plan optimizer will prefer HashJoin over 
SortMergeJoin. HashJoin can work more efficiently than SortMergeJoin but 
consumes more memory                                                            
                                                                                
                                                                                
                                 [...]
+| datafusion.optimizer.hash_join_single_partition_threshold | 1048576    | The 
maximum estimated size in bytes for one input side of a HashJoin will be 
collected into a single partition                                               
                                                                                
                                                                                
                                                                                
                     [...]
+| datafusion.explain.logical_plan_only                      | false      | 
When set to true, the explain statement will only print logical plans           
                                                                                
                                                                                
                                                                                
                                                                                
                  [...]
+| datafusion.explain.physical_plan_only                     | false      | 
When set to true, the explain statement will only print physical plans          
                                                                                
                                                                                
                                                                                
                                                                                
                  [...]
+| datafusion.sql_parser.parse_float_as_decimal              | false      | 
When set to true, sql parser will parse float as decimal type                   
                                                                                
                                                                                
                                                                                
                                                                                
                  [...]
+| datafusion.sql_parser.enable_ident_normalization          | true       | 
When set to true, sql parser will normalize ident(convert ident to lowercase 
when not quoted)                                                                
                                                                                
                                                                                
                                                                                
                     [...]

Reply via email to