alamb commented on code in PR #14907:
URL: https://github.com/apache/datafusion/pull/14907#discussion_r1973415233


##########
datafusion/physical-optimizer/src/enforce_sorting/mod.rs:
##########
@@ -138,29 +138,82 @@ fn update_sort_ctx_children_data(
 /// [`CoalescePartitionsExec`] descendant(s) for every child of a plan. The data
 /// attribute stores whether the plan is a `CoalescePartitionsExec` or is
 /// connected to a `CoalescePartitionsExec` via its children.
+///
+/// The tracker halts at each [`SortExec`] (where the SPM will act to replace the coalesce).
+///
+/// This requires that a bottom-up traversal has already been performed, so that
+/// each child's data is up to date.
 pub type PlanWithCorrespondingCoalescePartitions = PlanContext<bool>;
 
+/// Determines if the coalesce may be safely removed.
+fn is_coalesce_to_remove(
+    node: &Arc<dyn ExecutionPlan>,
+    parent: &Arc<dyn ExecutionPlan>,
+) -> bool {
+    let parent_req_single_partition = matches!(
+        parent.required_input_distribution()[0],
+        Distribution::SinglePartition
+    );
+
+    is_coalesce_partitions(node)
+        && (
+            // node above does not require single distribution
+            !parent_req_single_partition
+        // it doesn't immediately repartition
+        || is_repartition(parent)
+        // any adjacent Coalesce->Sort can be replaced
+        || is_sort(parent)
+        )
+}
+
+/// Discovers the linked Coalesce->Sort cascades.
+///
+/// This linkage is used in [`remove_bottleneck_in_subplan`] to selectively
+/// remove the linked coalesces in the subplan. Afterwards, an SPM is added
+/// at the root of the subplan (just after the sort) in order to parallelize sorts.
+/// Refer to [`parallelize_sorts`] for more details on sort parallelization.
+///
+/// Example of linked Coalesce->Sort:
+/// ```text
+/// SortExec  ctx.data=false (to halt remove_bottleneck_in_subplan)
+///   ...nodes...   ctx.data=true (i.e. linked in the cascade)
+///     Coalesce  ctx.data=true (i.e. is a coalesce)
+/// ```
+///
+/// The link should not be continued (and the coalesce not removed) if the distribution
+/// is changed between the Coalesce and the Sort of the cascade. Example:
+/// ```text
+/// SortExec  ctx.data=false (to halt remove_bottleneck_in_subplan)
+///   AggregateExec  ctx.data=false (to stop the link)
+///     ...nodes...   ctx.data=true (i.e. linked in the cascade)
+///       Coalesce  ctx.data=true (i.e. is a coalesce)
+/// ```
 fn update_coalesce_ctx_children(
     coalesce_context: &mut PlanWithCorrespondingCoalescePartitions,
 ) {
-    let children = &coalesce_context.children;
-    coalesce_context.data = if children.is_empty() {
-        // Plan has no children, it cannot be a `CoalescePartitionsExec`.
-        false
-    } else if is_coalesce_partitions(&coalesce_context.plan) {
-        // Initiate a connection:
-        true
-    } else {
-        children.iter().enumerate().any(|(idx, node)| {
-            // Only consider operators that don't require a single partition,
-            // and connected to some `CoalescePartitionsExec`:
-            node.data
-                && !matches!(
-                    coalesce_context.plan.required_input_distribution()[idx],
-                    Distribution::SinglePartition
-                )
-        })
-    };
+    // perform lookahead(1) during the bottom-up traversal,
+    // since we are checking distribution requirements after the coalesce occurs
+    let parent = &coalesce_context.plan;
+
+    for child_context in coalesce_context.children.iter_mut() {

Review Comment:
   this method used to check `is_coalesce_partitions` once before iterating 
through the children -- this PR now checks for each child.
   
   As written, this loop seems to update `child_context.data` multiple times
   (once per loop iteration).
   
   I think the result is likely the same, given that `CoalescePartitionsExec` has a
   single child, but I found the previous implementation easier to understand.



##########
datafusion/physical-optimizer/src/enforce_sorting/mod.rs:
##########
@@ -316,25 +369,43 @@ fn replace_with_partial_sort(
 /// are transformed into
 /// ```text
 ///      "SortPreservingMergeExec: \[a@0 ASC\]",
-///      "  ...nodes..."
-///      "    SortExec: expr=\[a@0 ASC\]",
+///      "  SortExec: expr=\[a@0 ASC\]",
+///      "    ...nodes..."
+///      "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
+/// ```
+/// by following connections from [`CoalescePartitionsExec`]s to [`SortExec`]s.
+/// By performing sorting in parallel, we can increase performance in some scenarios.
+///
+/// This requires that there are no nodes between the [`SortExec`] and [`CoalescePartitionsExec`]
+/// which require a single partition. Do not parallelize when the following scenario occurs:
+/// ```text
+///      "SortExec: expr=\[a@0 ASC\]",
+///      "  ...nodes requiring single partitioning..."
+///      "    CoalescePartitionsExec",
 ///      "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
 /// ```
 pub fn parallelize_sorts(
     mut requirements: PlanWithCorrespondingCoalescePartitions,
 ) -> Result<Transformed<PlanWithCorrespondingCoalescePartitions>> {
+    requirements = requirements.update_plan_from_children()?;
     update_coalesce_ctx_children(&mut requirements);
+    let coalesce_can_be_removed = requirements.children.iter().any(|child| 
child.data);

Review Comment:
   this variable is called `coalesce_can_be_removed` but the comments above say 
that `.data` means:
   
   > The data
   > /// attribute stores whether the plan is a `CoalescePartitionsExec` or is
   > /// connected to a `CoalescePartitionsExec` via its children.
   
   
   This seems inconsistent to me



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to