adriangb commented on code in PR #21182:
URL: https://github.com/apache/datafusion/pull/21182#discussion_r3000089388


##########
benchmarks/src/bin/dfbench.rs:
##########
@@ -34,7 +34,8 @@ static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
 static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc;
 
 use datafusion_benchmarks::{
-    cancellation, clickbench, h2o, hj, imdb, nlj, smj, sort_tpch, tpcds, tpch,
+    cancellation, clickbench, h2o, hj, imdb, nlj, smj, sort_pushdown, sort_tpch, tpcds,

Review Comment:
   Re: adding `sort_pushdown`: to keep the PR smaller, and so we can run comparison benchmarks, could you split the benchmarks out into their own PR?



##########
datafusion/datasource-parquet/src/source.rs:
##########
@@ -811,11 +819,6 @@ impl FileSource for ParquetSource {
         Ok(SortOrderPushdownResult::Inexact {
             inner: Arc::new(new_source) as Arc<dyn FileSource>,
         })
-
-        // TODO Phase 2: Add support for other optimizations:

Review Comment:
   I do think there's one more trick we could have up our sleeves: instead of only reversing the row group order, we could pass the desired sort order into the opener and have it re-sort the row groups based on their statistics to match the scan's desired ordering as closely as possible. This might be especially effective once we have morselized scans, since we could terminate after a single row group for TopK queries.
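   To make the suggestion concrete, here is a minimal Rust sketch of statistics-based row group reordering for `ORDER BY col ASC LIMIT n`. `RowGroupStats`, `order_row_groups_asc`, and `row_groups_needed_for_topk` are hypothetical names (not the parquet crate's or DataFusion's API), and the sketch assumes a single non-null `i64` sort column with exact min/max statistics. Once at least `n` rows have been read, any row group whose min is at least the largest max seen so far cannot improve the result, so the scan can stop; when the first row group holds at least `n` rows and does not overlap the rest, that happens after a single row group.

```rust
/// Hypothetical per-row-group statistics for one non-null `i64` sort column.
/// This is an illustration, not the parquet crate's metadata API.
#[derive(Debug, Clone, Copy, PartialEq)]
struct RowGroupStats {
    index: usize,
    min: i64,
    max: i64,
    num_rows: usize,
}

/// Reorder row groups for `ORDER BY col ASC`: smallest minimum first,
/// with the maximum as a tie-breaker.
fn order_row_groups_asc(mut groups: Vec<RowGroupStats>) -> Vec<RowGroupStats> {
    groups.sort_by_key(|g| (g.min, g.max));
    groups
}

/// Given row groups already in ascending-min order, return how many must be
/// scanned to answer `ORDER BY col ASC LIMIT limit`.
fn row_groups_needed_for_topk(sorted: &[RowGroupStats], limit: usize) -> usize {
    let mut rows_read = 0;
    // Largest max seen so far; once `limit` rows are read, it is an upper
    // bound on the `limit`-th smallest value collected.
    let mut threshold = i64::MIN;
    for (i, g) in sorted.iter().enumerate() {
        // Every remaining group has min >= g.min >= threshold, so none of
        // their rows can beat the rows already collected.
        if rows_read >= limit && g.min >= threshold {
            return i;
        }
        rows_read += g.num_rows;
        threshold = threshold.max(g.max);
    }
    sorted.len()
}

fn main() {
    let groups = vec![
        RowGroupStats { index: 0, min: 50, max: 60, num_rows: 100 },
        RowGroupStats { index: 1, min: 1, max: 10, num_rows: 100 },
        RowGroupStats { index: 2, min: 11, max: 20, num_rows: 100 },
    ];
    let sorted = order_row_groups_asc(groups);
    let order: Vec<usize> = sorted.iter().map(|g| g.index).collect();
    println!("scan order: {:?}", order); // [1, 2, 0]
    println!("LIMIT 10 reads {} row group(s)", row_groups_needed_for_topk(&sorted, 10));
    println!("LIMIT 150 reads {} row group(s)", row_groups_needed_for_topk(&sorted, 150));
}
```

   With this data the scan order becomes `[1, 2, 0]`; `LIMIT 10` needs only the first row group, while `LIMIT 150` needs two.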



##########
datafusion/sqllogictest/test_files/sort_pushdown.slt:
##########
@@ -1644,3 +1645,460 @@ RESET datafusion.execution.parquet.enable_page_index;
 
 statement ok
 RESET datafusion.optimizer.enable_sort_pushdown;
+
+
+###############################################################
+# Statistics-based file sorting and sort elimination tests
+###############################################################

Review Comment:
   We could also commit these alongside the benchmarks so we can then look at 
just the diff.



##########
datafusion/datasource/src/file_scan_config.rs:
##########
@@ -896,28 +896,91 @@ impl DataSource for FileScanConfig {
         }
     }
 
+    /// Push sort requirements into file-based data sources.
+    ///
+    /// # Sort Pushdown Architecture
+    ///
+    /// ```text

Review Comment:
   This diagram is amazing, thank you so much for the detailed docs!



##########
datafusion/datasource/src/file_scan_config.rs:
##########
@@ -896,28 +896,91 @@ impl DataSource for FileScanConfig {
         }
     }
 
+    /// Push sort requirements into file-based data sources.
+    ///
+    /// # Sort Pushdown Architecture
+    ///
+    /// ```text
+    /// Query: SELECT ... ORDER BY col ASC [LIMIT N]
+    ///
+    ///   PushdownSort optimizer
+    ///         │
+    ///         ▼
+    ///   FileScanConfig::try_pushdown_sort()
+    ///         │
+    ///         ├─► FileSource::try_pushdown_sort()
+    ///         │     │
+    ///         │     ├─ natural ordering matches? ──► Exact
+    ///         │     │   (e.g. Parquet WITH ORDER)     │
+    ///         │     │                                  ▼
+    ///         │     │                      rebuild_with_source(exact=true)
+    ///         │     │                        ├─ sort files by stats within groups
+    ///         │     │                        ├─ verify non-overlapping
+    ///         │     │                        ├─ redistribute across groups (consecutive)
+    ///         │     │                        └─► keep output_ordering → SortExec removed
+    ///         │     │
+    ///         │     ├─ reversed ordering matches? ──► Inexact
+    ///         │     │   (reverse_row_groups=true)      │
+    ///         │     │                                   ▼
+    ///         │     │                       rebuild_with_source(exact=false)
+    ///         │     │                         ├─ sort files by stats
+    ///         │     │                         └─► clear output_ordering → SortExec kept
+    ///         │     │
+    ///         │     └─ neither ──► Unsupported
+    ///         │
+    ///         └─► try_sort_file_groups_by_statistics()
+    ///               (best-effort: reorder files by min/max stats)
+    ///               └─► Inexact if reordered, Unsupported if already in order

Review Comment:
   > Unsupported if already in order
   
   I didn't understand this part
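   One possible reading of the fallback branch, sketched in Rust under simplifying assumptions: if reordering files by min/max statistics leaves the order unchanged, the rewrite has nothing to offer, so returning `Unsupported` keeps the original plan; if the order does change, the result is `Inexact` because files were reordered but the `SortExec` is still needed. `Outcome`, `FileStats`, and `pushdown_sort` are hypothetical stand-ins, not DataFusion's `SortOrderPushdownResult` or `FileSource` API, and the file sorting and overlap checks on the exact and reverse paths are omitted.

```rust
/// Simplified stand-in for the pushdown result (not DataFusion's type).
#[derive(Debug, PartialEq)]
enum Outcome {
    /// Natural ordering matches the requested ordering.
    Exact,
    /// Order improved but not guaranteed, so SortExec stays.
    Inexact,
    /// Nothing changed; the optimizer keeps the original plan.
    Unsupported,
}

/// Hypothetical file entry with min/max stats for the sort column.
#[derive(Debug, Clone, PartialEq)]
struct FileStats {
    name: &'static str,
    min: i64,
    max: i64,
}

/// Mirrors the three branches of the diagram. File reordering on the
/// exact and reverse paths is omitted to keep the sketch short.
fn pushdown_sort(natural_matches: bool, reversed_matches: bool, files: &mut [FileStats]) -> Outcome {
    if natural_matches {
        return Outcome::Exact;
    }
    if reversed_matches {
        return Outcome::Inexact;
    }
    // Fallback: best-effort reorder of files by min/max statistics.
    let before: Vec<&'static str> = files.iter().map(|f| f.name).collect();
    files.sort_by_key(|f| (f.min, f.max));
    let reordered = files.iter().map(|f| f.name).ne(before.iter().copied());
    if reordered {
        Outcome::Inexact
    } else {
        Outcome::Unsupported
    }
}

fn main() {
    let mut in_order = vec![
        FileStats { name: "a", min: 1, max: 10 },
        FileStats { name: "b", min: 11, max: 20 },
    ];
    let mut shuffled = vec![
        FileStats { name: "b", min: 11, max: 20 },
        FileStats { name: "a", min: 1, max: 10 },
    ];
    println!("{:?}", pushdown_sort(false, false, &mut in_order)); // Unsupported
    println!("{:?}", pushdown_sort(false, false, &mut shuffled)); // Inexact
}
```

   Under this reading, "already in order" means the fallback found nothing to improve, so it reports `Unsupported` instead of an `Inexact` result that would be identical to the input plan.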



##########
datafusion/datasource/src/file_scan_config.rs:
##########
@@ -1289,13 +1384,281 @@ impl FileScanConfig {
 
         new_config.file_source = new_file_source;
 
-        // Phase 1: Clear output_ordering for Inexact
-        // (we're only reversing row groups, not guaranteeing perfect ordering)
-        if !is_exact {
+        // Sort files within groups by statistics when not reversing
+        let all_non_overlapping = if !reverse_file_groups {
+            if let Some(sort_order) = LexOrdering::new(order.iter().cloned()) {
+                let projected_schema = new_config.projected_schema()?;
+                let projection_indices = new_config
+                    .file_source
+                    .projection()
+                    .as_ref()
+                    .and_then(|p| ordered_column_indices_from_projection(p));
+                let result = Self::sort_files_within_groups_by_statistics(
+                    &new_config.file_groups,
+                    &sort_order,
+                    &projected_schema,
+                    projection_indices.as_deref(),
+                );
+                new_config.file_groups = result.file_groups;
+                result.all_non_overlapping
+            } else {
+                false
+            }
+        } else {
+            // When reversing, files are already reversed above. We skip
+            // statistics-based sorting here because it would undo the reversal.
+            // Note: reverse path is always Inexact, so all_non_overlapping
+            // is not used (is_exact is false).
+            false
+        };
+
+        if is_exact && all_non_overlapping {
+            // Truly exact: within-file ordering guaranteed and files are non-overlapping.
+            //
+            // When there are multiple groups, redistribute files using consecutive
+            // assignment so that each group remains non-overlapping AND groups are
+            // ordered relative to each other. This enables:
+            // - No SortExec per partition (files in each group are sorted & non-overlapping)
+            // - SPM cheaply merges ordered streams (O(n) merge)
+            // - Parallel I/O across partitions
+            //
+            // Before (bin-packing may interleave):
+            //   Group 0: [file_01(1-10), file_03(21-30)]  ← gap, interleaved with group 1
+            //   Group 1: [file_02(11-20), file_04(31-40)]

Review Comment:
   Are there scenarios where ending up with lopsided partitions negates the 
benefits?
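   A minimal Rust sketch of the exact path's file handling, assuming a single `i64` sort column with exact per-file min/max statistics: sort files by min, treat them as non-overlapping when each file's max is at most the next file's min (so reading them back to back stays sorted), then split the sorted list into consecutive chunks. Splitting by file count is an assumption made for illustration, since the diff excerpt doesn't show how group sizes are chosen, and `FileStats`, `sort_and_check_non_overlapping`, and `redistribute_consecutive` are hypothetical names. The example reuses the four files from the code comment and gives `file_01` far more rows than the others, which shows how consecutive assignment can produce lopsided partitions when file sizes are skewed.

```rust
/// Hypothetical file entry: min/max of the sort column plus a row count.
#[derive(Debug, Clone, PartialEq)]
struct FileStats {
    name: &'static str,
    min: i64,
    max: i64,
    rows: u64,
}

/// Sort files by (min, max) and report whether they are non-overlapping,
/// i.e. each file's max is <= the next file's min.
fn sort_and_check_non_overlapping(files: &mut [FileStats]) -> bool {
    files.sort_by_key(|f| (f.min, f.max));
    files.windows(2).all(|w| w[0].max <= w[1].min)
}

/// Split already-sorted files into at most `n` groups of consecutive files,
/// balancing by file count only (an assumption; row counts are ignored).
fn redistribute_consecutive(files: &[FileStats], n: usize) -> Vec<Vec<FileStats>> {
    if files.is_empty() {
        return Vec::new();
    }
    let n = n.max(1);
    let per_group = (files.len() + n - 1) / n; // ceiling division
    files.chunks(per_group).map(|c| c.to_vec()).collect()
}

fn main() {
    let mut files = vec![
        FileStats { name: "file_03", min: 21, max: 30, rows: 10 },
        FileStats { name: "file_01", min: 1, max: 10, rows: 1_000_000 },
        FileStats { name: "file_04", min: 31, max: 40, rows: 10 },
        FileStats { name: "file_02", min: 11, max: 20, rows: 10 },
    ];
    let non_overlapping = sort_and_check_non_overlapping(&mut files);
    println!("non-overlapping: {non_overlapping}");
    let groups = redistribute_consecutive(&files, 2);
    for (i, g) in groups.iter().enumerate() {
        let names: Vec<&str> = g.iter().map(|f| f.name).collect();
        let rows: u64 = g.iter().map(|f| f.rows).sum();
        println!("Group {i}: {names:?} ({rows} rows)");
    }
}
```

   Here group 0 gets `file_01` and `file_02` while group 1 gets `file_03` and `file_04`, so the groups are ordered relative to each other, but group 0 carries almost all the rows. That kind of skew could erode the parallel I/O benefit.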



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
