ahmed-mez opened a new issue, #19906:
URL: https://github.com/apache/datafusion/issues/19906

   ### Is your feature request related to a problem or challenge?
   
   When `GroupedHashAggregateStream` finishes processing input and transitions 
to emitting accumulated groups, it calls `emit(EmitTo::All)` which materializes 
all groups at once. For high-cardinality aggregations (>500k groups) or complex 
grouping keys (Strings, Lists), this becomes a CPU-intensive blocking operation 
that stalls the async runtime for hundreds of milliseconds to seconds.
   
   This "long poll" prevents other tasks on the same thread from running, 
causing latency spikes and system "hiccups." We've observed `AggregateExec` 
stalling the runtime for >1s when processing queries with ~10M groups.
   
   **Prior attempts to fix this**
   In #18906, we introduced a `DrainingGroups` state to emit groups 
incrementally. In #19562, we implemented `EmitTo::Next(n)` for true incremental 
emission at the `GroupColumn` level. This works correctly but revealed a ~15x 
performance regression on high-cardinality queries.
   
   **Root cause**
   The current `GroupColumn` implementations store values in contiguous 
`Vec<T>`. When emitting first n elements via `take_n()`, all remaining elements 
must be shifted:
   ```rust
   fn take_n(&mut self, n: usize) -> ArrayRef {
       // Vec::drain shifts every remaining element to the front of the
       // buffer, so this copies O(remaining) values, not O(n).
       let first_n: Vec<_> = self.group_values.drain(0..n).collect();
       // ... first_n is then converted into the returned ArrayRef ...
   }
   ```
   
   Profiling showed costs distributed across `Vec::from_iter` (allocations), 
`MaybeNullBufferBuilder::take_n` (copying), and `_platform_memmove` (shifting).
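   
   The shift is easy to reproduce in isolation (a standalone demonstration, not DataFusion code; `drain_front` is an illustrative helper):
   ```rust
   /// `Vec::drain(0..n)` hands back the first `n` elements, then memmoves
   /// the entire tail to the front of the allocation.
   fn drain_front(v: &mut Vec<u32>, n: usize) -> Vec<u32> {
       v.drain(0..n).collect()
   }
   ```
   After the drain, the remaining elements occupy the front of the same buffer, which is the memmove cost seen in the profile.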
   
   **Conclusion**: Incremental emission is the right approach, but requires 
chunked storage to be efficient.
   
   ### Describe the solution you'd like
   
   This epic tracks two complementary changes:
   
   **1. Chunked Storage for GroupColumn**
   Replace contiguous `Vec<T>` with a chunked structure so `take_n()` is O(n) 
instead of O(remaining):
   ```
   Current:  Vec [v0, v1, v2, ... vN]        →  take_n(3) shifts all remaining  =  O(remaining)
   Chunked:  [Chunk0] → [Chunk1] → [Chunk2]  →  take_n(3) advances head         =  O(n)
   ```
   **2. Incremental Emission in `HashAggregate`**
   Update `GroupedHashAggregateStream` to call `emit(EmitTo::Next(batch_size))` 
iteratively instead of `emit(EmitTo::All)`, yielding between emissions.
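   
   The intended control flow, sketched with simplified stand-in types (`GroupState` and `drain_in_batches` are hypothetical; the real `EmitTo` and stream state machine live in DataFusion's aggregation code):
   ```rust
   #[allow(dead_code)]
   enum EmitTo {
       All,
       Next(usize),
   }

   struct GroupState {
       remaining: usize, // groups still buffered in the hash table
   }

   impl GroupState {
       /// Emit up to the requested number of groups; returns how many
       /// groups were actually produced.
       fn emit(&mut self, to: EmitTo) -> usize {
           let n = match to {
               EmitTo::All => self.remaining,
               EmitTo::Next(n) => n.min(self.remaining),
           };
           self.remaining -= n;
           n
       }
   }

   /// Drain all groups in `batch_size` slices. In the real stream, each
   /// iteration would return one batch from `poll_next` and yield back to
   /// the runtime before producing the next one, bounding each poll's work.
   fn drain_in_batches(state: &mut GroupState, batch_size: usize) -> Vec<usize> {
       let mut batch_sizes = Vec::new();
       loop {
           let produced = state.emit(EmitTo::Next(batch_size));
           if produced == 0 {
               break;
           }
           batch_sizes.push(produced);
       }
       batch_sizes
   }
   ```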
   
   **Implementation phases:**
   
   **1. Infrastructure**
   - [ ] Add `ChunkedVec<T>` data structure with `push`, `len`, `get`, 
`take_n`, `iter`, and `size` methods
   - [ ] Add `ChunkedNullBufferBuilder` for chunked null bitmap storage with 
`take_n` support
   
   **2. `GroupColumn` Migrations**
   - [ ] Migrate `PrimitiveGroupValueBuilder` to use `ChunkedVec` for group 
values storage
   - [ ] Migrate `BooleanGroupValueBuilder` to use chunked storage
   - [ ] Migrate `ByteGroupValueBuilder` to use chunked offsets and buffer
   - [ ] Migrate `ByteViewGroupValueBuilder` to use chunked views storage
   
   **3. Emission Path**
   - [ ] Optimize `emit()` index adjustment for `EmitTo::First(n)` with chunked 
storage
   - [ ] Update `GroupedHashAggregateStream` to use 
`emit(EmitTo::Next(batch_size))` iteratively in `ProducingOutput` state
   
   ### Describe alternatives you've considered
   
   Increase `target_partitions`, which reduces the number of groups per partition and could mitigate the issue. This can work, but it doesn't solve the fundamental problem and has its own trade-offs (e.g. working on smaller partitions can slow down other operators).
   
   ### Additional context
   
   **Related issues & experiments**
   - #18907
   - #18906
   - #19562


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

