zhuqi-lucas commented on code in PR #21182:
URL: https://github.com/apache/datafusion/pull/21182#discussion_r3000852776
##########
benchmarks/src/bin/dfbench.rs:
##########
@@ -34,7 +34,8 @@ static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc;
use datafusion_benchmarks::{
- cancellation, clickbench, h2o, hj, imdb, nlj, smj, sort_tpch, tpcds, tpch,
+ cancellation, clickbench, h2o, hj, imdb, nlj, smj, sort_pushdown,
sort_tpch, tpcds,
Review Comment:
Thanks for the review! Good idea: I will split the benchmarks into a
follow-up PR to keep this one focused on the core optimization.
##########
datafusion/datasource-parquet/src/source.rs:
##########
@@ -811,11 +819,6 @@ impl FileSource for ParquetSource {
Ok(SortOrderPushdownResult::Inexact {
inner: Arc::new(new_source) as Arc<dyn FileSource>,
})
-
- // TODO Phase 2: Add support for other optimizations:
Review Comment:
Great idea! Row-group-level statistics reordering would be a natural
extension of our file-level reordering, just at finer granularity. It would be
especially powerful with morselized scans, where TopK could terminate after a
single row group. Will track as a follow-up.
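For illustration, here is a rough sketch of how row-group reordering could let an `ORDER BY col ASC LIMIT k` scan stop early. Everything here is hypothetical: `RowGroupStats` and `plan_row_groups_for_topk` are made-up names, not DataFusion or parquet-rs APIs. The bound comes from statistics alone (the largest `max` among the row groups needed to reach `k` rows); a runtime TopK could instead use the actual k-th value held in its heap, which is tighter.

```rust
/// Hypothetical row-group statistics for the sort column.
#[derive(Debug, Clone)]
struct RowGroupStats {
    index: usize,
    min: i64,
    max: i64,
    num_rows: usize,
}

/// Returns the indices of row groups an `ORDER BY col ASC LIMIT k` scan
/// would read, visiting them in ascending `min` order.
fn plan_row_groups_for_topk(mut groups: Vec<RowGroupStats>, k: usize) -> Vec<usize> {
    groups.sort_by_key(|g| g.min);
    let mut selected = Vec::new();
    let mut rows_seen = 0;
    // Once `k` rows are collected, the k-th smallest value is at most the
    // largest `max` among the row groups that supplied those first `k` rows.
    let mut bound = i64::MIN;
    for g in groups {
        if rows_seen >= k && g.min > bound {
            // Sorted by `min`, so every remaining row group also starts
            // above the bound and cannot enter the top k.
            break;
        }
        if rows_seen < k {
            bound = bound.max(g.max);
        }
        rows_seen += g.num_rows;
        selected.push(g.index);
    }
    selected
}
```

With three non-overlapping row groups of 1,000 rows each and `LIMIT 10`, only the row group with the smallest `min` is selected.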
##########
datafusion/sqllogictest/test_files/sort_pushdown.slt:
##########
@@ -1644,3 +1645,460 @@ RESET datafusion.execution.parquet.enable_page_index;
statement ok
RESET datafusion.optimizer.enable_sort_pushdown;
+
+
+###############################################################
+# Statistics-based file sorting and sort elimination tests
+###############################################################
Review Comment:
Makes sense. The SLT changes to existing tests (updated EXPLAIN outputs
showing reordered files) need to stay with the core PR since they validate the
new behavior. I will split just the benchmark code into its own PR as suggested
above.
##########
datafusion/datasource/src/file_scan_config.rs:
##########
@@ -896,28 +896,91 @@ impl DataSource for FileScanConfig {
}
}
+ /// Push sort requirements into file-based data sources.
+ ///
+ /// # Sort Pushdown Architecture
+ ///
+ /// ```text
Review Comment:
Thank you for taking the time to review! Really appreciate it.
##########
datafusion/datasource/src/file_scan_config.rs:
##########
@@ -896,28 +896,91 @@ impl DataSource for FileScanConfig {
}
}
+ /// Push sort requirements into file-based data sources.
+ ///
+ /// # Sort Pushdown Architecture
+ ///
+ /// ```text
+ /// Query: SELECT ... ORDER BY col ASC [LIMIT N]
+ ///
+ /// PushdownSort optimizer
+ /// │
+ /// ▼
+ /// FileScanConfig::try_pushdown_sort()
+ /// │
+ /// ├─► FileSource::try_pushdown_sort()
+ /// │ │
+ /// │ ├─ natural ordering matches? ──► Exact
+ /// │ │ (e.g. Parquet WITH ORDER) │
+ /// │ │ ▼
+ /// │ │ rebuild_with_source(exact=true)
+ /// │ │ ├─ sort files by stats within groups
+ /// │ │ ├─ verify non-overlapping
+ /// │ │ ├─ redistribute across groups (consecutive)
+ /// │ │ └─► keep output_ordering → SortExec removed
+ /// │ │
+ /// │ ├─ reversed ordering matches? ──► Inexact
+ /// │ │ (reverse_row_groups=true) │
+ /// │ │ ▼
+ /// │ │ rebuild_with_source(exact=false)
+ /// │ │ ├─ sort files by stats
+ /// │ │ └─► clear output_ordering → SortExec kept
+ /// │ │
+ /// │ └─ neither ──► Unsupported
+ /// │
+ /// └─► try_sort_file_groups_by_statistics()
+ /// (best-effort: reorder files by min/max stats)
+ /// └─► Inexact if reordered, Unsupported if already in order
Review Comment:
Good catch, let me clarify. When `FileSource` returns `Unsupported`, we
fall back to `try_sort_file_groups_by_statistics()`, which reorders files by
min/max stats. But if the files are **already in the correct order**
(`any_reordered = false`), we return `Unsupported` rather than `Inexact`,
because we did not actually change anything. Returning `Inexact` would make the
optimizer think we optimized the plan even though it is identical to the original.
Will improve the wording in the comment.
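A simplified sketch of that fallback rule, assuming a single file group and an integer `min` statistic per file. `FileStats`, `PushdownResult`, and `try_sort_files_by_statistics` are hypothetical stand-ins, not the actual `SortOrderPushdownResult` or statistics types in this PR:

```rust
/// Hypothetical per-file statistics for the sort column.
#[derive(Debug, Clone, PartialEq)]
struct FileStats {
    name: &'static str,
    min: i64,
}

/// Hypothetical, simplified stand-in for the pushdown result.
#[derive(Debug, PartialEq)]
enum PushdownResult {
    /// Files were reordered; a SortExec is still required above the scan.
    Inexact(Vec<FileStats>),
    /// Nothing changed, so the plan is left as-is.
    Unsupported,
}

/// Best-effort fallback: reorder files by their `min` statistic and only
/// report `Inexact` if that actually changed the order.
fn try_sort_files_by_statistics(files: &[FileStats]) -> PushdownResult {
    let mut sorted = files.to_vec();
    // Stable sort: files with equal minimums keep their original order.
    sorted.sort_by_key(|f| f.min);
    let any_reordered = sorted.as_slice() != files;
    if any_reordered {
        PushdownResult::Inexact(sorted)
    } else {
        PushdownResult::Unsupported
    }
}
```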
##########
datafusion/datasource/src/file_scan_config.rs:
##########
@@ -1289,13 +1384,281 @@ impl FileScanConfig {
new_config.file_source = new_file_source;
- // Phase 1: Clear output_ordering for Inexact
- // (we're only reversing row groups, not guaranteeing perfect ordering)
- if !is_exact {
+ // Sort files within groups by statistics when not reversing
+ let all_non_overlapping = if !reverse_file_groups {
+ if let Some(sort_order) = LexOrdering::new(order.iter().cloned()) {
+ let projected_schema = new_config.projected_schema()?;
+ let projection_indices = new_config
+ .file_source
+ .projection()
+ .as_ref()
+ .and_then(|p| ordered_column_indices_from_projection(p));
+ let result = Self::sort_files_within_groups_by_statistics(
+ &new_config.file_groups,
+ &sort_order,
+ &projected_schema,
+ projection_indices.as_deref(),
+ );
+ new_config.file_groups = result.file_groups;
+ result.all_non_overlapping
+ } else {
+ false
+ }
+ } else {
+ // When reversing, files are already reversed above. We skip
+ // statistics-based sorting here because it would undo the reversal.
+ // Note: reverse path is always Inexact, so all_non_overlapping
+ // is not used (is_exact is false).
+ false
+ };
+
+ if is_exact && all_non_overlapping {
+ // Truly exact: within-file ordering guaranteed and files are non-overlapping.
+ //
+ // When there are multiple groups, redistribute files using consecutive
+ // assignment so that each group remains non-overlapping AND groups are
+ // ordered relative to each other. This enables:
+ // - No SortExec per partition (files in each group are sorted & non-overlapping)
+ // - SPM cheaply merges ordered streams (O(n) merge)
+ // - Parallel I/O across partitions
+ //
+ // Before (bin-packing may interleave):
+ // Group 0: [file_01(1-10), file_03(21-30)] ← gap, interleaved with group 1
+ // Group 1: [file_02(11-20), file_04(31-40)]
Review Comment:
In practice the impact is minimal: consecutive assignment leaves any group
with at most 1 extra file, and since the file count is typically much larger
than the partition count, that is a small relative imbalance (e.g. 51 vs 50
files). Even with some imbalance, parallel I/O across partitions still beats
single-partition sequential reads. For LIMIT queries it matters even less, since
the first partition hits the limit and stops early regardless of size.
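To make the "at most 1 extra file" point concrete, here is a minimal sketch of the exact-path steps (verify non-overlapping, then redistribute consecutively), assuming files are already sorted by an integer `min` statistic. The names are hypothetical, and the `<=` boundary check (touching ranges allowed) is an assumption about how equal boundaries are treated:

```rust
/// Hypothetical per-file statistics for the sort column.
#[derive(Debug, Clone, PartialEq)]
struct FileStats {
    min: i64,
    max: i64,
}

/// Returns true when files, already sorted by `min`, can be concatenated
/// without breaking ascending order: each file's `max` must not exceed the
/// next file's `min` (touching boundaries are assumed to be allowed).
fn all_non_overlapping(sorted: &[FileStats]) -> bool {
    sorted.windows(2).all(|w| w[0].max <= w[1].min)
}

/// Splits sorted files into `n_groups` contiguous chunks. The first
/// `len % n_groups` groups get one extra file, so group sizes differ by at
/// most one; given non-overlapping input, every file in group i sorts before
/// every file in group i + 1.
fn redistribute_consecutive(sorted: Vec<FileStats>, n_groups: usize) -> Vec<Vec<FileStats>> {
    assert!(n_groups > 0, "need at least one group");
    let base = sorted.len() / n_groups;
    let extra = sorted.len() % n_groups;
    let mut files = sorted.into_iter();
    (0..n_groups)
        .map(|i| {
            let size = base + usize::from(i < extra);
            files.by_ref().take(size).collect::<Vec<_>>()
        })
        .collect()
}
```

With 101 files and 2 partitions this yields groups of 51 and 50 files, and the last file of group 0 still sorts before the first file of group 1.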
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.