alamb opened a new issue, #9562:
URL: https://github.com/apache/arrow-datafusion/issues/9562

   ### Is your feature request related to a problem or challenge?
   
   1. The `AggregateExec` generates a single (giant) `RecordBatch` on output ([source](https://github.com/apache/arrow-datafusion/blob/d5b635945307d5c7fe6fa10d3f65ee1ba2d58a5a/datafusion/physical-plan/src/aggregates/row_hash.rs#L660))
   2. That batch is then emitted in parts via `RecordBatch::slice()`, which does not allocate any additional memory ([source](https://github.com/apache/arrow-datafusion/blob/d5b635945307d5c7fe6fa10d3f65ee1ba2d58a5a/datafusion/physical-plan/src/aggregates/row_hash.rs#L492-L497))
 
   
   This has at least two potential downsides:
   1. No memory is freed until GroupByHash has emitted every output row.
   2. As we see in https://github.com/apache/arrow-datafusion/issues/9417, if there are upstream operators like TopK that hold references to any of these sliced `RecordBatch`es, those slices are treated as though they were additional allocations that need to be tracked ([source](https://github.com/apache/arrow-datafusion/blob/e642cc2a94f38518d765d25c8113523aedc29198/datafusion/physical-plan/src/topk/mod.rs#L576))
   
   Something like this in pictures:
   
   ```
                                                    Output             
                              ▲               RecordBatches are        
                              │                 slices into a          
                     ┌────────────────┐          single large          
                     │  RecordBatch   │─ ─ ─ ┐   output batch          
                     └────────────────┴ ─ ─ ┐                          
                              ▲              │                         
                              │             │        ┌────────────────┐
                     ┌────────────────┐      └ ─ ─ ─▶│                │
     output stream   │  RecordBatch   │     │        ├ ─ ─ ─ ─ ─ ─ ─ ─│
                     └────────────────┘      ─ ─ ─ ─▶│                │
                                                     ├ ─ ─ ─ ─ ─ ─ ─ ─│
                            ...                      │                │
                                                     │                │
                              ▲                      │      ...       │
                              │                      │                │
                     ┌────────────────┐              │                │
                     │  RecordBatch   ├ ─ ─ ┐        │                │
                     └────────────────┘              │                │
                              ▲             │        ├ ─ ─ ─ ─ ─ ─ ─ ─│
                              │              ─ ─ ─ ─▶│                │
                              │                      └────────────────┘
                              │                                        
                  ┏━━━━━━━━━━━━━━━━━━━━━━━┓                            
                  ┃                       ┃       Single RecordBatch   
                  ┃                       ┃                            
                  ┃                       ┃                            
                  ┃                       ┃                            
                  ┃                       ┃                            
                  ┃    GroupByHashExec    ┃                            
                  ┃                       ┃                            
                  ┃                       ┃                            
                  ┃                       ┃                            
                  ┃                       ┃                            
                  ┃                       ┃                            
                  ┃                       ┃                            
                  ┗━━━━━━━━━━━━━━━━━━━━━━━┛                            
   ```
   
   ### Describe the solution you'd like
   
   If we had infinite time / engineering hours, I think a better approach would be to change GroupByHash so it doesn't create a single giant contiguous `RecordBatch`.
   
   Instead, it would be better if GroupByHash produced a `Vec<RecordBatch>` and then incrementally fed those batches out.
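
   A minimal sketch of producing separately owned batches instead of one contiguous buffer, using plain `Vec<u64>` as a stand-in for a `RecordBatch` (`into_batches` is a hypothetical helper, not a DataFusion API). For simplicity it starts from a single `Vec` of group values; a real implementation would more likely build each batch directly from the hash table's group state.

   ```rust
   /// Hypothetical helper: split the final group values into independently
   /// owned batches of at most `batch_size` rows each.
   fn into_batches(groups: Vec<u64>, batch_size: usize) -> Vec<Vec<u64>> {
       assert!(batch_size > 0, "batch_size must be non-zero");
       let mut batches = Vec::new();
       let mut rows = groups.into_iter();
       loop {
           // Each chunk is collected into its own Vec, i.e. its own allocation.
           let chunk: Vec<u64> = rows.by_ref().take(batch_size).collect();
           if chunk.is_empty() {
               break;
           }
           batches.push(chunk);
       }
       batches
   }

   fn main() {
       let groups: Vec<u64> = (0..10).collect();
       let batches = into_batches(groups, 4);
       let sizes: Vec<usize> = batches.iter().map(Vec::len).collect();
       assert_eq!(sizes, vec![4, 4, 2]);
       // Distinct allocations, so each batch can be freed on its own.
       assert_ne!(batches[0].as_ptr(), batches[1].as_ptr());
   }
   ```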
   
   Doing this would allow GroupByHash to release memory incrementally as it emits output. This is analogous to how @korowa made join output incremental in https://github.com/apache/arrow-datafusion/pull/8658.
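
   The incremental release could look roughly like the sketch below. It is a std-only illustration under stated assumptions: `IncrementalEmitter` and its `reserved_bytes` counter are hypothetical stand-ins for the operator's output state and its memory reservation, not DataFusion's actual types. Each call to `next()` hands one batch to the consumer and shrinks the tracked memory by that batch's size.

   ```rust
   use std::collections::VecDeque;

   /// Hypothetical emitter: owns pre-built output batches plus a byte counter
   /// standing in for the operator's memory reservation.
   struct IncrementalEmitter {
       pending: VecDeque<Vec<u64>>,
       reserved_bytes: usize,
   }

   fn bytes_of(batch: &[u64]) -> usize {
       batch.len() * std::mem::size_of::<u64>()
   }

   impl IncrementalEmitter {
       fn new(batches: Vec<Vec<u64>>) -> Self {
           let reserved_bytes: usize = batches.iter().map(|b| bytes_of(b)).sum();
           Self { pending: batches.into(), reserved_bytes }
       }
   }

   impl Iterator for IncrementalEmitter {
       type Item = Vec<u64>;

       fn next(&mut self) -> Option<Vec<u64>> {
           let batch = self.pending.pop_front()?;
           // The operator no longer holds this batch, so its reservation shrinks.
           self.reserved_bytes -= bytes_of(&batch);
           Some(batch)
       }
   }

   fn main() {
       let batches = vec![vec![0; 4], vec![0; 4], vec![0; 2]];
       let mut emitter = IncrementalEmitter::new(batches);
       assert_eq!(emitter.reserved_bytes, 80);

       let first = emitter.next().unwrap();
       assert_eq!(first.len(), 4);
       assert_eq!(emitter.reserved_bytes, 48);

       // Draining the rest releases everything the operator was holding.
       let remaining: Vec<Vec<u64>> = emitter.by_ref().collect();
       assert_eq!(remaining.len(), 2);
       assert_eq!(emitter.reserved_bytes, 0);
   }
   ```

   Unlike the slicing model, memory held by the operator drops after every emitted batch rather than only after the last one.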
   
   Perhaps something like:
   
   ```
                               ▲                                        
                               │                                        
                      ┌────────────────┐                    Output      
     output stream    │  RecordBatch   │              RecordBatches are 
                      └────────────────┘              created in smaller
                               ▲                      chunks and emitted
                               │                          one by one    
                               │                                        
                               │                                        
                   ┏━━━━━━━━━━━━━━━━━━━━━━━┓          ┌────────────────┐
                   ┃                       ┃          │  RecordBatch   │
                   ┃                       ┃          └────────────────┘
                   ┃                       ┃          ┌────────────────┐
                   ┃                       ┃          │  RecordBatch   │
                   ┃                       ┃          └────────────────┘
                   ┃    GroupByHashExec    ┃          ┌────────────────┐
                   ┃                       ┃          │  RecordBatch   │
                   ┃                       ┃          └────────────────┘
                   ┃                       ┃                 ...        
                   ┃                       ┃          ┌────────────────┐
                   ┃                       ┃          │  RecordBatch   │
                   ┃                       ┃          └────────────────┘
                   ┗━━━━━━━━━━━━━━━━━━━━━━━┛                            
                                                       Vec<RecordBatch> 
                                                                        
                                                                        
   ```
   
   ### Describe alternatives you've considered
   
   _No response_
   
   ### Additional context
   
   _No response_

