This is an automated email from the ASF dual-hosted git repository.

ozankabak pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 6403222c1e TreeNode Refactor Part 2 (#8653)
6403222c1e is described below

commit 6403222c1eda8ed3438fe2555229319b92bfa056
Author: Berkay Şahin <[email protected]>
AuthorDate: Wed Dec 27 23:18:27 2023 +0300

    TreeNode Refactor Part 2 (#8653)
    
    * Refactor TreeNode's
    
    * Update utils.rs
    
    * Final review
    
    * Remove unnecessary clones, more idiomatic Rust
    
    ---------
    
    Co-authored-by: Mehmet Ozan Kabak <[email protected]>
---
 .../src/physical_optimizer/enforce_distribution.rs | 767 +++++++++------------
 .../core/src/physical_optimizer/enforce_sorting.rs | 554 +++++++--------
 .../src/physical_optimizer/output_requirements.rs  |   4 +
 .../src/physical_optimizer/pipeline_checker.rs     |  32 +-
 .../replace_with_order_preserving_variants.rs      | 292 ++++----
 .../core/src/physical_optimizer/sort_pushdown.rs   | 138 ++--
 datafusion/core/src/physical_optimizer/utils.rs    |  69 +-
 datafusion/physical-expr/src/sort_properties.rs    |  10 +-
 datafusion/physical-plan/src/union.rs              |  20 +-
 9 files changed, 872 insertions(+), 1014 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs 
b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index 0aef126578..d5a0862273 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -25,11 +25,11 @@ use std::fmt;
 use std::fmt::Formatter;
 use std::sync::Arc;
 
+use super::output_requirements::OutputRequirementExec;
 use crate::config::ConfigOptions;
 use crate::error::Result;
 use crate::physical_optimizer::utils::{
-    add_sort_above, get_children_exectrees, is_coalesce_partitions, 
is_repartition,
-    is_sort_preserving_merge, ExecTree,
+    is_coalesce_partitions, is_repartition, is_sort_preserving_merge,
 };
 use crate::physical_optimizer::PhysicalOptimizerRule;
 use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, 
PhysicalGroupBy};
@@ -52,8 +52,10 @@ use datafusion_expr::logical_plan::JoinType;
 use datafusion_physical_expr::expressions::{Column, NoOp};
 use datafusion_physical_expr::utils::map_columns_before_projection;
 use datafusion_physical_expr::{
-    physical_exprs_equal, EquivalenceProperties, PhysicalExpr,
+    physical_exprs_equal, EquivalenceProperties, LexRequirementRef, 
PhysicalExpr,
+    PhysicalSortRequirement,
 };
+use datafusion_physical_plan::sorts::sort::SortExec;
 use datafusion_physical_plan::windows::{get_best_fitting_window, 
BoundedWindowAggExec};
 use datafusion_physical_plan::{get_plan_string, unbounded_output};
 
@@ -268,11 +270,12 @@ impl PhysicalOptimizerRule for EnforceDistribution {
 /// 5) For other types of operators, by default, pushdown the parent 
requirements to children.
 ///
 fn adjust_input_keys_ordering(
-    requirements: PlanWithKeyRequirements,
+    mut requirements: PlanWithKeyRequirements,
 ) -> Result<Transformed<PlanWithKeyRequirements>> {
     let parent_required = requirements.required_key_ordering.clone();
     let plan_any = requirements.plan.as_any();
-    let transformed = if let Some(HashJoinExec {
+
+    if let Some(HashJoinExec {
         left,
         right,
         on,
@@ -287,7 +290,7 @@ fn adjust_input_keys_ordering(
             PartitionMode::Partitioned => {
                 let join_constructor =
                     |new_conditions: (Vec<(Column, Column)>, 
Vec<SortOptions>)| {
-                        Ok(Arc::new(HashJoinExec::try_new(
+                        HashJoinExec::try_new(
                             left.clone(),
                             right.clone(),
                             new_conditions.0,
@@ -295,15 +298,17 @@ fn adjust_input_keys_ordering(
                             join_type,
                             PartitionMode::Partitioned,
                             *null_equals_null,
-                        )?) as Arc<dyn ExecutionPlan>)
+                        )
+                        .map(|e| Arc::new(e) as _)
                     };
-                Some(reorder_partitioned_join_keys(
+                reorder_partitioned_join_keys(
                     requirements.plan.clone(),
                     &parent_required,
                     on,
                     vec![],
                     &join_constructor,
-                )?)
+                )
+                .map(Transformed::Yes)
             }
             PartitionMode::CollectLeft => {
                 let new_right_request = match join_type {
@@ -321,15 +326,15 @@ fn adjust_input_keys_ordering(
                 };
 
                 // Push down requirements to the right side
-                Some(PlanWithKeyRequirements {
-                    plan: requirements.plan.clone(),
-                    required_key_ordering: vec![],
-                    request_key_ordering: vec![None, new_right_request],
-                })
+                requirements.children[1].required_key_ordering =
+                    new_right_request.unwrap_or(vec![]);
+                Ok(Transformed::Yes(requirements))
             }
             PartitionMode::Auto => {
                 // Can not satisfy, clear the current requirements and 
generate new empty requirements
-                Some(PlanWithKeyRequirements::new(requirements.plan.clone()))
+                Ok(Transformed::Yes(PlanWithKeyRequirements::new(
+                    requirements.plan,
+                )))
             }
         }
     } else if let Some(CrossJoinExec { left, .. }) =
@@ -337,14 +342,9 @@ fn adjust_input_keys_ordering(
     {
         let left_columns_len = left.schema().fields().len();
         // Push down requirements to the right side
-        Some(PlanWithKeyRequirements {
-            plan: requirements.plan.clone(),
-            required_key_ordering: vec![],
-            request_key_ordering: vec![
-                None,
-                shift_right_required(&parent_required, left_columns_len),
-            ],
-        })
+        requirements.children[1].required_key_ordering =
+            shift_right_required(&parent_required, 
left_columns_len).unwrap_or_default();
+        Ok(Transformed::Yes(requirements))
     } else if let Some(SortMergeJoinExec {
         left,
         right,
@@ -357,35 +357,40 @@ fn adjust_input_keys_ordering(
     {
         let join_constructor =
             |new_conditions: (Vec<(Column, Column)>, Vec<SortOptions>)| {
-                Ok(Arc::new(SortMergeJoinExec::try_new(
+                SortMergeJoinExec::try_new(
                     left.clone(),
                     right.clone(),
                     new_conditions.0,
                     *join_type,
                     new_conditions.1,
                     *null_equals_null,
-                )?) as Arc<dyn ExecutionPlan>)
+                )
+                .map(|e| Arc::new(e) as _)
             };
-        Some(reorder_partitioned_join_keys(
+        reorder_partitioned_join_keys(
             requirements.plan.clone(),
             &parent_required,
             on,
             sort_options.clone(),
             &join_constructor,
-        )?)
+        )
+        .map(Transformed::Yes)
     } else if let Some(aggregate_exec) = 
plan_any.downcast_ref::<AggregateExec>() {
         if !parent_required.is_empty() {
             match aggregate_exec.mode() {
-                AggregateMode::FinalPartitioned => Some(reorder_aggregate_keys(
+                AggregateMode::FinalPartitioned => reorder_aggregate_keys(
                     requirements.plan.clone(),
                     &parent_required,
                     aggregate_exec,
-                )?),
-                _ => 
Some(PlanWithKeyRequirements::new(requirements.plan.clone())),
+                )
+                .map(Transformed::Yes),
+                _ => Ok(Transformed::Yes(PlanWithKeyRequirements::new(
+                    requirements.plan,
+                ))),
             }
         } else {
             // Keep everything unchanged
-            None
+            Ok(Transformed::No(requirements))
         }
     } else if let Some(proj) = plan_any.downcast_ref::<ProjectionExec>() {
         let expr = proj.expr();
@@ -394,34 +399,28 @@ fn adjust_input_keys_ordering(
         // Construct a mapping from new name to the the orginal Column
         let new_required = map_columns_before_projection(&parent_required, 
expr);
         if new_required.len() == parent_required.len() {
-            Some(PlanWithKeyRequirements {
-                plan: requirements.plan.clone(),
-                required_key_ordering: vec![],
-                request_key_ordering: vec![Some(new_required.clone())],
-            })
+            requirements.children[0].required_key_ordering = new_required;
+            Ok(Transformed::Yes(requirements))
         } else {
             // Can not satisfy, clear the current requirements and generate 
new empty requirements
-            Some(PlanWithKeyRequirements::new(requirements.plan.clone()))
+            Ok(Transformed::Yes(PlanWithKeyRequirements::new(
+                requirements.plan,
+            )))
         }
     } else if plan_any.downcast_ref::<RepartitionExec>().is_some()
         || plan_any.downcast_ref::<CoalescePartitionsExec>().is_some()
         || plan_any.downcast_ref::<WindowAggExec>().is_some()
     {
-        Some(PlanWithKeyRequirements::new(requirements.plan.clone()))
+        Ok(Transformed::Yes(PlanWithKeyRequirements::new(
+            requirements.plan,
+        )))
     } else {
         // By default, push down the parent requirements to children
-        let children_len = requirements.plan.children().len();
-        Some(PlanWithKeyRequirements {
-            plan: requirements.plan.clone(),
-            required_key_ordering: vec![],
-            request_key_ordering: vec![Some(parent_required.clone()); 
children_len],
-        })
-    };
-    Ok(if let Some(transformed) = transformed {
-        Transformed::Yes(transformed)
-    } else {
-        Transformed::No(requirements)
-    })
+        requirements.children.iter_mut().for_each(|child| {
+            child.required_key_ordering = parent_required.clone();
+        });
+        Ok(Transformed::Yes(requirements))
+    }
 }
 
 fn reorder_partitioned_join_keys<F>(
@@ -452,28 +451,24 @@ where
             for idx in 0..sort_options.len() {
                 new_sort_options.push(sort_options[new_positions[idx]])
             }
-
-            Ok(PlanWithKeyRequirements {
-                plan: join_constructor((new_join_on, new_sort_options))?,
-                required_key_ordering: vec![],
-                request_key_ordering: vec![Some(left_keys), Some(right_keys)],
-            })
+            let mut requirement_tree = 
PlanWithKeyRequirements::new(join_constructor((
+                new_join_on,
+                new_sort_options,
+            ))?);
+            requirement_tree.children[0].required_key_ordering = left_keys;
+            requirement_tree.children[1].required_key_ordering = right_keys;
+            Ok(requirement_tree)
         } else {
-            Ok(PlanWithKeyRequirements {
-                plan: join_plan,
-                required_key_ordering: vec![],
-                request_key_ordering: vec![Some(left_keys), Some(right_keys)],
-            })
+            let mut requirement_tree = PlanWithKeyRequirements::new(join_plan);
+            requirement_tree.children[0].required_key_ordering = left_keys;
+            requirement_tree.children[1].required_key_ordering = right_keys;
+            Ok(requirement_tree)
         }
     } else {
-        Ok(PlanWithKeyRequirements {
-            plan: join_plan,
-            required_key_ordering: vec![],
-            request_key_ordering: vec![
-                Some(join_key_pairs.left_keys),
-                Some(join_key_pairs.right_keys),
-            ],
-        })
+        let mut requirement_tree = PlanWithKeyRequirements::new(join_plan);
+        requirement_tree.children[0].required_key_ordering = 
join_key_pairs.left_keys;
+        requirement_tree.children[1].required_key_ordering = 
join_key_pairs.right_keys;
+        Ok(requirement_tree)
     }
 }
 
@@ -868,59 +863,24 @@ fn new_join_conditions(
         .collect()
 }
 
-/// Updates `dist_onward` such that, to keep track of
-/// `input` in the `exec_tree`.
-///
-/// # Arguments
-///
-/// * `input`: Current execution plan
-/// * `dist_onward`: It keeps track of executors starting from a distribution
-///    changing operator (e.g Repartition, SortPreservingMergeExec, etc.)
-///    until child of `input` (`input` should have single child).
-/// * `input_idx`: index of the `input`, for its parent.
-///
-fn update_distribution_onward(
-    input: Arc<dyn ExecutionPlan>,
-    dist_onward: &mut Option<ExecTree>,
-    input_idx: usize,
-) {
-    // Update the onward tree if there is an active branch
-    if let Some(exec_tree) = dist_onward {
-        // When we add a new operator to change distribution
-        // we add RepartitionExec, SortPreservingMergeExec, 
CoalescePartitionsExec
-        // in this case, we need to update exec tree idx such that exec tree 
is now child of these
-        // operators (change the 0, since all of the operators have single 
child).
-        exec_tree.idx = 0;
-        *exec_tree = ExecTree::new(input, input_idx, vec![exec_tree.clone()]);
-    } else {
-        *dist_onward = Some(ExecTree::new(input, input_idx, vec![]));
-    }
-}
-
 /// Adds RoundRobin repartition operator to the plan increase parallelism.
 ///
 /// # Arguments
 ///
-/// * `input`: Current execution plan
+/// * `input`: Current node.
 /// * `n_target`: desired target partition number, if partition number of the
 ///    current executor is less than this value. Partition number will be 
increased.
-/// * `dist_onward`: It keeps track of executors starting from a distribution
-///    changing operator (e.g Repartition, SortPreservingMergeExec, etc.)
-///    until `input` plan.
-/// * `input_idx`: index of the `input`, for its parent.
 ///
 /// # Returns
 ///
-/// A [Result] object that contains new execution plan, where desired 
partition number
-/// is achieved by adding RoundRobin Repartition.
+/// A [`Result`] object that contains new execution plan where the desired
+/// partition number is achieved by adding a RoundRobin repartition.
 fn add_roundrobin_on_top(
-    input: Arc<dyn ExecutionPlan>,
+    input: DistributionContext,
     n_target: usize,
-    dist_onward: &mut Option<ExecTree>,
-    input_idx: usize,
-) -> Result<Arc<dyn ExecutionPlan>> {
-    // Adding repartition is helpful
-    if input.output_partitioning().partition_count() < n_target {
+) -> Result<DistributionContext> {
+    // Adding repartition is helpful:
+    if input.plan.output_partitioning().partition_count() < n_target {
         // When there is an existing ordering, we preserve ordering
         // during repartition. This will be un-done in the future
         // If any of the following conditions is true
@@ -928,13 +888,16 @@ fn add_roundrobin_on_top(
         // - Usage of order preserving variants is not desirable
         // (determined by flag `config.optimizer.prefer_existing_sort`)
         let partitioning = Partitioning::RoundRobinBatch(n_target);
-        let repartition =
-            RepartitionExec::try_new(input, 
partitioning)?.with_preserve_order();
+        let repartition = RepartitionExec::try_new(input.plan.clone(), 
partitioning)?
+            .with_preserve_order();
 
-        // update distribution onward with new operator
-        let new_plan = Arc::new(repartition) as Arc<dyn ExecutionPlan>;
-        update_distribution_onward(new_plan.clone(), dist_onward, input_idx);
-        Ok(new_plan)
+        let new_plan = Arc::new(repartition) as _;
+
+        Ok(DistributionContext {
+            plan: new_plan,
+            distribution_connection: true,
+            children_nodes: vec![input],
+        })
     } else {
         // Partition is not helpful, we already have desired number of 
partitions.
         Ok(input)
@@ -948,46 +911,38 @@ fn add_roundrobin_on_top(
 ///
 /// # Arguments
 ///
-/// * `input`: Current execution plan
+/// * `input`: Current node.
 /// * `hash_exprs`: Stores Physical Exprs that are used during hashing.
 /// * `n_target`: desired target partition number, if partition number of the
 ///    current executor is less than this value. Partition number will be 
increased.
-/// * `dist_onward`: It keeps track of executors starting from a distribution
-///    changing operator (e.g Repartition, SortPreservingMergeExec, etc.)
-///    until `input` plan.
-/// * `input_idx`: index of the `input`, for its parent.
 ///
 /// # Returns
 ///
-/// A [`Result`] object that contains new execution plan, where desired 
distribution is
-/// satisfied by adding Hash Repartition.
+/// A [`Result`] object that contains new execution plan where the desired
+/// distribution is satisfied by adding a Hash repartition.
 fn add_hash_on_top(
-    input: Arc<dyn ExecutionPlan>,
+    mut input: DistributionContext,
     hash_exprs: Vec<Arc<dyn PhysicalExpr>>,
-    // Repartition(Hash) will have `n_target` partitions at the output.
     n_target: usize,
-    // Stores executors starting from Repartition(RoundRobin) until
-    // current executor. When Repartition(Hash) is added, `dist_onward`
-    // is updated such that it stores connection from Repartition(RoundRobin)
-    // until Repartition(Hash).
-    dist_onward: &mut Option<ExecTree>,
-    input_idx: usize,
     repartition_beneficial_stats: bool,
-) -> Result<Arc<dyn ExecutionPlan>> {
-    if n_target == input.output_partitioning().partition_count() && n_target 
== 1 {
-        // In this case adding a hash repartition is unnecessary as the hash
-        // requirement is implicitly satisfied.
+) -> Result<DistributionContext> {
+    let partition_count = input.plan.output_partitioning().partition_count();
+    // Early return if hash repartition is unnecessary
+    if n_target == partition_count && n_target == 1 {
         return Ok(input);
     }
+
     let satisfied = input
+        .plan
         .output_partitioning()
         .satisfy(Distribution::HashPartitioned(hash_exprs.clone()), || {
-            input.equivalence_properties()
+            input.plan.equivalence_properties()
         });
+
     // Add hash repartitioning when:
     // - The hash distribution requirement is not satisfied, or
     // - We can increase parallelism by adding hash partitioning.
-    if !satisfied || n_target > input.output_partitioning().partition_count() {
+    if !satisfied || n_target > 
input.plan.output_partitioning().partition_count() {
         // When there is an existing ordering, we preserve ordering during
         // repartition. This will be rolled back in the future if any of the
         // following conditions is true:
@@ -995,75 +950,66 @@ fn add_hash_on_top(
         //   requirements.
         // - Usage of order preserving variants is not desirable (per the flag
         //   `config.optimizer.prefer_existing_sort`).
-        let mut new_plan = if repartition_beneficial_stats {
+        if repartition_beneficial_stats {
             // Since hashing benefits from partitioning, add a round-robin 
repartition
             // before it:
-            add_roundrobin_on_top(input, n_target, dist_onward, 0)?
-        } else {
-            input
-        };
+            input = add_roundrobin_on_top(input, n_target)?;
+        }
+
         let partitioning = Partitioning::Hash(hash_exprs, n_target);
-        let repartition = RepartitionExec::try_new(new_plan, partitioning)?
-            // preserve any ordering if possible
+        let repartition = RepartitionExec::try_new(input.plan.clone(), 
partitioning)?
             .with_preserve_order();
-        new_plan = Arc::new(repartition) as _;
 
-        // update distribution onward with new operator
-        update_distribution_onward(new_plan.clone(), dist_onward, input_idx);
-        Ok(new_plan)
-    } else {
-        Ok(input)
+        input.children_nodes = vec![input.clone()];
+        input.distribution_connection = true;
+        input.plan = Arc::new(repartition) as _;
     }
+
+    Ok(input)
 }
 
-/// Adds a `SortPreservingMergeExec` operator on top of input executor:
-/// - to satisfy single distribution requirement.
+/// Adds a [`SortPreservingMergeExec`] operator on top of input executor
+/// to satisfy single distribution requirement.
 ///
 /// # Arguments
 ///
-/// * `input`: Current execution plan
-/// * `dist_onward`: It keeps track of executors starting from a distribution
-///    changing operator (e.g Repartition, SortPreservingMergeExec, etc.)
-///    until `input` plan.
-/// * `input_idx`: index of the `input`, for its parent.
+/// * `input`: Current node.
 ///
 /// # Returns
 ///
-/// New execution plan, where desired single
-/// distribution is satisfied by adding `SortPreservingMergeExec`.
-fn add_spm_on_top(
-    input: Arc<dyn ExecutionPlan>,
-    dist_onward: &mut Option<ExecTree>,
-    input_idx: usize,
-) -> Arc<dyn ExecutionPlan> {
+/// Updated node with an execution plan, where desired single
+/// distribution is satisfied by adding [`SortPreservingMergeExec`].
+fn add_spm_on_top(input: DistributionContext) -> DistributionContext {
     // Add SortPreservingMerge only when partition count is larger than 1.
-    if input.output_partitioning().partition_count() > 1 {
+    if input.plan.output_partitioning().partition_count() > 1 {
         // When there is an existing ordering, we preserve ordering
-        // during decreasıng partıtıons. This will be un-done in the future
-        // If any of the following conditions is true
+        // when decreasing partitions. This will be un-done in the future
+        // if any of the following conditions is true
         // - Preserving ordering is not helpful in terms of satisfying 
ordering requirements
         // - Usage of order preserving variants is not desirable
-        // (determined by flag `config.optimizer.prefer_existing_sort`)
-        let should_preserve_ordering = input.output_ordering().is_some();
-        let new_plan: Arc<dyn ExecutionPlan> = if should_preserve_ordering {
-            let existing_ordering = input.output_ordering().unwrap_or(&[]);
+        // (determined by flag 
`config.optimizer.bounded_order_preserving_variants`)
+        let should_preserve_ordering = input.plan.output_ordering().is_some();
+
+        let new_plan = if should_preserve_ordering {
             Arc::new(SortPreservingMergeExec::new(
-                existing_ordering.to_vec(),
-                input,
+                input.plan.output_ordering().unwrap_or(&[]).to_vec(),
+                input.plan.clone(),
             )) as _
         } else {
-            Arc::new(CoalescePartitionsExec::new(input)) as _
+            Arc::new(CoalescePartitionsExec::new(input.plan.clone())) as _
         };
 
-        // update repartition onward with new operator
-        update_distribution_onward(new_plan.clone(), dist_onward, input_idx);
-        new_plan
+        DistributionContext {
+            plan: new_plan,
+            distribution_connection: true,
+            children_nodes: vec![input],
+        }
     } else {
         input
     }
 }
 
-/// Updates the physical plan inside `distribution_context` so that 
distribution
+/// Updates the physical plan inside [`DistributionContext`] so that 
distribution
 /// changing operators are removed from the top. If they are necessary, they 
will
 /// be added in subsequent stages.
 ///
@@ -1081,48 +1027,23 @@ fn add_spm_on_top(
 /// "ParquetExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, b, c, 
d, e], output_ordering=\[a@0 ASC]",
 /// ```
 fn remove_dist_changing_operators(
-    distribution_context: DistributionContext,
+    mut distribution_context: DistributionContext,
 ) -> Result<DistributionContext> {
-    let DistributionContext {
-        mut plan,
-        mut distribution_onwards,
-    } = distribution_context;
-
-    // Remove any distribution changing operators at the beginning:
-    // Note that they will be re-inserted later on if necessary or helpful.
-    while is_repartition(&plan)
-        || is_coalesce_partitions(&plan)
-        || is_sort_preserving_merge(&plan)
+    while is_repartition(&distribution_context.plan)
+        || is_coalesce_partitions(&distribution_context.plan)
+        || is_sort_preserving_merge(&distribution_context.plan)
     {
-        // All of above operators have a single child. When we remove the top
-        // operator, we take the first child.
-        plan = plan.children().swap_remove(0);
-        distribution_onwards =
-            get_children_exectrees(plan.children().len(), 
&distribution_onwards[0]);
+        // All of above operators have a single child. First child is only 
child.
+        let child = distribution_context.children_nodes.swap_remove(0);
+        // Remove any distribution changing operators at the beginning:
+        // Note that they will be re-inserted later on if necessary or helpful.
+        distribution_context = child;
     }
 
-    // Create a plan with the updated children:
-    Ok(DistributionContext {
-        plan,
-        distribution_onwards,
-    })
+    Ok(distribution_context)
 }
 
-/// Updates the physical plan `input` by using `dist_onward` replace order 
preserving operator variants
-/// with their corresponding operators that do not preserve order. It is a 
wrapper for `replace_order_preserving_variants_helper`
-fn replace_order_preserving_variants(
-    input: &mut Arc<dyn ExecutionPlan>,
-    dist_onward: &mut Option<ExecTree>,
-) -> Result<()> {
-    if let Some(dist_onward) = dist_onward {
-        *input = replace_order_preserving_variants_helper(dist_onward)?;
-    }
-    *dist_onward = None;
-    Ok(())
-}
-
-/// Updates the physical plan inside `ExecTree` if preserving ordering while 
changing partitioning
-/// is not helpful or desirable.
+/// Updates the [`DistributionContext`] if preserving ordering while changing 
partitioning is not helpful or desirable.
 ///
 /// Assume that following plan is given:
 /// ```text
@@ -1132,7 +1053,7 @@ fn replace_order_preserving_variants(
 /// "      ParquetExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, 
b, c, d, e], output_ordering=\[a@0 ASC]",
 /// ```
 ///
-/// This function converts plan above (inside `ExecTree`) to the following:
+/// This function converts plan above to the following:
 ///
 /// ```text
 /// "CoalescePartitionsExec"
@@ -1140,30 +1061,75 @@ fn replace_order_preserving_variants(
 /// "    RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=2",
 /// "      ParquetExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, 
b, c, d, e], output_ordering=\[a@0 ASC]",
 /// ```
-fn replace_order_preserving_variants_helper(
-    exec_tree: &ExecTree,
-) -> Result<Arc<dyn ExecutionPlan>> {
-    let mut updated_children = exec_tree.plan.children();
-    for child in &exec_tree.children {
-        updated_children[child.idx] = 
replace_order_preserving_variants_helper(child)?;
-    }
-    if is_sort_preserving_merge(&exec_tree.plan) {
-        return Ok(Arc::new(CoalescePartitionsExec::new(
-            updated_children.swap_remove(0),
-        )));
-    }
-    if let Some(repartition) = 
exec_tree.plan.as_any().downcast_ref::<RepartitionExec>() {
+fn replace_order_preserving_variants(
+    mut context: DistributionContext,
+) -> Result<DistributionContext> {
+    let mut updated_children = context
+        .children_nodes
+        .iter()
+        .map(|child| {
+            if child.distribution_connection {
+                replace_order_preserving_variants(child.clone())
+            } else {
+                Ok(child.clone())
+            }
+        })
+        .collect::<Result<Vec<_>>>()?;
+
+    if is_sort_preserving_merge(&context.plan) {
+        let child = updated_children.swap_remove(0);
+        context.plan = 
Arc::new(CoalescePartitionsExec::new(child.plan.clone()));
+        context.children_nodes = vec![child];
+        return Ok(context);
+    } else if let Some(repartition) =
+        context.plan.as_any().downcast_ref::<RepartitionExec>()
+    {
         if repartition.preserve_order() {
-            return Ok(Arc::new(
-                // new RepartitionExec don't preserve order
-                RepartitionExec::try_new(
-                    updated_children.swap_remove(0),
-                    repartition.partitioning().clone(),
-                )?,
-            ));
+            let child = updated_children.swap_remove(0);
+            context.plan = Arc::new(RepartitionExec::try_new(
+                child.plan.clone(),
+                repartition.partitioning().clone(),
+            )?);
+            context.children_nodes = vec![child];
+            return Ok(context);
+        }
+    }
+
+    context.plan = context
+        .plan
+        .clone()
+        .with_new_children(updated_children.into_iter().map(|c| 
c.plan).collect())?;
+    Ok(context)
+}
+
+/// This utility function adds a [`SortExec`] above an operator according to 
the
+/// given ordering requirements while preserving the original partitioning.
+fn add_sort_preserving_partitions(
+    node: DistributionContext,
+    sort_requirement: LexRequirementRef,
+    fetch: Option<usize>,
+) -> DistributionContext {
+    // If the ordering requirement is already satisfied, do not add a sort.
+    if !node
+        .plan
+        .equivalence_properties()
+        .ordering_satisfy_requirement(sort_requirement)
+    {
+        let sort_expr = 
PhysicalSortRequirement::to_sort_exprs(sort_requirement.to_vec());
+        let new_sort = SortExec::new(sort_expr, 
node.plan.clone()).with_fetch(fetch);
+
+        DistributionContext {
+            plan: Arc::new(if 
node.plan.output_partitioning().partition_count() > 1 {
+                new_sort.with_preserve_partitioning(true)
+            } else {
+                new_sort
+            }),
+            distribution_connection: false,
+            children_nodes: vec![node],
         }
+    } else {
+        node
     }
-    exec_tree.plan.clone().with_new_children(updated_children)
 }
 
 /// This function checks whether we need to add additional data exchange
@@ -1174,6 +1140,12 @@ fn ensure_distribution(
     dist_context: DistributionContext,
     config: &ConfigOptions,
 ) -> Result<Transformed<DistributionContext>> {
+    let dist_context = dist_context.update_children()?;
+
+    if dist_context.plan.children().is_empty() {
+        return Ok(Transformed::No(dist_context));
+    }
+
     let target_partitions = config.execution.target_partitions;
     // When `false`, round robin repartition will not be added to increase 
parallelism
     let enable_round_robin = config.optimizer.enable_round_robin_repartition;
@@ -1186,14 +1158,11 @@ fn ensure_distribution(
     let order_preserving_variants_desirable =
         is_unbounded || config.optimizer.prefer_existing_sort;
 
-    if dist_context.plan.children().is_empty() {
-        return Ok(Transformed::No(dist_context));
-    }
-
     // Remove unnecessary repartition from the physical plan if any
     let DistributionContext {
         mut plan,
-        mut distribution_onwards,
+        distribution_connection,
+        children_nodes,
     } = remove_dist_changing_operators(dist_context)?;
 
     if let Some(exec) = plan.as_any().downcast_ref::<WindowAggExec>() {
@@ -1213,33 +1182,23 @@ fn ensure_distribution(
             plan = updated_window;
         }
     };
-    let n_children = plan.children().len();
+
     // This loop iterates over all the children to:
     // - Increase parallelism for every child if it is beneficial.
     // - Satisfy the distribution requirements of every child, if it is not
     //   already satisfied.
     // We store the updated children in `new_children`.
-    let new_children = izip!(
-        plan.children().into_iter(),
+    let children_nodes = izip!(
+        children_nodes.into_iter(),
         plan.required_input_distribution().iter(),
         plan.required_input_ordering().iter(),
-        distribution_onwards.iter_mut(),
         plan.benefits_from_input_partitioning(),
-        plan.maintains_input_order(),
-        0..n_children
+        plan.maintains_input_order()
     )
     .map(
-        |(
-            mut child,
-            requirement,
-            required_input_ordering,
-            dist_onward,
-            would_benefit,
-            maintains,
-            child_idx,
-        )| {
+        |(mut child, requirement, required_input_ordering, would_benefit, 
maintains)| {
             // Don't need to apply when the returned row count is not greater 
than 1:
-            let num_rows = child.statistics()?.num_rows;
+            let num_rows = child.plan.statistics()?.num_rows;
             let repartition_beneficial_stats = if 
num_rows.is_exact().unwrap_or(false) {
                 num_rows
                     .get_value()
@@ -1248,45 +1207,39 @@ fn ensure_distribution(
             } else {
                 true
             };
+
             if enable_round_robin
                 // Operator benefits from partitioning (e.g. filter):
                 && (would_benefit && repartition_beneficial_stats)
                 // Unless partitioning doesn't increase the partition count, 
it is not beneficial:
-                && child.output_partitioning().partition_count() < 
target_partitions
+                && child.plan.output_partitioning().partition_count() < 
target_partitions
             {
                 // When `repartition_file_scans` is set, attempt to increase
                 // parallelism at the source.
                 if repartition_file_scans {
                     if let Some(new_child) =
-                        child.repartitioned(target_partitions, config)?
+                        child.plan.repartitioned(target_partitions, config)?
                     {
-                        child = new_child;
+                        child.plan = new_child;
                     }
                 }
                 // Increase parallelism by adding round-robin repartitioning
                 // on top of the operator. Note that we only do this if the
                 // partition count is not already equal to the desired 
partition
                 // count.
-                child = add_roundrobin_on_top(
-                    child,
-                    target_partitions,
-                    dist_onward,
-                    child_idx,
-                )?;
+                child = add_roundrobin_on_top(child, target_partitions)?;
             }
 
             // Satisfy the distribution requirement if it is unmet.
             match requirement {
                 Distribution::SinglePartition => {
-                    child = add_spm_on_top(child, dist_onward, child_idx);
+                    child = add_spm_on_top(child);
                 }
                 Distribution::HashPartitioned(exprs) => {
                     child = add_hash_on_top(
                         child,
                         exprs.to_vec(),
                         target_partitions,
-                        dist_onward,
-                        child_idx,
                         repartition_beneficial_stats,
                     )?;
                 }
@@ -1299,31 +1252,38 @@ fn ensure_distribution(
                 // - Ordering requirement cannot be satisfied by preserving 
ordering through repartitions, or
                 // - using order preserving variant is not desirable.
                 let ordering_satisfied = child
+                    .plan
                     .equivalence_properties()
                     .ordering_satisfy_requirement(required_input_ordering);
-                if !ordering_satisfied || !order_preserving_variants_desirable 
{
-                    replace_order_preserving_variants(&mut child, 
dist_onward)?;
+                if (!ordering_satisfied || 
!order_preserving_variants_desirable)
+                    && child.distribution_connection
+                {
+                    child = replace_order_preserving_variants(child)?;
                     // If ordering requirements were satisfied before 
repartitioning,
                     // make sure ordering requirements are still satisfied 
after.
                     if ordering_satisfied {
                         // Make sure to satisfy ordering requirement:
-                        add_sort_above(&mut child, required_input_ordering, 
None);
+                        child = add_sort_preserving_partitions(
+                            child,
+                            required_input_ordering,
+                            None,
+                        );
                     }
                 }
                 // Stop tracking distribution changing operators
-                *dist_onward = None;
+                child.distribution_connection = false;
             } else {
                 // no ordering requirement
                 match requirement {
                     // Operator requires specific distribution.
                     Distribution::SinglePartition | 
Distribution::HashPartitioned(_) => {
                         // Since there is no ordering requirement, preserving 
ordering is pointless
-                        replace_order_preserving_variants(&mut child, 
dist_onward)?;
+                        child = replace_order_preserving_variants(child)?;
                     }
                     Distribution::UnspecifiedDistribution => {
                         // Since ordering is lost, trying to preserve ordering 
is pointless
-                        if !maintains {
-                            replace_order_preserving_variants(&mut child, 
dist_onward)?;
+                        if !maintains || 
plan.as_any().is::<OutputRequirementExec>() {
+                            child = replace_order_preserving_variants(child)?;
                         }
                     }
                 }
@@ -1334,7 +1294,9 @@ fn ensure_distribution(
     .collect::<Result<Vec<_>>>()?;
 
     let new_distribution_context = DistributionContext {
-        plan: if plan.as_any().is::<UnionExec>() && 
can_interleave(&new_children) {
+        plan: if plan.as_any().is::<UnionExec>()
+            && can_interleave(children_nodes.iter().map(|c| c.plan.clone()))
+        {
             // Add a special case for [`UnionExec`] since we want to "bubble 
up"
             // hash-partitioned data. So instead of
             //
@@ -1358,120 +1320,91 @@ fn ensure_distribution(
             //     - Agg:
             //         Repartition (hash):
             //           Data
-            Arc::new(InterleaveExec::try_new(new_children)?)
+            Arc::new(InterleaveExec::try_new(
+                children_nodes.iter().map(|c| c.plan.clone()).collect(),
+            )?)
         } else {
-            plan.with_new_children(new_children)?
+            plan.with_new_children(
+                children_nodes.iter().map(|c| c.plan.clone()).collect(),
+            )?
         },
-        distribution_onwards,
+        distribution_connection,
+        children_nodes,
     };
+
     Ok(Transformed::Yes(new_distribution_context))
 }
 
-/// A struct to keep track of distribution changing executors
+/// A struct to keep track of distribution changing operators
 /// (`RepartitionExec`, `SortPreservingMergeExec`, `CoalescePartitionsExec`),
 /// and their associated parents inside `plan`. Using this information,
 /// we can optimize distribution of the plan if/when necessary.
 #[derive(Debug, Clone)]
 struct DistributionContext {
     plan: Arc<dyn ExecutionPlan>,
-    /// Keep track of associations for each child of the plan. If `None`,
-    /// there is no distribution changing operator in its descendants.
-    distribution_onwards: Vec<Option<ExecTree>>,
+    /// Indicates whether this plan is connected to a distribution-changing
+    /// operator.
+    distribution_connection: bool,
+    children_nodes: Vec<Self>,
 }
 
 impl DistributionContext {
-    /// Creates an empty context.
+    /// Creates a tree according to the plan with empty states.
     fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
-        let length = plan.children().len();
-        DistributionContext {
+        let children = plan.children();
+        Self {
             plan,
-            distribution_onwards: vec![None; length],
+            distribution_connection: false,
+            children_nodes: children.into_iter().map(Self::new).collect(),
         }
     }
 
-    /// Constructs a new context from children contexts.
-    fn new_from_children_nodes(
-        children_nodes: Vec<DistributionContext>,
-        parent_plan: Arc<dyn ExecutionPlan>,
-    ) -> Result<Self> {
-        let children_plans = children_nodes
-            .iter()
-            .map(|item| item.plan.clone())
-            .collect();
-        let distribution_onwards = children_nodes
-            .into_iter()
-            .enumerate()
-            .map(|(idx, context)| {
-                let DistributionContext {
-                    plan,
-                    // The `distribution_onwards` tree keeps track of operators
-                    // that change distribution, or preserves the existing
-                    // distribution (starting from an operator that change 
distribution).
-                    distribution_onwards,
-                } = context;
-                if plan.children().is_empty() {
-                    // Plan has no children, there is nothing to propagate.
-                    None
-                } else if distribution_onwards[0].is_none() {
-                    if let Some(repartition) =
-                        plan.as_any().downcast_ref::<RepartitionExec>()
-                    {
-                        match repartition.partitioning() {
-                            Partitioning::RoundRobinBatch(_)
-                            | Partitioning::Hash(_, _) => {
-                                // Start tracking operators starting from this 
repartition (either roundrobin or hash):
-                                return Some(ExecTree::new(plan, idx, vec![]));
-                            }
-                            _ => {}
-                        }
-                    } else if plan.as_any().is::<SortPreservingMergeExec>()
-                        || plan.as_any().is::<CoalescePartitionsExec>()
-                    {
-                        // Start tracking operators starting from this sort 
preserving merge:
-                        return Some(ExecTree::new(plan, idx, vec![]));
-                    }
-                    None
-                } else {
-                    // Propagate children distribution tracking to the above
-                    let new_distribution_onwards = izip!(
-                        plan.required_input_distribution().iter(),
-                        distribution_onwards.into_iter()
-                    )
-                    .flat_map(|(required_dist, distribution_onwards)| {
-                        if let Some(distribution_onwards) = 
distribution_onwards {
-                            // Operator can safely propagate the distribution 
above.
-                            // This is similar to maintaining order in the 
EnforceSorting rule.
-                            if let Distribution::UnspecifiedDistribution = 
required_dist {
-                                return Some(distribution_onwards);
-                            }
-                        }
-                        None
-                    })
-                    .collect::<Vec<_>>();
-                    // Either:
-                    // - None of the children has a connection to an operator 
that modifies distribution, or
-                    // - The current operator requires distribution at its 
input so doesn't propagate it above.
-                    if new_distribution_onwards.is_empty() {
-                        None
-                    } else {
-                        Some(ExecTree::new(plan, idx, 
new_distribution_onwards))
-                    }
+    fn update_children(mut self) -> Result<Self> {
+        for child_context in self.children_nodes.iter_mut() {
+            child_context.distribution_connection = match 
child_context.plan.as_any() {
+                plan_any if plan_any.is::<RepartitionExec>() => matches!(
+                    plan_any
+                        .downcast_ref::<RepartitionExec>()
+                        .unwrap()
+                        .partitioning(),
+                    Partitioning::RoundRobinBatch(_) | Partitioning::Hash(_, _)
+                ),
+                plan_any
+                    if plan_any.is::<SortPreservingMergeExec>()
+                        || plan_any.is::<CoalescePartitionsExec>() =>
+                {
+                    true
                 }
-            })
-            .collect();
-        Ok(DistributionContext {
-            plan: with_new_children_if_necessary(parent_plan, 
children_plans)?.into(),
-            distribution_onwards,
-        })
-    }
+                _ => {
+                    child_context.plan.children().is_empty()
+                        || 
child_context.children_nodes[0].distribution_connection
+                        || child_context
+                            .plan
+                            .required_input_distribution()
+                            .iter()
+                            .zip(child_context.children_nodes.iter())
+                            .any(|(required_dist, child_context)| {
+                                child_context.distribution_connection
+                                    && matches!(
+                                        required_dist,
+                                        Distribution::UnspecifiedDistribution
+                                    )
+                            })
+                }
+            };
+        }
 
-    /// Computes distribution tracking contexts for every child of the plan.
-    fn children(&self) -> Vec<DistributionContext> {
-        self.plan
-            .children()
-            .into_iter()
-            .map(DistributionContext::new)
-            .collect()
+        let children_plans = self
+            .children_nodes
+            .iter()
+            .map(|context| context.plan.clone())
+            .collect::<Vec<_>>();
+
+        Ok(Self {
+            plan: with_new_children_if_necessary(self.plan, 
children_plans)?.into(),
+            distribution_connection: false,
+            children_nodes: self.children_nodes,
+        })
     }
 }
 
@@ -1480,8 +1413,8 @@ impl TreeNode for DistributionContext {
     where
         F: FnMut(&Self) -> Result<VisitRecursion>,
     {
-        for child in self.children() {
-            match op(&child)? {
+        for child in &self.children_nodes {
+            match op(child)? {
                 VisitRecursion::Continue => {}
                 VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
                 VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
@@ -1490,20 +1423,23 @@ impl TreeNode for DistributionContext {
         Ok(VisitRecursion::Continue)
     }
 
-    fn map_children<F>(self, transform: F) -> Result<Self>
+    fn map_children<F>(mut self, transform: F) -> Result<Self>
     where
         F: FnMut(Self) -> Result<Self>,
     {
-        let children = self.children();
-        if children.is_empty() {
-            Ok(self)
-        } else {
-            let children_nodes = children
+        if !self.children_nodes.is_empty() {
+            self.children_nodes = self
+                .children_nodes
                 .into_iter()
                 .map(transform)
-                .collect::<Result<Vec<_>>>()?;
-            DistributionContext::new_from_children_nodes(children_nodes, 
self.plan)
+                .collect::<Result<_>>()?;
+            self.plan = with_new_children_if_necessary(
+                self.plan,
+                self.children_nodes.iter().map(|c| c.plan.clone()).collect(),
+            )?
+            .into();
         }
+        Ok(self)
     }
 }
 
@@ -1512,11 +1448,11 @@ impl fmt::Display for DistributionContext {
     fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
         let plan_string = get_plan_string(&self.plan);
         write!(f, "plan: {:?}", plan_string)?;
-        for (idx, child) in self.distribution_onwards.iter().enumerate() {
-            if let Some(child) = child {
-                write!(f, "idx:{:?}, exec_tree:{}", idx, child)?;
-            }
-        }
+        write!(
+            f,
+            "distribution_connection:{}",
+            self.distribution_connection,
+        )?;
         write!(f, "")
     }
 }
@@ -1532,37 +1468,18 @@ struct PlanWithKeyRequirements {
     plan: Arc<dyn ExecutionPlan>,
     /// Parent required key ordering
     required_key_ordering: Vec<Arc<dyn PhysicalExpr>>,
-    /// The request key ordering to children
-    request_key_ordering: Vec<Option<Vec<Arc<dyn PhysicalExpr>>>>,
+    children: Vec<Self>,
 }
 
 impl PlanWithKeyRequirements {
     fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
-        let children_len = plan.children().len();
-        PlanWithKeyRequirements {
+        let children = plan.children();
+        Self {
             plan,
             required_key_ordering: vec![],
-            request_key_ordering: vec![None; children_len],
+            children: children.into_iter().map(Self::new).collect(),
         }
     }
-
-    fn children(&self) -> Vec<PlanWithKeyRequirements> {
-        let plan_children = self.plan.children();
-        assert_eq!(plan_children.len(), self.request_key_ordering.len());
-        plan_children
-            .into_iter()
-            .zip(self.request_key_ordering.clone())
-            .map(|(child, required)| {
-                let from_parent = required.unwrap_or_default();
-                let length = child.children().len();
-                PlanWithKeyRequirements {
-                    plan: child,
-                    required_key_ordering: from_parent,
-                    request_key_ordering: vec![None; length],
-                }
-            })
-            .collect()
-    }
 }
 
 impl TreeNode for PlanWithKeyRequirements {
@@ -1570,9 +1487,8 @@ impl TreeNode for PlanWithKeyRequirements {
     where
         F: FnMut(&Self) -> Result<VisitRecursion>,
     {
-        let children = self.children();
-        for child in children {
-            match op(&child)? {
+        for child in &self.children {
+            match op(child)? {
                 VisitRecursion::Continue => {}
                 VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
                 VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
@@ -1582,28 +1498,23 @@ impl TreeNode for PlanWithKeyRequirements {
         Ok(VisitRecursion::Continue)
     }
 
-    fn map_children<F>(self, transform: F) -> Result<Self>
+    fn map_children<F>(mut self, transform: F) -> Result<Self>
     where
         F: FnMut(Self) -> Result<Self>,
     {
-        let children = self.children();
-        if !children.is_empty() {
-            let new_children: Result<Vec<_>> =
-                children.into_iter().map(transform).collect();
-
-            let children_plans = new_children?
+        if !self.children.is_empty() {
+            self.children = self
+                .children
                 .into_iter()
-                .map(|child| child.plan)
-                .collect::<Vec<_>>();
-            let new_plan = with_new_children_if_necessary(self.plan, 
children_plans)?;
-            Ok(PlanWithKeyRequirements {
-                plan: new_plan.into(),
-                required_key_ordering: self.required_key_ordering,
-                request_key_ordering: self.request_key_ordering,
-            })
-        } else {
-            Ok(self)
+                .map(transform)
+                .collect::<Result<_>>()?;
+            self.plan = with_new_children_if_necessary(
+                self.plan,
+                self.children.iter().map(|c| c.plan.clone()).collect(),
+            )?
+            .into();
         }
+        Ok(self)
     }
 }
 
diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs 
b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
index 2ecc1e11b9..77d04a61c5 100644
--- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
@@ -44,7 +44,7 @@ use 
crate::physical_optimizer::replace_with_order_preserving_variants::{
 use crate::physical_optimizer::sort_pushdown::{pushdown_sorts, SortPushDown};
 use crate::physical_optimizer::utils::{
     add_sort_above, is_coalesce_partitions, is_limit, is_repartition, is_sort,
-    is_sort_preserving_merge, is_union, is_window, ExecTree,
+    is_sort_preserving_merge, is_union, is_window,
 };
 use crate::physical_optimizer::PhysicalOptimizerRule;
 use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
@@ -81,78 +81,66 @@ impl EnforceSorting {
 #[derive(Debug, Clone)]
 struct PlanWithCorrespondingSort {
     plan: Arc<dyn ExecutionPlan>,
-    // For every child, keep a subtree of `ExecutionPlan`s starting from the
-    // child until the `SortExec`(s) -- could be multiple for n-ary plans like
-    // Union -- that determine the output ordering of the child. If the child
-    // has no connection to any sort, simply store None (and not a subtree).
-    sort_onwards: Vec<Option<ExecTree>>,
+    // Tracks whether this node is connected, through operators that maintain
+    // the ordering, to a `SortExec` below it. If there is no such connection
+    // to any sort, this flag is simply `false`.
+    sort_connection: bool,
+    children_nodes: Vec<Self>,
 }
 
 impl PlanWithCorrespondingSort {
     fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
-        let length = plan.children().len();
-        PlanWithCorrespondingSort {
+        let children = plan.children();
+        Self {
             plan,
-            sort_onwards: vec![None; length],
+            sort_connection: false,
+            children_nodes: children.into_iter().map(Self::new).collect(),
         }
     }
 
-    fn new_from_children_nodes(
-        children_nodes: Vec<PlanWithCorrespondingSort>,
+    fn update_children(
         parent_plan: Arc<dyn ExecutionPlan>,
+        mut children_nodes: Vec<Self>,
     ) -> Result<Self> {
-        let children_plans = children_nodes
-            .iter()
-            .map(|item| item.plan.clone())
-            .collect::<Vec<_>>();
-        let sort_onwards = children_nodes
-            .into_iter()
-            .enumerate()
-            .map(|(idx, item)| {
-                let plan = &item.plan;
-                // Leaves of `sort_onwards` are `SortExec` operators, which 
impose
-                // an ordering. This tree collects all the intermediate 
executors
-                // that maintain this ordering. If we just saw a order imposing
-                // operator, we reset the tree and start accumulating.
-                if is_sort(plan) {
-                    return Some(ExecTree::new(item.plan, idx, vec![]));
-                } else if is_limit(plan) {
-                    // There is no sort linkage for this path, it starts at a 
limit.
-                    return None;
-                }
+        for node in children_nodes.iter_mut() {
+            let plan = &node.plan;
+            // Leaves of `sort_onwards` are `SortExec` operators, which impose
+            // an ordering. This tree collects all the intermediate executors
+            // that maintain this ordering. If we just saw an order-imposing
+            // operator, we reset the tree and start accumulating.
+            node.sort_connection = if is_sort(plan) {
+                // Initiate connection
+                true
+            } else if is_limit(plan) {
+                // There is no sort linkage for this path, it starts at a 
limit.
+                false
+            } else {
                 let is_spm = is_sort_preserving_merge(plan);
                 let required_orderings = plan.required_input_ordering();
                 let flags = plan.maintains_input_order();
-                let children = izip!(flags, item.sort_onwards, 
required_orderings)
-                    .filter_map(|(maintains, element, required_ordering)| {
-                        if (required_ordering.is_none() && maintains) || 
is_spm {
-                            element
-                        } else {
-                            None
-                        }
-                    })
-                    .collect::<Vec<ExecTree>>();
-                if !children.is_empty() {
-                    // Add parent node to the tree if there is at least one
-                    // child with a subtree:
-                    Some(ExecTree::new(item.plan, idx, children))
-                } else {
-                    // There is no sort linkage for this child, do nothing.
-                    None
-                }
-            })
-            .collect();
+                // Add parent node to the tree if there is at least one
+                // child with a sort connection:
+                izip!(flags, required_orderings).any(|(maintains, 
required_ordering)| {
+                    let propagates_ordering =
+                        (maintains && required_ordering.is_none()) || is_spm;
+                    let connected_to_sort =
+                        node.children_nodes.iter().any(|item| 
item.sort_connection);
+                    propagates_ordering && connected_to_sort
+                })
+            }
+        }
 
+        let children_plans = children_nodes
+            .iter()
+            .map(|item| item.plan.clone())
+            .collect::<Vec<_>>();
         let plan = with_new_children_if_necessary(parent_plan, 
children_plans)?.into();
-        Ok(PlanWithCorrespondingSort { plan, sort_onwards })
-    }
 
-    fn children(&self) -> Vec<PlanWithCorrespondingSort> {
-        self.plan
-            .children()
-            .into_iter()
-            .map(PlanWithCorrespondingSort::new)
-            .collect()
+        Ok(Self {
+            plan,
+            sort_connection: false,
+            children_nodes,
+        })
     }
 }
 
@@ -161,9 +149,8 @@ impl TreeNode for PlanWithCorrespondingSort {
     where
         F: FnMut(&Self) -> Result<VisitRecursion>,
     {
-        let children = self.children();
-        for child in children {
-            match op(&child)? {
+        for child in &self.children_nodes {
+            match op(child)? {
                 VisitRecursion::Continue => {}
                 VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
                 VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
@@ -173,102 +160,79 @@ impl TreeNode for PlanWithCorrespondingSort {
         Ok(VisitRecursion::Continue)
     }
 
-    fn map_children<F>(self, transform: F) -> Result<Self>
+    fn map_children<F>(mut self, transform: F) -> Result<Self>
     where
         F: FnMut(Self) -> Result<Self>,
     {
-        let children = self.children();
-        if children.is_empty() {
-            Ok(self)
-        } else {
-            let children_nodes = children
+        if !self.children_nodes.is_empty() {
+            self.children_nodes = self
+                .children_nodes
                 .into_iter()
                 .map(transform)
-                .collect::<Result<Vec<_>>>()?;
-            PlanWithCorrespondingSort::new_from_children_nodes(children_nodes, 
self.plan)
+                .collect::<Result<_>>()?;
+            self.plan = with_new_children_if_necessary(
+                self.plan,
+                self.children_nodes.iter().map(|c| c.plan.clone()).collect(),
+            )?
+            .into();
         }
+        Ok(self)
     }
 }
 
-/// This object is used within the [EnforceSorting] rule to track the closest
+/// This object is used within the [`EnforceSorting`] rule to track the closest
 /// [`CoalescePartitionsExec`] descendant(s) for every child of a plan.
 #[derive(Debug, Clone)]
 struct PlanWithCorrespondingCoalescePartitions {
     plan: Arc<dyn ExecutionPlan>,
-    // For every child, keep a subtree of `ExecutionPlan`s starting from the
-    // child until the `CoalescePartitionsExec`(s) -- could be multiple for
-    // n-ary plans like Union -- that affect the output partitioning of the
-    // child. If the child has no connection to any `CoalescePartitionsExec`,
-    // simply store None (and not a subtree).
-    coalesce_onwards: Vec<Option<ExecTree>>,
+    // Stores whether the plan is a `CoalescePartitionsExec` or it is 
connected to
+    // a `CoalescePartitionsExec` via its children.
+    coalesce_connection: bool,
+    children_nodes: Vec<Self>,
 }
 
 impl PlanWithCorrespondingCoalescePartitions {
+    /// Creates an empty tree with empty connections.
     fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
-        let length = plan.children().len();
-        PlanWithCorrespondingCoalescePartitions {
+        let children = plan.children();
+        Self {
             plan,
-            coalesce_onwards: vec![None; length],
+            coalesce_connection: false,
+            children_nodes: children.into_iter().map(Self::new).collect(),
         }
     }
 
-    fn new_from_children_nodes(
-        children_nodes: Vec<PlanWithCorrespondingCoalescePartitions>,
-        parent_plan: Arc<dyn ExecutionPlan>,
-    ) -> Result<Self> {
-        let children_plans = children_nodes
+    fn update_children(mut self) -> Result<Self> {
+        self.coalesce_connection = if self.plan.children().is_empty() {
+            // Plan has no children, it cannot be a `CoalescePartitionsExec`.
+            false
+        } else if is_coalesce_partitions(&self.plan) {
+            // Initiate a connection
+            true
+        } else {
+            self.children_nodes
+                .iter()
+                .enumerate()
+                .map(|(idx, node)| {
+                    // Only consider operators that don't require a
+                    // single partition, and connected to any coalesce
+                    node.coalesce_connection
+                        && !matches!(
+                            self.plan.required_input_distribution()[idx],
+                            Distribution::SinglePartition
+                        )
+                    // If no child has a connection, there is nothing to
+                    // track; the connection flag stays false.
+                })
+                .any(|c| c)
+        };
+
+        let children_plans = self
+            .children_nodes
             .iter()
             .map(|item| item.plan.clone())
             .collect();
-        let coalesce_onwards = children_nodes
-            .into_iter()
-            .enumerate()
-            .map(|(idx, item)| {
-                // Leaves of the `coalesce_onwards` tree are 
`CoalescePartitionsExec`
-                // operators. This tree collects all the intermediate 
executors that
-                // maintain a single partition. If we just saw a 
`CoalescePartitionsExec`
-                // operator, we reset the tree and start accumulating.
-                let plan = item.plan;
-                if plan.children().is_empty() {
-                    // Plan has no children, there is nothing to propagate.
-                    None
-                } else if is_coalesce_partitions(&plan) {
-                    Some(ExecTree::new(plan, idx, vec![]))
-                } else {
-                    let children = item
-                        .coalesce_onwards
-                        .into_iter()
-                        .flatten()
-                        .filter(|item| {
-                            // Only consider operators that don't require a
-                            // single partition.
-                            !matches!(
-                                plan.required_input_distribution()[item.idx],
-                                Distribution::SinglePartition
-                            )
-                        })
-                        .collect::<Vec<_>>();
-                    if children.is_empty() {
-                        None
-                    } else {
-                        Some(ExecTree::new(plan, idx, children))
-                    }
-                }
-            })
-            .collect();
-        let plan = with_new_children_if_necessary(parent_plan, 
children_plans)?.into();
-        Ok(PlanWithCorrespondingCoalescePartitions {
-            plan,
-            coalesce_onwards,
-        })
-    }
-
-    fn children(&self) -> Vec<PlanWithCorrespondingCoalescePartitions> {
-        self.plan
-            .children()
-            .into_iter()
-            .map(PlanWithCorrespondingCoalescePartitions::new)
-            .collect()
+        self.plan = with_new_children_if_necessary(self.plan, 
children_plans)?.into();
+        Ok(self)
     }
 }
 
@@ -277,9 +241,8 @@ impl TreeNode for PlanWithCorrespondingCoalescePartitions {
     where
         F: FnMut(&Self) -> Result<VisitRecursion>,
     {
-        let children = self.children();
-        for child in children {
-            match op(&child)? {
+        for child in &self.children_nodes {
+            match op(child)? {
                 VisitRecursion::Continue => {}
                 VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
                 VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
@@ -289,23 +252,23 @@ impl TreeNode for PlanWithCorrespondingCoalescePartitions 
{
         Ok(VisitRecursion::Continue)
     }
 
-    fn map_children<F>(self, transform: F) -> Result<Self>
+    fn map_children<F>(mut self, transform: F) -> Result<Self>
     where
         F: FnMut(Self) -> Result<Self>,
     {
-        let children = self.children();
-        if children.is_empty() {
-            Ok(self)
-        } else {
-            let children_nodes = children
+        if !self.children_nodes.is_empty() {
+            self.children_nodes = self
+                .children_nodes
                 .into_iter()
                 .map(transform)
-                .collect::<Result<Vec<_>>>()?;
-            PlanWithCorrespondingCoalescePartitions::new_from_children_nodes(
-                children_nodes,
+                .collect::<Result<_>>()?;
+            self.plan = with_new_children_if_necessary(
                 self.plan,
-            )
+                self.children_nodes.iter().map(|c| c.plan.clone()).collect(),
+            )?
+            .into();
         }
+        Ok(self)
     }
 }
 
@@ -332,6 +295,7 @@ impl PhysicalOptimizerRule for EnforceSorting {
         } else {
             adjusted.plan
         };
+
         let plan_with_pipeline_fixer = OrderPreservationContext::new(new_plan);
         let updated_plan =
             plan_with_pipeline_fixer.transform_up(&|plan_with_pipeline_fixer| {
@@ -345,7 +309,8 @@ impl PhysicalOptimizerRule for EnforceSorting {
 
         // Execute a top-down traversal to exploit sort push-down opportunities
         // missed by the bottom-up traversal:
-        let sort_pushdown = SortPushDown::init(updated_plan.plan);
+        let mut sort_pushdown = SortPushDown::new(updated_plan.plan);
+        sort_pushdown.assign_initial_requirements();
         let adjusted = sort_pushdown.transform_down(&pushdown_sorts)?;
         Ok(adjusted.plan)
     }
@@ -376,16 +341,21 @@ impl PhysicalOptimizerRule for EnforceSorting {
 fn parallelize_sorts(
     requirements: PlanWithCorrespondingCoalescePartitions,
 ) -> Result<Transformed<PlanWithCorrespondingCoalescePartitions>> {
-    let plan = requirements.plan;
-    let mut coalesce_onwards = requirements.coalesce_onwards;
-    if plan.children().is_empty() || coalesce_onwards[0].is_none() {
+    let PlanWithCorrespondingCoalescePartitions {
+        mut plan,
+        coalesce_connection,
+        mut children_nodes,
+    } = requirements.update_children()?;
+
+    if plan.children().is_empty() || !children_nodes[0].coalesce_connection {
         // We only take an action when the plan is either a SortExec, a
         // SortPreservingMergeExec or a CoalescePartitionsExec, and they
         // all have a single child. Therefore, if the first child is `None`,
         // we can return immediately.
         return Ok(Transformed::No(PlanWithCorrespondingCoalescePartitions {
             plan,
-            coalesce_onwards,
+            coalesce_connection,
+            children_nodes,
         }));
     } else if (is_sort(&plan) || is_sort_preserving_merge(&plan))
         && plan.output_partitioning().partition_count() <= 1
@@ -395,34 +365,30 @@ fn parallelize_sorts(
         // executors don't require single partition), then we can replace
         // the CoalescePartitionsExec + Sort cascade with a SortExec +
         // SortPreservingMergeExec cascade to parallelize sorting.
-        let mut prev_layer = plan.clone();
-        update_child_to_remove_coalesce(&mut prev_layer, &mut 
coalesce_onwards[0])?;
         let (sort_exprs, fetch) = get_sort_exprs(&plan)?;
-        add_sort_above(
-            &mut prev_layer,
-            &PhysicalSortRequirement::from_sort_exprs(sort_exprs),
-            fetch,
-        );
-        let spm = SortPreservingMergeExec::new(sort_exprs.to_vec(), prev_layer)
-            .with_fetch(fetch);
-        return Ok(Transformed::Yes(PlanWithCorrespondingCoalescePartitions {
-            plan: Arc::new(spm),
-            coalesce_onwards: vec![None],
-        }));
+        let sort_reqs = PhysicalSortRequirement::from_sort_exprs(sort_exprs);
+        let sort_exprs = sort_exprs.to_vec();
+        update_child_to_remove_coalesce(&mut plan, &mut children_nodes[0])?;
+        add_sort_above(&mut plan, &sort_reqs, fetch);
+        let spm = SortPreservingMergeExec::new(sort_exprs, 
plan).with_fetch(fetch);
+
+        return Ok(Transformed::Yes(
+            PlanWithCorrespondingCoalescePartitions::new(Arc::new(spm)),
+        ));
     } else if is_coalesce_partitions(&plan) {
         // There is an unnecessary `CoalescePartitionsExec` in the plan.
-        let mut prev_layer = plan.clone();
-        update_child_to_remove_coalesce(&mut prev_layer, &mut 
coalesce_onwards[0])?;
-        let new_plan = plan.with_new_children(vec![prev_layer])?;
-        return Ok(Transformed::Yes(PlanWithCorrespondingCoalescePartitions {
-            plan: new_plan,
-            coalesce_onwards: vec![None],
-        }));
+        update_child_to_remove_coalesce(&mut plan, &mut children_nodes[0])?;
+
+        let new_plan = Arc::new(CoalescePartitionsExec::new(plan)) as _;
+        return Ok(Transformed::Yes(
+            PlanWithCorrespondingCoalescePartitions::new(new_plan),
+        ));
     }
 
     Ok(Transformed::Yes(PlanWithCorrespondingCoalescePartitions {
         plan,
-        coalesce_onwards,
+        coalesce_connection,
+        children_nodes,
     }))
 }
 
@@ -431,91 +397,102 @@ fn parallelize_sorts(
 fn ensure_sorting(
     requirements: PlanWithCorrespondingSort,
 ) -> Result<Transformed<PlanWithCorrespondingSort>> {
+    let requirements = PlanWithCorrespondingSort::update_children(
+        requirements.plan,
+        requirements.children_nodes,
+    )?;
+
     // Perform naive analysis at the beginning -- remove already-satisfied 
sorts:
     if requirements.plan.children().is_empty() {
         return Ok(Transformed::No(requirements));
     }
-    let plan = requirements.plan;
-    let mut children = plan.children();
-    let mut sort_onwards = requirements.sort_onwards;
-    if let Some(result) = analyze_immediate_sort_removal(&plan, &sort_onwards) 
{
+    if let Some(result) = analyze_immediate_sort_removal(&requirements) {
         return Ok(Transformed::Yes(result));
     }
-    for (idx, (child, sort_onwards, required_ordering)) in izip!(
-        children.iter_mut(),
-        sort_onwards.iter_mut(),
-        plan.required_input_ordering()
-    )
-    .enumerate()
+
+    let plan = requirements.plan;
+    let mut children_nodes = requirements.children_nodes;
+
+    for (idx, (child_node, required_ordering)) in
+        izip!(children_nodes.iter_mut(), 
plan.required_input_ordering()).enumerate()
     {
-        let physical_ordering = child.output_ordering();
+        let mut child_plan = child_node.plan.clone();
+        let physical_ordering = child_plan.output_ordering();
         match (required_ordering, physical_ordering) {
             (Some(required_ordering), Some(_)) => {
-                if !child
+                if !child_plan
                     .equivalence_properties()
                     .ordering_satisfy_requirement(&required_ordering)
                 {
                     // Make sure we preserve the ordering requirements:
-                    update_child_to_remove_unnecessary_sort(child, 
sort_onwards, &plan)?;
-                    add_sort_above(child, &required_ordering, None);
-                    if is_sort(child) {
-                        *sort_onwards = Some(ExecTree::new(child.clone(), idx, 
vec![]));
-                    } else {
-                        *sort_onwards = None;
+                    update_child_to_remove_unnecessary_sort(idx, child_node, 
&plan)?;
+                    add_sort_above(&mut child_plan, &required_ordering, None);
+                    if is_sort(&child_plan) {
+                        *child_node = 
PlanWithCorrespondingSort::update_children(
+                            child_plan,
+                            vec![child_node.clone()],
+                        )?;
+                        child_node.sort_connection = true;
                     }
                 }
             }
             (Some(required), None) => {
                 // Ordering requirement is not met, we should add a `SortExec` 
to the plan.
-                add_sort_above(child, &required, None);
-                *sort_onwards = Some(ExecTree::new(child.clone(), idx, 
vec![]));
+                add_sort_above(&mut child_plan, &required, None);
+                *child_node = PlanWithCorrespondingSort::update_children(
+                    child_plan,
+                    vec![child_node.clone()],
+                )?;
+                child_node.sort_connection = true;
             }
             (None, Some(_)) => {
                 // We have a `SortExec` whose effect may be neutralized by
                 // another order-imposing operator. Remove this sort.
                 if !plan.maintains_input_order()[idx] || is_union(&plan) {
-                    update_child_to_remove_unnecessary_sort(child, 
sort_onwards, &plan)?;
+                    update_child_to_remove_unnecessary_sort(idx, child_node, 
&plan)?;
                 }
             }
             (None, None) => {
-                update_child_to_remove_unnecessary_sort(child, sort_onwards, 
&plan)?;
+                update_child_to_remove_unnecessary_sort(idx, child_node, 
&plan)?;
             }
         }
     }
     // For window expressions, we can remove some sorts when we can
     // calculate the result in reverse:
-    if is_window(&plan) {
-        if let Some(tree) = &mut sort_onwards[0] {
-            if let Some(result) = analyze_window_sort_removal(tree, &plan)? {
-                return Ok(Transformed::Yes(result));
-            }
+    if is_window(&plan) && children_nodes[0].sort_connection {
+        if let Some(result) = analyze_window_sort_removal(&mut 
children_nodes[0], &plan)?
+        {
+            return Ok(Transformed::Yes(result));
         }
     } else if is_sort_preserving_merge(&plan)
-        && children[0].output_partitioning().partition_count() <= 1
+        && children_nodes[0]
+            .plan
+            .output_partitioning()
+            .partition_count()
+            <= 1
     {
         // This SortPreservingMergeExec is unnecessary, input already has a
         // single partition.
-        sort_onwards.truncate(1);
-        return Ok(Transformed::Yes(PlanWithCorrespondingSort {
-            plan: children.swap_remove(0),
-            sort_onwards,
-        }));
+        let child_node = children_nodes.swap_remove(0);
+        return Ok(Transformed::Yes(child_node));
     }
-    Ok(Transformed::Yes(PlanWithCorrespondingSort {
-        plan: plan.with_new_children(children)?,
-        sort_onwards,
-    }))
+    Ok(Transformed::Yes(
+        PlanWithCorrespondingSort::update_children(plan, children_nodes)?,
+    ))
 }
 
 /// Analyzes a given [`SortExec`] (`plan`) to determine whether its input
 /// already has a finer ordering than it enforces.
 fn analyze_immediate_sort_removal(
-    plan: &Arc<dyn ExecutionPlan>,
-    sort_onwards: &[Option<ExecTree>],
+    node: &PlanWithCorrespondingSort,
 ) -> Option<PlanWithCorrespondingSort> {
+    let PlanWithCorrespondingSort {
+        plan,
+        children_nodes,
+        ..
+    } = node;
     if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
         let sort_input = sort_exec.input().clone();
-
         // If this sort is unnecessary, we should remove it:
         if sort_input
             .equivalence_properties()
@@ -533,20 +510,33 @@ fn analyze_immediate_sort_removal(
                             sort_exec.expr().to_vec(),
                             sort_input,
                         ));
-                    let new_tree = ExecTree::new(
-                        new_plan.clone(),
-                        0,
-                        sort_onwards.iter().flat_map(|e| e.clone()).collect(),
-                    );
                     PlanWithCorrespondingSort {
                         plan: new_plan,
-                        sort_onwards: vec![Some(new_tree)],
+                        // SortPreservingMergeExec has single child.
+                        sort_connection: false,
+                        children_nodes: children_nodes
+                            .iter()
+                            .cloned()
+                            .map(|mut node| {
+                                node.sort_connection = false;
+                                node
+                            })
+                            .collect(),
                     }
                 } else {
                     // Remove the sort:
                     PlanWithCorrespondingSort {
                         plan: sort_input,
-                        sort_onwards: sort_onwards.to_vec(),
+                        sort_connection: false,
+                        children_nodes: children_nodes[0]
+                            .children_nodes
+                            .iter()
+                            .cloned()
+                            .map(|mut node| {
+                                node.sort_connection = false;
+                                node
+                            })
+                            .collect(),
                     }
                 },
             );
@@ -558,15 +548,15 @@ fn analyze_immediate_sort_removal(
 /// Analyzes a [`WindowAggExec`] or a [`BoundedWindowAggExec`] to determine
 /// whether it may allow removing a sort.
 fn analyze_window_sort_removal(
-    sort_tree: &mut ExecTree,
+    sort_tree: &mut PlanWithCorrespondingSort,
     window_exec: &Arc<dyn ExecutionPlan>,
 ) -> Result<Option<PlanWithCorrespondingSort>> {
     let requires_single_partition = matches!(
-        window_exec.required_input_distribution()[sort_tree.idx],
+        window_exec.required_input_distribution()[0],
         Distribution::SinglePartition
     );
-    let mut window_child =
-        remove_corresponding_sort_from_sub_plan(sort_tree, 
requires_single_partition)?;
+    remove_corresponding_sort_from_sub_plan(sort_tree, 
requires_single_partition)?;
+    let mut window_child = sort_tree.plan.clone();
     let (window_expr, new_window) =
         if let Some(exec) = 
window_exec.as_any().downcast_ref::<BoundedWindowAggExec>() {
             (
@@ -628,9 +618,9 @@ fn analyze_window_sort_removal(
 /// Updates child to remove the unnecessary [`CoalescePartitionsExec`] below 
it.
 fn update_child_to_remove_coalesce(
     child: &mut Arc<dyn ExecutionPlan>,
-    coalesce_onwards: &mut Option<ExecTree>,
+    coalesce_onwards: &mut PlanWithCorrespondingCoalescePartitions,
 ) -> Result<()> {
-    if let Some(coalesce_onwards) = coalesce_onwards {
+    if coalesce_onwards.coalesce_connection {
         *child = remove_corresponding_coalesce_in_sub_plan(coalesce_onwards, 
child)?;
     }
     Ok(())
@@ -638,10 +628,10 @@ fn update_child_to_remove_coalesce(
 
 /// Removes the [`CoalescePartitionsExec`] from the plan in `coalesce_onwards`.
 fn remove_corresponding_coalesce_in_sub_plan(
-    coalesce_onwards: &mut ExecTree,
+    coalesce_onwards: &mut PlanWithCorrespondingCoalescePartitions,
     parent: &Arc<dyn ExecutionPlan>,
 ) -> Result<Arc<dyn ExecutionPlan>> {
-    Ok(if is_coalesce_partitions(&coalesce_onwards.plan) {
+    if is_coalesce_partitions(&coalesce_onwards.plan) {
         // We can safely use the 0th index since we have a 
`CoalescePartitionsExec`.
         let mut new_plan = coalesce_onwards.plan.children()[0].clone();
         while new_plan.output_partitioning() == parent.output_partitioning()
@@ -650,89 +640,113 @@ fn remove_corresponding_coalesce_in_sub_plan(
         {
             new_plan = new_plan.children().swap_remove(0)
         }
-        new_plan
+        Ok(new_plan)
     } else {
         let plan = coalesce_onwards.plan.clone();
         let mut children = plan.children();
-        for item in &mut coalesce_onwards.children {
-            children[item.idx] = 
remove_corresponding_coalesce_in_sub_plan(item, &plan)?;
+        for (idx, node) in 
coalesce_onwards.children_nodes.iter_mut().enumerate() {
+            if node.coalesce_connection {
+                children[idx] = 
remove_corresponding_coalesce_in_sub_plan(node, &plan)?;
+            }
         }
-        plan.with_new_children(children)?
-    })
+        plan.with_new_children(children)
+    }
 }
 
 /// Updates child to remove the unnecessary sort below it.
 fn update_child_to_remove_unnecessary_sort(
-    child: &mut Arc<dyn ExecutionPlan>,
-    sort_onwards: &mut Option<ExecTree>,
+    child_idx: usize,
+    sort_onwards: &mut PlanWithCorrespondingSort,
     parent: &Arc<dyn ExecutionPlan>,
 ) -> Result<()> {
-    if let Some(sort_onwards) = sort_onwards {
+    if sort_onwards.sort_connection {
         let requires_single_partition = matches!(
-            parent.required_input_distribution()[sort_onwards.idx],
+            parent.required_input_distribution()[child_idx],
             Distribution::SinglePartition
         );
-        *child = remove_corresponding_sort_from_sub_plan(
-            sort_onwards,
-            requires_single_partition,
-        )?;
+        remove_corresponding_sort_from_sub_plan(sort_onwards, 
requires_single_partition)?;
     }
-    *sort_onwards = None;
+    sort_onwards.sort_connection = false;
     Ok(())
 }
 
 /// Removes the sort from the plan in `sort_onwards`.
 fn remove_corresponding_sort_from_sub_plan(
-    sort_onwards: &mut ExecTree,
+    sort_onwards: &mut PlanWithCorrespondingSort,
     requires_single_partition: bool,
-) -> Result<Arc<dyn ExecutionPlan>> {
+) -> Result<()> {
     // A `SortExec` is always at the bottom of the tree.
-    let mut updated_plan = if is_sort(&sort_onwards.plan) {
-        sort_onwards.plan.children().swap_remove(0)
+    if is_sort(&sort_onwards.plan) {
+        *sort_onwards = sort_onwards.children_nodes.swap_remove(0);
     } else {
-        let plan = &sort_onwards.plan;
-        let mut children = plan.children();
-        for item in &mut sort_onwards.children {
-            let requires_single_partition = matches!(
-                plan.required_input_distribution()[item.idx],
-                Distribution::SinglePartition
-            );
-            children[item.idx] =
-                remove_corresponding_sort_from_sub_plan(item, 
requires_single_partition)?;
+        let PlanWithCorrespondingSort {
+            plan,
+            sort_connection: _,
+            children_nodes,
+        } = sort_onwards;
+        let mut any_connection = false;
+        for (child_idx, child_node) in children_nodes.iter_mut().enumerate() {
+            if child_node.sort_connection {
+                any_connection = true;
+                let requires_single_partition = matches!(
+                    plan.required_input_distribution()[child_idx],
+                    Distribution::SinglePartition
+                );
+                remove_corresponding_sort_from_sub_plan(
+                    child_node,
+                    requires_single_partition,
+                )?;
+            }
         }
+        if any_connection || children_nodes.is_empty() {
+            *sort_onwards = PlanWithCorrespondingSort::update_children(
+                plan.clone(),
+                children_nodes.clone(),
+            )?;
+        }
+        let PlanWithCorrespondingSort {
+            plan,
+            children_nodes,
+            ..
+        } = sort_onwards;
         // Replace with variants that do not preserve order.
         if is_sort_preserving_merge(plan) {
-            children.swap_remove(0)
+            children_nodes.swap_remove(0);
+            *plan = plan.children().swap_remove(0);
         } else if let Some(repartition) = 
plan.as_any().downcast_ref::<RepartitionExec>()
         {
-            Arc::new(
-                // By default, RepartitionExec does not preserve order
-                RepartitionExec::try_new(
-                    children.swap_remove(0),
-                    repartition.partitioning().clone(),
-                )?,
-            )
-        } else {
-            plan.clone().with_new_children(children)?
+            *plan = Arc::new(RepartitionExec::try_new(
+                children_nodes[0].plan.clone(),
+                repartition.output_partitioning(),
+            )?) as _;
         }
     };
     // Deleting a merging sort may invalidate distribution requirements.
     // Ensure that we stay compliant with such requirements:
     if requires_single_partition
-        && updated_plan.output_partitioning().partition_count() > 1
+        && sort_onwards.plan.output_partitioning().partition_count() > 1
     {
         // If there is existing ordering, to preserve ordering use 
SortPreservingMergeExec
         // instead of CoalescePartitionsExec.
-        if let Some(ordering) = updated_plan.output_ordering() {
-            updated_plan = Arc::new(SortPreservingMergeExec::new(
+        if let Some(ordering) = sort_onwards.plan.output_ordering() {
+            let plan = Arc::new(SortPreservingMergeExec::new(
                 ordering.to_vec(),
-                updated_plan,
-            ));
+                sort_onwards.plan.clone(),
+            )) as _;
+            *sort_onwards = PlanWithCorrespondingSort::update_children(
+                plan,
+                vec![sort_onwards.clone()],
+            )?;
         } else {
-            updated_plan = Arc::new(CoalescePartitionsExec::new(updated_plan));
+            let plan =
+                
Arc::new(CoalescePartitionsExec::new(sort_onwards.plan.clone())) as _;
+            *sort_onwards = PlanWithCorrespondingSort::update_children(
+                plan,
+                vec![sort_onwards.clone()],
+            )?;
         }
     }
-    Ok(updated_plan)
+    Ok(())
 }
 
 /// Converts an [ExecutionPlan] trait object to a [PhysicalSortExpr] slice 
when possible.
diff --git a/datafusion/core/src/physical_optimizer/output_requirements.rs 
b/datafusion/core/src/physical_optimizer/output_requirements.rs
index f8bf3bb965..4d03840d3d 100644
--- a/datafusion/core/src/physical_optimizer/output_requirements.rs
+++ b/datafusion/core/src/physical_optimizer/output_requirements.rs
@@ -147,6 +147,10 @@ impl ExecutionPlan for OutputRequirementExec {
         self.input.output_ordering()
     }
 
+    fn maintains_input_order(&self) -> Vec<bool> {
+        vec![true]
+    }
+
     fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
         vec![self.input.clone()]
     }
diff --git a/datafusion/core/src/physical_optimizer/pipeline_checker.rs 
b/datafusion/core/src/physical_optimizer/pipeline_checker.rs
index d59248aadf..9e9f647d07 100644
--- a/datafusion/core/src/physical_optimizer/pipeline_checker.rs
+++ b/datafusion/core/src/physical_optimizer/pipeline_checker.rs
@@ -24,13 +24,13 @@ use std::sync::Arc;
 use crate::config::ConfigOptions;
 use crate::error::Result;
 use crate::physical_optimizer::PhysicalOptimizerRule;
-use crate::physical_plan::joins::SymmetricHashJoinExec;
 use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
 
 use datafusion_common::config::OptimizerOptions;
 use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
 use datafusion_common::{plan_err, DataFusionError};
 use datafusion_physical_expr::intervals::utils::{check_support, 
is_datatype_supported};
+use datafusion_physical_plan::joins::SymmetricHashJoinExec;
 
 /// The PipelineChecker rule rejects non-runnable query plans that use
 /// pipeline-breaking operators on infinite input(s).
@@ -70,14 +70,14 @@ impl PhysicalOptimizerRule for PipelineChecker {
 pub struct PipelineStatePropagator {
     pub(crate) plan: Arc<dyn ExecutionPlan>,
     pub(crate) unbounded: bool,
-    pub(crate) children: Vec<PipelineStatePropagator>,
+    pub(crate) children: Vec<Self>,
 }
 
 impl PipelineStatePropagator {
     /// Constructs a new, default pipelining state.
     pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
         let children = plan.children();
-        PipelineStatePropagator {
+        Self {
             plan,
             unbounded: false,
             children: children.into_iter().map(Self::new).collect(),
@@ -86,10 +86,7 @@ impl PipelineStatePropagator {
 
     /// Returns the children unboundedness information.
     pub fn children_unbounded(&self) -> Vec<bool> {
-        self.children
-            .iter()
-            .map(|c| c.unbounded)
-            .collect::<Vec<_>>()
+        self.children.iter().map(|c| c.unbounded).collect()
     }
 }
 
@@ -109,26 +106,23 @@ impl TreeNode for PipelineStatePropagator {
         Ok(VisitRecursion::Continue)
     }
 
-    fn map_children<F>(self, transform: F) -> Result<Self>
+    fn map_children<F>(mut self, transform: F) -> Result<Self>
     where
         F: FnMut(Self) -> Result<Self>,
     {
         if !self.children.is_empty() {
-            let new_children = self
+            self.children = self
                 .children
                 .into_iter()
                 .map(transform)
-                .collect::<Result<Vec<_>>>()?;
-            let children_plans = new_children.iter().map(|c| 
c.plan.clone()).collect();
-
-            Ok(PipelineStatePropagator {
-                plan: with_new_children_if_necessary(self.plan, 
children_plans)?.into(),
-                unbounded: self.unbounded,
-                children: new_children,
-            })
-        } else {
-            Ok(self)
+                .collect::<Result<_>>()?;
+            self.plan = with_new_children_if_necessary(
+                self.plan,
+                self.children.iter().map(|c| c.plan.clone()).collect(),
+            )?
+            .into();
         }
+        Ok(self)
     }
 }
 
diff --git 
a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
 
b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
index 0ff7e9f48e..91f3d2abc6 100644
--- 
a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
+++ 
b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
@@ -21,14 +21,13 @@
 
 use std::sync::Arc;
 
+use super::utils::is_repartition;
 use crate::error::Result;
-use crate::physical_optimizer::utils::{is_coalesce_partitions, is_sort, 
ExecTree};
+use crate::physical_optimizer::utils::{is_coalesce_partitions, is_sort};
 use crate::physical_plan::repartition::RepartitionExec;
 use 
crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
 use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
 
-use super::utils::is_repartition;
-
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
 use datafusion_physical_plan::unbounded_output;
@@ -40,80 +39,67 @@ use datafusion_physical_plan::unbounded_output;
 #[derive(Debug, Clone)]
 pub(crate) struct OrderPreservationContext {
     pub(crate) plan: Arc<dyn ExecutionPlan>,
-    ordering_onwards: Vec<Option<ExecTree>>,
+    ordering_connection: bool,
+    children_nodes: Vec<Self>,
 }
 
 impl OrderPreservationContext {
-    /// Creates a "default" order-preservation context.
+    /// Creates an empty context tree. Each node has `false` connections.
     pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
-        let length = plan.children().len();
-        OrderPreservationContext {
+        let children = plan.children();
+        Self {
             plan,
-            ordering_onwards: vec![None; length],
+            ordering_connection: false,
+            children_nodes: children.into_iter().map(Self::new).collect(),
         }
     }
 
     /// Creates a new order-preservation context from those of children nodes.
-    pub fn new_from_children_nodes(
-        children_nodes: Vec<OrderPreservationContext>,
-        parent_plan: Arc<dyn ExecutionPlan>,
-    ) -> Result<Self> {
-        let children_plans = children_nodes
-            .iter()
-            .map(|item| item.plan.clone())
-            .collect();
-        let ordering_onwards = children_nodes
-            .into_iter()
-            .enumerate()
-            .map(|(idx, item)| {
-                // `ordering_onwards` tree keeps track of executors that 
maintain
-                // ordering, (or that can maintain ordering with the 
replacement of
-                // its variant)
-                let plan = item.plan;
-                let children = plan.children();
-                let ordering_onwards = item.ordering_onwards;
-                if children.is_empty() {
-                    // Plan has no children, there is nothing to propagate.
-                    None
-                } else if ordering_onwards[0].is_none()
-                    && ((is_repartition(&plan) && 
!plan.maintains_input_order()[0])
-                        || (is_coalesce_partitions(&plan)
-                            && children[0].output_ordering().is_some()))
-                {
-                    Some(ExecTree::new(plan, idx, vec![]))
-                } else {
-                    let children = ordering_onwards
-                        .into_iter()
-                        .flatten()
-                        .filter(|item| {
-                            // Only consider operators that maintains ordering
-                            plan.maintains_input_order()[item.idx]
-                                || is_coalesce_partitions(&plan)
-                                || is_repartition(&plan)
-                        })
-                        .collect::<Vec<_>>();
-                    if children.is_empty() {
-                        None
-                    } else {
-                        Some(ExecTree::new(plan, idx, children))
-                    }
-                }
-            })
-            .collect();
-        let plan = with_new_children_if_necessary(parent_plan, 
children_plans)?.into();
-        Ok(OrderPreservationContext {
-            plan,
-            ordering_onwards,
-        })
-    }
+    pub fn update_children(mut self) -> Result<Self> {
+        for node in self.children_nodes.iter_mut() {
+            let plan = node.plan.clone();
+            let children = plan.children();
+            let maintains_input_order = plan.maintains_input_order();
+            let inspect_child = |idx| {
+                maintains_input_order[idx]
+                    || is_coalesce_partitions(&plan)
+                    || is_repartition(&plan)
+            };
+
+            // We cut the path towards nodes that do not maintain ordering.
+            for (idx, c) in node.children_nodes.iter_mut().enumerate() {
+                c.ordering_connection &= inspect_child(idx);
+            }
+
+            node.ordering_connection = if children.is_empty() {
+                false
+            } else if !node.children_nodes[0].ordering_connection
+                && ((is_repartition(&plan) && !maintains_input_order[0])
+                    || (is_coalesce_partitions(&plan)
+                        && children[0].output_ordering().is_some()))
+            {
+                // We either have a RepartitionExec or a CoalescePartitionsExec
+                // and they lose their input ordering, so initiate connection:
+                true
+            } else {
+                // Maintain connection if there is a child with a connection,
+                // and operator can possibly maintain that connection (either
+                // in its current form or when we replace it with the 
corresponding
+                // order preserving operator).
+                node.children_nodes
+                    .iter()
+                    .enumerate()
+                    .any(|(idx, c)| c.ordering_connection && 
inspect_child(idx))
+            }
+        }
 
-    /// Computes order-preservation contexts for every child of the plan.
-    pub fn children(&self) -> Vec<OrderPreservationContext> {
-        self.plan
-            .children()
-            .into_iter()
-            .map(OrderPreservationContext::new)
-            .collect()
+        self.plan = with_new_children_if_necessary(
+            self.plan,
+            self.children_nodes.iter().map(|c| c.plan.clone()).collect(),
+        )?
+        .into();
+        self.ordering_connection = false;
+        Ok(self)
     }
 }
 
@@ -122,8 +108,8 @@ impl TreeNode for OrderPreservationContext {
     where
         F: FnMut(&Self) -> Result<VisitRecursion>,
     {
-        for child in self.children() {
-            match op(&child)? {
+        for child in &self.children_nodes {
+            match op(child)? {
                 VisitRecursion::Continue => {}
                 VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
                 VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
@@ -132,68 +118,88 @@ impl TreeNode for OrderPreservationContext {
         Ok(VisitRecursion::Continue)
     }
 
-    fn map_children<F>(self, transform: F) -> Result<Self>
+    fn map_children<F>(mut self, transform: F) -> Result<Self>
     where
         F: FnMut(Self) -> Result<Self>,
     {
-        let children = self.children();
-        if children.is_empty() {
-            Ok(self)
-        } else {
-            let children_nodes = children
+        if !self.children_nodes.is_empty() {
+            self.children_nodes = self
+                .children_nodes
                 .into_iter()
                 .map(transform)
-                .collect::<Result<Vec<_>>>()?;
-            OrderPreservationContext::new_from_children_nodes(children_nodes, 
self.plan)
+                .collect::<Result<_>>()?;
+            self.plan = with_new_children_if_necessary(
+                self.plan,
+                self.children_nodes.iter().map(|c| c.plan.clone()).collect(),
+            )?
+            .into();
         }
+        Ok(self)
     }
 }
 
-/// Calculates the updated plan by replacing executors that lose ordering
-/// inside the `ExecTree` with their order-preserving variants. This will
+/// Calculates the updated plan by replacing operators that lose ordering
+/// inside `sort_input` with their order-preserving variants. This will
 /// generate an alternative plan, which will be accepted or rejected later on
 /// depending on whether it helps us remove a `SortExec`.
 fn get_updated_plan(
-    exec_tree: &ExecTree,
+    mut sort_input: OrderPreservationContext,
     // Flag indicating that it is desirable to replace `RepartitionExec`s with
     // `SortPreservingRepartitionExec`s:
     is_spr_better: bool,
     // Flag indicating that it is desirable to replace 
`CoalescePartitionsExec`s
     // with `SortPreservingMergeExec`s:
     is_spm_better: bool,
-) -> Result<Arc<dyn ExecutionPlan>> {
-    let plan = exec_tree.plan.clone();
+) -> Result<OrderPreservationContext> {
+    let updated_children = sort_input
+        .children_nodes
+        .clone()
+        .into_iter()
+        .map(|item| {
+            // Update children and their descendants in the given tree if the 
connection is open:
+            if item.ordering_connection {
+                get_updated_plan(item, is_spr_better, is_spm_better)
+            } else {
+                Ok(item)
+            }
+        })
+        .collect::<Result<Vec<_>>>()?;
 
-    let mut children = plan.children();
-    // Update children and their descendants in the given tree:
-    for item in &exec_tree.children {
-        children[item.idx] = get_updated_plan(item, is_spr_better, 
is_spm_better)?;
-    }
-    // Construct the plan with updated children:
-    let mut plan = plan.with_new_children(children)?;
+    sort_input.plan = sort_input
+        .plan
+        .with_new_children(updated_children.iter().map(|c| 
c.plan.clone()).collect())?;
+    sort_input.ordering_connection = false;
+    sort_input.children_nodes = updated_children;
 
     // When a `RepartitionExec` doesn't preserve ordering, replace it with
-    // a `SortPreservingRepartitionExec` if appropriate:
-    if is_repartition(&plan) && !plan.maintains_input_order()[0] && 
is_spr_better {
-        let child = plan.children().swap_remove(0);
-        let repartition = RepartitionExec::try_new(child, 
plan.output_partitioning())?
-            .with_preserve_order();
-        plan = Arc::new(repartition) as _
-    }
-    // When the input of a `CoalescePartitionsExec` has an ordering, replace it
-    // with a `SortPreservingMergeExec` if appropriate:
-    let mut children = plan.children();
-    if is_coalesce_partitions(&plan)
-        && children[0].output_ordering().is_some()
-        && is_spm_better
+    // a sort-preserving variant if appropriate:
+    if is_repartition(&sort_input.plan)
+        && !sort_input.plan.maintains_input_order()[0]
+        && is_spr_better
     {
-        let child = children.swap_remove(0);
-        plan = Arc::new(SortPreservingMergeExec::new(
-            child.output_ordering().unwrap_or(&[]).to_vec(),
-            child,
-        )) as _
+        let child = sort_input.plan.children().swap_remove(0);
+        let repartition =
+            RepartitionExec::try_new(child, 
sort_input.plan.output_partitioning())?
+                .with_preserve_order();
+        sort_input.plan = Arc::new(repartition) as _;
+        sort_input.children_nodes[0].ordering_connection = true;
+    } else if is_coalesce_partitions(&sort_input.plan) && is_spm_better {
+        // When the input of a `CoalescePartitionsExec` has an ordering, 
replace it
+        // with a `SortPreservingMergeExec` if appropriate:
+        if let Some(ordering) = sort_input.children_nodes[0]
+            .plan
+            .output_ordering()
+            .map(|o| o.to_vec())
+        {
+            // Now we can mutate `new_node.children_nodes` safely
+            let child = sort_input.children_nodes.clone().swap_remove(0);
+            sort_input.plan =
+                Arc::new(SortPreservingMergeExec::new(ordering, child.plan)) 
as _;
+            sort_input.children_nodes[0].ordering_connection = true;
+        }
     }
-    Ok(plan)
+
+    Ok(sort_input)
 }
 
 /// The `replace_with_order_preserving_variants` optimizer sub-rule tries to
@@ -211,11 +217,11 @@ fn get_updated_plan(
 ///
 /// The algorithm flow is simply like this:
 /// 1. Visit nodes of the physical plan bottom-up and look for `SortExec` 
nodes.
-/// 1_1. During the traversal, build an `ExecTree` to keep track of operators
-///      that maintain ordering (or can maintain ordering when replaced by an
-///      order-preserving variant) until a `SortExec` is found.
+/// 1_1. During the traversal, keep track of operators that maintain ordering
+///    (or can maintain ordering when replaced by an order-preserving variant) 
until
+///    a `SortExec` is found.
 /// 2. When a `SortExec` is found, update the child of the `SortExec` by 
replacing
-///    operators that do not preserve ordering in the `ExecTree` with their 
order
+///    operators that do not preserve ordering in the tree with their order
 ///    preserving variants.
 /// 3. Check if the `SortExec` is still necessary in the updated plan by 
comparing
 ///    its input ordering with the output ordering it imposes. We do this 
because
@@ -239,37 +245,41 @@ pub(crate) fn replace_with_order_preserving_variants(
     is_spm_better: bool,
     config: &ConfigOptions,
 ) -> Result<Transformed<OrderPreservationContext>> {
-    let plan = &requirements.plan;
-    let ordering_onwards = &requirements.ordering_onwards;
-    if is_sort(plan) {
-        let exec_tree = if let Some(exec_tree) = &ordering_onwards[0] {
-            exec_tree
-        } else {
-            return Ok(Transformed::No(requirements));
-        };
-        // For unbounded cases, replace with the order-preserving variant in
-        // any case, as doing so helps fix the pipeline.
-        // Also do the replacement if opted-in via config options.
-        let use_order_preserving_variant =
-            config.optimizer.prefer_existing_sort || unbounded_output(plan);
-        let updated_sort_input = get_updated_plan(
-            exec_tree,
-            is_spr_better || use_order_preserving_variant,
-            is_spm_better || use_order_preserving_variant,
-        )?;
-        // If this sort is unnecessary, we should remove it and update the 
plan:
-        if updated_sort_input
-            .equivalence_properties()
-            .ordering_satisfy(plan.output_ordering().unwrap_or(&[]))
-        {
-            return Ok(Transformed::Yes(OrderPreservationContext {
-                plan: updated_sort_input,
-                ordering_onwards: vec![None],
-            }));
-        }
+    let mut requirements = requirements.update_children()?;
+    if !(is_sort(&requirements.plan)
+        && requirements.children_nodes[0].ordering_connection)
+    {
+        return Ok(Transformed::No(requirements));
     }
 
-    Ok(Transformed::No(requirements))
+    // For unbounded cases, replace with the order-preserving variant in
+    // any case, as doing so helps fix the pipeline.
+    // Also do the replacement if opted-in via config options.
+    let use_order_preserving_variant =
+        config.optimizer.prefer_existing_sort || 
unbounded_output(&requirements.plan);
+
+    let mut updated_sort_input = get_updated_plan(
+        requirements.children_nodes.clone().swap_remove(0),
+        is_spr_better || use_order_preserving_variant,
+        is_spm_better || use_order_preserving_variant,
+    )?;
+
+    // If this sort is unnecessary, we should remove it and update the plan:
+    if updated_sort_input
+        .plan
+        .equivalence_properties()
+        .ordering_satisfy(requirements.plan.output_ordering().unwrap_or(&[]))
+    {
+        for child in updated_sort_input.children_nodes.iter_mut() {
+            child.ordering_connection = false;
+        }
+        Ok(Transformed::Yes(updated_sort_input))
+    } else {
+        for child in requirements.children_nodes.iter_mut() {
+            child.ordering_connection = false;
+        }
+        Ok(Transformed::Yes(requirements))
+    }
 }
 
 #[cfg(test)]
diff --git a/datafusion/core/src/physical_optimizer/sort_pushdown.rs 
b/datafusion/core/src/physical_optimizer/sort_pushdown.rs
index b9502d92ac..b001386301 100644
--- a/datafusion/core/src/physical_optimizer/sort_pushdown.rs
+++ b/datafusion/core/src/physical_optimizer/sort_pushdown.rs
@@ -36,8 +36,6 @@ use datafusion_physical_expr::{
     LexRequirementRef, PhysicalSortExpr, PhysicalSortRequirement,
 };
 
-use itertools::izip;
-
 /// This is a "data class" we use within the [`EnforceSorting`] rule to push
 /// down [`SortExec`] in the plan. In some cases, we can reduce the total
 /// computational cost by pushing down `SortExec`s through some executors.
@@ -49,35 +47,26 @@ pub(crate) struct SortPushDown {
     pub plan: Arc<dyn ExecutionPlan>,
     /// Parent required sort ordering
     required_ordering: Option<Vec<PhysicalSortRequirement>>,
-    /// The adjusted request sort ordering to children.
-    /// By default they are the same as the plan's required input ordering, 
but can be adjusted based on parent required sort ordering properties.
-    adjusted_request_ordering: Vec<Option<Vec<PhysicalSortRequirement>>>,
+    children_nodes: Vec<Self>,
 }
 
 impl SortPushDown {
-    pub fn init(plan: Arc<dyn ExecutionPlan>) -> Self {
-        let request_ordering = plan.required_input_ordering();
-        SortPushDown {
+    /// Creates an empty tree with empty `required_ordering`'s.
+    pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
+        let children = plan.children();
+        Self {
             plan,
             required_ordering: None,
-            adjusted_request_ordering: request_ordering,
+            children_nodes: children.into_iter().map(Self::new).collect(),
         }
     }
 
-    pub fn children(&self) -> Vec<SortPushDown> {
-        izip!(
-            self.plan.children().into_iter(),
-            self.adjusted_request_ordering.clone().into_iter(),
-        )
-        .map(|(child, from_parent)| {
-            let child_request_ordering = child.required_input_ordering();
-            SortPushDown {
-                plan: child,
-                required_ordering: from_parent,
-                adjusted_request_ordering: child_request_ordering,
-            }
-        })
-        .collect()
+    /// Assigns the ordering requirement of the root node to the its children.
+    pub fn assign_initial_requirements(&mut self) {
+        let reqs = self.plan.required_input_ordering();
+        for (child, requirement) in self.children_nodes.iter_mut().zip(reqs) {
+            child.required_ordering = requirement;
+        }
     }
 }
 
@@ -86,9 +75,8 @@ impl TreeNode for SortPushDown {
     where
         F: FnMut(&Self) -> Result<VisitRecursion>,
     {
-        let children = self.children();
-        for child in children {
-            match op(&child)? {
+        for child in &self.children_nodes {
+            match op(child)? {
                 VisitRecursion::Continue => {}
                 VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
                 VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
@@ -97,64 +85,64 @@ impl TreeNode for SortPushDown {
 
         Ok(VisitRecursion::Continue)
     }
-
     fn map_children<F>(mut self, transform: F) -> Result<Self>
     where
         F: FnMut(Self) -> Result<Self>,
     {
-        let children = self.children();
-        if !children.is_empty() {
-            let children_plans = children
+        if !self.children_nodes.is_empty() {
+            self.children_nodes = self
+                .children_nodes
                 .into_iter()
                 .map(transform)
-                .map(|r| r.map(|s| s.plan))
-                .collect::<Result<Vec<_>>>()?;
-
-            match with_new_children_if_necessary(self.plan, children_plans)? {
-                Transformed::Yes(plan) | Transformed::No(plan) => {
-                    self.plan = plan;
-                }
-            }
-        };
+                .collect::<Result<_>>()?;
+            self.plan = with_new_children_if_necessary(
+                self.plan,
+                self.children_nodes.iter().map(|c| c.plan.clone()).collect(),
+            )?
+            .into();
+        }
         Ok(self)
     }
 }
 
 pub(crate) fn pushdown_sorts(
-    requirements: SortPushDown,
+    mut requirements: SortPushDown,
 ) -> Result<Transformed<SortPushDown>> {
     let plan = &requirements.plan;
     let parent_required = 
requirements.required_ordering.as_deref().unwrap_or(&[]);
+
     if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
-        let new_plan = if !plan
+        if !plan
             .equivalence_properties()
             .ordering_satisfy_requirement(parent_required)
         {
             // If the current plan is a SortExec, modify it to satisfy parent 
requirements:
             let mut new_plan = sort_exec.input().clone();
             add_sort_above(&mut new_plan, parent_required, sort_exec.fetch());
-            new_plan
-        } else {
-            requirements.plan
+            requirements.plan = new_plan;
         };
-        let required_ordering = new_plan
+
+        let required_ordering = requirements
+            .plan
             .output_ordering()
             .map(PhysicalSortRequirement::from_sort_exprs)
             .unwrap_or_default();
         // Since new_plan is a SortExec, we can safely get the 0th index.
-        let child = new_plan.children().swap_remove(0);
+        let mut child = requirements.children_nodes.swap_remove(0);
         if let Some(adjusted) =
-            pushdown_requirement_to_children(&child, &required_ordering)?
+            pushdown_requirement_to_children(&child.plan, &required_ordering)?
         {
+            for (c, o) in child.children_nodes.iter_mut().zip(adjusted) {
+                c.required_ordering = o;
+            }
             // Can push down requirements
-            Ok(Transformed::Yes(SortPushDown {
-                plan: child,
-                required_ordering: None,
-                adjusted_request_ordering: adjusted,
-            }))
+            child.required_ordering = None;
+            Ok(Transformed::Yes(child))
         } else {
             // Can not push down requirements
-            Ok(Transformed::Yes(SortPushDown::init(new_plan)))
+            let mut empty_node = SortPushDown::new(requirements.plan);
+            empty_node.assign_initial_requirements();
+            Ok(Transformed::Yes(empty_node))
         }
     } else {
         // Executors other than SortExec
@@ -163,23 +151,27 @@ pub(crate) fn pushdown_sorts(
             .ordering_satisfy_requirement(parent_required)
         {
             // Satisfies parent requirements, immediately return.
-            return Ok(Transformed::Yes(SortPushDown {
-                required_ordering: None,
-                ..requirements
-            }));
+            let reqs = requirements.plan.required_input_ordering();
+            for (child, order) in 
requirements.children_nodes.iter_mut().zip(reqs) {
+                child.required_ordering = order;
+            }
+            return Ok(Transformed::Yes(requirements));
         }
         // Can not satisfy the parent requirements, check whether the 
requirements can be pushed down:
         if let Some(adjusted) = pushdown_requirement_to_children(plan, 
parent_required)? {
-            Ok(Transformed::Yes(SortPushDown {
-                plan: requirements.plan,
-                required_ordering: None,
-                adjusted_request_ordering: adjusted,
-            }))
+            for (c, o) in requirements.children_nodes.iter_mut().zip(adjusted) 
{
+                c.required_ordering = o;
+            }
+            requirements.required_ordering = None;
+            Ok(Transformed::Yes(requirements))
         } else {
             // Can not push down requirements, add new SortExec:
             let mut new_plan = requirements.plan;
             add_sort_above(&mut new_plan, parent_required, None);
-            Ok(Transformed::Yes(SortPushDown::init(new_plan)))
+            let mut new_empty = SortPushDown::new(new_plan);
+            new_empty.assign_initial_requirements();
+            // Can not push down requirements
+            Ok(Transformed::Yes(new_empty))
         }
     }
 }
@@ -297,10 +289,11 @@ fn pushdown_requirement_to_children(
     // TODO: Add support for Projection push down
 }
 
-/// Determine the children requirements
-/// If the children requirements are more specific, do not push down the 
parent requirements
-/// If the the parent requirements are more specific, push down the parent 
requirements
-/// If they are not compatible, need to add Sort.
+/// Determine children requirements:
+/// - If children requirements are more specific, do not push down parent
+///   requirements.
+/// - If parent requirements are more specific, push down parent requirements.
+/// - If they are not compatible, need to add a sort.
 fn determine_children_requirement(
     parent_required: LexRequirementRef,
     request_child: LexRequirementRef,
@@ -310,18 +303,15 @@ fn determine_children_requirement(
         .equivalence_properties()
         .requirements_compatible(request_child, parent_required)
     {
-        // request child requirements are more specific, no need to push down 
the parent requirements
+        // Child requirements are more specific, no need to push down.
         RequirementsCompatibility::Satisfy
     } else if child_plan
         .equivalence_properties()
         .requirements_compatible(parent_required, request_child)
     {
-        // parent requirements are more specific, adjust the request child 
requirements and push down the new requirements
-        let adjusted = if parent_required.is_empty() {
-            None
-        } else {
-            Some(parent_required.to_vec())
-        };
+        // Parent requirements are more specific, adjust child's requirements
+        // and push down the new requirements:
+        let adjusted = (!parent_required.is_empty()).then(|| 
parent_required.to_vec());
         RequirementsCompatibility::Compatible(adjusted)
     } else {
         RequirementsCompatibility::NonCompatible
diff --git a/datafusion/core/src/physical_optimizer/utils.rs 
b/datafusion/core/src/physical_optimizer/utils.rs
index fccc1db0d3..f8063e9694 100644
--- a/datafusion/core/src/physical_optimizer/utils.rs
+++ b/datafusion/core/src/physical_optimizer/utils.rs
@@ -17,83 +17,18 @@
 
 //! Collection of utility functions that are leveraged by the query optimizer 
rules
 
-use std::fmt;
-use std::fmt::Formatter;
 use std::sync::Arc;
 
 use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
-use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
 use crate::physical_plan::repartition::RepartitionExec;
 use crate::physical_plan::sorts::sort::SortExec;
 use 
crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
 use crate::physical_plan::union::UnionExec;
 use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
-use crate::physical_plan::{get_plan_string, ExecutionPlan};
+use crate::physical_plan::ExecutionPlan;
 
 use datafusion_physical_expr::{LexRequirementRef, PhysicalSortRequirement};
-
-/// This object implements a tree that we use while keeping track of paths
-/// leading to [`SortExec`]s.
-#[derive(Debug, Clone)]
-pub(crate) struct ExecTree {
-    /// The `ExecutionPlan` associated with this node
-    pub plan: Arc<dyn ExecutionPlan>,
-    /// Child index of the plan in its parent
-    pub idx: usize,
-    /// Children of the plan that would need updating if we remove leaf 
executors
-    pub children: Vec<ExecTree>,
-}
-
-impl fmt::Display for ExecTree {
-    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
-        let plan_string = get_plan_string(&self.plan);
-        write!(f, "\nidx: {:?}", self.idx)?;
-        write!(f, "\nplan: {:?}", plan_string)?;
-        for child in self.children.iter() {
-            write!(f, "\nexec_tree:{}", child)?;
-        }
-        writeln!(f)
-    }
-}
-
-impl ExecTree {
-    /// Create new Exec tree
-    pub fn new(
-        plan: Arc<dyn ExecutionPlan>,
-        idx: usize,
-        children: Vec<ExecTree>,
-    ) -> Self {
-        ExecTree {
-            plan,
-            idx,
-            children,
-        }
-    }
-}
-
-/// Get `ExecTree` for each child of the plan if they are tracked.
-/// # Arguments
-///
-/// * `n_children` - Children count of the plan of interest
-/// * `onward` - Contains `Some(ExecTree)` of the plan tracked.
-///            - Contains `None` is plan is not tracked.
-///
-/// # Returns
-///
-/// A `Vec<Option<ExecTree>>` that contains tracking information of each child.
-/// If a child is `None`, it is not tracked. If `Some(ExecTree)` child is 
tracked also.
-pub(crate) fn get_children_exectrees(
-    n_children: usize,
-    onward: &Option<ExecTree>,
-) -> Vec<Option<ExecTree>> {
-    let mut children_onward = vec![None; n_children];
-    if let Some(exec_tree) = &onward {
-        for child in &exec_tree.children {
-            children_onward[child.idx] = Some(child.clone());
-        }
-    }
-    children_onward
-}
+use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
 
 /// This utility function adds a `SortExec` above an operator according to the
 /// given ordering requirements while preserving the original partitioning.
diff --git a/datafusion/physical-expr/src/sort_properties.rs 
b/datafusion/physical-expr/src/sort_properties.rs
index f513744617..91238e5b04 100644
--- a/datafusion/physical-expr/src/sort_properties.rs
+++ b/datafusion/physical-expr/src/sort_properties.rs
@@ -151,7 +151,7 @@ impl Neg for SortProperties {
 pub struct ExprOrdering {
     pub expr: Arc<dyn PhysicalExpr>,
     pub state: SortProperties,
-    pub children: Vec<ExprOrdering>,
+    pub children: Vec<Self>,
 }
 
 impl ExprOrdering {
@@ -191,15 +191,13 @@ impl TreeNode for ExprOrdering {
     where
         F: FnMut(Self) -> Result<Self>,
     {
-        if self.children.is_empty() {
-            Ok(self)
-        } else {
+        if !self.children.is_empty() {
             self.children = self
                 .children
                 .into_iter()
                 .map(transform)
-                .collect::<Result<Vec<_>>>()?;
-            Ok(self)
+                .collect::<Result<_>>()?;
         }
+        Ok(self)
     }
 }
diff --git a/datafusion/physical-plan/src/union.rs 
b/datafusion/physical-plan/src/union.rs
index 14ef9c2ec2..d01ea55074 100644
--- a/datafusion/physical-plan/src/union.rs
+++ b/datafusion/physical-plan/src/union.rs
@@ -21,6 +21,7 @@
 
 //! The Union operator combines multiple inputs with the same schema
 
+use std::borrow::Borrow;
 use std::pin::Pin;
 use std::task::{Context, Poll};
 use std::{any::Any, sync::Arc};
@@ -336,7 +337,7 @@ impl InterleaveExec {
     pub fn try_new(inputs: Vec<Arc<dyn ExecutionPlan>>) -> Result<Self> {
         let schema = union_schema(&inputs);
 
-        if !can_interleave(&inputs) {
+        if !can_interleave(inputs.iter()) {
             return internal_err!(
                 "Not all InterleaveExec children have a consistent hash 
partitioning"
             );
@@ -474,17 +475,18 @@ impl ExecutionPlan for InterleaveExec {
 /// It might be too strict here in the case that the input partition specs are 
compatible but not exactly the same.
 /// For example one input partition has the partition spec Hash('a','b','c') 
and
 /// other has the partition spec Hash('a'), It is safe to derive the out 
partition with the spec Hash('a','b','c').
-pub fn can_interleave(inputs: &[Arc<dyn ExecutionPlan>]) -> bool {
-    if inputs.is_empty() {
+pub fn can_interleave<T: Borrow<Arc<dyn ExecutionPlan>>>(
+    mut inputs: impl Iterator<Item = T>,
+) -> bool {
+    let Some(first) = inputs.next() else {
         return false;
-    }
+    };
 
-    let first_input_partition = inputs[0].output_partitioning();
-    matches!(first_input_partition, Partitioning::Hash(_, _))
+    let reference = first.borrow().output_partitioning();
+    matches!(reference, Partitioning::Hash(_, _))
         && inputs
-            .iter()
-            .map(|plan| plan.output_partitioning())
-            .all(|partition| partition == first_input_partition)
+            .map(|plan| plan.borrow().output_partitioning())
+            .all(|partition| partition == reference)
 }
 
 fn union_schema(inputs: &[Arc<dyn ExecutionPlan>]) -> SchemaRef {

Reply via email to