adriangb opened a new pull request, #22440:
URL: https://github.com/apache/datafusion/pull/22440
**Stacks on top of #22384 — both branches need to land for the diff to be
coherent.**
## Summary
Add `ExecutionPlan::benefits_from_output_partitioning() -> bool` (default
`false`) as the symmetric counterpart of the existing
`benefits_from_input_partitioning`. The optimizer's `EnforceDistribution`
already inserts a `RepartitionExec(RoundRobinBatch(target_partitions))` when a
parent's `benefits_from_input_partitioning` is `true`. With this addition it
also fires when the child itself opts in via
`benefits_from_output_partitioning`, with no special handling in
`repartitioned()` and no `DistributionContext` bookkeeping required.
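
As a rough illustration of the rule described above, here is a minimal, self-contained sketch. It does not use DataFusion's real types: `PlanNode`, `SingleSort`, `FilteredScan`, `PlainScan`, and `wants_round_robin` are hypothetical names, the hints are modeled as a single `bool` for brevity, and the `< target_partitions` check is an assumption about when a repartition is worth inserting.

```rust
/// Toy stand-in for a physical plan node (NOT DataFusion's `ExecutionPlan`).
pub trait PlanNode {
    /// Parent-side hint: this node wants its input spread over more partitions.
    fn benefits_from_input_partitioning(&self) -> bool {
        false
    }

    /// Child-side hint added by this PR; defaults to `false`.
    fn benefits_from_output_partitioning(&self) -> bool {
        false
    }

    /// Number of partitions this node currently produces.
    fn output_partitions(&self) -> usize;
}

/// A single-partition consumer that does not ask for repartitioned input.
pub struct SingleSort;

impl PlanNode for SingleSort {
    fn output_partitions(&self) -> usize {
        1
    }
}

/// A scan that owns a post-decode filter and therefore opts in.
pub struct FilteredScan {
    pub partitions: usize,
}

impl PlanNode for FilteredScan {
    fn benefits_from_output_partitioning(&self) -> bool {
        true
    }

    fn output_partitions(&self) -> usize {
        self.partitions
    }
}

/// A scan without a filter; keeps both defaults.
pub struct PlainScan {
    pub partitions: usize,
}

impl PlanNode for PlainScan {
    fn output_partitions(&self) -> usize {
        self.partitions
    }
}

/// Either side may request the round-robin. The partition-count check is an
/// assumption of this sketch, not something stated in the PR description.
pub fn wants_round_robin(
    parent: &dyn PlanNode,
    child: &dyn PlanNode,
    target_partitions: usize,
) -> bool {
    let hinted = parent.benefits_from_input_partitioning()
        || child.benefits_from_output_partitioning();
    hinted && child.output_partitions() < target_partitions
}
```

In this model, `wants_round_robin(&SingleSort, &FilteredScan { partitions: 1 }, 12)` is `true` because the child opts in, while the same call with `PlainScan` is `false` because neither side sets a hint.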
## Why
When a parquet scan owns a filter and #22384 runs it post-decode inside the
scan thread (the `pushdown_filters = false` path), there is no sibling
`FilterExec` above the scan. Single-partition consumers — `SortExec`,
`CoalescePartitionsExec`, a `CollectLeft` hash-join build — therefore inherit a
single-threaded scan + filter, even when the machine has plenty of idle cores.
The companion PRs (#22438 disabling join dynamic filter pushdown by default,
#22439 lowering `repartition_file_min_size` to 1 MiB) close most of the
regression but leave TPC-DS with ~18 queries still slower than main on small
dim-table joins where byte-range splitting alone can't reach
`target_partitions`. This PR closes the rest.
## Wiring
```
ExecutionPlan ─┬─ DataSourceExec -> DataSource::benefits_from_output_partitioning
               │
DataSource    ─── FileScanConfig -> FileSource::benefits_from_output_partitioning
               │
FileSource    ─── ParquetSource  -> predicate.is_some() && !pushdown_filters()
```
The `!pushdown_filters()` gate is important: when `pushdown_filters = true`,
`RowFilter` does the work during decode, so the round-robin wouldn't help and
would also defeat limit-pushdown for ordered scans.
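
The delegation chain in the wiring diagram can be sketched roughly as follows. This is a simplified model under stated assumptions, not the actual DataFusion code: the struct fields, the `String` predicate, and the `parquet_scan` helper are hypothetical; only the type names and the `predicate.is_some() && !pushdown_filters()` condition come from the diagram.

```rust
/// Innermost layer: the file-format-specific source.
pub trait FileSource {
    fn benefits_from_output_partitioning(&self) -> bool {
        false
    }
}

/// Middle layer: a generic data source.
pub trait DataSource {
    fn benefits_from_output_partitioning(&self) -> bool {
        false
    }
}

/// Simplified stand-in; the real predicate is a physical expression.
pub struct ParquetSource {
    pub predicate: Option<String>,
    pub pushdown_filters: bool,
}

impl FileSource for ParquetSource {
    fn benefits_from_output_partitioning(&self) -> bool {
        // Opt in only when the scan owns a filter that runs post-decode.
        self.predicate.is_some() && !self.pushdown_filters
    }
}

/// Hypothetical field layout; forwards the hint to the file source.
pub struct FileScanConfig {
    pub file_source: Box<dyn FileSource>,
}

impl DataSource for FileScanConfig {
    fn benefits_from_output_partitioning(&self) -> bool {
        self.file_source.benefits_from_output_partitioning()
    }
}

/// Outermost layer: the plan node forwards to its data source.
pub struct DataSourceExec {
    pub data_source: Box<dyn DataSource>,
}

impl DataSourceExec {
    pub fn benefits_from_output_partitioning(&self) -> bool {
        self.data_source.benefits_from_output_partitioning()
    }
}

/// Hypothetical helper that wires the three layers together.
pub fn parquet_scan(predicate: Option<&str>, pushdown_filters: bool) -> DataSourceExec {
    let source = ParquetSource {
        predicate: predicate.map(str::to_string),
        pushdown_filters,
    };
    let config = FileScanConfig {
        file_source: Box::new(source),
    };
    DataSourceExec {
        data_source: Box::new(config),
    }
}
```

With this model, `parquet_scan(Some("x > 1"), false)` reports the hint, while a scan with `pushdown_filters = true` or with no predicate does not. Keeping the default `false` at every layer means other sources are unaffected unless they opt in.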
## Benchmark numbers (12 cores, SF1)
Run with the companion PRs (#22438 + #22439) applied so the dynamic-filter
and split-size changes are already in effect:
| Suite | PR #22384 alone | + this PR |
|---|---|---|
| TPC-H slower-than-main | 2 | 2 |
| **TPC-DS slower-than-main** | **18** | **2** |
| ClickBench slower-than-main | 3 | 4 |
The remaining residuals (TPC-H Q5 ~3%, TPC-DS Q41 ~4% on a 15 ms query,
ClickBench Q13 ~5%) look like fixed-cost per-batch overhead in the post-scan
filter path itself; the rest are within run-to-run variance.
## Test plan
- [x] `cargo test --test sqllogictests`: all 472 files pass after snapshot
updates, all of which show `RepartitionExec: partitioning=RoundRobinBatch(N)`
inserted above a filtered scan that sits under a single-partition parent.
- [x] `cargo test -p datafusion --test core_integration`
- [ ] `run benchmarks`