mingmwang commented on code in PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#discussion_r1099988290


##########
datafusion/core/src/physical_optimizer/sort_enforcement.rs:
##########
@@ -184,225 +431,380 @@ fn ensure_sorting(
                 );
                 if !is_ordering_satisfied {
                     // Make sure we preserve the ordering requirements:
-                    update_child_to_remove_unnecessary_sort(child, 
sort_onwards)?;
+                    update_child_to_remove_unnecessary_sort(child, 
sort_onwards, &plan)?;
                     let sort_expr = required_ordering.to_vec();
                     *child = add_sort_above_child(child, sort_expr)?;
-                    sort_onwards.push((idx, child.clone()))
+                    *sort_onwards = Some(ExecTree {
+                        idx,
+                        plan: child.clone(),
+                        children: vec![],
+                    })
                 }
-                if let [first, ..] = sort_onwards.as_slice() {
-                    // The ordering requirement is met, we can analyze if 
there is an unnecessary sort:
-                    let sort_any = first.1.clone();
-                    let sort_exec = convert_to_sort_exec(&sort_any)?;
-                    let sort_output_ordering = sort_exec.output_ordering();
-                    let sort_input_ordering = 
sort_exec.input().output_ordering();
-                    // Simple analysis: Does the input of the sort in question 
already satisfy the ordering requirements?
-                    if ordering_satisfy(sort_input_ordering, 
sort_output_ordering, || {
-                        sort_exec.input().equivalence_properties()
-                    }) {
-                        update_child_to_remove_unnecessary_sort(child, 
sort_onwards)?;
-                    }
+                if let Some(tree) = sort_onwards {
                     // For window expressions, we can remove some sorts when 
we can
                     // calculate the result in reverse:
-                    else if let Some(exec) =
-                        
requirements.plan.as_any().downcast_ref::<WindowAggExec>()
+                    if plan.as_any().is::<WindowAggExec>()
+                        || plan.as_any().is::<BoundedWindowAggExec>()
                     {
-                        if let Some(result) = analyze_window_sort_removal(
-                            exec.window_expr(),
-                            &exec.partition_keys,
-                            sort_exec,
-                            sort_onwards,
-                        )? {
-                            return Ok(Some(result));
-                        }
-                    } else if let Some(exec) = requirements
-                        .plan
-                        .as_any()
-                        .downcast_ref::<BoundedWindowAggExec>()
-                    {
-                        if let Some(result) = analyze_window_sort_removal(
-                            exec.window_expr(),
-                            &exec.partition_keys,
-                            sort_exec,
-                            sort_onwards,
-                        )? {
+                        if let Some(result) = 
analyze_window_sort_removal(tree, &plan)? {
                             return Ok(Some(result));
                         }
                     }
-                    // TODO: Once we can ensure that required ordering 
information propagates with
-                    //       necessary lineage information, compare 
`sort_input_ordering` and `required_ordering`.
-                    //       This will enable us to handle cases such as (a,b) 
-> Sort -> (a,b,c) -> Required(a,b).
-                    //       Currently, we can not remove such sorts.
                 }
             }
             (Some(required), None) => {
-                // Ordering requirement is not met, we should add a SortExec 
to the plan.
-                let sort_expr = required.to_vec();
-                *child = add_sort_above_child(child, sort_expr)?;
-                *sort_onwards = vec![(idx, child.clone())];
+                // Ordering requirement is not met, we should add a `SortExec` 
to the plan.
+                *child = add_sort_above_child(child, required.to_vec())?;
+                *sort_onwards = Some(ExecTree {
+                    idx,
+                    plan: child.clone(),
+                    children: vec![],
+                })
             }
             (None, Some(_)) => {
-                // We have a SortExec whose effect may be neutralized by a 
order-imposing
-                // operator. In this case, remove this sort:
-                if !requirements.plan.maintains_input_order()[idx] {
-                    update_child_to_remove_unnecessary_sort(child, 
sort_onwards)?;
+                // We have a `SortExec` whose effect may be neutralized by
+                // another order-imposing operator. Remove or update this sort:
+                if !plan.maintains_input_order()[idx] {
+                    let count = plan.output_ordering().map_or(0, |e| e.len());
+                    if (count > 0) && !is_sort(&plan) {
+                        update_child_to_change_finer_sort(child, sort_onwards, 
count)?;
+                    } else {
+                        update_child_to_remove_unnecessary_sort(
+                            child,
+                            sort_onwards,
+                            &plan,
+                        )?;
+                    }
                 }
             }
             (None, None) => {}
         }
     }
-    if plan.children().is_empty() {
-        Ok(Some(requirements))
-    } else {
-        let new_plan = requirements.plan.with_new_children(new_children)?;
-        for (idx, (trace, required_ordering)) in new_onwards
-            .iter_mut()
-            .zip(new_plan.required_input_ordering())
-            .enumerate()
-            .take(new_plan.children().len())
-        {
-            if new_plan.maintains_input_order()[idx]
-                && required_ordering.is_none()
-                && !trace.is_empty()
-            {
-                trace.push((idx, new_plan.clone()));
-            } else {
-                trace.clear();
-                if is_sort(&new_plan) {
-                    trace.push((idx, new_plan.clone()));
-                }
-            }
-        }
-        Ok(Some(PlanWithCorrespondingSort {
-            plan: new_plan,
-            sort_onwards: new_onwards,
-        }))
-    }
+    Ok(Some(PlanWithCorrespondingSort {
+        plan: plan.with_new_children(children)?,
+        sort_onwards,
+    }))
 }
 
-/// Analyzes a given `SortExec` to determine whether its input already has
-/// a finer ordering than this `SortExec` enforces.
+/// Analyzes a given `SortExec` (`plan`) to determine whether its input already
+/// has a finer ordering than this `SortExec` enforces.
 fn analyze_immediate_sort_removal(
-    requirements: &PlanWithCorrespondingSort,
-) -> Result<Option<PlanWithCorrespondingSort>> {
-    if let Some(sort_exec) = 
requirements.plan.as_any().downcast_ref::<SortExec>() {
+    plan: &Arc<dyn ExecutionPlan>,
+    sort_onwards: &[Option<ExecTree>],
+) -> Option<PlanWithCorrespondingSort> {
+    if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
+        let sort_input = sort_exec.input().clone();
         // If this sort is unnecessary, we should remove it:
         if ordering_satisfy(
-            sort_exec.input().output_ordering(),
+            sort_input.output_ordering(),
             sort_exec.output_ordering(),
-            || sort_exec.input().equivalence_properties(),
+            || sort_input.equivalence_properties(),
         ) {
             // Since we know that a `SortExec` has exactly one child,
             // we can use the zero index safely:
-            let mut new_onwards = requirements.sort_onwards[0].to_vec();
-            if !new_onwards.is_empty() {
-                new_onwards.pop();
-            }
-            return Ok(Some(PlanWithCorrespondingSort {
-                plan: sort_exec.input().clone(),
-                sort_onwards: vec![new_onwards],
-            }));
+            return Some(
+                if !sort_exec.preserve_partitioning()
+                    && sort_input.output_partitioning().partition_count() > 1
+                {
+                    // Replace the sort with a sort-preserving merge:
+                    let new_plan: Arc<dyn ExecutionPlan> =
+                        Arc::new(SortPreservingMergeExec::new(
+                            sort_exec.expr().to_vec(),
+                            sort_input,
+                        ));
+                    let new_tree = ExecTree {
+                        idx: 0,
+                        plan: new_plan.clone(),
+                        children: sort_onwards.iter().flat_map(|e| 
e.clone()).collect(),
+                    };
+                    PlanWithCorrespondingSort {
+                        plan: new_plan,
+                        sort_onwards: vec![Some(new_tree)],
+                    }
+                } else {
+                    // Remove the sort:
+                    PlanWithCorrespondingSort {
+                        plan: sort_input,
+                        sort_onwards: sort_onwards.to_vec(),
+                    }
+                },
+            );
         }
     }
-    Ok(None)
+    None
 }
 
 /// Analyzes a [WindowAggExec] or a [BoundedWindowAggExec] to determine whether
 /// it may allow removing a sort.
 fn analyze_window_sort_removal(
-    window_expr: &[Arc<dyn WindowExpr>],
-    partition_keys: &[Arc<dyn PhysicalExpr>],
-    sort_exec: &SortExec,
-    sort_onward: &mut Vec<(usize, Arc<dyn ExecutionPlan>)>,
+    sort_tree: &mut ExecTree,
+    window_exec: &Arc<dyn ExecutionPlan>,
 ) -> Result<Option<PlanWithCorrespondingSort>> {
-    let required_ordering = sort_exec.output_ordering().ok_or_else(|| {
-        DataFusionError::Plan("A SortExec should have output 
ordering".to_string())
-    })?;
-    let physical_ordering = sort_exec.input().output_ordering();
-    let physical_ordering = if let Some(physical_ordering) = physical_ordering 
{
-        physical_ordering
+    let (window_expr, partition_keys) = if let Some(exec) =
+        window_exec.as_any().downcast_ref::<BoundedWindowAggExec>()
+    {
+        (exec.window_expr(), &exec.partition_keys)
+    } else if let Some(exec) = 
window_exec.as_any().downcast_ref::<WindowAggExec>() {
+        (exec.window_expr(), &exec.partition_keys)
     } else {
-        // If there is no physical ordering, there is no way to remove a sort 
-- immediately return:
-        return Ok(None);
+        return Err(DataFusionError::Plan(
+            "Expects to receive either WindowAggExec of 
BoundedWindowAggExec".to_string(),
+        ));
     };
-    let (can_skip_sorting, should_reverse) = can_skip_sort(
-        window_expr[0].partition_by(),
-        required_ordering,
-        &sort_exec.input().schema(),
-        physical_ordering,
-    )?;
-    if can_skip_sorting {
-        let new_window_expr = if should_reverse {
-            window_expr
-                .iter()
-                .map(|e| e.get_reverse_expr())
-                .collect::<Option<Vec<_>>>()
-        } else {
-            Some(window_expr.to_vec())
-        };
-        if let Some(window_expr) = new_window_expr {
-            let new_child = 
remove_corresponding_sort_from_sub_plan(sort_onward)?;
-            let new_schema = new_child.schema();
-
-            let uses_bounded_memory = window_expr.iter().all(|e| 
e.uses_bounded_memory());
-            // If all window exprs can run with bounded memory choose bounded 
window variant
-            let new_plan = if uses_bounded_memory {
-                Arc::new(BoundedWindowAggExec::try_new(
-                    window_expr,
-                    new_child,
-                    new_schema,
-                    partition_keys.to_vec(),
-                    Some(physical_ordering.to_vec()),
-                )?) as _
+
+    let mut first_should_reverse = None;
+    let mut physical_ordering_common = vec![];
+    for sort_any in sort_tree.get_leaves() {
+        let sort_output_ordering = sort_any.output_ordering();
+        // Variable `sort_any` will either be a `SortExec` or a
+        // `SortPreservingMergeExec`, and both have a single child.
+        // Therefore, we can use the 0th index without loss of generality.
+        let sort_input = sort_any.children()[0].clone();
+        let physical_ordering = sort_input.output_ordering();
+        // TODO: Once we can ensure that required ordering information 
propagates with
+        //       the necessary lineage information, compare 
`physical_ordering` and the
+        //       ordering required by the window executor instead of 
`sort_output_ordering`.
+        //       This will enable us to handle cases such as (a,b) -> Sort -> 
(a,b,c) -> Required(a,b).
+        //       Currently, we can not remove such sorts.
+        let required_ordering = sort_output_ordering.ok_or_else(|| {
+            DataFusionError::Plan("A SortExec should have output 
ordering".to_string())
+        })?;
+        if let Some(physical_ordering) = physical_ordering {
+            if physical_ordering_common.is_empty()
+                || physical_ordering.len() < physical_ordering_common.len()
+            {
+                physical_ordering_common = physical_ordering.to_vec();
+            }
+            let (can_skip_sorting, should_reverse) = can_skip_sort(
+                window_expr[0].partition_by(),
+                required_ordering,
+                &sort_input.schema(),
+                physical_ordering,
+            )?;
+            if !can_skip_sorting {
+                return Ok(None);
+            }
+            if let Some(first_should_reverse) = first_should_reverse {
+                if first_should_reverse != should_reverse {
+                    return Ok(None);
+                }
             } else {
-                Arc::new(WindowAggExec::try_new(
-                    window_expr,
-                    new_child,
-                    new_schema,
-                    partition_keys.to_vec(),
-                    Some(physical_ordering.to_vec()),
-                )?) as _
-            };
-            return Ok(Some(PlanWithCorrespondingSort::new(new_plan)));
+                first_should_reverse = Some(should_reverse);
+            }
+        } else {
+            // If there is no physical ordering, there is no way to remove a
+            // sort, so immediately return.
+            return Ok(None);
         }
     }
+    let new_window_expr = if first_should_reverse.unwrap() {
+        window_expr
+            .iter()
+            .map(|e| e.get_reverse_expr())
+            .collect::<Option<Vec<_>>>()
+    } else {
+        Some(window_expr.to_vec())
+    };
+    if let Some(window_expr) = new_window_expr {
+        let requires_single_partition = matches!(
+            window_exec.required_input_distribution()[sort_tree.idx],
+            Distribution::SinglePartition
+        );
+        let new_child = remove_corresponding_sort_from_sub_plan(
+            sort_tree,
+            requires_single_partition,
+        )?;
+        let new_schema = new_child.schema();
+
+        let uses_bounded_memory = window_expr.iter().all(|e| 
e.uses_bounded_memory());
+        // If all window expressions can run with bounded memory, choose the
+        // bounded window variant:
+        let new_plan = if uses_bounded_memory {
+            Arc::new(BoundedWindowAggExec::try_new(
+                window_expr,
+                new_child,
+                new_schema,
+                partition_keys.to_vec(),
+                Some(physical_ordering_common),
+            )?) as _
+        } else {
+            Arc::new(WindowAggExec::try_new(
+                window_expr,
+                new_child,
+                new_schema,
+                partition_keys.to_vec(),
+                Some(physical_ordering_common),
+            )?) as _
+        };
+        return Ok(Some(PlanWithCorrespondingSort::new(new_plan)));
+    }
     Ok(None)
 }
 
-/// Updates child to remove the unnecessary sorting below it.
-fn update_child_to_remove_unnecessary_sort(
+/// Updates child to remove the unnecessary `CoalescePartitions` below it.
+fn update_child_to_change_coalesce(
     child: &mut Arc<dyn ExecutionPlan>,
-    sort_onwards: &mut Vec<(usize, Arc<dyn ExecutionPlan>)>,
+    coalesce_onwards: &mut Option<ExecTree>,
+    sort_exec: Option<&SortExec>,
 ) -> Result<()> {
-    if !sort_onwards.is_empty() {
-        *child = remove_corresponding_sort_from_sub_plan(sort_onwards)?;
+    if let Some(coalesce_onwards) = coalesce_onwards {
+        *child = change_corresponding_coalesce_in_sub_plan(coalesce_onwards, 
sort_exec)?;
     }
     Ok(())
 }
 
-/// Converts an [ExecutionPlan] trait object to a [SortExec] when possible.
-fn convert_to_sort_exec(sort_any: &Arc<dyn ExecutionPlan>) -> 
Result<&SortExec> {
-    sort_any.as_any().downcast_ref::<SortExec>().ok_or_else(|| {
-        DataFusionError::Plan("Given ExecutionPlan is not a 
SortExec".to_string())
-    })
+/// Removes the `CoalescePartitions` from the plan in `coalesce_onwards`.
+fn change_corresponding_coalesce_in_sub_plan(
+    coalesce_onwards: &mut ExecTree,
+    sort_exec: Option<&SortExec>,
+) -> Result<Arc<dyn ExecutionPlan>> {
+    Ok(
+        if coalesce_onwards
+            .plan
+            .as_any()
+            .is::<CoalescePartitionsExec>()
+        {
+            // We can safely use the 0th index since we have a 
`CoalescePartitionsExec`.
+            let coalesce_input = coalesce_onwards.plan.children()[0].clone();
+            if let Some(sort_exec) = sort_exec {
+                let sort_expr = sort_exec.expr();
+                if !ordering_satisfy(
+                    coalesce_input.output_ordering(),
+                    Some(sort_expr),
+                    || coalesce_input.equivalence_properties(),
+                ) {
+                    return add_sort_above_child(&coalesce_input, 
sort_expr.to_vec());
+                }
+            }
+            coalesce_input

Review Comment:
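   I believe that, most of the time, the global `SortExec` and the `CoalescePartitionsExec`
are adjacent nodes. But is it possible that there are some projections
between the `SortExec` and the `CoalescePartitionsExec`, which would make the
`SortExec`'s exprs/columns entirely different from those of the
   `CoalescePartitionsExec`'s input plan? In that case, checking `ordering_satisfy` on `coalesce_input` with the sort's expressions, or adding a sort with those expressions directly above `coalesce_input`, would not be correct.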
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to