alamb commented on code in PR #18817:
URL: https://github.com/apache/datafusion/pull/18817#discussion_r2568121671


##########
datafusion/physical-optimizer/src/optimizer.rs:
##########
@@ -145,11 +146,9 @@ impl PhysicalOptimizer {
             // are not present, the load of executors such as join or union will be
             // reduced by narrowing their input tables.
             Arc::new(ProjectionPushdown::new()),
-            Arc::new(EnsureCooperative::new()),
-            // This FilterPushdown handles dynamic filters that may have references to the source ExecutionPlan.
-            // Therefore it should be run at the end of the optimization process since any changes to the plan may break the dynamic filter's references.
-            // See `FilterPushdownPhase` for more details.
-            Arc::new(FilterPushdown::new_post_optimization()),
+            // ReverseOrder: Detect DESC sorts that can use reverse scan
+            // This marks reverse_scan=true on DataSourceExec
+            Arc::new(PushdownSort::new()),
             // The SanityCheckPlan rule checks whether the order and

Review Comment:
   I think this comment needs to be moved down too (it is no longer with the sanity check plan implementation).



##########
datafusion/datasource/src/file.rs:
##########
@@ -126,6 +127,29 @@ pub trait FileSource: Send + Sync {
         ))
     }
 
+    /// Try to create a new FileSource that can produce data in the specified sort order.
+    ///
+    /// This allows file format implementations to optimize based on the required sort order.

Review Comment:
   The ability to push down sorts into the file source has come up before, specifically:
   - https://github.com/apache/datafusion/issues/10433
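   
   For context, a minimal sketch of what this hook might look like (an assumption: the hunk cuts off before the method signature, so this is modeled on the `DataSource::try_pushdown_sort` shown later in this review):
   
   ```rust
   /// Hypothetical shape of the new FileSource hook: return a new source
   /// configured to produce `order`, or `None` if the format cannot.
   fn try_pushdown_sort(
       &self,
       order: &[PhysicalSortExpr],
   ) -> Result<Option<Arc<dyn FileSource>>> {
       // Default: no native support, so the planner keeps its SortExec
       Ok(None)
   }
   ```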



##########
datafusion/physical-optimizer/src/pushdown_sort.rs:
##########
@@ -0,0 +1,1235 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Sort Pushdown Optimization
+//!
+//! This optimizer attempts to push sort requirements down to data sources that can
+//! satisfy them natively, avoiding expensive sort operations.
+//!
+//! Currently supported optimizations:
+//! - **Reverse scan**: If a data source naturally produces data in DESC order and
+//!   we need ASC (or vice versa), we can reverse the scan direction instead of
+//!   adding a SortExec node.
+//!
+//! Future optimizations could include:
+//! - Reordering row groups in Parquet files
+//! - Leveraging native indexes
+//! - Reordering files in multi-file scans
+use crate::PhysicalOptimizerRule;
+use datafusion_common::config::ConfigOptions;
+use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
+use datafusion_common::Result;
+use datafusion_datasource::source::DataSourceExec;
+use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
+use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
+use datafusion_physical_plan::sorts::sort::SortExec;
+use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
+use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
+use std::sync::Arc;
+
+/// A PhysicalOptimizerRule that attempts to push down sort requirements to data sources
+/// that can natively handle them (e.g., by reversing scan direction).
+///
+/// This optimization:
+/// 1. Detects SortExec nodes that require a specific ordering
+/// 2. Checks if the input can satisfy the ordering by reversing its scan direction
+/// 3. Pushes the sort requirement down to the data source when possible
+/// 4. Removes unnecessary sort operations when the input already satisfies the requirement
+#[derive(Debug, Clone, Default)]
+pub struct PushdownSort;
+
+impl PushdownSort {
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl PhysicalOptimizerRule for PushdownSort {
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        config: &ConfigOptions,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        // Check if sort pushdown optimization is enabled
+        let enable_sort_pushdown = config.execution.parquet.enable_sort_pushdown;
+
+        // Return early if not enabled
+        if !enable_sort_pushdown {
+            return Ok(plan);
+        }
+
+        // Search for any SortExec nodes and try to optimize them
+        plan.transform_up(&|plan: Arc<dyn ExecutionPlan>| {
+            // First check if this is a GlobalLimitExec -> SortExec pattern
+            if let Some(limit_exec) = plan.as_any().downcast_ref::<GlobalLimitExec>() {
+                if let Some(sort_exec) =
+                    limit_exec.input().as_any().downcast_ref::<SortExec>()
+                {
+                    return optimize_limit_sort(limit_exec, sort_exec);
+                }
+            }
+
+            // Otherwise, check if this is just a SortExec
+            let sort_exec = match plan.as_any().downcast_ref::<SortExec>() {
+                Some(sort_exec) => sort_exec,
+                None => return Ok(Transformed::no(plan)),
+            };
+
+            optimize_sort(sort_exec)
+        })
+        .data()
+    }
+
+    fn name(&self) -> &str {
+        "PushdownSort"
+    }
+
+    fn schema_check(&self) -> bool {
+        true
+    }
+}
+
+/// Optimize a SortExec by potentially pushing the sort down to the data source
+fn optimize_sort(sort_exec: &SortExec) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
+    let sort_input = Arc::clone(sort_exec.input());
+    let required_ordering = sort_exec.expr();
+
+    // First, check if the sort is already satisfied by input ordering
+    if let Some(_input_ordering) = sort_input.output_ordering() {
+        let input_eq_properties = sort_input.equivalence_properties();
+
+        if input_eq_properties.ordering_satisfy(required_ordering.clone())? {
+            return remove_unnecessary_sort(sort_exec, sort_input);
+        }
+    }
+
+    // Try to push the sort requirement down to the data source
+    if let Some(optimized_input) = try_pushdown_sort(&sort_input, required_ordering)? {
+        // Verify that the optimized input satisfies the required ordering
+        if optimized_input
+            .equivalence_properties()
+            .ordering_satisfy(required_ordering.clone())?
+        {
+            return remove_unnecessary_sort(sort_exec, optimized_input);
+        }
+
+        // If not fully satisfied, keep the sort but with optimized input
+        return Ok(Transformed::yes(Arc::new(
+            SortExec::new(required_ordering.clone(), optimized_input)
+                .with_fetch(sort_exec.fetch())
+                .with_preserve_partitioning(sort_exec.preserve_partitioning()),
+        )));
+    }
+
+    Ok(Transformed::no(Arc::new(sort_exec.clone())))
+}
+
+/// Handle the GlobalLimitExec -> SortExec pattern
+fn optimize_limit_sort(
+    limit_exec: &GlobalLimitExec,
+    sort_exec: &SortExec,
+) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
+    let sort_input = Arc::clone(sort_exec.input());
+    let required_ordering = sort_exec.expr();
+
+    // Check if input is already sorted
+    if let Some(_input_ordering) = sort_input.output_ordering() {
+        let input_eq_properties = sort_input.equivalence_properties();
+        if input_eq_properties.ordering_satisfy(required_ordering.clone())? {
+            // Input is already sorted correctly, remove sort and keep limit
+            return Ok(Transformed::yes(Arc::new(GlobalLimitExec::new(
+                sort_input,
+                limit_exec.skip(),
+                limit_exec.fetch(),
+            ))));
+        }
+    }
+
+    // Try to push down the sort requirement
+    if let Some(optimized_input) = try_pushdown_sort(&sort_input, required_ordering)? {
+        if optimized_input
+            .equivalence_properties()
+            .ordering_satisfy(required_ordering.clone())?
+        {
+            // Successfully pushed down sort, now handle the limit
+            let total_fetch = limit_exec.skip() + limit_exec.fetch().unwrap_or(0);
+
+            // Try to push limit down as well if the source supports it
+            if let Some(with_fetch) = optimized_input.with_fetch(Some(total_fetch)) {
+                if limit_exec.skip() > 0 {
+                    return Ok(Transformed::yes(Arc::new(GlobalLimitExec::new(
+                        with_fetch,
+                        limit_exec.skip(),
+                        limit_exec.fetch(),
+                    ))));
+                } else {
+                    return Ok(Transformed::yes(with_fetch));
+                }
+            }
+
+            return Ok(Transformed::yes(Arc::new(GlobalLimitExec::new(
+                optimized_input,
+                limit_exec.skip(),
+                limit_exec.fetch(),
+            ))));
+        }
+    }
+
+    // Can't optimize, return original pattern
+    Ok(Transformed::no(Arc::new(GlobalLimitExec::new(
+        Arc::new(sort_exec.clone()),
+        limit_exec.skip(),
+        limit_exec.fetch(),
+    ))))
+}
+
+/// Remove unnecessary sort based on the logic from EnforceSorting::analyze_immediate_sort_removal
+fn remove_unnecessary_sort(
+    sort_exec: &SortExec,
+    sort_input: Arc<dyn ExecutionPlan>,
+) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
+    let new_plan = if !sort_exec.preserve_partitioning()
+        && sort_input.output_partitioning().partition_count() > 1
+    {
+        // Replace the sort with a sort-preserving merge
+        Arc::new(
+            SortPreservingMergeExec::new(sort_exec.expr().clone(), sort_input)
+                .with_fetch(sort_exec.fetch()),
+        ) as _
+    } else {
+        // Remove the sort entirely
+        if let Some(fetch) = sort_exec.fetch() {
+            // If the sort has a fetch, add a limit instead
+            if sort_input.output_partitioning().partition_count() == 1 {
+                // Try to push the limit down to the source
+                if let Some(with_fetch) = sort_input.with_fetch(Some(fetch)) {
+                    return Ok(Transformed::yes(with_fetch));
+                }
+                Arc::new(GlobalLimitExec::new(sort_input, 0, Some(fetch)))
+                    as Arc<dyn ExecutionPlan>
+            } else {
+                Arc::new(LocalLimitExec::new(sort_input, fetch)) as Arc<dyn ExecutionPlan>
+            }
+        } else {
+            sort_input
+        }
+    };
+
+    Ok(Transformed::yes(new_plan))
+}
+
+/// Try to push down a sort requirement to an execution plan
+fn try_pushdown_sort(
+    plan: &Arc<dyn ExecutionPlan>,
+    required_ordering: &[PhysicalSortExpr],
+) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+    // Check if the plan can natively handle the sort requirement
+    if let Some(data_source_exec) = plan.as_any().downcast_ref::<DataSourceExec>() {
+        return data_source_exec.try_pushdown_sort(required_ordering);
+    }
+
+    Ok(None)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow::array::{Int32Array, RecordBatch};
+    use arrow::compute::SortOptions;
+    use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+    use datafusion_physical_expr::expressions::Column;
+    use datafusion_physical_expr::Partitioning;
+    use datafusion_physical_expr_common::sort_expr::LexOrdering;
+    use datafusion_physical_plan::empty::EmptyExec;
+    use datafusion_physical_plan::memory::LazyMemoryExec;
+    use datafusion_physical_plan::repartition::RepartitionExec;
+    use parking_lot::RwLock;
+    use std::fmt;
+
+    /// Helper function to create a test schema with three columns
+    fn create_test_schema() -> Arc<Schema> {
+        Arc::new(Schema::new(vec![
+            Field::new("col1", DataType::Int32, true),
+            Field::new("col2", DataType::Utf8, true),
+            Field::new("col3", DataType::Float64, true),
+        ]))
+    }
+
+    /// Test batch generator that produces sorted data
+    #[derive(Debug, Clone)]
+    struct SortedBatchGenerator {
+        schema: SchemaRef,
+        batches_generated: usize,
+        max_batches: usize,
+        ascending: bool,
+    }
+
+    impl fmt::Display for SortedBatchGenerator {
+        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+            write!(
+                f,
+                "SortedBatchGenerator(batches_generated={}, max_batches={}, 
ascending={})",
+                self.batches_generated, self.max_batches, self.ascending
+            )
+        }
+    }
+
+    impl datafusion_physical_plan::memory::LazyBatchGenerator for SortedBatchGenerator {

Review Comment:
   most other optimizer tests don't actually run plans; instead they construct a plan, run the rule, and verify the transformation is as expected.
   
   For example, here are the tests for filter_pushdown (note this is in `core/tests`):
   
   https://github.com/apache/datafusion/blob/main/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
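   
   A minimal sketch of that style applied to this rule (the plan-builder helper and the `reverse_scan=true` display text are assumptions for illustration, not code from this PR):
   
   ```rust
   #[test]
   fn test_pushdown_sort_marks_reverse_scan() -> Result<()> {
       // Hypothetical helper that builds SortExec(col1 DESC) over a
       // DataSourceExec whose file ordering is col1 ASC
       let plan: Arc<dyn ExecutionPlan> = sort_over_parquet_scan();
   
       // Enable the flag this PR adds, then run only the rule under test
       let mut config = ConfigOptions::default();
       config.execution.parquet.enable_sort_pushdown = true;
       let optimized = PushdownSort::new().optimize(plan, &config)?;
   
       // Assert on the rendered plan instead of executing it
       let displayed = datafusion_physical_plan::displayable(optimized.as_ref())
           .indent(true)
           .to_string();
       assert!(displayed.contains("reverse_scan=true"));
       assert!(!displayed.contains("SortExec"));
       Ok(())
   }
   ```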



##########
datafusion/physical-optimizer/src/pushdown_sort.rs:
##########
@@ -0,0 +1,1446 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::PhysicalOptimizerRule;
+use datafusion_common::config::ConfigOptions;
+use datafusion_common::tree_node::{
+    Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeRewriter,
+};
+use datafusion_common::Result;
+use datafusion_datasource::file::FileSource;
+use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
+use datafusion_datasource::source::{DataSource, DataSourceExec};
+use datafusion_datasource_parquet::source::ParquetSource;
+use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
+use datafusion_physical_plan::joins::SortMergeJoinExec;
+use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
+use datafusion_physical_plan::sorts::sort::SortExec;
+use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
+use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
+use std::sync::Arc;
+
+/// A PhysicalOptimizerRule that attempts to reverse a SortExec's input if doing so would make the
+/// input ordering compatible with the SortExec's required output ordering.
+/// It also removes unnecessary sorts when the input already satisfies the required ordering.
+#[derive(Debug, Clone, Default)]
+pub struct ReverseOrder;
+
+impl ReverseOrder {
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl PhysicalOptimizerRule for ReverseOrder {
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        config: &ConfigOptions,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        // Check if reverse scan optimization is enabled
+        let enable_reverse_scan = config.execution.parquet.enable_reverse_scan;
+
+        // Return early if not enabled
+        if !enable_reverse_scan {
+            return Ok(plan);
+        }
+
+        // Search for any SortExec nodes and try to optimize them
+        plan.transform_up(&|plan: Arc<dyn ExecutionPlan>| {
+            // First check if this is a GlobalLimitExec -> SortExec pattern
+            if let Some(limit_exec) = plan.as_any().downcast_ref::<GlobalLimitExec>() {
+                if let Some(sort_exec) =
+                    limit_exec.input().as_any().downcast_ref::<SortExec>()
+                {
+                    return optimize_limit_sort(limit_exec, sort_exec);
+                }
+            }
+
+            // Otherwise, check if this is just a SortExec
+            let sort_exec = match plan.as_any().downcast_ref::<SortExec>() {
+                Some(sort_exec) => sort_exec,
+                None => return Ok(Transformed::no(plan)),
+            };
+
+            let sort_input: Arc<dyn ExecutionPlan> = Arc::clone(sort_exec.input());
+
+            // First, check if the sort is already satisfied by input ordering
+            if let Some(_input_ordering) = sort_input.output_ordering() {
+                let input_eq_properties = sort_input.equivalence_properties();
+
+                // Check if input already satisfies the sort requirement
+                if input_eq_properties.ordering_satisfy(sort_exec.expr().clone())? {
+                    return remove_unnecessary_sort(sort_exec, sort_input);
+                }
+            }
+
+            // If not satisfied, try to reverse the input
+            let reversed_eq_properties = {
+                let mut new = sort_input.properties().equivalence_properties().clone();
+                new.clear_orderings();
+
+                // Build reversed sort exprs for each ordering class
+                let reversed_orderings = sort_input
+                    .equivalence_properties()
+                    .oeq_class()
+                    .iter()
+                    .map(|ordering| {
+                        ordering
+                            .iter()
+                            .map(|expr| expr.reverse())
+                            .collect::<Vec<_>>()
+                    })
+                    .collect::<Vec<_>>();
+
+                new.add_orderings(reversed_orderings);
+                new
+            };
+
+            match reversed_eq_properties.ordering_satisfy(sort_exec.expr().clone())? {
+                true => {
+                    // Reverse the input and then remove the sort
+                    let reversed_input =
+                        sort_input.rewrite(&mut ReverseRewriter).unwrap().data;
+
+                    // After reversing, check if we can remove the sort
+                    if reversed_input
+                        .equivalence_properties()
+                        .ordering_satisfy(sort_exec.expr().clone())?
+                    {
+                        remove_unnecessary_sort(sort_exec, reversed_input)
+                    } else {
+                        // Keep the sort but with reversed input
+                        Ok(Transformed::yes(Arc::new(
+                            SortExec::new(sort_exec.expr().clone(), reversed_input)
+                                .with_fetch(sort_exec.fetch())
+                                .with_preserve_partitioning(
+                                    sort_exec.preserve_partitioning(),
+                                ),
+                        )))
+                    }
+                }
+                false => Ok(Transformed::no(plan)),
+            }
+        })
+        .data()
+    }
+
+    fn name(&self) -> &str {
+        "ReverseOrder"
+    }
+
+    fn schema_check(&self) -> bool {
+        true
+    }
+}
+
+/// Handle the GlobalLimitExec -> SortExec pattern
+fn optimize_limit_sort(
+    limit_exec: &GlobalLimitExec,
+    sort_exec: &SortExec,
+) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
+    let sort_input = Arc::clone(sort_exec.input());
+
+    // Check if input is already sorted
+    if let Some(_input_ordering) = sort_input.output_ordering() {
+        let input_eq_properties = sort_input.equivalence_properties();
+        if input_eq_properties.ordering_satisfy(sort_exec.expr().clone())? {
+            // Input is already sorted correctly, remove sort and keep limit
+            return Ok(Transformed::yes(Arc::new(GlobalLimitExec::new(
+                sort_input,
+                limit_exec.skip(),
+                limit_exec.fetch(),
+            ))));
+        }
+    }
+
+    // Check if we can reverse the input to satisfy the sort
+    let reversed_eq_properties = {
+        let mut new = sort_input.properties().equivalence_properties().clone();
+        new.clear_orderings();
+
+        let reversed_orderings = sort_input
+            .equivalence_properties()
+            .oeq_class()
+            .iter()
+            .map(|ordering| {
+                ordering
+                    .iter()
+                    .map(|expr| expr.reverse())
+                    .collect::<Vec<_>>()
+            })
+            .collect::<Vec<_>>();
+
+        new.add_orderings(reversed_orderings);
+        new
+    };
+
+    if reversed_eq_properties.ordering_satisfy(sort_exec.expr().clone())? {
+        // Can reverse! Apply reversal
+        let reversed_input = sort_input.rewrite(&mut ReverseRewriter).unwrap().data;
+
+        // Check if reversed input satisfies the sort requirement
+        if reversed_input
+            .equivalence_properties()
+            .ordering_satisfy(sort_exec.expr().clone())?
+        {
+            // Check if this is a single-partition DataSourceExec with reverse_scan enabled
+            // In that case, the limit is already handled internally by ReversedParquetStreamWithLimit
+            if is_single_partition_reverse_scan_datasource(&reversed_input) {
+                let total_fetch = limit_exec.skip() + limit_exec.fetch().unwrap_or(0);
+
+                if let Some(with_fetch) = reversed_input.with_fetch(Some(total_fetch)) {
+                    if limit_exec.skip() > 0 {
+                        return Ok(Transformed::yes(Arc::new(GlobalLimitExec::new(
+                            with_fetch,
+                            limit_exec.skip(),
+                            limit_exec.fetch(),
+                        ))));
+                    } else {
+                        return Ok(Transformed::yes(with_fetch));
+                    }
+                }
+
+                return Ok(Transformed::yes(Arc::new(GlobalLimitExec::new(
+                    reversed_input,
+                    limit_exec.skip(),
+                    limit_exec.fetch(),
+                ))));
+            }
+
+            // Otherwise, remove sort but keep limit with reversed input
+            return Ok(Transformed::yes(Arc::new(GlobalLimitExec::new(
+                reversed_input,
+                limit_exec.skip(),
+                limit_exec.fetch(),
+            ))));
+        }
+    }
+
+    // Can't optimize, return original pattern
+    Ok(Transformed::no(Arc::new(GlobalLimitExec::new(
+        Arc::new(sort_exec.clone()),
+        limit_exec.skip(),
+        limit_exec.fetch(),
+    ))))
+}
+
+/// Check if the plan is a single-partition DataSourceExec with reverse_scan enabled
+fn is_single_partition_reverse_scan_datasource(plan: &Arc<dyn ExecutionPlan>) -> bool {
+    // Only optimize for single partition
+    if plan.output_partitioning().partition_count() != 1 {
+        return false;
+    }
+
+    if let Some(data_source_exec) = plan.as_any().downcast_ref::<DataSourceExec>() {
+        if let Some(scan_config) = data_source_exec
+            .data_source()
+            .as_any()
+            .downcast_ref::<FileScanConfig>()
+        {
+            if let Some(parquet_source) = scan_config
+                .file_source
+                .as_any()
+                .downcast_ref::<ParquetSource>()
+            {
+                return parquet_source.reverse_scan();
+            }
+        }
+    }
+    false
+}
+
+/// Remove unnecessary sort based on the logic from EnforceSorting::analyze_immediate_sort_removal
+fn remove_unnecessary_sort(

Review Comment:
   `enforce_sorting` is a somewhat tricky rule because it does both:
   1. Adds new `SortExec`s to ensure that the `ExecutionPlan::input_requirements` are met
   2. Pushes down / eliminates unnecessary sorts
   
   I think it would be a much cleaner design (though it would be a **major** refactoring) if we split that up into two passes -- add the necessary sorts, and then optimizations to remove / push down sorts.
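   
   A rough sketch of that split (rule names here are hypothetical, not existing DataFusion types):
   
   ```rust
   /// Pass 1: correctness only -- insert a SortExec wherever an operator's
   /// required input ordering is not already satisfied.
   #[derive(Debug, Default)]
   struct EnsureSortRequirements;
   
   /// Pass 2: optimization only -- remove sorts that are already satisfied
   /// and push the remaining ones down toward the data sources.
   #[derive(Debug, Default)]
   struct OptimizeSorts;
   
   // Registered in this order, so correctness is established before the
   // optimization pass runs:
   //     Arc::new(EnsureSortRequirements::default()),
   //     Arc::new(OptimizeSorts::default()),
   ```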



##########
datafusion/physical-optimizer/src/pushdown_sort.rs:
##########
@@ -0,0 +1,1235 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Sort Pushdown Optimization
+//!
+//! This optimizer attempts to push sort requirements down to data sources that can
+//! satisfy them natively, avoiding expensive sort operations.
+//!
+//! Currently supported optimizations:
+//! - **Reverse scan**: If a data source naturally produces data in DESC order and
+//!   we need ASC (or vice versa), we can reverse the scan direction instead of
+//!   adding a SortExec node.
+//!
+//! Future optimizations could include:
+//! - Reordering row groups in Parquet files
+//! - Leveraging native indexes
+//! - Reordering files in multi-file scans
+use crate::PhysicalOptimizerRule;
+use datafusion_common::config::ConfigOptions;
+use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
+use datafusion_common::Result;
+use datafusion_datasource::source::DataSourceExec;
+use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
+use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
+use datafusion_physical_plan::sorts::sort::SortExec;
+use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
+use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
+use std::sync::Arc;
+
+/// A PhysicalOptimizerRule that attempts to push down sort requirements to data sources
+/// that can natively handle them (e.g., by reversing scan direction).
+///
+/// This optimization:
+/// 1. Detects SortExec nodes that require a specific ordering
+/// 2. Checks if the input can satisfy the ordering by reversing its scan direction
+/// 3. Pushes the sort requirement down to the data source when possible
+/// 4. Removes unnecessary sort operations when the input already satisfies the requirement
+#[derive(Debug, Clone, Default)]
+pub struct PushdownSort;
+
+impl PushdownSort {
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl PhysicalOptimizerRule for PushdownSort {
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        config: &ConfigOptions,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        // Check if sort pushdown optimization is enabled
+        let enable_sort_pushdown = config.execution.parquet.enable_sort_pushdown;
+
+        // Return early if not enabled
+        if !enable_sort_pushdown {
+            return Ok(plan);
+        }
+
+        // Search for any SortExec nodes and try to optimize them
+        plan.transform_up(&|plan: Arc<dyn ExecutionPlan>| {

Review Comment:
   should this be `transform_down` so it pushes the sorts down (rather than up)?
   
   Or is the expectation that this is only going to work on sorts that are directly above the datasource?
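   
   For reference, a sketch of the top-down variant (same closure shape as the `transform_up` call in the hunk; which traversal order is right depends on whether the rule needs children rewritten first):
   
   ```rust
   // Top-down: the closure sees each SortExec before its children are
   // rewritten, so the requirement could be propagated toward the leaves
   // rather than only matching Sort -> DataSourceExec pairs bottom-up.
   plan.transform_down(&|plan: Arc<dyn ExecutionPlan>| {
       if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
           return optimize_sort(sort_exec);
       }
       Ok(Transformed::no(plan))
   })
   .data()
   ```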



##########
datafusion/physical-optimizer/Cargo.toml:
##########
@@ -43,6 +43,7 @@ recursive_protection = ["dep:recursive"]
 [dependencies]
 arrow = { workspace = true }
 datafusion-common = { workspace = true }
+datafusion-datasource = { workspace = true }

Review Comment:
   Do we really need to add this new dependency? I think if you made the optimizer tests look more like the other optimizer tests this wouldn't be needed.



##########
datafusion/datasource/src/file_scan_config.rs:
##########
@@ -718,6 +717,101 @@ impl DataSource for FileScanConfig {
             }
         }
     }
+
+    fn try_pushdown_sort(
+        &self,
+        order: &[PhysicalSortExpr],
+    ) -> Result<Option<Arc<dyn DataSource>>> {
+        let current_ordering = match self.output_ordering.first() {
+            Some(ordering) => ordering.as_ref(),
+            None => return Ok(None),
+        };
+
+        // Only support reverse ordering pushdown
+        if !is_reverse_ordering(order, current_ordering) {
+            return Ok(None);
+        }
+
+        // Ask the file source if it can handle the sort pushdown
+        // (e.g., ParquetSource will enable reverse_scan)
+        let new_file_source = match self.file_source.try_pushdown_sort(order)? {
+            Some(source) => source,
+            None => return Ok(None),
+        };
+
+        let mut new_config = self.clone();
+
+        // Reverse file groups: when scanning in reverse, we need to read files
+        // in reverse order to maintain the correct global ordering
+        new_config.file_groups = new_config
+            .file_groups
+            .into_iter()
+            .map(|group| {
+                let mut files = group.into_inner();
+                files.reverse();
+                files.into()
+            })
+            .collect();
+
+        // Build the new output ordering by reversing each sort expression's direction
+        // E.g., [number DESC] becomes [number ASC]
+        let mut reversed_ordering = Vec::new();
+        for sort_expr in current_ordering {
+            reversed_ordering.push(PhysicalSortExpr {
+                expr: Arc::clone(&sort_expr.expr),
+                options: !sort_expr.options,
+            });
+        }
+
+        new_config.output_ordering = vec![LexOrdering::new(reversed_ordering)
+            .ok_or_else(|| {
+                DataFusionError::Plan(
+                    "Failed to create ordering: invalid sort 
expressions".to_string(),
+                )
+            })?];
+
+        new_config.file_source = new_file_source;
+
+        Ok(Some(Arc::new(new_config)))
+    }
+}
+
+/// Check if the requested ordering can be satisfied by reversing the current ordering.
+///
+/// This function supports **prefix matching**: if the file has ordering [A DESC, B ASC]
+/// and we need [A ASC], reversing the scan gives us [A ASC, B DESC], which satisfies
+/// the requirement since [A ASC] is a prefix.
+///
+/// # Arguments
+/// * `requested` - The ordering required by the query
+/// * `current` - The natural ordering of the data source (e.g., from file metadata)
+///
+/// # Returns
+/// `true` if reversing the current ordering would satisfy the requested ordering
+///
+/// # Example
+/// ```text
+/// Current:   [number DESC, letter ASC]
+/// Requested: [number ASC]
+/// Reversed:  [number ASC, letter DESC]  ✓ Prefix match!
+/// ```
+fn is_reverse_ordering(
+    requested: &[PhysicalSortExpr],
+    current: &[PhysicalSortExpr],
+) -> bool {
+    // Allow prefix matching - we can satisfy a prefix of the current ordering
+    // by reversing the scan
+    if requested.len() > current.len() {
+        return false;
+    }
+
+    requested.iter().zip(current.iter()).all(|(req, cur)| {
+        let exprs_match = req.expr.to_string() == cur.expr.to_string();

Review Comment:
   is there a reason not to compare the sort exprs directly? I think converting them to a string may lose some information (like, would aliased expressions appear equal when they really aren't?)
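   
   A sketch of the structural comparison (assuming `PartialEq` is implemented for `dyn PhysicalExpr`, as in recent DataFusion versions):
   
   ```rust
   requested.iter().zip(current.iter()).all(|(req, cur)| {
       // Compare the expression trees directly instead of their Display
       // output, so formatting/aliasing differences cannot make distinct
       // expressions look equal
       let exprs_match = req.expr.eq(&cur.expr);
       // A reverse scan flips every direction, so the requested options
       // should be the negation of the current ones (`!` on SortOptions
       // is already used elsewhere in this PR)
       let options_reversed = req.options == !cur.options;
       exprs_match && options_reversed
   })
   ```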



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
