zhuqi-lucas opened a new issue, #23036:
URL: https://github.com/apache/datafusion/issues/23036

   # [EPIC] Sort Pushdown: skip sorts and skip IO for ORDER BY / TopK queries
   
   Tracking issue for the broader **Sort Pushdown** effort in DataFusion — an
   ongoing initiative spanning **DataFusion v52 → v55+**, designed to make
   `ORDER BY` and `ORDER BY ... LIMIT N` queries on Parquet skip work
   end-to-end: skip the `SortExec`, skip row groups (RGs) via min/max stats, skip
   rows via runtime `DynamicFilter` thresholds, and eventually skip IO via
   `Exact` reverse iteration.
   
   This is part of the 2026 Q3-Q4 roadmap discussion — see
   https://github.com/apache/datafusion/issues/22882.
   
   ## Motivation
   
   Parquet datasets are commonly **already sorted** (time-series files,
   partition-keyed tables, ingestion-ordered writes) or have **min/max
   statistics** that reveal where the matching rows live. Before this
   effort, DataFusion ignored all of that: every `ORDER BY` ran a full
   external sort, and every `ORDER BY ... LIMIT N` read the entire table
   even when a single row group would suffice.
   
   Sort Pushdown lets a leaf source give the planner one of three answers:
   
   - **Exact** — drop the `SortExec`, push `LIMIT N` to the source as a
     static fetch (read N rows and stop).
   - **Inexact** — keep the `SortExec`, but reorder files/RGs by stats so
     the most promising data is read first; the `TopK` heap then quickly
     tightens its `DynamicFilterPhysicalExpr` threshold, and downstream
     RGs / rows are pruned at runtime.
   - **Unsupported** — no help; `SortExec` does full work.
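
   A minimal, self-contained model of the three answers, for illustration
   only: `Direction`, `SortKey`, `PushdownAnswer`, and `classify` are
   hypothetical names, not DataFusion's API. It assumes the source reports
   a single declared output ordering, and it treats a request for the
   reverse of that ordering as `Inexact`, mirroring the RG-level reverse
   case described under the `Exact` reverse path.

   ```rust
   /// Sort direction of one key column.
   #[derive(Clone, Copy, Debug, PartialEq, Eq)]
   pub enum Direction {
       Asc,
       Desc,
   }

   impl Direction {
       fn reversed(self) -> Direction {
           match self {
               Direction::Asc => Direction::Desc,
               Direction::Desc => Direction::Asc,
           }
       }
   }

   /// One `ORDER BY` key (hypothetical type).
   #[derive(Clone, Debug, PartialEq, Eq)]
   pub struct SortKey {
       pub column: String,
       pub direction: Direction,
   }

   /// The three answers a leaf source can give (hypothetical type).
   #[derive(Clone, Copy, Debug, PartialEq, Eq)]
   pub enum PushdownAnswer {
       /// Output already satisfies the request: drop SortExec, push LIMIT as a fetch.
       Exact,
       /// Source can help (e.g. reverse iteration), but output is not guaranteed sorted.
       Inexact,
       /// Source cannot help; SortExec does all the work.
       Unsupported,
   }

   /// Classify a requested ordering against the source's declared ordering.
   pub fn classify(source: Option<&[SortKey]>, requested: &[SortKey]) -> PushdownAnswer {
       let Some(source) = source else {
           return PushdownAnswer::Unsupported;
       };
       // Exact: the request is a prefix of what the source already produces.
       if source.starts_with(requested) {
           return PushdownAnswer::Exact;
       }
       // Inexact: the request is a prefix of the *reversed* source ordering.
       let reverse_prefix = requested.len() <= source.len()
           && requested
               .iter()
               .zip(source)
               .all(|(r, s)| r.column == s.column && r.direction == s.direction.reversed());
       if reverse_prefix {
           PushdownAnswer::Inexact
       } else {
           PushdownAnswer::Unsupported
       }
   }
   ```

   For example, a source declaring `ts ASC` answers `Exact` for
   `ORDER BY ts`, `Inexact` for `ORDER BY ts DESC`, and `Unsupported` for
   `ORDER BY id`.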
   
   ## Architecture (one picture)
   
   ```
   PushdownSort optimizer rule
       └── try_pushdown_sort  (decides Exact / Inexact / Unsupported)
               ├── leaf: ParquetSource
               └── flags: sort_order_for_reorder, reverse_row_groups, limit
   
   ParquetSource opener (runtime)
       ├── file-level reorder by min/max (cross-partition morsel queue)
       ├── RG-level reorder by min(col)  ASC
       └── iteration reverse  (for DESC vs ASC-sorted data)
   
   PushDecoderStreamState  (per-stream)
       └── at each RG boundary:
               ├── RowGroupPruner    — re-evaluate DynamicFilter vs RG stats
               ├── into_builder()    — rebuild decoder with surviving RGs
               └── (optional) toggle per-row RowFilter for fully-matched RGs
   
   SortExec / TopK  (above scan, final ordering authority)
       └── DynamicFilterPhysicalExpr  → tracker.changed() → ParquetSource
   ```
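
   To make the `TopK → DynamicFilterPhysicalExpr → tracker.changed()` edge
   concrete, here is a single-threaded sketch for `ORDER BY col ASC LIMIT k`
   over `i64` values. `SharedThreshold` and `TopK` are hypothetical
   stand-ins, not DataFusion types: the heap publishes its current k-th
   smallest value, and a version counter plays the role of `changed()` so
   a scan can tell when to re-evaluate its pruning.

   ```rust
   use std::collections::BinaryHeap;
   use std::sync::atomic::{AtomicU64, Ordering};
   use std::sync::{Arc, Mutex};

   /// Hypothetical shared filter state: current threshold plus a version
   /// counter that a scan can poll to detect changes.
   #[derive(Default)]
   pub struct SharedThreshold {
       value: Mutex<Option<i64>>,
       version: AtomicU64,
   }

   impl SharedThreshold {
       pub fn get(&self) -> Option<i64> {
           *self.value.lock().unwrap()
       }
       pub fn version(&self) -> u64 {
           self.version.load(Ordering::Acquire)
       }
       fn publish(&self, v: i64) {
           *self.value.lock().unwrap() = Some(v);
           self.version.fetch_add(1, Ordering::Release);
       }
   }

   /// Keeps the k smallest values seen so far (ORDER BY col ASC LIMIT k).
   pub struct TopK {
       k: usize,
       heap: BinaryHeap<i64>, // max-heap: the top is the current k-th smallest
       filter: Arc<SharedThreshold>,
   }

   impl TopK {
       pub fn new(k: usize, filter: Arc<SharedThreshold>) -> Self {
           assert!(k > 0, "LIMIT must be positive");
           TopK { k, heap: BinaryHeap::with_capacity(k), filter }
       }

       pub fn insert(&mut self, v: i64) {
           if self.heap.len() < self.k {
               self.heap.push(v);
           } else if v < *self.heap.peek().unwrap() {
               self.heap.pop();
               self.heap.push(v);
           } else {
               return; // cannot enter the top k: nothing changes
           }
           // Once full, the heap max is a bound: rows >= it cannot change the result.
           if self.heap.len() == self.k {
               let bound = *self.heap.peek().unwrap();
               if self.filter.get() != Some(bound) {
                   self.filter.publish(bound);
               }
           }
       }

       pub fn into_sorted(self) -> Vec<i64> {
           self.heap.into_sorted_vec() // ascending order
       }
   }
   ```

   Feeding `50, 40, 30, 60, 10` with `k = 2` publishes the bounds 50, 40,
   and 30 in turn; the value 60 is rejected without touching the filter.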
   
   ## Status legend
   
   - ✅ merged
   - 🚧 open / in review
   - 📝 design / planning
   
   ---
   
   ## Phase 1 — Framework & reverse (DataFusion v52)
   
   The optimizer-rule scaffolding, the `Exact / Inexact / Unsupported`
   classifier, and the first reverse iteration support.
   
   - ✅ #19042 — Add sorted data benchmark
   - ✅ #19064 — Establish the high level API for sort pushdown and the
     optimizer rule, support reverse files and row groups
   - ✅ #19446 — Redesign `try_reverse_output` to support more cases
   - ✅ #19557 — Fix reverse row selection to respect row-group index
   - ✅ #19368 — Minor: fix cargo fmt
   
   ## Phase 2 — Statistics-based file reorder (DataFusion v53)
   
   Target case: files within a partition are non-overlapping and
   internally sorted, but listed in the **wrong order** (alphabetical,
   ingestion time, etc.). The fix reorders them by min/max stats,
   re-checks non-overlap, upgrades `Unsupported → Exact`, eliminates the
   `SortExec`, and preserves the `LIMIT`.
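
   The reorder-and-recheck step can be sketched as follows, assuming each
   file exposes exact, null-free min/max statistics for a single `i64` sort
   column. `FileStats` and `reorder_for_exact` are hypothetical names; this
   is not DataFusion's code, which works over its own statistics types.

   ```rust
   /// Hypothetical per-file statistics for the sort column.
   #[derive(Clone, Debug, PartialEq)]
   pub struct FileStats {
       pub path: String,
       pub min: i64,
       pub max: i64,
   }

   /// Sort files by min, then verify consecutive files do not overlap.
   /// Returns the reordered list only if concatenating the (internally
   /// sorted) files yields a globally sorted stream, i.e. Exact is safe.
   pub fn reorder_for_exact(mut files: Vec<FileStats>) -> Option<Vec<FileStats>> {
       files.sort_by_key(|f| (f.min, f.max));
       // Touching boundaries (max == next min) still concatenate in order.
       let non_overlapping = files.windows(2).all(|w| w[0].max <= w[1].min);
       non_overlapping.then_some(files)
   }
   ```

   If the check fails, the answer is not upgraded and the `SortExec`
   stays.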
   
   - 📝 Issue: #17348
   - ✅ #21182 — Sort file groups by statistics during sort pushdown
     (**Phase 2**)
   - ✅ #21213 — Add sort_pushdown benchmark and SLT tests
   - ✅ #21266 — Generate reversed-name data for sort pushdown benchmark
   - ✅ #21457 — Extract sort pushdown logic from FileScanConfig into
     separate module
   - ✅ #21711 — Improve sort pushdown benchmark data and add DESC LIMIT
     queries
   
   **Benchmark (single partition, reversed file names):**
   
   | Query                  | Before | After | Speedup |
   |------------------------|-------:|------:|--------:|
   | Q1 `ORDER BY`          |  259ms | 122ms |    2.1× |
   | Q2 `ORDER BY LIMIT`    |   80ms |   3ms | **27×** |
   | Q3 `SELECT * ORDER BY` |  700ms | 313ms |    2.2× |
   | Q4 `SELECT * LIMIT`    |  342ms |   7ms | **49×** |
   
   ## Phase 3 — Multi-partition support: BufferExec (DataFusion v54)
   
   Naïve sort elimination made multi-partition queries **slower**: once
   `SortExec` and its unbounded buffer are gone, the
   `SortPreservingMergeExec` (SPM) k-way merge stalls whenever any single
   partition is IO-bound. Fix: keep an explicit, bounded `BufferExec` in
   `SortExec`'s old position, with one bounded queue per partition and
   background prefill.
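
   A std-only sketch of the idea, not the real `BufferExec`: each
   partition's already-sorted stream is fed by a background thread (the
   prefill) into its own bounded `std::sync::mpsc::sync_channel`, and a
   k-way merge standing in for SPM pulls from all queues. Batches are
   modeled as single `i64` values and `capacity` counts items rather than
   bytes; `spawn_buffered` and `merge_sorted` are hypothetical names.

   ```rust
   use std::cmp::Reverse;
   use std::collections::BinaryHeap;
   use std::sync::mpsc::{sync_channel, Receiver};
   use std::thread;

   /// One producer per partition, each writing into a bounded queue, so a
   /// slow consumer applies backpressure instead of buffering without limit.
   pub fn spawn_buffered(partitions: Vec<Vec<i64>>, capacity: usize) -> Vec<Receiver<i64>> {
       partitions
           .into_iter()
           .map(|rows| {
               let (tx, rx) = sync_channel(capacity);
               thread::spawn(move || {
                   for row in rows {
                       // Blocks while the queue is full (bounded memory).
                       if tx.send(row).is_err() {
                           break; // consumer went away
                       }
                   }
               });
               rx
           })
           .collect()
   }

   /// K-way merge of sorted partitions (the role SPM plays above the buffers).
   pub fn merge_sorted(queues: Vec<Receiver<i64>>) -> Vec<i64> {
       let mut heap = BinaryHeap::new();
       // Seed the min-heap with the head of each partition.
       for (i, q) in queues.iter().enumerate() {
           if let Ok(v) = q.recv() {
               heap.push(Reverse((v, i)));
           }
       }
       let mut out = Vec::new();
       while let Some(Reverse((v, i))) = heap.pop() {
           out.push(v);
           // Refill from the partition we just consumed.
           if let Ok(next) = queues[i].recv() {
               heap.push(Reverse((next, i)));
           }
       }
       out
   }
   ```

   The bounded queue is what lets a partition keep prefetching while the
   merge is busy elsewhere, without reintroducing an unbounded buffer.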
   
   - ✅ #21426 — Make sort pushdown BufferExec capacity configurable
     (default 1 GB, via `sort_pushdown_buffer_capacity`)
   - ✅ #21674 — Add `sort_pushdown_inexact` benchmark for RG reorder
   - ✅ #21947 — Skip unnecessary plan rebuild in
     `adjust_input_keys_ordering` for non-join plans
   - ✅ #21956 — **Globally reorder files and row groups by statistics
     for TopK queries** (cross-partition morsel queue)
   - ✅ #21976 — `EnsureRequirements`: merged EnforceDistribution +
     EnforceSorting with idempotent pushdown_sorts
   - ✅ #22493 — Restore SortExec elimination after stats-based file
     reorder
   - ✅ #22501 — Cherry-pick #22493 into the v54 release branch
   
   ## Phase 4 — Runtime RG-level early stop (DataFusion v55, **in review**)
   
   Today, even with the `DynamicFilter` threshold tightening quickly,
   DataFusion can't shut off the tap **mid-file**, so every remaining RG in
   the active file is still fully decoded. This phase closes that gap: at
   every RG boundary, the
   `RowGroupPruner` re-evaluates the current dynamic threshold against each
   remaining RG's min/max; dead RGs are stripped via arrow-rs 59's
   `into_builder() → with_row_groups(remaining) → build()` API. Zero IO,
   zero decode for the skipped RGs.
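
   The per-boundary check can be sketched as below, assuming `i64` min/max
   statistics per RG and a threshold equal to the TopK heap's current N-th
   value. `RowGroupMeta` and `prune_remaining` are hypothetical; the
   returned indices are what would be handed to `with_row_groups(...)` when
   rebuilding the decoder. The comparison is deliberately conservative: an
   RG is dropped only when every one of its rows lies strictly beyond the
   threshold.

   ```rust
   /// Hypothetical min/max statistics for one row group's sort column.
   #[derive(Clone, Copy, Debug)]
   pub struct RowGroupMeta {
       pub index: usize,
       pub min: i64,
       pub max: i64,
   }

   /// At an RG boundary, keep only the remaining RGs that could still
   /// contribute to an `ORDER BY col [ASC|DESC] LIMIT N` result.
   /// `threshold` is `None` until the TopK heap has filled up.
   pub fn prune_remaining(
       remaining: &[RowGroupMeta],
       threshold: Option<i64>,
       descending: bool,
   ) -> Vec<usize> {
       remaining
           .iter()
           .filter(|rg| match threshold {
               // DESC keeps the largest N: skip RGs whose max is below the bound.
               Some(t) if descending => rg.max >= t,
               // ASC keeps the smallest N: skip RGs whose min is above the bound.
               Some(t) => rg.min <= t,
               // No threshold yet: nothing can be pruned.
               None => true,
           })
           .map(|rg| rg.index)
           .collect()
   }
   ```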
   
   - 🚧 **#22450** — `feat(parquet): runtime row-group early stop via TopK
     dynamic filter` (**main PR**)
   - 🚧 #21712 — Initialize TopK dynamic filter threshold from parquet
     statistics (early threshold = first useful bound)
   - 🚧 #21580 — Statistics-driven TopK optimization (umbrella: file
     reorder + RG reorder + threshold init + cumulative prune)
   - 🚧 #22385 — TopK stats init + cumulative RG pruning for pure-TopK
     parquet scans (no-WHERE)
   - 🚧 #21828 — Pushdown `OFFSET` to parquet for RG-level skipping
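
   One way to derive an early threshold purely from statistics, sketched
   for `ORDER BY col ASC LIMIT N`: walk RGs in ascending order of `max`,
   summing row counts; as soon as at least N rows are covered, that RG's
   `max` is a safe initial bound, because at least N rows are known to be
   `<=` it. This assumes exact, null-free statistics and uses hypothetical
   names (`RgStats`, `initial_threshold`); the PRs above may compute the
   bound differently.

   ```rust
   /// Hypothetical row-group statistics: row count and max of the sort column.
   #[derive(Clone, Copy, Debug)]
   pub struct RgStats {
       pub num_rows: u64,
       pub max: i64,
   }

   /// Upper bound on the N-th smallest value, derived only from statistics.
   /// Returns `None` if there are fewer than `limit` rows in total.
   pub fn initial_threshold(row_groups: &[RgStats], limit: u64) -> Option<i64> {
       let mut by_max: Vec<RgStats> = row_groups.to_vec();
       by_max.sort_by_key(|rg| rg.max);
       let mut covered = 0u64;
       for rg in by_max {
           covered += rg.num_rows;
           // Every row counted so far is <= rg.max, and there are at least
           // `limit` of them, so the final N-th smallest cannot exceed rg.max.
           if covered >= limit {
               return Some(rg.max);
           }
       }
       None
   }
   ```

   In this model, any RG whose `min` exceeds the returned bound can be
   skipped before a single row is decoded.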
   
   **Benchmark (topk_tpch suite, LIMIT 100):** 5 of 11 queries gain
   **3-4×** speedup, 0 regressions, total runtime **-44%**. Wins
   concentrate on queries whose sort key is the table's physical order
   (e.g. `l_orderkey` on `lineitem`).
   
   ## Future — `Exact` reverse path
   
   The current RG-level reverse produces "RGs DESC × rows ASC" — close
   to DESC but **not strictly DESC**, so `PushdownSort` reports `Inexact`
   and `SortExec` stays. Page-level reverse in arrow-rs would make DESC
   queries return **strictly DESC**, letting `PushdownSort` return `Exact`,
   drop the `SortExec`, and emit `LIMIT N` as a source-side static fetch.
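
   A tiny illustration of why RG-level reverse is only `Inexact`, using
   hypothetical in-memory row groups of `i64` values sorted ASC: reversing
   only the RG order yields descending blocks whose rows are still
   ascending, while reversing every row (what page-level reverse would
   ultimately enable) yields a fully descending stream. The function names
   are illustrative only.

   ```rust
   /// Reverse only the order of row groups; rows inside each RG stay ASC.
   pub fn rg_level_reverse(row_groups: &[Vec<i64>]) -> Vec<i64> {
       row_groups.iter().rev().flatten().copied().collect()
   }

   /// Reverse row groups *and* the rows within them (full reverse iteration).
   pub fn full_reverse(row_groups: &[Vec<i64>]) -> Vec<i64> {
       row_groups
           .iter()
           .rev()
           .flat_map(|rg| rg.iter().rev())
           .copied()
           .collect()
   }

   /// True if the sequence is in descending (non-increasing) order.
   pub fn is_desc(values: &[i64]) -> bool {
       values.windows(2).all(|w| w[0] >= w[1])
   }
   ```

   With RGs `[1, 2, 3]` and `[4, 5, 6]`, the first function produces
   `4, 5, 6, 1, 2, 3`, so a `SortExec` is still required; the second
   produces `6, 5, 4, 3, 2, 1`, which is what an `Exact` answer needs.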
   
   Cross-repo:
   
   - 🚧 arrow-rs#9937 — POC `ReverseSerializedPageReader` (seek + reverse
     decode state machine; tracked by arrow-rs#9934)
   - 🚧 arrow-rs#10158 — `ParquetPushDecoder::peek_next_row_group()`
     (also unblocks the per-RG `fully_matched` RowFilter-skip optimization
     inside #22450)
   
   Once arrow-rs lands these, a follow-up DataFusion PR will teach
   `try_pushdown_sort` to upgrade DESC against ASC-sorted data to `Exact`.
   
   ---
   
   ## Looking for
   
   - Reviewer cycles on #22450 (the v55 main course)
   - Co-design / review on arrow-rs#9937 and arrow-rs#10158
   - Real workloads that exercise the Inexact path (DESC queries on
     ASC-sorted parquet, TopK on high-cardinality sort keys) — happy to
     add benches
   

