mbutrovich opened a new pull request, #21600:
URL: https://github.com/apache/datafusion/pull/21600

   Draft for benchmarking; builds on 
https://github.com/apache/datafusion/pull/21525.
   
   ## Which issue does this PR close?
   
   Partially addresses #21543.
   
   ## Rationale for this change
   
   ExternalSorter's merge path sorts each incoming batch individually 
(typically 8192 rows), then k-way merges all of them. This creates two problems:
   
   1. **Too many small sorted runs.** At scale (TPC-H SF10, ~60M rows in 
lineitem), ~7300 individually-sorted batches feed the k-way merge with high 
fan-in.
   2. **Radix sort can't amortize encoding.** MSD radix sort 
(apache/arrow-rs#9683) is 2-3x faster than `lexsort_to_indices` at 32K+ rows, 
but at 8K rows the `RowConverter` encoding cost dominates. TPC-H SF10 
benchmarks on #21525 confirmed this: naively swapping in radix sort made 12/22 
queries **slower** (up to 1.20x).
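
As a rough check on the run counts above, the arithmetic can be sketched as follows. This assumes exactly 60,000,000 input rows and that every chunk except the last reaches its target size; `sorted_runs` is a hypothetical helper, not part of DataFusion.

```rust
/// Number of sorted runs produced when `total_rows` are cut into chunks of
/// `rows_per_run` rows (the last chunk may be partial).
/// Hypothetical helper for illustration only.
fn sorted_runs(total_rows: u64, rows_per_run: u64) -> u64 {
    // Ceiling division without relying on newer std helpers.
    (total_rows + rows_per_run - 1) / rows_per_run
}
```

Under those assumptions, `sorted_runs(60_000_000, 8192)` gives 7325 runs (the "~7300" above), while `sorted_runs(60_000_000, 32768)` gives 1832, roughly a 4x reduction in merge fan-in.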
   
   ## What changes are included in this PR?
   
   ### Chunked sort pipeline
   
   Replaces ExternalSorter's buffer-then-sort architecture with a 
coalesce-then-sort pipeline:
   
   - Incoming batches accumulate in a `BatchCoalescer` until a target row count 
is reached
   - Each coalesced batch is sorted immediately and stored as a pre-sorted run
   - On memory pressure, sorted runs are spilled directly to disk (one file per 
run when there is no merge headroom, or merged into a single file when 
headroom is available)
   - At query completion, runs are k-way merged via the existing loser tree 
(`StreamingMergeBuilder`)
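
The steps above can be sketched in miniature. This is a simplified model, not the PR's code: plain `Vec<i64>` stands in for `RecordBatch`, a growable buffer stands in for `BatchCoalescer`, and a `BinaryHeap` stands in for the loser tree. `ChunkedSorter` and `merge_runs` are hypothetical names, and spilling is omitted.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Hypothetical stand-in for one pre-sorted run.
type Run = Vec<i64>;

/// Illustrative model of coalesce-then-sort; not the real `ExternalSorter`.
struct ChunkedSorter {
    target_rows: usize,
    pending: Vec<i64>, // plays the role of the coalescer buffer
    runs: Vec<Run>,    // pre-sorted runs awaiting the final merge
}

impl ChunkedSorter {
    fn new(target_rows: usize) -> Self {
        Self { target_rows, pending: Vec::new(), runs: Vec::new() }
    }

    /// Buffer an incoming batch; each time the target row count is reached,
    /// sort that chunk immediately and store it as a run.
    fn insert_batch(&mut self, batch: &[i64]) {
        self.pending.extend_from_slice(batch);
        while self.pending.len() >= self.target_rows {
            let rest = self.pending.split_off(self.target_rows);
            let mut run = std::mem::replace(&mut self.pending, rest);
            run.sort_unstable();
            self.runs.push(run);
        }
    }

    /// Flush the partial chunk as a final run, then k-way merge all runs.
    fn finish(mut self) -> Vec<i64> {
        if !self.pending.is_empty() {
            let mut run = std::mem::take(&mut self.pending);
            run.sort_unstable();
            self.runs.push(run);
        }
        merge_runs(self.runs)
    }
}

/// K-way merge using a min-heap of (value, run index, position).
fn merge_runs(runs: Vec<Run>) -> Vec<i64> {
    let mut heap = BinaryHeap::new();
    for (i, run) in runs.iter().enumerate() {
        if let Some(&v) = run.first() {
            heap.push(Reverse((v, i, 0usize)));
        }
    }
    let mut out = Vec::with_capacity(runs.iter().map(Vec::len).sum());
    while let Some(Reverse((v, i, pos))) = heap.pop() {
        out.push(v);
        if let Some(&next) = runs[i].get(pos + 1) {
            heap.push(Reverse((next, i, pos + 1)));
        }
    }
    out
}
```

With `target_rows = 4`, inserting `[9, 3, 7]` and then `[1, 8, 2]` produces one full run `[1, 3, 7, 9]`; `finish()` flushes `[2, 8]` as a second run and merges them into `[1, 2, 3, 7, 8, 9]`.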
   
   ### Two-target coalescing based on schema
   
   - **Radix-eligible** (primitives, strings): coalesce to 
`sort_coalesce_target_rows` (new config, default 32768), radix sort, chunk 
output back to `batch_size`
   - **Non-radix** (all-dictionary, nested types): coalesce to `batch_size` 
with large-batch bypass (`biggest_coalesce_batch_size = batch_size/2`) so 
already-sized batches pass through zero-copy
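
A minimal sketch of the target selection, under stated assumptions: `ColumnKind`, `radix_eligible`, `coalesce_target`, and `bypasses_coalescer` are hypothetical names. The eligibility rule shown (non-radix if any sort column is nested or if every sort column is a dictionary) is only one reading of the bullets above, and treating "strictly larger than `batch_size / 2`" as the bypass condition is likewise an assumption.

```rust
/// Coarse classification of a sort column's type (hypothetical).
#[derive(Clone, Copy, Debug, PartialEq)]
enum ColumnKind {
    Primitive,
    Utf8,
    Dictionary,
    Nested,
}

/// Assumed rule: a schema is non-radix if any sort column is nested,
/// or if all sort columns are dictionaries.
fn radix_eligible(sort_columns: &[ColumnKind]) -> bool {
    let any_nested = sort_columns.iter().any(|c| *c == ColumnKind::Nested);
    let all_dictionary = sort_columns.iter().all(|c| *c == ColumnKind::Dictionary);
    !any_nested && !all_dictionary
}

/// Row count the coalescer should accumulate before sorting.
fn coalesce_target(
    sort_columns: &[ColumnKind],
    batch_size: usize,
    sort_coalesce_target_rows: usize,
) -> usize {
    if radix_eligible(sort_columns) {
        sort_coalesce_target_rows
    } else {
        batch_size
    }
}

/// Non-radix path only: batches above half of `batch_size` skip the copy
/// into the coalescer (assumed comparison; see lead-in).
fn bypasses_coalescer(rows: usize, batch_size: usize) -> bool {
    rows > batch_size / 2
}
```

For example, with `batch_size = 8192` and the default `sort_coalesce_target_rows = 32768`, a primitive key coalesces to 32768 rows while an all-dictionary key coalesces to 8192.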
   
   ### Graceful degradation
   
   Under memory pressure, the coalescer flushes early, producing smaller 
chunks. Below `batch_size` rows, the pipeline falls back to lexsort, matching 
the old per-batch sort behavior. This means radix sort is only used when there 
are enough rows to amortize encoding overhead.
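
The fallback rule can be illustrated with a small decision function. `SortAlgo` and `choose_sort` are hypothetical names for this sketch; the only rule taken from the description is that chunks below `batch_size` rows use lexsort.

```rust
/// Which sort kernel a chunk would use (hypothetical enum).
#[derive(Debug, PartialEq)]
enum SortAlgo {
    Radix,
    Lexsort,
}

/// Pick the kernel for one coalesced chunk.
/// Radix sort is used only for radix-eligible schemas with at least
/// `batch_size` rows; early-flushed (smaller) chunks fall back to lexsort.
fn choose_sort(rows: usize, batch_size: usize, radix_eligible: bool) -> SortAlgo {
    if radix_eligible && rows >= batch_size {
        SortAlgo::Radix
    } else {
        SortAlgo::Lexsort
    }
}
```

So a full 32768-row chunk of a radix-eligible schema takes the radix path, while a 4000-row chunk flushed early under memory pressure is sorted with lexsort, as a single incoming batch would have been before.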
   
   ### Spill strategy
   
   - **With merge headroom** (`sort_spill_reservation_bytes > 0`): merge all 
runs into a single sorted stream before spilling to one file. Fewer files = 
lower fan-in for the final `MultiLevelMerge`.
   - **Without headroom** (`sort_spill_reservation_bytes == 0`): spill each run 
as its own file to avoid allocating merge cursor infrastructure when the pool 
has no room.
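
A sketch of the two spill modes, assuming the decision depends only on whether `sort_spill_reservation_bytes` is zero. `SpillPlan`, `plan_spill`, and `spill_file_count` are hypothetical names, not functions in the PR.

```rust
/// How in-memory sorted runs are written out on memory pressure (hypothetical).
#[derive(Debug, PartialEq)]
enum SpillPlan {
    /// Merge all runs into one sorted stream and write a single file.
    MergedSingleFile,
    /// Write each run as its own file, allocating no merge cursors.
    OneFilePerRun,
}

/// Choose the plan from the configured merge headroom.
fn plan_spill(sort_spill_reservation_bytes: usize) -> SpillPlan {
    if sort_spill_reservation_bytes > 0 {
        SpillPlan::MergedSingleFile
    } else {
        SpillPlan::OneFilePerRun
    }
}

/// Number of spill files one spill event produces for `num_runs` runs.
fn spill_file_count(num_runs: usize, sort_spill_reservation_bytes: usize) -> usize {
    match plan_spill(sort_spill_reservation_bytes) {
        SpillPlan::MergedSingleFile => num_runs.min(1),
        SpillPlan::OneFilePerRun => num_runs,
    }
}
```

With five runs in memory, a spill produces one file when headroom is configured and five files when it is zero, which is the fan-in trade-off described above.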
   
   ### Dead code removal
   
   Since sorted runs are already sorted, spilling no longer requires an 
in-memory merge step. This removes `in_mem_sort_stream`, `sort_batch_stream`, 
`consume_and_spill_append`, `spill_finish`, `organize_stringview_arrays`, and 
the `in_progress_spill_file` field.
   
   ### Config changes
   
   - New: `datafusion.execution.sort_coalesce_target_rows` (default 32768)
   - Deprecated: `datafusion.execution.sort_in_place_threshold_bytes` (no 
longer read; marked with the `warn` attribute per the API health policy)
   
   ## Are these changes tested?
   
   - 4 new unit tests:
     - `test_chunked_sort_radix_coalescing` — verifies coalescing to 32K rows 
and chunking back to `batch_size`
     - `test_chunked_sort_partial_flush` — verifies partial coalescer flush on 
`sort()`
     - `test_spill_creates_one_file_per_run` — verifies per-run spill without 
merge headroom
     - `test_spill_merges_runs_with_headroom` — verifies merged spill with 
merge headroom
   - All 30 existing sort unit tests pass
   - All 30 memory limit integration tests pass
   - All 15 sort fuzz tests pass (including `extended_tests`)
   - All 10 spilling fuzz tests pass
   - `information_schema.slt` updated for new config
   
   ## Are there any user-facing changes?
   
   - New config option `sort_coalesce_target_rows` (default 32768) controls the 
coalesce target for radix-eligible schemas
   - `sort_in_place_threshold_bytes` is deprecated and no longer used (emits a 
warning if set)
   - Sort behavior under memory pressure may differ: the pipeline is more 
memory-efficient (it shrinks reservations after sorting), so some workloads 
may spill less frequently


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

