Ablu opened a new issue, #11390: URL: https://github.com/apache/datafusion/issues/11390
### Describe the bug

When using a grouping (I tested with `distinct_on`) in combination with a `FairSpillPool`, not all memory seems to be tracked correctly. While the grouping itself gracefully handles allocation errors by spilling, the same reservation is also shared with a `BatchBuilder` that does not.

### To Reproduce

1. Clone my reproducer: https://github.com/Ablu/datafusion-repro-resource-allocation (chances are that this can be simplified further)
2. Generate the test file: `cargo run --release -- generate out.parquet`
3. Increase the open file limit: `ulimit -nS 102400`
4. Attempt to run the deduplication query: `cargo run --release -- deduplicate out.parquet dedup.parquet`

```
Failure to write back final result: ResourcesExhausted("Failed to allocate additional 8939616 bytes for GroupedHashAggregateStream[10] with 58373092 bytes already allocated - maximum available is 62500000")
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
```

It looks like it tries to acquire from the smaller "spillable" part of the memory, while it should probably allocate with a non-spillable reservation.
As a hack I removed the offending resource allocations:

```diff
diff --git a/datafusion/physical-plan/src/sorts/builder.rs b/datafusion/physical-plan/src/sorts/builder.rs
index d32c60697..d0305c0bf 100644
--- a/datafusion/physical-plan/src/sorts/builder.rs
+++ b/datafusion/physical-plan/src/sorts/builder.rs
@@ -65,15 +65,15 @@ impl BatchBuilder {
             indices: Vec::with_capacity(batch_size),
             reservation,
         }
     }

     /// Append a new batch in `stream_idx`
     pub fn push_batch(&mut self, stream_idx: usize, batch: RecordBatch) -> Result<()> {
-        self.reservation.try_grow(batch.get_array_memory_size())?;
+        // dbg!(self.reservation.try_grow(batch.get_array_memory_size()))?;
         let batch_idx = self.batches.len();
         self.batches.push((stream_idx, batch));
         self.cursors[stream_idx] = BatchCursor {
             batch_idx,
             row_idx: 0,
         };
         Ok(())
@@ -137,15 +137,15 @@ impl BatchBuilder {
             let retain = stream_cursor.batch_idx == batch_idx;
             batch_idx += 1;
             if retain {
                 stream_cursor.batch_idx = retained;
                 retained += 1;
             } else {
-                self.reservation.shrink(batch.get_array_memory_size());
+                // self.reservation.shrink(batch.get_array_memory_size());
             }
             retain
         });

         Ok(Some(RecordBatch::try_new(
             Arc::clone(&self.schema),
             columns,
```

That avoids the immediate error. It looks like we progress further, but eventually end up in some kind of deadlock (which I have not fully understood yet, though it does not seem to be related to this hack).

### Expected behavior

The query + writeback should pass after reading back spilled data.

### Additional context

Originally discussed/analyzed on Discord: https://discord.com/channels/885562378132000778/1166447479609376850/1260302583637999756

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
