martin-g commented on code in PR #19064:
URL: https://github.com/apache/datafusion/pull/19064#discussion_r2584817003


##########
datafusion/physical-optimizer/src/pushdown_sort.rs:
##########
@@ -0,0 +1,329 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Sort Pushdown Optimization (Phase 1)
+//!
+//! Phase 1 focuses on rearranging files and row groups based on statistics
+//! to provide approximate ordering, enabling early termination for TopK 
queries.
+//!
+//! This optimization:
+//! 1. Detects SortExec nodes that require a specific ordering
+//! 2. Recursively traverses through transparent nodes to find data sources
+//! 3. Pushes the sort requirement down when possible
+//! 4. Returns **Inexact** results (keeps Sort but enables early termination)
+//! 5. Phase 2 todo will detect perfect ordering and remove Sort completely
+
+use crate::{OptimizerContext, PhysicalOptimizerRule};
+use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
+use datafusion_common::Result;
+use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
+use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
+use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
+use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
+use datafusion_physical_plan::repartition::RepartitionExec;
+use datafusion_physical_plan::sorts::sort::SortExec;
+use 
datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
+use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
+use std::sync::Arc;
+
+/// A PhysicalOptimizerRule that attempts to push down sort requirements to 
data sources.
+///
+/// # Phase 1 Behavior (Current)
+///
+/// This optimization rearranges files and row groups to match query ordering:
+/// - Files are reordered based on their min/max statistics
+/// - Row groups are read in reverse order when appropriate
+/// - Returns **Inexact** ordering (keeps Sort but enables early termination)
+///
+/// Benefits:
+/// - TopK queries (ORDER BY ... LIMIT): 50-80% faster due to early termination
+/// - Range queries: 30-50% improvement from better data locality
+/// - Memory: No additional overhead (only changes read order)
+///
+/// # Phase 2 (Future)
+///
+/// Will detect when files are perfectly sorted and:
+/// - Return **Exact** ordering guarantees
+/// - Completely eliminate the Sort operator
+/// - Provide even better performance
+///
+/// # Implementation
+///
+/// 1. Detects SortExec nodes
+/// 2. Recursively pushes through transparent nodes (CoalesceBatches, 
Repartition, etc.)
+/// 3. Asks data sources to optimize via `try_pushdown_sort()`
+/// 4. Keeps Sort operator (Phase 1 returns Inexact)
+#[derive(Debug, Clone, Default)]
+pub struct PushdownSort;
+
+impl PushdownSort {
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl PhysicalOptimizerRule for PushdownSort {
+    fn optimize_plan(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        context: &OptimizerContext,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        // Check if sort pushdown optimization is enabled
+        let enable_sort_pushdown = context
+            .session_config()
+            .options()
+            .execution
+            .parquet
+            .enable_sort_pushdown;
+
+        // Return early if not enabled
+        if !enable_sort_pushdown {
+            return Ok(plan);
+        }
+
+        // Search for any SortExec nodes and try to optimize them
+        plan.transform_down(&|plan: Arc<dyn ExecutionPlan>| {
+            // First check if this is a GlobalLimitExec -> SortExec pattern
+            // This is important for TopK queries
+            if let Some(limit_exec) = 
plan.as_any().downcast_ref::<GlobalLimitExec>() {
+                if let Some(sort_exec) =
+                    limit_exec.input().as_any().downcast_ref::<SortExec>()
+                {
+                    return optimize_limit_sort(limit_exec, sort_exec);
+                }
+            }
+
+            // Otherwise, check if this is just a SortExec
+            let sort_exec = match plan.as_any().downcast_ref::<SortExec>() {
+                Some(sort_exec) => sort_exec,
+                None => return Ok(Transformed::no(plan)),
+            };
+
+            optimize_sort(sort_exec)
+        })
+        .data()
+    }
+
+    fn name(&self) -> &str {
+        "PushdownSort"
+    }
+
+    fn schema_check(&self) -> bool {
+        true
+    }
+}
+
+/// Optimize a SortExec by potentially pushing the sort down to the data source
+fn optimize_sort(sort_exec: &SortExec) -> Result<Transformed<Arc<dyn 
ExecutionPlan>>> {
+    let sort_input = Arc::clone(sort_exec.input());
+    let required_ordering = sort_exec.expr();
+
+    // First, check if the sort is already satisfied by input ordering
+    if let Some(_input_ordering) = sort_input.output_ordering() {
+        let input_eq_properties = sort_input.equivalence_properties();
+
+        if input_eq_properties.ordering_satisfy(required_ordering.clone())? {
+            return remove_unnecessary_sort(sort_exec, sort_input);
+        }
+    }
+
+    // Try to push the sort requirement down to the data source (with 
recursive traversal)
+    if let Some(optimized_input) = try_pushdown_sort(&sort_input, 
required_ordering)? {
+        // Phase 1: Always keep the Sort operator
+        // Even though we optimized the input (reordered files/row groups),
+        // we cannot guarantee perfect ordering due to potential overlaps
+        //
+        // However, this still provides huge benefits:
+        // - TopK queries can terminate early
+        // - Less data needs to be sorted
+        // - Better cache locality
+        return Ok(Transformed::yes(Arc::new(
+            SortExec::new(required_ordering.clone(), optimized_input)
+                .with_fetch(sort_exec.fetch())
+                .with_preserve_partitioning(sort_exec.preserve_partitioning()),
+        )));
+    }
+
+    Ok(Transformed::no(Arc::new(sort_exec.clone())))
+}
+
+/// Handle the GlobalLimitExec -> SortExec pattern
+/// This is critical for TopK query optimization
+fn optimize_limit_sort(
+    limit_exec: &GlobalLimitExec,
+    sort_exec: &SortExec,
+) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
+    let sort_input = Arc::clone(sort_exec.input());
+    let required_ordering = sort_exec.expr();
+
+    // Check if input is already sorted
+    if let Some(_input_ordering) = sort_input.output_ordering() {
+        let input_eq_properties = sort_input.equivalence_properties();
+        if input_eq_properties.ordering_satisfy(required_ordering.clone())? {
+            // Input is already sorted correctly, remove sort and keep limit
+            return Ok(Transformed::yes(Arc::new(GlobalLimitExec::new(
+                sort_input,
+                limit_exec.skip(),
+                limit_exec.fetch(),
+            ))));
+        }
+    }
+
+    // Try to push down the sort requirement
+    if let Some(optimized_input) = try_pushdown_sort(&sort_input, 
required_ordering)? {
+        // Phase 1: Keep the Sort operator
+        // But add the fetch limit to enable early termination
+        // This is where TopK optimization happens!
+        let total_fetch = limit_exec.skip() + limit_exec.fetch().unwrap_or(0);

Review Comment:
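   `fetch=None` means "fetch all" 
(https://github.com/zhuqi-lucas/arrow-datafusion/blob/0cfc1fefad9c94cfae26e95a6e1642d9ae6f79b5/datafusion/physical-plan/src/limit.rs#L49),
   so `unwrap_or(0)` would treat an unlimited fetch as a fetch of zero rows 
and `total_fetch` would collapse to just `skip`.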
   
   ```suggestion
           let total_fetch = match limit_exec.fetch() {
               Some(fetch) => limit_exec.skip().saturating_add(fetch),
               None => return Ok(Transformed::no(...)), // Can't optimize 
unlimited fetch
           };
   ```



##########
datafusion/common/src/config.rs:
##########
@@ -831,6 +831,23 @@ config_namespace! {
         /// writing out already in-memory data, such as from a cached
         /// data frame.
         pub maximum_buffered_record_batches_per_stream: usize, default = 2
+
+       /// Enable sort pushdown optimization for Parquet files.

Review Comment:
   The indentation is not aligned with the previous items: it is one space 
short.



##########
datafusion/datasource-parquet/src/source.rs:
##########
@@ -667,6 +684,11 @@ impl FileSource for ParquetSource {
 
                 write!(f, "{predicate_string}")?;
 
+                // Add reverse_scan info if enabled
+                if self.reverse_scan_inexact {
+                    writeln!(f, ", reverse_scan_inexact=true")?;

Review Comment:
   ```suggestion
                       write!(f, ", reverse_scan_inexact=true")?;
   ```
   No need to add a newline at the end.



##########
datafusion/sqllogictest/test_files/information_schema.slt:
##########
@@ -370,6 +371,7 @@ datafusion.execution.parquet.data_pagesize_limit 1048576 
(writing) Sets best eff
 datafusion.execution.parquet.dictionary_enabled true (writing) Sets if 
dictionary encoding is enabled. If NULL, uses default parquet writer setting
 datafusion.execution.parquet.dictionary_page_size_limit 1048576 (writing) Sets 
best effort maximum dictionary page size, in bytes
 datafusion.execution.parquet.enable_page_index true (reading) If true, reads 
the Parquet data page level metadata (the Page Index), if present, to reduce 
the I/O and number of rows decoded.
+datafusion.execution.parquet.enable_sort_pushdown true Enable sort pushdown 
optimization for Parquet files. When enabled, optimizes queries with ORDER BY: 
- Reordering files based on statistics - Reversing row group read order when 
beneficial Returns **inexact ordering**: Sort operator is kept for correctness, 
but can terminate early for TopK queries (ORDER BY ... LIMIT N), providing huge 
speedup. Memory: No additional overhead (only changes read order). Future TODO: 
Will add option to support detect perfectly sorted data and eliminate Sort 
completely. Default: true

Review Comment:
   ```suggestion
   datafusion.execution.parquet.enable_sort_pushdown true Enable sort pushdown 
optimization for Parquet files. When enabled, optimizes queries with ORDER BY: 
- Reordering files based on statistics - Reversing row group read order when 
beneficial Returns **inexact ordering**: Sort operator is kept for correctness, 
but can terminate early for TopK queries (ORDER BY ... LIMIT N), providing huge 
speedup. Memory: No additional overhead (only changes read order). Future TODO: 
Will add option to support detecting perfectly sorted data and eliminate Sort 
completely. Default: true
   ```



##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -425,6 +427,12 @@ impl FileOpener for ParquetOpener {
             }
 
             let row_group_indexes = access_plan.row_group_indexes();
+            let row_group_indexes = if reverse_scan_inexact {
+                row_group_indexes.into_iter().rev().collect::<Vec<_>>()

Review Comment:
   Shouldn't this also reverse the `row_selection`?



##########
datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt:
##########
@@ -432,3 +432,109 @@ SET 
datafusion.optimizer.enable_aggregate_dynamic_filter_pushdown = true;
 
 statement ok
 SET datafusion.optimizer.enable_dynamic_filter_pushdown = true;
+
+# Test 6: Sort Pushdown for ordered Parquet files
+
+# Create a sorted dataset
+statement ok
+CREATE TABLE sorted_data(id INT, value INT, name VARCHAR) AS VALUES
+(1, 100, 'a'),
+(2, 200, 'b'),
+(3, 300, 'c'),
+(4, 400, 'd'),
+(5, 500, 'e'),
+(6, 600, 'f'),
+(7, 700, 'g'),
+(8, 800, 'h'),
+(9, 900, 'i'),
+(10, 1000, 'j');
+
+# Copy to parquet with sorting
+query I
+COPY (SELECT * FROM sorted_data ORDER BY id ASC)
+TO 'test_files/scratch/dynamic_filter_pushdown_config/sorted_data.parquet';

Review Comment:
   ```suggestion
   TO 'test_files/scratch/dynamic_filter_pushdown_config/sorted_data.parquet' 
STORED AS PARQUET;
   ```



##########
datafusion/datasource/src/file.rs:
##########
@@ -34,13 +34,51 @@ use 
datafusion_physical_plan::filter_pushdown::{FilterPushdownPropagation, Pushe
 use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
 use datafusion_physical_plan::DisplayFormatType;
 
+use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
 use object_store::ObjectStore;
 
 /// Helper function to convert any type implementing FileSource to Arc&lt;dyn 
FileSource&gt;
 pub fn as_file_source<T: FileSource + 'static>(source: T) -> Arc<dyn 
FileSource> {
     Arc::new(source)
 }
 
+/// Result of attempting to push down sort ordering to a file source
+#[derive(Debug, Clone)]
+pub enum SortOrderPushdownResult<T> {
+    /// The source can guarantee exact ordering (data is perfectly sorted)
+    Exact { inner: T },
+    /// The source has optimized for the ordering but cannot guarantee perfect 
sorting
+    /// (e.g., reordered files/row groups based on statistics)
+    Inexact { inner: T },
+    /// The source cannot optimize for this ordering
+    Unsupported,
+}
+
+impl<T> SortOrderPushdownResult<T> {
+    /// Returns true if the result is Exact
+    pub fn is_exact(&self) -> bool {
+        matches!(self, Self::Exact { .. })
+    }
+
+    /// Returns true if the result is Inexact
+    pub fn is_inexact(&self) -> bool {
+        matches!(self, Self::Inexact { .. })
+    }
+
+    /// Returns true if optimization was successful (Exact or Inexact)
+    pub fn is_supported(&self) -> bool {
+        !matches!(self, Self::Unsupported)
+    }
+
+    /// Extract the inner value if present

Review Comment:
   ```suggestion
       /// Extract the inner value if present
       #[must_use]
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to