zhuqi-lucas opened a new pull request, #21182:
URL: https://github.com/apache/datafusion/pull/21182

   ## Which issue does this PR close?
   
   Closes https://github.com/apache/datafusion/issues/17348
   
   ## Rationale for this change
   
   This PR implements the core optimization described in the EPIC [Sort 
pushdown / partially sorted 
scans](https://github.com/apache/datafusion/issues/17348): using file-level 
min/max statistics to optimize scan order and eliminate unnecessary sort 
operators.
   
   Currently, when a query has `ORDER BY`, DataFusion always inserts a 
`SortExec` even when the data is already sorted across files. This PR enables:
   1. **Sort elimination** when files are non-overlapping and internally sorted
   2. **Statistics-based file reordering** to approximate the requested order
   3. **Automatic ordering inference** from Parquet `sorting_columns` metadata 
(no `WITH ORDER` needed)
   
   ## What changes are included in this PR?
   
   ### Architecture
   
   ```text
   Query: SELECT ... ORDER BY col ASC [LIMIT N]
   
     PushdownSort optimizer
           │
           ▼
     FileScanConfig::try_pushdown_sort()
           │
           ├─► FileSource::try_pushdown_sort()
           │     │
           │     ├─ natural ordering matches? ──► Exact
           │     │   (Parquet WITH ORDER or              │
           │     │    inferred from metadata)             ▼
           │     │                           rebuild_with_source(exact=true)
           │     │                             ├─ sort files by min/max stats
           │     │                             ├─ verify non-overlapping
           │     │                             ├─ redistribute across groups
           │     │                             └─► keep output_ordering
           │     │                                  → SortExec removed
           │     │
           │     ├─ reversed ordering? ──► Inexact
           │     │   (reverse_row_groups)        │
           │     │                                ▼
           │     │                    rebuild_with_source(exact=false)
           │     │                      └─► clear output_ordering
           │     │                           → SortExec kept
           │     │
           │     └─ neither ──► Unsupported
           │
           └─► try_sort_file_groups_by_statistics()
                 (best-effort: reorder files by stats)
                 └─► Inexact if reordered
   ```
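
The decision flow in the diagram can be condensed into a small sketch. It is not the PR's code: the `Pushdown` enum and the `decide` and `sort_exec_removed` helpers are hypothetical names chosen for illustration, and the downgrade from `Exact` to `Inexact` when files overlap is an assumption based on SLT Test B (overlapping files keep their `SortExec`).

```rust
/// Outcome of a sort pushdown attempt. Hypothetical mirror of the
/// Exact / Inexact / Unsupported results shown in the diagram.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Pushdown {
    Exact,
    Inexact,
    Unsupported,
}

/// Combine the file source's answer with two facts about the file groups.
fn decide(source: Pushdown, non_overlapping: bool, reordered_by_stats: bool) -> Pushdown {
    match source {
        // Natural ordering matches and files do not overlap: ordering is kept.
        Pushdown::Exact if non_overlapping => Pushdown::Exact,
        // Assumption: overlapping files downgrade an Exact answer to Inexact.
        Pushdown::Exact | Pushdown::Inexact => Pushdown::Inexact,
        // Fallback path: best-effort reordering by min/max statistics.
        Pushdown::Unsupported if reordered_by_stats => Pushdown::Inexact,
        Pushdown::Unsupported => Pushdown::Unsupported,
    }
}

/// Only an Exact outcome lets the optimizer drop the SortExec.
fn sort_exec_removed(outcome: Pushdown) -> bool {
    outcome == Pushdown::Exact
}
```

Under these assumptions, `decide(Pushdown::Exact, true, false)` is the only combination for which `sort_exec_removed` returns `true`.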
   
   ### Three Optimization Paths
   
   **Path 1: Sort Elimination (Exact)** — removes SortExec entirely
   
   When the file source's natural ordering satisfies the query (e.g., Parquet 
files with `sorting_columns` metadata), and files within each group are 
non-overlapping, the `SortExec` is completely eliminated.
   
   ```text
   Before:                              After:
     SortExec [col ASC]                   DataSourceExec [files sorted]
       DataSourceExec [files]             (output_ordering=[col ASC])
   ```
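
A minimal sketch of the non-overlapping check behind Path 1, assuming a single integer sort column. `FileRange` and `sort_if_non_overlapping` are hypothetical names, not DataFusion types, and the strict comparison (`max < min`) is an assumption: the PR description does not say whether equal boundary values count as overlapping.

```rust
/// Hypothetical min/max statistics of one file for the sort column.
#[derive(Debug, Clone, PartialEq, Eq)]
struct FileRange {
    name: &'static str,
    min: i64,
    max: i64,
}

/// Sort files by their minimum value and return them only if every file
/// ends strictly before the next one begins.
fn sort_if_non_overlapping(mut files: Vec<FileRange>) -> Option<Vec<FileRange>> {
    files.sort_by_key(|f| f.min);
    // Compare each adjacent pair: previous max must be below next min.
    let disjoint = files.windows(2).all(|pair| pair[0].max < pair[1].min);
    disjoint.then_some(files)
}
```

A `Some` result corresponds to the case where the ordering can be kept and the `SortExec` dropped; `None` means the ranges overlap and a sort is still needed.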
   
   **Path 2: Reverse Scan (Inexact)** — existing optimization, enhanced
   
   When the requested order is the reverse of the natural ordering, 
`reverse_row_groups=true` is set. SortExec stays but benefits from approximate 
ordering.
   
   **Path 3: Statistics-Based File Reordering** — new fallback
   
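   When the FileSource returns `Unsupported`, files are reordered by their 
min/max statistics to approximate the requested order. The `SortExec` stays, 
but TopK queries benefit: reading the most promising files first tightens the 
dynamic filter sooner, so later files can be pruned more aggressively.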
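
The fallback can be illustrated with a short sketch. `FileStats` and `reorder_by_stats` are hypothetical names; sorting by `min` for ascending requests and by `max` for descending requests, and leaving the order untouched when any file lacks statistics, are assumptions made for the example rather than details confirmed by the PR.

```rust
/// Hypothetical per-file statistics for the sort column.
#[derive(Debug, Clone, PartialEq, Eq)]
struct FileStats {
    name: &'static str,
    /// `(min, max)`; `None` models a file whose statistics are missing.
    range: Option<(i64, i64)>,
}

/// Return the reordered files only if statistics are complete and the order
/// actually changed; `None` means nothing was gained.
fn reorder_by_stats(mut files: Vec<FileStats>, descending: bool) -> Option<Vec<FileStats>> {
    // Assumption: with partial statistics the original order is kept.
    if files.iter().any(|f| f.range.is_none()) {
        return None;
    }
    let before: Vec<&'static str> = files.iter().map(|f| f.name).collect();
    if descending {
        // Largest max first approximates ORDER BY col DESC.
        files.sort_by_key(|f| std::cmp::Reverse(f.range.map(|(_, max)| max)));
    } else {
        // Smallest min first approximates ORDER BY col ASC.
        files.sort_by_key(|f| f.range.map(|(min, _)| min));
    }
    let changed = files.iter().map(|f| f.name).ne(before);
    changed.then_some(files)
}
```

In the terms of the diagram, `Some(..)` corresponds to "Inexact if reordered": the scan order improves, but the sort operator still runs.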
   
   ### Multi-Partition Design
   
   For multiple execution partitions, the optimization works per-partition:
   
   ```text
   Multi-partition (each partition's SortExec eliminated):
     SortPreservingMergeExec [col ASC]        ← O(n) merge, cheap
       DataSourceExec [group 0: f1, f2]       ← no SortExec, parallel I/O
       DataSourceExec [group 1: f3, f4]       ← no SortExec, parallel I/O
   ```
   
   When bin-packing interleaves file ranges across groups, files are 
redistributed using consecutive assignment to ensure groups are ordered 
relative to each other:
   
   ```text
   Before (bin-packed, interleaved):
     Group 0: [f1(0-9),  f3(20-29)]     groups overlap!
     Group 1: [f2(10-19), f4(30-39)]
   
   After (consecutive assignment):
     Group 0: [f1(0-9),  f2(10-19)]     max=19
     Group 1: [f3(20-29), f4(30-39)]    min=20 > 19 ✓ ordered!
   ```
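
The consecutive assignment above can be sketched as follows. This is an illustration, not the PR's implementation: `FileRange`, `example_files`, `redistribute_consecutively`, and `groups_are_ordered` are hypothetical names, and giving the earlier groups one extra file when the count does not divide evenly is an assumption about the uneven case.

```rust
/// Hypothetical min/max statistics of one file for the sort column.
#[derive(Debug, Clone, PartialEq, Eq)]
struct FileRange {
    name: &'static str,
    min: i64,
    max: i64,
}

/// The four files from the example above, already sorted by `min`.
fn example_files() -> Vec<FileRange> {
    vec![
        FileRange { name: "f1", min: 0, max: 9 },
        FileRange { name: "f2", min: 10, max: 19 },
        FileRange { name: "f3", min: 20, max: 29 },
        FileRange { name: "f4", min: 30, max: 39 },
    ]
}

/// Split files (sorted by `min`) into at most `num_groups` consecutive chunks.
fn redistribute_consecutively(sorted: &[FileRange], num_groups: usize) -> Vec<Vec<FileRange>> {
    let groups = num_groups.min(sorted.len());
    if groups == 0 {
        return Vec::new();
    }
    let base = sorted.len() / groups;
    let extra = sorted.len() % groups;
    let mut out = Vec::with_capacity(groups);
    let mut start = 0;
    for g in 0..groups {
        // Assumption: the first `extra` groups take one additional file.
        let len = base + usize::from(g < extra);
        out.push(sorted[start..start + len].to_vec());
        start += len;
    }
    out
}

/// Check that each group ends before the next one begins. Assumes the files
/// inside every group are themselves sorted and non-overlapping.
fn groups_are_ordered(groups: &[Vec<FileRange>]) -> bool {
    groups.windows(2).all(|pair| match (pair[0].last(), pair[1].first()) {
        (Some(prev), Some(next)) => prev.max < next.min,
        _ => true,
    })
}
```

Calling `redistribute_consecutively(&example_files(), 2)` yields the "After" layout, while the bin-packed "Before" layout fails `groups_are_ordered` because `f3` in group 0 ends after `f2` in group 1 begins.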
   
   ### Automatic Ordering Inference
   
   DataFusion already infers ordering from Parquet `sorting_columns` metadata 
(via `ordering_from_parquet_metadata`). With this PR, the inferred ordering 
flows through sort pushdown automatically — users don't need `WITH ORDER` for 
sorted Parquet files.
   
   ### Files Changed
   
   | File | Change |
   |------|--------|
   | `datasource-parquet/src/source.rs` | ParquetSource returns `Exact` when natural ordering satisfies request |
   | `datasource/src/file_scan_config.rs` | Core sort pushdown logic: statistics sorting, non-overlapping detection, multi-group redistribution |
   | `physical-optimizer/src/pushdown_sort.rs` | Module documentation update |
   | `core/tests/physical_optimizer/pushdown_sort.rs` | Updated prefix match test |
   | `sqllogictest/test_files/sort_pushdown.slt` | 5 new test groups (A-E) + updated existing tests |
   | `benchmarks/src/sort_pushdown.rs` | New benchmark for sort elimination |
   | `benchmarks/{lib,bin/dfbench,bench}.{rs,sh}` | Benchmark registration |
   
   ## Benchmark Results
   
   300k rows, 8 non-overlapping sorted parquet files, single partition:
   
   | Query | Description | Baseline (ms) | Sort Eliminated (ms) | Time reduction |
   |-------|-------------|---------------|---------------------|---------|
   | Q1 | `ORDER BY col ASC` (full scan) | 159 | 91 | **43%** |
   | Q2 | `ORDER BY col ASC LIMIT 100` | 36 | 12 | **67%** |
   | Q3 | `ORDER BY col ASC` (wide, `SELECT *`) | 487 | 333 | **31%** |
   | Q4 | `ORDER BY col ASC LIMIT 100` (wide) | 119 | 30 | **74%** |
   
   LIMIT queries benefit most (67-74% less time): with the sort eliminated, 
the limit is pushed down to the scan, so only the first few rows are read.
   
   ## Tests
   
   ### Unit Tests (12 new)
   - Unsupported/Inexact/Exact source × 
sorted/unsorted/overlapping/non-overlapping
   - Multi-group consecutive redistribution (even and uneven distribution)
   - Partial statistics, single-file groups, descending sort
   
   ### SLT Integration Tests (5 new groups)
   - **Test A**: Non-overlapping files + WITH ORDER → Sort eliminated (single 
partition)
   - **Test B**: Overlapping files → statistics reorder, SortExec retained
   - **Test C**: LIMIT queries (ASC sort elimination + DESC reverse scan)
   - **Test D**: `target_partitions=2` → SPM + per-partition sort elimination
   - **Test E**: **Inferred ordering** from Parquet metadata (no WITH ORDER) — 
single and multi partition
   
   ### Integration Tests
   - Updated prefix match test for Exact pushdown behavior
   - All 919 core integration tests pass, all existing SLT tests pass
   
   ## Test plan
   
   - [x] `cargo test -p datafusion-datasource` (111 tests pass)
   - [x] `cargo test -p datafusion-datasource-parquet` (96 tests pass)
   - [x] `cargo test -p datafusion-physical-optimizer` (27 tests pass)
   - [x] `cargo test -p datafusion --test core_integration` (919 tests pass)
   - [x] `cargo test -p datafusion` all tests (1997+ pass)
   - [x] SLT sort/order/topk tests pass
   - [x] SLT window/union/joins tests pass (no regressions)
   - [x] `cargo clippy` — 0 warnings
   - [x] Benchmark runs and shows expected speedups
   
   🤖 Generated with [Claude Code](https://claude.com/claude-code)

