alamb opened a new issue, #9562: URL: https://github.com/apache/arrow-datafusion/issues/9562
### Is your feature request related to a problem or challenge?

1. The `AggregateExec` generates one single (giant) `RecordBatch` on output ([source](https://github.com/apache/arrow-datafusion/blob/d5b635945307d5c7fe6fa10d3f65ee1ba2d58a5a/datafusion/physical-plan/src/aggregates/row_hash.rs#L660))
2. That single batch is then emitted in parts via `RecordBatch::slice()`, which does not actually allocate any additional memory ([source](https://github.com/apache/arrow-datafusion/blob/d5b635945307d5c7fe6fa10d3f65ee1ba2d58a5a/datafusion/physical-plan/src/aggregates/row_hash.rs#L492-L497))

This has at least two potential downsides:

1. No memory is freed until the GroupByHash has output every output row.
2. As we see in https://github.com/apache/arrow-datafusion/issues/9417, if upstream operators such as TopK hold references to any of these sliced `RecordBatch`es, each slice is treated as though it were an additional allocation that needs to be tracked ([source](https://github.com/apache/arrow-datafusion/blob/e642cc2a94f38518d765d25c8113523aedc29198/datafusion/physical-plan/src/topk/mod.rs#L576)).

Something like this in pictures:

```
Output RecordBatches are             ▲
slices into a single large           │ output stream
output batch                         │
                              ┌────────────────┐
                              │  RecordBatch   │─ ─ ┐
                              └────────────────┘
                              ┌────────────────┐    │
                              │  RecordBatch   │─ ─ ┤
                              └────────────────┘    │
                                     ...
                                                    │
                                                    ▼
                           ┏━━━━━━━━━━━━━━━━━━━━━━━┓
                           ┃                       ┃
                           ┃  Single RecordBatch   ┃
                           ┃                       ┃
                           ┃    GroupByHashExec    ┃
                           ┃                       ┃
                           ┗━━━━━━━━━━━━━━━━━━━━━━━┛
```

### Describe the solution you'd like

If we had infinite time / engineering hours, I think a better approach would be to change GroupByHash so it did not create a single giant contiguous `RecordBatch`. Instead, GroupByHash would produce a `Vec<RecordBatch>` and then incrementally feed those batches out.

Doing this would allow the GroupByHash to release memory incrementally as it produces output. This is analogous to how @korowa made join output incremental in https://github.com/apache/arrow-datafusion/pull/8658. A rough sketch of this idea is included at the end of this issue.

Perhaps something like:

```
                     ▲
                     │ output stream      Output RecordBatches are
              ┌────────────────┐          created in smaller chunks
              │  RecordBatch   │          and emitted one by one
              └────────────────┘
                     ▲
                     │
  ┏━━━━━━━━━━━━━━━━━━━━━━━┓    ┌────────────────┐
  ┃                       ┃    │  RecordBatch   │
  ┃                       ┃    └────────────────┘
  ┃                       ┃    ┌────────────────┐
  ┃    GroupByHashExec    ┃    │  RecordBatch   │
  ┃                       ┃    └────────────────┘
  ┃                       ┃           ...
  ┃                       ┃    ┌────────────────┐
  ┃                       ┃    │  RecordBatch   │
  ┗━━━━━━━━━━━━━━━━━━━━━━━┛    └────────────────┘
                                Vec<RecordBatch>
```

### Describe alternatives you've considered

_No response_

### Additional context

_No response_
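To make the proposed direction a bit more concrete, here is a minimal, self-contained Rust sketch of the emission strategy described above. It is illustration only, not DataFusion's actual implementation: `IncrementalEmitter` is a hypothetical name, and a real version would live inside the grouping operator and integrate with its memory accounting.

```rust
// Minimal sketch (assumption: hypothetical `IncrementalEmitter`, not
// DataFusion's actual code). The operator accumulates its output as many
// small, independently allocated RecordBatches and hands them out one at a
// time. Because each batch owns its own buffers, its memory can be freed as
// soon as the downstream consumer drops it, instead of every slice keeping
// one giant output batch alive.
use std::collections::VecDeque;
use std::sync::Arc;

use arrow::array::Int64Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

/// Hypothetical holder for group-by output that has already been converted
/// into a `Vec<RecordBatch>` of small chunks.
struct IncrementalEmitter {
    batches: VecDeque<RecordBatch>,
}

impl IncrementalEmitter {
    fn new(batches: Vec<RecordBatch>) -> Self {
        Self { batches: batches.into() }
    }

    /// Pop the next batch. Ownership moves to the caller, so once the caller
    /// drops it the batch's buffers can be released.
    fn next_batch(&mut self) -> Option<RecordBatch> {
        self.batches.pop_front()
    }
}

fn main() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));

    // Stand-in for the group-by operator producing its output in chunks
    // rather than as one contiguous RecordBatch that is later sliced.
    let chunks = (0..3)
        .map(|i| {
            let values: Vec<i64> = (i * 4..(i + 1) * 4).collect();
            RecordBatch::try_new(schema.clone(), vec![Arc::new(Int64Array::from(values))])
        })
        .collect::<Result<Vec<_>, ArrowError>>()?;

    let mut emitter = IncrementalEmitter::new(chunks);
    while let Some(batch) = emitter.next_batch() {
        println!("emitting {} rows", batch.num_rows());
        // `batch` is dropped here and its buffers are freed immediately.
    }
    Ok(())
}
```

The key design point is that ownership of each chunk moves out of the operator one batch at a time, which also sidesteps the memory-accounting confusion around slices noted in the TopK issue above.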
