Samyak2 opened a new issue, #22757:
URL: https://github.com/apache/datafusion/issues/22757

   ### Describe the bug
   
   We have seen this internally with `HashJoinExec` and `RepartitionExec`. The 
repartition case is hard to reproduce because it depends on how many batches 
are being buffered, so I will use `HashJoinExec` as the example. I suspect 
that any operator that holds a sequence of `RecordBatch`es in memory has this 
problem.
   
   Consider a tree like this:
   ```
   HashJoinExec
     AggregateExec: mode=Final
       CoalescePartitionsExec
         AggregateExec: mode=Partial
           ...
     ... (probe side)
   ```
   
   - Here an `AggregateExec` sits on the build side of a hash join.
   - The hash join buffers all the batches coming from the aggregate in a 
`Vec<RecordBatch>` in memory. 
[Ref](https://github.com/apache/datafusion/blob/f0331999b929a7a2b0baf545bde12747ad042304/datafusion/physical-plan/src/joins/hash_join/exec.rs#L1933).
   - Memory accounting is done by summing `get_record_batch_memory_size` for 
each batch separately.
     - This helper de-duplicates a buffer that is referenced multiple times 
within the same record batch, but it keeps no state across batches.
   - Now see how `AggregateExec` produces output: 
[ref](https://github.com/apache/datafusion/blob/f0331999b929a7a2b0baf545bde12747ad042304/datafusion/physical-plan/src/aggregates/row_hash.rs#L875-L880)
     - The aggregate itself produces one huge record batch, which is then 
sliced into batches of 8192 rows (the default `batch_size`) and sent 
downstream.
     - All of these slices reference the same underlying huge buffer!
     - So even though the join de-duplicates buffers within a record batch, it 
still counts the same buffer once for every emitted batch.
     - The memory is overcounted by a factor of roughly 
`num_output_rows / batch_size`! This can be a huge multiplier for large 
aggregations.
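
To make the multiplier concrete, here is a minimal sketch of the accounting problem using only the Rust standard library. It is a toy model, not DataFusion or Arrow code: `ColumnSlice`, `Batch`, `batch_memory_size` and `slice_into_batches` are hypothetical stand-ins, and the only behavior assumed from the real helper is that it counts each distinct buffer once per batch at the buffer's full capacity.

```rust
use std::collections::HashSet;
use std::sync::Arc;

/// Toy stand-in for a sliced Arrow buffer: a shared allocation plus a window.
struct ColumnSlice {
    data: Arc<Vec<u8>>, // allocation shared by every slice
    offset: usize,      // start of this slice's window, in bytes
    len: usize,         // length of this slice's window, in bytes
}

/// Toy stand-in for a `RecordBatch`: a list of column slices.
struct Batch {
    columns: Vec<ColumnSlice>,
}

/// Per-batch accounting: each distinct allocation is counted once per batch,
/// at its full capacity, no matter how small the slice's window is.
fn batch_memory_size(batch: &Batch) -> usize {
    let mut seen = HashSet::new();
    batch
        .columns
        .iter()
        .filter(|c| seen.insert(Arc::as_ptr(&c.data)))
        .map(|c| c.data.capacity())
        .sum()
}

/// Cuts one big single-column batch into zero-copy slices of `batch_size` bytes.
fn slice_into_batches(data: Arc<Vec<u8>>, batch_size: usize) -> Vec<Batch> {
    (0..data.len())
        .step_by(batch_size)
        .map(|offset| Batch {
            columns: vec![ColumnSlice {
                data: Arc::clone(&data),
                offset,
                len: batch_size.min(data.len() - offset),
            }],
        })
        .collect()
}

fn main() {
    let big = Arc::new(vec![0u8; 80_000]);
    let allocated = big.capacity();
    let batches = slice_into_batches(big, 8_000);

    // What a buffering operator reserves if it sums the per-batch sizes.
    let reserved: usize = batches.iter().map(batch_memory_size).sum();

    let last = &batches.last().unwrap().columns[0];
    println!("slices: {}", batches.len());
    println!("last slice covers bytes {}..{}", last.offset, last.offset + last.len);
    println!("allocated: {allocated} bytes, reserved: {reserved} bytes");
}
```

In this model the reserved total is the allocation size multiplied by the number of slices, which mirrors the `num_output_rows / batch_size` factor described above.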
   
   ### To Reproduce
   
   I have a reproducer test here: https://github.com/Samyak2/datafusion/pull/2
   
   `peak_mem_used` for the final aggregate is ~104MB in the test, yet the hash 
join fails even with a 3GB memory limit!
   
   In other words, the test fails even with the memory limit set to 30x (!) 
the aggregate's peak memory. I could understand needing 2x, since the memory 
might be counted once in the join and once in the aggregate, but 30x does not 
make sense. Ideally, this should be close to 1x since we're not really 
allocating more memory.
   
   ### Expected behavior
   
   The query in the above test should pass with a memory limit of at most 2x 
the aggregate's peak memory.
   
   ### Additional context
   
   This is not limited to hash join. Any operator that can buffer data coming 
from an aggregate will show this behavior:
   - `HashJoinExec`: buffers all batches coming on the build side
   - `NestedLoopJoinExec`
   - `CrossJoinExec`
   - `SortMergeJoinExec`: although it won't buffer many records
   - `RepartitionExec`: can buffer an unbounded number of batches in some cases
   - `SortExec`: buffers batches in-mem
   - `TopK`: buffers heap size number of batches
   - `SortPreservingMergeExec`
   
   There may be more I'm missing.
   
   For the fix, I propose a new stateful helper alongside 
`get_record_batch_memory_size`. It keeps track of the buffer pointers it has 
already seen (in either a `HashSet` or a `HashMap`) and uses that state to 
de-duplicate across batches. We have been using a version of this 
successfully internally.
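
A minimal sketch of what such a stateful helper could look like, again using only the standard library. This is an illustration under assumptions, not the internal version mentioned above and not an existing DataFusion API: `DedupMemoryTracker`, `add_batch` and the toy `Batch` type are hypothetical names, and each column is modeled as a single shared allocation.

```rust
use std::collections::HashSet;
use std::sync::Arc;

/// Toy stand-in for a `RecordBatch`: each column is one shared allocation.
struct Batch {
    columns: Vec<Arc<Vec<u8>>>,
}

/// Hypothetical stateful counterpart to a per-batch size helper.
/// It remembers which allocations were already counted, so a buffer shared
/// by many batches contributes to the total only once.
#[derive(Default)]
struct DedupMemoryTracker {
    seen: HashSet<usize>, // addresses of allocations counted so far
    total: usize,         // bytes counted so far
}

impl DedupMemoryTracker {
    /// Returns only the bytes that this batch adds on top of earlier batches.
    fn add_batch(&mut self, batch: &Batch) -> usize {
        let mut added = 0;
        for col in &batch.columns {
            // `insert` returns false when the address was already recorded.
            if self.seen.insert(Arc::as_ptr(col) as usize) {
                added += col.capacity();
            }
        }
        self.total += added;
        added
    }

    fn total(&self) -> usize {
        self.total
    }
}

fn main() {
    let big = Arc::new(vec![0u8; 80_000]);
    let mut tracker = DedupMemoryTracker::default();

    // Ten batches that all reference the same allocation.
    for i in 0..10 {
        let batch = Batch { columns: vec![Arc::clone(&big)] };
        let added = tracker.add_batch(&batch);
        println!("batch {i}: newly counted {added} bytes");
    }
    println!("total counted: {} bytes", tracker.total());
}
```

An operator would reserve only the value returned by `add_batch` for each incoming batch. One caveat with this sketch: an address identifies a buffer only while that buffer is alive, so the state should be dropped together with the buffered batches. A `HashMap` keyed by address with a per-buffer count would be one way to release memory as individual batches are freed.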


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

