junknown opened a new issue, #22848:
URL: https://github.com/apache/datafusion/issues/22848

### Is your feature request related to a problem or challenge?

When a spilling operator (hash aggregate, sort) merges its spill files, the number of spill files opened simultaneously in a single merge pass is bounded only by available memory, never by a file-count ceiling.
   
In `MultiLevelMergeBuilder::get_sorted_spill_files_to_merge` (datafusion-physical-plan/src/sorts/multi_level_merge.rs), the loop keeps adding spill files to the current pass as long as `reservation.try_grow(...)` succeeds, i.e. as long as the (often large, shared) memory pool has room. There is no upper bound on how many files are opened at once; the only floor is "at least 2 streams". `DiskManager` exposes `max_temp_directory_size` (total bytes on disk) but nothing that limits the number of open files or the merge fan-in.
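To make the shape of that loop concrete, here is a simplified, hypothetical model of it; it is not DataFusion's code. Each spill file gets an assumed per-pass reservation size, a plain byte budget stands in for the shared memory pool, and a checked addition stands in for `reservation.try_grow(...)`. The model returns `None` when fewer than 2 streams fit, standing in for the "at least 2 streams" floor. The point it illustrates: with a large enough budget, the number of files opened in one pass simply tracks the number of spill files.

```rust
/// Simplified, hypothetical model of the memory-driven fan-in loop.
/// `file_costs[i]` is an assumed reservation size for streaming spill file `i`,
/// and `pool_bytes` stands in for the room left in the shared memory pool.
/// Returns how many files the pass would open, or `None` if fewer than the
/// minimum of 2 streams could be reserved.
pub fn memory_driven_fan_in(file_costs: &[usize], pool_bytes: usize) -> Option<usize> {
    let mut reserved: usize = 0;
    let mut count: usize = 0;
    for &cost in file_costs {
        // Stand-in for `reservation.try_grow(cost)`: succeeds while the budget has room.
        match reserved.checked_add(cost) {
            Some(total) if total <= pool_bytes => {
                reserved = total;
                count += 1;
            }
            // The first failed reservation ends the pass. Note that nothing
            // here ever compares `count` against a file-count ceiling.
            _ => break,
        }
    }
    // The only floor: a merge needs at least 2 streams.
    if count >= 2 {
        Some(count)
    } else {
        None
    }
}
```

With 1,000 spill files of 1 KiB each and a 1 GiB budget, this model opens all 1,000 files in a single pass.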
   
This becomes a process-level problem on high-core machines. The aggregation runs `target_partitions` streams in parallel (the number of CPUs by default), and each partition runs its own multi-level merge. With a generous memory pool, each merge opens many spill files at once, so peak open file descriptors ≈ `target_partitions × fan_in_per_pass`. On a many-core node, a heavy `GROUP BY` / `COUNT(DISTINCT ...)` over many spilled files fails with:

    IO error: Too many open files (os error 24)

on a spill temp file. The error is a `Result` and is handled gracefully, but EMFILE is a *process-wide* condition: at the peak it can transiently break unrelated operations in the same process (opening other files, network connections, logging, other concurrent queries).
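A back-of-the-envelope check of that product, using hypothetical numbers rather than measurements: 64 partitions each merging 32 spill files in one pass hold about 2,048 descriptors, twice a 1024 soft limit. The helper names below are illustrative only, and the estimate ignores descriptors the rest of the process already holds.

```rust
/// Rough estimate from the issue: peak open spill-file descriptors when every
/// partition is inside a merge pass at the same time.
pub fn peak_spill_fds(target_partitions: usize, fan_in_per_pass: usize) -> usize {
    target_partitions.saturating_mul(fan_in_per_pass)
}

/// True if the spill files alone reach or exceed a descriptor limit such as
/// `ulimit -n`; at that point no descriptor is left for anything else.
pub fn exceeds_fd_limit(target_partitions: usize, fan_in_per_pass: usize, nofile: usize) -> bool {
    peak_spill_fds(target_partitions, fan_in_per_pass) >= nofile
}
```

The same fan-in that is harmless on an 8-core machine (8 × 32 = 256) exhausts the limit on a 64-core one, which is why the problem scales with core count.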
   
Today the only real control is the OS `ulimit -n`. Lowering `target_partitions` works but penalizes *every* query on the node, including the vast majority that never spill. That is the wrong trade-off, since non-spilling queries never enter the merge path at all.
   
### Describe the solution you'd like

A configuration knob that caps the merge fan-in (the maximum number of spill files opened simultaneously in a single merge pass) independently of available memory. When the cap is hit, the merge simply performs more passes (slower, but with bounded file descriptors), which is exactly the desired trade-off for "make a very heavy aggregation feasible, even if slower".
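How many extra passes that costs can be estimated with a simplified, hypothetical model of a capped multi-level merge (the function name and the model are illustrative, not DataFusion's algorithm): each intermediate pass merges `cap` spill files into one new spill file, and once the remaining files fit in one pass, a final pass produces the output. It ignores in-memory streams and the order in which the real code picks files.

```rust
/// Hypothetical model: total merge passes needed for `spill_files` files when
/// at most `cap` files are open per pass. `cap == 0` means unbounded, matching
/// the proposed default; any other cap is raised to at least 2.
pub fn merge_passes(spill_files: usize, cap: usize) -> usize {
    let cap = if cap == 0 { usize::MAX } else { cap.max(2) };
    if spill_files <= 1 {
        // Zero or one file: nothing to merge in this model.
        return 0;
    }
    let mut remaining = spill_files;
    let mut passes = 0;
    while remaining > cap {
        // Intermediate pass: `cap` inputs are merged into one new spill file,
        // so the file count drops by `cap - 1`.
        remaining = remaining - cap + 1;
        passes += 1;
    }
    // Final pass merges the remaining (at most `cap`) files into the output.
    passes + 1
}
```

Under this model, 100 spill files need 1 pass unbounded and 11 passes with a cap of 10, i.e. roughly `ceil((n - 1) / (cap - 1))`. Each extra pass rereads and rewrites spilled data, which is the "slower" half of the trade-off.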
   
Proposed shape:

- Add `max_merge_fanin: usize` (0 = unbounded, which is the current behavior and the default) to `DiskManagerBuilder` / `DiskManager` in datafusion-execution, alongside the existing `max_temp_directory_size`. It is a natural home: it is the type that already carries the spill/disk caps, and `SpillManager` already reaches it via `env.disk_manager`.
   
- In `MultiLevelMergeBuilder::get_sorted_spill_files_to_merge`, after the existing memory-driven loop, clamp the count:

          // `max_merge_fanin()` is the proposed accessor; 0 means "no cap".
          let cap = self.spill_manager.env().disk_manager.max_merge_fanin();
          if cap != 0 {
              // Never go below 2, so each pass still merges at least two streams.
              number_of_spills_to_read_for_current_phase =
                  number_of_spills_to_read_for_current_phase.min(cap.max(2));
          }

  The `.max(2)` preserves the existing "at least 2 streams per merge" invariant, and the surrounding algorithm already handles leftover spill files by doing additional passes.
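A self-contained sketch of the proposed knob and clamp, for discussion. `MergeLimits`, `with_max_merge_fanin`, and `clamp_fan_in` are hypothetical stand-ins, not existing DataFusion APIs; in the proposal the field would live on `DiskManagerBuilder` / `DiskManager`, and the clamp would run inside `get_sorted_spill_files_to_merge` on the count the memory-driven loop produced.

```rust
/// Hypothetical stand-in for the proposed `max_merge_fanin` setting.
#[derive(Debug, Clone, Copy, Default)]
pub struct MergeLimits {
    /// 0 = unbounded (today's behavior and the proposed default).
    max_merge_fanin: usize,
}

impl MergeLimits {
    /// Builder-style setter, mirroring how `max_temp_directory_size` is configured.
    pub fn with_max_merge_fanin(mut self, max_merge_fanin: usize) -> Self {
        self.max_merge_fanin = max_merge_fanin;
        self
    }

    pub fn max_merge_fanin(&self) -> usize {
        self.max_merge_fanin
    }
}

/// Applies the cap to the number of files the memory-driven loop selected.
pub fn clamp_fan_in(memory_driven: usize, limits: &MergeLimits) -> usize {
    let cap = limits.max_merge_fanin();
    if cap == 0 {
        // Unbounded: keep whatever memory allowed.
        memory_driven
    } else {
        // `.max(2)` keeps the "at least 2 streams per merge" invariant even
        // if someone configures a cap of 1.
        memory_driven.min(cap.max(2))
    }
}
```

Because the clamp only ever lowers the count, leftover files fall through to the existing "merge what fits, then repeat" path, so no new scheduling logic is needed in this sketch.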
   
Crucially, this is zero-cost for queries that do not spill: `MultiLevelMergeBuilder` is only constructed when there are spill files to merge, so non-spilling queries never execute this branch. `target_partitions` can stay high (full parallelism for everyone), and only the heavy spilling operators pay the price: more merge passes in exchange for a peak FD bound of roughly `target_partitions × max_merge_fanin` per concurrently merging operator.
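Since the bound is a product, an operator could size the cap from a descriptor budget. The helper below is a hypothetical sketch, not part of the proposal: it divides the descriptors left after an assumed `headroom` (reserved for sockets, logs, and other files) by `target_partitions`, with a floor of 2. It does not account for the output file written by each intermediate pass or for several spilling operators merging at once.

```rust
/// Hypothetical helper: pick a `max_merge_fanin` so that
/// `target_partitions × max_merge_fanin` fits in `nofile - headroom`.
/// Never returns less than 2, the merge's minimum; if even 2 per partition
/// does not fit, the cap alone cannot meet the budget.
pub fn fan_in_for_fd_budget(nofile: usize, headroom: usize, target_partitions: usize) -> usize {
    let budget = nofile.saturating_sub(headroom);
    (budget / target_partitions.max(1)).max(2)
}
```

For example, with `ulimit -n` at 1024, 256 descriptors held back, and 64 partitions, this yields a cap of 12, so spill merges use at most 64 × 12 = 768 descriptors.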
   
Optionally, surface it as an execution config option (e.g. `datafusion.execution.max_merge_fanin`) so it can be set per session.
   
   
### Describe alternatives you've considered

- Lowering `target_partitions`: reduces the FD multiplier, but it is a per-node global that slows down *all* queries, including the majority that never spill. Wrong altitude: it taxes the common case to fix the rare heavy-aggregation case.

- Lowering the memory pool size: indirectly shrinks the per-pass fan-in (toward the floor of 2), but the pool is shared across all concurrent queries, the relationship to FD count is indirect, and a smaller pool makes *more* queries spill.

- Raising `ulimit -n` / `LimitNOFILE`: a necessary operational mitigation, but it only moves the ceiling; it does not give the engine a way to bound its own FD usage, and the bound still scales with cores × data.

- `max_temp_directory_size`: caps total bytes on disk, not the number of files open at once, so it does not address EMFILE.

None of these give a direct, memory-independent bound on simultaneously open spill files that is also free for non-spilling queries, which the proposed fan-in cap does.
   
   
### Additional context

Observed on DataFusion 53.1.0; the same code path is present on `main`.

Repro shape: a `SELECT COUNT(DISTINCT col_b), col_a FROM t GROUP BY col_a` over a large dataset that forces the hash aggregate to spill many files, on a many-core machine with a typical default `ulimit -n` (e.g. 1024). `COUNT(DISTINCT ...)` is expanded into two stacked aggregation layers, both of which can spill, compounding the FD pressure.

Relevant code:
- datafusion-physical-plan/src/sorts/multi_level_merge.rs: `MultiLevelMergeBuilder::get_sorted_spill_files_to_merge` (the unbounded, memory-driven fan-in loop).
- datafusion-physical-plan/src/aggregates/row_hash.rs: the hash aggregate hands all accumulated spill files to `StreamingMergeBuilder::with_sorted_spill_files`.
- datafusion-execution/src/disk_manager.rs: `DiskManagerBuilder` / `DiskManager`, where `max_temp_directory_size` lives and where `max_merge_fanin` would be added.

Related: #7858 touches on spilling improvements in the hash aggregate and mentions the "too many open files" concern in passing, but does not propose a fan-in cap.

Happy to open a PR implementing the above if the approach sounds reasonable.
   

