alamb commented on issue #9417:
URL: https://github.com/apache/arrow-datafusion/issues/9417#issuecomment-1987187927

   I agree with @Dandandan in https://github.com/apache/arrow-datafusion/issues/9417#issuecomment-1975197127 that the core problem is with memory accounting:
   
   1. The `AggregateExec` generates one single (giant) `RecordBatch` on output 
([source](https://github.com/apache/arrow-datafusion/blob/d5b635945307d5c7fe6fa10d3f65ee1ba2d58a5a/datafusion/physical-plan/src/aggregates/row_hash.rs#L660))
   2. Which is then emitted in parts (via `RecordBatch::slice()`, which does not actually allocate any additional memory) ([source](https://github.com/apache/arrow-datafusion/blob/d5b635945307d5c7fe6fa10d3f65ee1ba2d58a5a/datafusion/physical-plan/src/aggregates/row_hash.rs#L492-L497)) -- note this means no memory is freed until the GroupByHash has emitted all of its output
   3. The TopK operator, however, then treats each incoming `RecordBatch` as though it were an additional allocation that needs to be tracked ([source](https://github.com/apache/arrow-datafusion/blob/e642cc2a94f38518d765d25c8113523aedc29198/datafusion/physical-plan/src/topk/mod.rs#L576)), even though the slices all share the parent batch's buffers (see the sketch after this list)
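
   A minimal standalone sketch of why this double counts, using arrow-rs directly (the batch construction here is illustrative, not DataFusion's code): `slice()` shares the parent's buffers, so reporting `get_array_memory_size()` per slice counts the same underlying allocation repeatedly.

   ```rust
   use std::sync::Arc;

   use arrow::array::{ArrayRef, Int64Array};
   use arrow::datatypes::{DataType, Field, Schema};
   use arrow::record_batch::RecordBatch;

   fn main() {
       // A large single-column batch, standing in for the giant batch
       // that the aggregation materializes.
       let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));
       let col: ArrayRef = Arc::new(Int64Array::from_iter_values(0..1_000_000));
       let batch = RecordBatch::try_new(schema, vec![col]).unwrap();

       // slice() is zero-copy: the slice shares the parent's buffers.
       let slice = batch.slice(0, 8192);

       // get_array_memory_size() reports the size of the buffers an array
       // points at -- for a slice those are the parent's buffers, so
       // accounting for each slice re-counts the same allocation.
       println!("parent: {} bytes", batch.get_array_memory_size());
       println!("slice:  {} bytes", slice.get_array_memory_size());
   }
   ```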
   
   If we had infinite time / engineering hours, I think a better approach would be to change GroupByHash so that it didn't create a single giant contiguous `RecordBatch` in the first place
   
   Instead, it would be better if GroupByHash produced a `Vec<RecordBatch>` and then incrementally fed those batches out, along the lines of the sketch below
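
   Roughly, the emitting side could look like this (the `IncrementalOutput` type and `next_batch` method are hypothetical names for illustration, not existing DataFusion APIs):

   ```rust
   use std::collections::VecDeque;

   use arrow::record_batch::RecordBatch;

   /// Hypothetical emitter: hold a queue of already-split batches and
   /// hand them out one at a time, so each batch's memory can be freed
   /// (and the operator's reservation shrunk) as soon as it moves
   /// downstream.
   pub struct IncrementalOutput {
       batches: VecDeque<RecordBatch>,
   }

   impl IncrementalOutput {
       pub fn new(batches: Vec<RecordBatch>) -> Self {
           Self { batches: batches.into() }
       }

       /// Ownership of the returned batch moves to the caller; when the
       /// caller drops it, its buffers are freed independently of the
       /// batches still queued here.
       pub fn next_batch(&mut self) -> Option<RecordBatch> {
           self.batches.pop_front()
       }
   }
   ```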
   
   Doing this would allow the GroupByHash to release memory incrementally as it emitted each batch. This is analogous to how @korowa made join output incremental in https://github.com/apache/arrow-datafusion/pull/8658
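
   On the accounting side, the operator's `MemoryReservation` could then shrink batch by batch. A minimal sketch of that pattern (the pool size, consumer name, and batch sizes here are made up for illustration):

   ```rust
   use std::sync::Arc;

   use datafusion_execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};

   fn main() {
       // Illustrative pool size, consumer name, and batch sizes.
       let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(1024 * 1024));
       let mut reservation = MemoryConsumer::new("GroupByHash").register(&pool);

       // Reserve memory for the whole result up front...
       let batch_sizes = [4096_usize, 4096, 4096];
       reservation.try_grow(batch_sizes.iter().sum()).unwrap();

       // ...then give back each batch's share as soon as it is emitted,
       // instead of holding everything until the stream is exhausted.
       for size in batch_sizes {
           // (emit the corresponding batch downstream here)
           reservation.shrink(size);
       }
       assert_eq!(reservation.size(), 0);
   }
   ```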

