zhuqi-lucas opened a new pull request, #21182: URL: https://github.com/apache/datafusion/pull/21182
## Which issue does this PR close?

Closes https://github.com/apache/datafusion/issues/17348

## Rationale for this change

This PR implements the core optimization described in the EPIC [Sort pushdown / partially sorted scans](https://github.com/apache/datafusion/issues/17348): using file-level min/max statistics to optimize scan order and eliminate unnecessary sort operators.

Currently, when a query has `ORDER BY`, DataFusion always inserts a `SortExec` even when the data is already sorted across files. This PR enables:

1. **Sort elimination** when files are non-overlapping and internally sorted
2. **Statistics-based file reordering** to approximate the requested order
3. **Automatic ordering inference** from Parquet `sorting_columns` metadata (no `WITH ORDER` needed)

## What changes are included in this PR?

### Architecture

```text
Query: SELECT ... ORDER BY col ASC [LIMIT N]

  PushdownSort optimizer
        │
        ▼
  FileScanConfig::try_pushdown_sort()
        │
        ├─► FileSource::try_pushdown_sort()
        │     │
        │     ├─ natural ordering matches? ──► Exact
        │     │   (Parquet WITH ORDER or         │
        │     │    inferred from metadata)       ▼
        │     │                    rebuild_with_source(exact=true)
        │     │                      ├─ sort files by min/max stats
        │     │                      ├─ verify non-overlapping
        │     │                      ├─ redistribute across groups
        │     │                      └─► keep output_ordering
        │     │                          → SortExec removed
        │     │
        │     ├─ reversed ordering? ──► Inexact
        │     │   (reverse_row_groups)     │
        │     │                            ▼
        │     │                    rebuild_with_source(exact=false)
        │     │                      └─► clear output_ordering
        │     │                          → SortExec kept
        │     │
        │     └─ neither ──► Unsupported
        │
        └─► try_sort_file_groups_by_statistics()
              (best-effort: reorder files by stats)
              └─► Inexact if reordered
```

### Three Optimization Paths

**Path 1: Sort Elimination (Exact)**: removes `SortExec` entirely

When the file source's natural ordering satisfies the query (e.g., Parquet files with `sorting_columns` metadata), and files within each group are non-overlapping, the `SortExec` is completely eliminated.
```text
Before:                          After:
  SortExec [col ASC]               DataSourceExec [files sorted]
    DataSourceExec [files]           (output_ordering=[col ASC])
```

**Path 2: Reverse Scan (Inexact)**: existing optimization, enhanced

When the requested order is the reverse of the natural ordering, `reverse_row_groups=true` is set. `SortExec` stays but benefits from approximate ordering.

**Path 3: Statistics-Based File Reordering**: new fallback

When the `FileSource` returns `Unsupported`, files are reordered by their min/max statistics to approximate the requested order. This benefits TopK queries via better dynamic filter pruning.

### Multi-Partition Design

For multiple execution partitions, the optimization works per-partition:

```text
Multi-partition (each partition's SortExec eliminated):
  SortPreservingMergeExec [col ASC]       ← O(n) merge, cheap
    DataSourceExec [group 0: f1, f2]      ← no SortExec, parallel I/O
    DataSourceExec [group 1: f3, f4]      ← no SortExec, parallel I/O
```

When bin-packing interleaves file ranges across groups, files are redistributed using consecutive assignment to ensure groups are ordered relative to each other:

```text
Before (bin-packed, interleaved):
  Group 0: [f1(0-9),   f3(20-29)]    groups overlap!
  Group 1: [f2(10-19), f4(30-39)]

After (consecutive assignment):
  Group 0: [f1(0-9),   f2(10-19)]    max=19
  Group 1: [f3(20-29), f4(30-39)]    min=20 > 19 ✓ ordered!
```

### Automatic Ordering Inference

DataFusion already infers ordering from Parquet `sorting_columns` metadata (via `ordering_from_parquet_metadata`). With this PR, the inferred ordering flows through sort pushdown automatically, so users don't need `WITH ORDER` for sorted Parquet files.
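As an illustration of the three steps named in the architecture diagram (sort files by min/max stats, verify non-overlapping, redistribute across groups), here is a minimal standalone sketch. It is not the code in `file_scan_config.rs`: the `FileStats` struct and the three function names are hypothetical, statistics are simplified to a single `i64` column in ascending order, and the strict `max < min` boundary check is an assumption taken from the `min=20 > 19` example above.

```rust
/// Hypothetical stand-in for a file and its min/max statistics on the
/// sort column (the real code works on DataFusion file groups).
#[derive(Debug, Clone, PartialEq)]
struct FileStats {
    name: &'static str,
    min: i64,
    max: i64,
}

/// Step 1: order files by their minimum value (ascending request).
fn sort_by_min(mut files: Vec<FileStats>) -> Vec<FileStats> {
    files.sort_by_key(|f| f.min);
    files
}

/// Step 2: check that every file starts after the previous one ends.
/// The strict comparison is an assumption of this sketch.
fn is_non_overlapping(sorted: &[FileStats]) -> bool {
    sorted.windows(2).all(|w| w[0].max < w[1].min)
}

/// Step 3: split the sorted files into consecutive chunks, so that all
/// values in group i precede all values in group i + 1.
fn redistribute_consecutive(
    sorted: Vec<FileStats>,
    n_groups: usize,
) -> Vec<Vec<FileStats>> {
    if n_groups == 0 || sorted.is_empty() {
        return Vec::new();
    }
    let base = sorted.len() / n_groups;
    let extra = sorted.len() % n_groups;
    let mut groups = Vec::with_capacity(n_groups);
    let mut iter = sorted.into_iter();
    for i in 0..n_groups {
        // The first `extra` groups take one additional file.
        let size = base + usize::from(i < extra);
        let group: Vec<FileStats> = iter.by_ref().take(size).collect();
        if !group.is_empty() {
            groups.push(group);
        }
    }
    groups
}
```

Applied to the four files from the example (`f1(0-9)`, `f3(20-29)`, `f2(10-19)`, `f4(30-39)`) with two groups, sorting followed by `redistribute_consecutive` yields `[f1, f2]` and `[f3, f4]`. Consecutive chunks are used here instead of round-robin because round-robin would reintroduce the interleaving shown in the "Before" layout.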
### Files Changed

| File | Change |
|------|--------|
| `datasource-parquet/src/source.rs` | ParquetSource returns `Exact` when natural ordering satisfies request |
| `datasource/src/file_scan_config.rs` | Core sort pushdown logic: statistics sorting, non-overlapping detection, multi-group redistribution |
| `physical-optimizer/src/pushdown_sort.rs` | Module documentation update |
| `core/tests/physical_optimizer/pushdown_sort.rs` | Updated prefix match test |
| `sqllogictest/test_files/sort_pushdown.slt` | 5 new test groups (A-E) + updated existing tests |
| `benchmarks/src/sort_pushdown.rs` | New benchmark for sort elimination |
| `benchmarks/{lib,bin/dfbench,bench}.{rs,sh}` | Benchmark registration |

## Benchmark Results

300k rows, 8 non-overlapping sorted parquet files, single partition:

| Query | Description | Baseline (ms) | Sort Eliminated (ms) | Speedup |
|-------|-------------|---------------|---------------------|---------|
| Q1 | `ORDER BY col ASC` (full scan) | 159 | 91 | **43%** |
| Q2 | `ORDER BY col ASC LIMIT 100` | 36 | 12 | **67%** |
| Q3 | `ORDER BY col ASC` (wide, `SELECT *`) | 487 | 333 | **31%** |
| Q4 | `ORDER BY col ASC LIMIT 100` (wide) | 119 | 30 | **74%** |

LIMIT queries benefit most (67-74%) because sort elimination + limit pushdown means only the first few rows are read.
## Tests

### Unit Tests (12 new)

- Unsupported/Inexact/Exact source × sorted/unsorted/overlapping/non-overlapping
- Multi-group consecutive redistribution (even and uneven distribution)
- Partial statistics, single-file groups, descending sort

### SLT Integration Tests (5 new groups)

- **Test A**: Non-overlapping files + WITH ORDER → Sort eliminated (single partition)
- **Test B**: Overlapping files → statistics reorder, SortExec retained
- **Test C**: LIMIT queries (ASC sort elimination + DESC reverse scan)
- **Test D**: `target_partitions=2` → SPM + per-partition sort elimination
- **Test E**: **Inferred ordering** from Parquet metadata (no WITH ORDER), single and multi partition

### Integration Tests

- Updated prefix match test for Exact pushdown behavior
- All 919 core integration tests pass, all existing SLT tests pass

## Test plan

- [x] `cargo test -p datafusion-datasource` (111 tests pass)
- [x] `cargo test -p datafusion-datasource-parquet` (96 tests pass)
- [x] `cargo test -p datafusion-physical-optimizer` (27 tests pass)
- [x] `cargo test -p datafusion --test core_integration` (919 tests pass)
- [x] `cargo test -p datafusion` all tests (1997+ pass)
- [x] SLT sort/order/topk tests pass
- [x] SLT window/union/joins tests pass (no regressions)
- [x] `cargo clippy`: 0 warnings
- [x] Benchmark runs and shows expected speedups

🤖 Generated with [Claude Code](https://claude.com/claude-code)
