Copilot commented on code in PR #21182:
URL: https://github.com/apache/datafusion/pull/21182#discussion_r2996048138


##########
datafusion/datasource/src/file_scan_config.rs:
##########
@@ -1289,13 +1384,294 @@ impl FileScanConfig {
 
         new_config.file_source = new_file_source;
 
-        // Phase 1: Clear output_ordering for Inexact
-        // (we're only reversing row groups, not guaranteeing perfect ordering)
-        if !is_exact {
+        // Sort files within groups by statistics when not reversing
+        let all_non_overlapping = if !reverse_file_groups {
+            if let Some(sort_order) = LexOrdering::new(order.iter().cloned()) {
+                let projected_schema = new_config.projected_schema()?;
+                let projection_indices = new_config
+                    .file_source
+                    .projection()
+                    .as_ref()
+                    .and_then(|p| ordered_column_indices_from_projection(p));
+                let result = Self::sort_files_within_groups_by_statistics(
+                    &new_config.file_groups,
+                    &sort_order,
+                    &projected_schema,
+                    projection_indices.as_deref(),
+                );
+                new_config.file_groups = result.file_groups;
+                result.all_non_overlapping
+            } else {
+                false
+            }
+        } else {
+            // When reversing, check if reversed groups are non-overlapping
+            if let Some(sort_order) = LexOrdering::new(order.iter().cloned()) {
+                let projected_schema = new_config.projected_schema()?;
+                let projection_indices = new_config
+                    .file_source
+                    .projection()
+                    .as_ref()
+                    .and_then(|p| ordered_column_indices_from_projection(p));
+                let result = Self::sort_files_within_groups_by_statistics(
+                    &new_config.file_groups,
+                    &sort_order,
+                    &projected_schema,
+                    projection_indices.as_deref(),
+                );
+                result.all_non_overlapping
+            } else {
+                false
+            }

Review Comment:
   In the `reverse_file_groups` branch, 
`sort_files_within_groups_by_statistics(...)` is executed but 
`result.file_groups` is never applied to `new_config.file_groups`. This adds 
avoidable stats computation and also prevents stats-based ordering from helping 
reverse-scan cases (the function’s doc/architecture comment suggests 
stats-based ordering should still apply). Either (a) apply 
`new_config.file_groups = result.file_groups` when reversing is allowed to 
reorder files, or (b) avoid calling `sort_files_within_groups_by_statistics` 
here and compute only the specific non-overlap predicate you actually need.



##########
datafusion/datasource/src/file_scan_config.rs:
##########
@@ -1289,13 +1384,294 @@ impl FileScanConfig {
 
         new_config.file_source = new_file_source;
 
-        // Phase 1: Clear output_ordering for Inexact
-        // (we're only reversing row groups, not guaranteeing perfect ordering)
-        if !is_exact {
+        // Sort files within groups by statistics when not reversing
+        let all_non_overlapping = if !reverse_file_groups {
+            if let Some(sort_order) = LexOrdering::new(order.iter().cloned()) {
+                let projected_schema = new_config.projected_schema()?;
+                let projection_indices = new_config
+                    .file_source
+                    .projection()
+                    .as_ref()
+                    .and_then(|p| ordered_column_indices_from_projection(p));
+                let result = Self::sort_files_within_groups_by_statistics(
+                    &new_config.file_groups,
+                    &sort_order,
+                    &projected_schema,
+                    projection_indices.as_deref(),
+                );
+                new_config.file_groups = result.file_groups;
+                result.all_non_overlapping
+            } else {
+                false
+            }
+        } else {
+            // When reversing, check if reversed groups are non-overlapping
+            if let Some(sort_order) = LexOrdering::new(order.iter().cloned()) {
+                let projected_schema = new_config.projected_schema()?;
+                let projection_indices = new_config
+                    .file_source
+                    .projection()
+                    .as_ref()
+                    .and_then(|p| ordered_column_indices_from_projection(p));
+                let result = Self::sort_files_within_groups_by_statistics(
+                    &new_config.file_groups,
+                    &sort_order,
+                    &projected_schema,
+                    projection_indices.as_deref(),
+                );
+                result.all_non_overlapping
+            } else {
+                false
+            }
+        };
+
+        if is_exact && all_non_overlapping {
+            // Truly exact: within-file ordering guaranteed and files are 
non-overlapping.
+            //
+            // When there are multiple groups, redistribute files using 
consecutive
+            // assignment so that each group remains non-overlapping AND 
groups are
+            // ordered relative to each other. This enables:
+            // - No SortExec per partition (files in each group are sorted & 
non-overlapping)
+            // - SPM cheaply merges ordered streams (O(n) merge)
+            // - Parallel I/O across partitions
+            //
+            // Before (bin-packing may interleave):
+            //   Group 0: [file_01(1-10), file_03(21-30)]  ← gap, interleaved 
with group 1
+            //   Group 1: [file_02(11-20), file_04(31-40)]
+            //
+            // After (consecutive assignment):
+            //   Group 0: [file_01(1-10), file_02(11-20)]  ← consecutive, 
ordered
+            //   Group 1: [file_03(21-30), file_04(31-40)]  ← consecutive, 
ordered
+            if new_config.file_groups.len() > 1
+                && let Some(sort_order) = 
LexOrdering::new(order.iter().cloned())
+            {
+                let projected_schema = new_config.projected_schema()?;
+                let projection_indices = new_config
+                    .file_source
+                    .projection()
+                    .as_ref()
+                    .and_then(|p| ordered_column_indices_from_projection(p));
+                let num_groups = new_config.file_groups.len();
+                new_config.file_groups =
+                    Self::redistribute_files_across_groups_by_statistics(
+                        &new_config.file_groups,
+                        &sort_order,
+                        &projected_schema,
+                        projection_indices.as_deref(),
+                        num_groups,
+                    );
+            }
+        } else {
             new_config.output_ordering = vec![];
         }
 
-        Ok(Arc::new(new_config))
+        Ok(new_config)
+    }
+
+    /// Sort files within each file group by their min/max statistics.
+    ///
+    /// No files are moved between groups — parallelism and group composition
+    /// are unchanged. Groups where statistics are unavailable are kept as-is.
+    ///
+    /// ```text
+    /// Before:  Group [file_c(20-30), file_a(0-9), file_b(10-19)]
+    /// After:   Group [file_a(0-9), file_b(10-19), file_c(20-30)]
+    ///                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+    ///                 sorted by min value, non-overlapping → Exact
+    /// ```
+    fn sort_files_within_groups_by_statistics(
+        file_groups: &[FileGroup],
+        sort_order: &LexOrdering,
+        projected_schema: &SchemaRef,
+        projection_indices: Option<&[usize]>,
+    ) -> SortedFileGroups {
+        let mut any_reordered = false;
+        let mut confirmed_non_overlapping: usize = 0;
+        let mut new_groups = Vec::with_capacity(file_groups.len());
+
+        for group in file_groups {
+            if group.len() <= 1 {
+                new_groups.push(group.clone());
+                confirmed_non_overlapping += 1;
+                continue;
+            }
+
+            let files: Vec<_> = group.iter().collect();
+
+            let statistics = match MinMaxStatistics::new_from_files(
+                sort_order,
+                projected_schema,
+                projection_indices,
+                files.iter().copied(),
+            ) {
+                Ok(stats) => stats,
+                Err(e) => {
+                    log::trace!(
+                        "Cannot sort file group by statistics: {e}. Keeping 
original order."
+                    );
+                    new_groups.push(group.clone());
+                    continue;
+                }
+            };
+
+            let sorted_indices = statistics.min_values_sorted();
+
+            let already_sorted = sorted_indices
+                .iter()
+                .enumerate()
+                .all(|(pos, (idx, _))| pos == *idx);
+
+            let sorted_group: FileGroup = if already_sorted {
+                group.clone()
+            } else {
+                any_reordered = true;
+                sorted_indices
+                    .iter()
+                    .map(|(idx, _)| files[*idx].clone())
+                    .collect()
+            };
+
+            let sorted_files: Vec<_> = sorted_group.iter().collect();
+            let is_non_overlapping = match MinMaxStatistics::new_from_files(
+                sort_order,
+                projected_schema,
+                projection_indices,
+                sorted_files.iter().copied(),
+            ) {
+                Ok(stats) => stats.is_sorted(),
+                Err(_) => false,
+            };
+
+            if is_non_overlapping {
+                confirmed_non_overlapping += 1;
+            }
+
+            new_groups.push(sorted_group);
+        }
+
+        SortedFileGroups {
+            file_groups: new_groups,
+            any_reordered,
+            all_non_overlapping: confirmed_non_overlapping == 
file_groups.len(),
+        }
+    }
+
+    /// Redistribute files across groups using consecutive assignment.
+    ///
+    /// `split_groups_by_statistics` uses bin-packing which balances group 
sizes
+    /// but can interleave file ranges. This method fixes that by assigning
+    /// consecutive sorted files to consecutive groups:
+    ///
+    /// ```text
+    /// Input (bin-packed, interleaved):
+    ///   Group 0: [f1(0-9),  f3(20-29)]     max(f1)=9  but f3=20 > 
Group1.f2=10
+    ///   Group 1: [f2(10-19), f4(30-39)]     groups overlap!
+    ///
+    /// After global sort + consecutive assignment:
+    ///   Group 0: [f1(0-9),  f2(10-19)]     max=19
+    ///   Group 1: [f3(20-29), f4(30-39)]    min=20 > 19 ✓ groups are ordered!
+    ///
+    /// Resulting plan:
+    ///   SPM [col ASC]                       ← O(n) merge, reads group 0 then 
group 1
+    ///     DataSourceExec [f1, f2]           ← parallel I/O, no SortExec
+    ///     DataSourceExec [f3, f4]           ← parallel I/O, no SortExec
+    /// ```
+    ///
+    /// Falls back to the original groups if statistics are unavailable.
+    fn redistribute_files_across_groups_by_statistics(
+        file_groups: &[FileGroup],
+        sort_order: &LexOrdering,
+        projected_schema: &SchemaRef,
+        projection_indices: Option<&[usize]>,
+        num_groups: usize,
+    ) -> Vec<FileGroup> {
+        if num_groups <= 1 {
+            return file_groups.to_vec();
+        }
+
+        // Flatten all files
+        let all_files: Vec<_> = file_groups.iter().flat_map(|g| 
g.iter()).collect();
+        if all_files.is_empty() {
+            return file_groups.to_vec();
+        }
+
+        // Sort globally by statistics
+        let statistics = match MinMaxStatistics::new_from_files(
+            sort_order,
+            projected_schema,
+            projection_indices,
+            all_files.iter().copied(),
+        ) {
+            Ok(stats) => stats,
+            Err(_) => return file_groups.to_vec(),
+        };
+
+        let sorted_indices = statistics.min_values_sorted();
+
+        // Assign consecutive files to groups
+        let total = sorted_indices.len();
+        let base_size = total / num_groups;
+        let remainder = total % num_groups;
+
+        let mut new_groups = Vec::with_capacity(num_groups);
+        let mut offset = 0;
+        for i in 0..num_groups {
+            // First `remainder` groups get one extra file
+            let group_size = base_size + if i < remainder { 1 } else { 0 };
+            if group_size == 0 {
+                continue;
+            }
+            let group: FileGroup = sorted_indices[offset..offset + group_size]
+                .iter()
+                .map(|(idx, _)| all_files[*idx].clone())
+                .collect();

Review Comment:
   This redistribution can return fewer groups than `num_groups` when `total < 
num_groups` (or whenever `group_size == 0`), because it `continue`s instead of 
emitting empty groups. That changes `DataSourceExec` output partitioning (and 
can reduce parallelism or violate downstream expectations about stable 
partition counts). Consider always returning exactly `num_groups` groups 
(pushing empty `FileGroup`s when `group_size == 0`) to preserve partitioning, 
or explicitly document/guard that changing the partition count is acceptable 
here. Note that the suggestion below only takes effect if the preceding 
`if group_size == 0 { continue; }` guard is removed as well; otherwise the 
empty-group branch is never reached.
   ```suggestion
               let group: FileGroup = if group_size == 0 {
                   FileGroup::new(Vec::new())
               } else {
                   sorted_indices[offset..offset + group_size]
                       .iter()
                       .map(|(idx, _)| all_files[*idx].clone())
                       .collect()
               };
   ```



##########
datafusion/sqllogictest/test_files/sort_pushdown.slt:
##########
@@ -1644,3 +1645,460 @@ RESET datafusion.execution.parquet.enable_page_index;
 
 statement ok
 RESET datafusion.optimizer.enable_sort_pushdown;
+
+
+###############################################################
+# Statistics-based file sorting and sort elimination tests
+###############################################################
+
+statement ok
+SET datafusion.optimizer.enable_sort_pushdown = true;
+
+statement ok
+SET datafusion.execution.target_partitions = 1;
+
+statement ok
+SET datafusion.execution.collect_statistics = true;
+
+# Test A: Non-overlapping files with matching within-file ordering → Sort 
eliminated
+
+statement ok
+CREATE TABLE ta_src_a(id INT, value INT) AS VALUES (1, 100), (2, 200), (3, 
300);
+
+statement ok
+CREATE TABLE ta_src_b(id INT, value INT) AS VALUES (4, 400), (5, 500), (6, 
600);
+
+statement ok
+CREATE TABLE ta_src_c(id INT, value INT) AS VALUES (7, 700), (8, 800), (9, 
900), (10, 1000);
+
+query I
+COPY (SELECT * FROM ta_src_a ORDER BY id ASC)
+TO 'test_files/scratch/sort_pushdown/ta_nonoverlap/file_a.parquet';
+----
+3
+
+query I
+COPY (SELECT * FROM ta_src_b ORDER BY id ASC)
+TO 'test_files/scratch/sort_pushdown/ta_nonoverlap/file_b.parquet';
+----
+3
+
+query I
+COPY (SELECT * FROM ta_src_c ORDER BY id ASC)
+TO 'test_files/scratch/sort_pushdown/ta_nonoverlap/file_c.parquet';
+----
+4
+
+statement ok
+CREATE EXTERNAL TABLE ta_sorted(id INT, value INT)
+STORED AS PARQUET
+LOCATION 'test_files/scratch/sort_pushdown/ta_nonoverlap/'
+WITH ORDER (id ASC);
+
+# Test A.1: SortExec eliminated — Exact pushdown
+query TT
+EXPLAIN SELECT * FROM ta_sorted ORDER BY id ASC;
+----
+logical_plan
+01)Sort: ta_sorted.id ASC NULLS LAST
+02)--TableScan: ta_sorted projection=[id, value]
+physical_plan DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/ta_nonoverlap/file_a.parquet,
 
WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/ta_nonoverlap/file_b.parquet,
 
WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/ta_nonoverlap/file_c.parquet]]},
 projection=[id, value], output_ordering=[id@0 ASC NULLS LAST], 
file_type=parquet
+
+query II
+SELECT * FROM ta_sorted ORDER BY id ASC;
+----
+1 100
+2 200
+3 300
+4 400
+5 500
+6 600
+7 700
+8 800
+9 900
+10 1000
+
+# Cleanup Test A
+statement ok
+DROP TABLE ta_src_a;
+
+statement ok
+DROP TABLE ta_src_b;
+
+statement ok
+DROP TABLE ta_src_c;
+
+statement ok
+DROP TABLE ta_sorted;
+
+
+# Test B: Overlapping files → statistics-based reorder, SortExec retained
+
+statement ok
+CREATE TABLE tb_src_x(id INT, value INT) AS VALUES (1, 10), (2, 20), (3, 30), 
(4, 40), (5, 50);
+
+statement ok
+CREATE TABLE tb_src_y(id INT, value INT) AS VALUES (3, 31), (4, 41), (5, 51), 
(6, 61), (7, 71), (8, 81);
+
+statement ok
+CREATE TABLE tb_src_z(id INT, value INT) AS VALUES (6, 62), (7, 72), (8, 82), 
(9, 92), (10, 102);
+
+query I
+COPY (SELECT * FROM tb_src_x ORDER BY id ASC)
+TO 'test_files/scratch/sort_pushdown/tb_overlap/file_x.parquet';
+----
+5
+
+query I
+COPY (SELECT * FROM tb_src_y ORDER BY id ASC)
+TO 'test_files/scratch/sort_pushdown/tb_overlap/file_y.parquet';
+----
+6
+
+query I
+COPY (SELECT * FROM tb_src_z ORDER BY id ASC)
+TO 'test_files/scratch/sort_pushdown/tb_overlap/file_z.parquet';
+----
+5
+
+statement ok
+CREATE EXTERNAL TABLE tb_overlap(id INT, value INT)
+STORED AS PARQUET
+LOCATION 'test_files/scratch/sort_pushdown/tb_overlap/'
+WITH ORDER (id ASC);
+
+# Test B.1: Multi-column DESC sort — statistics fallback sorts files [z, y, x]
+query TT
+EXPLAIN SELECT * FROM tb_overlap ORDER BY id DESC, value DESC LIMIT 5;
+----
+logical_plan
+01)Sort: tb_overlap.id DESC NULLS FIRST, tb_overlap.value DESC NULLS FIRST, 
fetch=5
+02)--TableScan: tb_overlap projection=[id, value]
+physical_plan
+01)SortExec: TopK(fetch=5), expr=[id@0 DESC, value@1 DESC], 
preserve_partitioning=[false]
+02)--DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tb_overlap/file_z.parquet,
 
WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tb_overlap/file_y.parquet,
 
WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tb_overlap/file_x.parquet]]},
 projection=[id, value], file_type=parquet, predicate=DynamicFilter [ empty ]
+
+query II
+SELECT * FROM tb_overlap ORDER BY id DESC, value DESC LIMIT 5;
+----
+10 102
+9 92
+8 82
+8 81
+7 72
+
+# Cleanup Test B
+statement ok
+DROP TABLE tb_src_x;
+
+statement ok
+DROP TABLE tb_src_y;
+
+statement ok
+DROP TABLE tb_src_z;
+
+statement ok
+DROP TABLE tb_overlap;
+
+
+# Test C: Non-overlapping files with LIMIT — sort elimination + limit pushdown
+
+statement ok
+CREATE TABLE tc_src_a(id INT, value INT) AS VALUES (1, 100), (2, 200), (3, 
300);
+
+statement ok
+CREATE TABLE tc_src_b(id INT, value INT) AS VALUES (4, 400), (5, 500), (6, 
600);
+
+statement ok
+CREATE TABLE tc_src_c(id INT, value INT) AS VALUES (7, 700), (8, 800), (9, 
900), (10, 1000);
+
+query I
+COPY (SELECT * FROM tc_src_a ORDER BY id ASC)
+TO 'test_files/scratch/sort_pushdown/tc_limit/file_a.parquet';
+----
+3
+
+query I
+COPY (SELECT * FROM tc_src_b ORDER BY id ASC)
+TO 'test_files/scratch/sort_pushdown/tc_limit/file_b.parquet';
+----
+3
+
+query I
+COPY (SELECT * FROM tc_src_c ORDER BY id ASC)
+TO 'test_files/scratch/sort_pushdown/tc_limit/file_c.parquet';
+----
+4
+
+statement ok
+CREATE EXTERNAL TABLE tc_limit(id INT, value INT)
+STORED AS PARQUET
+LOCATION 'test_files/scratch/sort_pushdown/tc_limit/'
+WITH ORDER (id ASC);
+
+# Test C.1: ASC LIMIT — sort eliminated, limit pushed down
+query TT
+EXPLAIN SELECT * FROM tc_limit ORDER BY id ASC LIMIT 3;
+----
+logical_plan
+01)Sort: tc_limit.id ASC NULLS LAST, fetch=3
+02)--TableScan: tc_limit projection=[id, value]
+physical_plan DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tc_limit/file_a.parquet,
 
WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tc_limit/file_b.parquet,
 
WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tc_limit/file_c.parquet]]},
 projection=[id, value], limit=3, output_ordering=[id@0 ASC NULLS LAST], 
file_type=parquet
+
+query II
+SELECT * FROM tc_limit ORDER BY id ASC LIMIT 3;
+----
+1 100
+2 200
+3 300
+
+# Test C.2: DESC LIMIT — reverse scan path, SortExec stays
+query TT
+EXPLAIN SELECT * FROM tc_limit ORDER BY id DESC LIMIT 3;
+----
+logical_plan
+01)Sort: tc_limit.id DESC NULLS FIRST, fetch=3
+02)--TableScan: tc_limit projection=[id, value]
+physical_plan
+01)SortExec: TopK(fetch=3), expr=[id@0 DESC], preserve_partitioning=[false]
+02)--DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tc_limit/file_c.parquet,
 
WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tc_limit/file_b.parquet,
 
WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tc_limit/file_a.parquet]]},
 projection=[id, value], file_type=parquet, predicate=DynamicFilter [ empty ], 
reverse_row_groups=true
+
+query II
+SELECT * FROM tc_limit ORDER BY id DESC LIMIT 3;
+----
+10 1000
+9 900
+8 800
+
+# Cleanup Test C
+statement ok
+DROP TABLE tc_src_a;
+
+statement ok
+DROP TABLE tc_src_b;
+
+statement ok
+DROP TABLE tc_src_c;
+
+statement ok
+DROP TABLE tc_limit;
+
+
+# Test D: Multi-group case with target_partitions=2
+
+statement ok
+CREATE TABLE td_src_a(id INT, value INT) AS VALUES (1, 100), (2, 200), (3, 
300);
+
+statement ok
+CREATE TABLE td_src_b(id INT, value INT) AS VALUES (4, 400), (5, 500), (6, 
600);
+
+statement ok
+CREATE TABLE td_src_c(id INT, value INT) AS VALUES (7, 700), (8, 800), (9, 
900), (10, 1000);
+
+query I
+COPY (SELECT * FROM td_src_a ORDER BY id ASC)
+TO 'test_files/scratch/sort_pushdown/td_multi/file_a.parquet';
+----
+3
+
+query I
+COPY (SELECT * FROM td_src_b ORDER BY id ASC)
+TO 'test_files/scratch/sort_pushdown/td_multi/file_b.parquet';
+----
+3
+
+query I
+COPY (SELECT * FROM td_src_c ORDER BY id ASC)
+TO 'test_files/scratch/sort_pushdown/td_multi/file_c.parquet';
+----
+4
+
+statement ok
+SET datafusion.execution.target_partitions = 2;
+
+statement ok
+CREATE EXTERNAL TABLE td_multi(id INT, value INT)
+STORED AS PARQUET
+LOCATION 'test_files/scratch/sort_pushdown/td_multi/'
+WITH ORDER (id ASC);
+
+# Test D.1: With 2 partitions, files split across 2 groups.
+# Each group's sort is eliminated; SortPreservingMergeExec merges groups.
+query TT
+EXPLAIN SELECT * FROM td_multi ORDER BY id ASC;
+----
+logical_plan
+01)Sort: td_multi.id ASC NULLS LAST
+02)--TableScan: td_multi projection=[id, value]
+physical_plan
+01)SortPreservingMergeExec: [id@0 ASC NULLS LAST]
+02)--DataSourceExec: file_groups={2 groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/td_multi/file_a.parquet,
 
WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/td_multi/file_b.parquet],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/td_multi/file_c.parquet]]},
 projection=[id, value], output_ordering=[id@0 ASC NULLS LAST], 
file_type=parquet
+
+query II
+SELECT * FROM td_multi ORDER BY id ASC;
+----
+1 100
+2 200
+3 300
+4 400
+5 500
+6 600
+7 700
+8 800
+9 900
+10 1000
+
+# Cleanup Test D
+statement ok
+DROP TABLE td_src_a;
+
+statement ok
+DROP TABLE td_src_b;
+
+statement ok
+DROP TABLE td_src_c;
+
+statement ok
+DROP TABLE td_multi;
+
+# Restore target_partitions=1 for remaining tests
+statement ok
+SET datafusion.execution.target_partitions = 1;
+
+# ===========================================================
+# Test E: Inferred ordering from Parquet metadata (no WITH ORDER)
+# Parquet files written with ORDER BY have sorting_columns in metadata.
+# DataFusion should automatically infer the ordering and eliminate Sort.
+# ===========================================================
+
+# Create sorted parquet files — COPY with ORDER BY writes sorting_columns 
metadata
+statement ok
+CREATE TABLE te_src_a(id INT, value INT) AS VALUES (1, 100), (2, 200), (3, 
300);
+
+statement ok
+CREATE TABLE te_src_b(id INT, value INT) AS VALUES (4, 400), (5, 500), (6, 
600);
+
+statement ok
+CREATE TABLE te_src_c(id INT, value INT) AS VALUES (7, 700), (8, 800), (9, 
900), (10, 1000);
+
+query I
+COPY (SELECT * FROM te_src_a ORDER BY id ASC)
+TO 'test_files/scratch/sort_pushdown/te_inferred/file_a.parquet';
+----
+3
+
+query I
+COPY (SELECT * FROM te_src_b ORDER BY id ASC)
+TO 'test_files/scratch/sort_pushdown/te_inferred/file_b.parquet';
+----
+3
+
+query I
+COPY (SELECT * FROM te_src_c ORDER BY id ASC)
+TO 'test_files/scratch/sort_pushdown/te_inferred/file_c.parquet';
+----
+4
+
+# Create external table WITHOUT "WITH ORDER" — ordering should be inferred
+# from Parquet sorting_columns metadata
+statement ok
+CREATE EXTERNAL TABLE te_inferred(id INT, value INT)
+STORED AS PARQUET
+LOCATION 'test_files/scratch/sort_pushdown/te_inferred/';
+
+# Test E.1: Ordering should be inferred and SortExec eliminated
+# Even without WITH ORDER, the optimizer detects that files are sorted
+# (from Parquet sorting_columns metadata) and non-overlapping.
+# SortExec is completely eliminated — just DataSourceExec with output_ordering.
+query TT
+EXPLAIN SELECT * FROM te_inferred ORDER BY id ASC;
+----
+logical_plan
+01)Sort: te_inferred.id ASC NULLS LAST
+02)--TableScan: te_inferred projection=[id, value]
+physical_plan DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/te_inferred/file_a.parquet,
 
WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/te_inferred/file_b.parquet,
 
WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/te_inferred/file_c.parquet]]},
 projection=[id, value], output_ordering=[id@0 ASC NULLS LAST], 
file_type=parquet
+
+# Test E.2: Verify result correctness
+query II
+SELECT * FROM te_inferred ORDER BY id ASC;
+----
+1 100
+2 200
+3 300
+4 400
+5 500
+6 600
+7 700
+8 800
+9 900
+10 1000
+
+# Test E.3: LIMIT should also work with inferred ordering
+query II
+SELECT * FROM te_inferred ORDER BY id ASC LIMIT 3;
+----
+1 100
+2 200
+3 300
+
+# Test E.4: Inferred ordering with multiple partitions
+# With target_partitions=2, files split into 2 groups.
+# Each group has SortExec eliminated, SPM merges the sorted streams.
+statement ok
+DROP TABLE te_inferred;
+
+statement ok
+SET datafusion.execution.target_partitions = 2;
+
+statement ok
+CREATE EXTERNAL TABLE te_inferred_multi(id INT, value INT)
+STORED AS PARQUET
+LOCATION 'test_files/scratch/sort_pushdown/te_inferred/';
+
+query TT
+EXPLAIN SELECT * FROM te_inferred_multi ORDER BY id ASC;
+----
+logical_plan
+01)Sort: te_inferred_multi.id ASC NULLS LAST
+02)--TableScan: te_inferred_multi projection=[id, value]
+physical_plan
+01)SortPreservingMergeExec: [id@0 ASC NULLS LAST]
+02)--DataSourceExec: file_groups={2 groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/te_inferred/file_a.parquet,
 
WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/te_inferred/file_b.parquet],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/te_inferred/file_c.parquet]]},
 projection=[id, value], output_ordering=[id@0 ASC NULLS LAST], 
file_type=parquet
+
+# Verify correctness with multi partition
+query II
+SELECT * FROM te_inferred_multi ORDER BY id ASC;
+----
+1 100
+2 200
+3 300
+4 400
+5 500
+6 600
+7 700
+8 800
+9 900
+10 1000
+
+# Cleanup Test E
+statement ok
+DROP TABLE te_src_a;
+
+statement ok
+DROP TABLE te_src_b;
+
+statement ok
+DROP TABLE te_src_c;
+
+statement ok
+DROP TABLE te_inferred_multi;
+
+# Reset settings (SLT runner uses target_partitions=4, not system default)
+statement ok
+SET datafusion.execution.target_partitions = 4;
+
+statement ok
+SET datafusion.execution.collect_statistics = true;
+
+statement ok
+SET datafusion.optimizer.enable_sort_pushdown = true;

Review Comment:
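   The new test section changes session settings and ends by setting them to 
specific values rather than restoring prior/default state. If the SLT runner 
keeps state across files (or if additional tests are appended later), this can 
create hard-to-debug coupling. Consider using `RESET ...;` for settings you 
changed in this section (or otherwise restoring the values captured at the 
start of the file/section) so the test file is self-contained. The explicit 
`SET datafusion.execution.target_partitions = 4;` may need to stay, since the 
comment above it notes that the SLT runner uses 4 rather than the system 
default; `datafusion.optimizer.enable_sort_pushdown` and 
`datafusion.execution.collect_statistics` are the candidates for `RESET`.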
   ```suggestion
   RESET datafusion.optimizer.enable_sort_pushdown;
   ```



##########
benchmarks/src/sort_pushdown.rs:
##########
@@ -0,0 +1,306 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Benchmark for sort pushdown optimization.
+//!
+//! Tests performance of sort elimination when files are non-overlapping and
+//! internally sorted (declared via `--sorted` / `WITH ORDER`).
+//!
+//! # Usage
+//!
+//! ```text
+//! # Prepare sorted TPCH lineitem data (SF=1)
+//! ./bench.sh data sort_pushdown
+//!
+//! # Baseline (no WITH ORDER, full SortExec)
+//! ./bench.sh run sort_pushdown
+//!
+//! # With sort elimination (WITH ORDER, SortExec removed)
+//! ./bench.sh run sort_pushdown_sorted
+//! ```
+//!
+//! # Reference Results
+//!
+//! Measured on 300k rows, 8 non-overlapping sorted parquet files, single partition,
+//! debug build (results vary by hardware; relative speedup is the key metric):
+//!
+//! ```text
+//! Query | Description          | baseline (ms) | sort eliminated (ms) | speedup
+//! ------|----------------------|---------------|---------------------|--------
+//! Q1    | ASC full scan        |           159 |                  91 |  43%
+//! Q2    | ASC LIMIT 100        |            36 |                  12 |  67%
+//! Q3    | ASC full (wide, *)   |           487 |                 333 |  31%
+//! Q4    | ASC LIMIT 100 (wide) |           119 |                  30 |  74%
+//! ```
+//!
+//! Key observations:
+//! - **LIMIT queries benefit most** (67-74%): sort elimination + limit pushdown
+//!   means only the first few rows are read before stopping.
+//! - **Full scans** (31-43%): saving comes from eliminating the O(n log n) sort
+//!   step entirely.
+//! - **Wide projections** amplify the benefit: larger rows make sorting more
+//!   expensive, so eliminating it saves more.
+
+use clap::Args;
+use futures::StreamExt;
+use std::path::PathBuf;
+use std::sync::Arc;
+
+use datafusion::datasource::TableProvider;
+use datafusion::datasource::file_format::parquet::ParquetFormat;
+use datafusion::datasource::listing::{
+    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
+};
+use datafusion::error::Result;
+use datafusion::execution::SessionStateBuilder;
+use datafusion::physical_plan::display::DisplayableExecutionPlan;
+use datafusion::physical_plan::{displayable, execute_stream};
+use datafusion::prelude::*;
+use datafusion_common::DEFAULT_PARQUET_EXTENSION;
+use datafusion_common::instant::Instant;
+
+use crate::util::{BenchmarkRun, CommonOpt, QueryResult, print_memory_stats};
+
+#[derive(Debug, Args)]
+pub struct RunOpt {
+    /// Common options
+    #[command(flatten)]
+    common: CommonOpt,
+
+    /// Sort pushdown query number. If not specified, runs all queries
+    #[arg(short, long)]
+    pub query: Option<usize>,
+
+    /// Path to data files (lineitem). Only parquet format is supported.
+    /// Data should be pre-sorted by l_orderkey ASC for meaningful results.
+    #[arg(required = true, short = 'p', long = "path")]
+    path: PathBuf,
+
+    /// Path to JSON benchmark result to be compared using `compare.py`
+    #[arg(short = 'o', long = "output")]
+    output_path: Option<PathBuf>,
+
+    /// Mark the first column (l_orderkey) as sorted via WITH ORDER.
+    /// When set, enables sort elimination for matching queries.
+    #[arg(short = 't', long = "sorted")]
+    sorted: bool,
+}
+
+pub const SORT_PUSHDOWN_QUERY_START_ID: usize = 1;
+pub const SORT_PUSHDOWN_QUERY_END_ID: usize = 4;
+
+impl RunOpt {
+    const TABLES: [&'static str; 1] = ["lineitem"];
+
+    /// Queries benchmarking sort elimination when files are non-overlapping
+    /// and internally sorted (WITH ORDER declared via `--sorted`).
+    ///
+    /// With `--sorted`: ParquetSource returns Exact, files are verified
+    /// non-overlapping by statistics → SortExec eliminated, no SPM needed
+    /// for single partition.
+    ///
+    /// Without `--sorted`: baseline with full SortExec.
+    const QUERIES: [&'static str; 4] = [
+        // Q1: Sort elimination — full scan
+        // With --sorted: SortExec removed, sequential scan in file order
+        // Without --sorted: full SortExec required
+        r#"
+        SELECT l_orderkey, l_partkey, l_suppkey
+        FROM lineitem
+        ORDER BY l_orderkey
+        "#,
+        // Q2: Sort elimination + limit pushdown
+        // With --sorted: SortExec removed + limit pushed to DataSourceExec
+        //   → reads only first ~100 rows then stops
+        // Without --sorted: TopK sort over all data
+        r#"
+        SELECT l_orderkey, l_partkey, l_suppkey
+        FROM lineitem
+        ORDER BY l_orderkey
+        LIMIT 100
+        "#,
+        // Q3: Sort elimination — wide projection (all columns)
+        // Tests sort elimination benefit with larger row payload
+        r#"
+        SELECT *
+        FROM lineitem
+        ORDER BY l_orderkey
+        "#,
+        // Q4: Sort elimination + limit — wide projection
+        r#"
+        SELECT *
+        FROM lineitem
+        ORDER BY l_orderkey
+        LIMIT 100
+        "#,
+    ];
+
+    pub async fn run(&self) -> Result<()> {
+        let mut benchmark_run = BenchmarkRun::new();
+
+        let query_range = match self.query {
+            Some(query_id) => query_id..=query_id,
+            None => SORT_PUSHDOWN_QUERY_START_ID..=SORT_PUSHDOWN_QUERY_END_ID,
+        };
+
+        for query_id in query_range {
+            benchmark_run.start_new_case(&format!("{query_id}"));
+
+            let query_results = self.benchmark_query(query_id).await;
+            match query_results {
+                Ok(query_results) => {
+                    for iter in query_results {
+                        benchmark_run.write_iter(iter.elapsed, iter.row_count);
+                    }
+                }
+                Err(e) => {
+                    benchmark_run.mark_failed();
+                    eprintln!("Query {query_id} failed: {e}");
+                }
+            }
+        }
+
+        benchmark_run.maybe_write_json(self.output_path.as_ref())?;
+        benchmark_run.maybe_print_failures();
+        Ok(())
+    }
+
+    async fn benchmark_query(&self, query_id: usize) -> 
Result<Vec<QueryResult>> {
+        let config = self.common.config()?;
+        let rt = self.common.build_runtime()?;
+        let state = SessionStateBuilder::new()
+            .with_config(config)
+            .with_runtime_env(rt)
+            .with_default_features()
+            .build();
+        let ctx = SessionContext::from(state);
+
+        self.register_tables(&ctx).await?;
+
+        let mut millis = vec![];
+        let mut query_results = vec![];
+        for i in 0..self.iterations() {
+            let start = Instant::now();
+
+            let query_idx = query_id - 1;
+            let sql = Self::QUERIES[query_idx].to_string();
+            let row_count = self.execute_query(&ctx, sql.as_str()).await?;
+
+            let elapsed = start.elapsed();
+            let ms = elapsed.as_secs_f64() * 1000.0;
+            millis.push(ms);
+
+            println!(
+                "Query {query_id} iteration {i} took {ms:.1} ms and returned 
{row_count} rows"
+            );
+            query_results.push(QueryResult { elapsed, row_count });
+        }
+
+        let avg = millis.iter().sum::<f64>() / millis.len() as f64;
+        println!("Query {query_id} avg time: {avg:.2} ms");
+
+        print_memory_stats();
+
+        Ok(query_results)
+    }
+
+    async fn register_tables(&self, ctx: &SessionContext) -> Result<()> {
+        for table in Self::TABLES {
+            let table_provider = self.get_table(ctx, table).await?;
+            ctx.register_table(table, table_provider)?;
+        }
+        Ok(())
+    }
+
+    async fn execute_query(&self, ctx: &SessionContext, sql: &str) -> 
Result<usize> {
+        let debug = self.common.debug;
+        let plan = ctx.sql(sql).await?;
+        let (state, plan) = plan.into_parts();
+
+        if debug {
+            println!("=== Logical plan ===\n{plan}\n");
+        }
+
+        let plan = state.optimize(&plan)?;
+        if debug {
+            println!("=== Optimized logical plan ===\n{plan}\n");
+        }
+        let physical_plan = state.create_physical_plan(&plan).await?;
+        if debug {
+            println!(
+                "=== Physical plan ===\n{}\n",
+                displayable(physical_plan.as_ref()).indent(true)
+            );
+        }
+
+        let mut row_count = 0;
+        let mut stream = execute_stream(physical_plan.clone(), 
state.task_ctx())?;
+        while let Some(batch) = stream.next().await {
+            row_count += batch?.num_rows();
+        }
+
+        if debug {
+            println!(
+                "=== Physical plan with metrics ===\n{}\n",
+                DisplayableExecutionPlan::with_metrics(physical_plan.as_ref())
+                    .indent(true)
+            );
+        }
+
+        Ok(row_count)
+    }
+
+    async fn get_table(
+        &self,
+        ctx: &SessionContext,
+        table: &str,
+    ) -> Result<Arc<dyn TableProvider>> {
+        let path = self.path.to_str().unwrap();
+        let state = ctx.state();
+        let path = format!("{path}/{table}");
+        let format = Arc::new(
+            ParquetFormat::default()
+                .with_options(ctx.state().table_options().parquet.clone()),
+        );
+        let extension = DEFAULT_PARQUET_EXTENSION;
+
+        let options = ListingOptions::new(format)
+            .with_file_extension(extension)
+            .with_collect_stat(true); // Always collect statistics for sort 
pushdown
+
+        let table_path = ListingTableUrl::parse(path)?;
+        let schema = options.infer_schema(&state, &table_path).await?;
+        let options = if self.sorted {
+            // Declare the first column (l_orderkey) as sorted
+            let key_column_name = schema.fields()[0].name();
+            options
+                
.with_file_sort_order(vec![vec![col(key_column_name).sort(true, false)]])
+        } else {
+            options
+        };

Review Comment:
   The benchmark assumes the first field in the inferred schema is the intended 
sort key (`l_orderkey`). This is brittle if schema field ordering changes (or 
if the underlying dataset differs). Prefer selecting by an explicit column name 
(e.g., `l_orderkey`) and returning a clear error if missing, so the benchmark 
fails fast with actionable diagnostics instead of silently sorting on the wrong 
key.



##########
datafusion/physical-optimizer/src/pushdown_sort.rs:
##########
@@ -35,19 +35,21 @@
 //!    - `Inexact`: Keep Sort but use optimized input (enables early 
termination for TopK)
 //!    - `Unsupported`: No change
 //!
-//! ## Current capabilities (Phase 1)
+//! ## Capabilities
 //!
-//! - Reverse scan optimization: when required sort is the reverse of the data 
source's
+//! - **Sort elimination**: when data source's natural ordering already 
satisfies the
+//!   request (e.g., Parquet files with matching `WITH ORDER`), return `Exact` 
and
+//!   remove the `SortExec` entirely
+//! - **Reverse scan optimization**: when required sort is the reverse of the 
data source's
 //!   natural ordering, enable reverse scanning (reading row groups in reverse 
order)
-//! - Supports prefix matching: if data has ordering [A DESC, B ASC] and query 
needs
-//!   [A ASC], reversing gives [A ASC, B DESC] which satisfies the requirement
+//! - **Statistics-based file reordering**: sort files within each group by 
their min/max
+//!   statistics to approximate the requested order, improving TopK and limit 
performance
+//! - **Non-overlapping detection**: when files have non-overlapping ranges 
and matching
+//!   within-file ordering, the combined scan is `Exact` (sort eliminated)
+//! - **Prefix matching**: if data has ordering [A DESC, B ASC] and query needs
+//!   [A DESC], the existing ordering satisfies the requirement

Review Comment:
   The updated docs describe prefix matching for the same-direction case, but 
they no longer mention the reverse-scan + prefix behavior that the code path 
(`Inexact` via reversed ordering) still appears to support. To avoid misleading 
readers, consider explicitly documenting both cases: (1) prefix satisfied by 
existing ordering and (2) prefix satisfied after reversal (where applicable), 
along with the implication that reversal remains `Inexact`.
   ```suggestion
   //! - **Prefix matching**:
   //!   - If data has ordering [A DESC, B ASC] and the query needs [A DESC], 
the existing
   //!     ordering (same direction) satisfies the requirement.
   //!   - If the required ordering is a prefix of the *reversed* natural 
ordering, a
   //!     reverse scan can be used to approximate the requirement, but this 
remains
   //!     `Inexact` and the top-level `SortExec` is retained.
   ```



##########
benchmarks/src/sort_pushdown.rs:
##########
@@ -0,0 +1,306 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Benchmark for sort pushdown optimization.
+//!
+//! Tests performance of sort elimination when files are non-overlapping and
+//! internally sorted (declared via `--sorted` / `WITH ORDER`).
+//!
+//! # Usage
+//!
+//! ```text
+//! # Prepare sorted TPCH lineitem data (SF=1)
+//! ./bench.sh data sort_pushdown
+//!
+//! # Baseline (no WITH ORDER, full SortExec)
+//! ./bench.sh run sort_pushdown
+//!
+//! # With sort elimination (WITH ORDER, SortExec removed)
+//! ./bench.sh run sort_pushdown_sorted
+//! ```
+//!
+//! # Reference Results
+//!
+//! Measured on 300k rows, 8 non-overlapping sorted parquet files, single partition,
+//! debug build (results vary by hardware; relative speedup is the key metric):
+//!
+//! ```text
+//! Query | Description          | baseline (ms) | sort eliminated (ms) | speedup
+//! ------|----------------------|---------------|---------------------|--------
+//! Q1    | ASC full scan        |           159 |                  91 |  43%
+//! Q2    | ASC LIMIT 100        |            36 |                  12 |  67%
+//! Q3    | ASC full (wide, *)   |           487 |                 333 |  31%
+//! Q4    | ASC LIMIT 100 (wide) |           119 |                  30 |  74%
+//! ```
+//!
+//! Key observations:
+//! - **LIMIT queries benefit most** (67-74%): sort elimination + limit pushdown
+//!   means only the first few rows are read before stopping.
+//! - **Full scans** (31-43%): saving comes from eliminating the O(n log n) sort
+//!   step entirely.
+//! - **Wide projections** amplify the benefit: larger rows make sorting more
+//!   expensive, so eliminating it saves more.
+
+use clap::Args;
+use futures::StreamExt;
+use std::path::PathBuf;
+use std::sync::Arc;
+
+use datafusion::datasource::TableProvider;
+use datafusion::datasource::file_format::parquet::ParquetFormat;
+use datafusion::datasource::listing::{
+    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
+};
+use datafusion::error::Result;
+use datafusion::execution::SessionStateBuilder;
+use datafusion::physical_plan::display::DisplayableExecutionPlan;
+use datafusion::physical_plan::{displayable, execute_stream};
+use datafusion::prelude::*;
+use datafusion_common::DEFAULT_PARQUET_EXTENSION;
+use datafusion_common::instant::Instant;
+
+use crate::util::{BenchmarkRun, CommonOpt, QueryResult, print_memory_stats};
+
+#[derive(Debug, Args)]
+pub struct RunOpt {
+    /// Common options
+    #[command(flatten)]
+    common: CommonOpt,
+
+    /// Sort pushdown query number. If not specified, runs all queries
+    #[arg(short, long)]
+    pub query: Option<usize>,
+
+    /// Path to data files (lineitem). Only parquet format is supported.
+    /// Data should be pre-sorted by l_orderkey ASC for meaningful results.
+    #[arg(required = true, short = 'p', long = "path")]
+    path: PathBuf,
+
+    /// Path to JSON benchmark result to be compared using `compare.py`
+    #[arg(short = 'o', long = "output")]
+    output_path: Option<PathBuf>,
+
+    /// Mark the first column (l_orderkey) as sorted via WITH ORDER.
+    /// When set, enables sort elimination for matching queries.
+    #[arg(short = 't', long = "sorted")]
+    sorted: bool,
+}
+
+pub const SORT_PUSHDOWN_QUERY_START_ID: usize = 1;
+pub const SORT_PUSHDOWN_QUERY_END_ID: usize = 4;
+
+impl RunOpt {
+    const TABLES: [&'static str; 1] = ["lineitem"];
+
+    /// Queries benchmarking sort elimination when files are non-overlapping
+    /// and internally sorted (WITH ORDER declared via `--sorted`).
+    ///
+    /// With `--sorted`: ParquetSource returns Exact, files are verified
+    /// non-overlapping by statistics → SortExec eliminated, no SPM needed
+    /// for single partition.
+    ///
+    /// Without `--sorted`: baseline with full SortExec.
+    const QUERIES: [&'static str; 4] = [
+        // Q1: Sort elimination — full scan
+        // With --sorted: SortExec removed, sequential scan in file order
+        // Without --sorted: full SortExec required
+        r#"
+        SELECT l_orderkey, l_partkey, l_suppkey
+        FROM lineitem
+        ORDER BY l_orderkey
+        "#,
+        // Q2: Sort elimination + limit pushdown
+        // With --sorted: SortExec removed + limit pushed to DataSourceExec
+        //   → reads only first ~100 rows then stops
+        // Without --sorted: TopK sort over all data
+        r#"
+        SELECT l_orderkey, l_partkey, l_suppkey
+        FROM lineitem
+        ORDER BY l_orderkey
+        LIMIT 100
+        "#,
+        // Q3: Sort elimination — wide projection (all columns)
+        // Tests sort elimination benefit with larger row payload
+        r#"
+        SELECT *
+        FROM lineitem
+        ORDER BY l_orderkey
+        "#,
+        // Q4: Sort elimination + limit — wide projection
+        r#"
+        SELECT *
+        FROM lineitem
+        ORDER BY l_orderkey
+        LIMIT 100
+        "#,
+    ];
+
+    pub async fn run(&self) -> Result<()> {
+        let mut benchmark_run = BenchmarkRun::new();
+
+        let query_range = match self.query {
+            Some(query_id) => query_id..=query_id,
+            None => SORT_PUSHDOWN_QUERY_START_ID..=SORT_PUSHDOWN_QUERY_END_ID,
+        };
+
+        for query_id in query_range {
+            benchmark_run.start_new_case(&format!("{query_id}"));
+
+            let query_results = self.benchmark_query(query_id).await;
+            match query_results {
+                Ok(query_results) => {
+                    for iter in query_results {
+                        benchmark_run.write_iter(iter.elapsed, iter.row_count);
+                    }
+                }
+                Err(e) => {
+                    benchmark_run.mark_failed();
+                    eprintln!("Query {query_id} failed: {e}");
+                }
+            }
+        }
+
+        benchmark_run.maybe_write_json(self.output_path.as_ref())?;
+        benchmark_run.maybe_print_failures();
+        Ok(())
+    }
+
+    async fn benchmark_query(&self, query_id: usize) -> 
Result<Vec<QueryResult>> {
+        let config = self.common.config()?;
+        let rt = self.common.build_runtime()?;
+        let state = SessionStateBuilder::new()
+            .with_config(config)
+            .with_runtime_env(rt)
+            .with_default_features()
+            .build();
+        let ctx = SessionContext::from(state);
+
+        self.register_tables(&ctx).await?;
+
+        let mut millis = vec![];
+        let mut query_results = vec![];
+        for i in 0..self.iterations() {
+            let start = Instant::now();
+
+            let query_idx = query_id - 1;
+            let sql = Self::QUERIES[query_idx].to_string();
+            let row_count = self.execute_query(&ctx, sql.as_str()).await?;
+
+            let elapsed = start.elapsed();
+            let ms = elapsed.as_secs_f64() * 1000.0;
+            millis.push(ms);
+
+            println!(
+                "Query {query_id} iteration {i} took {ms:.1} ms and returned 
{row_count} rows"
+            );
+            query_results.push(QueryResult { elapsed, row_count });
+        }
+
+        let avg = millis.iter().sum::<f64>() / millis.len() as f64;
+        println!("Query {query_id} avg time: {avg:.2} ms");
+
+        print_memory_stats();
+
+        Ok(query_results)
+    }
+
+    async fn register_tables(&self, ctx: &SessionContext) -> Result<()> {
+        for table in Self::TABLES {
+            let table_provider = self.get_table(ctx, table).await?;
+            ctx.register_table(table, table_provider)?;
+        }
+        Ok(())
+    }
+
+    async fn execute_query(&self, ctx: &SessionContext, sql: &str) -> 
Result<usize> {
+        let debug = self.common.debug;
+        let plan = ctx.sql(sql).await?;
+        let (state, plan) = plan.into_parts();
+
+        if debug {
+            println!("=== Logical plan ===\n{plan}\n");
+        }
+
+        let plan = state.optimize(&plan)?;
+        if debug {
+            println!("=== Optimized logical plan ===\n{plan}\n");
+        }
+        let physical_plan = state.create_physical_plan(&plan).await?;
+        if debug {
+            println!(
+                "=== Physical plan ===\n{}\n",
+                displayable(physical_plan.as_ref()).indent(true)
+            );
+        }
+
+        let mut row_count = 0;
+        let mut stream = execute_stream(physical_plan.clone(), 
state.task_ctx())?;
+        while let Some(batch) = stream.next().await {
+            row_count += batch?.num_rows();
+        }
+
+        if debug {
+            println!(
+                "=== Physical plan with metrics ===\n{}\n",
+                DisplayableExecutionPlan::with_metrics(physical_plan.as_ref())
+                    .indent(true)
+            );
+        }
+
+        Ok(row_count)
+    }
+
+    async fn get_table(
+        &self,
+        ctx: &SessionContext,
+        table: &str,
+    ) -> Result<Arc<dyn TableProvider>> {
+        let path = self.path.to_str().unwrap();

Review Comment:
   Using `to_str().unwrap()` can panic on non-UTF-8 paths. For a benchmark 
harness it’s easy to avoid the panic by using `to_string_lossy()` (best-effort) 
or returning an error when the path isn’t valid UTF-8.
   ```suggestion
           let path = self.path.to_string_lossy();
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to