wirybeaver opened a new issue, #22898:

URL: https://github.com/apache/datafusion/issues/22898
### Is your feature request related to a problem or challenge?

DataFusion's memory accounting relies on operators manually computing `get_array_memory_size()` after allocation and calling `try_resize()` on a `MemoryReservation`. This has two correctness problems:

1. **Under-accounting**: Every missed `try_grow` call site is a silent bug. Operators allocate Arrow buffers (via builders, compute kernels such as `take()`/`interleave()`, and `concat_batches()`) without reserving memory. Actual usage then exceeds the pool budget, which leads to OOM kills.
2. **Over-accounting**: Shared `Arc<Buffer>` ownership causes `get_array_memory_size()` to count the same physical buffer multiple times when several operators or slices hold references to it. This triggers premature spilling.

These problems are documented in #20714, #16841, and #22526.

### Describe the solution you'd like

I propose a **three-layer migration path** from manual tracking to automatic, pool-integrated memory accounting. Each layer is independently useful and builds on the previous one.

#### Layer 1: Manual `try_grow` (current approach, patched)

Fix the under-accounting by adding `MemoryReservation` tracking at each untracked allocation site. This is a necessary first step, but it is inherently fragile: every new allocation site must remember to call `try_grow`.

**Prototype**: https://github.com/wirybeaver/datafusion/pull/1

#### Layer 2: Post-construction `claim()` via `ArrowMemoryPool`

Use Arrow's `buffer.claim(pool)` / `ArrayData::claim(pool)` API to register buffer ownership at output boundaries. Claims are **idempotent**: the same buffer claimed twice is counted once. This eliminates both under-accounting (no site can be missed if you claim at the output boundary) and over-accounting (shared buffers are counted once).

DataFusion already has the adapter: `ArrowMemoryPool` in `datafusion/execution/src/memory_pool/arrow.rs` wraps DataFusion's `MemoryPool` and implements `arrow_buffer::MemoryPool`. The `arrow-buffer/pool` feature is already enabled.
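The difference between summing sizes per holder and claiming idempotently can be shown with a toy model. The sketch below is std-only Rust and is **not** Arrow's implementation. `Buffer`, `ToyPool`, and `naive_total` are hypothetical names. A buffer is identified by the address of its shared allocation, and, as a simplification, nothing is released when a buffer is dropped.

```rust
use std::collections::HashSet;
use std::sync::Arc;

/// Hypothetical stand-in for an Arrow buffer: immutable bytes shared via `Arc`.
type Buffer = Arc<Vec<u8>>;

/// Hypothetical toy pool modelling idempotent claims. A buffer is identified
/// by the address of its shared allocation, so a second claim adds nothing.
/// (Simplification: nothing is released when a buffer is dropped.)
#[derive(Default)]
struct ToyPool {
    claimed: HashSet<usize>,
    used: usize,
}

impl ToyPool {
    /// Claims `buf`; returns how many bytes were newly accounted.
    fn claim(&mut self, buf: &Buffer) -> usize {
        let addr = Arc::as_ptr(buf) as usize;
        if self.claimed.insert(addr) {
            self.used += buf.len();
            buf.len()
        } else {
            0 // already claimed: the physical buffer is counted once
        }
    }
}

/// Per-holder sizing, analogous to summing `get_array_memory_size()` over
/// every array that references a buffer: shared buffers are counted repeatedly.
fn naive_total(holders: &[Buffer]) -> usize {
    holders.iter().map(|b| b.len()).sum()
}

fn main() {
    let shared: Buffer = Arc::new(vec![0u8; 1024]);
    // Three operators (or slices) reference the same physical buffer.
    let holders = vec![Arc::clone(&shared), Arc::clone(&shared), Arc::clone(&shared)];

    let mut pool = ToyPool::default();
    for h in &holders {
        pool.claim(h);
    }
    println!("per-holder sum: {} bytes", naive_total(&holders));
    println!("claimed once:   {} bytes", pool.used);
}
```

The per-holder sum triples the 1 KiB buffer, while the claim-based total counts it once. The model leaves out how Arrow identifies buffers internally and how the claim is routed through `ArrowMemoryPool`.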
```rust
// BEFORE (manual, error-prone):
fn emit(&mut self, ...) -> Result<Option<RecordBatch>> {
    let batch = RecordBatch::try_new(schema, output)?;
    let batch_size = batch.get_array_memory_size(); // over-counts shared buffers
    self.transient_reservation.grow(batch_size);
    Ok(Some(batch))
}

// AFTER (post-construction claim, idempotent):
fn emit(&mut self, ...) -> Result<Option<RecordBatch>> {
    let batch = RecordBatch::try_new(schema, output)?;
    claim_batch(&batch, &self.arrow_pool); // shared buffers counted once
    Ok(Some(batch))
}
```

**Limitation**: `claim()` is post-construction. A burst allocation can temporarily exceed pool limits before the claim catches it.

**Prototype**: https://github.com/wirybeaver/datafusion/pull/3

#### Layer 3: Builder `with_pool()` for real-time enforcement

Add `with_pool()` to Arrow builders so that every internal buffer growth goes through the pool's reservation system. If the pool is exhausted, the growth fails *before* the allocation happens.

**Key finding**: Arrow builders (`PrimitiveBuilder`, `GenericByteBuilder`, `GenericListBuilder`) store values in `Vec<T::Native>`, not `MutableBuffer`. The `Vec` → `MutableBuffer` conversion only happens in `finish()`. This means `MutableBuffer`'s existing `#[cfg(feature = "pool")] reservation` field is bypassed during the entire build phase. A `TrackedVec<T>` wrapper is needed to bridge this gap.

```rust
// Real-time enforcement during building:
let mut builder = Int32Builder::new()
    .with_pool(&arrow_pool)?;
for row in rows {
    match builder.try_append_value(row) {
        Ok(()) => {}
        Err(_oom) => {
            // Pool exhausted: spill what we have
            break;
        }
    }
}
let array = builder.finish(); // Reservation transfers into the array's buffer; freed on drop.
```

**Prototype** (upstream arrow-rs): https://github.com/wirybeaver/arrow-rs/pull/1

### Comparison

| | Manual `try_grow` | Post-construction `claim()` | Builder `with_pool()` |
|---|---|---|---|
| Can miss sites | Yes | No (claim at output boundary) | No (tracked at alloc) |
| Over-counts shared buffers | Yes | No (idempotent) | No (idempotent) |
| Enforces limits *before* alloc | Yes (laggy) | No (brief burst possible) | Yes (immediate) |
| Exists in Arrow today | N/A | Yes (arrow-buffer 58.3) | No (needs upstream PR) |
| Change surface per operator | ~20 size-compute points | ~5 claim points | ~0 (automatic) |

### Describe alternatives you've considered

- **`get_slice_memory_size()`** instead of `get_array_memory_size()`: Reduces over-counting for sliced arrays, but does not solve under-accounting and is still manual.
- **`compact()` retained slices**: Eliminates shared buffer references by copying. Correct, but adds CPU cost and memory churn.
- **Session-level default pool**: All builders pick up a thread-local pool automatically. This eliminates per-builder `with_pool()` calls but adds implicit global state.

### Additional context

- Related epic: #22758
- Under-accounting issues: #22723, #22739, #20714
- Over-accounting issues: #16841, #22526
- Arrow `claim()` API: [arrow-rs PR #7303](https://github.com/apache/arrow-rs/pull/7303)
- Arrow `ArrayData::claim()`: [arrow-rs PR #8918](https://github.com/apache/arrow-rs/pull/8918)
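The Layer 3 key finding implies that enforcement has to happen where a builder's `Vec` grows, before it reallocates. The following is a minimal std-only sketch of that idea. It assumes a hypothetical single-threaded `Budget` in place of a DataFusion or Arrow pool, plus a hypothetical `TrackedVec<T>` with a `try_push` method. It is not the arrow-rs prototype, and it does not model the hand-off of the reservation to `MutableBuffer` in `finish()`.

```rust
use std::cell::Cell;
use std::mem::size_of;
use std::rc::Rc;

/// Hypothetical byte budget standing in for a memory pool (single-threaded).
struct Budget {
    limit: usize,
    used: Cell<usize>,
}

impl Budget {
    /// Charges `bytes` against the budget, or fails without charging anything.
    fn try_grow(&self, bytes: usize) -> Result<(), String> {
        let new_used = self.used.get() + bytes;
        if new_used > self.limit {
            return Err(format!(
                "pool exhausted: requested {bytes} bytes with {} of {} in use",
                self.used.get(),
                self.limit
            ));
        }
        self.used.set(new_used);
        Ok(())
    }

    fn shrink(&self, bytes: usize) {
        self.used.set(self.used.get() - bytes);
    }
}

/// Hypothetical `Vec` wrapper that charges the budget *before* reallocating.
struct TrackedVec<T> {
    inner: Vec<T>,
    cap: usize, // capacity (in elements) currently charged to the budget
    budget: Rc<Budget>,
}

impl<T> TrackedVec<T> {
    fn new(budget: Rc<Budget>) -> Self {
        Self { inner: Vec::new(), cap: 0, budget }
    }

    fn try_push(&mut self, value: T) -> Result<(), String> {
        if self.inner.len() == self.cap {
            // Doubling growth policy (an assumption for this sketch).
            let new_cap = (self.cap * 2).max(4);
            // Charge the extra bytes first; on failure nothing is allocated.
            self.budget.try_grow((new_cap - self.cap) * size_of::<T>())?;
            // Simplification: assumes `reserve_exact` allocates exactly `new_cap` slots.
            self.inner.reserve_exact(new_cap - self.inner.len());
            self.cap = new_cap;
        }
        self.inner.push(value);
        Ok(())
    }
}

impl<T> Drop for TrackedVec<T> {
    fn drop(&mut self) {
        // Release the reservation together with the memory it covered.
        self.budget.shrink(self.cap * size_of::<T>());
    }
}

fn main() {
    let budget = Rc::new(Budget { limit: 64, used: Cell::new(0) });
    let mut values = TrackedVec::<i32>::new(Rc::clone(&budget));
    let mut appended = 0;
    for row in 0..100 {
        match values.try_push(row) {
            Ok(()) => appended += 1,
            Err(e) => {
                // A real operator would spill the rows built so far here.
                println!("stopped after {appended} rows: {e}");
                break;
            }
        }
    }
    println!("in use while building: {} bytes", budget.used.get());
    drop(values);
    println!("in use after drop: {} bytes", budget.used.get());
}
```

The sketch charges whole capacity increments rather than individual elements, so the number of pool calls tracks the number of reallocations. Because the charge precedes `reserve_exact`, a refused request leaves both the budget and the vector unchanged, which is the "fail before allocation" property described for `with_pool()`.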
