Dandandan opened a new pull request, #21729: URL: https://github.com/apache/datafusion/pull/21729
## Summary

Wrap `AggregateExec`'s output stream with a small `pin_stream_to_thread` helper that drives the inner stream on a dedicated `spawn_blocking` thread hosting a `current_thread` tokio runtime. Batches flow back to the caller through a bounded `mpsc`; the caller still sees a normal `SendableRecordBatchStream`.

Applied to every `AggregateMode` (Partial, PartialReduce, Final, FinalPartitioned, Single, SinglePartitioned) since they all hold the same kind of persistent grouping state.

## Motivation

On the default multi-threaded tokio runtime, a `RepartitionExec`-fed aggregation migrates **constantly** between worker threads: every channel `.await` is a scheduling point, and tokio's work-stealer happily moves the task to an idle worker. That bounces the partial-agg's hash map + accumulators across CPU caches on every poll.

### Measurement

Added (but `#[ignore]`d) `aggregates::tests::measure_partial_agg_thread_migration`. 16 input partitions, 4 MT workers, input = `TestMemoryExec → RepartitionExec(Hash, 16) → PartialAgg`. A `ThreadProbe` wraps the raw partial-agg output stream (*inner*) and the forwarding stream returned by `pin_stream_to_thread` (*outer*), recording `thread::current().id()` at every `poll_next`.

| | inner (partial-agg build) | outer (mpsc receiver) |
|---|---|---|
| **before this change** | 16/16 partitions migrated; 254 switches across 376 polls (**~68%**) | n/a |
| **after this change** | **0/16 partitions migrated; 0 switches across 2585 polls** | 7/16 partitions migrated; 7 switches across 43 polls |

The outer (driver-side) task still migrates. That's fine: it only carries the mpsc receive. The heavy hash-map state is now pinned for the partition's lifetime.

Run with: `cargo test -p datafusion-physical-plan --lib aggregates::tests::measure_partial_agg_thread_migration -- --ignored --nocapture`

## Tradeoffs

- **Gain**: aggregation state is cache-sticky; no migrations mid-build.
- **Loss of work-stealing for the agg task**: a pinned aggregation no longer participates in MT work-stealing. For balanced partition counts (≈ CPU count, the common DataFusion case) the cache-locality win dominates. For very skewed partitions the long-tail partition can't be helped by idle threads mid-flight.
- **Blocking-pool pressure**: each pinned partition holds one thread from the blocking pool (default 512) for the partition's lifetime. Heavy concurrency × high partition counts could saturate it; a dedicated thread-per-core executor is a cleaner answer for that regime but is out of scope for this PR.

## What's *not* in scope

ClickBench's 43 queries also exercise `SortExec` / `TopKExec` on the ~10 `ORDER BY ... LIMIT N` queries: same stateful-build-over-many-awaits shape, same potential win. Happy to do a follow-up PR if the idea is welcome; keeping this one focused on aggregates.

## Test plan

- [x] `cargo check -p datafusion-physical-plan`
- [x] `cargo clippy -p datafusion-physical-plan --all-targets -- -D warnings`
- [x] `cargo fmt --all -- --check`
- [x] Local probe shows inner migrations drop from 254 → 0.
- [ ] ClickBench wall-clock before/after (happy to run and post numbers if reviewers want them).

🤖 Generated with [Claude Code](https://claude.com/claude-code)
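
## Illustrative sketch (not the PR's code)

For readers who want to see the pinning pattern in isolation, here is a minimal std-only sketch. It is not the PR's `pin_stream_to_thread` helper: it swaps the `spawn_blocking` thread plus `current_thread` runtime for a plain `std::thread`, and the bounded tokio `mpsc` for `std::sync::mpsc::sync_channel`. `Batch`, `pinned_sum_by_key`, and `count_switches` are hypothetical names chosen for illustration. The sketch only shows the shape of the idea: grouping state stays on one thread, and results flow back through a bounded channel.

```rust
use std::collections::BTreeMap;
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread::{self, ThreadId};

/// Hypothetical stand-in for a record batch: (group key, value) rows.
type Batch = Vec<(u32, u64)>;

/// Count how often consecutive observations came from different threads,
/// loosely analogous to the "switches" figure in the measurement table.
fn count_switches(ids: &[ThreadId]) -> usize {
    ids.windows(2).filter(|w| w[0] != w[1]).count()
}

/// Drive a stateful sum-by-key aggregation on one dedicated OS thread.
/// Output flows back through a bounded channel, so a slow consumer
/// applies backpressure to the producer thread.
fn pinned_sum_by_key(input: Vec<Batch>, capacity: usize) -> Receiver<(ThreadId, Batch)> {
    let (tx, rx) = sync_channel(capacity);
    thread::spawn(move || {
        // The grouping state lives on this thread for its whole lifetime.
        let mut groups: BTreeMap<u32, u64> = BTreeMap::new();
        for batch in input {
            for (key, value) in batch {
                *groups.entry(key).or_insert(0) += value;
            }
        }
        // Emit results in chunks of two rows, tagged with the producing
        // thread's id; stop early if the receiver has been dropped.
        let rows: Vec<(u32, u64)> = groups.into_iter().collect();
        for chunk in rows.chunks(2) {
            if tx.send((thread::current().id(), chunk.to_vec())).is_err() {
                break;
            }
        }
    });
    rx
}

fn main() {
    let input = vec![vec![(1, 10), (2, 5)], vec![(1, 1), (3, 7)], vec![(2, 2)]];
    let rx = pinned_sum_by_key(input, 1);

    let mut ids = Vec::new();
    let mut rows = Vec::new();
    for (id, batch) in rx {
        ids.push(id);
        rows.extend(batch);
    }

    println!("rows = {:?}", rows);
    println!("producer thread switches = {}", count_switches(&ids));
}
```

Because the producer is a single OS thread, every emitted chunk carries the same thread id and `count_switches` reports zero. The consumer side is free to run anywhere, mirroring the inner/outer split in the measurement above.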
