adriangb commented on code in PR #19064:
URL: https://github.com/apache/datafusion/pull/19064#discussion_r2611965088


##########
datafusion/datasource/src/file.rs:
##########
@@ -34,13 +34,51 @@ use 
datafusion_physical_plan::filter_pushdown::{FilterPushdownPropagation, Pushe
 use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
 use datafusion_physical_plan::DisplayFormatType;
 
+use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
 use object_store::ObjectStore;
 
 /// Helper function to convert any type implementing FileSource to Arc<dyn 
FileSource>
 pub fn as_file_source<T: FileSource + 'static>(source: T) -> Arc<dyn 
FileSource> {
     Arc::new(source)
 }
 
+/// Result of attempting to push down sort ordering to a file source
+#[derive(Debug, Clone)]
+pub enum SortOrderPushdownResult<T> {
+    /// The source can guarantee exact ordering (data is perfectly sorted)

Review Comment:
   ```suggestion
       /// The source can guarantee exact ordering (data is perfectly sorted)
       /// and that ordering fully satisfies the sort order that was pushed 
down.
       /// The upstream sort can be dropped, all data in this partition will 
flow up
       /// already sorted from the scan.
   ```



##########
datafusion/datasource/src/source.rs:
##########
@@ -360,6 +378,19 @@ impl ExecutionPlan for DataSourceExec {
             }),
         }
     }
+
+    fn try_pushdown_sort(
+        &self,
+        order: &[PhysicalSortExpr],
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+        match self.data_source.try_pushdown_sort(order)? {
+            Some(new_data_source) => {
+                let new_exec = self.clone().with_data_source(new_data_source);
+                Ok(Some(Arc::new(new_exec)))

Review Comment:
   ```suggestion
                   Ok(Some(Arc::new(DataSourceExec::new(new_data_source))))
   ```
   
   We probably want to trigger rebuilding of `cache`



##########
datafusion/datasource/src/file.rs:
##########
@@ -34,13 +34,51 @@ use 
datafusion_physical_plan::filter_pushdown::{FilterPushdownPropagation, Pushe
 use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
 use datafusion_physical_plan::DisplayFormatType;
 
+use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
 use object_store::ObjectStore;
 
 /// Helper function to convert any type implementing FileSource to Arc<dyn 
FileSource>
 pub fn as_file_source<T: FileSource + 'static>(source: T) -> Arc<dyn 
FileSource> {
     Arc::new(source)
 }
 
+/// Result of attempting to push down sort ordering to a file source
+#[derive(Debug, Clone)]
+pub enum SortOrderPushdownResult<T> {
+    /// The source can guarantee exact ordering (data is perfectly sorted)
+    Exact { inner: T },
+    /// The source has optimized for the ordering but cannot guarantee perfect 
sorting
+    /// (e.g., reordered files/row groups based on statistics)
+    Inexact { inner: T },
+    /// The source cannot optimize for this ordering
+    Unsupported,
+}
+
+impl<T> SortOrderPushdownResult<T> {
+    /// Returns true if the result is Exact
+    pub fn is_exact(&self) -> bool {
+        matches!(self, Self::Exact { .. })
+    }
+
+    /// Returns true if the result is Inexact
+    pub fn is_inexact(&self) -> bool {
+        matches!(self, Self::Inexact { .. })
+    }
+
+    /// Returns true if optimization was successful (Exact or Inexact)
+    pub fn is_supported(&self) -> bool {
+        !matches!(self, Self::Unsupported)
+    }

Review Comment:
   Are these used anywhere? If not I suggest we keep them out of the public API 
for now.



##########
datafusion/datasource-parquet/src/source.rs:
##########
@@ -794,6 +822,34 @@ impl FileSource for ParquetSource {
     fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
         self.schema_adapter_factory.clone()
     }
+
+    /// When push down to parquet source of a sort operation is possible,
+    /// create a new ParquetSource with reverse_scan enabled.
+    ///
+    /// # Phase 1 Behavior (Current)
+    /// Returns `Inexact` because we're only reversing the scan direction and 
reordering
+    /// files/row groups. We still need to verify ordering at a higher level.
+    ///
+    /// # Phase 2 (Future)
+    /// Could return `Exact` when we can guarantee that the scan order matches 
the requested order, and
+    /// we can remove any higher-level sort operations.
+    ///
+    /// TODO support more policies in addition to reversing the scan.
+    fn try_pushdown_sort(
+        &self,
+        _order: &[PhysicalSortExpr],
+    ) -> datafusion_common::Result<SortOrderPushdownResult<Arc<dyn 
FileSource>>> {
+        // Note: We ignore the specific `order` parameter here because the 
decision
+        // about whether we can reverse is made at the FileScanConfig level.
+        // This method creates a reversed version of the current ParquetSource,
+        // and the FileScanConfig will reverse both the file list and the 
declared ordering.
+        let new_source = self.clone().with_reverse_scan_inexact(true);

Review Comment:
   I don't think we should rely on what `FileScanConfig` does.
   I think we should do our own analysis here and return what we are able to do.
   Then `FileScanConfig` should combine that with what it is able to do.



##########
datafusion/datasource-parquet/src/source.rs:
##########
@@ -490,6 +497,15 @@ impl ParquetSource {
             )),
         }
     }
+
+    pub fn with_reverse_scan_inexact(mut self, reverse_scan_inexact: bool) -> 
Self {
+        self.reverse_scan_inexact = reverse_scan_inexact;
+        self
+    }
+
+    pub fn reverse_scan_inexact(&self) -> bool {
+        self.reverse_scan_inexact
+    }

Review Comment:
   not a huge fan of tying these to the public API.
   
   I would follow the pattern that pushdown of filters / projection have and 
propagate the full `try_pushdown_sort` into the `FileSource` layer.
   
   Or if these are just to be used from tests make them `#[cfg(test)]` or 
`pub(crate)`.



##########
datafusion/physical-optimizer/src/pushdown_sort.rs:
##########
@@ -0,0 +1,250 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Sort Pushdown Optimization (Phase 1)
+//!
+//! Phase 1 supports reverse scan optimization: when the required sort order is
+//! the reverse of the data source's output ordering (or a prefix of it), we 
perform
+//! a reverse scan at the data source level (reading row groups in reverse 
order).
+//!
+//! **Prefix Matching**: If the data has ordering [A DESC, B ASC] and the 
query needs
+//! [A ASC], reversing gives [A ASC, B DESC] which satisfies the requirement.
+//!
+//! This optimization:
+//! 1. Detects SortExec nodes that require a specific ordering
+//! 2. Recursively traverses through transparent nodes to find data sources
+//! 3. Checks if required order is reverse of output order (supports prefix 
matching)
+//! 4. If yes, pushes down reverse scan to data source
+//! 5. Returns **Inexact** ordering (keeps Sort but enables early termination)
+//! 6. Phase 2 will support more complex scenarios (file reordering) and 
detect perfect ordering

Review Comment:
   I think we can rework this to be simpler and more flexible. I will make a PR.



##########
datafusion/datasource/src/source.rs:
##########
@@ -190,6 +190,24 @@ pub trait DataSource: Send + Sync + Debug {
             vec![PushedDown::No; filters.len()],
         ))
     }
+
+    /// Try to create a new DataSource that produces data in the specified 
sort order.
+    ///
+    /// # Arguments
+    /// * `order` - The desired output ordering
+    ///
+    /// # Returns
+    /// * `Ok(Some(source))` - Created a source that satisfies the ordering
+    /// * `Ok(None)` - Cannot optimize for this ordering
+    /// * `Err(e)` - Error occurred
+    ///
+    /// Default implementation returns `Ok(None)`.
+    fn try_pushdown_sort(
+        &self,
+        _order: &[PhysicalSortExpr],
+    ) -> Result<Option<Arc<dyn DataSource>>> {
+        Ok(None)
+    }

Review Comment:
   Again wondering why not just keep everything consistent using the enum. I 
guess I'll see once I get to the optimizer rule itself.



##########
datafusion/datasource/src/file.rs:
##########
@@ -34,13 +34,51 @@ use 
datafusion_physical_plan::filter_pushdown::{FilterPushdownPropagation, Pushe
 use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
 use datafusion_physical_plan::DisplayFormatType;
 
+use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
 use object_store::ObjectStore;
 
 /// Helper function to convert any type implementing FileSource to Arc<dyn 
FileSource>
 pub fn as_file_source<T: FileSource + 'static>(source: T) -> Arc<dyn 
FileSource> {
     Arc::new(source)
 }
 
+/// Result of attempting to push down sort ordering to a file source
+#[derive(Debug, Clone)]
+pub enum SortOrderPushdownResult<T> {
+    /// The source can guarantee exact ordering (data is perfectly sorted)
+    Exact { inner: T },
+    /// The source has optimized for the ordering but cannot guarantee perfect 
sorting
+    /// (e.g., reordered files/row groups based on statistics)

Review Comment:
   ```suggestion
       /// The source has optimized for the ordering but cannot guarantee 
perfect sorting
       /// (e.g., reordered files/row groups based on statistics).
       /// The upstream sort will be preserved but may still benefit from 
cheaper sorting (e.g. if the data
       /// is mostly sorted) or may be able to terminate early (e.g. if blocks 
of data are clustered in alignment with the sort order).
       /// Crucially this means that this node should be replaced with `inner` 
within its parent.
   ```



##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -434,10 +436,90 @@ impl FileOpener for ParquetOpener {
             }
 
             let row_group_indexes = access_plan.row_group_indexes();
-            if let Some(row_selection) =
-                access_plan.into_overall_row_selection(rg_metadata)?
-            {
-                builder = builder.with_row_selection(row_selection);
+
+            // Extract row selection before potentially reversing
+            let row_selection_opt =
+                access_plan.into_overall_row_selection(rg_metadata)?;
+
+            if reverse_scan_inexact {
+                // Reverse the row groups
+                let reversed_indexes: Vec<_> =
+                    row_group_indexes.clone().into_iter().rev().collect();
+
+                // If we have a row selection, we need to rebuild it for the 
reversed order
+                if let Some(row_selection) = row_selection_opt {
+                    // Build a mapping of row group index to its row range in 
the file
+                    let mut rg_row_ranges: Vec<(usize, usize, usize)> = 
Vec::new(); // (rg_index, start_row, end_row)

Review Comment:
   Use `with_capacity()`



##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -434,10 +436,90 @@ impl FileOpener for ParquetOpener {
             }
 
             let row_group_indexes = access_plan.row_group_indexes();
-            if let Some(row_selection) =
-                access_plan.into_overall_row_selection(rg_metadata)?
-            {
-                builder = builder.with_row_selection(row_selection);
+
+            // Extract row selection before potentially reversing
+            let row_selection_opt =

Review Comment:
   It would also be great for it to have a lot of unit tests.



##########
datafusion/common/src/config.rs:
##########
@@ -837,6 +837,18 @@ config_namespace! {
         /// writing out already in-memory data, such as from a cached
         /// data frame.
         pub maximum_buffered_record_batches_per_stream: usize, default = 2
+
+        /// Enable sort pushdown optimization for Parquet files.
+        /// When enabled, optimizes queries with ORDER BY:
+        /// - Reordering files based on statistics
+        /// - Reversing row group read order when beneficial

Review Comment:
   There is another thing we can do (please not in this PR haha but just in 
future) which is to re-order row groups / files based on statistics. This would 
again be an imperfect ordering but it would allow us to handle more cases than 
reversing, essentially arbitrary sorts. Obviously if the data is randomly 
distributed this is not helpful. But if it is somewhat clustered it could be 
quite beneficial.



##########
datafusion/physical-plan/src/execution_plan.rs:
##########
@@ -682,6 +684,16 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
     ) -> Option<Arc<dyn ExecutionPlan>> {
         None
     }
+
+    /// Try to create a new execution plan that satisfies the given sort 
ordering.
+    ///
+    /// Default implementation returns `Ok(None)`.
+    fn try_pushdown_sort(
+        &self,
+        _order: &[PhysicalSortExpr],
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+        Ok(None)
+    }

Review Comment:
   Why wouldn't we use the `Inexact/Exact/Unsupported` structure here?



##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -434,10 +436,90 @@ impl FileOpener for ParquetOpener {
             }
 
             let row_group_indexes = access_plan.row_group_indexes();
-            if let Some(row_selection) =
-                access_plan.into_overall_row_selection(rg_metadata)?
-            {
-                builder = builder.with_row_selection(row_selection);
+
+            // Extract row selection before potentially reversing
+            let row_selection_opt =

Review Comment:
   Can this be factored out into `datasource-parquet/src/sort.rs` or something? 
That would also be a good place to add more stuff as we grow the capabilities 
of adjusting scan orders.



##########
datafusion/datasource/src/file.rs:
##########
@@ -34,13 +34,51 @@ use 
datafusion_physical_plan::filter_pushdown::{FilterPushdownPropagation, Pushe
 use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
 use datafusion_physical_plan::DisplayFormatType;
 
+use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
 use object_store::ObjectStore;
 
 /// Helper function to convert any type implementing FileSource to Arc<dyn 
FileSource>
 pub fn as_file_source<T: FileSource + 'static>(source: T) -> Arc<dyn 
FileSource> {
     Arc::new(source)
 }
 
+/// Result of attempting to push down sort ordering to a file source
+#[derive(Debug, Clone)]
+pub enum SortOrderPushdownResult<T> {
+    /// The source can guarantee exact ordering (data is perfectly sorted)
+    Exact { inner: T },
+    /// The source has optimized for the ordering but cannot guarantee perfect 
sorting
+    /// (e.g., reordered files/row groups based on statistics)
+    Inexact { inner: T },
+    /// The source cannot optimize for this ordering

Review Comment:
   ```suggestion
       /// The source cannot optimize for this ordering.
       /// This case means that the upstream sort should be left intact but 
also that this node
       /// should not be replaced in the plan.
   ```



##########
datafusion/datasource/src/file_scan_config.rs:
##########
@@ -766,6 +767,107 @@ impl DataSource for FileScanConfig {
             }
         }
     }
+
+    fn try_pushdown_sort(
+        &self,
+        order: &[PhysicalSortExpr],
+    ) -> Result<Option<Arc<dyn DataSource>>> {
+        let current_ordering = match self.output_ordering.first() {
+            Some(ordering) => ordering.as_ref(),
+            None => return Ok(None),
+        };
+
+        // Only support reverse ordering pushdown until now
+        if !is_reverse_ordering(order, current_ordering) {
+            return Ok(None);
+        }
+
+        // Ask the file source if it can handle the sort pushdown
+        let pushdown_result = self.file_source.try_pushdown_sort(order)?;
+
+        let new_file_source = match pushdown_result {
+            SortOrderPushdownResult::Exact { inner }
+            | SortOrderPushdownResult::Inexact { inner } => inner,
+            SortOrderPushdownResult::Unsupported => return Ok(None),
+        };
+
+        let mut new_config = self.clone();
+
+        // Reverse file groups: when scanning in reverse, we need to read files
+        // in reverse order to maintain the correct global ordering
+        new_config.file_groups = new_config
+            .file_groups
+            .into_iter()
+            .map(|group| {
+                let mut files = group.into_inner();
+                files.reverse();
+                files.into()
+            })
+            .collect();
+
+        // Phase 1: DO NOT change output_ordering
+        // The ordering is still the same as before (e.g., ASC) because:
+        // 1. We're only reversing row groups, not rows within groups
+        // 2. This makes the scan "closer" to DESC but not guaranteed
+        // 3. The Sort operator above will still be needed
+        //
+        // Keep the original output_ordering unchanged
+        // new_config.output_ordering = ... (NO CHANGE)
+
+        new_config.file_source = new_file_source;
+
+        Ok(Some(Arc::new(new_config)))
+    }
+}
+
+/// Check if the requested ordering can be satisfied by reversing the current 
ordering.
+///
+/// This function supports **prefix matching**: if the file has ordering [A 
DESC, B ASC]
+/// and we need [A ASC], reversing the scan gives us [A ASC, B DESC], which 
satisfies
+/// the requirement since [A ASC] is a prefix.
+///
+/// # Arguments
+/// * `requested` - The ordering required by the query
+/// * `current` - The natural ordering of the data source (e.g., from file 
metadata)
+///
+/// # Returns
+/// `true` if reversing the current ordering would satisfy the requested 
ordering
+///
+/// # Example
+/// ```text
+/// Current:   [number DESC, letter ASC]
+/// Requested: [number ASC]
+/// Reversed:  [number ASC, letter DESC]  ✓ Prefix match!
+/// ```
+fn is_reverse_ordering(
+    requested: &[PhysicalSortExpr],
+    current: &[PhysicalSortExpr],
+) -> bool {
+    // Allow prefix matching - we can satisfy a prefix of the current ordering
+    // by reversing the scan
+    if requested.len() > current.len() {
+        return false;
+    }
+
+    requested.iter().zip(current.iter()).all(|(req, cur)| {
+        // Check if the expressions are semantically equivalent using 
PhysicalExpr::eq
+        // This is more robust than string comparison as it handles:
+        // - Expression equivalence (not just string representation)
+        // - Complex expressions that might have different string forms but 
same semantics
+        let exprs_match = req.expr.eq(&cur.expr);
+
+        // Now check if the sort options are exactly reversed
+        // For a valid reverse scan:
+        //   - descending must be opposite: ASC ↔ DESC
+        //   - nulls_first must be opposite: NULLS FIRST ↔ NULLS LAST
+        let options_reversed = req.options.descending != cur.options.descending
+            && req.options.nulls_first != cur.options.nulls_first;
+
+        // Both conditions must be true:
+        //   1. Expressions are semantically equivalent
+        //   2. Completely reversed sort options
+        exprs_match && options_reversed
+    })

Review Comment:
   This is very nice and exquisitely documented.



##########
datafusion/core/tests/physical_optimizer/pushdown_sort.rs:
##########
@@ -0,0 +1,778 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Tests for sort pushdown optimizer rule (Phase 1)
+//!
+//! Phase 1 tests verify that:
+//! 1. Reverse scan is enabled (reverse_scan_inexact=true)
+//! 2. SortExec is kept (because ordering is inexact)
+//! 3. output_ordering remains unchanged
+//! 4. Early termination is enabled for TopK queries
+//! 5. Prefix matching works correctly
+
+use arrow::compute::SortOptions;
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use datafusion_physical_optimizer::pushdown_sort::PushdownSort;
+use datafusion_physical_optimizer::PhysicalOptimizerRule;
+
+use crate::physical_optimizer::test_utils::{
+    coalesce_batches_exec, coalesce_partitions_exec, parquet_exec,
+    parquet_exec_with_sort, repartition_exec, schema, sort_exec, 
sort_exec_with_fetch,
+    sort_expr, sort_expr_options, OptimizationTest,
+};
+
+#[test]
+fn test_sort_pushdown_disabled() {
+    // When pushdown is disabled, plan should remain unchanged
+    let schema = schema();
+    let source = parquet_exec(schema.clone());
+    let sort_exprs = LexOrdering::new(vec![sort_expr("a", &schema)]).unwrap();
+    let plan = sort_exec(sort_exprs, source);
+
+    insta::assert_snapshot!(
+        OptimizationTest::new(plan, PushdownSort::new(), false),
+        @r###"
+    OptimizationTest:
+      input:
+        - SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
+        -   DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], file_type=parquet
+      output:
+        Ok:
+          - SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
+          -   DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], file_type=parquet
+    "###
+    );
+}
+
+#[test]
+fn test_sort_pushdown_basic_phase1() {
+    // Phase 1: Reverse scan enabled, Sort kept, output_ordering unchanged
+    let schema = schema();
+
+    // Source has ASC NULLS LAST ordering (default)
+    let source_ordering = LexOrdering::new(vec![sort_expr("a", 
&schema)]).unwrap();
+    let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]);
+
+    // Request DESC NULLS LAST ordering (exact reverse)
+    let desc_ordering = LexOrdering::new(vec![sort_expr_options(
+        "a",
+        &schema,
+        SortOptions {
+            descending: true,
+            nulls_first: false,
+        },
+    )])
+    .unwrap();
+    let plan = sort_exec(desc_ordering, source);
+
+    insta::assert_snapshot!(
+        OptimizationTest::new(plan, PushdownSort::new(), true),
+        @r###"
+    OptimizationTest:
+      input:
+        - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
+        -   DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], output_ordering=[a@0 ASC], file_type=parquet
+      output:
+        Ok:
+          - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
+          -   DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], output_ordering=[a@0 ASC], file_type=parquet, 
reverse_scan_inexact=true

Review Comment:
   Shouldn't output ordering be updated here? Or be made unknown or something? 
If we're reversing the scan it's either `a@0 DESC` or `unknown` (I think it 
should be the latter since we're just shuffling around row groups)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to