mingmwang commented on code in PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#discussion_r1099970375


##########
datafusion/core/src/physical_optimizer/sort_enforcement.rs:
##########
@@ -184,225 +431,380 @@ fn ensure_sorting(
                 );
                 if !is_ordering_satisfied {
                     // Make sure we preserve the ordering requirements:
-                    update_child_to_remove_unnecessary_sort(child, 
sort_onwards)?;
+                    update_child_to_remove_unnecessary_sort(child, 
sort_onwards, &plan)?;
                     let sort_expr = required_ordering.to_vec();
                     *child = add_sort_above_child(child, sort_expr)?;
-                    sort_onwards.push((idx, child.clone()))
+                    *sort_onwards = Some(ExecTree {
+                        idx,
+                        plan: child.clone(),
+                        children: vec![],
+                    })
                 }
-                if let [first, ..] = sort_onwards.as_slice() {
-                    // The ordering requirement is met, we can analyze if 
there is an unnecessary sort:
-                    let sort_any = first.1.clone();
-                    let sort_exec = convert_to_sort_exec(&sort_any)?;
-                    let sort_output_ordering = sort_exec.output_ordering();
-                    let sort_input_ordering = 
sort_exec.input().output_ordering();
-                    // Simple analysis: Does the input of the sort in question 
already satisfy the ordering requirements?
-                    if ordering_satisfy(sort_input_ordering, 
sort_output_ordering, || {
-                        sort_exec.input().equivalence_properties()
-                    }) {
-                        update_child_to_remove_unnecessary_sort(child, 
sort_onwards)?;
-                    }
+                if let Some(tree) = sort_onwards {
                     // For window expressions, we can remove some sorts when 
we can
                     // calculate the result in reverse:
-                    else if let Some(exec) =
-                        
requirements.plan.as_any().downcast_ref::<WindowAggExec>()
+                    if plan.as_any().is::<WindowAggExec>()
+                        || plan.as_any().is::<BoundedWindowAggExec>()
                     {
-                        if let Some(result) = analyze_window_sort_removal(
-                            exec.window_expr(),
-                            &exec.partition_keys,
-                            sort_exec,
-                            sort_onwards,
-                        )? {
-                            return Ok(Some(result));
-                        }
-                    } else if let Some(exec) = requirements
-                        .plan
-                        .as_any()
-                        .downcast_ref::<BoundedWindowAggExec>()
-                    {
-                        if let Some(result) = analyze_window_sort_removal(
-                            exec.window_expr(),
-                            &exec.partition_keys,
-                            sort_exec,
-                            sort_onwards,
-                        )? {
+                        if let Some(result) = 
analyze_window_sort_removal(tree, &plan)? {
                             return Ok(Some(result));
                         }
                     }
-                    // TODO: Once we can ensure that required ordering 
information propagates with
-                    //       necessary lineage information, compare 
`sort_input_ordering` and `required_ordering`.
-                    //       This will enable us to handle cases such as (a,b) 
-> Sort -> (a,b,c) -> Required(a,b).
-                    //       Currently, we can not remove such sorts.
                 }
             }
             (Some(required), None) => {
-                // Ordering requirement is not met, we should add a SortExec 
to the plan.
-                let sort_expr = required.to_vec();
-                *child = add_sort_above_child(child, sort_expr)?;
-                *sort_onwards = vec![(idx, child.clone())];
+                // Ordering requirement is not met, we should add a `SortExec` 
to the plan.
+                *child = add_sort_above_child(child, required.to_vec())?;
+                *sort_onwards = Some(ExecTree {
+                    idx,
+                    plan: child.clone(),
+                    children: vec![],
+                })
             }
             (None, Some(_)) => {
-                // We have a SortExec whose effect may be neutralized by a 
order-imposing
-                // operator. In this case, remove this sort:
-                if !requirements.plan.maintains_input_order()[idx] {
-                    update_child_to_remove_unnecessary_sort(child, 
sort_onwards)?;
+                // We have a `SortExec` whose effect may be neutralized by
+                // another order-imposing operator. Remove or update this sort:
+                if !plan.maintains_input_order()[idx] {
+                    let count = plan.output_ordering().map_or(0, |e| e.len());
+                    if (count > 0) && !is_sort(&plan) {
+                        update_child_to_change_finer_sort(child, sort_onwards, 
count)?;
+                    } else {
+                        update_child_to_remove_unnecessary_sort(
+                            child,
+                            sort_onwards,
+                            &plan,
+                        )?;
+                    }
                 }
             }
             (None, None) => {}
         }
     }
-    if plan.children().is_empty() {
-        Ok(Some(requirements))
-    } else {
-        let new_plan = requirements.plan.with_new_children(new_children)?;
-        for (idx, (trace, required_ordering)) in new_onwards
-            .iter_mut()
-            .zip(new_plan.required_input_ordering())
-            .enumerate()
-            .take(new_plan.children().len())
-        {
-            if new_plan.maintains_input_order()[idx]
-                && required_ordering.is_none()
-                && !trace.is_empty()
-            {
-                trace.push((idx, new_plan.clone()));
-            } else {
-                trace.clear();
-                if is_sort(&new_plan) {
-                    trace.push((idx, new_plan.clone()));
-                }
-            }
-        }
-        Ok(Some(PlanWithCorrespondingSort {
-            plan: new_plan,
-            sort_onwards: new_onwards,
-        }))
-    }
+    Ok(Some(PlanWithCorrespondingSort {
+        plan: plan.with_new_children(children)?,
+        sort_onwards,
+    }))
 }
 
-/// Analyzes a given `SortExec` to determine whether its input already has
-/// a finer ordering than this `SortExec` enforces.
+/// Analyzes a given `SortExec` (`plan`) to determine whether its input already
+/// has a finer ordering than this `SortExec` enforces.
 fn analyze_immediate_sort_removal(
-    requirements: &PlanWithCorrespondingSort,
-) -> Result<Option<PlanWithCorrespondingSort>> {
-    if let Some(sort_exec) = 
requirements.plan.as_any().downcast_ref::<SortExec>() {
+    plan: &Arc<dyn ExecutionPlan>,
+    sort_onwards: &[Option<ExecTree>],
+) -> Option<PlanWithCorrespondingSort> {
+    if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
+        let sort_input = sort_exec.input().clone();
         // If this sort is unnecessary, we should remove it:
         if ordering_satisfy(
-            sort_exec.input().output_ordering(),
+            sort_input.output_ordering(),
             sort_exec.output_ordering(),
-            || sort_exec.input().equivalence_properties(),
+            || sort_input.equivalence_properties(),
         ) {
             // Since we know that a `SortExec` has exactly one child,
             // we can use the zero index safely:
-            let mut new_onwards = requirements.sort_onwards[0].to_vec();
-            if !new_onwards.is_empty() {
-                new_onwards.pop();
-            }
-            return Ok(Some(PlanWithCorrespondingSort {
-                plan: sort_exec.input().clone(),
-                sort_onwards: vec![new_onwards],
-            }));
+            return Some(
+                if !sort_exec.preserve_partitioning()
+                    && sort_input.output_partitioning().partition_count() > 1
+                {
+                    // Replace the sort with a sort-preserving merge:
+                    let new_plan: Arc<dyn ExecutionPlan> =
+                        Arc::new(SortPreservingMergeExec::new(
+                            sort_exec.expr().to_vec(),
+                            sort_input,
+                        ));
+                    let new_tree = ExecTree {
+                        idx: 0,
+                        plan: new_plan.clone(),
+                        children: sort_onwards.iter().flat_map(|e| 
e.clone()).collect(),
+                    };
+                    PlanWithCorrespondingSort {
+                        plan: new_plan,
+                        sort_onwards: vec![Some(new_tree)],
+                    }

Review Comment:
   Could you please explain the purpose of these lines of code? The original 
code just removes the `SortExec`. It looks like the new change tries to handle 
the case where the current `SortExec` is a global sort and `sort_input` is 
actually a local sort: instead of removing the global sort, it replaces it 
with a `SortPreservingMergeExec`. But I think we should not see such a physical 
plan tree. This is because, for a global sort, after the `EnforceDistribution` 
rule runs, a `CoalescePartitionsExec` will be added as the input of that global 
sort, and `CoalescePartitionsExec` cannot propagate any sort properties, so the 
`ordering_satisfy` check will be false. So I think we do not need specific 
handling for a global sort here. Please correct me if I am wrong.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to