zhuqi-lucas opened a new issue, #21733:
URL: https://github.com/apache/datafusion/issues/21733
## Background
We now have several complementary optimizations for TopK queries on parquet:
1. **Dynamic work scheduling** (#21351, merged) — sibling `FileStream`
partitions share a work queue and steal files from each other, ensuring no CPU
sits idle.
2. **RG reorder by statistics** (#21580, in progress) — reorders row groups
within a file by min/max statistics so TopK reads the best RGs first.
3. **TopK threshold initialization from statistics** (#21712, draft) —
initializes the dynamic filter threshold from RG statistics before reading any
data.
When combining #21351 + #21580 (tested in #21731), the RG reorder shows
**minimal additional improvement** over dynamic scheduling alone. This is
because with multiple partitions reading files in parallel, one partition
quickly finds good values "by luck" and updates the shared dynamic filter —
making the precise intra-file RG ordering less impactful.
## Problem
The shared work queue in #21351 (`SharedWorkSource`) uses the original file
order — files are placed in the queue in whatever order they appear in
`file_groups`. This means:
- The first file picked by the first partition may have a poor value range
- Threshold convergence depends on which partition happens to read a good
file first
- With many files of varying ranges, the "unlucky first pick" wastes
parallel capacity
## Proposed optimization
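**Sort files in the shared work queue by column statistics** before any
reading begins. For `ORDER BY col DESC LIMIT K`: put the file with the highest
min value first. A file with at least K non-null rows guarantees K values at or
above its min, so the highest min is the tightest threshold that statistics
alone can provide. For ASC: lowest max first, by the symmetric argument.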
This ensures:
1. The **very first file read** is the globally optimal one — tight
threshold from the first RG
2. Combined with TopK stats init (#21712): threshold is set **before reading
a single byte** and it's the global optimum
3. All other partitions immediately benefit from the shared dynamic filter
4. Subsequent files are already ordered by quality — if the first file's
threshold prunes most RGs, the second-best file is next in line
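To make points 1 and 2 concrete, here is a minimal sketch of how an initial DESC threshold could be derived from file-level statistics and then used to skip files. `FileStats`, `initial_desc_threshold`, and `can_skip_desc` are hypothetical names for illustration, not DataFusion APIs, and the sketch assumes NULLs do not sort ahead of non-null values (for example `NULLS LAST`); with `NULLS FIRST` the statistics would need separate handling.

```rust
/// Hypothetical per-file summary for the sort column; not a DataFusion type.
#[derive(Debug, Clone, Copy)]
struct FileStats {
    min: i64,
    max: i64,
    /// Number of non-null values of the sort column in the file.
    non_null_rows: usize,
}

/// For `ORDER BY col DESC LIMIT k`: a file with at least `k` non-null values
/// guarantees `k` rows with `col >= min`, so its `min` is a safe threshold.
/// The tightest such bound is the highest `min` among qualifying files.
/// Returns `None` when no file qualifies (or `k == 0`).
fn initial_desc_threshold(files: &[FileStats], k: usize) -> Option<i64> {
    files
        .iter()
        .filter(|f| k > 0 && f.non_null_rows >= k)
        .map(|f| f.min)
        .max()
}

/// A file can be skipped when even its largest value is below the threshold.
/// The comparison is strict so that ties with the threshold are never dropped.
fn can_skip_desc(file: &FileStats, threshold: i64) -> bool {
    file.max < threshold
}
```

The file that supplies the threshold is never skipped by this check, because its own max is at least its min.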
### Full optimization chain
```
Global file reorder (best file first in shared queue)
→ TopK stats init (threshold from RG stats before I/O)
→ RG reorder within file (best RG first)
→ Dynamic scheduling (idle partitions steal work)
→ Dynamic filter pruning (skip RGs/files below threshold)
```
Each layer builds on the previous: global file ordering ensures the optimal
starting point, stats init avoids wasting I/O on the first file, RG reorder
optimizes within-file order, dynamic scheduling keeps all CPUs busy, and the
dynamic filter propagates the threshold globally.
## Implementation sketch
In `SharedWorkSource::from_config()` (or a new constructor):
1. Get the sort column and direction from the `FileScanConfig`'s output
ordering or from the `DynamicFilterPhysicalExpr`'s `sort_options`
2. For files that have `PartitionedFile.statistics` with min/max for the
sort column:
- DESC: sort files by `column_statistics[sort_col].min_value` descending
(highest min first)
- ASC: sort files by `column_statistics[sort_col].max_value` ascending
(lowest max first)
3. Files without statistics go to the end of the queue
4. When `preserve_order` is true, skip reordering (correctness requirement)
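The four steps above could look roughly like the following. This is a sketch under assumptions, not the actual `SharedWorkSource` code: `FileEntry` is a hypothetical stand-in for `PartitionedFile` plus its min/max on the sort column, values are simplified to `i64`, and `sort_files_for_topk` is an illustrative name.

```rust
use std::cmp::Ordering;

/// Hypothetical stand-in for a queued file and its sort-column statistics.
/// Not a DataFusion type.
#[derive(Debug, Clone, PartialEq)]
struct FileEntry {
    path: String,
    /// `Some((min, max))` when statistics exist for the sort column.
    min_max: Option<(i64, i64)>,
}

/// Reorder `files` so the most promising file for a TopK scan comes first.
fn sort_files_for_topk(files: &mut [FileEntry], descending: bool, preserve_order: bool) {
    // Step 4: reordering would break the required output order, so do nothing.
    if preserve_order {
        return;
    }
    // `sort_by` is stable: files with equal keys keep their original order.
    files.sort_by(|a, b| match (a.min_max, b.min_max) {
        (Some((a_min, a_max)), Some((b_min, b_max))) => {
            if descending {
                b_min.cmp(&a_min) // DESC: highest min first
            } else {
                a_max.cmp(&b_max) // ASC: lowest max first
            }
        }
        // Step 3: files without statistics go to the end of the queue.
        (Some(_), None) => Ordering::Less,
        (None, Some(_)) => Ordering::Greater,
        (None, None) => Ordering::Equal,
    });
}
```

A stable sort is used deliberately so that files with identical keys, including all files without statistics, stay in their original `file_groups` order.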
## Expected impact
- **Small data (SF=1)**: moderate improvement — fewer files to iterate
before finding optimal threshold
- **Large data (SF=10/100)**: significant improvement — with hundreds of
files, the difference between reading the best file first vs. a random file
first determines whether 90% of subsequent files are pruned immediately or
after several files
- **Combined with #21712 (stats init)**: the gains compound, because stats
init on the globally-best file gives the tightest possible threshold without
reading any data
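The effect of queue order can be illustrated with a deliberately simplified, single-threaded toy model. It is not a benchmark and ignores parallelism, row groups, and NULLs. It assumes every file holds at least K rows, so that after a file is read its min becomes a valid DESC threshold. `files_opened_desc` and `sorted_by_min_desc` are hypothetical helper names.

```rust
/// Toy model for `ORDER BY col DESC LIMIT K`: count how many files are opened
/// when the queue is processed front to back. Each entry is `(min, max)` of
/// the sort column, and every file is assumed to hold at least K rows.
fn files_opened_desc(queue: &[(i64, i64)]) -> usize {
    let mut threshold: Option<i64> = None;
    let mut opened = 0;
    for &(min, max) in queue {
        // Skip files whose largest value is below the current threshold.
        if matches!(threshold, Some(t) if max < t) {
            continue;
        }
        opened += 1;
        // After reading the file, at least K rows >= `min` are known,
        // so the threshold can only stay the same or tighten.
        threshold = Some(threshold.map_or(min, |t| t.max(min)));
    }
    opened
}

/// Return a copy of the queue with the highest-min file first.
fn sorted_by_min_desc(queue: &[(i64, i64)]) -> Vec<(i64, i64)> {
    let mut sorted = queue.to_vec();
    sorted.sort_by(|a, b| b.0.cmp(&a.0));
    sorted
}
```

With five non-overlapping-ish files queued worst-first, such as `[(0, 10), (5, 20), (15, 30), (25, 40), (90, 100)]`, this model opens every file in the original order but only the `(90, 100)` file once the queue is sorted.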
## Related
- #21351 — Dynamic work scheduling (merged)
- #21580 — RG reorder by statistics (in progress)
- #21712 — TopK stats init from parquet statistics (draft)
- #21731 — Combined benchmark branch for #21351 + #21580
- #21691 — Initialize TopK from file/rowgroup statistics (parent issue)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]