RyanJamesStewart opened a new pull request, #22205: URL: https://github.com/apache/datafusion/pull/22205
## Which issue does this PR close?

Closes #22164.

## Rationale for this change

When partial hash aggregation hits the memory limit and switches to early-emit mode (`EmitTo::First(n)`), three of the four `GroupColumn::take_n` implementations extract the first n elements with `drain(0..n).collect::<Vec<_>>()`:

- `bytes.rs:383` (`self.offsets`)
- `primitive.rs:270` (`self.group_values`)
- `bytes_view.rs:366` (`self.views`)

As the issue reporter noted, `drain` does not affect the Vec's capacity, so `self.offsets` / `self.group_values` / `self.views` keep the pre-emit allocation with a reduced length. The whole point of the OOM-triggered early emit is to release memory; with the current code the builder ends up holding the same buffer it had a moment before the emit, which defeats the purpose of the memory-pressure signal. (The fourth implementation, `boolean.rs::take_n`, already does the right thing via swap-then-truncate over `BooleanBufferBuilder`, so it is out of scope here.)

## What changes are included in this PR?

For each of the three Vec-based call sites, replace `drain(0..n).collect()` with `std::mem::take` + `Vec::split_off(n)`:

```rust
// before
let first_n = self.vec.drain(0..n).collect::<Vec<_>>();
// self.vec : length = len - n, capacity = pre-emit capacity (stranded)
// first_n  : freshly allocated, sized to n

// after
let mut first_n = std::mem::take(&mut self.vec);
self.vec = first_n.split_off(n);
// self.vec : new allocation sized to len - n (capacity reclaimed)
// first_n  : original allocation, length n, retains pre-emit capacity
```

`Vec::split_off(at)` keeps `[0, at)` in `self` with the original capacity unchanged and returns a new Vec for `[at, len)` sized to its actual length. Combined with `mem::take`, the *retained* side becomes the freshly-sized buffer and the *emitted* side owns the original allocation. That is the correct assignment for OOM-emit because:

1. The emitted Vec is consumed immediately by `ScalarBuffer::from(Vec<T>)`, which is zero-copy via `Vec::into_raw_parts`; the original buffer travels into the output `ArrayRef` and is freed when downstream consumers drop it.
2. The retained Vec is what the builder keeps accumulating into. A freshly-sized buffer is what the issue's memory-pressure signal is asking for.

**Allocation accounting.** Same allocation count as before. `mem::take` swaps with `Vec::new()` (no allocation); `split_off` allocates exactly one Vec for the tail; the original `drain(..n).collect()` allocated exactly one Vec for the head. The fix changes which side gets the new allocation, not how many allocations happen. The win is correct capacity assignment, not reduced allocation work.

**Bonus on `bytes.rs`.** The drain-then-collect is immediately followed by `first_n_offsets.push(offset_n)` to close the offset range. With `collect()`, `first_n_offsets` has capacity exactly `n`, so the push triggers a reallocation. With `split_off`, `first_n_offsets` carries the pre-emit capacity, and the push fits without reallocating.

Diff is 16 insertions, 3 deletions across the three files.
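To make the capacity behavior concrete outside the DataFusion code, here is a standalone sketch (std only; `take_n_old` / `take_n_new` are illustrative helpers, not code from this PR, and the exact capacities on the newly allocated side are implementation-defined):

```rust
fn take_n_old(v: &mut Vec<u64>, n: usize) -> Vec<u64> {
    // drain keeps v's capacity; the emitted side gets a fresh allocation of size n
    v.drain(0..n).collect()
}

fn take_n_new(v: &mut Vec<u64>, n: usize) -> Vec<u64> {
    // the emitted side owns the original allocation; v gets a right-sized tail
    let mut first_n = std::mem::take(v);
    *v = first_n.split_off(n);
    first_n
}

fn main() {
    let mut a: Vec<u64> = (0..1_000_000u64).collect();
    let mut b = a.clone();

    let emitted_old = take_n_old(&mut a, 10);
    let emitted_new = take_n_new(&mut b, 10);

    // Same elements either way.
    assert_eq!(emitted_old, emitted_new);
    assert_eq!(a, b);

    // But retained side `a` still reserves the pre-emit capacity,
    // while `b` only reserves roughly what it still holds.
    println!("old: retained len={} cap={}", a.len(), a.capacity());
    println!("new: retained len={} cap={}", b.len(), b.capacity());
    println!("old: emitted  len={} cap={}", emitted_old.len(), emitted_old.capacity());
    println!("new: emitted  len={} cap={}", emitted_new.len(), emitted_new.capacity());
}
```

On a current std, the retained-side capacity stays near 1,000,000 in the `old` case and drops to roughly what it still holds in the `new` case, which is exactly the retained-side difference the issue is about.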
## Are these changes tested?

Yes, by existing coverage:

- `test_byte_take_n` (bytes.rs)
- `test_byte_view_take_n`, `test_byte_view_take_n_partial_completed_nonzero_index` (bytes_view.rs)
- `test_emit_first_n_for_vectorized_group_values` and `test_hashtable_modifying_in_emit_first_n` exercise the partial-aggregation emit path end-to-end through the `GroupColumn` trait, covering `primitive.rs`'s implementation.

`cargo test -p datafusion-physical-plan --lib aggregates::group_values` passes (26 tests, 0 failed). `cargo clippy -p datafusion-physical-plan --lib -- -D warnings` is clean.

No new test added: the existing tests pin the take_n behavior (output-array correctness and remaining-side state), and the fix is behaviorally identical at that level; the change is in which Vec each side ends up holding. The only observable difference is the heap reservation on the retained side, which is what the issue reports. The exact capacity `split_off` leaves behind is implementation-defined, and asserting on it from a test would be flaky under std or allocator changes, so I have not added one. Happy to add a capacity assertion if you'd prefer it; flag and I'll push.

## Are there any user-facing changes?

No. No API changes, no behavior change at the operator level. Purely an internal memory-pressure improvement on the partial-aggregation early-emit path.

---

AI-assisted. The fix shape (mem::take + split_off direction) and the implementation are mine; I used Claude to help survey the three call sites and check that I had not missed a `take_n` impl elsewhere in `group_values/`. The substantive comprehension steps I went through:

- Verified the issue reporter's mechanism description against each of the three sites; the drain-capacity gap is real and the sites share the same shape.
- Traced the emitted Vec's lifetime through `ScalarBuffer::from(Vec<T>) -> Buffer::from(Vec<T>) -> Vec::into_raw_parts`, which is what justifies routing the original allocation to the emitted side (see the sketch at the end of this description).
- Verified the allocation count is unchanged, so the PR does not make a perf claim that would not hold up under `run benchmarks` on a non-OOM workload. The improvement is qualitative (memory-pressure semantics), not throughput.
- Confirmed `boolean.rs::take_n` is already capacity-correct (different primitive, `BooleanBufferBuilder` swap-then-truncate), so the PR is correctly scoped to the three Vec-based sites.

Named unknown: I have not measured the heap-reservation improvement under an actual OOM-bound workload (e.g. `clickbench_partitioned` with a memory limit). The fix is justified by the mechanism analytically; an empirical peak-RSS confirmation belongs in a follow-up if you want the perf-evidence side strengthened.
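For reviewers who want to sanity-check the zero-copy hand-off without reading arrow internals, a minimal standalone sketch (assumes `arrow-buffer` as a dev-dependency; the pointer check is illustrative only and is not part of this PR's diff or tests):

```rust
use arrow_buffer::ScalarBuffer;

fn main() {
    // The "emitted" side after the fix: a Vec that still owns the original allocation.
    let emitted: Vec<u64> = (0..1024).collect();
    let data_ptr = emitted.as_ptr();

    // ScalarBuffer::from(Vec<T>) takes ownership of the Vec's allocation rather
    // than copying it, so the pre-emit buffer travels into the output array and
    // is freed when downstream consumers drop that array.
    let buffer: ScalarBuffer<u64> = ScalarBuffer::from(emitted);

    // Same backing memory, no copy (expected to hold on current arrow-buffer;
    // shown only to illustrate the claim made above).
    assert_eq!(buffer.as_ptr(), data_ptr);
    assert_eq!(buffer.len(), 1024);
}
```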
