zhuqi-lucas opened a new pull request, #22385: URL: https://github.com/apache/datafusion/pull/22385
## Which issue does this PR close?

Carved out of [#21580](https://github.com/apache/datafusion/pull/21580). After [#21956](https://github.com/apache/datafusion/pull/21956) merged the file/RG reorder + reverse machinery, this PR ships the two remaining pieces of the statistics-driven TopK pipeline, restricted to the simplest scope (no WHERE).

## Rationale for this change

For `ORDER BY <col> LIMIT N` queries on parquet:

1. `TopK`'s dynamic filter starts as `lit(true)`, so the first row groups read are never pruned by it.
2. After file/RG reorder normalises data to ASC-by-min (and optional reverse for DESC), the first few row groups already contain the rows that will end up in the heap. The rest can be pruned upfront from parquet `min/max` statistics if the threshold is known before scanning.

This PR initialises the threshold from per-RG stats before the `PruningPredicate` is built, then accumulates row counts from the front of the reordered RG list and truncates once the running total is $\geq K$.

The net effect on a sorted, non-overlapping dataset is that **only one row group is decoded** for a `LIMIT N` query whose `N` rows fit in the first row group. Observed: [17×–60× speedups in #21580's benchmark](https://github.com/apache/datafusion/pull/21580#issue-3585580010).

## What changes are included in this PR?

Four small focused commits plus one SLT commit.

### 1. `DynamicFilterPhysicalExpr`: add `sort_options` + `fetch`

New optional fields and a `new_with_sort_options(...)` constructor. Producers (TopK in `SortExec`) can describe each child's sort direction and the LIMIT $K$; consumers (parquet opener) can use them to initialise the filter threshold and to do cumulative RG pruning. No behaviour change on its own.

### 2. Wire `sort_options` + `fetch` through `SortExec::create_filter`

Builds the dynamic filter via `new_with_sort_options`, passing per-child sort options and `self.fetch`.

**Bug fix in `with_fetch`**: previously `create_filter()` was called before `fetch` was set, so the filter saw `fetch = None`. Restructured to create the new sort, set `fetch`, then call `create_filter`.

### 3. `PreparedAccessPlan::truncate_row_groups`

Small helper that keeps the first `count` row-group indexes and drops the rest. Bails out unchanged if a `row_selection` is set (page-level pruning state would be hard to remap). Three unit tests. No call sites yet.

### 4. Opener: stats init + cumulative RG prune

Both optimisations are dual-gated:

* **no-WHERE**: the predicate handed to the opener is a bare `DynamicFilterPhysicalExpr` (`predicate_is_pure_dynamic_filter`). With a real `WHERE`, raw `num_rows` would over-estimate qualifying rows and risk under-returning K; the dynamic filter still tightens after a few batches but the cumulative shortcut is unsafe.
* **sort pushdown**: `sort_order_for_reorder` is set on the `ParquetSource`, so the data has been reordered into a known direction.

**Stats init** (`try_init_topk_threshold`, runs before `PruningPredicate` build):

* DESC `LIMIT K`: `threshold = max(min(col))` across RGs with `num_rows >= K` → filter becomes `col >= threshold` (or `col IS NULL OR col >= threshold` for `NULLS FIRST`).
* ASC `LIMIT K`: `threshold = min(max(col))` across RGs with `num_rows >= K` → `col <= threshold`.
* The comparisons are `>=`/`<=` (not strict) so the boundary value can still appear in the top-K.
* Skips silently on: dynamic filter already updated, sort expression not a simple `Column`, column outside file schema, stats unavailable, type cast failure.

**Cumulative RG prune** (inside `prepare_access_plan`, after `reorder_by_statistics` and `reverse`):

* Accumulate `num_rows` from the front of `row_group_indexes` until the running total reaches `K`, then `truncate_row_groups`.

Helpers added: `find_dynamic_filter`, `find_column_in_expr` (unwraps single-child wrappers like `CastExpr`), `compute_best_threshold`, `predicate_is_pure_dynamic_filter`.

### 5. SLT: Test I

Six sub-tests in `sort_pushdown.slt`:

* I.1: DESC LIMIT. EXPLAIN shows `DynamicFilter [ empty ]` + `sort_order_for_reorder=[id@0 DESC]` + `reverse_row_groups=true`
* I.2: DESC LIMIT result correctness
* I.3: ASC LIMIT in file order. `Exact` path, `SortExec` eliminated, LIMIT becomes a static fetch
* I.4: DESC LIMIT **with** WHERE. Both optimisations skip, result still correct via the regular dynamic-filter path
* I.5: Larger LIMIT spanning multiple RGs
* I.6: LIMIT > total rows, returns all rows

## Scope

Intentionally narrow:

* **No WHERE only.** Cumulative prune with WHERE needs filter selectivity reasoning to avoid under-returning rows; out of scope for this PR.
* **Sort pushdown path only.** Non-sort-pushdown TopK on parquet (i.e. falling back to `DynamicFilterPhysicalExpr.sort_options` alone) is what #21580 went on to enable across all TopK queries; that's a future PR on top of this one.
* **Single-column leading sort** (matches the existing `sort_order_for_reorder` plain-`Column` constraint).

## Are these changes tested?

* 125 `datafusion-datasource-parquet` lib tests pass (including 3 new tests for `truncate_row_groups`).
* 1429 `datafusion-physical-expr` + `datafusion-physical-plan` lib tests pass.
* `sort_pushdown` SLT passes (existing tests unchanged, new Test I added).
* `cargo clippy --all-targets -- -D warnings` clean on the touched crates.

## Are there any user-facing changes?

No. Transparent optimisation: same query results, faster TopK on sorted parquet when no WHERE clause is present.
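
For readers who want the stats-init rule from section 4 in concrete form, here is a minimal, standalone sketch. It is not the PR's code: `RgStats`, `initial_threshold`, and `may_contain_topk` are hypothetical names, the sort column is assumed to be a non-null `i64`, and the real `try_init_topk_threshold` / `compute_best_threshold` work on parquet metadata and handle nulls, casts, and the skip conditions listed above.

```rust
/// Simplified per-row-group statistics for the sort column.
/// Hypothetical type: the PR reads these from parquet metadata.
#[derive(Debug, Clone, Copy)]
struct RgStats {
    min: i64,
    max: i64,
    num_rows: usize,
}

/// Initial TopK threshold derived from statistics alone.
///
/// Only row groups holding at least `k` rows are considered, because such a
/// row group by itself guarantees `k` rows on the "good" side of its bound.
/// DESC: max over eligible row groups of `min`.
/// ASC:  min over eligible row groups of `max`.
/// Returns `None` when no single row group has `k` rows.
fn initial_threshold(stats: &[RgStats], k: usize, descending: bool) -> Option<i64> {
    let eligible = stats.iter().filter(|rg| rg.num_rows >= k);
    if descending {
        eligible.map(|rg| rg.min).max()
    } else {
        eligible.map(|rg| rg.max).min()
    }
}

/// Whether a row group can still contribute to the top-K under the filter
/// `col >= threshold` (DESC) or `col <= threshold` (ASC). The comparison is
/// non-strict so the boundary value itself is kept.
fn may_contain_topk(rg: &RgStats, threshold: i64, descending: bool) -> bool {
    if descending {
        rg.max >= threshold
    } else {
        rg.min <= threshold
    }
}
```

With three non-overlapping row groups of 100 rows covering 0–99, 100–199, and 200–299, a DESC `LIMIT 10` yields a threshold of 200 under this sketch, so only the last row group passes `may_contain_topk`.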

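
The cumulative row-group prune and the `truncate_row_groups` bail-out described in sections 3 and 4 can be illustrated with a similarly small sketch. Everything here is an assumption made for illustration: `PlanSketch` stands in for `PreparedAccessPlan`, the `has_row_selection` flag stands in for the real `row_selection` state, and `cumulative_prune` is a hypothetical free function rather than the logic inside `prepare_access_plan`.

```rust
/// Stand-in for the access plan: an ordered list of row-group indexes
/// (already reordered and, for DESC, reversed) plus a flag marking whether
/// page-level selection state exists. Hypothetical, simplified type.
#[derive(Debug, Clone, PartialEq)]
struct PlanSketch {
    row_group_indexes: Vec<usize>,
    has_row_selection: bool,
}

impl PlanSketch {
    /// Keep the first `count` row groups; return the plan unchanged when a
    /// row selection is present, since it would be hard to remap.
    fn truncate_row_groups(mut self, count: usize) -> Self {
        if self.has_row_selection {
            return self;
        }
        self.row_group_indexes.truncate(count);
        self
    }
}

/// Walk the row groups from the front, summing their row counts, and cut
/// the list right after the running total first reaches `k`.
/// `rows_per_rg[i]` is the row count of row group `i`.
fn cumulative_prune(plan: PlanSketch, rows_per_rg: &[usize], k: usize) -> PlanSketch {
    let mut total = 0usize;
    let cutoff = plan.row_group_indexes.iter().position(|&rg| {
        total += rows_per_rg[rg];
        total >= k
    });
    match cutoff {
        // Enough rows found: keep row groups up to and including `pos`.
        Some(pos) => plan.truncate_row_groups(pos + 1),
        // Fewer than `k` rows in total: every row group is needed.
        None => plan,
    }
}
```

This shortcut is only sound under the gating the PR describes: with a `WHERE` clause, `num_rows` over-counts qualifying rows, so the sketch would have to be skipped rather than adapted.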