adriangb commented on code in PR #19064:
URL: https://github.com/apache/datafusion/pull/19064#discussion_r2611965088
##########
datafusion/datasource/src/file.rs:
##########
@@ -34,13 +34,51 @@ use
datafusion_physical_plan::filter_pushdown::{FilterPushdownPropagation, Pushe
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_physical_plan::DisplayFormatType;
+use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
use object_store::ObjectStore;
/// Helper function to convert any type implementing FileSource to Arc<dyn
FileSource>
pub fn as_file_source<T: FileSource + 'static>(source: T) -> Arc<dyn
FileSource> {
Arc::new(source)
}
+/// Result of attempting to push down sort ordering to a file source
+#[derive(Debug, Clone)]
+pub enum SortOrderPushdownResult<T> {
+ /// The source can guarantee exact ordering (data is perfectly sorted)
Review Comment:
```suggestion
/// The source can guarantee exact ordering (data is perfectly sorted)
/// and that ordering fully satisfies the sort order that was pushed down.
/// The upstream sort can be dropped: all data in this partition will flow up
/// already sorted from the scan.
```
##########
datafusion/datasource/src/source.rs:
##########
@@ -360,6 +378,19 @@ impl ExecutionPlan for DataSourceExec {
}),
}
}
+
+ fn try_pushdown_sort(
+ &self,
+ order: &[PhysicalSortExpr],
+ ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+ match self.data_source.try_pushdown_sort(order)? {
+ Some(new_data_source) => {
+ let new_exec = self.clone().with_data_source(new_data_source);
+ Ok(Some(Arc::new(new_exec)))
Review Comment:
```suggestion
Ok(Some(Arc::new(DataSourceExec::new(new_data_source))))
```
We probably want to trigger rebuilding of `cache`
##########
datafusion/datasource/src/file.rs:
##########
@@ -34,13 +34,51 @@ use
datafusion_physical_plan::filter_pushdown::{FilterPushdownPropagation, Pushe
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_physical_plan::DisplayFormatType;
+use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
use object_store::ObjectStore;
/// Helper function to convert any type implementing FileSource to Arc<dyn
FileSource>
pub fn as_file_source<T: FileSource + 'static>(source: T) -> Arc<dyn
FileSource> {
Arc::new(source)
}
+/// Result of attempting to push down sort ordering to a file source
+#[derive(Debug, Clone)]
+pub enum SortOrderPushdownResult<T> {
+ /// The source can guarantee exact ordering (data is perfectly sorted)
+ Exact { inner: T },
+ /// The source has optimized for the ordering but cannot guarantee perfect
sorting
+ /// (e.g., reordered files/row groups based on statistics)
+ Inexact { inner: T },
+ /// The source cannot optimize for this ordering
+ Unsupported,
+}
+
+impl<T> SortOrderPushdownResult<T> {
+ /// Returns true if the result is Exact
+ pub fn is_exact(&self) -> bool {
+ matches!(self, Self::Exact { .. })
+ }
+
+ /// Returns true if the result is Inexact
+ pub fn is_inexact(&self) -> bool {
+ matches!(self, Self::Inexact { .. })
+ }
+
+ /// Returns true if optimization was successful (Exact or Inexact)
+ pub fn is_supported(&self) -> bool {
+ !matches!(self, Self::Unsupported)
+ }
Review Comment:
Are these used anywhere? If not, I suggest we keep them out of the public API
for now.
##########
datafusion/datasource-parquet/src/source.rs:
##########
@@ -794,6 +822,34 @@ impl FileSource for ParquetSource {
fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
self.schema_adapter_factory.clone()
}
+
+ /// When push down to parquet source of a sort operation is possible,
+ /// create a new ParquetSource with reverse_scan enabled.
+ ///
+ /// # Phase 1 Behavior (Current)
+ /// Returns `Inexact` because we're only reversing the scan direction and
reordering
+ /// files/row groups. We still need to verify ordering at a higher level.
+ ///
+ /// # Phase 2 (Future)
+ /// Could return `Exact` when we can guarantee that the scan order matches
the requested order, and
+ /// we can remove any higher-level sort operations.
+ ///
+ /// TODO support more policies in addition to reversing the scan.
+ fn try_pushdown_sort(
+ &self,
+ _order: &[PhysicalSortExpr],
+ ) -> datafusion_common::Result<SortOrderPushdownResult<Arc<dyn
FileSource>>> {
+ // Note: We ignore the specific `order` parameter here because the
decision
+ // about whether we can reverse is made at the FileScanConfig level.
+ // This method creates a reversed version of the current ParquetSource,
+ // and the FileScanConfig will reverse both the file list and the
declared ordering.
+ let new_source = self.clone().with_reverse_scan_inexact(true);
Review Comment:
I don't think we should rely on what `FileScanConfig` does.
I think we should do our own analysis here and return what we are able to do.
Then `FileScanConfig` should combine that with what it is able to do.
##########
datafusion/datasource-parquet/src/source.rs:
##########
@@ -490,6 +497,15 @@ impl ParquetSource {
)),
}
}
+
+ pub fn with_reverse_scan_inexact(mut self, reverse_scan_inexact: bool) ->
Self {
+ self.reverse_scan_inexact = reverse_scan_inexact;
+ self
+ }
+
+ pub fn reverse_scan_inexact(&self) -> bool {
+ self.reverse_scan_inexact
+ }
Review Comment:
Not a huge fan of tying these to the public API.
I would follow the pattern that filter / projection pushdown uses and
propagate the full `try_pushdown_sort` into the `FileSource` layer.
Or, if these are only used from tests, make them `#[cfg(test)]` or
`pub(crate)`.
##########
datafusion/physical-optimizer/src/pushdown_sort.rs:
##########
@@ -0,0 +1,250 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Sort Pushdown Optimization (Phase 1)
+//!
+//! Phase 1 supports reverse scan optimization: when the required sort order is
+//! the reverse of the data source's output ordering (or a prefix of it), we
perform
+//! a reverse scan at the data source level (reading row groups in reverse
order).
+//!
+//! **Prefix Matching**: If the data has ordering [A DESC, B ASC] and the
query needs
+//! [A ASC], reversing gives [A ASC, B DESC] which satisfies the requirement.
+//!
+//! This optimization:
+//! 1. Detects SortExec nodes that require a specific ordering
+//! 2. Recursively traverses through transparent nodes to find data sources
+//! 3. Checks if required order is reverse of output order (supports prefix
matching)
+//! 4. If yes, pushes down reverse scan to data source
+//! 5. Returns **Inexact** ordering (keeps Sort but enables early termination)
+//! 6. Phase 2 will support more complex scenarios (file reordering) and
detect perfect ordering
Review Comment:
I think we can rework this to be simpler and more flexible. I will make a PR.
##########
datafusion/datasource/src/source.rs:
##########
@@ -190,6 +190,24 @@ pub trait DataSource: Send + Sync + Debug {
vec![PushedDown::No; filters.len()],
))
}
+
+ /// Try to create a new DataSource that produces data in the specified
sort order.
+ ///
+ /// # Arguments
+ /// * `order` - The desired output ordering
+ ///
+ /// # Returns
+ /// * `Ok(Some(source))` - Created a source that satisfies the ordering
+ /// * `Ok(None)` - Cannot optimize for this ordering
+ /// * `Err(e)` - Error occurred
+ ///
+ /// Default implementation returns `Ok(None)`.
+ fn try_pushdown_sort(
+ &self,
+ _order: &[PhysicalSortExpr],
+ ) -> Result<Option<Arc<dyn DataSource>>> {
+ Ok(None)
+ }
Review Comment:
Again, I'm wondering why we don't just keep everything consistent by using the enum. I
guess I'll see once I get to the optimizer rule itself.
##########
datafusion/datasource/src/file.rs:
##########
@@ -34,13 +34,51 @@ use
datafusion_physical_plan::filter_pushdown::{FilterPushdownPropagation, Pushe
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_physical_plan::DisplayFormatType;
+use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
use object_store::ObjectStore;
/// Helper function to convert any type implementing FileSource to Arc<dyn
FileSource>
pub fn as_file_source<T: FileSource + 'static>(source: T) -> Arc<dyn
FileSource> {
Arc::new(source)
}
+/// Result of attempting to push down sort ordering to a file source
+#[derive(Debug, Clone)]
+pub enum SortOrderPushdownResult<T> {
+ /// The source can guarantee exact ordering (data is perfectly sorted)
+ Exact { inner: T },
+ /// The source has optimized for the ordering but cannot guarantee perfect
sorting
+ /// (e.g., reordered files/row groups based on statistics)
Review Comment:
```suggestion
/// The source has optimized for the ordering but cannot guarantee perfect sorting
/// (e.g., reordered files/row groups based on statistics).
/// The upstream sort will be preserved but may still benefit from cheaper sorting
/// (e.g. if the data is mostly sorted) or may be able to terminate early
/// (e.g. if blocks of data are clustered in alignment with the sort order).
/// Crucially, this means that this node should be replaced with `inner`
/// within its parent.
```
##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -434,10 +436,90 @@ impl FileOpener for ParquetOpener {
}
let row_group_indexes = access_plan.row_group_indexes();
- if let Some(row_selection) =
- access_plan.into_overall_row_selection(rg_metadata)?
- {
- builder = builder.with_row_selection(row_selection);
+
+ // Extract row selection before potentially reversing
+ let row_selection_opt =
+ access_plan.into_overall_row_selection(rg_metadata)?;
+
+ if reverse_scan_inexact {
+ // Reverse the row groups
+ let reversed_indexes: Vec<_> =
+ row_group_indexes.clone().into_iter().rev().collect();
+
+ // If we have a row selection, we need to rebuild it for the
reversed order
+ if let Some(row_selection) = row_selection_opt {
+ // Build a mapping of row group index to its row range in
the file
+ let mut rg_row_ranges: Vec<(usize, usize, usize)> =
Vec::new(); // (rg_index, start_row, end_row)
Review Comment:
Use `with_capacity()`
##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -434,10 +436,90 @@ impl FileOpener for ParquetOpener {
}
let row_group_indexes = access_plan.row_group_indexes();
- if let Some(row_selection) =
- access_plan.into_overall_row_selection(rg_metadata)?
- {
- builder = builder.with_row_selection(row_selection);
+
+ // Extract row selection before potentially reversing
+ let row_selection_opt =
Review Comment:
It would also be great for this logic to have thorough unit test coverage.
##########
datafusion/common/src/config.rs:
##########
@@ -837,6 +837,18 @@ config_namespace! {
/// writing out already in-memory data, such as from a cached
/// data frame.
pub maximum_buffered_record_batches_per_stream: usize, default = 2
+
+ /// Enable sort pushdown optimization for Parquet files.
+ /// When enabled, optimizes queries with ORDER BY:
+ /// - Reordering files based on statistics
+ /// - Reversing row group read order when beneficial
Review Comment:
There is another thing we can do (not in this PR, haha, but in the
future), which is to re-order row groups / files based on statistics. This would
again be an imperfect ordering, but it would allow us to handle more cases than
reversing, essentially arbitrary sorts. Obviously, if the data is randomly
distributed this is not helpful. But if it is somewhat clustered it could be
quite beneficial.
##########
datafusion/physical-plan/src/execution_plan.rs:
##########
@@ -682,6 +684,16 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
) -> Option<Arc<dyn ExecutionPlan>> {
None
}
+
+ /// Try to create a new execution plan that satisfies the given sort
ordering.
+ ///
+ /// Default implementation returns `Ok(None)`.
+ fn try_pushdown_sort(
+ &self,
+ _order: &[PhysicalSortExpr],
+ ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+ Ok(None)
+ }
Review Comment:
Why wouldn't we use the `Inexact/Exact/Unsupported` structure here?
##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -434,10 +436,90 @@ impl FileOpener for ParquetOpener {
}
let row_group_indexes = access_plan.row_group_indexes();
- if let Some(row_selection) =
- access_plan.into_overall_row_selection(rg_metadata)?
- {
- builder = builder.with_row_selection(row_selection);
+
+ // Extract row selection before potentially reversing
+ let row_selection_opt =
Review Comment:
Can this be factored out into `datasource-parquet/src/sort.rs` or something similar?
That would also be a good place to add more logic as we grow the capabilities
for adjusting scan orders.
##########
datafusion/datasource/src/file.rs:
##########
@@ -34,13 +34,51 @@ use
datafusion_physical_plan::filter_pushdown::{FilterPushdownPropagation, Pushe
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_physical_plan::DisplayFormatType;
+use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
use object_store::ObjectStore;
/// Helper function to convert any type implementing FileSource to Arc<dyn
FileSource>
pub fn as_file_source<T: FileSource + 'static>(source: T) -> Arc<dyn
FileSource> {
Arc::new(source)
}
+/// Result of attempting to push down sort ordering to a file source
+#[derive(Debug, Clone)]
+pub enum SortOrderPushdownResult<T> {
+ /// The source can guarantee exact ordering (data is perfectly sorted)
+ Exact { inner: T },
+ /// The source has optimized for the ordering but cannot guarantee perfect
sorting
+ /// (e.g., reordered files/row groups based on statistics)
+ Inexact { inner: T },
+ /// The source cannot optimize for this ordering
Review Comment:
```suggestion
/// The source cannot optimize for this ordering.
/// This case means that the upstream sort should be left intact and
/// also that this node should not be replaced in the plan.
```
##########
datafusion/datasource/src/file_scan_config.rs:
##########
@@ -766,6 +767,107 @@ impl DataSource for FileScanConfig {
}
}
}
+
+ fn try_pushdown_sort(
+ &self,
+ order: &[PhysicalSortExpr],
+ ) -> Result<Option<Arc<dyn DataSource>>> {
+ let current_ordering = match self.output_ordering.first() {
+ Some(ordering) => ordering.as_ref(),
+ None => return Ok(None),
+ };
+
+ // Only support reverse ordering pushdown until now
+ if !is_reverse_ordering(order, current_ordering) {
+ return Ok(None);
+ }
+
+ // Ask the file source if it can handle the sort pushdown
+ let pushdown_result = self.file_source.try_pushdown_sort(order)?;
+
+ let new_file_source = match pushdown_result {
+ SortOrderPushdownResult::Exact { inner }
+ | SortOrderPushdownResult::Inexact { inner } => inner,
+ SortOrderPushdownResult::Unsupported => return Ok(None),
+ };
+
+ let mut new_config = self.clone();
+
+ // Reverse file groups: when scanning in reverse, we need to read files
+ // in reverse order to maintain the correct global ordering
+ new_config.file_groups = new_config
+ .file_groups
+ .into_iter()
+ .map(|group| {
+ let mut files = group.into_inner();
+ files.reverse();
+ files.into()
+ })
+ .collect();
+
+ // Phase 1: DO NOT change output_ordering
+ // The ordering is still the same as before (e.g., ASC) because:
+ // 1. We're only reversing row groups, not rows within groups
+ // 2. This makes the scan "closer" to DESC but not guaranteed
+ // 3. The Sort operator above will still be needed
+ //
+ // Keep the original output_ordering unchanged
+ // new_config.output_ordering = ... (NO CHANGE)
+
+ new_config.file_source = new_file_source;
+
+ Ok(Some(Arc::new(new_config)))
+ }
+}
+
+/// Check if the requested ordering can be satisfied by reversing the current
ordering.
+///
+/// This function supports **prefix matching**: if the file has ordering [A
DESC, B ASC]
+/// and we need [A ASC], reversing the scan gives us [A ASC, B DESC], which
satisfies
+/// the requirement since [A ASC] is a prefix.
+///
+/// # Arguments
+/// * `requested` - The ordering required by the query
+/// * `current` - The natural ordering of the data source (e.g., from file
metadata)
+///
+/// # Returns
+/// `true` if reversing the current ordering would satisfy the requested
ordering
+///
+/// # Example
+/// ```text
+/// Current: [number DESC, letter ASC]
+/// Requested: [number ASC]
+/// Reversed: [number ASC, letter DESC] ✓ Prefix match!
+/// ```
+fn is_reverse_ordering(
+ requested: &[PhysicalSortExpr],
+ current: &[PhysicalSortExpr],
+) -> bool {
+ // Allow prefix matching - we can satisfy a prefix of the current ordering
+ // by reversing the scan
+ if requested.len() > current.len() {
+ return false;
+ }
+
+ requested.iter().zip(current.iter()).all(|(req, cur)| {
+ // Check if the expressions are semantically equivalent using
PhysicalExpr::eq
+ // This is more robust than string comparison as it handles:
+ // - Expression equivalence (not just string representation)
+ // - Complex expressions that might have different string forms but
same semantics
+ let exprs_match = req.expr.eq(&cur.expr);
+
+ // Now check if the sort options are exactly reversed
+ // For a valid reverse scan:
+ // - descending must be opposite: ASC ↔ DESC
+ // - nulls_first must be opposite: NULLS FIRST ↔ NULLS LAST
+ let options_reversed = req.options.descending != cur.options.descending
+ && req.options.nulls_first != cur.options.nulls_first;
+
+ // Both conditions must be true:
+ // 1. Expressions are semantically equivalent
+ // 2. Completely reversed sort options
+ exprs_match && options_reversed
+ })
Review Comment:
This is very nice and exquisitely documented.
##########
datafusion/core/tests/physical_optimizer/pushdown_sort.rs:
##########
@@ -0,0 +1,778 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Tests for sort pushdown optimizer rule (Phase 1)
+//!
+//! Phase 1 tests verify that:
+//! 1. Reverse scan is enabled (reverse_scan_inexact=true)
+//! 2. SortExec is kept (because ordering is inexact)
+//! 3. output_ordering remains unchanged
+//! 4. Early termination is enabled for TopK queries
+//! 5. Prefix matching works correctly
+
+use arrow::compute::SortOptions;
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use datafusion_physical_optimizer::pushdown_sort::PushdownSort;
+use datafusion_physical_optimizer::PhysicalOptimizerRule;
+
+use crate::physical_optimizer::test_utils::{
+ coalesce_batches_exec, coalesce_partitions_exec, parquet_exec,
+ parquet_exec_with_sort, repartition_exec, schema, sort_exec,
sort_exec_with_fetch,
+ sort_expr, sort_expr_options, OptimizationTest,
+};
+
+#[test]
+fn test_sort_pushdown_disabled() {
+ // When pushdown is disabled, plan should remain unchanged
+ let schema = schema();
+ let source = parquet_exec(schema.clone());
+ let sort_exprs = LexOrdering::new(vec![sort_expr("a", &schema)]).unwrap();
+ let plan = sort_exec(sort_exprs, source);
+
+ insta::assert_snapshot!(
+ OptimizationTest::new(plan, PushdownSort::new(), false),
+ @r###"
+ OptimizationTest:
+ input:
+ - SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], file_type=parquet
+ output:
+ Ok:
+ - SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], file_type=parquet
+ "###
+ );
+}
+
+#[test]
+fn test_sort_pushdown_basic_phase1() {
+ // Phase 1: Reverse scan enabled, Sort kept, output_ordering unchanged
+ let schema = schema();
+
+ // Source has ASC NULLS LAST ordering (default)
+ let source_ordering = LexOrdering::new(vec![sort_expr("a",
&schema)]).unwrap();
+ let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]);
+
+ // Request DESC NULLS LAST ordering (exact reverse)
+ let desc_ordering = LexOrdering::new(vec![sort_expr_options(
+ "a",
+ &schema,
+ SortOptions {
+ descending: true,
+ nulls_first: false,
+ },
+ )])
+ .unwrap();
+ let plan = sort_exec(desc_ordering, source);
+
+ insta::assert_snapshot!(
+ OptimizationTest::new(plan, PushdownSort::new(), true),
+ @r###"
+ OptimizationTest:
+ input:
+ - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], output_ordering=[a@0 ASC], file_type=parquet
+ output:
+ Ok:
+ - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], output_ordering=[a@0 ASC], file_type=parquet,
reverse_scan_inexact=true
Review Comment:
Shouldn't output ordering be updated here? Or be made unknown or something?
If we're reversing the scan it's either `a@0 DESC` or `unknown` (I think it
should be the latter since we're just shuffling around row groups)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]