mingmwang commented on code in PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#discussion_r1099922769


##########
datafusion/core/src/physical_optimizer/sort_enforcement.rs:
##########
@@ -102,44 +128,189 @@ impl TreeNodeRewritable for PlanWithCorrespondingSort {
                 .collect::<Result<Vec<_>>>()?;
             let children_plans = children_requirements
                 .iter()
-                .map(|elem| elem.plan.clone())
+                .map(|item| item.plan.clone())
                 .collect::<Vec<_>>();
             let sort_onwards = children_requirements
+                .into_iter()
+                .enumerate()
+                .map(|(idx, item)| {
+                    let plan = &item.plan;
+                    // Leaves of the `sort_onwards` are `SortExec`(Introduces 
ordering). This tree collects
+                    // all the intermediate executors that maintain this 
ordering. If
+                    // we just saw a sort-introducing operator, we reset the 
tree and
+                    // start accumulating.
+                    if is_sort(plan) {
+                        return Some(ExecTree {
+                            idx,
+                            plan: item.plan,
+                            children: vec![],
+                        });
+                    } else if is_limit(plan) {
+                        // There is no sort linkage for this path, it starts 
at a limit.
+                        return None;
+                    }
+                    let is_spm = is_sort_preserving_merge(plan);
+                    let output_ordering = plan.output_ordering();
+                    let required_orderings = plan.required_input_ordering();
+                    let children =
+                        izip!(&plan.children(), item.sort_onwards, 
required_orderings)
+                            .filter_map(|(child, element, required_ordering)| {
+                                // Executor maintains or partially maintains 
its child's output ordering
+                                let maintains = ordering_satisfy(
+                                    child.output_ordering(),
+                                    output_ordering,
+                                    || child.equivalence_properties(),
+                                );
+                                if (required_ordering.is_none() && maintains) 
|| is_spm {
+                                    element
+                                } else {
+                                    None
+                                }
+                            })
+                            .collect::<Vec<ExecTree>>();
+                    if !children.is_empty() {
+                        // Add parent node to the tree if there is at least one
+                        // child with a subtree:
+                        Some(ExecTree {
+                            idx,
+                            plan: item.plan,
+                            children,
+                        })
+                    } else {
+                        // There is no sort linkage for this child, do nothing.
+                        None
+                    }
+                })
+                .collect();
+            let plan = with_new_children_if_necessary(self.plan, 
children_plans)?;
+            Ok(PlanWithCorrespondingSort { plan, sort_onwards })
+        }
+    }
+}
+

Review Comment:
   There is heavy logic in the `map_children()` method. Can we move this heavy 
logic somewhere else, for example into `transform()`? Usually 
`map_children()` just applies the `transform` logic to all the `children` and 
constructs the new children. We could also add a new `init()` method to 
`PlanWithCorrespondingSort` that handles the construction of `sort_onwards` from 
the current plan.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to