Copilot commented on code in PR #21956:
URL: https://github.com/apache/datafusion/pull/21956#discussion_r3166956155


##########
datafusion/datasource/src/file_stream/work_source.rs:
##########
@@ -85,8 +85,18 @@ impl SharedWorkSource {
     }
 
     /// Create a shared work source for the unopened files in `config`.
+    ///
+    /// Files are reordered by the file source (e.g. by statistics for TopK)
+    /// before being placed in the shared queue.
     pub(crate) fn from_config(config: &FileScanConfig) -> Self {
-        Self::new(config.file_groups.iter().flat_map(FileGroup::iter).cloned())
+        let files: Vec<_> = config
+            .file_groups
+            .iter()
+            .flat_map(FileGroup::iter)
+            .cloned()
+            .collect();
+        let files = config.file_source.reorder_files(files);
+        Self::new(files)
     }

Review Comment:
   `SharedWorkSource::from_config` now collects files into a `Vec`, calls 
`reorder_files`, and then `SharedWorkSource::new` collects again into a 
`VecDeque`. This introduces an extra allocation/iteration for every shared scan 
(even when `reorder_files` is a no-op). Consider avoiding the double-collect 
(e.g., construct the `VecDeque` directly from the reordered `Vec` without 
re-collecting, or provide a constructor that takes ownership of a 
`Vec`/`VecDeque`).



##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -952,14 +962,16 @@ impl SortExec {
         if fetch.is_some() && is_pipeline_friendly {
             cache = cache.with_boundedness(Boundedness::Bounded);
         }
-        let filter = fetch.is_some().then(|| {
-            // If we already have a filter, keep it. Otherwise, create a new one.
-            self.filter.clone().unwrap_or_else(|| self.create_filter())
-        });
         let mut new_sort = self.cloned();
         new_sort.fetch = fetch;
         new_sort.cache = cache.into();
-        new_sort.filter = filter;
+        new_sort.filter = fetch.is_some().then(|| {
+            // If we already have a filter, keep it. Otherwise, create a new one.
+            // Must be called after setting fetch so DynamicFilter gets the K value.
+            self.filter
+                .clone()
+                .unwrap_or_else(|| new_sort.create_filter())

Review Comment:
   `SortExec::with_fetch` reuses an existing `self.filter` even when `fetch` 
changes. Since `DynamicFilterPhysicalExpr` now stores `fetch` (K), this can 
leave the dynamic filter with a stale K if an optimizer calls `with_fetch` with 
a different limit (e.g. `LimitPushdown` combining limits). Consider recreating 
the filter when `fetch != self.fetch` (or otherwise updating the filter’s K), 
and only reusing the existing filter when the fetch value is unchanged.
   



##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -1123,13 +1129,104 @@ impl RowGroupsPrunedParquetOpen {
             );
         }
 
-        // Prepare the access plan (extract row groups and row selection)
-        let mut prepared_plan = access_plan.prepare(rg_metadata)?;
+        // Row group ordering optimization (two composable steps):
+        //
+        // 1. reorder_by_statistics: sort RGs by min values (ASC) to align
+        //    with the file's declared output ordering. This fixes out-of-order
+        //    RGs (e.g., from append-heavy workloads) without changing direction.
+        //    Skipped gracefully when statistics are unavailable.
+        //
+        // 2. reverse: flip the order for DESC queries. Applied AFTER reorder
+        //    so the reversed order is correct whether or not reorder changed
+        //    anything. Also handles row_selection remapping.
+        //
+        // For sorted data: reorder is a no-op, reverse gives perfect DESC.
+        // For unsorted data: reorder fixes the order, reverse flips for DESC.
+        let reorder_optimizer: Option<
+            Box<dyn crate::access_plan_optimizer::AccessPlanOptimizer>,
+        > = if let Some(sort_order) = &prepared.sort_order_for_reorder {
+            Some(
+                Box::new(crate::access_plan_optimizer::ReorderByStatistics::new(
+                    sort_order.clone(),
+                ))
+                    as Box<dyn crate::access_plan_optimizer::AccessPlanOptimizer>,
+            )
+        } else if let Some(predicate) = &prepared.predicate
+            && let Some(df) = find_dynamic_filter(predicate)
+            && let Some(sort_options) = df.sort_options()
+            && !sort_options.is_empty()
+        {
+            // Build a sort order from DynamicFilter for non-sort-pushdown TopK.
+            // Quick bail: check if the sort column exists in file schema.
+            let children = df.children();
+            if !children.is_empty() {
+                let col = find_column_in_expr(children[0]);
+                if let Some(c) = col
+                    && prepared
+                        .physical_file_schema
+                        .field_with_name(c.name())
+                        .is_ok()
+                {
+                    let sort_expr =
+                        datafusion_physical_expr_common::sort_expr::PhysicalSortExpr {
+                            expr: Arc::clone(children[0]),
+                            options: arrow::compute::SortOptions {
+                                descending: false,
+                                nulls_first: sort_options[0].nulls_first,
+                            },
+                        };

Review Comment:
   When building the `LexOrdering` from `DynamicFilter`, this uses `expr: 
Arc::clone(children[0])`. However, `PreparedAccessPlan::reorder_by_statistics` 
currently only supports a *direct* `Column` sort expression and will skip if 
the expression is wrapped (e.g. `Cast(Column)`). That can make 
`reorder_optimizer` appear enabled while it’s guaranteed to no-op for wrapped 
expressions, yet the later DESC logic may still apply `ReverseRowGroups`. 
Consider constructing the sort order using the unwrapped `Column` (from 
`find_column_in_expr`) or teaching `reorder_by_statistics` to unwrap 
single-child wrappers consistently.



##########
datafusion/sqllogictest/test_files/sort_pushdown.slt:
##########
@@ -2280,3 +2280,368 @@ SET datafusion.execution.collect_statistics = true;
 
 statement ok
 SET datafusion.optimizer.enable_sort_pushdown = true;
+
+# ===========================================================
+# Test H: Row group reorder by statistics for TopK queries.
+# When a file has multiple row groups with overlapping or
+# out-of-order statistics, sort pushdown returns Inexact and
+# `reorder_by_statistics` reorders row groups within the file
+# so TopK finds optimal values first.
+# ===========================================================
+
+# Create a table with 30 rows and write to parquet with small row groups
+# so we get multiple row groups per file. Rows are inserted in a mixed
+# order so row groups span overlapping ranges (forcing Inexact path).
+statement ok
+CREATE TABLE th_mixed(id INT, value INT) AS VALUES
+  (15, 150), (5, 50),  (25, 250),
+  (10, 100), (20, 200), (1, 10),
+  (30, 300), (3, 30),  (18, 180);

Review Comment:
   The comment says “Create a table with 30 rows” but the `VALUES` list only 
inserts 9 rows. Update the comment to match the actual test data to avoid 
confusion when maintaining these SLT cases.



##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -1123,13 +1129,104 @@ impl RowGroupsPrunedParquetOpen {
             );
         }
 
-        // Prepare the access plan (extract row groups and row selection)
-        let mut prepared_plan = access_plan.prepare(rg_metadata)?;
+        // Row group ordering optimization (two composable steps):
+        //
+        // 1. reorder_by_statistics: sort RGs by min values (ASC) to align
+        //    with the file's declared output ordering. This fixes out-of-order
+        //    RGs (e.g., from append-heavy workloads) without changing direction.
+        //    Skipped gracefully when statistics are unavailable.
+        //
+        // 2. reverse: flip the order for DESC queries. Applied AFTER reorder
+        //    so the reversed order is correct whether or not reorder changed
+        //    anything. Also handles row_selection remapping.
+        //
+        // For sorted data: reorder is a no-op, reverse gives perfect DESC.
+        // For unsorted data: reorder fixes the order, reverse flips for DESC.
+        let reorder_optimizer: Option<
+            Box<dyn crate::access_plan_optimizer::AccessPlanOptimizer>,
+        > = if let Some(sort_order) = &prepared.sort_order_for_reorder {
+            Some(
+                Box::new(crate::access_plan_optimizer::ReorderByStatistics::new(
+                    sort_order.clone(),
+                ))
+                    as Box<dyn crate::access_plan_optimizer::AccessPlanOptimizer>,
+            )
+        } else if let Some(predicate) = &prepared.predicate
+            && let Some(df) = find_dynamic_filter(predicate)
+            && let Some(sort_options) = df.sort_options()
+            && !sort_options.is_empty()
+        {
+            // Build a sort order from DynamicFilter for non-sort-pushdown TopK.
+            // Quick bail: check if the sort column exists in file schema.
+            let children = df.children();
+            if !children.is_empty() {
+                let col = find_column_in_expr(children[0]);
+                if let Some(c) = col
+                    && prepared
+                        .physical_file_schema
+                        .field_with_name(c.name())
+                        .is_ok()
+                {
+                    let sort_expr =
+                        datafusion_physical_expr_common::sort_expr::PhysicalSortExpr {
+                            expr: Arc::clone(children[0]),
+                            options: arrow::compute::SortOptions {
+                                descending: false,
+                                nulls_first: sort_options[0].nulls_first,
+                            },
+                        };
+                    LexOrdering::new(vec![sort_expr]).map(|order| {
+                        Box::new(crate::access_plan_optimizer::ReorderByStatistics::new(
+                            order,
+                        ))
+                            as Box<dyn crate::access_plan_optimizer::AccessPlanOptimizer>
+                    })
+                } else {
+                    None
+                }
+            } else {
+                None
+            }
+        } else {
+            None
+        };
 
-        // Potentially reverse the access plan for performance.
-        // See `ParquetSource::try_pushdown_sort` for the rationale.
-        if prepared.reverse_row_groups {
-            prepared_plan = prepared_plan.reverse(file_metadata.as_ref())?;
+        // Reverse for DESC queries. Only when reorder is active (the sort
+        // column exists in parquet stats). Without reorder, reversing RGs
+        // randomly changes I/O patterns with no benefit.
+        let is_descending = prepared.reverse_row_groups
+            || (reorder_optimizer.is_some()
+                && prepared
+                    .predicate
+                    .as_ref()
+                    .and_then(find_dynamic_filter)
+                    .and_then(|df| df.sort_options().map(|opts| opts[0].descending))
+                    .unwrap_or(false));
+        let reverse_optimizer: Option<
+            Box<dyn crate::access_plan_optimizer::AccessPlanOptimizer>,
+        > = if is_descending {

Review Comment:
   The comment says reverse is applied “only when reorder is active (the sort 
column exists in parquet stats)”, but `is_descending` is gated on 
`reorder_optimizer.is_some()`, which is decided before attempting the 
stats-based reorder and does not guarantee stats are available (or that reorder 
actually ran). As a result, DESC queries may still reverse row groups even when 
`reorder_by_statistics` skips due to missing/unsupported statistics, 
potentially randomizing I/O order. Consider determining whether 
statistics-based reorder actually applied (or verifying stats availability) 
before enabling the reverse optimizer for the DynamicFilter path.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to