avantgardnerio opened a new issue, #22739:
URL: https://github.com/apache/datafusion/issues/22739

   ### Background — the DF upgrade chain and where we are
   
   We're a DataFusion-based log analytics platform tracking the DF upgrade 
cadence (DF48 → 49 → 50 → 51). Each upgrade has had us untangle a new family of 
memory-pressure regressions; #22626's allocator-vs-pool tracker (the 
`memory-accounting` Cargo feature) has been the single most useful instrument 
for finding them. The work here is from that lens.
   
   ### The DF51 regression
   
   DF51 changed the spill story materially: `RepartitionExec` gained spilling (#18014) and `SpillingPool` was introduced (#18207). The #18014 commit message itself flagged that it "did not account for memory usage when reading batches back from disk." This is the same class of regression as before, now spanning more operators.
   
   Empirically, the same high-cardinality `groupby` query that emits a clean 
`ResourcesExhausted` on DF50 now sometimes kernel-OOM-kills the executor on 
DF51. Same query path, same data; the failure mode degraded from "structured 
error to client" → "pod restart with no signal."
   
   ### Production manifestation (anonymized)
   
   One of our customers ran 5 concurrent variants of a `groupby <free-text log 
message field>` over a 24-hour window. The workload took down **79 executor 
pods in 2 minutes**, then another wave a few minutes later. The query shape:
   
   ```
   source logs(<team>)
     | filter <priority predicate>
         && $d.message != null
     | groupby $d.message agg count(1) as $d.occurrences
     | orderby \$d.occurrences desc
     | limit 15
   ```
   
   i.e. `GROUP BY` on a wide free-text string column with high distinct cardinality.
   
   ### Memory math at production scale
   
   | Quantity | Value |
   |---|---|
   | Pod cgroup limit | 31.14 GB |
   | DataFusion `memory_fraction` (default `0.7`) | × 0.7 |
   | → declared `MemoryPool` budget | **~21.8 GB tracked** |
   | Headroom from pool budget to kernel ceiling | ~9.3 GB |
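The budget arithmetic in the table can be reproduced in a few lines of Rust. This is plain arithmetic, not DataFusion code. The helper names are hypothetical. Treating GB as 10^9 bytes is an assumption, because the table does not say whether GB or GiB is meant.

```rust
/// Bytes per GB (assumption: decimal gigabytes).
pub const GB: f64 = 1e9;

/// Hypothetical helper: the share of the cgroup limit declared as the MemoryPool budget.
pub fn pool_budget(cgroup_limit: f64, memory_fraction: f64) -> f64 {
    cgroup_limit * memory_fraction
}

/// Hypothetical helper: bytes between the pool budget and the kernel OOM ceiling.
/// Once the pool is fully reserved, untracked allocations beyond this amount
/// push the pod past its cgroup limit.
pub fn headroom(cgroup_limit: f64, memory_fraction: f64) -> f64 {
    cgroup_limit - pool_budget(cgroup_limit, memory_fraction)
}
```

With the table's figures, `pool_budget(31.14 * GB, 0.7) / GB` comes to about 21.8 and `headroom(31.14 * GB, 0.7) / GB` to about 9.3, which matches the rows above.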
   
   From DF50's `ResourcesExhausted` consumer dump at incident time: 5 `GroupedHashAggregateStream`s at ~700 MB tracked each = ~3.5 GB explicitly attributed to those streams. Total `MemoryPool::reserved()` at incident time wasn't captured directly, but the kernel-OOM-kill establishes that **resident exceeded 31.14 GB while the voluntary tracker thought the agg was within its share**. Conservatively that's ~7–19 GB of untracked allocation outside `MemoryReservation`'s view, growing linearly with cardinality × key-width × concurrency.
   
   ### Why the tracker is what made the follow-up reproducible
   
   Without an allocator-level ground truth there's no way to distinguish "we shipped DF51 and something OOM-kills" from any of the other ~30 reasons pod resident memory grows. With #22626 enabled, every `Layout::size()` is debited against the declared `MemoryPool` budget; the bank goes negative exactly when an allocation skipped `try_grow`. That converted "production OOM" into "deterministic SLT failure with a stack trace pointing at the offending site."
   
   The companion PR (sibling to #22721 / #22723) is the follow-up. It adds an SLT that exercises this exact operator path (`GroupedHashAggregateStream::emit` → `GroupValuesRows::emit` → `RowConverter::convert_rows` → `decode_column` → `arrow_row::variable::decode_string_view` / `decode_binary_view_inner`). Against a 1M pool with `HEADROOM_FACTOR = 5.0`, that SLT produces a clean `allocator overdraft: account balance at panic = -1344326 bytes`.
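A minimal sketch of the banking idea follows. It assumes a signed atomic balance that successful pool grants credit and that every allocator call debits. `BankingAllocator`, `grant`, `release`, and `overdrawn` are hypothetical names for illustration. This is not the code in #22626, which may seed or scale the bank differently (for example with a headroom factor). The sketch also omits any allowance for allocations made outside a query.

```rust
use std::alloc::{GlobalAlloc, Layout, System};
use std::sync::atomic::{AtomicIsize, Ordering};

/// Hypothetical allocator shim: a signed "bank" that pool grants credit and
/// every real allocation debits. A negative balance means some code allocated
/// memory that was never reserved through the pool.
pub struct BankingAllocator {
    bank: AtomicIsize,
}

impl BankingAllocator {
    pub const fn new() -> Self {
        Self { bank: AtomicIsize::new(0) }
    }

    /// Pool side: credit `bytes` when a reservation grow succeeds.
    pub fn grant(&self, bytes: usize) {
        self.bank.fetch_add(bytes as isize, Ordering::Relaxed);
    }

    /// Pool side: withdraw `bytes` when a reservation shrinks or is dropped.
    pub fn release(&self, bytes: usize) {
        self.bank.fetch_sub(bytes as isize, Ordering::Relaxed);
    }

    pub fn balance(&self) -> isize {
        self.bank.load(Ordering::Relaxed)
    }

    /// True once allocations have outrun what the pool granted.
    pub fn overdrawn(&self) -> bool {
        self.balance() < 0
    }
}

unsafe impl GlobalAlloc for BankingAllocator {
    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
        let ptr = unsafe { System.alloc(layout) };
        if !ptr.is_null() {
            // Debit the full requested size: the `Layout::size()` charge.
            self.bank.fetch_sub(layout.size() as isize, Ordering::Relaxed);
        }
        ptr
    }

    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
        // Freed bytes flow back into the bank.
        self.bank.fetch_add(layout.size() as isize, Ordering::Relaxed);
        unsafe { System.dealloc(ptr, layout) }
    }
    // `alloc_zeroed` and `realloc` keep the trait's default implementations,
    // which go through `alloc`/`dealloc`, so they are charged as well.
}

/// Registration sketch: every heap allocation in the process now debits
/// GLOBAL's bank. Nothing grants to it here, so a real setup would need a
/// baseline or headroom allowance before treating a negative balance as a bug.
#[global_allocator]
static GLOBAL: BankingAllocator = BankingAllocator::new();
```

Charging at the allocator rather than at call sites is what makes the check structural. A `Vec` resize deep inside a kernel is charged exactly like a reserved buffer, so any gap between grants and allocations shows up as a negative balance.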
   
   ### The bug
   
   In `GroupValuesRows::emit` (`datafusion/physical-plan/src/aggregates/group_values/row.rs:198`):
   
   ```rust
   fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
       // ...
       // ← allocates per-column decode buffers outside any MemoryReservation
       let arrays = self.row_converter.convert_rows(rows)?;
       // ...
   }
   ```
   
   `RowConverter::convert_rows` (`arrow-rs/arrow-row/src/lib.rs:806`) calls `decode_column` (`lib.rs:1675`) per column, dispatching to:
   - `arrow_row::variable::decode_binary` (`variable.rs:205`) for `Utf8`/`Binary`
   - `arrow_row::variable::decode_binary_view_inner` (`variable.rs:249`) for `Utf8View`/`BinaryView`
   - `arrow_row::list::decode` for `List<_>`
   
   Each decoder allocates `MutableBuffer::with_capacity` sized to the column's bytes. **None of these allocations are routed through `MemoryReservation::try_grow`.** The bytes show up in process resident memory but not in `MemoryPool::reserved()`, so they leak past the declared budget. At production scale, that puts the executor in kernel-OOM territory.
   
   ### Proposed fixes
   
   **(a) Point fix in `GroupValuesRows::emit`.** One reservation grow before `convert_rows`:
   
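   ```rust
   fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
       let rows = /* slice being converted */;
       // Estimate the decode footprint as the total encoded bytes of the rows.
       let estimated = rows.iter().map(|r| r.as_ref().len()).sum::<usize>();
       // Reserve before decoding, so an over-budget emit fails with
       // ResourcesExhausted instead of allocating untracked memory.
       self.reservation.try_grow(estimated)?;
       let arrays = self.row_converter.convert_rows(rows)?;
       self.reservation.shrink(estimated);  // decode result is now owned by `arrays`
       // ...
   }
   ```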
   
   `GroupValuesRows` already carries a `MemoryReservation` field, so this is one extra `try_grow` against it. Sibling impls (`GroupValuesBytesView`, `GroupValuesColumn`, etc.) would each need a fix of the same shape.
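The following self-contained toy model makes the control flow of (a) concrete without pulling in DataFusion. `ToyPool`, `ToyReservation`, `decode`, and `emit` are hypothetical stand-ins for `MemoryPool`, `MemoryReservation`, `RowConverter::convert_rows`, and `GroupValuesRows::emit`. The estimate uses the same "sum of row bytes" heuristic as above, and the real decoders' buffer sizes may differ from it.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Toy stand-in for a `MemoryPool`: a byte limit plus a running total.
pub struct ToyPool {
    limit: usize,
    reserved: Cell<usize>,
}

impl ToyPool {
    pub fn new(limit: usize) -> Rc<Self> {
        Rc::new(Self { limit, reserved: Cell::new(0) })
    }
    pub fn reserved(&self) -> usize {
        self.reserved.get()
    }
    pub fn register(pool: &Rc<Self>) -> ToyReservation {
        ToyReservation { pool: Rc::clone(pool), size: 0 }
    }
}

/// Toy stand-in for a `MemoryReservation` bound to one pool.
pub struct ToyReservation {
    pool: Rc<ToyPool>,
    size: usize,
}

impl ToyReservation {
    /// Grow the reservation, or refuse with a structured error.
    pub fn try_grow(&mut self, bytes: usize) -> Result<(), String> {
        let total = self.pool.reserved.get() + bytes;
        if total > self.pool.limit {
            return Err(format!("ResourcesExhausted: cannot reserve {bytes} more bytes"));
        }
        self.pool.reserved.set(total);
        self.size += bytes;
        Ok(())
    }
    /// Release up to `bytes` back to the pool.
    pub fn shrink(&mut self, bytes: usize) {
        let bytes = bytes.min(self.size);
        self.size -= bytes;
        self.pool.reserved.set(self.pool.reserved.get() - bytes);
    }
}

/// Stand-in for `convert_rows`: one buffer sized to the rows' total bytes.
fn decode(rows: &[Vec<u8>]) -> Vec<u8> {
    let mut out = Vec::with_capacity(rows.iter().map(Vec::len).sum());
    for row in rows {
        out.extend_from_slice(row);
    }
    out
}

/// Shape of fix (a): reserve the estimate, decode, then release it.
pub fn emit(res: &mut ToyReservation, rows: &[Vec<u8>]) -> Result<Vec<u8>, String> {
    let estimated: usize = rows.iter().map(Vec::len).sum();
    res.try_grow(estimated)?; // over budget: structured error, nothing decoded
    let decoded = decode(rows);
    res.shrink(estimated);
    Ok(decoded)
}
```

An over-budget emit now returns an error before any decode buffer exists, which is the DF50-style `ResourcesExhausted` behavior. Because the reservation is released right after decoding, the emitted arrays are no longer counted once `emit` returns. The toy mirrors (a) in that respect rather than changing it.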
   
   **(b) Systemic alternative: `OomGuardMemoryPool`.** Rather than chasing each operator/kernel that bypasses voluntary tracking (a class of bug with **at least 6 distinct instances found so far**, listed below), replace the voluntary `MemoryPool::try_grow` contract with allocator-level enforcement. Concretely, wrap the inner `MemoryPool` in a pool that does three things. It *also* installs a `#[global_allocator]` shim. It fails the query when the shim's bank goes negative. It treats `try_grow` failures as advisory rather than authoritative.
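The sketch below shows the (b) control flow. It assumes an allocator shim (not shown) that debits a shared atomic bank seeded with the budget. `OomGuard`, `GuardError`, and the inner-limit bookkeeping are hypothetical names for illustration. This is neither DataFusion's `MemoryPool` trait nor the Comet prototype in #4582. It shows the inversion of authority: an inner-pool denial is only counted, while a negative bank fails the query.

```rust
use std::sync::atomic::{AtomicIsize, Ordering};
use std::sync::Arc;

#[derive(Debug, PartialEq)]
pub enum GuardError {
    /// The allocator-level bank went negative: fail the query.
    AllocatorOverdraft { balance: isize },
}

/// Hypothetical OomGuard-style wrapper around a voluntary pool.
pub struct OomGuard {
    /// Seeded with the budget; an allocator shim debits and credits it.
    bank: Arc<AtomicIsize>,
    /// The wrapped pool's voluntary limit and running total.
    inner_limit: usize,
    inner_reserved: usize,
    /// How many times the voluntary pool would have refused.
    advisory_denials: usize,
}

impl OomGuard {
    pub fn new(budget: usize, inner_limit: usize) -> Self {
        Self {
            bank: Arc::new(AtomicIsize::new(budget as isize)),
            inner_limit,
            inner_reserved: 0,
            advisory_denials: 0,
        }
    }

    /// Handle the allocator shim would use to charge allocations.
    pub fn bank(&self) -> Arc<AtomicIsize> {
        Arc::clone(&self.bank)
    }

    pub fn advisory_denials(&self) -> usize {
        self.advisory_denials
    }

    /// Voluntary grow: an inner-pool refusal is recorded, not returned.
    /// Only the authoritative bank check can fail.
    pub fn try_grow(&mut self, bytes: usize) -> Result<(), GuardError> {
        if self.inner_reserved + bytes > self.inner_limit {
            self.advisory_denials += 1;
        }
        self.inner_reserved += bytes;
        self.check()
    }

    /// Authoritative check against allocator ground truth.
    pub fn check(&self) -> Result<(), GuardError> {
        let balance = self.bank.load(Ordering::Relaxed);
        if balance < 0 {
            Err(GuardError::AllocatorOverdraft { balance })
        } else {
            Ok(())
        }
    }
}
```

An allocator cannot return a `Result` to its caller. In this sketch the failure therefore surfaces at checkpoints such as `try_grow` or an explicit `check()` (for example once per batch). Where those checkpoints sit is an assumption here, not something the issue specifies.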
   
   @andygrove is prototyping the same idea on the Comet side in [`apache/datafusion-comet#4582`](https://github.com/apache/datafusion-comet/pull/4582) ("prototype allocator-level OOM circuit breaker (OomGuard)"). The framing for both: voluntary tracking will always be incomplete (every Arrow kernel, every operator, and every new feature is a potential miss); allocator-level tracking is structurally complete.
   
   A practical middle path: ship (a) so this specific operator stops 
OOM-killing pods today, and let (b)'s discussion play out in parallel.
   
   ### Distinct untracked-allocation sites we've cataloged so far via #22626
   
   1. `WindowAggExec`: `batches.push` + `concat_batches` in `window_agg_exec.rs`
   2. `NestedLoopJoinExec`: multiple sites, namely `LazyMemoryStream::generate_next_batch`, `concat_batches` in `handle_buffering_left_memory_limited`, and `take_native` in `process_left_range_join` (issue #22723)
   3. `GroupValuesBytesView::intern`: hashbrown `RawTable::reserve_rehash` via `HashTableAllocExt::insert_accounted`
   4. `GroupValuesRows::intern`: `RowConverter::append` Vec resize via `__rust_realloc`
   5. `GroupValuesRows::emit`: `RowConverter::convert_rows` → `decode_column` (this issue)
   6. `parquet::arrow::array_reader::byte_array::ByteArrayDecoderPlain::read`: `Vec::reserve` realloc
   
   We expect more as the corpus is exercised. That's the case for (b).
   
   ### Component
   
   - [x] Physical Plan

