Dandandan opened a new pull request, #23124:
URL: https://github.com/apache/datafusion/pull/23124
## Which issue does this PR close?
- N/A (new optimization). Happy to file a tracking issue if preferred.
## Rationale for this change
`SortPreservingMergeExec` performs a **single-threaded** k-way merge of all input partitions. For merge-bound sort queries (e.g. `ORDER BY` over many sorted partitions), this leaves most cores idle while one thread does all the work.
This PR adds a parallel order-preserving merge that splits the merge across
`target_partitions` threads, giving a large speedup on merge-bound queries:
- **~2.36×** on the `sort_preserving_merge` microbenchmark (1M rows × 3 partitions, single u64 key): 33.7 ms → 14.3 ms.
- **~1.9–2.5×** on sort_tpch (Q1 2.54×, Q2 2.31×, Q10 1.93×).
## What changes are included in this PR?
- **`ParallelSortPreservingMergeExec`** (`sorts/parallel_merge.rs`), a parallel order-preserving merge using **Parallel Sorting by Regular Sampling (PSRS)**:
  1. Materialize each locally sorted input into a run (payload kept zero-copy; sort keys encoded on demand with a single shared `RowConverter` so all keys are byte-comparable).
  2. Draw a regular sample of keys across runs and pick `P-1` pivots.
  3. Cut every run at the *same* pivots via binary search (`lower_bound`).
  4. Merge the `P` key-range buckets **concurrently**, one `SpawnedTask` each, reusing the existing optimized loser-tree `StreamingMergeBuilder`.
  5. Concatenate the buckets in order, producing one globally sorted partition with the same output contract as `SortPreservingMergeExec`.

  Correctness does not depend on balanced pivots; regular sampling only affects load balance. Output partitioning is `UnknownPartitioning(1)`.
- **`ParallelSortMerge`** physical-optimizer rule (`parallel_sort_merge.rs`, runs after `PushdownSort`) that replaces an eligible `SortPreservingMergeExec` with the parallel exec. Gated by **`datafusion.optimizer.enable_parallel_sort_merge`** (default `true`). Eligible iff: no fetch/limit, **bounded** input, `> 1` input partition, and a **known** row count `>= batch_size * target_partitions` (unknown-size inputs keep the serial merge to avoid regressing small data).
- A `--parallel-merge` A/B flag on the `sort_tpch` benchmark, a parallel variant in the `sort_preserving_merge` criterion bench, and regenerated `configs.md`.
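For reviewers less familiar with PSRS, here is a minimal, self-contained sketch of steps 2 to 5 on in-memory runs. It is not the code in this PR: it assumes keys are already comparable with `Ord` (standing in for the `RowConverter` rows), uses `std::thread::scope` in place of `SpawnedTask`, and uses a `BinaryHeap` k-way merge in place of the loser-tree `StreamingMergeBuilder`. The helper names (`choose_pivots`, `kway_merge`, `parallel_merge`) are hypothetical.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::thread;

/// Regular sampling: take `p` evenly spaced keys from every non-empty sorted
/// run, sort the combined sample, and keep `p - 1` evenly spaced pivots.
fn choose_pivots<T: Ord + Clone>(runs: &[Vec<T>], p: usize) -> Vec<T> {
    let mut sample = Vec::new();
    for run in runs.iter().filter(|r| !r.is_empty()) {
        for i in 0..p {
            sample.push(run[i * run.len() / p].clone());
        }
    }
    sample.sort();
    (1..p)
        .filter_map(|i| sample.get(i * sample.len() / p).cloned())
        .collect()
}

/// Stand-in for the loser-tree merge: a binary-heap k-way merge of sorted slices.
fn kway_merge<T: Ord + Clone>(parts: &[&[T]]) -> Vec<T> {
    // Min-heap of (key, run index, position within that run).
    let mut heap = BinaryHeap::new();
    for (run, part) in parts.iter().enumerate() {
        if let Some(first) = part.first() {
            heap.push(Reverse((first.clone(), run, 0usize)));
        }
    }
    let mut out = Vec::with_capacity(parts.iter().map(|p| p.len()).sum());
    while let Some(Reverse((key, run, pos))) = heap.pop() {
        out.push(key);
        if let Some(next) = parts[run].get(pos + 1) {
            heap.push(Reverse((next.clone(), run, pos + 1)));
        }
    }
    out
}

/// PSRS-style parallel merge of locally sorted runs into one sorted output.
fn parallel_merge<T: Ord + Clone + Send + Sync>(runs: &[Vec<T>], p: usize) -> Vec<T> {
    let pivots = choose_pivots(runs, p.max(1));
    // cuts[r][b]..cuts[r][b + 1] is run r's slice for bucket b; every run is
    // cut at the same pivots with lower_bound (`partition_point`).
    let cuts: Vec<Vec<usize>> = runs
        .iter()
        .map(|run| {
            let mut c = vec![0];
            c.extend(pivots.iter().map(|pv| run.partition_point(|k| k < pv)));
            c.push(run.len());
            c
        })
        .collect();
    let buckets = pivots.len() + 1;
    // Merge the key-range buckets concurrently, one scoped thread each.
    let merged: Vec<Vec<T>> = thread::scope(|s| {
        let handles: Vec<_> = (0..buckets)
            .map(|b| {
                let parts: Vec<&[T]> = runs
                    .iter()
                    .zip(&cuts)
                    .map(|(run, c)| &run[c[b]..c[b + 1]])
                    .collect();
                s.spawn(move || kway_merge(&parts))
            })
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    });
    // Buckets cover disjoint, increasing key ranges, so concatenation is sorted.
    merged.into_iter().flatten().collect()
}
```

With heavy ties (e.g. one run made entirely of the key `5`), every `5` lands in the same bucket: that bucket does most of the work, but the concatenated output is still sorted, which is why unbalanced pivots only cost load balance.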
### Limitations
The parallel exec is **pipeline-breaking**: it materializes all of its input and emits nothing until every bucket is merged, so it never honors a pushed-down fetch/limit and is restricted to bounded inputs. Peak memory ≈ full input + output (tracked via `MemoryReservation`s, not currently spillable). Queries needing early termination keep using `SortPreservingMergeExec`. Marked draft to gather feedback on the default and the memory trade-off.
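These limitations are what the `ParallelSortMerge` rule's eligibility conditions guard against. A minimal sketch of that check, assuming a hypothetical `MergeCandidate` summary of the plan node (the struct, its fields, and `is_eligible` are illustrative, not the rule's actual code):

```rust
/// Hypothetical summary of the facts the rule inspects on a candidate
/// `SortPreservingMergeExec`; field names are illustrative only.
#[derive(Clone, Copy, Debug)]
struct MergeCandidate {
    /// Pushed-down fetch/limit, if any.
    fetch: Option<usize>,
    /// Whether the input is bounded (finite).
    bounded: bool,
    /// Number of input partitions feeding the merge.
    input_partitions: usize,
    /// Row count from statistics, `None` when unknown.
    row_count: Option<usize>,
}

/// Returns true when the serial merge may be replaced by the parallel one.
fn is_eligible(
    c: &MergeCandidate,
    enable_parallel_sort_merge: bool,
    batch_size: usize,
    target_partitions: usize,
) -> bool {
    // Minimum input size before splitting the merge is considered worthwhile.
    let threshold = batch_size.saturating_mul(target_partitions);
    enable_parallel_sort_merge
        && c.fetch.is_none() // a limit needs early termination
        && c.bounded // the exec materializes its whole input
        && c.input_partitions > 1 // a single partition needs no merge
        && matches!(c.row_count, Some(n) if n >= threshold) // unknown size: stay serial
}
```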
## Are these changes tested?
Yes:
- Unit tests (`sorts::parallel_merge`): output matches `SortPreservingMergeExec` for unique keys, heavy low-cardinality ties, descending + nulls, uneven/empty partitions, more-buckets-than-rows, and single-partition passthrough.
- The optimizer-rule path is exercised end-to-end via sqllogictest with the flag on by default; EXPLAIN plans in a few files now show `ParallelSortPreservingMergeExec` (results are unchanged, since the output is identical to the serial merge).
- Benchmarks above (microbench + sort_tpch).
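The "descending + nulls" case relies on step 1 producing byte-comparable keys, so that pivot selection and `lower_bound` need nothing but plain byte comparison. As a simplified illustration only (this is *not* the actual `RowConverter` row format), a nullable `u64` key can be encoded as a null-marker byte followed by its big-endian bytes, inverted for descending order; `encode_key` is a hypothetical helper:

```rust
/// Simplified, hypothetical byte-comparable encoding of a nullable `u64`
/// sort key (not the real `RowConverter` format).
fn encode_key(value: Option<u64>, descending: bool, nulls_first: bool) -> [u8; 9] {
    let mut out = [0u8; 9];
    match value {
        // Nulls: a marker byte that sorts before (0x00) or after (0xFF) the
        // 0x01 marker used by every non-null value; the rest stays zero.
        None => out[0] = if nulls_first { 0x00 } else { 0xFF },
        Some(v) => {
            out[0] = 0x01;
            // Big-endian bytes compare lexicographically in numeric order.
            let mut bytes = v.to_be_bytes();
            if descending {
                // Inverting every bit reverses the comparison order.
                for b in bytes.iter_mut() {
                    *b = !*b;
                }
            }
            out[1..].copy_from_slice(&bytes);
        }
    }
    out
}
```

Sorting values by `encode_key(v, false, true)` gives `ASC NULLS FIRST` order, and by `encode_key(v, true, false)` gives `DESC NULLS LAST`, using only byte comparison.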
## Are there any user-facing changes?
- New config `datafusion.optimizer.enable_parallel_sort_merge` (default `true`).
- `EXPLAIN` shows `ParallelSortPreservingMergeExec` where the rule applies, and such plans avoid the single-threaded merge bottleneck (at the cost of materializing the input). No public API breakage.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]