Ablu opened a new issue, #11390:
URL: https://github.com/apache/datafusion/issues/11390

   ### Describe the bug
   
When using a grouping (I tested with `distinct_on`) in combination with a `FairSpillPool`, not all memory seems to be tracked correctly. While the grouping itself gracefully handles allocation errors by spilling, the same reservation is also shared with a `BatchBuilder` that does not.
   
   ### To Reproduce
   
   1. Clone my reproducer: 
https://github.com/Ablu/datafusion-repro-resource-allocation (chances are that 
this can be simplified further)
   2. Generate test file: `cargo run --release -- generate out.parquet`
   3. Increase the open file limit: `ulimit -nS 102400`
   4. Attempt to run the deduplication query: `cargo run --release -- deduplicate out.parquet dedup.parquet`
   
   ```
   Failure to write back final result: ResourcesExhausted("Failed to allocate additional 8939616 bytes for GroupedHashAggregateStream[10] with 58373092 bytes already allocated - maximum available is 62500000")
   note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
   ```
   
   It looks like the builder tries to acquire memory from the smaller "spillable" part of the pool, while it should probably allocate via a non-spillable reservation instead, since it cannot spill.
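   To illustrate why the grow is denied, here is a minimal self-contained model of a fair-spill-style pool. This is *not* the real DataFusion implementation and the names are illustrative only; it just shows that a capped consumer which cannot spill has no way to recover once the cap is hit, using the numbers from the error above:

   ```rust
   /// Toy model (not the real `FairSpillPool`): a pool of `size` bytes where
   /// each of `num_spillable` spillable consumers may grow only up to an
   /// equal share of what unspillable consumers have left over.
   struct FairPoolModel {
       size: usize,
       unspillable_used: usize,
       num_spillable: usize,
   }

   impl FairPoolModel {
       /// Maximum bytes a single spillable consumer may hold.
       fn spillable_limit(&self) -> usize {
           (self.size - self.unspillable_used) / self.num_spillable
       }

       /// Whether a spillable consumer already holding `held` bytes
       /// can grow by `extra` bytes without exceeding its share.
       fn can_grow_spillable(&self, held: usize, extra: usize) -> bool {
           held + extra <= self.spillable_limit()
       }
   }

   fn main() {
       // Numbers from the error message: pool limit 62_500_000 bytes,
       // 58_373_092 already allocated, 8_939_616 more requested.
       let pool = FairPoolModel {
           size: 62_500_000,
           unspillable_used: 0,
           num_spillable: 1,
       };
       // 58_373_092 + 8_939_616 = 67_312_708 > 62_500_000, so the grow
       // is denied. A consumer that can spill would free memory and
       // retry here; the `BatchBuilder` cannot, so it errors out.
       println!(
           "grow allowed: {}",
           pool.can_grow_spillable(58_373_092, 8_939_616)
       );
   }
   ```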
   
   As a hack, I removed the offending resource allocations:
   ```diff
   diff --git a/datafusion/physical-plan/src/sorts/builder.rs b/datafusion/physical-plan/src/sorts/builder.rs
   index d32c60697..d0305c0bf 100644
   --- a/datafusion/physical-plan/src/sorts/builder.rs
   +++ b/datafusion/physical-plan/src/sorts/builder.rs
   @@ -65,15 +65,15 @@ impl BatchBuilder {
                indices: Vec::with_capacity(batch_size),
                reservation,
            }
        }
    
        /// Append a new batch in `stream_idx`
        pub fn push_batch(&mut self, stream_idx: usize, batch: RecordBatch) -> Result<()> {
   -        self.reservation.try_grow(batch.get_array_memory_size())?;
   +        // dbg!(self.reservation.try_grow(batch.get_array_memory_size()))?;
            let batch_idx = self.batches.len();
            self.batches.push((stream_idx, batch));
            self.cursors[stream_idx] = BatchCursor {
                batch_idx,
                row_idx: 0,
            };
            Ok(())
   @@ -137,15 +137,15 @@ impl BatchBuilder {
                let retain = stream_cursor.batch_idx == batch_idx;
                batch_idx += 1;
    
                if retain {
                    stream_cursor.batch_idx = retained;
                    retained += 1;
                } else {
   -                self.reservation.shrink(batch.get_array_memory_size());
   +                // self.reservation.shrink(batch.get_array_memory_size());
                }
                retain
            });
    
            Ok(Some(RecordBatch::try_new(
                Arc::clone(&self.schema),
                columns,
   ```
   
   That avoids the immediate error, and the query progresses further, but it eventually ends in some kind of deadlock (which I have not fully understood yet, though it does not seem to be related to this hack).
   
   ### Expected behavior
   
   The query + writeback should pass after reading back spilled data.
   
   ### Additional context
   
   Originally discussed/analyzed on Discord: 
https://discord.com/channels/885562378132000778/1166447479609376850/1260302583637999756


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

