Dandandan opened a new pull request, #23124:
URL: https://github.com/apache/datafusion/pull/23124

   ## Which issue does this PR close?
   
   - N/A (new optimization). Happy to file a tracking issue if preferred.
   
   ## Rationale for this change
   
   `SortPreservingMergeExec` performs a **single-threaded** k-way merge of all
   input partitions. For merge-bound sort queries (e.g. `ORDER BY` over many
   sorted partitions) this leaves most cores idle while one thread does all
   the work.
   
   This PR adds a parallel order-preserving merge that splits the merge across
   `target_partitions` threads, giving a large speedup on merge-bound queries:
   
   - **~2.36×** on the `sort_preserving_merge` microbenchmark (1M rows × 3
     partitions, single u64 key): 33.7 ms → 14.3 ms.
   - **~1.9–2.5×** on `sort_tpch` (Q1 2.54×, Q2 2.31×, Q10 1.93×).
   
   ## What changes are included in this PR?
   
   - **`ParallelSortPreservingMergeExec`** (`sorts/parallel_merge.rs`): a
     parallel order-preserving merge using **Parallel Sorting by Regular
     Sampling (PSRS)**:
     1. Materialize each locally-sorted input into a run (payload kept
        zero-copy; sort keys encoded on demand with a single shared
        `RowConverter` so all keys are byte-comparable).
     2. Draw a regular sample of keys across runs and pick `P-1` pivots.
     3. Cut every run by the *same* pivots via binary search (`lower_bound`).
     4. Merge the `P` key-range buckets **concurrently**, one `SpawnedTask`
        each, reusing the existing optimized loser-tree
        `StreamingMergeBuilder`.
     5. Concatenate the buckets in order into one globally-sorted partition,
        the same output contract as `SortPreservingMergeExec`.
   
     Correctness does not depend on balanced pivots; regular sampling only
     affects load balance. Output partitioning is `UnknownPartitioning(1)`.
   
   - **`ParallelSortMerge`** physical-optimizer rule (`parallel_sort_merge.rs`,
     runs after `PushdownSort`) that replaces an eligible
     `SortPreservingMergeExec` with the parallel exec. Gated by
     **`datafusion.optimizer.enable_parallel_sort_merge`** (default `true`).
     Eligible iff: no fetch/limit, **bounded** input, `> 1` input partition,
     and a **known** row count `>= batch_size * target_partitions`
     (unknown-size inputs keep the serial merge to avoid regressing small
     data).
   
   - A `--parallel-merge` A/B flag on the `sort_tpch` benchmark, a parallel
     variant in the `sort_preserving_merge` criterion bench, and regenerated
     `configs.md`.
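
The five PSRS steps listed for `ParallelSortPreservingMergeExec` can be sketched on plain `u64` keys. This is only an illustration under stated assumptions, not the code in this PR: runs are `Vec<u64>` rather than record batches with `RowConverter`-encoded keys, `std::thread::scope` stands in for `SpawnedTask`, a `BinaryHeap` stands in for the loser-tree `StreamingMergeBuilder`, and the names `lower_bound`, `choose_pivots`, `merge_slices` and `parallel_merge` are hypothetical.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::thread;

/// Index of the first key in a sorted run that is >= `pivot`.
fn lower_bound(run: &[u64], pivot: u64) -> usize {
    run.partition_point(|&k| k < pivot)
}

/// Step 2: take `buckets` evenly spaced keys from each non-empty run,
/// sort the sample, and pick up to `buckets - 1` evenly spaced pivots.
fn choose_pivots(runs: &[Vec<u64>], buckets: usize) -> Vec<u64> {
    let mut sample = Vec::new();
    for run in runs.iter().filter(|r| !r.is_empty()) {
        for i in 0..buckets {
            sample.push(run[i * run.len() / buckets]);
        }
    }
    if sample.is_empty() {
        return Vec::new();
    }
    sample.sort_unstable();
    (1..buckets).map(|i| sample[i * sample.len() / buckets]).collect()
}

/// Serial k-way merge of sorted slices (stand-in for the loser tree).
fn merge_slices(slices: &[&[u64]]) -> Vec<u64> {
    let mut heap = BinaryHeap::new();
    for (run, s) in slices.iter().enumerate() {
        if let Some(&k) = s.first() {
            heap.push(Reverse((k, run, 0usize)));
        }
    }
    let mut out = Vec::with_capacity(slices.iter().map(|s| s.len()).sum());
    while let Some(Reverse((k, run, pos))) = heap.pop() {
        out.push(k);
        if let Some(&next) = slices[run].get(pos + 1) {
            heap.push(Reverse((next, run, pos + 1)));
        }
    }
    out
}

/// Steps 2-5: pivots, cuts, concurrent per-bucket merges, concatenation.
fn parallel_merge(runs: &[Vec<u64>], buckets: usize) -> Vec<u64> {
    let buckets = buckets.max(1);
    let pivots = choose_pivots(runs, buckets);
    // Step 3: cut every run by the same pivots. Pivots are non-decreasing,
    // so each run's cut positions are non-decreasing too.
    let cuts: Vec<Vec<usize>> = runs
        .iter()
        .map(|run| {
            let mut c = vec![0];
            c.extend(pivots.iter().map(|&p| lower_bound(run, p)));
            c.push(run.len());
            c
        })
        .collect();
    let n_buckets = pivots.len() + 1;
    // Step 4: merge each key-range bucket on its own thread.
    let merged: Vec<Vec<u64>> = thread::scope(|s| {
        let handles: Vec<_> = (0..n_buckets)
            .map(|b| {
                let slices: Vec<&[u64]> = runs
                    .iter()
                    .zip(&cuts)
                    .map(|(run, c)| &run[c[b]..c[b + 1]])
                    .collect();
                s.spawn(move || merge_slices(&slices))
            })
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    });
    // Step 5: buckets cover disjoint, increasing key ranges.
    merged.concat()
}
```

Because every run is cut by the same pivots with the same `lower_bound` rule, all keys in bucket `b` are strictly below every key in bucket `b + 1`. Badly chosen pivots only make some buckets larger than others; the concatenated output stays sorted.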
   
   ### Limitations
   
   The parallel merge is **pipeline-breaking**: it materializes all of its
   input and emits nothing until every bucket is merged, so it never honors a
   pushed-down fetch/limit and is restricted to bounded inputs. Peak memory ≈
   full input + output (tracked via `MemoryReservation`s, not currently
   spillable). Queries needing early termination keep using
   `SortPreservingMergeExec`. Marked draft to gather feedback on the default
   and the memory trade-off.
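
The eligibility conditions of the `ParallelSortMerge` rule follow from these limitations. A minimal sketch of that predicate, assuming a hypothetical `MergeCandidate` summary struct and a hypothetical `is_eligible` function (the actual rule presumably reads these properties from the physical plan and its statistics):

```rust
/// Hypothetical summary of the plan properties the rule checks.
#[derive(Clone, Copy)]
struct MergeCandidate {
    /// True if the merge carries a fetch/limit.
    has_fetch: bool,
    /// True if the input is bounded (not an infinite stream).
    bounded: bool,
    /// Number of input partitions feeding the merge.
    input_partitions: usize,
    /// `None` when the row count is not known.
    num_rows: Option<usize>,
}

/// Returns true when the serial merge may be replaced by the parallel one,
/// following the conditions listed in the PR description.
fn is_eligible(c: &MergeCandidate, batch_size: usize, target_partitions: usize) -> bool {
    let threshold = batch_size.saturating_mul(target_partitions);
    !c.has_fetch
        && c.bounded
        && c.input_partitions > 1
        // Unknown-size inputs keep the serial merge.
        && c.num_rows.map_or(false, |n| n >= threshold)
}
```

With `batch_size = 8192` and `target_partitions = 8`, for example, the row-count threshold is 65,536, so a known 100,000-row input qualifies while an input of unknown size does not.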
   
   ## Are these changes tested?
   
   Yes:
   - Unit tests (`sorts::parallel_merge`): output matches
     `SortPreservingMergeExec` for unique keys, heavy low-cardinality ties,
     descending + nulls, uneven/empty partitions, more-buckets-than-rows, and
     single-partition passthrough.
   - The optimizer-rule path is exercised end-to-end via sqllogictest with the
     flag on by default; `EXPLAIN` plans in a few files now show
     `ParallelSortPreservingMergeExec` (results are unchanged: the output is
     identical to the serial merge).
   - Benchmarks above (microbench + `sort_tpch`).
   
   ## Are there any user-facing changes?
   
   - New config `datafusion.optimizer.enable_parallel_sort_merge` (default
     `true`).
   - `EXPLAIN` shows `ParallelSortPreservingMergeExec` where the rule applies,
     and such plans avoid the single-threaded merge bottleneck (at the cost of
     materializing the input). No public API breakage.
   
   🤖 Generated with [Claude Code](https://claude.com/claude-code)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

