zhuqi-lucas commented on code in PR #21182:
URL: https://github.com/apache/datafusion/pull/21182#discussion_r3000852776


##########
benchmarks/src/bin/dfbench.rs:
##########
@@ -34,7 +34,8 @@ static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
 static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc;
 
 use datafusion_benchmarks::{
-    cancellation, clickbench, h2o, hj, imdb, nlj, smj, sort_tpch, tpcds, tpch,
+    cancellation, clickbench, h2o, hj, imdb, nlj, smj, sort_pushdown, sort_tpch, tpcds,

Review Comment:
   Thanks for the review! Good idea, I will split the benchmarks into a 
follow-up PR to keep this one focused on the core optimization.



##########
datafusion/datasource-parquet/src/source.rs:
##########
@@ -811,11 +819,6 @@ impl FileSource for ParquetSource {
         Ok(SortOrderPushdownResult::Inexact {
             inner: Arc::new(new_source) as Arc<dyn FileSource>,
         })
-
-        // TODO Phase 2: Add support for other optimizations:

Review Comment:
   Great idea! Row-group-level statistics reordering would be a natural 
extension of our file-level reordering, just at finer granularity. It would be 
especially powerful with morselized scans, where TopK could terminate after a 
single row group. I will track this as a follow-up.



##########
datafusion/sqllogictest/test_files/sort_pushdown.slt:
##########
@@ -1644,3 +1645,460 @@ RESET datafusion.execution.parquet.enable_page_index;
 
 statement ok
 RESET datafusion.optimizer.enable_sort_pushdown;
+
+
+###############################################################
+# Statistics-based file sorting and sort elimination tests
+###############################################################

Review Comment:
   Makes sense. The SLT changes to existing tests (updated EXPLAIN outputs 
showing reordered files) need to stay with the core PR since they validate the 
new behavior. I will split just the benchmark code into its own PR as suggested 
above.



##########
datafusion/datasource/src/file_scan_config.rs:
##########
@@ -896,28 +896,91 @@ impl DataSource for FileScanConfig {
         }
     }
 
+    /// Push sort requirements into file-based data sources.
+    ///
+    /// # Sort Pushdown Architecture
+    ///
+    /// ```text

Review Comment:
   Thank you for taking the time to review! Really appreciate it.



##########
datafusion/datasource/src/file_scan_config.rs:
##########
@@ -896,28 +896,91 @@ impl DataSource for FileScanConfig {
         }
     }
 
+    /// Push sort requirements into file-based data sources.
+    ///
+    /// # Sort Pushdown Architecture
+    ///
+    /// ```text
+    /// Query: SELECT ... ORDER BY col ASC [LIMIT N]
+    ///
+    ///   PushdownSort optimizer
+    ///         │
+    ///         ▼
+    ///   FileScanConfig::try_pushdown_sort()
+    ///         │
+    ///         ├─► FileSource::try_pushdown_sort()
+    ///         │     │
+    ///         │     ├─ natural ordering matches? ──► Exact
+    ///         │     │   (e.g. Parquet WITH ORDER)     │
+    ///         │     │                                  ▼
+    ///         │     │                      rebuild_with_source(exact=true)
+    ///         │     │                        ├─ sort files by stats within groups
+    ///         │     │                        ├─ verify non-overlapping
+    ///         │     │                        ├─ redistribute across groups (consecutive)
+    ///         │     │                        └─► keep output_ordering → SortExec removed
+    ///         │     │
+    ///         │     ├─ reversed ordering matches? ──► Inexact
+    ///         │     │   (reverse_row_groups=true)      │
+    ///         │     │                                   ▼
+    ///         │     │                       rebuild_with_source(exact=false)
+    ///         │     │                         ├─ sort files by stats
+    ///         │     │                         └─► clear output_ordering → SortExec kept
+    ///         │     │
+    ///         │     └─ neither ──► Unsupported
+    ///         │
+    ///         └─► try_sort_file_groups_by_statistics()
+    ///               (best-effort: reorder files by min/max stats)
+    ///               └─► Inexact if reordered, Unsupported if already in order

Review Comment:
   Good catch, let me clarify. When `FileSource` returns `Unsupported`, we 
fall back to `try_sort_file_groups_by_statistics()`, which reorders files by 
min/max stats. But if the files are **already in the correct order** 
(`any_reordered = false`), we return `Unsupported` rather than `Inexact`, 
because we did not actually change anything. Returning `Inexact` would make the 
optimizer think we had optimized the plan when it is in fact identical to the 
original. I will improve the wording in the comment.
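
   To make the rule above concrete, here is a minimal sketch of the decision. 
It is not the PR's code: `FileStats`, `PushdownOutcome`, and 
`try_sort_groups_by_min` are hypothetical stand-ins, and it assumes a single 
ascending sort key compared by min statistic only.

```rust
/// Hypothetical per-file statistics for a single sort column (assumption).
#[derive(Debug, Clone, PartialEq)]
struct FileStats {
    name: &'static str,
    min: i64,
    max: i64,
}

impl FileStats {
    fn new(name: &'static str, min: i64, max: i64) -> Self {
        Self { name, min, max }
    }
}

/// Hypothetical stand-in for the pushdown result: the real enum carries
/// the rewritten source, this sketch carries only the file groups.
#[derive(Debug, PartialEq)]
enum PushdownOutcome {
    /// Files were reordered; ordering is best-effort, so the sort stays.
    Inexact(Vec<Vec<FileStats>>),
    /// Nothing changed, so no optimization is reported.
    Unsupported,
}

/// Sort each group by min statistic and report `Inexact` only when at
/// least one group actually changed order.
fn try_sort_groups_by_min(groups: &[Vec<FileStats>]) -> PushdownOutcome {
    let mut any_reordered = false;
    let mut sorted_groups = Vec::with_capacity(groups.len());
    for group in groups {
        let mut sorted = group.clone();
        // Stable sort: files with equal min keep their original order.
        sorted.sort_by_key(|f| f.min);
        if sorted != *group {
            any_reordered = true;
        }
        sorted_groups.push(sorted);
    }
    if any_reordered {
        PushdownOutcome::Inexact(sorted_groups)
    } else {
        PushdownOutcome::Unsupported
    }
}
```

Under these assumptions, a group that is already in min order yields 
`Unsupported`, while a group with swapped files yields `Inexact` with the 
reordered files.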



##########
datafusion/datasource/src/file_scan_config.rs:
##########
@@ -1289,13 +1384,281 @@ impl FileScanConfig {
 
         new_config.file_source = new_file_source;
 
-        // Phase 1: Clear output_ordering for Inexact
-        // (we're only reversing row groups, not guaranteeing perfect ordering)
-        if !is_exact {
+        // Sort files within groups by statistics when not reversing
+        let all_non_overlapping = if !reverse_file_groups {
+            if let Some(sort_order) = LexOrdering::new(order.iter().cloned()) {
+                let projected_schema = new_config.projected_schema()?;
+                let projection_indices = new_config
+                    .file_source
+                    .projection()
+                    .as_ref()
+                    .and_then(|p| ordered_column_indices_from_projection(p));
+                let result = Self::sort_files_within_groups_by_statistics(
+                    &new_config.file_groups,
+                    &sort_order,
+                    &projected_schema,
+                    projection_indices.as_deref(),
+                );
+                new_config.file_groups = result.file_groups;
+                result.all_non_overlapping
+            } else {
+                false
+            }
+        } else {
+            // When reversing, files are already reversed above. We skip
+            // statistics-based sorting here because it would undo the reversal.
+            // Note: reverse path is always Inexact, so all_non_overlapping
+            // is not used (is_exact is false).
+            false
+        };
+
+        if is_exact && all_non_overlapping {
+            // Truly exact: within-file ordering guaranteed and files are non-overlapping.
+            //
+            // When there are multiple groups, redistribute files using consecutive
+            // assignment so that each group remains non-overlapping AND groups are
+            // ordered relative to each other. This enables:
+            // - No SortExec per partition (files in each group are sorted & non-overlapping)
+            // - SPM cheaply merges ordered streams (O(n) merge)
+            // - Parallel I/O across partitions
+            //
+            // Before (bin-packing may interleave):
+            //   Group 0: [file_01(1-10), file_03(21-30)]  ← gap, interleaved with group 1
+            //   Group 1: [file_02(11-20), file_04(31-40)]

Review Comment:
   In practice the impact is minimal: file count is typically much larger than 
partition count, so the imbalance is at most 1 extra file (e.g. 51 vs 50 
files). Even with some imbalance, parallel I/O across partitions still beats 
single-partition sequential reads. For LIMIT queries it matters even less since 
the first partition hits the limit and stops early regardless of size.
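
   As a rough illustration of why the imbalance is bounded by one file, here 
is a sketch of consecutive assignment. It assumes the files are already sorted 
and non-overlapping and that groups are formed as near-equal consecutive 
chunks; `redistribute_consecutive` is a hypothetical helper, and the PR's 
actual implementation may differ.

```rust
/// Split an already-sorted list of files into `num_groups` consecutive
/// chunks. The first `len % num_groups` groups receive one extra file,
/// so group sizes differ by at most one.
fn redistribute_consecutive<T: Clone>(files: &[T], num_groups: usize) -> Vec<Vec<T>> {
    if num_groups == 0 {
        return Vec::new();
    }
    let base = files.len() / num_groups;
    let extra = files.len() % num_groups;
    let mut groups = Vec::with_capacity(num_groups);
    let mut start = 0;
    for i in 0..num_groups {
        // Groups with index below `extra` take one additional file.
        let len = base + usize::from(i < extra);
        groups.push(files[start..start + len].to_vec());
        start += len;
    }
    groups
}
```

With four files and two groups this gives `[[1, 2], [3, 4]]` rather than the 
interleaved bin-packing layout, and 101 files over two groups split as 51 and 
50.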



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

