wiedld commented on code in PR #14907:
URL: https://github.com/apache/datafusion/pull/14907#discussion_r1974080657


##########
datafusion/physical-optimizer/src/enforce_sorting/mod.rs:
##########
@@ -316,25 +369,43 @@ fn replace_with_partial_sort(
 /// are transformed into
 /// ```text
 ///      "SortPreservingMergeExec: \[a@0 ASC\]",
-///      "  ...nodes..."
-///      "    SortExec: expr=\[a@0 ASC\]",
+///      "  SortExec: expr=\[a@0 ASC\]",
+///      "    ...nodes..."
+///      "      RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
+/// ```
+/// by following connections from [`CoalescePartitionsExec`]s to [`SortExec`]s.
+/// By performing sorting in parallel, we can increase performance in some 
scenarios.
+///
+/// This requires that there are no nodes between the [`SortExec`] and 
[`CoalescePartitionsExec`]
+/// which require single partitioning. Do not parallelize when the following 
scenario occurs:
+/// ```text
+///      "SortExec: expr=\[a@0 ASC\]",
+///      "  ...nodes requiring single partitioning..."
+///      "    CoalescePartitionsExec",
 ///      "      RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
 /// ```
 pub fn parallelize_sorts(
     mut requirements: PlanWithCorrespondingCoalescePartitions,
 ) -> Result<Transformed<PlanWithCorrespondingCoalescePartitions>> {
+    requirements = requirements.update_plan_from_children()?;
     update_coalesce_ctx_children(&mut requirements);
+    let coalesce_can_be_removed = requirements.children.iter().any(|child| 
child.data);

Review Comment:
   I switched this PR to be docs-only: 
https://github.com/apache/datafusion/pull/14907#issuecomment-2688656333
   If I move forward with another refactoring attempt, I'll first open an 
issue documenting exactly how the current (on main) 
`update_coalesce_ctx_children` does not follow the docs.



##########
datafusion/physical-optimizer/src/enforce_sorting/mod.rs:
##########
@@ -138,29 +138,82 @@ fn update_sort_ctx_children_data(
 /// [`CoalescePartitionsExec`] descendant(s) for every child of a plan. The 
data
 /// attribute stores whether the plan is a `CoalescePartitionsExec` or is
 /// connected to a `CoalescePartitionsExec` via its children.
+///
+/// The tracker halts at each [`SortExec`] (where the SPM will act to replace 
the coalesce).
+///
+/// This requires a bottom-up traversal was previously performed, updating the
+/// children previously.
 pub type PlanWithCorrespondingCoalescePartitions = PlanContext<bool>;
 
+/// Determines if the coalesce may be safely removed.
+fn is_coalesce_to_remove(
+    node: &Arc<dyn ExecutionPlan>,
+    parent: &Arc<dyn ExecutionPlan>,
+) -> bool {
+    let parent_req_single_partition = matches!(
+        parent.required_input_distribution()[0],
+        Distribution::SinglePartition
+    );
+
+    is_coalesce_partitions(node)
+        && (
+            // node above does not require single distribution
+            !parent_req_single_partition
+        // it doesn't immediately repartition
+        || is_repartition(parent)
+        // any adjacent Coalesce->Sort can be replaced
+        || is_sort(parent)
+        )
+}
+
+/// Discovers the linked Coalesce->Sort cascades.
+///
+/// This linkage is used in [`remove_bottleneck_in_subplan`] to selectively
+/// remove the linked coalesces in the subplan. Then afterwards, an SPM is 
added
+/// at the root of the subplan (just after the sort) in order to parallelize 
sorts.
+/// Refer to the [`parallelize_sorts`] for more details on sort 
parallelization.
+///
+/// Example of linked Coalesce->Sort:
+/// ```text
+/// SortExec ctx.data=false, to halt remove_bottleneck_in_subplan)
+///   ...nodes...   ctx.data=true (e.g. are linked in cascade)
+///     Coalesce  ctx.data=true (e.g. is a coalesce)
+/// ```
+///
+/// The link should not be continued (and the coalesce not removed) if the 
distribution
+/// is changed between the Coalesce->Sort cascade. Example:
+/// ```text
+/// SortExec ctx.data=false, to halt remove_bottleneck_in_subplan)
+///   AggregateExec  ctx.data=false, to stop the link
+///     ...nodes...   ctx.data=true (e.g. are linked in cascade)
+///       Coalesce  ctx.data=true (e.g. is a coalesce)
+/// ```
 fn update_coalesce_ctx_children(
     coalesce_context: &mut PlanWithCorrespondingCoalescePartitions,
 ) {
-    let children = &coalesce_context.children;
-    coalesce_context.data = if children.is_empty() {
-        // Plan has no children, it cannot be a `CoalescePartitionsExec`.
-        false
-    } else if is_coalesce_partitions(&coalesce_context.plan) {
-        // Initiate a connection:
-        true
-    } else {
-        children.iter().enumerate().any(|(idx, node)| {
-            // Only consider operators that don't require a single partition,
-            // and connected to some `CoalescePartitionsExec`:
-            node.data
-                && !matches!(
-                    coalesce_context.plan.required_input_distribution()[idx],
-                    Distribution::SinglePartition
-                )
-        })
-    };
+    // perform lookahead(1) during bottom up traversal
+    // since we are checking distribution requirements after the coalesce 
occurs
+    let parent = &coalesce_context.plan;
+
+    for child_context in coalesce_context.children.iter_mut() {

Review Comment:
   See my comment above: https://github.com/apache/datafusion/pull/14907#discussion_r1974080657



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to