alamb commented on code in PR #14673:
URL: https://github.com/apache/datafusion/pull/14673#discussion_r1956781210
##########
datafusion/physical-optimizer/src/enforce_sorting/mod.rs:
##########
@@ -151,10 +165,51 @@ fn update_coalesce_ctx_children(
};
}
-/// The boolean flag `repartition_sorts` defined in the config indicates
-/// whether we elect to transform [`CoalescePartitionsExec`] + [`SortExec`]
cascades
-/// into [`SortExec`] + [`SortPreservingMergeExec`] cascades, which enables us
to
-/// perform sorting in parallel.
+/// If `repartition_sorts` is enabled,
Review Comment:
π
##########
datafusion/physical-optimizer/src/enforce_sorting/mod.rs:
##########
@@ -151,10 +165,51 @@ fn update_coalesce_ctx_children(
};
}
-/// The boolean flag `repartition_sorts` defined in the config indicates
-/// whether we elect to transform [`CoalescePartitionsExec`] + [`SortExec`]
cascades
-/// into [`SortExec`] + [`SortPreservingMergeExec`] cascades, which enables us
to
-/// perform sorting in parallel.
+/// If `repartition_sorts` is enabled,
+/// then transform [`CoalescePartitionsExec`] + [`SortExec`] cascades
+/// into [`SortExec`] + [`SortPreservingMergeExec`] cascades.
+///
+/// The [`CoalescePartitionsExec`] + [`SortExec`] cascades
+/// combine the partitions first, and then sort:
+///
+/// β β β β β β β
+/// βββ¬ββ¬ββ
+/// ββBβAβDβ... ββββ
+/// βββ΄ββ΄ββ β
+/// β β β β β β β β ββββββββββββββββββββββββββ β β β β β β β β
ββββββββββ β β β β β β β β β
+/// Partition 1 β β Coalesce β βββ¬ββ¬ββ¬ββ¬ββ β
β βββ¬ββ¬ββ¬ββ¬ββ
+/// ββββΆ(no ordering guarantees)ββββΆββBβEβAβDβCβ...ββββΆ Sort
βββββΆββAβBβCβDβEβ... β
+/// β β β βββ΄ββ΄ββ΄ββ΄ββ β
β βββ΄ββ΄ββ΄ββ΄ββ
+/// β β β β β β β β ββββββββββββββββββββββββββ β β β β β β β β
ββββββββββ β β β β β β β β β
+/// βββ¬ββ β Partition
Partition
+/// ββEβCβ ... ββββ
+/// βββ΄ββ
+/// β β β β β β β
+/// Partition 2
+///
Review Comment:
```suggestion
///
/// ```
```
##########
datafusion/physical-optimizer/src/enforce_sorting/mod.rs:
##########
@@ -151,10 +165,51 @@ fn update_coalesce_ctx_children(
};
}
-/// The boolean flag `repartition_sorts` defined in the config indicates
-/// whether we elect to transform [`CoalescePartitionsExec`] + [`SortExec`]
cascades
-/// into [`SortExec`] + [`SortPreservingMergeExec`] cascades, which enables us
to
-/// perform sorting in parallel.
+/// If `repartition_sorts` is enabled,
+/// then transform [`CoalescePartitionsExec`] + [`SortExec`] cascades
+/// into [`SortExec`] + [`SortPreservingMergeExec`] cascades.
+///
+/// The [`CoalescePartitionsExec`] + [`SortExec`] cascades
+/// combine the partitions first, and then sort:
+///
+/// β β β β β β β
Review Comment:
If you add the backticks this will render more nicely in the output of
`cargo doc`:
```suggestion
/// ```text
/// β β β β β β β
```
##########
datafusion/physical-optimizer/src/enforce_sorting/mod.rs:
##########
@@ -151,10 +165,51 @@ fn update_coalesce_ctx_children(
};
}
-/// The boolean flag `repartition_sorts` defined in the config indicates
-/// whether we elect to transform [`CoalescePartitionsExec`] + [`SortExec`]
cascades
-/// into [`SortExec`] + [`SortPreservingMergeExec`] cascades, which enables us
to
-/// perform sorting in parallel.
+/// If `repartition_sorts` is enabled,
Review Comment:
This is great -- thank you @wiedld
I am not sure it will be rendered as a doc if it is listed on the `impl` --
perhaps it would be better here:
https://github.com/apache/datafusion/blob/a104661a020b895eb155af12575bafe693b8edaf/datafusion/physical-optimizer/src/enforce_sorting/mod.rs#L129-L128
##########
datafusion/physical-optimizer/src/enforce_sorting/mod.rs:
##########
@@ -151,10 +165,51 @@ fn update_coalesce_ctx_children(
};
}
-/// The boolean flag `repartition_sorts` defined in the config indicates
-/// whether we elect to transform [`CoalescePartitionsExec`] + [`SortExec`]
cascades
-/// into [`SortExec`] + [`SortPreservingMergeExec`] cascades, which enables us
to
-/// perform sorting in parallel.
+/// If `repartition_sorts` is enabled,
+/// then transform [`CoalescePartitionsExec`] + [`SortExec`] cascades
+/// into [`SortExec`] + [`SortPreservingMergeExec`] cascades.
+///
+/// The [`CoalescePartitionsExec`] + [`SortExec`] cascades
+/// combine the partitions first, and then sort:
+///
+/// β β β β β β β
+/// βββ¬ββ¬ββ
+/// ββBβAβDβ... ββββ
+/// βββ΄ββ΄ββ β
+/// β β β β β β β β ββββββββββββββββββββββββββ β β β β β β β β
ββββββββββ β β β β β β β β β
+/// Partition 1 β β Coalesce β βββ¬ββ¬ββ¬ββ¬ββ β
β βββ¬ββ¬ββ¬ββ¬ββ
+/// ββββΆ(no ordering guarantees)ββββΆββBβEβAβDβCβ...ββββΆ Sort
βββββΆββAβBβCβDβEβ... β
+/// β β β βββ΄ββ΄ββ΄ββ΄ββ β
β βββ΄ββ΄ββ΄ββ΄ββ
+/// β β β β β β β β ββββββββββββββββββββββββββ β β β β β β β β
ββββββββββ β β β β β β β β β
+/// βββ¬ββ β Partition
Partition
+/// ββEβCβ ... ββββ
+/// βββ΄ββ
+/// β β β β β β β
+/// Partition 2
+///
+///
+///
+/// The [`SortExec`] + [`SortPreservingMergeExec`] cascades
+/// sort each partition first, then merge the partitions while retaining the sort:
+///
+/// β β β β β β β ββββββββββ β β β β β β β
Review Comment:
```suggestion
/// ```
/// β β β β β β β ββββββββββ β β β β β β β
```
##########
datafusion/physical-plan/src/tree_node.rs:
##########
@@ -39,8 +39,14 @@ impl DynTreeNode for dyn ExecutionPlan {
}
}
-/// A node object beneficial for writing optimizer rules, encapsulating an
[`ExecutionPlan`] node with a payload.
-/// Since there are two ways to access child plans—directly from the plan and
through child nodes—it's recommended
+/// A node context object beneficial for writing optimizer rules.
+/// This context encapsulates an [`ExecutionPlan`] node with a payload.
+///
+/// Since each wrapped node has its children within both the
[`PlanContext.plan.children()`],
Review Comment:
π
this is indeed quite subtle -- it would be awesome if we could find some way
to make it harder to forget to call `update_plan_from_children`
##########
datafusion/physical-optimizer/src/enforce_sorting/mod.rs:
##########
@@ -151,10 +165,51 @@ fn update_coalesce_ctx_children(
};
}
-/// The boolean flag `repartition_sorts` defined in the config indicates
-/// whether we elect to transform [`CoalescePartitionsExec`] + [`SortExec`]
cascades
-/// into [`SortExec`] + [`SortPreservingMergeExec`] cascades, which enables us
to
-/// perform sorting in parallel.
+/// If `repartition_sorts` is enabled,
+/// then transform [`CoalescePartitionsExec`] + [`SortExec`] cascades
+/// into [`SortExec`] + [`SortPreservingMergeExec`] cascades.
Review Comment:
I found cascades a bit confusing at first. Maybe something like this would
be clearer:
```suggestion
/// then transform [`CoalescePartitionsExec`] + [`SortExec`]
/// into [`SortExec`] + [`SortPreservingMergeExec`] as illustrated below
```
##########
datafusion/physical-optimizer/src/enforce_sorting/mod.rs:
##########
@@ -84,42 +84,56 @@ impl EnforceSorting {
}
}
-/// This object is used within the [`EnforceSorting`] rule to track the closest
+/// This context object is used within the [`EnforceSorting`] rule to track
the closest
/// [`SortExec`] descendant(s) for every child of a plan. The data attribute
/// stores whether the plan is a `SortExec` or is connected to a `SortExec`
/// via its children.
pub type PlanWithCorrespondingSort = PlanContext<bool>;
+/// For a given node, update the [`PlanContext.data`] attribute.
+///
+/// If the node is a `SortExec`, or any of the node's children are a
`SortExec`,
+/// then set the attribute to true.
+///
+/// This requires that a bottom-up traversal was previously performed, so that the
children have already been updated.
fn update_sort_ctx_children(
- mut node: PlanWithCorrespondingSort,
+ mut node_and_ctx: PlanWithCorrespondingSort,
data: bool,
) -> Result<PlanWithCorrespondingSort> {
- for child_node in node.children.iter_mut() {
- let plan = &child_node.plan;
- child_node.data = if is_sort(plan) {
- // Initiate connection:
+ // Update `child.data` for all children.
+ for child_node in node_and_ctx.children.iter_mut() {
+ let child_plan = &child_node.plan;
+ child_node.data = if is_sort(child_plan) {
+ // child is sort
true
- } else if is_limit(plan) {
+ } else if is_limit(child_plan) {
// There is no sort linkage for this path, it starts at a limit.
false
} else {
- let is_spm = is_sort_preserving_merge(plan);
- let required_orderings = plan.required_input_ordering();
- let flags = plan.maintains_input_order();
+ // If a descendant is a sort, and the child maintains the sort.
+ let is_spm = is_sort_preserving_merge(child_plan);
+ let required_orderings = child_plan.required_input_ordering();
+ let flags = child_plan.maintains_input_order();
// Add parent node to the tree if there is at least one child with
// a sort connection:
izip!(flags, required_orderings).any(|(maintains,
required_ordering)| {
let propagates_ordering =
(maintains && required_ordering.is_none()) || is_spm;
+ // `connected_to_sort` only returns the correct answer with
bottom-up traversal
let connected_to_sort =
child_node.children.iter().any(|child| child.data);
propagates_ordering && connected_to_sort
})
}
}
- node.data = data;
- node.update_plan_from_children()
+ // set data attribute on current node
+ node_and_ctx.data = data;
+
+ // TODO(xudong963): the plans are not mutated, only the `data` attribute
is set.
+ // Therefore this should be called before this function.
+ node_and_ctx.update_plan_from_children()
Review Comment:
FYI maybe we could do something like I suggest here
- https://github.com/apache/datafusion/pull/14650#discussion_r1956787730
To avoid having to remember to call `update_plan_from_children` as much
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]