Samyak2 opened a new issue, #22757:
URL: https://github.com/apache/datafusion/issues/22757
### Describe the bug
We have seen this internally with `HashJoinExec` and `RepartitionExec`. The
repartition case is a bit hard to reproduce, since it depends on how many
batches are being buffered. So I will be using `HashJoinExec` as the example.
I suspect that any operator that holds a sequence of `RecordBatch`es in memory
has this problem.
Consider a tree like this:
```
HashJoinExec
  AggregateExec: mode=Final
    CoalescePartitionsExec
      AggregateExec: mode=Partial
        ...
  ... (probe side)
```
- This is an `AggregateExec` on the build side of a hash join.
- The hash join buffers all the batches from the aggregate in a
`Vec<RecordBatch>` in memory.
[Ref](https://github.com/apache/datafusion/blob/f0331999b929a7a2b0baf545bde12747ad042304/datafusion/physical-plan/src/joins/hash_join/exec.rs#L1933).
- The memory accounting is done by adding `get_record_batch_memory_size` of
each batch separately.
  - This helper de-duplicates the same buffer being referenced multiple
  times within the same record batch.
- Now see how `AggregateExec` produces output:
[ref](https://github.com/apache/datafusion/blob/f0331999b929a7a2b0baf545bde12747ad042304/datafusion/physical-plan/src/aggregates/row_hash.rs#L875-L880)
  - The aggregate itself produces one huge record batch, which is then sliced
  into batches of 8192 rows (the default `batch_size`) and sent downstream.
  - All of these slices reference the same underlying huge buffer!
  - So even though the join de-duplicates buffers within a record batch, it
  still counts the same buffer once for every emitted batch.
  - The memory is overcounted by a factor of roughly
  `num_output_rows / batch_size`! This can be a huge multiplier for large
  aggregates.
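
To make the multiplier concrete, here is a minimal sketch of the effect using only the standard library. It does not use Arrow: `BatchSlice`, `slice_into_batches`, and `per_batch_size` are hypothetical stand-ins, and the sketch assumes that per-batch accounting charges the full size of every distinct allocation a batch references.

```rust
use std::collections::HashSet;
use std::sync::Arc;

/// Simplified stand-in for a sliced record batch (not the Arrow type).
struct BatchSlice {
    /// One shared allocation per column; slicing clones the `Arc`, not the bytes.
    buffers: Vec<Arc<Vec<u8>>>,
    /// First row of this slice within the big batch.
    offset: usize,
    /// Number of rows in this slice.
    len: usize,
}

/// Slice one big batch of `total_rows` rows into views of at most `batch_size` rows.
fn slice_into_batches(
    buffers: &[Arc<Vec<u8>>],
    total_rows: usize,
    batch_size: usize,
) -> Vec<BatchSlice> {
    (0..total_rows)
        .step_by(batch_size)
        .map(|offset| BatchSlice {
            buffers: buffers.to_vec(),
            offset,
            len: batch_size.min(total_rows - offset),
        })
        .collect()
}

/// Per-batch accounting: de-duplicates allocations inside ONE batch only,
/// and charges the full size of each allocation it sees.
fn per_batch_size(batch: &BatchSlice) -> usize {
    let mut seen = HashSet::new();
    batch
        .buffers
        .iter()
        .filter(|b| seen.insert(Arc::as_ptr(*b)))
        .map(|b| b.len())
        .sum()
}

/// What an operator reports if it sums the per-batch sizes of everything it buffers.
fn naive_total(batches: &[BatchSlice]) -> usize {
    batches.iter().map(per_batch_size).sum()
}
```

With a single 8-byte column and 1,000,000 rows, the one real allocation is 8,000,000 bytes, but slicing at 8192 rows yields 123 batches, so `naive_total` reports 123 times that amount.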
### To Reproduce
I have a reproducer test here: https://github.com/Samyak2/datafusion/pull/2
`peak_mem_used` for the final aggregate is ~104MB in the test, but the hash
join fails even with a 3GB memory limit!
So the test fails even with the memory limit set to about 30x (!) of the
aggregate's peak memory. I could understand 2x, since the same memory might be
counted once in the join and once in the aggregate, but 30x does not make
sense. Ideally, this should be close to 1x since we're not really allocating
more memory.
### Expected behavior
The query in the above test should pass with at most 2x the size of the
aggregate peak mem.
### Additional context
This is not limited to hash join. Any operator that can buffer data coming
from an aggregate will show this behavior:
- `HashJoinExec`: buffers all batches coming on the build side
- `NestedLoopJoinExec`
- `CrossJoinExec`
- `SortMergeJoinExec`: although it won't buffer many records
- `RepartitionExec`: can buffer an unbounded number of batches in some cases
- `SortExec`: buffers batches in memory
- `TopK`: buffers as many batches as the heap size
- `SortPreservingMergeExec`
There may be more I'm missing.
For the fix, I'm proposing a new stateful helper alongside
`get_record_batch_memory_size`. It keeps track of buffer pointers that were
previously seen (in either a HashSet or a HashMap) and uses that state to
de-duplicate across batches. We have been using a version of this
successfully internally.
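
A rough sketch of what such a stateful helper could look like, again using only the standard library rather than Arrow. `SharedBufferTracker` and `add_batch` are hypothetical names for illustration, not the proposed API, and `Arc<Vec<u8>>` stands in for an Arrow buffer.

```rust
use std::collections::HashSet;
use std::sync::Arc;

/// Hypothetical stateful accountant; the name and API are illustrative only.
#[derive(Default)]
struct SharedBufferTracker {
    /// Addresses of allocations that have already been charged.
    seen: HashSet<usize>,
    /// Bytes charged so far, counting each allocation once.
    total_bytes: usize,
}

impl SharedBufferTracker {
    /// Registers the buffers of one batch and returns only the newly charged
    /// bytes, i.e. the amount the caller would need to reserve for this batch.
    fn add_batch(&mut self, buffers: &[Arc<Vec<u8>>]) -> usize {
        let mut added = 0;
        for buf in buffers {
            // `insert` returns false if this allocation was seen in an earlier batch.
            if self.seen.insert(Arc::as_ptr(buf) as usize) {
                added += buf.len();
            }
        }
        self.total_bytes += added;
        added
    }
}
```

This HashSet variant assumes every registered batch stays alive for as long as the tracker does. If batches can be dropped while the tracker is still in use, a freed address could be reused by a new allocation and be skipped by mistake; a HashMap that holds per-buffer counts would be one way to handle removal.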
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]