ahmed-mez opened a new issue, #19906:
URL: https://github.com/apache/datafusion/issues/19906
### Is your feature request related to a problem or challenge?
When `GroupedHashAggregateStream` finishes processing input and transitions
to emitting accumulated groups, it calls `emit(EmitTo::All)` which materializes
all groups at once. For high-cardinality aggregations (>500k groups) or complex
grouping keys (Strings, Lists), this becomes a CPU-intensive blocking operation
that stalls the async runtime for hundreds of milliseconds to seconds.
This "long poll" prevents other tasks on the same thread from running,
causing latency spikes and system "hiccups." We've observed `AggregateExec`
stalling the runtime for >1s when processing queries with ~10M groups.
**Prior attempts to fix this**
In #18906, we introduced a `DrainingGroups` state to emit groups
incrementally. In #19562, we implemented `EmitTo::Next(n)` for true incremental
emission at the `GroupColumn` level. This works correctly but revealed a ~15x
performance regression on high-cardinality queries.
**Root cause**
The current `GroupColumn` implementations store values in a contiguous
`Vec<T>`. When emitting the first n elements via `take_n()`, all remaining
elements must be shifted:
```rust
fn take_n(&mut self, n: usize) -> ArrayRef {
    // `drain(0..n)` shifts every remaining element to the front of the Vec,
    // so this costs O(remaining), not O(n).
    let first_n: Vec<_> = self.group_values.drain(0..n).collect();
    // ... build the output ArrayRef from `first_n` (elided)
}
```
Profiling showed costs distributed across `Vec::from_iter` (allocations),
`MaybeNullBufferBuilder::take_n` (copying), and `_platform_memmove` (shifting).
**Conclusion**: Incremental emission is the right approach, but requires
chunked storage to be efficient.
### Describe the solution you'd like
This epic tracks two complementary changes:
**1. Chunked Storage for GroupColumn**
Replace contiguous `Vec<T>` with a chunked structure so `take_n()` is O(n)
instead of O(remaining):
```
Current: Vec [v0, v1, v2, ... vN]        → take_n(3) shifts all remaining = O(remaining)
Chunked: [Chunk0] → [Chunk1] → [Chunk2]  → take_n(3) advances head        = O(n)
```
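As a rough sketch of the intended shape (hypothetical names and layout, not a committed API), a `ChunkedVec<T>` could keep values in fixed-size chunks and pop whole chunks off the front when emitting:

```rust
use std::collections::VecDeque;

/// Illustrative sketch only: values live in fixed-size chunks, so taking the
/// first n values pops whole chunks off the front instead of shifting every
/// remaining element of one large Vec.
struct ChunkedVec<T> {
    chunks: VecDeque<Vec<T>>,
    chunk_size: usize,
    total_len: usize,
}

impl<T> ChunkedVec<T> {
    fn new(chunk_size: usize) -> Self {
        Self { chunks: VecDeque::new(), chunk_size, total_len: 0 }
    }

    fn len(&self) -> usize {
        self.total_len
    }

    fn push(&mut self, value: T) {
        // Start a new chunk when the tail chunk is full (or none exists yet).
        if self.chunks.back().map_or(true, |c| c.len() == self.chunk_size) {
            self.chunks.push_back(Vec::with_capacity(self.chunk_size));
        }
        self.chunks.back_mut().unwrap().push(value);
        self.total_len += 1;
    }

    /// Take the first n values (n <= len). Only the chunk that straddles the
    /// boundary has its remaining elements moved, so the cost is O(n) rather
    /// than O(remaining).
    fn take_n(&mut self, n: usize) -> Vec<T> {
        let mut out = Vec::with_capacity(n);
        while out.len() < n {
            let need = n - out.len();
            if self.chunks.front().map_or(0, |c| c.len()) <= need {
                let mut chunk = self.chunks.pop_front().expect("n exceeds len");
                out.append(&mut chunk);
            } else {
                out.extend(self.chunks.front_mut().unwrap().drain(0..need));
            }
        }
        self.total_len -= n;
        out
    }
}
```

With this layout, emitting in `batch_size` steps only ever touches one partially consumed chunk per call, which is what makes `EmitTo::Next(n)` cheap.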
**2. Incremental Emission in `HashAggregate`**
Update `GroupedHashAggregateStream` to call `emit(EmitTo::Next(batch_size))`
iteratively instead of `emit(EmitTo::All)`, yielding between emissions.
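Heavily simplified sketch of the intended control flow (the `GroupState` and `drain_in_batches` names are stand-ins for illustration, not the actual `GroupedHashAggregateStream` code): the stream would call `emit(EmitTo::Next(batch_size))` repeatedly and hand back one batch at a time instead of materializing everything with `EmitTo::All`:

```rust
// Stand-in types for illustration only; the real code operates on
// DataFusion's group/accumulator state and RecordBatch.
enum EmitTo {
    All,
    Next(usize),
}

struct GroupState {
    groups: Vec<u64>, // stand-in for the accumulated group state
}

impl GroupState {
    fn emit(&mut self, to: EmitTo) -> Vec<u64> {
        match to {
            // Today's behaviour: one large, blocking materialization.
            EmitTo::All => std::mem::take(&mut self.groups),
            // Intended behaviour: emit only the next n groups per call.
            // (A plain Vec still has the shifting problem; the chunked
            // storage above is what makes each step cheap.)
            EmitTo::Next(n) => {
                let n = n.min(self.groups.len());
                self.groups.drain(0..n).collect()
            }
        }
    }

    fn is_empty(&self) -> bool {
        self.groups.is_empty()
    }
}

fn drain_in_batches(state: &mut GroupState, batch_size: usize) -> Vec<Vec<u64>> {
    let mut batches = Vec::new();
    while !state.is_empty() {
        // In the real stream this happens across successive polls, returning
        // one RecordBatch per poll so the runtime can schedule other tasks in
        // between; a plain loop is used here only to keep the sketch short.
        batches.push(state.emit(EmitTo::Next(batch_size)));
    }
    batches
}
```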
**Implementation phases:**
**1. Infrastructure**
- [ ] Add `ChunkedVec<T>` data structure with `push`, `len`, `get`,
`take_n`, `iter`, and `size` methods (see the usage sketch after this list)
- [ ] Add `ChunkedNullBufferBuilder` for chunked null bitmap storage with
`take_n` support
**2. `GroupColumn` Migrations**
- [ ] Migrate `PrimitiveGroupValueBuilder` to use `ChunkedVec` for group
values storage
- [ ] Migrate `BooleanGroupValueBuilder` to use chunked storage
- [ ] Migrate `ByteGroupValueBuilder` to use chunked offsets and buffer
- [ ] Migrate `ByteViewGroupValueBuilder` to use chunked views storage
**3. Emission Path**
- [ ] Optimize `emit()` index adjustment for `EmitTo::First(n)` with chunked
storage
- [ ] Update `GroupedHashAggregateStream` to use
`emit(EmitTo::Next(batch_size))` iteratively in `ProducingOutput` state
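Usage sketch for the hypothetical `ChunkedVec` above, draining a large store in `batch_size` steps (all numbers are illustrative):

```rust
// Drain one million values in batch_size steps; each step only touches the
// chunk(s) at the head of the hypothetical ChunkedVec defined earlier.
fn main() {
    let mut values: ChunkedVec<u64> = ChunkedVec::new(4096);
    for v in 0..1_000_000u64 {
        values.push(v);
    }

    let batch_size = 8192;
    let mut emitted = 0usize;
    while values.len() > 0 {
        let n = batch_size.min(values.len());
        let batch = values.take_n(n);
        // In the aggregate stream, each `batch` would become one RecordBatch
        // handed back to the caller before the next take_n.
        emitted += batch.len();
    }
    assert_eq!(emitted, 1_000_000);
}
```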
### Describe alternatives you've considered
Increase `target_partitions`, which reduces the number of groups per partition
and could mitigate the issue. This can work, but it doesn't solve the
fundamental problem and has other trade-offs (e.g. working on smaller
partitions can slow down other operators).
### Additional context
Related issues & experiments
- #18907
- #18906
- #19562