zhuqi-lucas opened a new pull request, #22385:
URL: https://github.com/apache/datafusion/pull/22385

   ## Which issue does this PR close?
   
   Carved out of [#21580](https://github.com/apache/datafusion/pull/21580) — 
after [#21956](https://github.com/apache/datafusion/pull/21956) merged the 
file/RG reorder + reverse machinery, this PR ships the two remaining pieces of 
the statistics-driven TopK pipeline, restricted to the simplest scope (no 
WHERE).
   
   ## Rationale for this change
   
   For `ORDER BY <col> LIMIT N` queries on parquet:
   
   1. `TopK`'s dynamic filter starts as `lit(true)` — the first row groups read 
are never pruned by it.
   2. After file/RG reorder normalises data to ASC-by-min (and optional reverse 
for DESC), the first few row groups already contain the rows that will end up 
in the heap. The rest can be pruned upfront from parquet `min/max` statistics 
if the threshold is known before scanning.
   
   This PR initialises the threshold from per-RG stats before the 
`PruningPredicate` is built, then accumulates row counts from the front of the 
reordered RG list and truncates once $\geq K$. The net effect on a sorted, 
non-overlapping dataset is that **only one row group is decoded** for a `LIMIT 
N` query — observed [17×–60× speedups in #21580's 
benchmark](https://github.com/apache/datafusion/pull/21580#issue-3585580010).
   
   ## What changes are included in this PR?
   
   Four small focused commits plus one SLT commit.
   
   ### 1. `DynamicFilterPhysicalExpr`: add `sort_options` + `fetch`
   
   New optional fields and a `new_with_sort_options(...)` constructor. 
Producers (TopK in `SortExec`) can describe each child's sort direction and the 
LIMIT $K$; consumers (parquet opener) can use them to initialise the filter 
threshold and to do cumulative RG pruning. No behaviour change on its own.
   
   ### 2. Wire `sort_options` + `fetch` through `SortExec::create_filter`
   
   Builds the dynamic filter via `new_with_sort_options`, passing per-child 
sort options and `self.fetch`. **Bug fix in `with_fetch`**: previously 
`create_filter()` was called before `fetch` was set, so the filter saw `fetch = 
None`. Restructured to create the new sort, set `fetch`, then call 
`create_filter`.
   
   ### 3. `PreparedAccessPlan::truncate_row_groups`
   
   Small helper that keeps the first `count` row-group indexes and drops the 
rest. Bails out unchanged if a `row_selection` is set (page-level pruning state 
would be hard to remap). Three unit tests. No call sites yet.
   
   ### 4. Opener: stats init + cumulative RG prune
   
   Both optimisations are dual-gated:
   
   * **no-WHERE** — the predicate handed to the opener is a bare 
`DynamicFilterPhysicalExpr` (`predicate_is_pure_dynamic_filter`). With a real 
`WHERE`, raw `num_rows` would over-estimate qualifying rows and risk 
under-returning K; the dynamic filter still tightens after a few batches but 
the cumulative shortcut is unsafe.
   * **sort pushdown** — `sort_order_for_reorder` is set on the 
`ParquetSource`, so the data has been reordered into a known direction.
   
   **Stats init** (`try_init_topk_threshold`, runs before `PruningPredicate` 
build):
   
   * DESC `LIMIT K`: `threshold = max(min(col))` across RGs with `num_rows >= 
K` → filter becomes `col >= threshold` (or `col IS NULL OR col >= threshold` 
for `NULLS FIRST`).
   * ASC `LIMIT K`: `threshold = min(max(col))` across RGs with `num_rows >= K` 
→ `col <= threshold`.
   * `>=`/`<=` so the boundary value can still appear in the top-K.
   * Skips silently on: dynamic filter already updated, sort expression not a 
simple `Column`, column outside file schema, stats unavailable, type cast 
failure.
   
   **Cumulative RG prune** (inside `prepare_access_plan`, after 
`reorder_by_statistics` and `reverse`):
   
   * Accumulate `num_rows` from the front of `row_group_indexes` until the 
running total reaches `K`, then `truncate_row_groups`.
   
   Helpers added: `find_dynamic_filter`, `find_column_in_expr` (unwraps 
single-child wrappers like `CastExpr`), `compute_best_threshold`, 
`predicate_is_pure_dynamic_filter`.
   
   ### 5. SLT — Test I
   
   Six sub-tests in `sort_pushdown.slt`:
   * I.1: DESC LIMIT — EXPLAIN shows `DynamicFilter [ empty ]` + 
`sort_order_for_reorder=[id@0 DESC]` + `reverse_row_groups=true`
   * I.2: DESC LIMIT result correctness
   * I.3: ASC LIMIT in file order — `Exact` path, `SortExec` eliminated, LIMIT 
becomes a static fetch
   * I.4: DESC LIMIT **with** WHERE — both optimisations skip, result still 
correct via the regular dynamic-filter path
   * I.5: Larger LIMIT spanning multiple RGs
   * I.6: LIMIT > total rows — returns all rows
   
   ## Scope
   
   Intentionally narrow:
   
   * **No WHERE only.** Cumulative prune with WHERE needs filter selectivity 
reasoning to avoid under-returning rows; out of scope for this PR.
   * **Sort pushdown path only.** Non-sort-pushdown TopK on parquet (i.e. fall 
back through `DynamicFilterPhysicalExpr.sort_options` alone) is what #21580 
went on to enable across all TopK queries; that's a future PR on top of this 
one.
   * **Single-column leading sort** (matches the existing 
`sort_order_for_reorder` plain-`Column` constraint).
   
   ## Are these changes tested?
   
   * 125 `datafusion-datasource-parquet` lib tests pass (including 3 new tests 
for `truncate_row_groups`).
   * 1429 `datafusion-physical-expr` + `datafusion-physical-plan` lib tests 
pass.
   * `sort_pushdown` SLT passes (existing tests unchanged, new Test I added).
   * `cargo clippy --all-targets -- -D warnings` clean on the touched crates.
   
   ## Are there any user-facing changes?
   
   No. Transparent optimisation — same query results, faster TopK on sorted 
parquet when no WHERE clause is present.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to