andygrove commented on issue #2452:
URL:
https://github.com/apache/datafusion-comet/issues/2452#issuecomment-3361528202
@comphead @parthchandra Here is what I know so far, which is not very much;
I am just getting started with understanding DataFusion's code in this area.
`sort_and_spill_in_mem_batches` is being called, presumably as a result of
not being able to allocate extra memory.
`sort_and_spill_in_mem_batches` then loops, polling for sorted batches and
trying to grow its reservation for each one, with a fallback that spills if
it cannot get more memory:
```rust
while let Some(batch) = sorted_stream.next().await {
    let batch = batch?;
    let sorted_size = get_reserved_byte_for_record_batch(&batch);
    if self.reservation.try_grow(sorted_size).is_err() {
        // Although the reservation is not enough, the batch is
        // already in memory, so it's okay to combine it with previously
        // sorted batches, and spill together.
        globally_sorted_batches.push(batch);
        self.consume_and_spill_append(&mut globally_sorted_batches)
            .await?; // reservation is freed in spill()
    } else {
        globally_sorted_batches.push(batch);
    }
}
```
Unfortunately, the call to `sorted_stream.next().await` also allocates
memory, because it polls `SortPreservingMergeStream`, whose poll method
calls `BatchBuilder::push_batch`:
```rust
Poll::Ready(self.in_progress.push_batch(idx, batch))
```
`push_batch` fails to allocate memory here, and the `?` propagates the error
with no spill fallback:
```rust
pub fn push_batch(&mut self, stream_idx: usize, batch: RecordBatch) -> Result<()> {
    self.reservation
        .try_grow(get_record_batch_memory_size(&batch))?;
```
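The interaction described above can be reproduced with a toy model. This is only a sketch under stated assumptions: `ToyPool`, `next_merged_batch`, and `spill_loop` are hypothetical stand-ins, not DataFusion types, and the byte counts are made up. It shows that a fallback inside the loop body cannot help when the poll that feeds the loop makes its own allocation with a hard `?`.

```rust
// Toy model only: `ToyPool`, `next_merged_batch`, and `spill_loop` are
// hypothetical stand-ins, not DataFusion types or functions.

/// A fixed-size memory pool with a `try_grow`-style reservation call.
struct ToyPool {
    limit: usize,
    used: usize,
}

impl ToyPool {
    fn new(limit: usize) -> Self {
        Self { limit, used: 0 }
    }

    /// Reserve `bytes`, or fail if that would exceed the limit.
    fn try_grow(&mut self, bytes: usize) -> Result<(), String> {
        if self.used + bytes > self.limit {
            return Err(format!(
                "cannot reserve {bytes} bytes ({} of {} in use)",
                self.used, self.limit
            ));
        }
        self.used += bytes;
        Ok(())
    }

    /// Release `bytes` back to the pool.
    fn shrink(&mut self, bytes: usize) {
        self.used = self.used.saturating_sub(bytes);
    }
}

/// Stands in for polling the merge stream: it needs `merge_bytes` of its own
/// and uses a hard `?`, so there is no fallback if the pool is full.
fn next_merged_batch(
    pool: &mut ToyPool,
    size: usize,
    merge_bytes: usize,
) -> Result<usize, String> {
    pool.try_grow(merge_bytes)?;
    // Assumption for the toy: the bytes are released once the batch is handed over.
    pool.shrink(merge_bytes);
    Ok(size)
}

/// Stands in for the spill loop. Returns the number of spills performed.
fn spill_loop(
    pool: &mut ToyPool,
    batch_sizes: &[usize],
    merge_bytes: usize,
) -> Result<usize, String> {
    let mut buffered_bytes = 0;
    let mut spills = 0;
    for &size in batch_sizes {
        // An error here returns early, before the fallback below can run.
        let batch = next_merged_batch(pool, size, merge_bytes)?;
        if pool.try_grow(batch).is_err() {
            // Fallback: spill the buffered batches plus this one, then free
            // the bytes that were reserved for them.
            pool.shrink(buffered_bytes);
            buffered_bytes = 0;
            spills += 1;
        } else {
            buffered_bytes += batch;
        }
    }
    Ok(spills)
}

fn main() {
    // Small merge-side allocation: the loop's own fallback absorbs the pressure.
    let mut pool = ToyPool::new(100);
    println!("{:?}", spill_loop(&mut pool, &[60, 60], 10));

    // Larger merge-side allocation: the error surfaces from the poll itself.
    let mut pool = ToyPool::new(100);
    println!("{:?}", spill_loop(&mut pool, &[60, 60], 50));
}
```

In the first call the loop spills once and returns `Ok(1)`. In the second call the same batches fail with an error, because the allocation made while producing the second batch is rejected before the loop body is reached.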