wiedld commented on code in PR #14907: URL: https://github.com/apache/datafusion/pull/14907#discussion_r1974080657
########## datafusion/physical-optimizer/src/enforce_sorting/mod.rs: ########## @@ -316,25 +369,43 @@ fn replace_with_partial_sort( /// are transformed into /// ```text /// "SortPreservingMergeExec: \[a@0 ASC\]", -/// " ...nodes..." -/// " SortExec: expr=\[a@0 ASC\]", +/// " SortExec: expr=\[a@0 ASC\]", +/// " ...nodes..." +/// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", +/// ``` +/// by following connections from [`CoalescePartitionsExec`]s to [`SortExec`]s. +/// By performing sorting in parallel, we can increase performance in some scenarios. +/// +/// This requires that there are no nodes between the [`SortExec`] and [`CoalescePartitionsExec`] +/// which require single partitioning. Do not parallelize when the following scenario occurs: +/// ```text +/// "SortExec: expr=\[a@0 ASC\]", +/// " ...nodes requiring single partitioning..." +/// " CoalescePartitionsExec", /// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", /// ``` pub fn parallelize_sorts( mut requirements: PlanWithCorrespondingCoalescePartitions, ) -> Result<Transformed<PlanWithCorrespondingCoalescePartitions>> { + requirements = requirements.update_plan_from_children()?; update_coalesce_ctx_children(&mut requirements); + let coalesce_can_be_removed = requirements.children.iter().any(|child| child.data); Review Comment: I switched it to be docs only: https://github.com/apache/datafusion/pull/14907#issuecomment-2688656333 Then if I move forward with another refactoring attempt, I'll first make an issue to document how exactly the current (on main) `update_coalesce_ctx_children` does not follow the docs. ########## datafusion/physical-optimizer/src/enforce_sorting/mod.rs: ########## @@ -138,29 +138,82 @@ fn update_sort_ctx_children_data( /// [`CoalescePartitionsExec`] descendant(s) for every child of a plan. The data /// attribute stores whether the plan is a `CoalescePartitionsExec` or is /// connected to a `CoalescePartitionsExec` via its children. 
+/// +/// The tracker halts at each [`SortExec`] (where the SPM will act to replace the coalesce). +/// +/// This requires that a bottom-up traversal was previously performed, updating the +/// children. pub type PlanWithCorrespondingCoalescePartitions = PlanContext<bool>; +/// Determines if the coalesce may be safely removed. +fn is_coalesce_to_remove( + node: &Arc<dyn ExecutionPlan>, + parent: &Arc<dyn ExecutionPlan>, +) -> bool { + let parent_req_single_partition = matches!( + parent.required_input_distribution()[0], + Distribution::SinglePartition + ); + + is_coalesce_partitions(node) + && ( + // node above does not require single distribution + !parent_req_single_partition + // it doesn't immediately repartition + || is_repartition(parent) + // any adjacent Coalesce->Sort can be replaced + || is_sort(parent) + ) +} + +/// Discovers the linked Coalesce->Sort cascades. +/// +/// This linkage is used in [`remove_bottleneck_in_subplan`] to selectively +/// remove the linked coalesces in the subplan. Then afterwards, an SPM is added +/// at the root of the subplan (just after the sort) in order to parallelize sorts. +/// Refer to the [`parallelize_sorts`] for more details on sort parallelization. +/// +/// Example of linked Coalesce->Sort: +/// ```text +/// SortExec ctx.data=false (to halt remove_bottleneck_in_subplan) +/// ...nodes... ctx.data=true (i.e. are linked in cascade) +/// Coalesce ctx.data=true (i.e. is a coalesce) +/// ``` +/// +/// The link should not be continued (and the coalesce not removed) if the distribution +/// is changed between the Coalesce->Sort cascade. Example: +/// ```text +/// SortExec ctx.data=false (to halt remove_bottleneck_in_subplan) +/// AggregateExec ctx.data=false, to stop the link +/// ...nodes... ctx.data=true (i.e. are linked in cascade) +/// Coalesce ctx.data=true (i.e. 
is a coalesce) +/// ``` fn update_coalesce_ctx_children( coalesce_context: &mut PlanWithCorrespondingCoalescePartitions, ) { - let children = &coalesce_context.children; - coalesce_context.data = if children.is_empty() { - // Plan has no children, it cannot be a `CoalescePartitionsExec`. - false - } else if is_coalesce_partitions(&coalesce_context.plan) { - // Initiate a connection: - true - } else { - children.iter().enumerate().any(|(idx, node)| { - // Only consider operators that don't require a single partition, - // and connected to some `CoalescePartitionsExec`: - node.data - && !matches!( - coalesce_context.plan.required_input_distribution()[idx], - Distribution::SinglePartition - ) - }) - }; + // perform lookahead(1) during bottom up traversal + // since we are checking distribution requirements after the coalesce occurs + let parent = &coalesce_context.plan; + + for child_context in coalesce_context.children.iter_mut() { Review Comment: See https://github.com/apache/datafusion/pull/14907#discussion_r1974080657 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org