Dandandan commented on code in PR #21580:
URL: https://github.com/apache/datafusion/pull/21580#discussion_r3071362975
##########
datafusion/datasource-parquet/src/access_plan.rs:
##########
@@ -377,6 +382,106 @@ impl PreparedAccessPlan {
})
}
+ /// Reorder row groups by their min statistics for the given sort order.
+ ///
+ /// This helps TopK queries find optimal values first. For ASC sort,
+ /// row groups with the smallest min values come first. For DESC sort,
+ /// row groups with the largest min values come first.
+ ///
+ /// Gracefully skips reordering when:
+ /// - There is a row_selection (too complex to remap)
+ /// - 0 or 1 row groups (nothing to reorder)
+ /// - Sort expression is not a simple column reference
+ /// - Statistics are unavailable
+ pub(crate) fn reorder_by_statistics(
+ mut self,
+ sort_order: &LexOrdering,
+ file_metadata: &ParquetMetaData,
+ arrow_schema: &Schema,
+ ) -> Result<Self> {
+ // Skip if row_selection present (too complex to remap)
+ if self.row_selection.is_some() {
+ debug!("Skipping RG reorder: row_selection present");
+ return Ok(self);
+ }
+
+ // Nothing to reorder
+ if self.row_group_indexes.len() <= 1 {
+ return Ok(self);
+ }
+
+ // Get the first sort expression
+ // LexOrdering is guaranteed non-empty, so first() returns
&PhysicalSortExpr
+ let first_sort_expr = sort_order.first();
+
+ // Extract column name from sort expression
+ let column: &Column = match
first_sort_expr.expr.as_any().downcast_ref::<Column>()
+ {
+ Some(col) => col,
+ None => {
+ debug!("Skipping RG reorder: sort expr is not a simple
column");
+ return Ok(self);
+ }
+ };
+
+ let descending = first_sort_expr.options.descending;
+
+ // Build statistics converter for this column
+ let converter = match StatisticsConverter::try_new(
+ column.name(),
+ arrow_schema,
+ file_metadata.file_metadata().schema_descr(),
+ ) {
+ Ok(c) => c,
+ Err(e) => {
+ debug!("Skipping RG reorder: cannot create stats converter:
{e}");
+ return Ok(self);
+ }
+ };
+
+ // Get min values for the selected row groups
+ let rg_metadata: Vec<&RowGroupMetaData> = self
+ .row_group_indexes
+ .iter()
+ .map(|&idx| file_metadata.row_group(idx))
+ .collect();
+
+ let min_values = match
converter.row_group_mins(rg_metadata.iter().copied()) {
+ Ok(vals) => vals,
+ Err(e) => {
+ debug!("Skipping RG reorder: cannot get min values: {e}");
+ return Ok(self);
+ }
+ };
+
+ // Sort indices by min values
+ let sort_options = arrow::compute::SortOptions {
+ descending,
+ nulls_first: first_sort_expr.options.nulls_first,
+ };
+ let sorted_indices = match arrow::compute::sort_to_indices(
+ &min_values,
Review Comment:
This is a good point
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]