avantgardnerio opened a new pull request, #23167:
URL: https://github.com/apache/datafusion/pull/23167
**Status: PoC/proposal, not for merge.** Drafted to frame the design and get
feedback from the community before any larger investment.
## Motivation
DataFusion lacks a story for adaptive query execution — runtime-informed
re-optimization of a plan based on stats only knowable mid-execution (true
post-aggregate cardinality, sort extrema, actual join input sizes, partition
skew, etc.). Spark added AQE in 3.0 by materializing shuffle outputs to disk
between stages and re-entering the optimizer; that model is a poor fit for
DataFusion because DataFusion is in-process and streaming. Materializing
intermediate results between stages would impose Spark-style overhead on a
system that doesn't have Spark-style network shuffle costs to amortize.
This PR proposes a streaming-native AQE model: insert lightweight
synchronization wrappers above pipeline-breaking operators (sort, aggregate);
use a shared coordinator wrapper at the plan root that runs rules at the
natural barrier points; rules observe runtime stats from already-materialized
in-memory state and mutate adaptive operators in place.
No materialization to disk. No re-planning pass. No extra buffering beyond
what pipeline breakers already do internally.
## Components
### `PipelineBreakerBuffer`
Sync wrapper inserted above each pipeline-breaking operator (currently
`AggregateExec`, `SortExec`). Holds one batch per input partition, signals
`is_ready` when all partitions have produced their first poll result (or
terminated empty). State machine:
```
NeedFirstBatch → WaitForStreaming → EmitHeld  → Streaming
                 (held)             (release)   (forward)
```
Three flags:
- `is_ready` (mechanical): every input partition has produced its first poll
result.
- `streaming_enabled` (rule-controllable proposal): reset on each poll cycle by
RTO (`RuntimeOptimizerExec`, described below), which sets it to `is_ready` as
the permissive default. Rules can flip it to false to veto.
- `streaming_started` (actual emission control): only RTO flips this via
`start_streaming()`, which also wakes per-partition wakers.
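
The per-partition state machine and the three flags can be sketched roughly as follows. This is a std-only illustration, not the PR's code: `BufferFlags`, `PartitionState`, and `step` are hypothetical names, a batch is modeled as `Vec<i64>`, and waker handling is omitted.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Flags shared by all partitions of one buffer (hypothetical layout).
#[derive(Debug, Default)]
pub struct BufferFlags {
    pub is_ready: AtomicBool,
    pub streaming_enabled: AtomicBool,
    pub streaming_started: AtomicBool,
}

impl BufferFlags {
    /// Meant to be called by the coordinator only.
    pub fn start_streaming(&self) {
        self.streaming_started.store(true, Ordering::Release);
        // The real operator would also wake the per-partition wakers here.
    }
}

/// State of one partition; `None` models an input that ended empty.
#[derive(Debug, PartialEq)]
pub enum PartitionState {
    NeedFirstBatch,
    WaitForStreaming(Option<Vec<i64>>),
    EmitHeld(Option<Vec<i64>>),
    Streaming,
}

/// Advance one partition by one step. `first` is the first poll result of
/// the input and is only consulted in `NeedFirstBatch`.
pub fn step(
    state: PartitionState,
    flags: &BufferFlags,
    first: Option<Vec<i64>>,
) -> PartitionState {
    match state {
        // Pull once from the input and hold on to the result.
        PartitionState::NeedFirstBatch => PartitionState::WaitForStreaming(first),
        // Stay parked until the coordinator has committed.
        PartitionState::WaitForStreaming(held) => {
            if flags.streaming_started.load(Ordering::Acquire) {
                PartitionState::EmitHeld(held)
            } else {
                PartitionState::WaitForStreaming(held)
            }
        }
        // The held batch is released; afterwards batches are forwarded as-is.
        PartitionState::EmitHeld(_) => PartitionState::Streaming,
        PartitionState::Streaming => PartitionState::Streaming,
    }
}
```

In this sketch a partition cannot leave `WaitForStreaming` on its own: only `start_streaming()` unblocks it, which mirrors the split between the rule-controllable proposal flag and the actual emission flag.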
### `RuntimeOptimizerExec`
Inserted at the plan root by a new `InsertRuntimeOptimizer` physical
optimizer rule. On every `poll_next`, runs a three-phase coordinator cycle:
1. **Propose**: walk the subtree; set `streaming_enabled = is_ready` on each
buffer (permissive default).
2. **Evaluate**: run each registered `RuntimeRule`. Rules may flip
`streaming_enabled = false` (veto), mutate adaptive operators in place, or both.
3. **Commit**: walk the subtree; for each buffer still enabled, call
`start_streaming()`.
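
A minimal sketch of one propose/evaluate/commit cycle, under simplifying assumptions: the buffers are a flat slice rather than a plan subtree, rules are plain closures rather than `RuntimeRule` objects, and `Buffer`, `Rule`, and `run_cycle` are hypothetical names.

```rust
use std::sync::atomic::{AtomicBool, Ordering::SeqCst};

/// Simplified stand-in for the flags of one `PipelineBreakerBuffer`.
#[derive(Default)]
pub struct Buffer {
    is_ready: AtomicBool,
    streaming_enabled: AtomicBool,
    streaming_started: AtomicBool,
}

impl Buffer {
    /// Every input partition has produced its first poll result.
    pub fn mark_ready(&self) {
        self.is_ready.store(true, SeqCst);
    }
    /// Called by a rule to hold this buffer back for the current cycle.
    pub fn veto(&self) {
        self.streaming_enabled.store(false, SeqCst);
    }
    pub fn start_streaming(&self) {
        self.streaming_started.store(true, SeqCst);
    }
    pub fn started(&self) -> bool {
        self.streaming_started.load(SeqCst)
    }
}

/// Stand-in for a runtime rule: inspects the buffers and may veto some.
pub type Rule = Box<dyn Fn(&[Buffer])>;

/// One coordinator cycle, as it would run on each `poll_next`.
pub fn run_cycle(buffers: &[Buffer], rules: &[Rule]) {
    // 1. Propose: permissive default, enabled exactly when ready.
    for b in buffers {
        b.streaming_enabled.store(b.is_ready.load(SeqCst), SeqCst);
    }
    // 2. Evaluate: rules may flip `streaming_enabled` back to false.
    for rule in rules {
        rule(buffers);
    }
    // 3. Commit: release every buffer that is still enabled.
    for b in buffers {
        if b.streaming_enabled.load(SeqCst) {
            b.start_streaming();
        }
    }
}
```

Because the proposal is reset at the start of every cycle, a veto only lasts as long as some rule keeps issuing it; once no rule objects, the next cycle commits the buffer.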
A shared `Arc<AtomicWaker>` is threaded into both RTO and every buffer.
Buffers wake it when their `is_ready` flips; RTO registers `cx.waker()` on it
before each walk. The `AtomicWaker` is essential because `cx.waker()` is
task-local — a buffer's poll inside a spawned subtask (e.g. one of
`RepartitionExec`'s internals) cannot wake RTO via `cx.waker()`, but can via
the shared `AtomicWaker`.
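
The wake path can be illustrated with the standard library alone. In the sketch below, `SharedWaker` is a `Mutex`-based stand-in for an atomic waker (not the type used in the PR), `CountingWaker` is a test-only waker, and a thread plays the role of the spawned subtask; all three names are assumptions made for illustration.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Wake, Waker};
use std::thread;

/// Std-only stand-in for an atomic waker: stores the latest registered waker.
#[derive(Default)]
pub struct SharedWaker(Mutex<Option<Waker>>);

impl SharedWaker {
    /// Called by the coordinator before each walk with its own `cx.waker()`.
    pub fn register(&self, waker: &Waker) {
        *self.0.lock().unwrap() = Some(waker.clone());
    }
    /// Called by a buffer, from whatever task it is polled on.
    pub fn wake(&self) {
        // Take the waker out first so the lock is released before waking.
        let waker = self.0.lock().unwrap().take();
        if let Some(w) = waker {
            w.wake();
        }
    }
}

/// Test waker that counts how often the "coordinator task" was woken.
#[derive(Default)]
pub struct CountingWaker(pub AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Registers the coordinator's waker, then wakes it from another thread
/// that never saw the coordinator's task context.
pub fn wake_from_other_task() -> usize {
    let counter = Arc::new(CountingWaker::default());
    let coordinator_waker = Waker::from(Arc::clone(&counter));

    let shared = Arc::new(SharedWaker::default());
    shared.register(&coordinator_waker);

    let for_buffer = Arc::clone(&shared);
    thread::spawn(move || for_buffer.wake()).join().unwrap();

    counter.0.load(Ordering::SeqCst)
}
```

The buffer side only needs the `Arc<SharedWaker>`; it never touches the coordinator's `Context`, which is the property the shared waker is there to provide.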
### `RuntimeRule` trait
```rust
pub trait RuntimeRule: Send + Sync + std::fmt::Debug {
fn name(&self) -> &str;
fn evaluate(&self, plan: &Arc<dyn ExecutionPlan>);
}
```
Cheap, idempotent visitors. Rules track their own "already fired" state.
They observe runtime stats via new trait methods like `runtime_row_count` (this
PR) and (future) `runtime_partition_extrema` (analogous to
apache/datafusion#23090), and mutate adaptive operators via typed methods
(`HashJoinExec::flip_sides`, `RepartitionExec::set_split_points`, etc. — all
future work).
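
As an illustration of a rule tracking its own "already fired" state, here is a small sketch. `ExecutionPlan` is stubbed out so the example compiles on its own, and `DummyExec`, `FireOnce`, and `evaluate_n_times` are hypothetical names that do not appear in the PR.

```rust
use std::fmt::Debug;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;

/// Stub standing in for DataFusion's `ExecutionPlan` trait.
pub trait ExecutionPlan: Debug + Send + Sync {
    fn name(&self) -> &str;
}

pub trait RuntimeRule: Send + Sync + Debug {
    fn name(&self) -> &str;
    fn evaluate(&self, plan: &Arc<dyn ExecutionPlan>);
}

#[derive(Debug)]
pub struct DummyExec;

impl ExecutionPlan for DummyExec {
    fn name(&self) -> &str {
        "DummyExec"
    }
}

/// Hypothetical rule that acts at most once, however often it is evaluated.
#[derive(Debug, Default)]
pub struct FireOnce {
    fired: AtomicBool,
    pub actions: AtomicUsize,
}

impl RuntimeRule for FireOnce {
    fn name(&self) -> &str {
        "fire_once"
    }

    fn evaluate(&self, _plan: &Arc<dyn ExecutionPlan>) {
        // `swap` returns the previous value, so only the first call sees `false`.
        if !self.fired.swap(true, Ordering::AcqRel) {
            // A real rule would mutate an adaptive operator here.
            self.actions.fetch_add(1, Ordering::Relaxed);
        }
    }
}

/// Evaluates the rule `n` times and reports how many times it acted.
pub fn evaluate_n_times(n: usize) -> usize {
    let plan: Arc<dyn ExecutionPlan> = Arc::new(DummyExec);
    let rule = FireOnce::default();
    for _ in 0..n {
        rule.evaluate(&plan);
    }
    rule.actions.load(Ordering::Relaxed)
}
```

Since `evaluate` takes `&self`, any per-rule state has to use interior mutability; an atomic flag keeps repeated evaluation cheap.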
### `ExecutionPlan::runtime_row_count(partition)`
New trait method (default `None`) returning the number of rows an operator
*will emit* on a given partition. For pipeline-breaking operators, the value is
knowable the moment their build phase completes — *before* any output batch is
pulled. Implemented on `AggregateExec` here, reading from the existing
`OutputRows` metric.
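
A rough sketch of the default-`None` shape, using a stub trait and toy operators. Note the assumption: this toy aggregate reports its group count directly from its hash table, whereas the PR reads the `OutputRows` metric. `ToyFilter` and `ToyAggregate` are hypothetical.

```rust
use std::collections::HashMap;

/// Stub trait carrying only the new method.
pub trait ExecutionPlan {
    /// Rows this operator will emit on `partition`, if already known.
    fn runtime_row_count(&self, _partition: usize) -> Option<usize> {
        None
    }
}

/// Streaming operator: it cannot know its output size up front,
/// so it keeps the default.
pub struct ToyFilter;

impl ExecutionPlan for ToyFilter {}

/// Toy single-partition aggregate: once the build phase is done,
/// the number of output rows equals the number of groups.
#[derive(Default)]
pub struct ToyAggregate {
    groups: HashMap<String, u64>,
    build_done: bool,
}

impl ToyAggregate {
    /// Consume all input keys, counting rows per group.
    pub fn build(&mut self, keys: &[&str]) {
        for k in keys {
            *self.groups.entry(k.to_string()).or_insert(0) += 1;
        }
        self.build_done = true;
    }
}

impl ExecutionPlan for ToyAggregate {
    fn runtime_row_count(&self, partition: usize) -> Option<usize> {
        // Known only after the build phase, and only for partition 0 here.
        (self.build_done && partition == 0).then(|| self.groups.len())
    }
}
```

The count becomes available as soon as `build` returns, before any output has been produced, which is the window in which a runtime rule could still act on it.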
### Example rule: `SwapBuildSideIfInverted`
First concrete `RuntimeRule`. Walks the plan looking for `HashJoinExec`. For
each, compares its left (build) and right (probe) sides' row counts: static
`statistics()` when Exact (e.g. an in-memory dimension table), runtime via
`runtime_row_count` summed across partitions otherwise (e.g. the output of a
`GROUP BY` whose distinct cardinality the planner couldn't predict).
If the build side is larger than the probe side, the rule would call
`HashJoinExec::flip_sides()`. **But that method doesn't exist yet**, so this PR
only logs the intent (visible with `RUST_LOG=info`). The detection mechanism is
fully wired end-to-end; actually saving work requires `HashJoinExec` to defer
its build phase until both sides' pipeline breakers report `is_ready`, which is
a follow-up operator change.
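
The comparison the rule performs can be sketched as below. The local `Precision` enum mirrors the Exact/Inexact distinction mentioned above, while `SideStats`, `known_rows`, and `should_flip` are hypothetical helpers; the PR's actual code may be organized differently.

```rust
/// Local stand-in for a statistics precision wrapper.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Precision {
    Exact(usize),
    Inexact(usize),
    Absent,
}

/// What the rule can see about one join input.
pub struct SideStats {
    /// Planner-time row count from `statistics()`.
    pub static_rows: Precision,
    /// `runtime_row_count(p)` for each partition `p`.
    pub runtime_rows: Vec<Option<usize>>,
}

/// Prefer an exact static count; otherwise sum the runtime counts.
/// Returns `None` if any partition has not reported yet.
pub fn known_rows(side: &SideStats) -> Option<usize> {
    match side.static_rows {
        Precision::Exact(n) => Some(n),
        // Summing `Option`s short-circuits to `None` on the first `None`.
        _ => side.runtime_rows.iter().copied().sum(),
    }
}

/// `Some(true)` if the build side is larger than the probe side,
/// `None` while either side is still unknown.
pub fn should_flip(build: &SideStats, probe: &SideStats) -> Option<bool> {
    Some(known_rows(build)? > known_rows(probe)?)
}
```

An `Inexact` estimate is deliberately ignored here: the point of the rule is to replace a planner guess with an observed count, so it stays undecided until the runtime counts arrive.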
## SLT walkthrough
`runtime_optimizer.slt` sets up the canonical misestimation case: `big`
(100K rows) joined to `small` (100 rows) through a `GROUP BY` on a
low-cardinality column the planner can't propagate distinct stats through.
`JoinSelection` trusts the Inexact ~100K estimate for `aggregated_big` and picks
`small` as the build side. At runtime `aggregated_big` has only 5 distinct
groups, so the build side (100 rows) is larger than the probe side (5 rows);
the rule detects the inversion and would flip.
The EXPLAIN block in the SLT shows the static plan with the AQE primitives
wrapped — `RuntimeOptimizerExec` at root, `PipelineBreakerBuffer` above each
AggregateExec. The build-side swap is *not* reflected in EXPLAIN because the
proposed runtime mutation cannot retroactively appear in static plan output.
## What lands here vs. what's deferred
Lands:
- `PipelineBreakerBuffer` operator + `is_pipeline_breaker` heuristic +
insertion rule.
- `RuntimeOptimizerExec` operator + insertion rule + shared AtomicWaker wake
propagation.
- `RuntimeRule` trait + three-phase coordinator (propose / evaluate /
commit).
- `ExecutionPlan::runtime_row_count` trait method.
- `AggregateExec` impl of `runtime_row_count`.
- `SwapBuildSideIfInverted` rule (log-only).
- SLT demonstrating the detection end-to-end.
Deferred to follow-ups (each its own PR-sized piece of work):
1. **`HashJoinExec::flip_sides()`** with interior mutability + a build-phase
deferral mechanism that waits for descendant `PipelineBreakerBuffer`s to be
ready before starting build. This is the change that turns
`SwapBuildSideIfInverted` from a logger into a real optimization.
2. **`RepartitionExec::set_split_points()`** — mutable
`Partitioning::Range(Option<Vec<SplitPoint>>)`. Pairs with
`runtime_partition_extrema` on SortExec to enable parallel cumulative-window
functions and parallel range repartitioning in general. Mutating
RepartitionExec is *timing-safe* (it has no synchronous build phase, just
routing), so this follow-up doesn't need operator deferral.
3. Additional rules: partition coalescing, skew splitting, adaptive
aggregation strategy, broadcast-join switching.
4. `runtime_partition_extrema` trait method on SortExec, parallel-window
operators (`CarryExec`, `HaloDropExec`) — there's already a parallel PoC at
apache/datafusion#23026 / #23090 / #23094 that overlaps significantly; this
work would either build on or replace those primitives.
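
To make deferred item 2 more concrete, range routing with split points that are set after planning could look roughly like this. It is a sketch under assumptions: keys are plain `i64`, `RangeRouter` is a hypothetical stand-in for the repartitioning logic, and `None` models split points that no rule has supplied yet.

```rust
use std::sync::RwLock;

/// Hypothetical router whose split points can be set at runtime.
#[derive(Default)]
pub struct RangeRouter {
    split_points: RwLock<Option<Vec<i64>>>,
}

impl RangeRouter {
    /// Install split points; `n` points define `n + 1` output partitions.
    pub fn set_split_points(&self, mut points: Vec<i64>) {
        points.sort_unstable();
        *self.split_points.write().unwrap() = Some(points);
    }

    /// Output partition for `key`, or `None` if split points are not set yet.
    /// Partition `i` receives keys in `[points[i - 1], points[i])`.
    pub fn route(&self, key: i64) -> Option<usize> {
        let guard = self.split_points.read().unwrap();
        let points = guard.as_ref()?;
        // Binary search: count the split points that are <= key.
        Some(points.partition_point(|&s| s <= key))
    }
}
```

Routing only reads the split points, so installing them before the first batch is routed requires no coordination with a build phase.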
## Discussion points I'd appreciate feedback on
- **Architectural model.** Is the propose/evaluate/commit cycle the right
shape, or should rules return decisions to RTO instead of mutating buffer state
directly?
- **Pipeline-breaker detection.** Currently a hardcoded match against
`AggregateExec` and `SortExec`. Should there be a trait method like
`is_pipeline_breaker()`?
- **Trait surface.** `runtime_row_count(partition: usize)` returns the total
for a single partition. Is a per-partition count the right primitive, or should
the method return a cross-partition total?
- **Wake propagation via AtomicWaker.** Is the shared waker pattern
acceptable, or is there a more idiomatic approach DataFusion already uses for
cross-task notification I'm missing?
- **Naming.** `RuntimeOptimizerExec` vs `AdaptiveExec` vs something else?
Open to bikeshedding.
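
For the pipeline-breaker question, the two options can be compared in a small sketch. Everything here is illustrative: the trait is a stub, the name-based match only approximates the hardcoded check, and `CustomTopKExec` is a made-up third-party operator.

```rust
/// Stub trait with the trait-method option added.
pub trait ExecutionPlan {
    fn name(&self) -> &str;

    /// Hypothetical opt-in hook; streaming operators keep the default.
    fn is_pipeline_breaker(&self) -> bool {
        false
    }
}

pub struct SortExec;
pub struct FilterExec;
/// Made-up operator defined outside the core crate.
pub struct CustomTopKExec;

impl ExecutionPlan for SortExec {
    fn name(&self) -> &str {
        "SortExec"
    }
    fn is_pipeline_breaker(&self) -> bool {
        true
    }
}

impl ExecutionPlan for FilterExec {
    fn name(&self) -> &str {
        "FilterExec"
    }
}

impl ExecutionPlan for CustomTopKExec {
    fn name(&self) -> &str {
        "CustomTopKExec"
    }
    fn is_pipeline_breaker(&self) -> bool {
        true
    }
}

/// Approximation of a hardcoded check over the two known operators.
pub fn is_breaker_hardcoded(plan: &dyn ExecutionPlan) -> bool {
    matches!(plan.name(), "AggregateExec" | "SortExec")
}
```

The two checks agree on built-in operators; they differ for operators defined elsewhere, which a hardcoded list cannot see but a trait method lets opt in.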
## What this PoC is not
- Not a complete AQE implementation. The log-only rule means no actual work
is saved yet.
- Not a replacement for the existing parallel-window-poc work — that work
targets the same general space (runtime-informed plan adaptation) but via
different primitives (the `runtime_partition_extrema` API + new operators). The
two designs are compatible and could converge.
- Not optimized. The on-every-poll subtree walks visit every node, so each poll
costs O(number of plan nodes); fine for the prototype, would want amortization
in production.
Looking for: "is this the right architectural direction for AQE in
DataFusion?" before investing in any of the follow-up work.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]