adriangb opened a new pull request, #22440:
URL: https://github.com/apache/datafusion/pull/22440

   **Stacks on top of #22384: both branches need to land for the diff to be coherent.**
   
   ## Summary
   
   Add `ExecutionPlan::benefits_from_output_partitioning() -> bool` (default `false`) as the symmetric counterpart of the existing `benefits_from_input_partitioning`. The optimizer's `EnforceDistribution` rule already inserts a `RepartitionExec(RoundRobinBatch(target_partitions))` above a child when the parent's `benefits_from_input_partitioning` is `true` for that child. With this addition, it also inserts one when the child itself opts in via `benefits_from_output_partitioning`. No special handling is needed in `repartitioned()` or in the `DistributionContext` bookkeeping.
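
   To make the rule change concrete, here is a minimal sketch on a toy plan tree. It is not DataFusion code: `Node`, `add_round_robin` and `display` are hypothetical stand-ins, and the requirement that the repartition must raise the partition count is a simplifying assumption. It shows the behavior described above: a round-robin repartition goes above a child when the parent asks for it (existing behavior) or when the child opts in itself (new behavior).

   ```rust
   /// Toy plan node (hypothetical; not DataFusion's `ExecutionPlan` trait).
   struct Node {
       name: String,
       output_partitions: usize,
       /// One flag per child, mirroring the per-child shape of
       /// `benefits_from_input_partitioning`.
       benefits_from_input_partitioning: Vec<bool>,
       /// The new opt-in: consumers of this node would benefit if its
       /// output were spread across more partitions.
       benefits_from_output_partitioning: bool,
       children: Vec<Node>,
   }

   impl Node {
       /// A leaf such as a scan.
       fn leaf(name: &str, output_partitions: usize, benefits_out: bool) -> Node {
           Node {
               name: name.to_string(),
               output_partitions,
               benefits_from_input_partitioning: vec![],
               benefits_from_output_partitioning: benefits_out,
               children: vec![],
           }
       }

       /// A single-partition operator with one child.
       fn unary(name: &str, benefits_in: bool, child: Node) -> Node {
           Node {
               name: name.to_string(),
               output_partitions: 1,
               benefits_from_input_partitioning: vec![benefits_in],
               benefits_from_output_partitioning: false,
               children: vec![child],
           }
       }

       fn round_robin(child: Node, target_partitions: usize) -> Node {
           Node {
               name: format!("RepartitionExec: partitioning=RoundRobinBatch({target_partitions})"),
               output_partitions: target_partitions,
               benefits_from_input_partitioning: vec![false],
               benefits_from_output_partitioning: false,
               children: vec![child],
           }
       }
   }

   /// Bottom-up pass: wrap a child in a round-robin repartition when the parent
   /// asks for it OR the child opts in, and only if that raises the partition count.
   fn add_round_robin(mut node: Node, target_partitions: usize) -> Node {
       let children = std::mem::take(&mut node.children);
       for (i, child) in children.into_iter().enumerate() {
           let child = add_round_robin(child, target_partitions);
           let parent_wants = node.benefits_from_input_partitioning.get(i).copied().unwrap_or(false);
           let child_wants = child.benefits_from_output_partitioning;
           let increases = child.output_partitions < target_partitions;
           node.children.push(if (parent_wants || child_wants) && increases {
               Node::round_robin(child, target_partitions)
           } else {
               child
           });
       }
       node
   }

   /// Indented, EXPLAIN-style rendering of the tree.
   fn display(node: &Node) -> String {
       fn walk(node: &Node, depth: usize, out: &mut String) {
           out.push_str(&"  ".repeat(depth));
           out.push_str(&node.name);
           out.push('\n');
           for child in &node.children {
               walk(child, depth + 1, out);
           }
       }
       let mut out = String::new();
       walk(node, 0, &mut out);
       out
   }

   fn main() {
       // A scan that filters internally (opts in) under a SortExec that does not
       // ask for repartitioning itself.
       let plan = Node::unary("SortExec", false, Node::leaf("DataSourceExec", 1, true));
       print!("{}", display(&add_round_robin(plan, 12)));
   }
   ```

   In the toy, the opted-in scan gets a `RoundRobinBatch(12)` repartition even though `SortExec` does not ask for one. The sketch deliberately leaves out whatever the real rule does afterwards to satisfy a parent that needs a single partition.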
   
   ## Why
   
   When a parquet scan owns a filter and #22384 evaluates it post-decode inside the scan thread (the `pushdown_filters = false` path), there is no separate `FilterExec` above the scan to request a round-robin repartition. Single-partition consumers (`SortExec`, `CoalescePartitionsExec`, a `CollectLeft` hash-join build side) therefore inherit a single-threaded scan and filter, even when plenty of cores are idle. The companion PRs (#22438, which disables join dynamic filter pushdown by default, and #22439, which lowers `repartition_file_min_size` to 1 MiB) close most of the regression, but leave ~18 TPC-DS queries still slower than `main`, mostly small dimension-table joins where byte-range splitting alone can't reach `target_partitions`. This PR closes the rest.
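
   A rough model of why small dimension tables stay single-partition under byte-range splitting. It assumes, as a simplification rather than DataFusion's actual file-grouping logic, that a scan smaller than `repartition_file_min_size` is not split at all and a larger one is cut into `target_partitions` byte ranges; `byte_range_partitions` is a hypothetical helper.

   ```rust
   /// Hypothetical, simplified model of byte-range file splitting (not
   /// DataFusion's implementation). `min_split_bytes` plays the role of
   /// `repartition_file_min_size`.
   fn byte_range_partitions(total_bytes: u64, target_partitions: usize, min_split_bytes: u64) -> usize {
       if total_bytes == 0 || total_bytes < min_split_bytes {
           // Too small to be worth splitting: the whole scan stays in one partition.
           return 1;
       }
       // Otherwise assume the scan is cut into `target_partitions` ranges.
       target_partitions.max(1)
   }

   fn main() {
       const MIB: u64 = 1024 * 1024;
       // A 512 KiB dimension table under a 1 MiB threshold stays single-partition...
       println!("{}", byte_range_partitions(512 * 1024, 12, MIB));
       // ...while a 64 MiB fact table reaches target_partitions.
       println!("{}", byte_range_partitions(64 * MIB, 12, MIB));
   }
   ```

   Under this model, lowering the threshold helps mid-sized tables, but any table below it still yields one partition, which is the gap the output-partitioning hook targets.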
   
   ## Wiring
   
   ```
   ExecutionPlan ─┬─ DataSourceExec  -> DataSource::benefits_from_output_partitioning
                  │
   DataSource ─── FileScanConfig    -> FileSource::benefits_from_output_partitioning
                  │
   FileSource ─── ParquetSource     -> predicate.is_some() && !pushdown_filters()
   ```
   
   The `!pushdown_filters()` gate is important: with `pushdown_filters = true`, the parquet `RowFilter` does the filtering during decode, so a round-robin repartition wouldn't help and would also defeat limit pushdown for ordered scans.
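
   The delegation chain in the diagram, together with the `pushdown_filters` gate, can be sketched as follows. The traits and structs are trimmed-down, hypothetical stand-ins that keep only the new hook; in particular, `predicate` is modeled as an `Option<String>` rather than a real physical expression, and `scan` is an illustrative constructor.

   ```rust
   /// Hypothetical, trimmed-down layers from the wiring diagram; each trait keeps
   /// only the new hook, with the same `false` default.
   trait ExecutionPlan {
       fn benefits_from_output_partitioning(&self) -> bool { false }
   }
   trait DataSource {
       fn benefits_from_output_partitioning(&self) -> bool { false }
   }
   trait FileSource {
       fn benefits_from_output_partitioning(&self) -> bool { false }
   }

   /// Stand-in for `ParquetSource`: the filter the scan owns plus the
   /// `pushdown_filters` setting.
   struct ParquetSource {
       predicate: Option<String>,
       pushdown_filters: bool,
   }

   impl ParquetSource {
       fn pushdown_filters(&self) -> bool {
           self.pushdown_filters
       }
   }

   impl FileSource for ParquetSource {
       fn benefits_from_output_partitioning(&self) -> bool {
           // Opt in only when the filter runs post-decode inside the scan. With
           // pushdown_filters = true the RowFilter filters during decode instead.
           self.predicate.is_some() && !self.pushdown_filters()
       }
   }

   /// Stand-in for `FileScanConfig`: forwards to its file source.
   struct FileScanConfig {
       file_source: Box<dyn FileSource>,
   }

   impl DataSource for FileScanConfig {
       fn benefits_from_output_partitioning(&self) -> bool {
           self.file_source.benefits_from_output_partitioning()
       }
   }

   /// Stand-in for `DataSourceExec`: forwards to its data source.
   struct DataSourceExec {
       data_source: Box<dyn DataSource>,
   }

   impl ExecutionPlan for DataSourceExec {
       fn benefits_from_output_partitioning(&self) -> bool {
           self.data_source.benefits_from_output_partitioning()
       }
   }

   /// Illustrative constructor: a parquet scan wrapped in all three layers.
   fn scan(predicate: Option<&str>, pushdown_filters: bool) -> DataSourceExec {
       let source = ParquetSource {
           predicate: predicate.map(str::to_string),
           pushdown_filters,
       };
       DataSourceExec {
           data_source: Box::new(FileScanConfig { file_source: Box::new(source) }),
       }
   }

   fn main() {
       // Filter evaluated post-decode in the scan: opts in.
       println!("{}", scan(Some("x > 5"), false).benefits_from_output_partitioning());
       // Filter pushed into the parquet decoder: stays out.
       println!("{}", scan(Some("x > 5"), true).benefits_from_output_partitioning());
       // No filter at all: stays out.
       println!("{}", scan(None, false).benefits_from_output_partitioning());
   }
   ```

   Each layer only forwards to the one below, so in this sketch the parquet source is the only place that knows about filters; every other implementation keeps the `false` default.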
   
   ## Benchmark numbers (12 cores, SF1)
   
   All runs include the companion PRs (#22438 and #22439), so the dynamic-filter and split-size changes are already in effect:
   
   | Suite | Queries slower than `main` (#22384 alone) | Queries slower than `main` (+ this PR) |
   |---|---|---|
   | TPC-H | 2 | 2 |
   | **TPC-DS** | **18** | **2** |
   | ClickBench | 3 | 4 |
   
   The remaining regressions (TPC-H Q5 ~3%, TPC-DS Q41 ~4% on a 15 ms query, ClickBench Q13 ~5%) look like fixed per-batch overhead in the post-scan filter path itself; the other queries still counted as slower are within run-to-run variance.
   
   ## Test plan
   
   - [x] `cargo test --test sqllogictests`: all 472 files pass after snapshot updates; every updated snapshot shows a `RepartitionExec: partitioning=RoundRobinBatch(N)` inserted above a filtered scan that sits below a single-partition parent.
   - [x] `cargo test -p datafusion --test core_integration`
   - [ ] `run benchmarks`


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

