avantgardnerio opened a new issue, #22739:
URL: https://github.com/apache/datafusion/issues/22739
### Background — the DF upgrade chain and where we are
We're a DataFusion-based log analytics platform tracking the DF upgrade
cadence (DF48 → 49 → 50 → 51). Each upgrade has had us untangle a new family of
memory-pressure regressions; #22626's allocator-vs-pool tracker (the
`memory-accounting` Cargo feature) has been the single most useful instrument
for finding them. The work here is from that lens.
### The DF51 regression
DF51 changed the spill story materially — `RepartitionExec` gained spilling
(#18014) and `SpillingPool` was introduced (#18207). The #18014 commit message
itself flagged that it "did not account for memory usage when reading batches
back from disk." Same vintage of regression, broader operator coverage.
Empirically, the same high-cardinality `groupby` query that emits a clean
`ResourcesExhausted` on DF50 now sometimes kernel-OOM-kills the executor on
DF51. Same query path, same data; the failure mode degraded from "structured
error to client" → "pod restart with no signal."
### Production manifestation (anonymized)
One of our customers ran 5 concurrent variants of a `groupby <free-text log
message field>` over a 24-hour window. The workload took down **79 executor
pods in 2 minutes**, then another wave a few minutes later. The query shape:
```
source logs(<team>)
| filter <priority predicate>
  && $d.message != null
| groupby $d.message agg count(1) as $d.occurrences
| orderby $d.occurrences desc
| limit 15
```
i.e. `GROUP BY` on a wide free-text string column with high distinct
cardinality.
### Memory math at production scale
| Quantity | Value |
|---|---|
| Pod cgroup limit | 31.14 GB |
| DataFusion `memory_fraction` (default `0.7`) | × 0.7 |
| → declared `MemoryPool` budget | **~21.8 GB tracked** |
| Headroom from pool budget to kernel ceiling | ~9.3 GB |
From DF50's `ResourcesExhausted` consumer dump at incident time: 5
`GroupedHashAggregateStream`s at ~700 MB tracked each = ~3.5 GB explicitly
attributed to those streams. Total `MemoryPool::reserved()` at incident time
wasn't captured directly, but the kernel OOM kill establishes that **resident
memory exceeded 31.14 GB while the voluntary tracker thought the agg was within
its share**. Conservatively that's ~7–19 GB of untracked allocation outside
`MemoryReservation`'s view, growing linearly with cardinality × key-width ×
concurrency.
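The arithmetic behind the table and the consumer dump can be restated in a few lines of Rust. This is only a worked check of the figures quoted above; the constant and function names are made up for illustration, and GB/MB are treated as plain decimal units.

```rust
/// Figures quoted in this issue; the names are illustrative only.
const POD_LIMIT_GB: f64 = 31.14;
const MEMORY_FRACTION: f64 = 0.7;
const STREAMS: u32 = 5;
const TRACKED_PER_STREAM_MB: f64 = 700.0;

/// Declared pool budget: cgroup limit scaled by the memory fraction.
fn pool_budget_gb() -> f64 {
    POD_LIMIT_GB * MEMORY_FRACTION
}

/// Gap between the pool budget and the cgroup limit the kernel enforces.
fn headroom_gb() -> f64 {
    POD_LIMIT_GB - pool_budget_gb()
}

/// Memory explicitly attributed to the aggregate streams in the dump.
fn attributed_gb() -> f64 {
    f64::from(STREAMS) * TRACKED_PER_STREAM_MB / 1000.0
}
```

Rounded to one decimal place, these give the ~21.8 GB budget, the ~9.3 GB headroom, and the ~3.5 GB attributed to the five streams.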
### Why the tracker is what made the follow-up reproducible
Without an allocator-level ground truth there's no way to distinguish "we
shipped DF51 and something OOM-kills" from any of the other ~30 reasons pod
resident grows. With #22626 enabled, every `Layout::size()` is debited
against the declared `MemoryPool` budget; the bank goes negative exactly when
an allocation skipped `try_grow`. That converted "production OOM" into
"deterministic SLT failure with a stack trace pointing at the offending site."
The companion PR (sibling to #22721 / #22723) is the follow-up: an SLT that
exercises this exact operator (`GroupedHashAggregateStream::emit` →
`GroupValuesRows::emit` → `RowConverter::convert_rows` → `decode_column`
→ `arrow_row::variable::decode_string_view` / `decode_binary_view_inner`)
and produces a clean `allocator overdraft: account balance at panic = -1344326
bytes` against a 1M pool with `HEADROOM_FACTOR = 5.0`.
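For readers who haven't looked at #22626, the "bank" idea can be sketched roughly as below. This is not the #22626 implementation: `TrackingAlloc` and its methods are hypothetical, and the wrapper is deliberately not installed as `#[global_allocator]` so it can be called directly. The signed balance starts at the declared budget, every allocation debits `Layout::size()`, and every deallocation credits it back.

```rust
use std::alloc::{GlobalAlloc, Layout, System};
use std::sync::atomic::{AtomicIsize, Ordering};

/// Hypothetical allocator wrapper: a signed balance that starts at the
/// declared pool budget and is debited by every real allocation.
pub struct TrackingAlloc {
    balance: AtomicIsize,
}

impl TrackingAlloc {
    pub const fn new(budget_bytes: isize) -> Self {
        Self { balance: AtomicIsize::new(budget_bytes) }
    }

    /// Bytes left in the bank; negative means allocations outran the budget.
    pub fn balance(&self) -> isize {
        self.balance.load(Ordering::Relaxed)
    }

    pub fn overdrawn(&self) -> bool {
        self.balance() < 0
    }
}

unsafe impl GlobalAlloc for TrackingAlloc {
    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
        let ptr = unsafe { System.alloc(layout) };
        if !ptr.is_null() {
            // Debit only allocations that actually succeeded.
            self.balance.fetch_sub(layout.size() as isize, Ordering::Relaxed);
        }
        ptr
    }

    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
        unsafe { System.dealloc(ptr, layout) };
        // Credit the bytes back once they are returned to the system.
        self.balance.fetch_add(layout.size() as isize, Ordering::Relaxed);
    }
}
```

With a 1024-byte budget, a single 4096-byte allocation leaves the balance at -3072, which is the kind of overdraft the SLT above reports.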
### The bug
In `GroupValuesRows::emit`
(`datafusion/physical-plan/src/aggregates/group_values/row.rs:198`):
```rust
fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
    // ...
    // convert_rows allocates the per-column decode buffers
    let arrays = self.row_converter.convert_rows(rows)?;
    // ...
}
```
`RowConverter::convert_rows` (`arrow-rs/arrow-row/src/lib.rs:806`) calls
`decode_column` (`lib.rs:1675`) per column, dispatching to:
- `arrow_row::variable::decode_binary` (`variable.rs:205`) for `Utf8`/`Binary`
- `arrow_row::variable::decode_binary_view_inner` (`variable.rs:249`) for `Utf8View`/`BinaryView`
- `arrow_row::list::decode` for `List<_>`

Each decoder allocates through `MutableBuffer::with_capacity`, sized to the
column's bytes. **None of these allocations are routed through
`MemoryReservation::try_grow`.** The bytes show up in process resident memory
but not in `MemoryPool::reserved()`, so they leak past the declared budget,
which is kernel-OOM territory at production scale.
### Proposed fixes
**(a) Point fix in `GroupValuesRows::emit`.** One reservation grow before
`convert_rows`:
```rust
fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
    let rows = /* slice being converted */;
    let estimated = rows.iter().map(|r| r.as_ref().len()).sum::<usize>();
    self.reservation.try_grow(estimated)?;
    let arrays = self.row_converter.convert_rows(rows)?;
    // decode result is now owned by `arrays`
    self.reservation.shrink(estimated);
    // ...
}
```
`GroupValuesRows` already carries a `MemoryReservation` field, so this is
one extra `try_grow` against it. The sibling impls (`GroupValuesBytesView`,
`GroupValuesColumn`, etc.) would each need a change of the same shape.
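The reserve-before-decode pattern in (a) can be tried in isolation with toy types. The sketch below assumes nothing about DataFusion's real API: `ToyReservation` and `decode_with_reservation` are hypothetical stand-ins, and the "decode" is just a concatenation of encoded rows. The point is only that the size estimate is reserved before the output buffer is allocated, so an oversized decode fails with an error instead of allocating.

```rust
/// Toy stand-in for a memory reservation; not DataFusion's `MemoryReservation`.
pub struct ToyReservation {
    limit: usize,
    used: usize,
}

impl ToyReservation {
    pub fn new(limit: usize) -> Self {
        Self { limit, used: 0 }
    }

    /// Reserve `bytes`, failing if the limit would be exceeded.
    pub fn try_grow(&mut self, bytes: usize) -> Result<(), String> {
        let wanted = self.used.saturating_add(bytes);
        if wanted > self.limit {
            return Err(format!(
                "resources exhausted: wanted {wanted} bytes, limit {}",
                self.limit
            ));
        }
        self.used = wanted;
        Ok(())
    }

    /// Release `bytes` previously reserved.
    pub fn shrink(&mut self, bytes: usize) {
        self.used = self.used.saturating_sub(bytes);
    }

    pub fn used(&self) -> usize {
        self.used
    }
}

/// Stand-in for the row decode: reserves the estimated output size first,
/// then allocates the output buffer and copies the rows into it.
pub fn decode_with_reservation(
    rows: &[Vec<u8>],
    reservation: &mut ToyReservation,
) -> Result<Vec<u8>, String> {
    let estimated: usize = rows.iter().map(|r| r.len()).sum();
    reservation.try_grow(estimated)?; // fail before allocating, not after
    let mut out = Vec::with_capacity(estimated);
    for row in rows {
        out.extend_from_slice(row);
    }
    Ok(out)
}
```

Unlike the snippet in (a), this sketch leaves the `shrink` call to the caller, so the reservation can be held for as long as the decoded output is alive.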
**(b) Systemic alternative: `OomGuardMemoryPool`.** Rather than chasing
each operator/kernel that bypasses voluntary tracking (a class of bug we've
found at least **6 distinct instances of so far**, see below), replace the
voluntary `MemoryPool::try_grow` contract with allocator-level enforcement.
Concretely: wrap the inner `MemoryPool` in a pool that *also* installs a
`#[global_allocator]` shim, fails the query when the shim's bank goes
negative, and treats `try_grow` failures as advisory rather than
authoritative.
@andygrove is prototyping the same idea on the Comet side in
[`apache/datafusion-comet#4582`](https://github.com/apache/datafusion-comet/pull/4582)
("prototype allocator-level OOM circuit breaker (OomGuard)"). The framing for
both: voluntary tracking will always be incomplete (every Arrow kernel + every
operator + every new feature is a potential miss); allocator-level tracking is
structurally complete.
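To make the division of authority in (b) concrete, here is a minimal sketch of the decision logic only. It is neither the Comet prototype nor a proposed DataFusion API: `OomGuardSketch` and all of its methods are hypothetical, and the allocator shim is represented by plain `on_alloc` / `on_dealloc` calls. Voluntary reservations are recorded but only report a hint, while the allocator-side balance alone decides whether the query fails.

```rust
use std::sync::atomic::{AtomicIsize, Ordering};

/// Hypothetical guard: the allocator-level balance is authoritative,
/// voluntary reservations are advisory.
pub struct OomGuardSketch {
    /// Bytes left according to the allocator shim (may go negative).
    allocator_balance: AtomicIsize,
    /// Bytes left according to voluntary reservations (may go negative).
    voluntary_remaining: AtomicIsize,
}

impl OomGuardSketch {
    pub fn new(budget_bytes: isize) -> Self {
        Self {
            allocator_balance: AtomicIsize::new(budget_bytes),
            voluntary_remaining: AtomicIsize::new(budget_bytes),
        }
    }

    /// Would be called by the allocator shim on every allocation.
    pub fn on_alloc(&self, bytes: usize) {
        self.allocator_balance.fetch_sub(bytes as isize, Ordering::Relaxed);
    }

    /// Would be called by the allocator shim on every deallocation.
    pub fn on_dealloc(&self, bytes: usize) {
        self.allocator_balance.fetch_add(bytes as isize, Ordering::Relaxed);
    }

    /// Voluntary reservation. Reports whether the request fit, but a `false`
    /// here is only a hint and does not fail the query by itself.
    pub fn try_grow_advisory(&self, bytes: usize) -> bool {
        let before = self
            .voluntary_remaining
            .fetch_sub(bytes as isize, Ordering::Relaxed);
        before >= bytes as isize
    }

    /// Authoritative check: errors once real allocations exceed the budget.
    pub fn check(&self) -> Result<(), String> {
        let balance = self.allocator_balance.load(Ordering::Relaxed);
        if balance < 0 {
            Err(format!("allocator overdraft: account balance = {balance} bytes"))
        } else {
            Ok(())
        }
    }
}
```

In this shape an operator that never calls `try_grow` is still caught, because `check` only looks at what the allocator actually handed out.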
A practical middle path: ship (a) so this specific operator stops
OOM-killing pods today, and let (b)'s discussion play out in parallel.
### Distinct untracked-allocation sites we've cataloged so far via #22626
1. `WindowAggExec`: `batches.push` + `concat_batches` in `window_agg_exec.rs`
2. `NestedLoopJoinExec`, multiple sites: `LazyMemoryStream::generate_next_batch`, `concat_batches` in `handle_buffering_left_memory_limited`, `take_native` in `process_left_range_join` (issue #22723)
3. `GroupValuesBytesView::intern`: hashbrown `RawTable::reserve_rehash` via `HashTableAllocExt::insert_accounted`
4. `GroupValuesRows::intern`: `RowConverter::append` Vec resize via `__rust_realloc`
5. `GroupValuesRows::emit`: `RowConverter::convert_rows` → `decode_column` (this issue)
6. `parquet::arrow::array_reader::byte_array::ByteArrayDecoderPlain::read`: `Vec::reserve` realloc

We expect more as the corpus is exercised. That's the case for (b).
### Component
- [x] Physical Plan