milenkovicm opened a new issue, #7858:
URL: https://github.com/apache/arrow-datafusion/issues/7858
### Is your feature request related to a problem or challenge?
I'd like to share a few observations after putting `hash aggregation` through a
stress test. DataFusion v32 was used.
Before I start, I apologise if I got something wrong; the aggregation code has
changed a lot since I last looked at it.
I created a test case that puts hash aggregation under memory pressure,
forcing it to spill, so I could observe query behaviour. The data contains about
250M rows with 50M unique `uid`s, which are used as the aggregation key.
It is a Parquet file of around 3GB.
The memory pool is configured as follows:
```rust
let runtime_config = RuntimeConfig::new()
.with_memory_pool(Arc::new(FairSpillPool::new(2_000_000_000)))
```
with 4 target partitions.
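For context, the full session wiring looks roughly like this (a sketch against the DataFusion v32 API; the variable names and `main` wrapper are mine, not from the test code):

```rust
use std::sync::Arc;
use datafusion::execution::memory_pool::FairSpillPool;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::prelude::*;

fn main() -> datafusion::error::Result<()> {
    // 2GB fair-spill pool: memory is divided fairly between spillable
    // operators, so each of the 4 partitions should be pushed to spill
    // well before the pool is exhausted.
    let runtime_config = RuntimeConfig::new()
        .with_memory_pool(Arc::new(FairSpillPool::new(2_000_000_000)));
    let runtime = Arc::new(RuntimeEnv::new(runtime_config)?);
    let session_config = SessionConfig::new().with_target_partitions(4);
    let _ctx = SessionContext::new_with_config_rt(session_config, runtime);
    Ok(())
}
```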
The query is rather simple:
```sql
select count(*), sum(id), sum(co), sum(n), uid from ta group by uid
```
I was expecting that the query would not fail with `ResourcesExhausted`, since
the aggregation should spill when under memory pressure, but this was not the case.
### Describe the solution you'd like
A few low-hanging fruits that could be addressed:
1. `sort_batch` copies data, if I'm not mistaken. Since a spill is usually
triggered under memory pressure, in most cases for all partitions at around the
same time, it effectively doubles the memory needed (in most cases I've observed
~2.5x more memory used than the memory pool was configured for).
https://github.com/apache/arrow-datafusion/blob/7acd8833cc5d03ba7643d4ae424553c7681ccce8/datafusion/physical-plan/src/aggregates/row_hash.rs#L672
2. A spill writes the whole state as a single batch, which is a problem later
when we try to merge all those files:
https://github.com/apache/arrow-datafusion/blob/7acd8833cc5d03ba7643d4ae424553c7681ccce8/datafusion/physical-plan/src/aggregates/row_hash.rs#L676
3. I'm not sure what the reason is for the checks at:
https://github.com/apache/arrow-datafusion/blob/7acd8833cc5d03ba7643d4ae424553c7681ccce8/datafusion/physical-plan/src/aggregates/row_hash.rs#L591
and
https://github.com/apache/arrow-datafusion/blob/7acd8833cc5d03ba7643d4ae424553c7681ccce8/datafusion/physical-plan/src/aggregates/row_hash.rs#L699
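On point 1: if `sort_batch` materialises a sorted copy, peak memory during a spill is roughly the in-memory state plus one sorted copy per spilling partition. A back-of-the-envelope sketch (all numbers illustrative, not measured):

```rust
fn main() {
    let pool_bytes: f64 = 2_000_000_000.0; // configured pool size
    let partitions = 4.0;
    // Each partition holds roughly pool/partitions of state when the
    // spill triggers; sorting a copy doubles that partition's footprint.
    let per_partition_state = pool_bytes / partitions;
    // If all partitions hit the threshold at around the same time:
    let peak = pool_bytes + partitions * per_partition_state;
    println!("peak ≈ {:.1}x pool", peak / pool_bytes); // ≈ 2.0x
}
```

This ignores per-operator overhead (readers, repartitioning, etc.), which could plausibly push the observed figure toward the ~2.5x above.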
Available improvements, IMHO:
1. Before spilling, we could split the batch into smaller blocks, sort those
smaller blocks, and write one spill file per block; at the moment we write a
single file. I'm not sure what the strategy for splitting a batch into smaller
blocks would be; we should also take care not to end up with too many open files.
2. Write more than one batch per spill file:
```rust
fn spill(&mut self) -> Result<()> {
    let emit = self.emit(EmitTo::All, true)?;
    let sorted = sort_batch(&emit, &self.spill_state.spill_expr, None)?;
    let spillfile = self.runtime.disk_manager.create_tmp_file("HashAggSpill")?;
    let mut writer = IPCWriter::new(spillfile.path(), &emit.schema())?;
    // TODO: slice large `sorted` and write to multiple files in parallel
    // Write the sorted state in `batch_size`-sized slices instead of one
    // large batch, so the merge phase can read it back incrementally.
    let mut offset = 0;
    let total_rows = sorted.num_rows();
    while offset < total_rows {
        let length = std::cmp::min(total_rows - offset, self.batch_size);
        let batch = sorted.slice(offset, length);
        offset += batch.num_rows();
        writer.write(&batch)?;
    }
    writer.finish()?;
    self.spill_state.spills.push(spillfile);
    Ok(())
}
```
3. Can we remove the `if` at
https://github.com/apache/arrow-datafusion/blob/7acd8833cc5d03ba7643d4ae424553c7681ccce8/datafusion/physical-plan/src/aggregates/row_hash.rs#L591
and change the `if` at
https://github.com/apache/arrow-datafusion/blob/7acd8833cc5d03ba7643d4ae424553c7681ccce8/datafusion/physical-plan/src/aggregates/row_hash.rs#L699
to `self.group_values.len() > 0`?
It would make more sense to "send" a smaller batch than to fail with
`ResourcesExhausted`.
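To illustrate improvement 1 above: sorting smaller blocks independently and merging the sorted runs later is the classic external-sort pattern. A self-contained sketch with plain `i64` keys standing in for spilled batches (no DataFusion types; the block size and data are arbitrary):

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

// Split `rows` into blocks of at most `block_size` and sort each block
// independently. Each sorted block stands in for one spill file.
fn spill_sorted_blocks(rows: &[i64], block_size: usize) -> Vec<Vec<i64>> {
    rows.chunks(block_size)
        .map(|chunk| {
            let mut block = chunk.to_vec();
            block.sort_unstable();
            block
        })
        .collect()
}

// K-way merge of the sorted blocks via a min-heap, as the merge phase
// would stream spill files back together.
fn merge_sorted_blocks(blocks: &[Vec<i64>]) -> Vec<i64> {
    let mut heap = BinaryHeap::new();
    for (b, block) in blocks.iter().enumerate() {
        if !block.is_empty() {
            // (value, block index, position within block)
            heap.push(Reverse((block[0], b, 0usize)));
        }
    }
    let mut merged = Vec::new();
    while let Some(Reverse((value, b, i))) = heap.pop() {
        merged.push(value);
        if i + 1 < blocks[b].len() {
            heap.push(Reverse((blocks[b][i + 1], b, i + 1)));
        }
    }
    merged
}

fn main() {
    let rows = vec![5, 3, 9, 1, 7, 2, 8, 4, 6, 0];
    let blocks = spill_sorted_blocks(&rows, 4);
    assert_eq!(blocks.len(), 3); // 4 + 4 + 2 rows
    let merged = merge_sorted_blocks(&blocks);
    assert_eq!(merged, (0..10).collect::<Vec<i64>>());
}
```

The merge only needs one in-flight row per block, which is what keeps memory bounded; the open-file-count concern from point 1 shows up here as the heap width.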
### Describe alternatives you've considered
No other alternatives considered at the moment
### Additional context
I have disabled (commented out) resource accounting in `RepartitionExec`, as it
would be the first operator to fail with `ResourcesExhausted`. From what I
observed, `RepartitionExec` holds memory for a few batches of data when it
raises `ResourcesExhausted`. The changes made in #4816 make sense, but in my
test it was the first operator to give up, before an aggregation spill could occur.