avantgardnerio opened a new pull request, #23167:
URL: https://github.com/apache/datafusion/pull/23167

   **Status: PoC/proposal, not for merge.** Drafted to frame the design and get 
feedback from the community before any larger investment.
   
   ## Motivation
   
   DataFusion lacks a story for adaptive query execution — runtime-informed 
re-optimization of a plan based on stats only knowable mid-execution (true 
post-aggregate cardinality, sort extrema, actual join input sizes, partition 
skew, etc.). Spark added AQE in 3.0 by materializing shuffle outputs to disk 
between stages and re-entering the optimizer; that model is a poor fit for 
DataFusion because DataFusion is in-process and streaming. Materializing 
intermediate results between stages would impose Spark-style overhead on a 
system that doesn't have Spark-style network shuffle costs to amortize.
   
   This PR proposes a streaming-native AQE model: insert lightweight 
synchronization wrappers above pipeline-breaking operators (sort, aggregate); 
use a shared coordinator wrapper at the plan root that runs rules at the 
natural barrier points; rules observe runtime stats from already-materialized 
in-memory state and mutate adaptive operators in place.
   
   No materialization to disk. No re-planning pass. No extra buffering beyond 
what pipeline breakers already do internally.
   
   ## Components
   
   ### `PipelineBreakerBuffer`
   
   Sync wrapper inserted above each pipeline-breaking operator (currently 
`AggregateExec`, `SortExec`). Holds one batch per input partition, signals 
`is_ready` when all partitions have produced their first poll result (or 
terminated empty). State machine:
   
   ```
   NeedFirstBatch  →  WaitForStreaming  →  EmitHeld  →  Streaming
                          (held)            (release)    (forward)
   ```
   
   Three flags:
   - `is_ready` (mechanical): every input partition has produced its first poll 
result.
   - `streaming_enabled` (rule-controllable proposal): reset on each poll cycle 
by `RuntimeOptimizerExec` (RTO below) to `is_ready`, the permissive default. 
Rules can flip it to false to veto.
   - `streaming_started` (actual emission control): only RTO flips this via 
`start_streaming()`, which also wakes per-partition wakers.
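
   The state machine and flags above can be sketched roughly as follows. This 
is an illustrative model only: `BufferState`, `Flags`, and `step` are 
hypothetical names, not the types in this PR, and the real buffer also holds 
the batch and the per-partition wakers.

```rust
/// Hypothetical model of one partition's state in `PipelineBreakerBuffer`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BufferState {
    NeedFirstBatch,   // nothing polled from the input yet
    WaitForStreaming, // first batch is held until streaming starts
    EmitHeld,         // release the held batch
    Streaming,        // forward input batches unchanged
}

/// The three flags, modelled as plain booleans for illustration.
#[derive(Debug, Default, Clone, Copy)]
pub struct Flags {
    pub is_ready: bool,
    pub streaming_enabled: bool,
    pub streaming_started: bool,
}

impl BufferState {
    /// Advance one step. `got_first_poll` is true once the input partition
    /// has produced its first poll result (or terminated empty).
    pub fn step(self, got_first_poll: bool, flags: Flags) -> BufferState {
        match self {
            BufferState::NeedFirstBatch if got_first_poll => BufferState::WaitForStreaming,
            // Only `streaming_started` releases the held batch;
            // `streaming_enabled` is merely a proposal.
            BufferState::WaitForStreaming if flags.streaming_started => BufferState::EmitHeld,
            BufferState::EmitHeld => BufferState::Streaming,
            other => other,
        }
    }
}
```

Note that in this sketch `streaming_enabled` never drives a transition on its 
own, which mirrors the split between the rule-controllable proposal and the 
actual emission control.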
   
   ### `RuntimeOptimizerExec`
   
   Inserted at the plan root by a new `InsertRuntimeOptimizer` physical 
optimizer rule. On every `poll_next`, runs a three-phase coordinator cycle:
   
   1. **Propose**: walk the subtree; set `streaming_enabled = is_ready` on each 
buffer (permissive default).
   2. **Evaluate**: run each registered `RuntimeRule`. Rules may flip 
`streaming_enabled = false` (veto), mutate adaptive operators in place, or both.
   3. **Commit**: walk the subtree; for each buffer still enabled, call 
`start_streaming()`.
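
   A minimal sketch of one coordinator cycle, assuming the buffers have 
already been collected into a slice rather than found by walking the plan. 
`BufferFlags`, `Rule`, and `run_cycle` are hypothetical stand-ins; in the PR 
the rules are `RuntimeRule` objects and receive the plan itself.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical stand-in for the flags on one `PipelineBreakerBuffer`.
#[derive(Debug, Default)]
pub struct BufferFlags {
    pub is_ready: AtomicBool,
    pub streaming_enabled: AtomicBool,
    pub streaming_started: AtomicBool,
}

impl BufferFlags {
    /// The real method would also wake the per-partition wakers.
    pub fn start_streaming(&self) {
        self.streaming_started.store(true, Ordering::Release);
    }
}

/// For illustration a rule is a closure over the buffers.
pub type Rule = Box<dyn Fn(&[BufferFlags]) + Send + Sync>;

pub fn run_cycle(buffers: &[BufferFlags], rules: &[Rule]) {
    // 1. Propose: permissive default, enabled exactly when ready.
    for b in buffers {
        let ready = b.is_ready.load(Ordering::Acquire);
        b.streaming_enabled.store(ready, Ordering::Release);
    }
    // 2. Evaluate: rules may veto by clearing `streaming_enabled`.
    for rule in rules {
        rule(buffers);
    }
    // 3. Commit: start streaming on every buffer that is still enabled.
    for b in buffers {
        if b.streaming_enabled.load(Ordering::Acquire) {
            b.start_streaming();
        }
    }
}
```

Because the propose phase overwrites `streaming_enabled` on every cycle, a 
veto in this sketch lasts only for the current cycle, and a rule that wants to 
keep holding a buffer has to veto again on the next poll.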
   
   A shared `Arc<AtomicWaker>` is threaded into both RTO and every buffer. 
Buffers wake it when their `is_ready` flips; RTO registers `cx.waker()` on it 
before each walk. The `AtomicWaker` is essential because `cx.waker()` is 
task-local — a buffer's poll inside a spawned subtask (e.g. one of 
`RepartitionExec`'s internals) cannot wake RTO via `cx.waker()`, but can via 
the shared `AtomicWaker`.
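
   The wake path can be illustrated with a std-only stand-in. `SharedWaker` 
below is a hypothetical, mutex-based substitute for `AtomicWaker` (it is not 
the type used in the PR), and `CountingWaker` plays the role of the 
coordinator's task so that wake-ups can be observed.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Wake, Waker};

/// Std-only stand-in for an `AtomicWaker`: remembers the most recently
/// registered waker so that any thread or task can wake it later.
#[derive(Default)]
pub struct SharedWaker {
    slot: Mutex<Option<Waker>>,
}

impl SharedWaker {
    /// Called by the coordinator with `cx.waker()` before each walk.
    pub fn register(&self, waker: &Waker) {
        *self.slot.lock().unwrap() = Some(waker.clone());
    }

    /// Called by a buffer when its `is_ready` flips, from whatever task
    /// happens to be polling it.
    pub fn wake(&self) {
        // Take the waker out first so the lock is released before waking.
        let waker = self.slot.lock().unwrap().take();
        if let Some(w) = waker {
            w.wake();
        }
    }
}

/// Counts wake-ups; stands in for the coordinator's task.
#[derive(Default)]
pub struct CountingWaker {
    pub wakes: AtomicUsize,
}

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.wakes.fetch_add(1, Ordering::SeqCst);
    }
}
```

In this sketch `wake` consumes the stored waker, so the coordinator has to 
register again before each walk, which matches the registration order 
described above.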
   
   ### `RuntimeRule` trait
   
   ```rust
   pub trait RuntimeRule: Send + Sync + std::fmt::Debug {
       fn name(&self) -> &str;
       fn evaluate(&self, plan: &Arc<dyn ExecutionPlan>);
   }
   ```
   
   Cheap, idempotent visitors. Rules track their own "already fired" state. 
They observe runtime stats via new trait methods like `runtime_row_count` (this 
PR) and (future) `runtime_partition_extrema` (analogous to 
apache/datafusion#23090), and mutate adaptive operators via typed methods 
(`HashJoinExec::flip_sides`, `RepartitionExec::set_split_points`, etc. — all 
future work).
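
   One way a rule might track its own "already fired" state is an atomic 
flag, since `evaluate` takes `&self`. The sketch below is an assumption-laden 
illustration: the `ExecutionPlan` trait here is a drastically reduced 
stand-in for DataFusion's, and `FireOnce` and `Leaf` are hypothetical types.

```rust
use std::fmt::Debug;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical, heavily reduced stand-in for DataFusion's `ExecutionPlan`.
pub trait ExecutionPlan: Debug + Send + Sync {
    fn name(&self) -> &str;
}

/// The trait as given in the PR description.
pub trait RuntimeRule: Send + Sync + Debug {
    fn name(&self) -> &str;
    fn evaluate(&self, plan: &Arc<dyn ExecutionPlan>);
}

/// Hypothetical rule that acts at most once, however often it is evaluated.
#[derive(Debug, Default)]
pub struct FireOnce {
    fired: AtomicBool,
    /// Counts how many times the rule actually acted (for demonstration).
    pub actions: AtomicUsize,
}

impl RuntimeRule for FireOnce {
    fn name(&self) -> &str {
        "fire_once"
    }

    fn evaluate(&self, _plan: &Arc<dyn ExecutionPlan>) {
        // `swap` returns the previous value, so only the first caller
        // observes `false` and proceeds.
        if self.fired.swap(true, Ordering::AcqRel) {
            return;
        }
        self.actions.fetch_add(1, Ordering::Relaxed);
    }
}

/// Trivial plan node so the rule has something to visit.
#[derive(Debug)]
pub struct Leaf;

impl ExecutionPlan for Leaf {
    fn name(&self) -> &str {
        "Leaf"
    }
}
```

A real rule would set the flag only once it has actually acted, so that 
evaluations before the runtime stats are available remain cheap no-ops.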
   
   ### `ExecutionPlan::runtime_row_count(partition)`
   
   New trait method (default `None`) returning the number of rows an operator 
*will emit* on a given partition. For pipeline-breaking operators, the value is 
knowable the moment their build phase completes — *before* any output batch is 
pulled. Implemented on `AggregateExec` here, reading from the existing 
`OutputRows` metric.
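
   The shape of the method can be sketched with a reduced stand-in trait. The 
`ExecutionPlan` trait, `partition_count`, `ToyAggregate`, `Passthrough`, and 
`total_runtime_rows` below are hypothetical; only the `runtime_row_count` 
name, its `partition` argument, and the default of `None` come from this PR. 
The `Option<usize>` return type is an assumption.

```rust
/// Hypothetical reduced stand-in for DataFusion's `ExecutionPlan`.
pub trait ExecutionPlan {
    fn partition_count(&self) -> usize;

    /// Rows the operator will emit on `partition`, if already known.
    /// Defaults to `None`, so streaming operators need no changes.
    fn runtime_row_count(&self, _partition: usize) -> Option<usize> {
        None
    }
}

/// A streaming operator that keeps the default.
pub struct Passthrough {
    pub partitions: usize,
}

impl ExecutionPlan for Passthrough {
    fn partition_count(&self) -> usize {
        self.partitions
    }
}

/// Toy pipeline breaker: a partition's group count becomes `Some` once
/// its build phase has completed.
pub struct ToyAggregate {
    pub groups_per_partition: Vec<Option<usize>>,
}

impl ExecutionPlan for ToyAggregate {
    fn partition_count(&self) -> usize {
        self.groups_per_partition.len()
    }

    fn runtime_row_count(&self, partition: usize) -> Option<usize> {
        self.groups_per_partition.get(partition).copied().flatten()
    }
}

/// Cross-partition total; `None` while any partition is still unknown.
pub fn total_runtime_rows(plan: &dyn ExecutionPlan) -> Option<usize> {
    (0..plan.partition_count())
        .map(|p| plan.runtime_row_count(p))
        .sum()
}
```

Summing `Option` values this way yields `None` as soon as one partition has 
not finished its build phase, which is one possible answer to how a 
cross-partition total could be derived from the per-partition primitive.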
   
   ### Example rule: `SwapBuildSideIfInverted`
   
   First concrete `RuntimeRule`. Walks the plan looking for `HashJoinExec`. For 
each, compares its left (build) and right (probe) sides' row counts: static 
`statistics()` when Exact (e.g. an in-memory dimension table), runtime via 
`runtime_row_count` summed across partitions otherwise (e.g. the output of a 
`GROUP BY` whose distinct cardinality the planner couldn't predict).
   
   If the build side is larger than the probe side, the rule would call 
`HashJoinExec::flip_sides()`. **But that method doesn't exist yet**, so this PR 
only logs the intent (visible with `RUST_LOG=info`). The detection mechanism is 
fully wired end-to-end; actually saving work requires `HashJoinExec` to defer 
its build phase until both sides' pipeline breakers report `is_ready`, which is 
a follow-up operator change.
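
   The detection logic described above might look roughly like this. It is a 
sketch under stated assumptions: `Precision` is a local enum that only mirrors 
the Exact/Inexact distinction, and `SideInfo`, `observed_rows`, and 
`should_flip` are hypothetical names rather than the code in this PR.

```rust
/// Local stand-in for the precision of a static row-count statistic.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Precision {
    Exact(usize),
    Inexact(usize),
    Absent,
}

/// What the rule can observe about one join input.
pub struct SideInfo {
    /// Row count from static `statistics()`.
    pub static_rows: Precision,
    /// `runtime_row_count` result for each partition of the input.
    pub runtime_rows: Vec<Option<usize>>,
}

/// Prefer an exact static count; otherwise sum the runtime counts,
/// yielding `None` until every partition has reported.
pub fn observed_rows(side: &SideInfo) -> Option<usize> {
    match side.static_rows {
        Precision::Exact(n) => Some(n),
        _ if side.runtime_rows.is_empty() => None,
        _ => side.runtime_rows.iter().copied().sum::<Option<usize>>(),
    }
}

/// True when both sizes are known and the build side is the larger one.
pub fn should_flip(build: &SideInfo, probe: &SideInfo) -> bool {
    match (observed_rows(build), observed_rows(probe)) {
        (Some(b), Some(p)) => b > p,
        _ => false,
    }
}
```

An inexact static estimate is deliberately ignored in this sketch, so the 
function reports no inversion until the runtime counts are complete.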
   
   ## SLT walkthrough
   
   `runtime_optimizer.slt` sets up the canonical misestimation case: `big` 
(100K rows) joined to `small` (100 rows) through a GROUP BY on a 
low-cardinality column the planner can't propagate distinct stats through. 
`JoinSelection` trusts the Inexact ~100K estimate for `aggregated_big` and 
picks `small` as the build side. At runtime `aggregated_big` has only 5 
distinct groups; the rule detects the inversion and would flip.
   
   The EXPLAIN block in the SLT shows the static plan with the AQE primitives 
wrapped — `RuntimeOptimizerExec` at root, `PipelineBreakerBuffer` above each 
AggregateExec. The build-side swap is *not* reflected in EXPLAIN because the 
proposed runtime mutation cannot retroactively appear in static plan output.
   
   ## What lands here vs. what's deferred
   
   Lands:
   - `PipelineBreakerBuffer` operator + `is_pipeline_breaker` heuristic + 
insertion rule.
   - `RuntimeOptimizerExec` operator + insertion rule + shared AtomicWaker wake 
propagation.
   - `RuntimeRule` trait + three-phase coordinator (propose / evaluate / 
commit).
   - `ExecutionPlan::runtime_row_count` trait method.
   - `AggregateExec` impl of `runtime_row_count`.
   - `SwapBuildSideIfInverted` rule (log-only).
   - SLT demonstrating the detection end-to-end.
   
   Deferred to follow-ups (each its own PR-sized piece of work):
   
   1. **`HashJoinExec::flip_sides()`** with interior mutability + a build-phase 
deferral mechanism that waits for descendant `PipelineBreakerBuffer`s to be 
ready before starting build. This is the change that turns 
`SwapBuildSideIfInverted` from a logger into a real optimization.
   2. **`RepartitionExec::set_split_points()`** — mutable 
`Partitioning::Range(Option<Vec<SplitPoint>>)`. Pairs with 
`runtime_partition_extrema` on SortExec to enable parallel cumulative-window 
functions and parallel range repartitioning in general. Mutating 
RepartitionExec is *timing-safe* (it has no synchronous build phase, just 
routing), so this follow-up doesn't need operator deferral.
   3. Additional rules: partition coalescing, skew splitting, adaptive 
aggregation strategy, broadcast-join switching.
   4. `runtime_partition_extrema` trait method on SortExec, parallel-window 
operators (`CarryExec`, `HaloDropExec`) — there's already a parallel PoC at 
apache/datafusion#23026 / #23090 / #23094 that overlaps significantly; this 
work would either build on or replace those primitives.
   
   ## Discussion points I'd appreciate feedback on
   
   - **Architectural model.** Is the propose/evaluate/commit cycle the right 
shape, or should rules return decisions to RTO instead of mutating buffer state 
directly?
   - **Pipeline-breaker detection.** Currently a hardcoded match against 
`AggregateExec` and `SortExec`. Should there be a trait method like 
`is_pipeline_breaker()`?
   - **Trait surface.** `runtime_row_count(partition: usize)` returns the total 
for a single partition. Is a per-partition count the right primitive, or should 
the method return a cross-partition total?
   - **Wake propagation via AtomicWaker.** Is the shared waker pattern 
acceptable, or is there a more idiomatic approach DataFusion already uses for 
cross-task notification I'm missing?
   - **Naming.** `RuntimeOptimizerExec` vs `AdaptiveExec` vs something else? 
Open to bikeshedding.
   
   ## What this PoC is not
   
   - Not a complete AQE implementation. The log-only rule means no actual work 
is saved yet.
   - Not a replacement for the existing parallel-window-poc work — that work 
targets the same general space (runtime-informed plan adaptation) but via 
different primitives (the `runtime_partition_extrema` API + new operators). The 
two designs are compatible and could converge.
   - Not optimized. The on-every-poll subtree walks are O(number of plan 
nodes); fine for the prototype, would want amortization in production.
   
   Looking for: "is this the right architectural direction for AQE in 
DataFusion?" before investing in any of the follow-up work.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

