This is an automated email from the ASF dual-hosted git repository. blaginin pushed a commit to branch annarose/dict-coercion in repository https://gitbox.apache.org/repos/asf/datafusion-sandbox.git
commit 030621c6b6b6be6eeef0373dd60c21b69f1bbfe6 Author: Kumar Ujjawal <[email protected]> AuthorDate: Tue Feb 3 05:05:15 2026 +0530 refactor: Rename `FileSource::try_reverse_output` to `FileSource::try_pushdown_sort` (#20043) ## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #19723. ## Rationale for this change - The API is not “reverse-only”: it takes an arbitrary requested ordering and can accept/deny/partially match it, so try_pushdown_sort is a more accurate name. - FileScanConfig::rebuild_with_source shouldn’t assume reversal; reversal is only valid when it helps satisfy the caller’s requested order. <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> ## What changes are included in this PR? - Renames FileSource::try_reverse_output to FileSource::try_pushdown_sort (keeps try_reverse_output as a deprecated shim for compatibility). - Updates Parquet’s implementation and internal call sites to use the new name. - Updates FileScanConfig::rebuild_with_source to take the caller’s requested ordering and only reverse file_groups when the request is actually a reverse of the current output_ordering (adds a unit test). <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> ## Are these changes tested? Yes <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> ## Are there any user-facing changes? - Yes: FileSource implementers should prefer try_pushdown_sort; try_reverse_output remains but is deprecated. - Slight behavioral change: file group reversal during sort pushdown is now conditional instead of always reversing <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. --> --- datafusion/datasource-parquet/src/opener.rs | 2 +- datafusion/datasource-parquet/src/source.rs | 2 +- datafusion/datasource/src/file.rs | 28 ++++- datafusion/datasource/src/file_scan_config.rs | 161 +++++++++++++++++++++++--- 4 files changed, 174 insertions(+), 19 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 6f92d567c..f87a30265 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -581,7 +581,7 @@ impl FileOpener for ParquetOpener { // ---------------------------------------------------------- // Step: potentially reverse the access plan for performance. - // See `ParquetSource::try_reverse_output` for the rationale. + // See `ParquetSource::try_pushdown_sort` for the rationale. // ---------------------------------------------------------- if reverse_row_groups { prepared_plan = prepared_plan.reverse(file_metadata.as_ref())?; diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 07f58db18..75d87a4cd 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -757,7 +757,7 @@ impl FileSource for ParquetSource { /// # Returns /// - `Inexact`: Created an optimized source (e.g., reversed scan) that approximates the order /// - `Unsupported`: Cannot optimize for this ordering - fn try_reverse_output( + fn try_pushdown_sort( &self, order: &[PhysicalSortExpr], eq_properties: &EquivalenceProperties, diff --git a/datafusion/datasource/src/file.rs b/datafusion/datasource/src/file.rs index f5380c27e..c6282c3c7 100644 --- a/datafusion/datasource/src/file.rs +++ b/datafusion/datasource/src/file.rs @@ -189,7 +189,29 @@ pub trait FileSource: Send + Sync { /// * `Inexact` - Created a source optimized for ordering (e.g., reversed row groups) but not perfectly sorted /// * `Unsupported` - Cannot optimize for this ordering /// - /// Default implementation returns `Unsupported`. + /// # Deprecation / migration notes + /// - [`Self::try_reverse_output`] was renamed to this method and deprecated since `53.0.0`. + /// Per DataFusion's deprecation guidelines, it will be removed in `59.0.0` or later + /// (6 major versions or 6 months, whichever is longer). + /// - New implementations should override [`Self::try_pushdown_sort`] directly. + /// - For backwards compatibility, the default implementation of + /// [`Self::try_pushdown_sort`] delegates to the deprecated + /// [`Self::try_reverse_output`] until it is removed. After that point, the + /// default implementation will return [`SortOrderPushdownResult::Unsupported`]. + fn try_pushdown_sort( + &self, + order: &[PhysicalSortExpr], + eq_properties: &EquivalenceProperties, + ) -> Result<SortOrderPushdownResult<Arc<dyn FileSource>>> { + #[expect(deprecated)] + self.try_reverse_output(order, eq_properties) + } + + /// Deprecated: Renamed to [`Self::try_pushdown_sort`]. + #[deprecated( + since = "53.0.0", + note = "Renamed to try_pushdown_sort. This method was never limited to reversing output. It will be removed in 59.0.0 or later." + )] fn try_reverse_output( &self, _order: &[PhysicalSortExpr], @@ -232,7 +254,7 @@ pub trait FileSource: Send + Sync { /// `SchemaAdapterFactory` has been removed. Use `PhysicalExprAdapterFactory` instead. /// See `upgrading.md` for more details. #[deprecated( - since = "52.0.0", + since = "53.0.0", note = "SchemaAdapterFactory has been removed. Use PhysicalExprAdapterFactory instead. See upgrading.md for more details." )] #[expect(deprecated)] @@ -250,7 +272,7 @@ pub trait FileSource: Send + Sync { /// `SchemaAdapterFactory` has been removed. Use `PhysicalExprAdapterFactory` instead. /// See `upgrading.md` for more details. #[deprecated( - since = "52.0.0", + since = "53.0.0", note = "SchemaAdapterFactory has been removed. Use PhysicalExprAdapterFactory instead. See upgrading.md for more details." )] #[expect(deprecated)] diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 51b9ba9e0..fe78c0e52 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -851,20 +851,20 @@ impl DataSource for FileScanConfig { &self, order: &[PhysicalSortExpr], ) -> Result<SortOrderPushdownResult<Arc<dyn DataSource>>> { - // Delegate to FileSource to check if reverse scanning can satisfy the request. + // Delegate to FileSource to see if it can optimize for the requested ordering. let pushdown_result = self .file_source - .try_reverse_output(order, &self.eq_properties())?; + .try_pushdown_sort(order, &self.eq_properties())?; match pushdown_result { SortOrderPushdownResult::Exact { inner } => { Ok(SortOrderPushdownResult::Exact { - inner: self.rebuild_with_source(inner, true)?, + inner: self.rebuild_with_source(inner, true, order)?, }) } SortOrderPushdownResult::Inexact { inner } => { Ok(SortOrderPushdownResult::Inexact { - inner: self.rebuild_with_source(inner, false)?, + inner: self.rebuild_with_source(inner, false, order)?, }) } SortOrderPushdownResult::Unsupported => { @@ -1157,19 +1157,44 @@ impl FileScanConfig { &self, new_file_source: Arc<dyn FileSource>, is_exact: bool, + order: &[PhysicalSortExpr], ) -> Result<Arc<dyn DataSource>> { let mut new_config = self.clone(); - // Reverse file groups (FileScanConfig's responsibility) - new_config.file_groups = new_config - .file_groups - .into_iter() - .map(|group| { - let mut files = group.into_inner(); - files.reverse(); - files.into() - }) - .collect(); + // Reverse file order (within each group) if the caller is requesting a reversal of this + // scan's declared output ordering. + // + // Historically this function always reversed `file_groups` because it was only reached + // via `FileSource::try_reverse_output` (where a reversal was the only supported + // optimization). + // + // Now that `FileSource::try_pushdown_sort` is generic, we must not assume reversal: other + // optimizations may become possible (e.g. already-sorted data, statistics-based file + // reordering). Therefore we only reverse files when it is known to help satisfy the + // requested ordering. + let reverse_file_groups = if self.output_ordering.is_empty() { + false + } else if let Some(requested) = LexOrdering::new(order.iter().cloned()) { + let projected_schema = self.projected_schema()?; + let orderings = project_orderings(&self.output_ordering, &projected_schema); + orderings + .iter() + .any(|ordering| ordering.is_reverse(&requested)) + } else { + false + }; + + if reverse_file_groups { + new_config.file_groups = new_config + .file_groups + .into_iter() + .map(|group| { + let mut files = group.into_inner(); + files.reverse(); + files.into() + }) + .collect(); + } new_config.file_source = new_file_source; @@ -1392,6 +1417,62 @@ mod tests { use datafusion_physical_expr::projection::ProjectionExpr; use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; + #[derive(Clone)] + struct InexactSortPushdownSource { + metrics: ExecutionPlanMetricsSet, + table_schema: TableSchema, + } + + impl InexactSortPushdownSource { + fn new(table_schema: TableSchema) -> Self { + Self { + metrics: ExecutionPlanMetricsSet::new(), + table_schema, + } + } + } + + impl FileSource for InexactSortPushdownSource { + fn create_file_opener( + &self, + _object_store: Arc<dyn object_store::ObjectStore>, + _base_config: &FileScanConfig, + _partition: usize, + ) -> Result<Arc<dyn crate::file_stream::FileOpener>> { + unimplemented!() + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn table_schema(&self) -> &TableSchema { + &self.table_schema + } + + fn with_batch_size(&self, _batch_size: usize) -> Arc<dyn FileSource> { + Arc::new(self.clone()) + } + + fn metrics(&self) -> &ExecutionPlanMetricsSet { + &self.metrics + } + + fn file_type(&self) -> &str { + "mock" + } + + fn try_pushdown_sort( + &self, + _order: &[PhysicalSortExpr], + _eq_properties: &EquivalenceProperties, + ) -> Result<SortOrderPushdownResult<Arc<dyn FileSource>>> { + Ok(SortOrderPushdownResult::Inexact { + inner: Arc::new(self.clone()) as Arc<dyn FileSource>, + }) + } + } + #[test] fn physical_plan_config_no_projection_tab_cols_as_field() { let file_schema = aggr_test_schema(); @@ -2337,4 +2418,56 @@ mod tests { _ => panic!("Expected Hash partitioning"), } } + + #[test] + fn try_pushdown_sort_reverses_file_groups_only_when_requested_is_reverse() + -> Result<()> { + let file_schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)])); + + let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); + let file_source = Arc::new(InexactSortPushdownSource::new(table_schema)); + + let file_groups = vec![FileGroup::new(vec![ + PartitionedFile::new("file1", 1), + PartitionedFile::new("file2", 1), + ])]; + + let sort_expr_asc = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0))); + let config = + FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) + .with_file_groups(file_groups) + .with_output_ordering(vec![ + LexOrdering::new(vec![sort_expr_asc.clone()]).unwrap(), + ]) + .build(); + + let requested_asc = vec![sort_expr_asc.clone()]; + let result = config.try_pushdown_sort(&requested_asc)?; + let SortOrderPushdownResult::Inexact { inner } = result else { + panic!("Expected Inexact result"); + }; + let pushed_config = inner + .as_any() + .downcast_ref::<FileScanConfig>() + .expect("Expected FileScanConfig"); + let pushed_files = pushed_config.file_groups[0].files(); + assert_eq!(pushed_files[0].object_meta.location.as_ref(), "file1"); + assert_eq!(pushed_files[1].object_meta.location.as_ref(), "file2"); + + let requested_desc = vec![sort_expr_asc.reverse()]; + let result = config.try_pushdown_sort(&requested_desc)?; + let SortOrderPushdownResult::Inexact { inner } = result else { + panic!("Expected Inexact result"); + }; + let pushed_config = inner + .as_any() + .downcast_ref::<FileScanConfig>() + .expect("Expected FileScanConfig"); + let pushed_files = pushed_config.file_groups[0].files(); + assert_eq!(pushed_files[0].object_meta.location.as_ref(), "file2"); + assert_eq!(pushed_files[1].object_meta.location.as_ref(), "file1"); + + Ok(()) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
