Copilot commented on code in PR #21580:
URL: https://github.com/apache/datafusion/pull/21580#discussion_r3071275014


##########
datafusion/datasource-parquet/src/access_plan.rs:
##########
@@ -377,6 +382,106 @@ impl PreparedAccessPlan {
         })
     }
 
+    /// Reorder row groups by their min statistics for the given sort order.
+    ///
+    /// This helps TopK queries find optimal values first. For ASC sort,
+    /// row groups with the smallest min values come first. For DESC sort,
+    /// row groups with the largest min values come first.
+    ///
+    /// Gracefully skips reordering when:
+    /// - There is a row_selection (too complex to remap)
+    /// - 0 or 1 row groups (nothing to reorder)
+    /// - Sort expression is not a simple column reference
+    /// - Statistics are unavailable
+    pub(crate) fn reorder_by_statistics(
+        mut self,
+        sort_order: &LexOrdering,
+        file_metadata: &ParquetMetaData,
+        arrow_schema: &Schema,
+    ) -> Result<Self> {
+        // Skip if row_selection present (too complex to remap)
+        if self.row_selection.is_some() {
+            debug!("Skipping RG reorder: row_selection present");
+            return Ok(self);
+        }
+
+        // Nothing to reorder
+        if self.row_group_indexes.len() <= 1 {
+            return Ok(self);
+        }
+
+        // Get the first sort expression
+        // LexOrdering is guaranteed non-empty, so first() returns 
&PhysicalSortExpr
+        let first_sort_expr = sort_order.first();
+
+        // Extract column name from sort expression
+        let column: &Column = match 
first_sort_expr.expr.as_any().downcast_ref::<Column>()
+        {
+            Some(col) => col,
+            None => {
+                debug!("Skipping RG reorder: sort expr is not a simple 
column");
+                return Ok(self);
+            }
+        };
+
+        let descending = first_sort_expr.options.descending;

Review Comment:
   For `DESC` ordering, reordering by **min** values is often a poor proxy for 
“row group likely contains the largest values first”; typically you want to 
sort by **max** when `descending == true` (and by min when ascending). This can 
significantly reduce the intended TopK benefit (and can even choose a worse 
first row group when ranges overlap). Consider switching to 
`row_group_maxs(...)` for descending order, and update the doc comment 
(currently mentions “min/max”) and the DESC unit test accordingly.



##########
datafusion/datasource-parquet/src/access_plan.rs:
##########
@@ -377,6 +382,106 @@ impl PreparedAccessPlan {
         })
     }
 
+    /// Reorder row groups by their min statistics for the given sort order.
+    ///
+    /// This helps TopK queries find optimal values first. For ASC sort,
+    /// row groups with the smallest min values come first. For DESC sort,
+    /// row groups with the largest min values come first.
+    ///
+    /// Gracefully skips reordering when:
+    /// - There is a row_selection (too complex to remap)
+    /// - 0 or 1 row groups (nothing to reorder)
+    /// - Sort expression is not a simple column reference
+    /// - Statistics are unavailable
+    pub(crate) fn reorder_by_statistics(
+        mut self,
+        sort_order: &LexOrdering,
+        file_metadata: &ParquetMetaData,
+        arrow_schema: &Schema,
+    ) -> Result<Self> {
+        // Skip if row_selection present (too complex to remap)
+        if self.row_selection.is_some() {
+            debug!("Skipping RG reorder: row_selection present");
+            return Ok(self);
+        }
+
+        // Nothing to reorder
+        if self.row_group_indexes.len() <= 1 {
+            return Ok(self);
+        }
+
+        // Get the first sort expression
+        // LexOrdering is guaranteed non-empty, so first() returns 
&PhysicalSortExpr
+        let first_sort_expr = sort_order.first();

Review Comment:
   `sort_order.first()` (if `LexOrdering` is `Vec`-like) returns 
`Option<&PhysicalSortExpr>`, but the code uses it as if it were 
`&PhysicalSortExpr` (`first_sort_expr.expr...`). This is likely a compile 
error. A concrete fix is to obtain the first element via iteration and handle 
the empty case (e.g., early-return `Ok(self)` if no sort expressions), then use 
the returned `&PhysicalSortExpr`.
   ```suggestion
           let first_sort_expr = match sort_order.iter().next() {
               Some(expr) => expr,
               None => {
                   debug!("Skipping RG reorder: empty sort order");
                   return Ok(self);
               }
           };
   ```



##########
datafusion/datasource-parquet/src/source.rs:
##########
@@ -817,7 +821,9 @@ impl FileSource for ParquetSource {
 
         // Return Inexact because we're only reversing row group order,
         // not guaranteeing perfect row-level ordering
-        let new_source = self.clone().with_reverse_row_groups(true);
+        let sort_order = LexOrdering::new(order.iter().cloned());
+        let mut new_source = self.clone().with_reverse_row_groups(true);
+        new_source.sort_order_for_reorder = sort_order;

Review Comment:
   `LexOrdering::new(...)` appears to return a `Result<LexOrdering, _>` (as 
used with `.unwrap()` in the new unit tests), but here it’s assigned directly 
without `?`/`unwrap`, and then assigned to `sort_order_for_reorder: 
Option<LexOrdering>` without wrapping in `Some(...)`. This should be changed to 
construct a `LexOrdering` with error propagation and store it as 
`Some(sort_order)` (or skip setting the field on error). Otherwise this won’t 
compile.
   ```suggestion
           let sort_order = LexOrdering::new(order.iter().cloned())?;
           let mut new_source = self.clone().with_reverse_row_groups(true);
           new_source.sort_order_for_reorder = Some(sort_order);
   ```



##########
datafusion/datasource-parquet/src/access_plan.rs:
##########
@@ -377,6 +382,106 @@ impl PreparedAccessPlan {
         })
     }
 
+    /// Reorder row groups by their min statistics for the given sort order.
+    ///
+    /// This helps TopK queries find optimal values first. For ASC sort,
+    /// row groups with the smallest min values come first. For DESC sort,
+    /// row groups with the largest min values come first.
+    ///
+    /// Gracefully skips reordering when:
+    /// - There is a row_selection (too complex to remap)
+    /// - 0 or 1 row groups (nothing to reorder)
+    /// - Sort expression is not a simple column reference
+    /// - Statistics are unavailable
+    pub(crate) fn reorder_by_statistics(
+        mut self,
+        sort_order: &LexOrdering,
+        file_metadata: &ParquetMetaData,
+        arrow_schema: &Schema,
+    ) -> Result<Self> {
+        // Skip if row_selection present (too complex to remap)
+        if self.row_selection.is_some() {
+            debug!("Skipping RG reorder: row_selection present");
+            return Ok(self);
+        }
+
+        // Nothing to reorder
+        if self.row_group_indexes.len() <= 1 {
+            return Ok(self);
+        }
+
+        // Get the first sort expression
+        // LexOrdering is guaranteed non-empty, so first() returns 
&PhysicalSortExpr
+        let first_sort_expr = sort_order.first();
+
+        // Extract column name from sort expression
+        let column: &Column = match 
first_sort_expr.expr.as_any().downcast_ref::<Column>()
+        {
+            Some(col) => col,
+            None => {
+                debug!("Skipping RG reorder: sort expr is not a simple 
column");
+                return Ok(self);
+            }
+        };
+
+        let descending = first_sort_expr.options.descending;
+
+        // Build statistics converter for this column
+        let converter = match StatisticsConverter::try_new(
+            column.name(),
+            arrow_schema,
+            file_metadata.file_metadata().schema_descr(),
+        ) {
+            Ok(c) => c,
+            Err(e) => {
+                debug!("Skipping RG reorder: cannot create stats converter: 
{e}");
+                return Ok(self);
+            }
+        };
+
+        // Get min values for the selected row groups
+        let rg_metadata: Vec<&RowGroupMetaData> = self
+            .row_group_indexes
+            .iter()
+            .map(|&idx| file_metadata.row_group(idx))
+            .collect();
+
+        let min_values = match 
converter.row_group_mins(rg_metadata.iter().copied()) {
+            Ok(vals) => vals,
+            Err(e) => {
+                debug!("Skipping RG reorder: cannot get min values: {e}");
+                return Ok(self);
+            }
+        };
+
+        // Sort indices by min values
+        let sort_options = arrow::compute::SortOptions {
+            descending,
+            nulls_first: first_sort_expr.options.nulls_first,
+        };
+        let sorted_indices = match arrow::compute::sort_to_indices(
+            &min_values,

Review Comment:
   For `DESC` ordering, reordering by **min** values is often a poor proxy for 
“row group likely contains the largest values first”; typically you want to 
sort by **max** when `descending == true` (and by min when ascending). This can 
significantly reduce the intended TopK benefit (and can even choose a worse 
first row group when ranges overlap). Consider switching to 
`row_group_maxs(...)` for descending order, and update the doc comment 
(currently mentions “min/max”) and the DESC unit test accordingly.
   ```suggestion
           // Get values for the selected row groups: mins for ASC, maxs for 
DESC
           let rg_metadata: Vec<&RowGroupMetaData> = self
               .row_group_indexes
               .iter()
               .map(|&idx| file_metadata.row_group(idx))
               .collect();
   
           let sort_values = match if descending {
               converter.row_group_maxs(rg_metadata.iter().copied())
           } else {
               converter.row_group_mins(rg_metadata.iter().copied())
           } {
               Ok(vals) => vals,
               Err(e) => {
                   debug!("Skipping RG reorder: cannot get min/max values: 
{e}");
                   return Ok(self);
               }
           };
   
           // Sort indices by the statistics that best match the requested order
           let sort_options = arrow::compute::SortOptions {
               descending,
               nulls_first: first_sort_expr.options.nulls_first,
           };
           let sorted_indices = match arrow::compute::sort_to_indices(
               &sort_values,
   ```



##########
datafusion/datasource-parquet/src/access_plan.rs:
##########
@@ -377,6 +382,106 @@ impl PreparedAccessPlan {
         })
     }
 
+    /// Reorder row groups by their min statistics for the given sort order.
+    ///
+    /// This helps TopK queries find optimal values first. For ASC sort,
+    /// row groups with the smallest min values come first. For DESC sort,
+    /// row groups with the largest min values come first.
+    ///
+    /// Gracefully skips reordering when:
+    /// - There is a row_selection (too complex to remap)
+    /// - 0 or 1 row groups (nothing to reorder)
+    /// - Sort expression is not a simple column reference
+    /// - Statistics are unavailable
+    pub(crate) fn reorder_by_statistics(
+        mut self,
+        sort_order: &LexOrdering,
+        file_metadata: &ParquetMetaData,
+        arrow_schema: &Schema,
+    ) -> Result<Self> {
+        // Skip if row_selection present (too complex to remap)
+        if self.row_selection.is_some() {
+            debug!("Skipping RG reorder: row_selection present");
+            return Ok(self);
+        }
+
+        // Nothing to reorder
+        if self.row_group_indexes.len() <= 1 {
+            return Ok(self);
+        }
+
+        // Get the first sort expression
+        // LexOrdering is guaranteed non-empty, so first() returns 
&PhysicalSortExpr
+        let first_sort_expr = sort_order.first();
+
+        // Extract column name from sort expression
+        let column: &Column = match 
first_sort_expr.expr.as_any().downcast_ref::<Column>()
+        {
+            Some(col) => col,
+            None => {
+                debug!("Skipping RG reorder: sort expr is not a simple 
column");
+                return Ok(self);
+            }
+        };
+
+        let descending = first_sort_expr.options.descending;
+
+        // Build statistics converter for this column
+        let converter = match StatisticsConverter::try_new(
+            column.name(),
+            arrow_schema,
+            file_metadata.file_metadata().schema_descr(),
+        ) {
+            Ok(c) => c,
+            Err(e) => {
+                debug!("Skipping RG reorder: cannot create stats converter: 
{e}");
+                return Ok(self);
+            }
+        };
+
+        // Get min values for the selected row groups
+        let rg_metadata: Vec<&RowGroupMetaData> = self
+            .row_group_indexes
+            .iter()
+            .map(|&idx| file_metadata.row_group(idx))
+            .collect();
+
+        let min_values = match 
converter.row_group_mins(rg_metadata.iter().copied()) {
+            Ok(vals) => vals,
+            Err(e) => {
+                debug!("Skipping RG reorder: cannot get min values: {e}");
+                return Ok(self);
+            }
+        };
+
+        // Sort indices by min values
+        let sort_options = arrow::compute::SortOptions {
+            descending,
+            nulls_first: first_sort_expr.options.nulls_first,
+        };
+        let sorted_indices = match arrow::compute::sort_to_indices(
+            &min_values,
+            Some(sort_options),
+            None,
+        ) {

Review Comment:
   If multiple row groups share the same min (or max) statistic, 
`sort_to_indices` may not guarantee a deterministic/stable tie-breaker across 
platforms/versions. Since row-group order can affect scan reproducibility and 
performance debugging, consider adding a stable secondary key (e.g., original 
row group index) when statistics are equal.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to