Copilot commented on code in PR #21580:
URL: https://github.com/apache/datafusion/pull/21580#discussion_r3071275014
##########
datafusion/datasource-parquet/src/access_plan.rs:
##########
@@ -377,6 +382,106 @@ impl PreparedAccessPlan {
})
}
+ /// Reorder row groups by their min statistics for the given sort order.
+ ///
+ /// This helps TopK queries find optimal values first. For ASC sort,
+ /// row groups with the smallest min values come first. For DESC sort,
+ /// row groups with the largest min values come first.
+ ///
+ /// Gracefully skips reordering when:
+ /// - There is a row_selection (too complex to remap)
+ /// - 0 or 1 row groups (nothing to reorder)
+ /// - Sort expression is not a simple column reference
+ /// - Statistics are unavailable
+ pub(crate) fn reorder_by_statistics(
+ mut self,
+ sort_order: &LexOrdering,
+ file_metadata: &ParquetMetaData,
+ arrow_schema: &Schema,
+ ) -> Result<Self> {
+ // Skip if row_selection present (too complex to remap)
+ if self.row_selection.is_some() {
+ debug!("Skipping RG reorder: row_selection present");
+ return Ok(self);
+ }
+
+ // Nothing to reorder
+ if self.row_group_indexes.len() <= 1 {
+ return Ok(self);
+ }
+
+ // Get the first sort expression
+ // LexOrdering is guaranteed non-empty, so first() returns
&PhysicalSortExpr
+ let first_sort_expr = sort_order.first();
+
+ // Extract column name from sort expression
+ let column: &Column = match
first_sort_expr.expr.as_any().downcast_ref::<Column>()
+ {
+ Some(col) => col,
+ None => {
+ debug!("Skipping RG reorder: sort expr is not a simple
column");
+ return Ok(self);
+ }
+ };
+
+ let descending = first_sort_expr.options.descending;
Review Comment:
For `DESC` ordering, reordering by **min** values is often a poor proxy for
“row group likely contains the largest values first”; typically you want to
sort by **max** when `descending == true` (and by min when ascending). This can
significantly reduce the intended TopK benefit (and can even choose a worse
first row group when ranges overlap). Consider switching to
`row_group_maxs(...)` for descending order, and update the doc comment
(which currently says row groups are reordered by their min statistics for
both ASC and DESC) and the DESC unit test accordingly.
##########
datafusion/datasource-parquet/src/access_plan.rs:
##########
@@ -377,6 +382,106 @@ impl PreparedAccessPlan {
})
}
+ /// Reorder row groups by their min statistics for the given sort order.
+ ///
+ /// This helps TopK queries find optimal values first. For ASC sort,
+ /// row groups with the smallest min values come first. For DESC sort,
+ /// row groups with the largest min values come first.
+ ///
+ /// Gracefully skips reordering when:
+ /// - There is a row_selection (too complex to remap)
+ /// - 0 or 1 row groups (nothing to reorder)
+ /// - Sort expression is not a simple column reference
+ /// - Statistics are unavailable
+ pub(crate) fn reorder_by_statistics(
+ mut self,
+ sort_order: &LexOrdering,
+ file_metadata: &ParquetMetaData,
+ arrow_schema: &Schema,
+ ) -> Result<Self> {
+ // Skip if row_selection present (too complex to remap)
+ if self.row_selection.is_some() {
+ debug!("Skipping RG reorder: row_selection present");
+ return Ok(self);
+ }
+
+ // Nothing to reorder
+ if self.row_group_indexes.len() <= 1 {
+ return Ok(self);
+ }
+
+ // Get the first sort expression
+ // LexOrdering is guaranteed non-empty, so first() returns
&PhysicalSortExpr
+ let first_sort_expr = sort_order.first();
Review Comment:
The inline comment states that `LexOrdering` is guaranteed non-empty and that
`first()` returns `&PhysicalSortExpr` directly; verify which signature applies.
If `LexOrdering` is instead `Vec`-like, `sort_order.first()` returns
`Option<&PhysicalSortExpr>`, but the code uses it as if it were
`&PhysicalSortExpr` (`first_sort_expr.expr...`), which would be a compile
error. A concrete fix is to obtain the first element via iteration and handle
the empty case (e.g., early-return `Ok(self)` if no sort expressions), then use
the returned `&PhysicalSortExpr`.
```suggestion
let first_sort_expr = match sort_order.iter().next() {
Some(expr) => expr,
None => {
debug!("Skipping RG reorder: empty sort order");
return Ok(self);
}
};
```
##########
datafusion/datasource-parquet/src/source.rs:
##########
@@ -817,7 +821,9 @@ impl FileSource for ParquetSource {
// Return Inexact because we're only reversing row group order,
// not guaranteeing perfect row-level ordering
- let new_source = self.clone().with_reverse_row_groups(true);
+ let sort_order = LexOrdering::new(order.iter().cloned());
+ let mut new_source = self.clone().with_reverse_row_groups(true);
+ new_source.sort_order_for_reorder = sort_order;
Review Comment:
`LexOrdering::new(...)` appears to return a wrapped value rather than a bare
`LexOrdering`. The new unit tests call `.unwrap()` on it, which is consistent
with either `Result<LexOrdering, _>` or `Option<LexOrdering>`. If it returns a
`Result`, assigning it directly to `sort_order_for_reorder:
Option<LexOrdering>` won't compile. In that case, construct the `LexOrdering`
with error propagation and store it as `Some(sort_order)` (or skip setting the
field on error), as in the suggestion below. If it returns
`Option<LexOrdering>`, the direct assignment already type-checks and no change
is needed.
```suggestion
let sort_order = LexOrdering::new(order.iter().cloned())?;
let mut new_source = self.clone().with_reverse_row_groups(true);
new_source.sort_order_for_reorder = Some(sort_order);
```
##########
datafusion/datasource-parquet/src/access_plan.rs:
##########
@@ -377,6 +382,106 @@ impl PreparedAccessPlan {
})
}
+ /// Reorder row groups by their min statistics for the given sort order.
+ ///
+ /// This helps TopK queries find optimal values first. For ASC sort,
+ /// row groups with the smallest min values come first. For DESC sort,
+ /// row groups with the largest min values come first.
+ ///
+ /// Gracefully skips reordering when:
+ /// - There is a row_selection (too complex to remap)
+ /// - 0 or 1 row groups (nothing to reorder)
+ /// - Sort expression is not a simple column reference
+ /// - Statistics are unavailable
+ pub(crate) fn reorder_by_statistics(
+ mut self,
+ sort_order: &LexOrdering,
+ file_metadata: &ParquetMetaData,
+ arrow_schema: &Schema,
+ ) -> Result<Self> {
+ // Skip if row_selection present (too complex to remap)
+ if self.row_selection.is_some() {
+ debug!("Skipping RG reorder: row_selection present");
+ return Ok(self);
+ }
+
+ // Nothing to reorder
+ if self.row_group_indexes.len() <= 1 {
+ return Ok(self);
+ }
+
+ // Get the first sort expression
+ // LexOrdering is guaranteed non-empty, so first() returns
&PhysicalSortExpr
+ let first_sort_expr = sort_order.first();
+
+ // Extract column name from sort expression
+ let column: &Column = match
first_sort_expr.expr.as_any().downcast_ref::<Column>()
+ {
+ Some(col) => col,
+ None => {
+ debug!("Skipping RG reorder: sort expr is not a simple
column");
+ return Ok(self);
+ }
+ };
+
+ let descending = first_sort_expr.options.descending;
+
+ // Build statistics converter for this column
+ let converter = match StatisticsConverter::try_new(
+ column.name(),
+ arrow_schema,
+ file_metadata.file_metadata().schema_descr(),
+ ) {
+ Ok(c) => c,
+ Err(e) => {
+ debug!("Skipping RG reorder: cannot create stats converter:
{e}");
+ return Ok(self);
+ }
+ };
+
+ // Get min values for the selected row groups
+ let rg_metadata: Vec<&RowGroupMetaData> = self
+ .row_group_indexes
+ .iter()
+ .map(|&idx| file_metadata.row_group(idx))
+ .collect();
+
+ let min_values = match
converter.row_group_mins(rg_metadata.iter().copied()) {
+ Ok(vals) => vals,
+ Err(e) => {
+ debug!("Skipping RG reorder: cannot get min values: {e}");
+ return Ok(self);
+ }
+ };
+
+ // Sort indices by min values
+ let sort_options = arrow::compute::SortOptions {
+ descending,
+ nulls_first: first_sort_expr.options.nulls_first,
+ };
+ let sorted_indices = match arrow::compute::sort_to_indices(
+ &min_values,
Review Comment:
For `DESC` ordering, reordering by **min** values is often a poor proxy for
“row group likely contains the largest values first”; typically you want to
sort by **max** when `descending == true` (and by min when ascending). This can
significantly reduce the intended TopK benefit (and can even choose a worse
first row group when ranges overlap). Consider switching to
`row_group_maxs(...)` for descending order, and update the doc comment
(which currently says row groups are reordered by their min statistics for
both ASC and DESC) and the DESC unit test accordingly.
```suggestion
// Get values for the selected row groups: mins for ASC, maxs for
DESC
let rg_metadata: Vec<&RowGroupMetaData> = self
.row_group_indexes
.iter()
.map(|&idx| file_metadata.row_group(idx))
.collect();
let sort_values = match if descending {
converter.row_group_maxs(rg_metadata.iter().copied())
} else {
converter.row_group_mins(rg_metadata.iter().copied())
} {
Ok(vals) => vals,
Err(e) => {
debug!("Skipping RG reorder: cannot get min/max values:
{e}");
return Ok(self);
}
};
// Sort indices by the statistics that best match the requested order
let sort_options = arrow::compute::SortOptions {
descending,
nulls_first: first_sort_expr.options.nulls_first,
};
let sorted_indices = match arrow::compute::sort_to_indices(
&sort_values,
```
##########
datafusion/datasource-parquet/src/access_plan.rs:
##########
@@ -377,6 +382,106 @@ impl PreparedAccessPlan {
})
}
+ /// Reorder row groups by their min statistics for the given sort order.
+ ///
+ /// This helps TopK queries find optimal values first. For ASC sort,
+ /// row groups with the smallest min values come first. For DESC sort,
+ /// row groups with the largest min values come first.
+ ///
+ /// Gracefully skips reordering when:
+ /// - There is a row_selection (too complex to remap)
+ /// - 0 or 1 row groups (nothing to reorder)
+ /// - Sort expression is not a simple column reference
+ /// - Statistics are unavailable
+ pub(crate) fn reorder_by_statistics(
+ mut self,
+ sort_order: &LexOrdering,
+ file_metadata: &ParquetMetaData,
+ arrow_schema: &Schema,
+ ) -> Result<Self> {
+ // Skip if row_selection present (too complex to remap)
+ if self.row_selection.is_some() {
+ debug!("Skipping RG reorder: row_selection present");
+ return Ok(self);
+ }
+
+ // Nothing to reorder
+ if self.row_group_indexes.len() <= 1 {
+ return Ok(self);
+ }
+
+ // Get the first sort expression
+ // LexOrdering is guaranteed non-empty, so first() returns
&PhysicalSortExpr
+ let first_sort_expr = sort_order.first();
+
+ // Extract column name from sort expression
+ let column: &Column = match
first_sort_expr.expr.as_any().downcast_ref::<Column>()
+ {
+ Some(col) => col,
+ None => {
+ debug!("Skipping RG reorder: sort expr is not a simple
column");
+ return Ok(self);
+ }
+ };
+
+ let descending = first_sort_expr.options.descending;
+
+ // Build statistics converter for this column
+ let converter = match StatisticsConverter::try_new(
+ column.name(),
+ arrow_schema,
+ file_metadata.file_metadata().schema_descr(),
+ ) {
+ Ok(c) => c,
+ Err(e) => {
+ debug!("Skipping RG reorder: cannot create stats converter:
{e}");
+ return Ok(self);
+ }
+ };
+
+ // Get min values for the selected row groups
+ let rg_metadata: Vec<&RowGroupMetaData> = self
+ .row_group_indexes
+ .iter()
+ .map(|&idx| file_metadata.row_group(idx))
+ .collect();
+
+ let min_values = match
converter.row_group_mins(rg_metadata.iter().copied()) {
+ Ok(vals) => vals,
+ Err(e) => {
+ debug!("Skipping RG reorder: cannot get min values: {e}");
+ return Ok(self);
+ }
+ };
+
+ // Sort indices by min values
+ let sort_options = arrow::compute::SortOptions {
+ descending,
+ nulls_first: first_sort_expr.options.nulls_first,
+ };
+ let sorted_indices = match arrow::compute::sort_to_indices(
+ &min_values,
+ Some(sort_options),
+ None,
+ ) {
Review Comment:
If multiple row groups share the same min (or max) statistic,
`sort_to_indices` may not guarantee a deterministic/stable tie-breaker across
platforms/versions. Since row-group order can affect scan reproducibility and
performance debugging, consider adding a stable secondary key (e.g., original
row group index) when statistics are equal.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]