zhuqi-lucas commented on code in PR #186:
URL: https://github.com/apache/datafusion-site/pull/186#discussion_r3522647276


##########
content/blog/2026-07-05-sort-pushdown.md:
##########
@@ -0,0 +1,839 @@
+---
+layout: post
+title: Sort Pushdown in DataFusion: Skip Sorts, Skip Decode, Skip I/O
+date: 2026-07-05
+author: Qi Zhu
+categories: [performance]
+---
+
+<!--
+{% comment %}
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+{% endcomment %}
+-->
+
+[TOC]
+
+*Qi Zhu, [Massive](https://www.massive.com/)*
+
+Many [Apache Parquet] datasets are already sorted on disk. Time-series
+files are usually written in ingestion-time order. Event logs are sharded
+and sorted by event id. Partitioned tables come with a natural ordering
+implied by the partition key. The information about that ordering is
+sitting right there in the file metadata.
+
+[Apache Parquet]: https://parquet.apache.org/
+
+[Apache DataFusion] has always been able to skip the sort in the
+*trivial* case: when the catalog declares an ordering (`WITH ORDER`
+or Parquet `sorting_columns`) **and** the on-disk file listing
+already matches that order, the `EnsureRequirements` rule sees that
+the scan's `output_ordering` already satisfies the request and
+**removes the redundant `SortExec`**. The hard cases were
+everything just slightly looser — files in the wrong on-disk order,
+declared but overlapping ranges, no declaration at all,
+`ORDER BY ... DESC` on ASC-sorted data — which all paid a full
+external sort across the entire scan. CPU wasted. Memory wasted.
+Streaming defeated.
+
+[Apache DataFusion]: https://datafusion.apache.org/
+
+This post walks through the **sort pushdown** series of work — a
+multi-release effort spanning DataFusion **v52 through v55+** — that
+closed that gap on three layers at once:
+
+1. **`Exact` path** — delete the `SortExec` entirely when statistics
+   prove the scan is already ordered. Headline: **2.1×–49×** on the
+   `sort_pushdown` benchmark suite.
+2. **`Inexact` path** — keep the `SortExec`, but read the
+   most-promising data first so `TopK`'s dynamic filter tightens fast
+   and stale row groups get pruned by statistics.
+3. **Runtime row-group pruning** ([#22450]) — the latest piece, which
+   re-checks the dynamic filter at every row-group boundary and
+   physically removes pruned row groups from the open `RecordBatch`
+   stream. Headline: **5 of 11** `topk_tpch` queries run **3–4× faster**
+   with zero regressions; total benchmark time drops by **44%**.
+
+[#22450]: https://github.com/apache/datafusion/pull/22450
+[#20839]: https://github.com/apache/datafusion/pull/20839
+
+Together these compose into a **three-layer pruning stack** —
+file-level, row-group-level, row-level — all driven by the same
+`TopK` dynamic filter. **None of the runtime parts of this post
+would be possible without TopK's dynamic filter pushdown** — the
+primitive that lets `TopK`'s threshold reach the Parquet scanner
+mid-execution. The earlier [Dynamic Filters][dyn-filters-blog]
+post covers that primitive in detail and is recommended background
+for what follows.
+
+The page-level reverse primitive we are adding upstream in
+[arrow-rs] will push the `DESC` gains further still.
+
+[arrow-rs]: https://github.com/apache/arrow-rs
+
+## TL;DR
+
+* DataFusion can now **eliminate `SortExec` in far more cases** —
+  even when files are listed in the wrong order on disk — **read the
+  most-promising data first** when sorts can't be eliminated, and
+  **drop row groups mid-scan** as `TopK`'s heap converges.
+* What landed across v52–v55+:
+  * **The `PushdownSort` rule** — a physical optimizer rule that
+    asks each `ExecutionPlan` "can you produce output in *this*
+    ordering?" and uses the `Exact` / `Inexact` / `Unsupported`
+    answer to decide whether to delete the surrounding `SortExec`,
+    leave it in place with a hint, or give up.
+  * **Sort elimination via statistics (`Exact` path)** —
+    `PushdownSort` sorts files within each partition by Parquet
+    `min/max` statistics and, when the resulting ranges are provably
+    non-overlapping, upgrades the source's ordering claim from
+    `Unsupported` to `Exact` and **removes the `SortExec`** that
+    earlier passes left in the plan. `BufferExec` keeps multi-partition
+    plans from regressing when the eliminated `SortExec` was also
+    serving as an implicit buffer.
+  * **Runtime reorder (`Inexact` path)** — when the leading sort key
+    is a plain column (or the reversed source ordering satisfies the
+    request), the scan reorders files and row groups by `min/max`
+    stats so the most-promising data is read first; for `DESC`
+    requests it additionally flips iteration. The verdict is
+    `Inexact`, so `SortExec` stays in place, but `TopK`'s dynamic
+    filter tightens fast and the rest is pruned. File-level early
+    stop has been working for a while; **row-group-level early stop
+    was the missing piece until [#22450]**.
+  * **Runtime row-group dynamic pruning ([#22450])** — inside the
+    decoder loop, at every row-group boundary, the live `TopK`
+    threshold is converted into a fresh `PruningPredicate` and used
+    to drop subsequent row groups before any bytes are fetched.
+    A single row group can fill `TopK`'s heap, snap the threshold,
+    and cascade-prune every remaining row group in one pass — no
+    I/O, no decompress, no decode, not even the filter column.
+* Benchmarks:
+  * **`Exact` path on `sort_pushdown`**: `ORDER BY ... LIMIT` runs
+    **27× and 49× faster**; full `ORDER BY` scans run **~2×** faster.
+  * **[#22450] on `topk_tpch`** (TPC-H SF1, all `ORDER BY ... LIMIT 100`):
+    5 of 11 queries get **3–4× faster** with **0 regressions**, and total
+    runtime drops **44%** (248.8 ms → 139.1 ms).
+
+## Why Sort Pushdown Matters
+
+`SortExec` is one of the most expensive operators in a query plan.
+It is blocking by construction — no row can leave until every input
+row has been seen and compared — so it tends to dominate both latency
+and peak memory. The cost gets paid even when:
+
+* the file is already ordered by the sort key (very common for
+  timestamp columns);
+* the query only needs the top *N* rows (`ORDER BY ts LIMIT 100`), in
+  which case a full sort followed by truncation is wildly wasteful;
+* the next operator (`SortPreservingMergeExec`, `SortMergeJoinExec`,
+  a window function) was going to consume ordered input anyway.
+
+The data DataFusion needs to avoid this work is **already in the file
+metadata**. Parquet writers can record per-column statistics (`min`,
+`max`) at the row-group level. Files written by Spark, DuckDB,
+arrow-rs, and others routinely include them. And explicit `WITH ORDER`
+clauses in DataFusion's SQL `CREATE EXTERNAL TABLE` give the optimizer
+a direct ordering hint. The job of sort pushdown is to **use that
+information** — at plan time when possible, and at runtime when only
+the live state of the query can tell us what's still worth reading.
+
+## How DataFusion Tracks Ordering
+
+<img src="/blog/images/sort-pushdown/plan-diff.svg" alt="EXPLAIN before / after: SortExec eliminated once ordering is Exact" width="100%" class="img-fluid"/>
+
+Each `FileScanConfig` carries an `output_ordering` — the ordering
+that the optimizer is willing to claim for the scan's output. There
+are two flavours:
+
+* **`Exact`** — the optimizer is *certain* the output is in this order.
+  Optimizer rules treat an `Exact` ordering as a proof and **remove
+  the surrounding `SortExec`** entirely. [`EnsureRequirements`] does
+  this when the scan already declared `Exact` from the start; the
+  [`PushdownSort`] rule covered in this post does the same after
+  upgrading from `Unsupported` via stats-based reorder.
+* **`Inexact`** — the optimizer *believes* the output is probably
+  ordered, but cannot prove it. Downstream operators like
+  `SortPreservingMergeExec` can still benefit from this hint, but the
+  explicit `SortExec` stays for safety.
+
+[`PushdownSort`]: https://github.com/apache/datafusion/blob/main/datafusion/physical-optimizer/src/pushdown_sort.rs
+
+A helper called `validated_output_ordering()` is the gatekeeper. It
+walks the list of files inside a partition, checks whether the
+declared per-file ordering is consistent with the file order on disk,
+and either confirms the ordering or **strips it entirely** if it
+sees something ambiguous (e.g. file `b` comes before file `a` in the
+file list but file `a`'s range comes first).
+
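As a rough illustration of the check described above, the sketch below models each file by the min/max of its sort key and keeps the declared ordering only when the list order is consistent with those ranges. `FileRange`, `list_order_is_sorted`, and `validated_ordering` are hypothetical names invented for this example, not DataFusion's actual types or signatures, and treating touching ranges (`max == min`) as ordered is an assumption.

```rust
/// Min/max of the sort key for one file, as recorded in its statistics.
/// (Hypothetical type for illustration only.)
#[derive(Debug, Clone, PartialEq)]
struct FileRange {
    min: i64,
    max: i64,
}

/// Returns true when reading the files in list order yields ascending
/// output: each file must start at or after the point where the previous
/// file ended. Treating equal boundaries as ordered is an assumption.
fn list_order_is_sorted(files: &[FileRange]) -> bool {
    files.windows(2).all(|pair| pair[0].max <= pair[1].min)
}

/// Keeps the declared ordering only if the file list backs it up;
/// otherwise strips it by returning `None`.
fn validated_ordering<'a>(
    declared: Option<&'a str>,
    files: &[FileRange],
) -> Option<&'a str> {
    declared.filter(|_| list_order_is_sorted(files))
}
```

In this model, a list whose first file covers `10..=19` and whose second covers `0..=9` fails the check, so the declared ordering is dropped even though each file is sorted internally.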
+### `Exact` and `Inexact` at runtime
+
+`Exact` and `Inexact` lead to different runtime behaviour, and
+distinguishing them up front makes the rest of this post easier to
+follow:
+
+* With **`Exact`**, the `SortExec` is removed and the LIMIT becomes
+  a **static fetch** on the source. The reader stops the moment the
+  requested number of rows has been emitted — early termination
+  at batch granularity, no dynamic state needed.
+* With **`Inexact`**, the `SortExec` stays in place. The LIMIT
+  materialises inside the sort as a `TopK` heap of size K. `TopK`
+  exposes a [**dynamic filter**][dyn-filters-blog] — a runtime
+  expression of the form *"only rows that could still beat the
+  current K-th-best value are worth considering"* — and pushes it
+  back to the Parquet scanner. As more data is processed and the
+  heap tightens, the filter's threshold tightens with it, and entire
+  row groups can be skipped by checking the live threshold against
+  the row group's min/max statistics. (See the earlier
+  [dynamic filters][dyn-filters-blog] post for the full background
+  on this mechanism.)
+
+Both paths use the same underlying min/max statistics, but for
+different purposes: `Exact` uses them at plan time to prove
+non-overlap and justify removing the sort; `Inexact` uses them at
+runtime to skip data that can no longer improve the heap.
+
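The interaction between the `TopK` heap and row-group statistics can be sketched in plain Rust. This is a simplified, single-threaded model of an `ORDER BY v ASC LIMIT k` scan, assuming each row group exposes the minimum of the sort key. `RowGroup` and `top_k_ascending` are made-up names, and the real dynamic filter is an expression pushed into the scan rather than a direct heap lookup.

```rust
use std::collections::BinaryHeap;

/// One row group: the minimum of the sort key (from statistics) plus the
/// values that would have to be decoded. (Hypothetical type.)
struct RowGroup {
    min: i64,
    values: Vec<i64>,
}

/// Returns the k smallest values in ascending order, together with the
/// number of row groups that actually had to be decoded.
fn top_k_ascending(row_groups: &[RowGroup], k: usize) -> (Vec<i64>, usize) {
    // Max-heap of the best k values so far; once it is full, peek() is the
    // current K-th best value, which plays the role of the live threshold.
    let mut heap: BinaryHeap<i64> = BinaryHeap::new();
    let mut decoded = 0;

    for rg in row_groups {
        // Dynamic-filter check: if even the smallest value in this row
        // group cannot beat the threshold, skip it without decoding.
        if heap.len() == k {
            if let Some(&threshold) = heap.peek() {
                if rg.min >= threshold {
                    continue;
                }
            }
        }

        decoded += 1;
        for &v in &rg.values {
            if heap.len() < k {
                heap.push(v);
            } else if let Some(&worst) = heap.peek() {
                if v < worst {
                    // v displaces the current worst, tightening the threshold.
                    heap.pop();
                    heap.push(v);
                }
            }
        }
    }

    (heap.into_sorted_vec(), decoded)
}
```

The order in which row groups are visited matters: the sooner a row group with small values fills the heap, the more of the remaining row groups fail the `min >= threshold` test and are skipped.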
+[dyn-filters-blog]: https://datafusion.apache.org/blog/2025/09/10/dynamic-filters/
+
+## The `PushdownSort` Rule
+
+The **`PushdownSort`** physical optimizer rule asks each
+`ExecutionPlan` a question and, if the answer is yes, makes a request:
+
+1. "Can you produce output in *this* ordering?"
+2. "If so, rearrange yourself so that you actually do."
+
+The answer is one of `Exact`, `Inexact`, or `Unsupported`. `Exact`
+means the surrounding `SortExec` can be deleted entirely; `Inexact`
+means the source will read the data in a near-sorted order so
+`TopK` and other consumers benefit, but `SortExec` stays for
+strict correctness; `Unsupported` means the source can't help and
+`SortExec` does the full job. The rest of this post covers what each
+merged capability adds on top of this protocol: first the `Exact`
+path, then the `Inexact` path, then the runtime row-group pruning
+that `Inexact` was missing.
+
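One way to picture the three answers is as an enum returned by a source after it tries to rearrange itself. The sketch below is a toy under stated assumptions, not the rule's real API: `SortPushdown`, `FileStats`, `try_pushdown_asc`, and `keeps_sort_exec` are hypothetical names, the sort key is a single `i64` column, and "statistics missing" stands in for every reason a source might answer `Unsupported`.

```rust
/// The three possible answers described above. (Hypothetical enum.)
#[derive(Debug, PartialEq)]
enum SortPushdown {
    Exact,
    Inexact,
    Unsupported,
}

/// A file with optional (min, max) statistics for the sort key.
#[derive(Debug, Clone, PartialEq)]
struct FileStats {
    name: &'static str,
    range: Option<(i64, i64)>,
}

/// Tries to satisfy an ascending ordering by reordering the file list.
fn try_pushdown_asc(files: &mut [FileStats]) -> SortPushdown {
    // Without statistics for every file there is nothing to reorder by.
    if files.iter().any(|f| f.range.is_none()) {
        return SortPushdown::Unsupported;
    }

    // Rearrange: put the files in order of their minimum value.
    files.sort_by_key(|f| f.range.map(|(min, _)| min));

    // Exact only if each file ends before (or where) the next one starts.
    let non_overlapping = files.windows(2).all(|p| match (p[0].range, p[1].range) {
        (Some((_, prev_max)), Some((next_min, _))) => prev_max <= next_min,
        _ => false,
    });

    if non_overlapping {
        SortPushdown::Exact
    } else {
        // Near-sorted read order: helpful, but not a proof.
        SortPushdown::Inexact
    }
}

/// The surrounding sort may be deleted only on an `Exact` answer.
fn keeps_sort_exec(result: &SortPushdown) -> bool {
    !matches!(result, SortPushdown::Exact)
}
```

In this toy, files listed out of range order but with disjoint ranges come back reordered with `Exact`, overlapping ranges yield `Inexact`, and a file without statistics yields `Unsupported`.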
+## The `Exact` Path · Sort Elimination via Statistics
+
+<img src="/blog/images/sort-pushdown/phase1-file-reorder.svg" alt="Sort elimination: rearranging files within a partition by min/max statistics so the file list is in range order" width="100%" class="img-fluid"/>
+
+A note on what's actually new here: the *trivial* sort-elimination
+case (declared ordering + matching on-disk file list) has always
+worked via the `EnsureRequirements` rule. What was missing was
+sort elimination when the file list was in the **wrong order** on
+disk. `PushdownSort` landed across two phases — [Phase 1
+(#19064)][#19064] introduced the rule scaffolding and reverse
+iteration (the latter is used by the `Inexact` path covered
+later); [Phase 2 (#21182)][#21182] added stats-based file reorder
+so the rule can also produce `Exact` for the wrong-disk-order case.
+The motivating scenario for Phase 2:
+
+[#19064]: https://github.com/apache/datafusion/pull/19064
+[#21182]: https://github.com/apache/datafusion/pull/21182
+
+* Three files: `a.parquet`, `b.parquet`, `c.parquet`.
+* Each declares `WITH ORDER (ts ASC)`.
+* Internally each file *is* sorted by `ts`.
+* But they were written by different ingestion jobs and end up listed
+  in the **wrong order** on disk (e.g. alphabetical by name, not by
+  time).
+
+`validated_output_ordering()` looks at this, sees that the
+file-internal ordering disagrees with the file-list order, and
+**strips the ordering entirely**. From the optimizer's point of view
+the scan now has no declared ordering. Physical planning translates
+the user's `ORDER BY` into a `SortExec`, and the earlier pipeline
+rule [`EnsureRequirements`] (which consolidates the old

Review Comment:
   Fixed — all backtick-in-link patterns rewritten as inline `[X](url)` (10 
total after a follow-up sweep). Verified in the rendered HTML: 0 literal 
`[X][ref]` remaining.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

