mustafasrepo commented on code in PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#discussion_r1099957264


##########
datafusion/core/src/physical_optimizer/sort_enforcement.rs:
##########
@@ -102,44 +128,189 @@ impl TreeNodeRewritable for PlanWithCorrespondingSort {
                 .collect::<Result<Vec<_>>>()?;
             let children_plans = children_requirements
                 .iter()
-                .map(|elem| elem.plan.clone())
+                .map(|item| item.plan.clone())
                 .collect::<Vec<_>>();
             let sort_onwards = children_requirements
+                .into_iter()
+                .enumerate()
+                .map(|(idx, item)| {
+                    let plan = &item.plan;
+                    // Leaves of the `sort_onwards` are `SortExec`(Introduces ordering). This tree collects
+                    // all the intermediate executors that maintain this ordering. If
+                    // we just saw a sort-introducing operator, we reset the tree and
+                    // start accumulating.
+                    if is_sort(plan) {
+                        return Some(ExecTree {
+                            idx,
+                            plan: item.plan,
+                            children: vec![],
+                        });
+                    } else if is_limit(plan) {
+                        // There is no sort linkage for this path, it starts at a limit.
+                        return None;
+                    }
+                    let is_spm = is_sort_preserving_merge(plan);
+                    let output_ordering = plan.output_ordering();
+                    let required_orderings = plan.required_input_ordering();
+                    let children =
+                        izip!(&plan.children(), item.sort_onwards, 
required_orderings)
+                            .filter_map(|(child, element, required_ordering)| {
+                                // Executor maintains or partially maintains its child's output ordering
+                                let maintains = ordering_satisfy(
+                                    child.output_ordering(),
+                                    output_ordering,
+                                    || child.equivalence_properties(),
+                                );
+                                if (required_ordering.is_none() && maintains) 
|| is_spm {
+                                    element
+                                } else {
+                                    None
+                                }
+                            })
+                            .collect::<Vec<ExecTree>>();
+                    if !children.is_empty() {
+                        // Add parent node to the tree if there is at least one
+                        // child with a subtree:
+                        Some(ExecTree {
+                            idx,
+                            plan: item.plan,
+                            children,
+                        })
+                    } else {
+                        // There is no sort linkage for this child, do nothing.
+                        None
+                    }
+                })
+                .collect();
+            let plan = with_new_children_if_necessary(self.plan, 
children_plans)?;
+            Ok(PlanWithCorrespondingSort { plan, sort_onwards })
+        }
+    }
+}
+
+/// This object is used within the [EnforceSorting] rule to track the closest
+/// `CoalescePartitionsExec` descendant(s) for every child of a plan.
+#[derive(Debug, Clone)]
+struct PlanWithCorrespondingCoalescePartitions {
+    plan: Arc<dyn ExecutionPlan>,
+    // For every child, keep a subtree of `ExecutionPlan`s starting from the
+    // child until the `CoalescePartitionsExec`(s) -- could be multiple for
+    // n-ary plans like Union -- that affect the output partitioning of the
+    // child. If the child has no connection to any `CoalescePartitionsExec`,
+    // simply store None (and not a subtree).
+    coalesce_onwards: Vec<Option<ExecTree>>,
+}
+
+impl PlanWithCorrespondingCoalescePartitions {
+    pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
+        let length = plan.children().len();
+        PlanWithCorrespondingCoalescePartitions {
+            plan,
+            coalesce_onwards: vec![None; length],
+        }
+    }
+
+    pub fn children(&self) -> Vec<PlanWithCorrespondingCoalescePartitions> {
+        self.plan
+            .children()
+            .into_iter()
+            .map(|child| PlanWithCorrespondingCoalescePartitions::new(child))
+            .collect()
+    }
+}
+
+impl TreeNodeRewritable for PlanWithCorrespondingCoalescePartitions {
+    fn map_children<F>(self, transform: F) -> Result<Self>
+    where
+        F: FnMut(Self) -> Result<Self>,
+    {
+        let children = self.children();
+        if children.is_empty() {
+            Ok(self)
+        } else {
+            let children_requirements = children
+                .into_iter()
+                .map(transform)
+                .collect::<Result<Vec<_>>>()?;
+            let children_plans = children_requirements
                 .iter()
-                .map(|item| {
-                    let onwards = &item.sort_onwards;
-                    if !onwards.is_empty() {
-                        let flags = item.plan.maintains_input_order();
-                        // `onwards` starts from sort introducing executor(e.g 
`SortExec`, `SortPreservingMergeExec`) till the current executor
-                        // if the executors in between maintain input 
ordering. If we are at
-                        // the beginning both `SortExec` and 
`SortPreservingMergeExec` doesn't maintain ordering(they introduce ordering).
-                        // However, we want to propagate them above anyway.
-                        for (maintains, element) in 
flags.into_iter().zip(onwards.iter())
-                        {
-                            if (maintains || is_sort(&item.plan)) && 
!element.is_empty() {
-                                return element.clone();
-                            }
+                .map(|item| item.plan.clone())
+                .collect();
+            let coalesce_onwards = children_requirements
+                .into_iter()
+                .enumerate()
+                .map(|(idx, item)| {
+                    // Leaves of the `coalesce_onwards` tree are `CoalescePartitionsExec`
+                    // operators. This tree collects all the intermediate executors that
+                    // maintain a single partition. If we just saw a `CoalescePartitionsExec`
+                    // operator, we reset the tree and start accumulating.
+                    let plan = item.plan;
+                    if plan.as_any().is::<CoalescePartitionsExec>() {
+                        Some(ExecTree {
+                            idx,
+                            plan,
+                            children: vec![],
+                        })
+                    } else if plan.children().is_empty() {
+                        // Plan has no children, there is nothing to propagate.
+                        None
+                    } else {
+                        let children = item
+                            .coalesce_onwards
+                            .into_iter()
+                            .flatten()
+                            .filter(|item| {
+                                // Only consider operators that don't require a
+                                // single partition.
+                                !matches!(
+                                    
plan.required_input_distribution()[item.idx],
+                                    Distribution::SinglePartition
+                                )
+                            })
+                            .collect::<Vec<_>>();
+                        if children.is_empty() {
+                            None
+                        } else {
+                            Some(ExecTree {
+                                idx,
+                                plan,
+                                children,
+                            })
                         }
                     }
-                    vec![]
                 })
-                .collect::<Vec<_>>();
+                .collect();
             let plan = with_new_children_if_necessary(self.plan, 
children_plans)?;
-            Ok(PlanWithCorrespondingSort { plan, sort_onwards })
+            Ok(PlanWithCorrespondingCoalescePartitions {
+                plan,
+                coalesce_onwards,
+            })
         }
     }
 }
 
+/// The boolean flag `repartition_sorts` defined in the config indicates
+/// whether we elect to transform CoalescePartitionsExec + SortExec cascades
+/// into SortExec + SortPreservingMergeExec cascades, which enables us to
+/// perform sorting in parallel.
 impl PhysicalOptimizerRule for EnforceSorting {
     fn optimize(
         &self,
         plan: Arc<dyn ExecutionPlan>,
-        _config: &ConfigOptions,
+        config: &ConfigOptions,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         // Execute a post-order traversal to adjust input key ordering:
         let plan_requirements = PlanWithCorrespondingSort::new(plan);

Review Comment:
   Removed it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to