wirybeaver opened a new issue, #22898:
URL: https://github.com/apache/datafusion/issues/22898

   ### Is your feature request related to a problem or challenge?
   
   DataFusion's memory accounting relies on operators manually computing 
`get_array_memory_size()` after allocation and calling `try_resize()` on a 
`MemoryReservation`. This has two correctness problems:
   
   1. **Under-accounting**: Every missed `try_grow` call site is a silent bug. 
Operators allocate Arrow buffers (via builders, compute kernels like 
`take()`/`interleave()`, `concat_batches()`) without reserving memory, causing 
actual usage to exceed the pool budget and leading to OOM kills.
   2. **Over-accounting**: Shared `Arc<Buffer>` ownership causes 
`get_array_memory_size()` to count the same physical buffer multiple times when 
multiple operators or slices hold references. This triggers premature spilling.
   
   These are documented in #20714, #16841, and #22526.
   
   ### Describe the solution you'd like
   
   I propose a **three-layer migration path** from manual tracking to 
automatic, pool-integrated memory accounting. Each layer is independently 
useful and builds on the previous:
   
   #### Layer 1: Manual `try_grow` (current approach, patched)
   
   Fix the under-accounting by adding `MemoryReservation` tracking at each 
untracked allocation site. This is a necessary first step but inherently 
fragile — every new allocation site must remember to call `try_grow`.
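
Below is a minimal std-only sketch of the Layer 1 discipline. It assumes a single pool with a fixed byte limit. `Pool`, `Reservation`, `ResourcesExhausted`, and `build_output` are hypothetical and only mirror the shape of DataFusion's `MemoryReservation::try_grow`. The reservation is a separate call from the allocation it covers. If the `try_grow` line in `build_output` is deleted, the code still compiles and silently under-accounts.

```rust
use std::cell::Cell;

/// Hypothetical fixed-size pool; `used` is a Cell so reservations can share it.
struct Pool {
    limit: usize,
    used: Cell<usize>,
}

#[derive(Debug, PartialEq)]
struct ResourcesExhausted {
    requested: usize,
    available: usize,
}

/// Hypothetical per-operator reservation, modeled on the try_grow pattern.
struct Reservation<'a> {
    pool: &'a Pool,
    size: usize,
}

impl<'a> Reservation<'a> {
    fn new(pool: &'a Pool) -> Self {
        Reservation { pool, size: 0 }
    }

    /// Fails without changing any state if the pool cannot cover `bytes`.
    fn try_grow(&mut self, bytes: usize) -> Result<(), ResourcesExhausted> {
        let available = self.pool.limit.saturating_sub(self.pool.used.get());
        if bytes > available {
            return Err(ResourcesExhausted { requested: bytes, available });
        }
        self.pool.used.set(self.pool.used.get() + bytes);
        self.size += bytes;
        Ok(())
    }
}

impl Drop for Reservation<'_> {
    fn drop(&mut self) {
        // Return everything this reservation held.
        self.pool.used.set(self.pool.used.get() - self.size);
    }
}

/// One allocation site done correctly: reserve first, then allocate.
/// Nothing forces the `try_grow` call; removing it still compiles.
fn build_output(res: &mut Reservation<'_>, rows: usize) -> Result<Vec<i32>, ResourcesExhausted> {
    res.try_grow(rows * std::mem::size_of::<i32>())?;
    Ok(vec![0; rows])
}
```

The `Drop` impl returns the reserved bytes when the operator's reservation goes away. Only growth has to be remembered at each allocation site.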
   
   **Prototype**: https://github.com/wirybeaver/datafusion/pull/1
   
   #### Layer 2: Post-construction `claim()` via `ArrowMemoryPool`
   
   Use Arrow's `buffer.claim(pool)` / `ArrayData::claim(pool)` API to register 
buffer ownership at output boundaries. Claims are **idempotent** — the same 
buffer claimed twice is counted once. This eliminates both under-accounting 
(can't miss sites if you claim at the output boundary) and over-accounting 
(shared buffers counted once).
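
The following std-only model sketches why a second claim does not double-count. It assumes, as a simplification and not as a description of arrow-rs internals, that a claimed allocation keeps its reservation in a slot on the shared allocation itself. Claiming again replaces the slot's contents, which releases the previous reservation. The reservation is released when the last handle to the allocation drops. `TrackingPool`, `PoolReservation`, `reserve`, and `SharedBytes` are hypothetical.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

/// Hypothetical pool that only counts reserved bytes.
#[derive(Default)]
struct TrackingPool {
    used: AtomicUsize,
}

/// Bytes held against a pool; released automatically on drop.
struct PoolReservation {
    pool: Arc<TrackingPool>,
    size: usize,
}

impl Drop for PoolReservation {
    fn drop(&mut self) {
        self.pool.used.fetch_sub(self.size, Ordering::SeqCst);
    }
}

fn reserve(pool: &Arc<TrackingPool>, size: usize) -> PoolReservation {
    pool.used.fetch_add(size, Ordering::SeqCst);
    PoolReservation { pool: Arc::clone(pool), size }
}

/// Hypothetical shared allocation carrying at most one reservation.
struct SharedBytes {
    data: Vec<u8>,
    reservation: Mutex<Option<PoolReservation>>,
}

impl SharedBytes {
    fn new(data: Vec<u8>) -> Arc<Self> {
        Arc::new(SharedBytes { data, reservation: Mutex::new(None) })
    }

    /// Idempotent claim: the new reservation is taken first, then the
    /// assignment drops (and thereby releases) the old one, so the
    /// allocation ends up counted exactly once.
    fn claim(&self, pool: &Arc<TrackingPool>) {
        let new_res = reserve(pool, self.data.len());
        *self.reservation.lock().unwrap() = Some(new_res);
    }
}
```

Two holders claiming the same 512-byte allocation leave the pool at 512 bytes. The pool returns to zero only after both handles are gone.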
   
   DataFusion already has the adapter: `ArrowMemoryPool` in 
`datafusion/execution/src/memory_pool/arrow.rs` wraps DataFusion's `MemoryPool` 
and implements `arrow_buffer::MemoryPool`. The `arrow-buffer/pool` feature is 
enabled.
   
   ```rust
   // BEFORE (manual, error-prone):
   fn emit(&mut self, ...) -> Result<Option<RecordBatch>> {
       let batch = RecordBatch::try_new(schema, output)?;
       let batch_size = batch.get_array_memory_size();  // over-counts shared buffers
       self.transient_reservation.grow(batch_size);
       Ok(Some(batch))
   }
   
   // AFTER (post-construction claim, idempotent):
   fn emit(&mut self, ...) -> Result<Option<RecordBatch>> {
       let batch = RecordBatch::try_new(schema, output)?;
       claim_batch(&batch, &self.arrow_pool);  // helper that claims every column's buffers; shared buffers counted once
       Ok(Some(batch))
   }
   ```
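
The `claim_batch` helper in the AFTER example is not defined in the issue. Below is one plausible shape, sketched against std-only stand-ins instead of the real `RecordBatch` and `ArrayData` types. It walks every column and recursively claims the buffers of child arrays, because nested types such as lists and structs keep their values in children. `ClaimRegistry`, `ModelArrayData`, `ModelBatch`, and `claim_array` are hypothetical. The registry deduplicates by allocation address to model idempotent claims and, for brevity, never releases anything.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical pool stand-in: allocation address -> bytes.
/// Re-claiming the same address overwrites one entry, so it counts once.
#[derive(Default)]
struct ClaimRegistry {
    claimed: Mutex<HashMap<usize, usize>>,
}

impl ClaimRegistry {
    fn claim(&self, buf: &Arc<Vec<u8>>) {
        let addr = Arc::as_ptr(buf) as usize;
        self.claimed.lock().unwrap().insert(addr, buf.len());
    }

    fn used(&self) -> usize {
        self.claimed.lock().unwrap().values().sum()
    }
}

/// Stand-in for Arrow's ArrayData: its own buffers plus child arrays.
struct ModelArrayData {
    buffers: Vec<Arc<Vec<u8>>>,
    children: Vec<ModelArrayData>,
}

/// Stand-in for a RecordBatch: one ArrayData per column.
struct ModelBatch {
    columns: Vec<ModelArrayData>,
}

fn claim_array(data: &ModelArrayData, pool: &ClaimRegistry) {
    for buf in &data.buffers {
        pool.claim(buf);
    }
    // Nested types keep their values in child arrays; claim those too.
    for child in &data.children {
        claim_array(child, pool);
    }
}

/// Hypothetical helper matching the `claim_batch(&batch, pool)` call shape.
fn claim_batch(batch: &ModelBatch, pool: &ClaimRegistry) {
    for column in &batch.columns {
        claim_array(column, pool);
    }
}
```

Claiming the same batch twice, or a buffer shared between a list's child and a flat column, does not change the total.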
   
   **Limitation**: `claim()` is post-construction. A burst allocation can 
temporarily exceed pool limits before the claim catches it.
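
A small simulation of this limitation follows, using illustrative numbers: a 100-byte pool, and an operator that builds three 60-byte buffers before claiming at its output boundary. Both functions are hypothetical models, not DataFusion code. Under post-construction claiming, physical usage reaches 180 bytes before the pool sees anything. Reserving before each allocation, as Layer 3 proposes, stops at 60 bytes.

```rust
/// Post-construction claim model: allocations proceed without consulting
/// the pool, and the total is claimed once at the output boundary.
/// Returns (peak physical bytes, whether the claim found the pool over limit).
fn claim_after_build(limit: usize, allocations: &[usize]) -> (usize, bool) {
    let mut physical = 0;
    for &bytes in allocations {
        physical += bytes; // nothing checks the pool here
    }
    // Nothing is freed before the claim, so the final total is also the peak.
    (physical, physical > limit)
}

/// Reserve-before-allocate model: each allocation is checked against the
/// pool first, so physical usage never exceeds `limit`.
/// Returns (peak physical bytes, number of allocations made).
fn reserve_before_alloc(limit: usize, allocations: &[usize]) -> (usize, usize) {
    let mut physical = 0;
    let mut made = 0;
    for &bytes in allocations {
        if physical + bytes > limit {
            break; // an operator would spill here instead of allocating
        }
        physical += bytes;
        made += 1;
    }
    (physical, made)
}
```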
   
   **Prototype**: https://github.com/wirybeaver/datafusion/pull/3
   
   #### Layer 3: Builder `with_pool()` for real-time enforcement
   
   Add `with_pool()` to Arrow builders so every internal buffer growth goes 
through the pool's reservation system. If the pool is exhausted, the growth 
fails *before* the allocation happens.
   
   **Key finding**: Arrow builders (`PrimitiveBuilder`, `GenericByteBuilder`, 
`GenericListBuilder`) store their data in plain `Vec`s (e.g. `Vec<T::Native>` in `PrimitiveBuilder`), not in `MutableBuffer`. 
The `Vec` → `MutableBuffer` conversion only happens in `finish()`. This means 
`MutableBuffer`'s existing `#[cfg(feature = "pool")] reservation` field is 
bypassed during the entire build phase. A `TrackedVec<T>` wrapper is needed to 
bridge this gap.
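
Below is one possible shape for `TrackedVec<T>`, sketched with std only and assuming a simple byte-limit pool. `TrackedVec`, `BytePool`, and `try_push` are hypothetical names, and the arrow-rs prototype may differ. The design choice is to reserve the capacity delta in the pool *before* the `Vec` reallocates. A refused reservation therefore leaves both the vector and the pool unchanged.

```rust
use std::cell::Cell;
use std::mem::size_of;
use std::rc::Rc;

/// Hypothetical byte-limit pool shared by all tracked vectors.
struct BytePool {
    limit: usize,
    used: Cell<usize>,
}

impl BytePool {
    fn try_reserve(&self, bytes: usize) -> Result<(), String> {
        let used = self.used.get();
        if used + bytes > self.limit {
            let free = self.limit.saturating_sub(used);
            return Err(format!("pool exhausted: need {bytes}, {free} free"));
        }
        self.used.set(used + bytes);
        Ok(())
    }

    fn release(&self, bytes: usize) {
        self.used.set(self.used.get() - bytes);
    }
}

/// Vec wrapper whose pool reservation covers the capacity it requested.
struct TrackedVec<T> {
    inner: Vec<T>,
    pool: Rc<BytePool>,
    reserved: usize, // bytes currently held in `pool`
}

impl<T> TrackedVec<T> {
    fn new(pool: Rc<BytePool>) -> Self {
        TrackedVec { inner: Vec::new(), pool, reserved: 0 }
    }

    /// Reserve in the pool before growing the Vec; on failure nothing changes.
    fn try_push(&mut self, value: T) -> Result<(), String> {
        if self.inner.len() == self.inner.capacity() {
            let new_cap = (self.inner.capacity() * 2).max(4);
            let new_bytes = new_cap * size_of::<T>();
            self.pool.try_reserve(new_bytes - self.reserved)?;
            // reserve_exact may round capacity up; a production version
            // would reconcile `reserved` with the actual capacity.
            self.inner.reserve_exact(new_cap - self.inner.len());
            self.reserved = new_bytes;
        }
        self.inner.push(value);
        Ok(())
    }
}

impl<T> Drop for TrackedVec<T> {
    fn drop(&mut self) {
        self.pool.release(self.reserved);
    }
}
```

With a 48-byte pool, eight `i32` values fit in a 32-byte reservation. The ninth push would need a 64-byte buffer and is refused before any reallocation.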
   
   ```rust
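   // Real-time enforcement during building. `with_pool()` is the proposed
   // builder API (not in arrow-rs yet); a fallible append reports pool exhaustion.
   let mut builder = Int32Builder::new()
       .with_pool(&arrow_pool)?;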
   
   for row in rows {
       match builder.try_append_value(row) {
           Ok(()) => {}
           Err(_oom) => {
               // Pool exhausted: spill what we have. `row` was not appended,
               // so it must be retried after spilling.
               break;
           }
       }
   }
   let array = builder.finish();
   // Reservation transfers into the array's buffer; freed on drop.
   ```
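
The `Err(_oom)` arm in the example above stops at the first failed append. One way an operator might keep going is to finish the partial builder, spill it, and retry the same row in a fresh builder. The sketch below uses a std-only stand-in, because the pool-backed builder is still a proposal. `BoundedBuilder` and `build_with_spill` are hypothetical, and a fixed byte budget stands in for the pool.

```rust
/// Hypothetical stand-in for a pool-backed builder: appends fail once
/// `budget` bytes would be exceeded, mimicking a pool-exhausted error.
struct BoundedBuilder {
    values: Vec<i32>,
    budget: usize,
}

impl BoundedBuilder {
    fn new(budget: usize) -> Self {
        BoundedBuilder { values: Vec::new(), budget }
    }

    fn try_append_value(&mut self, v: i32) -> Result<(), &'static str> {
        let needed = (self.values.len() + 1) * std::mem::size_of::<i32>();
        if needed > self.budget {
            return Err("pool exhausted"); // value is not appended
        }
        self.values.push(v);
        Ok(())
    }

    /// Hands the values off. In the proposed design the reservation would
    /// move into the array's buffer and be freed when that array drops.
    fn finish(self) -> Vec<i32> {
        self.values
    }
}

/// Build from `rows`, spilling a finished run whenever an append fails and
/// retrying that row in a fresh builder, so no row is lost.
fn build_with_spill(rows: &[i32], budget: usize) -> Result<(Vec<Vec<i32>>, Vec<i32>), &'static str> {
    let mut spilled = Vec::new();
    let mut builder = BoundedBuilder::new(budget);
    for &row in rows {
        if builder.try_append_value(row).is_err() {
            // Spill what we have (here: keep it in `spilled`), start over.
            let full = std::mem::replace(&mut builder, BoundedBuilder::new(budget));
            spilled.push(full.finish());
            // A row that does not fit even an empty builder is a hard error.
            builder.try_append_value(row)?;
        }
    }
    Ok((spilled, builder.finish()))
}
```

With an 8-byte budget, each builder holds two `i32` values. Five rows therefore produce two spilled runs and a final run containing the last row.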
   
   **Prototype** (upstream arrow-rs): https://github.com/wirybeaver/arrow-rs/pull/1
   
   ### Comparison
   
   | Property | Manual `try_grow` | Post-construction `claim()` | Builder `with_pool()` |
   |---|---|---|---|
   | Can miss sites | Yes | No (claim at output boundary) | No (tracked at alloc) |
   | Over-counts shared buffers | Yes | No (idempotent) | No (idempotent) |
   | Enforces limits *before* alloc | Yes (laggy) | No (brief burst possible) | Yes (immediate) |
   | Exists in Arrow today | N/A | Yes (arrow-buffer 58.3) | No (needs upstream PR) |
   | Change surface per operator | ~20 size-compute points | ~5 claim points | ~0 (automatic) |
   
   ### Describe alternatives you've considered
   
   - **`get_slice_memory_size()`** instead of `get_array_memory_size()`: 
Reduces over-counting for sliced arrays but doesn't solve under-accounting and 
is still manual.
   - **`compact()` retained slices**: Eliminates shared buffer references by 
copying. Correct but adds CPU cost and memory churn.
   - **Session-level default pool**: All builders pick up a thread-local pool 
automatically. Eliminates per-builder `with_pool()` calls but adds implicit 
global state.
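
The session-level default pool alternative can be sketched with `thread_local!`. A scoped guard installs a pool for the current thread, and a builder constructor picks it up without any `with_pool()` call. `Pool`, `DEFAULT_POOL`, `DefaultPoolGuard`, `current_pool`, and `Builder` are hypothetical. The sketch also makes the trade-off visible: which pool a builder uses depends on ambient per-thread state, and work moved to another thread does not inherit it.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Hypothetical pool handle; only its name matters for this sketch.
#[derive(Debug)]
struct Pool {
    name: String,
}

thread_local! {
    // Per-thread default pool, set while a guard is alive.
    static DEFAULT_POOL: RefCell<Option<Rc<Pool>>> = RefCell::new(None);
}

/// RAII guard: installs a default pool and restores the previous one on drop.
struct DefaultPoolGuard {
    previous: Option<Rc<Pool>>,
}

impl DefaultPoolGuard {
    fn install(pool: Rc<Pool>) -> Self {
        let previous = DEFAULT_POOL.with(|slot| slot.borrow_mut().replace(pool));
        DefaultPoolGuard { previous }
    }
}

impl Drop for DefaultPoolGuard {
    fn drop(&mut self) {
        let previous = self.previous.take();
        DEFAULT_POOL.with(|slot| *slot.borrow_mut() = previous);
    }
}

fn current_pool() -> Option<Rc<Pool>> {
    DEFAULT_POOL.with(|slot| slot.borrow().clone())
}

/// Hypothetical builder that tracks against the ambient pool, if any.
struct Builder {
    pool: Option<Rc<Pool>>,
}

impl Builder {
    fn new() -> Self {
        Builder { pool: current_pool() } // implicit: no `with_pool()` call
    }
}
```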
   
   ### Additional context
   
   - Related epic: #22758
   - Under-accounting issues: #22723, #22739, #20714
   - Over-accounting issues: #16841, #22526
   - Arrow `claim()` API: [arrow-rs PR #7303](https://github.com/apache/arrow-rs/pull/7303)
   - Arrow `ArrayData::claim()`: [arrow-rs PR #8918](https://github.com/apache/arrow-rs/pull/8918)

