This is an automated email from the ASF dual-hosted git repository.

github-merge-queue[bot] pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new e292f33f2d perf(physical-optimizer): skip ensure_distribution rebuild when children are unchanged (#22521)
e292f33f2d is described below

commit e292f33f2db7b1fdf64294932d6f29a879c5d8ff
Author: Qi Zhu <[email protected]>
AuthorDate: Thu May 28 10:57:00 2026 +0800

    perf(physical-optimizer): skip ensure_distribution rebuild when children are unchanged (#22521)
    
    ## Which issue does this PR close?
    
    - Closes #22520.
    
    ## Rationale for this change
    
    `ensure_distribution` in
    `datafusion/physical-optimizer/src/ensure_requirements/enforce_distribution.rs`
    unconditionally calls `plan.with_new_children(children_plans)` after
    collecting the (possibly redistributed) children, even when none of
    those children were actually replaced. For nodes like `ProjectionExec`,
    that path runs through `try_new` and recomputes the schema, equivalence
    properties, output ordering, and output partitioning, then allocates a
    new `Arc<dyn ExecutionPlan>`. When every child Arc is pointer-identical
    to the input, that work produces a logically identical node — pure
    overhead.
    
    The cost is amplified by two factors:
    
    1. **Plan depth.** Workloads dominated by point queries (no join /
    aggregate / unmet ordering — i.e. nothing for `ensure_distribution` to
    inject a `RepartitionExec` or `SortExec` for) hit this wasted rebuild at
    every node in the plan. A 5–30 deep `ProjectionExec` stack pays the cost
    N times.
    2. **Schema width.** Most steps inside `ProjectionExec::try_new` are
    `O(num_columns)`: per-column `data_type` / `nullable` lookup to build
    the new schema, per-column remapping of equivalence classes through the
    projection mapping, and per-column lookup when rewriting
    `PhysicalSortExpr`s into the output ordering. Wide schemas (tens of
    columns) make every wasted rebuild proportionally heavier.
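    
    The two factors multiply, which a rough cost model makes concrete. The
    sketch below is illustrative only: `PASSES_PER_REBUILD` and
    `wasted_column_steps` are hypothetical names, and the count of three
    per-column passes is an assumption taken from the three steps listed
    above, not a measurement of `ProjectionExec::try_new`.
    
    ```rust
    /// Assumed number of per-column passes in one projection rebuild
    /// (schema construction, equivalence remapping, ordering rewrite).
    /// Illustrative constant only, not taken from DataFusion.
    const PASSES_PER_REBUILD: usize = 3;
    
    /// Per-column steps spent rebuilding every node of a projection stack
    /// when no child was actually replaced. `widths[i]` is the number of
    /// columns produced by the i-th node in the stack.
    fn wasted_column_steps(widths: &[usize]) -> usize {
        widths.iter().map(|w| w * PASSES_PER_REBUILD).sum()
    }
    ```
    
    Under these assumptions, a 30-deep stack of 40-column projections
    (`wasted_column_steps(&[40; 30])`) spends 30 x 40 x 3 = 3600 per-column
    steps per optimizer pass on rebuilds that change nothing.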
    
    Profiling a production point-query workload (wide schemas, deep
    `ProjectionExec` stacks) showed `ProjectionExec::with_new_children` as
    the single largest cost inside `ensure_distribution`:
    
    - `ensure_distribution` total: 2.87s of a 60s CPU sample
    - `ProjectionExec::with_new_children`: 1.94s (about 68% of the rule)
    - `SortExec::with_new_children`: 0.11s
    - Other ExecutionPlan nodes: 0.82s
    
    ## What changes are included in this PR?
    
    After collecting `children_plans`, compare each new child Arc with the
    original via `Arc::ptr_eq`. When every child is unchanged, reuse the
    existing `plan` Arc and skip `with_new_children`. The `UnionExec` to
    `InterleaveExec` special case still runs first because it intentionally
    produces a new node even when child Arcs are unchanged.
    
    This relies on the fact that `ensure_distribution` already produces
    pointer-identical Arcs for children that need no redistribution (it
    threads the original Arc through unchanged), so `Arc::ptr_eq` precisely
    distinguishes "rewritten" from "untouched" children at O(1) per child.
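    
    The pointer check described above can be sketched with a toy node type.
    This is a minimal illustration under stated assumptions: `ToyNode` and
    `rebuild_if_changed` are hypothetical names, not DataFusion APIs, and the
    sketch is not the implementation of the `with_new_children_if_necessary`
    helper that the diff calls.
    
    ```rust
    use std::sync::Arc;
    
    /// Toy stand-in for a plan node (hypothetical; not `ExecutionPlan`).
    #[derive(Debug)]
    struct ToyNode {
        name: String,
        children: Vec<Arc<ToyNode>>,
    }
    
    impl ToyNode {
        /// Stand-in for the expensive rebuild: always allocates a new node.
        fn with_new_children(&self, children: Vec<Arc<ToyNode>>) -> Arc<ToyNode> {
            Arc::new(ToyNode {
                name: self.name.clone(),
                children,
            })
        }
    }
    
    /// Returns `plan` itself when every new child is pointer-identical to
    /// the corresponding old child; otherwise rebuilds the node.
    fn rebuild_if_changed(
        plan: Arc<ToyNode>,
        new_children: Vec<Arc<ToyNode>>,
    ) -> Arc<ToyNode> {
        let unchanged = plan.children.len() == new_children.len()
            && plan
                .children
                .iter()
                .zip(new_children.iter())
                .all(|(old, new)| Arc::ptr_eq(old, new)); // O(1) per child
        if unchanged {
            plan // fast path: no allocation, no property recomputation
        } else {
            plan.with_new_children(new_children)
        }
    }
    
    /// Convenience constructor for the toy tree.
    fn node(name: &str, children: Vec<Arc<ToyNode>>) -> Arc<ToyNode> {
        Arc::new(ToyNode {
            name: name.to_string(),
            children,
        })
    }
    ```
    
    In this sketch, passing a clone of the original child Arc returns the
    original parent Arc, while passing a structurally equal but separately
    allocated child triggers a rebuild. `Arc::ptr_eq` compares allocations,
    not contents, which is why the check stays cheap regardless of schema
    width.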
    
    ## Are these changes tested?
    
    Yes. The existing `enforce_distribution` suite passes unchanged (66/66):
    
    ```
    cargo test --release -p datafusion --test core_integration -- physical_optimizer::enforce_distribution
    ```
    
    The behavior is observable only as a CPU reduction; correctness is
    preserved because `ExecutionPlan` nodes are immutable, so reusing the
    original Arc produces the same plan tree as
    `with_new_children(unchanged_children)` would have, just without the
    schema / ordering / equivalence / partitioning recomputation.
    
    ## Are there any user-facing changes?
    
    No. Same plans, lower planning time.
    
    ## Micro-benchmark
    
    Plan shape: 30-deep `ProjectionExec` stack over a sorted parquet scan,
    5000 iterations.
    
    - Without fix: 852.74 ms total, 170.55 us/call
    - With fix:    296.81 ms total, 59.36 us/call
    - ~2.87x speedup, -65% CPU per call
    
    Wider schemas (more projection expressions per node) widen the gap
    further because each skipped `with_new_children` avoids more
    O(num_columns) work.
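    
    The derived figures above follow directly from the two totals and the
    iteration count. A small sketch of the arithmetic, with hypothetical
    helper names that are not part of the benchmark itself:
    
    ```rust
    /// Microseconds per call, given a total in milliseconds and the number
    /// of iterations.
    fn us_per_call(total_ms: f64, iterations: u32) -> f64 {
        total_ms * 1000.0 / f64::from(iterations)
    }
    
    /// Speedup factor of the `after` run relative to the `before` run.
    fn speedup(before_ms: f64, after_ms: f64) -> f64 {
        before_ms / after_ms
    }
    
    /// Fractional reduction in time, e.g. 0.65 for a 65% reduction.
    fn reduction(before_ms: f64, after_ms: f64) -> f64 {
        1.0 - after_ms / before_ms
    }
    ```
    
    With the totals reported above, `us_per_call(852.74, 5000)` is about
    170.55, `us_per_call(296.81, 5000)` is about 59.36,
    `speedup(852.74, 296.81)` is about 2.87, and
    `reduction(852.74, 296.81)` is about 0.65.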
---
 .../physical_optimizer/enforce_distribution.rs     | 35 ++++++++++++++++++++++
 .../ensure_requirements/enforce_distribution.rs    | 15 ++++++++--
 2 files changed, 48 insertions(+), 2 deletions(-)

diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
index fb11657107..426e1fa745 100644
--- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
@@ -3971,3 +3971,38 @@ fn adjust_input_keys_ordering_no_transform_for_filter_scan() -> Result<()> {
     );
     Ok(())
 }
+
+/// Verifies the `ensure_distribution` fast path: when no child of a node is
+/// replaced (no `RepartitionExec` or `SortExec` injection is required),
+/// the rule must reuse the input `Arc<dyn ExecutionPlan>` unchanged instead
+/// of calling `with_new_children`. For a deep `ProjectionExec` chain over a
+/// single-partition scan with `target_partitions = 1`, every node hits this
+/// fast path, so the root returned by `ensure_distribution` must be the
+/// same `Arc` as the input.
+///
+/// Regression test for the optimization that avoids
+/// `ProjectionExec::with_new_children` (which recomputes schema, equivalence
+/// properties, output ordering, and partitioning) on the common point-query
+/// plan shape.
+#[test]
+fn ensure_distribution_reuses_plan_arc_when_no_redistribution_needed() -> Result<()> {
+    let scan = parquet_exec();
+    let proj1 = projection_exec_with_alias(
+        scan,
+        vec![
+            ("a".to_string(), "a".to_string()),
+            ("b".to_string(), "b".to_string()),
+        ],
+    );
+    let proj2 =
+        projection_exec_with_alias(proj1, vec![("a".to_string(), "a".to_string())]);
+    let plan: Arc<dyn ExecutionPlan> = proj2;
+
+    let result = ensure_distribution_helper(Arc::clone(&plan), 1, false)?;
+
+    assert!(
+        Arc::ptr_eq(&result, &plan),
+        "ensure_distribution must reuse the input Arc when no children require redistribution"
+    );
+    Ok(())
+}
diff --git a/datafusion/physical-optimizer/src/ensure_requirements/enforce_distribution.rs b/datafusion/physical-optimizer/src/ensure_requirements/enforce_distribution.rs
index 093a1ec14b..ada7b6d741 100644
--- a/datafusion/physical-optimizer/src/ensure_requirements/enforce_distribution.rs
+++ b/datafusion/physical-optimizer/src/ensure_requirements/enforce_distribution.rs
@@ -65,7 +65,9 @@ use datafusion_physical_plan::tree_node::PlanContext;
 use datafusion_physical_plan::union::{InterleaveExec, UnionExec, can_interleave};
 use datafusion_physical_plan::windows::WindowAggExec;
 use datafusion_physical_plan::windows::{BoundedWindowAggExec, get_best_fitting_window};
-use datafusion_physical_plan::{Distribution, ExecutionPlan, Partitioning};
+use datafusion_physical_plan::{
+    Distribution, ExecutionPlan, Partitioning, with_new_children_if_necessary,
+};
 
 use itertools::izip;
 
@@ -1362,7 +1364,16 @@ pub fn ensure_distribution(
         //           Data
         Arc::new(InterleaveExec::try_new(children_plans)?)
     } else {
-        plan.with_new_children(children_plans)?
+        // Route through `with_new_children_if_necessary` so the common
+        // case where no child was replaced above skips the expensive
+        // `with_new_children` rebuild. For nodes like `ProjectionExec`,
+        // `with_new_children` recomputes schema / equivalence properties /
+        // output ordering via `try_new` even when the input Arcs are
+        // identical, which dominates `ensure_distribution` time on deep
+        // projection stacks over plans where no distribution change
+        // applies (point queries with no join / aggregate / unmet
+        // ordering).
+        with_new_children_if_necessary(plan, children_plans)?
     };
 
     Ok(Transformed::yes(DistributionContext::new(

