junknown opened a new issue, #22848:
URL: https://github.com/apache/datafusion/issues/22848
### Is your feature request related to a problem or challenge?
When a spilling operator (hash aggregate, sort) merges its spill files, the number of
spill files opened simultaneously in a single merge pass is bounded only by available
memory, never by a file-count ceiling.

In `MultiLevelMergeBuilder::get_sorted_spill_files_to_merge`
(datafusion-physical-plan/src/sorts/multi_level_merge.rs), the loop keeps adding spill
files to the current pass as long as `reservation.try_grow(...)` succeeds, i.e. as long
as the (often large, shared) memory pool has room. There is no upper bound on how many
files are opened at once; the only floor is "at least 2 streams". `DiskManager` exposes
`max_temp_directory_size` (total bytes on disk) but nothing for the number of open
files / merge fan-in.

This becomes a process-level problem on high-core machines. The aggregation runs
`target_partitions` streams in parallel (= num CPUs by default), and each partition runs
its own multi-level merge. With a generous memory pool, each merge opens many spill files
at once, so peak open file descriptors ≈ `target_partitions × fan_in_per_pass`. On a
many-core node a heavy `GROUP BY` / `COUNT(DISTINCT ...)` over many spilled files fails
with:

    IO error: Too many open files (os error 24)

on a spill temp file. The error is a `Result` and is handled gracefully, but EMFILE is a
*process-wide* condition: at the peak it can transiently break unrelated operations in the
same process (opening other files, network connections, logging, other concurrent queries).

Today the only real control is the OS `ulimit -n`. Lowering `target_partitions` works but
penalizes *every* query on the node, including the vast majority that never spill, which
is the wrong trade-off, since non-spilling queries never enter the merge path at all.
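
The estimate above can be made concrete with a small back-of-the-envelope sketch. The helper names `peak_spill_fds` and `exceeds_fd_limit` are hypothetical (they are not DataFusion APIs), and the model deliberately ignores descriptors used for anything other than reading spill files during a merge pass.

```rust
/// Hypothetical helper (not a DataFusion API): approximate the number of
/// spill files open at once as `target_partitions × fan_in_per_pass`.
fn peak_spill_fds(target_partitions: usize, fan_in_per_pass: usize) -> usize {
    // Saturate instead of overflowing on absurd inputs.
    target_partitions.saturating_mul(fan_in_per_pass)
}

/// Returns true when the estimated peak exceeds the process soft limit
/// (the value reported by `ulimit -n`).
fn exceeds_fd_limit(
    target_partitions: usize,
    fan_in_per_pass: usize,
    soft_limit: usize,
) -> bool {
    peak_spill_fds(target_partitions, fan_in_per_pass) > soft_limit
}
```

With assumed numbers of 64 partitions and a memory-driven fan-in of 32, `peak_spill_fds(64, 32)` is 2048, twice a 1024-descriptor soft limit; a fan-in of 8 would keep the same estimate at 512.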
### Describe the solution you'd like
A configuration knob that caps the merge fan-in (the maximum number of spill files
opened simultaneously in a single merge pass) independently of available memory. When
the cap is hit, the merge simply performs more passes (slower, but bounded file
descriptors), which is exactly the desired trade-off for "make a very heavy aggregation
feasible, even if slower".

Proposed shape:

- Add `max_merge_fanin: usize` (0 = unbounded, the current behavior and the default) to
  `DiskManagerBuilder` / `DiskManager` in datafusion-execution, alongside the existing
  `max_temp_directory_size`. It is a natural home: it is the type that already carries the
  spill/disk caps, and `SpillManager` already reaches it via `env.disk_manager`.
- In `MultiLevelMergeBuilder::get_sorted_spill_files_to_merge`, after the existing
  memory-driven loop, clamp the count:

      let cap = self.spill_manager.env().disk_manager.max_merge_fanin();
      if cap != 0 {
          number_of_spills_to_read_for_current_phase =
              number_of_spills_to_read_for_current_phase.min(cap.max(2));
      }

  The `.max(2)` preserves the existing "at least 2 streams per merge" invariant, and the
  surrounding algorithm already handles leftover spill files by doing additional passes.
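
The clamp in the second bullet can be isolated as a pure function to make its edge cases easy to check. This is only a sketch: `clamp_fan_in` is a hypothetical free function written for illustration, and `memory_driven` stands in for the count the existing memory-driven loop would have chosen.

```rust
/// Hypothetical standalone version of the proposed clamp.
///
/// * `memory_driven`: number of spill files the memory-driven loop selected.
/// * `cap`: the proposed `max_merge_fanin`; 0 means unbounded (current behavior).
fn clamp_fan_in(memory_driven: usize, cap: usize) -> usize {
    if cap == 0 {
        // No cap configured: keep whatever the memory reservation allowed.
        memory_driven
    } else {
        // Never clamp below 2 so a merge pass still combines at least two streams.
        memory_driven.min(cap.max(2))
    }
}
```

A cap of 1 is treated as 2, and a cap larger than the memory-driven count has no effect, so the knob can only ever reduce the fan-in.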

Crucially, this is zero-cost for queries that do not spill: `MultiLevelMergeBuilder` is
only constructed when there are spill files to merge, so non-spilling queries never
execute this branch. `target_partitions` can stay high (full parallelism for everyone),
and only the heavy spilling operators pay the price: more merge passes in exchange for a
peak FD bound of `target_partitions × max_merge_fanin`.

Optionally surface it as an execution config option (e.g.
`datafusion.execution.max_merge_fanin`) so it can be set per-session.
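
To get a feel for the "more passes in exchange for fewer descriptors" trade-off, the following sketch counts merge passes under a simplified model in which each pass merges up to `fan_in` spill files into one new spill file that rejoins the pool. The function `merge_passes` and this model are assumptions made for illustration; the real multi-level merge also depends on memory reservations and file sizes.

```rust
/// Simplified, hypothetical model of a multi-pass merge (not DataFusion code).
/// Returns how many merge passes are needed to reduce `files` spill files to a
/// single sorted stream when each pass reads at most `fan_in` files.
fn merge_passes(mut files: usize, fan_in: usize) -> usize {
    // Mirror the "at least 2 streams per merge" floor.
    let fan_in = fan_in.max(2);
    let mut passes = 0;
    while files > 1 {
        // Merge up to `fan_in` files into one output file.
        let merged = files.min(fan_in);
        files = files - merged + 1;
        passes += 1;
    }
    passes
}
```

Under this model, 100 spill files need a single pass with a fan-in of 100 (100 files open at once in that partition) but 15 passes with a fan-in of 8, which is the kind of slowdown the cap trades for a bounded descriptor count.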
### Describe alternatives you've considered
- Lowering `target_partitions`: reduces the FD multiplier, but it is a per-node global
  that slows down *all* queries, including the majority that never spill. Wrong altitude:
  it taxes the common case to fix the rare heavy-aggregation case.
- Lowering the memory pool size: indirectly shrinks the per-pass fan-in (toward the floor
  of 2), but the pool is shared across all concurrent queries, the relationship to FD count
  is indirect, and a smaller pool makes *more* queries spill.
- Raising `ulimit -n` / `LimitNOFILE`: a necessary operational mitigation, but it only
  moves the ceiling; it does not give the engine a way to bound its own FD usage, and the
  bound still scales with cores × data.
- `max_temp_directory_size`: caps total bytes on disk, not the number of files open at once,
  so it does not address EMFILE.

None of these give a direct, memory-independent bound on simultaneously-open spill files
that is also free for non-spilling queries, which the proposed fan-in cap does.
### Additional context
Observed on DataFusion 53.1.0; the same code path is present on `main`.

Repro shape: a `SELECT COUNT(DISTINCT col_b), col_a FROM t GROUP BY col_a` over a large
dataset that forces the hash aggregate to spill many files, on a many-core machine with a
default-ish `ulimit -n` (e.g. 1024). `COUNT(DISTINCT ...)` is expanded into two stacked
aggregation layers, both of which can spill, compounding the FD pressure.

Relevant code:

- datafusion-physical-plan/src/sorts/multi_level_merge.rs:
  `MultiLevelMergeBuilder::get_sorted_spill_files_to_merge` (the unbounded, memory-driven
  fan-in loop).
- datafusion-physical-plan/src/aggregates/row_hash.rs: the hash aggregate hands all
  accumulated spill files to `StreamingMergeBuilder::with_sorted_spill_files`.
- datafusion-execution/src/disk_manager.rs: `DiskManagerBuilder` / `DiskManager`, where
  `max_temp_directory_size` lives and where `max_merge_fanin` would be added.

Related: #7858 touches spilling improvements in the hash aggregate and notes the
"too many open files" concern in passing, but does not propose a fan-in cap.

Happy to open a PR implementing the above if the approach sounds reasonable.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]