alamb commented on code in PR #21182:
URL: https://github.com/apache/datafusion/pull/21182#discussion_r3039838330


##########
datafusion/datasource/src/file_scan_config.rs:
##########
@@ -1293,13 +1361,273 @@ impl FileScanConfig {
 
         new_config.file_source = new_file_source;
 
-        // Phase 1: Clear output_ordering for Inexact
-        // (we're only reversing row groups, not guaranteeing perfect ordering)
-        if !is_exact {
+        // Sort files within groups by statistics when not reversing
+        let all_non_overlapping = if !reverse_file_groups {
+            if let Some(sort_order) = LexOrdering::new(order.iter().cloned()) {
+                let projected_schema = new_config.projected_schema()?;
+                let projection_indices = new_config
+                    .file_source
+                    .projection()
+                    .as_ref()
+                    .and_then(|p| ordered_column_indices_from_projection(p));
+                let result = Self::sort_files_within_groups_by_statistics(
+                    &new_config.file_groups,
+                    &sort_order,
+                    &projected_schema,
+                    projection_indices.as_deref(),
+                );
+                new_config.file_groups = result.file_groups;
+                result.all_non_overlapping
+            } else {
+                false
+            }
+        } else {
+            // When reversing, files are already reversed above. We skip
+            // statistics-based sorting here because it would undo the 
reversal.
+            // Note: reverse path is always Inexact, so all_non_overlapping
+            // is not used (is_exact is false).
+            false
+        };
+
+        if is_exact && all_non_overlapping {
+            // Truly exact: within-file ordering guaranteed and files are 
non-overlapping.
+            // Keep output_ordering so SortExec can be eliminated for each 
partition.
+            //
+            // We intentionally do NOT redistribute files across groups here.
+            // The planning-phase bin-packing may interleave file ranges 
across groups:
+            //
+            //   Group 0: [f1(1-10), f3(21-30)]   ← interleaved with group 1
+            //   Group 1: [f2(11-20), f4(31-40)]
+            //
+            // This interleaving is actually beneficial because SPM pulls from 
both
+            // partitions concurrently, keeping parallel I/O active:
+            //
+            //   SPM: pull P0 [1-10] → pull P1 [11-20] → pull P0 [21-30] → 
pull P1 [31-40]
+            //        ^^^^^^^^^^^^     ^^^^^^^^^^^^
+            //        both partitions scanning files simultaneously
+            //
+            // If we were to redistribute files consecutively:
+            //   Group 0: [f1(1-10), f2(11-20)]   ← all values < group 1
+            //   Group 1: [f3(21-30), f4(31-40)]
+            //
+            // SPM would read ALL of group 0 first (values always smaller), 
then group 1.
+            // This degrades to single-threaded sequential I/O — the other 
partition
+            // sits idle the entire time, losing the parallelism benefit.
+        } else {
             new_config.output_ordering = vec![];
         }
 
-        Ok(Arc::new(new_config))
+        Ok(new_config)
+    }
+
+    /// Sort files within each file group by their min/max statistics.
+    ///
+    /// No files are moved between groups — parallelism and group composition
+    /// are unchanged. Groups where statistics are unavailable are kept as-is.
+    ///
+    /// ```text
+    /// Before:  Group [file_c(20-30), file_a(0-9), file_b(10-19)]
+    /// After:   Group [file_a(0-9), file_b(10-19), file_c(20-30)]
+    ///                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+    ///                 sorted by min value, non-overlapping → Exact
+    /// ```
+    fn sort_files_within_groups_by_statistics(
+        file_groups: &[FileGroup],
+        sort_order: &LexOrdering,
+        projected_schema: &SchemaRef,
+        projection_indices: Option<&[usize]>,
+    ) -> SortedFileGroups {
+        let mut any_reordered = false;
+        let mut confirmed_non_overlapping: usize = 0;
+        let mut new_groups = Vec::with_capacity(file_groups.len());
+
+        for group in file_groups {
+            if group.len() <= 1 {
+                new_groups.push(group.clone());
+                confirmed_non_overlapping += 1;
+                continue;
+            }
+
+            let files: Vec<_> = group.iter().collect();
+
+            let statistics = match MinMaxStatistics::new_from_files(
+                sort_order,
+                projected_schema,
+                projection_indices,
+                files.iter().copied(),
+            ) {
+                Ok(stats) => stats,
+                Err(e) => {
+                    log::trace!(
+                        "Cannot sort file group by statistics: {e}. Keeping 
original order."
+                    );
+                    new_groups.push(group.clone());
+                    continue;
+                }
+            };
+
+            let sorted_indices = statistics.min_values_sorted();
+
+            let already_sorted = sorted_indices
+                .iter()
+                .enumerate()
+                .all(|(pos, (idx, _))| pos == *idx);
+
+            let sorted_group: FileGroup = if already_sorted {
+                group.clone()
+            } else {
+                any_reordered = true;
+                sorted_indices
+                    .iter()
+                    .map(|(idx, _)| files[*idx].clone())
+                    .collect()
+            };
+
+            let sorted_files: Vec<_> = sorted_group.iter().collect();
+            let is_non_overlapping = match MinMaxStatistics::new_from_files(
+                sort_order,
+                projected_schema,
+                projection_indices,
+                sorted_files.iter().copied(),
+            ) {
+                Ok(stats) => stats.is_sorted(),
+                Err(_) => false,
+            };
+
+            if is_non_overlapping {
+                confirmed_non_overlapping += 1;
+            }
+
+            new_groups.push(sorted_group);
+        }
+
+        SortedFileGroups {

Review Comment:
   As a follow on PR it might be nice to figure out how to move some of this 
code out of FileScanConfig and into some other smaller module (no need to do it 
in this PR, I am just observing that the FileScanConfig is getting large)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to