alamb commented on code in PR #19064:
URL: https://github.com/apache/datafusion/pull/19064#discussion_r2615261113
##########
datafusion/common/src/config.rs:
##########
@@ -837,6 +837,18 @@ config_namespace! {
/// writing out already in-memory data, such as from a cached
/// data frame.
pub maximum_buffered_record_batches_per_stream: usize, default = 2
+
+ /// Enable sort pushdown optimization for Parquet files.
+ /// When enabled, optimizes queries with ORDER BY:
+ /// - Reordering files based on statistics
+ /// - Reversing row group read order when beneficial
+ /// Returns **inexact ordering**: Sort operator is kept for correctness,
+ /// but can terminate early for TopK queries (ORDER BY ... LIMIT N),
+ /// providing a huge speedup.
+ /// Memory: No additional overhead (only changes read order).
+ /// Future TODO: Will add option to support detecting perfectly sorted data and eliminate Sort completely.
+ /// Default: true
+ pub enable_sort_pushdown: bool, default = true
Review Comment:
I suggest having individual options for the individual optimizations -- so in this case "enable_reverse_row_groups"
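For example, a sketch of how the split could look in `config_namespace!` (names, docs, and defaults here are illustrative, not final):
```rust
/// Reverse the row group read order (and file order) when the requested
/// ORDER BY is the reverse of the known file ordering. The resulting
/// ordering is inexact, so the Sort operator is kept, but TopK queries
/// can still terminate early.
pub enable_reverse_row_groups: bool, default = true

/// Reorder files based on statistics so the scan better matches the
/// requested ordering.
pub enable_file_reordering: bool, default = true
```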
##########
datafusion/core/tests/physical_optimizer/test_utils.rs:
##########
@@ -699,3 +701,75 @@ impl TestAggregate {
}
}
}
+
+/// A harness for testing physical optimizers.
Review Comment:
👍
##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -112,6 +113,7 @@ pub(super) struct ParquetOpener {
/// Maximum size of the predicate cache, in bytes. If none, uses
/// the arrow-rs default.
pub max_predicate_cache_size: Option<usize>,
+ pub reverse_scan_inexact: bool,
Review Comment:
Perhaps some comments would help -- specifically on what is inexact
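For example, something along these lines, based on the semantics described elsewhere in this PR:
```rust
/// If true, read row groups (and files) in reverse order.
///
/// This is "inexact" because rows *within* each row group keep their
/// original order, so the stream is only approximately reversed and any
/// SortExec above must be kept for correctness.
pub reverse_scan_inexact: bool,
```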
##########
datafusion/common/src/config.rs:
##########
@@ -837,6 +837,18 @@ config_namespace! {
/// writing out already in-memory data, such as from a cached
/// data frame.
pub maximum_buffered_record_batches_per_stream: usize, default = 2
+
+ /// Enable sort pushdown optimization for Parquet files.
+ /// When enabled, optimizes queries with ORDER BY:
+ /// - Reordering files based on statistics
+ /// - Reversing row group read order when beneficial
+ /// Returns **inexact ordering**: Sort operator is kept for correctness,
+ /// but can terminate early for TopK queries (ORDER BY ... LIMIT N),
+ /// providing a huge speedup.
+ /// Memory: No additional overhead (only changes read order).
+ /// Future TODO: Will add option to support detecting perfectly sorted data and eliminate Sort completely.
Review Comment:
BTW since parquet files themselves have no way to communicate sort order (I think the pre-existing ordering comes from the ListingTable level), I would expect the option for eliminating sorts to not be on the parquet options 🤔
##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -1344,4 +1397,268 @@ mod test {
assert_eq!(num_batches, 0);
assert_eq!(num_rows, 0);
}
+
+ #[tokio::test]
+ async fn test_reverse_scan_row_groups() {
+ use parquet::file::properties::WriterProperties;
+
+ let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+
+ // Create multiple batches to ensure multiple row groups
+ let batch1 =
+ record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap();
+ let batch2 =
+ record_batch!(("a", Int32, vec![Some(4), Some(5), Some(6)])).unwrap();
+ let batch3 =
+ record_batch!(("a", Int32, vec![Some(7), Some(8), Some(9)])).unwrap();
+
+ // Write parquet file with multiple row groups
+ // Force small row groups by setting max_row_group_size
+ let props = WriterProperties::builder()
+ .set_max_row_group_size(3) // Force each batch into its own row group
+ .build();
+
+ let mut out = BytesMut::new().writer();
Review Comment:
I wonder if we can factor this into a function too (to avoid the repetition for creating parquet files)
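A sketch of such a helper, using `ArrowWriter` from the parquet crate (the helper name and exact shape are illustrative):
```rust
use arrow::record_batch::RecordBatch;
use bytes::{BufMut, Bytes, BytesMut};
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;

/// Hypothetical test helper: write `batches` into an in-memory parquet
/// file, forcing small row groups via `max_row_group_size`.
fn write_parquet_file(batches: &[RecordBatch], max_row_group_size: usize) -> Bytes {
    let props = WriterProperties::builder()
        .set_max_row_group_size(max_row_group_size)
        .build();
    let mut out = BytesMut::new().writer();
    let mut writer =
        ArrowWriter::try_new(&mut out, batches[0].schema(), Some(props)).unwrap();
    for batch in batches {
        writer.write(batch).unwrap();
    }
    writer.close().unwrap();
    out.into_inner().freeze()
}
```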
##########
datafusion/datasource/src/file_scan_config.rs:
##########
@@ -844,6 +845,114 @@ impl DataSource for FileScanConfig {
}
}
}
+
+ fn try_pushdown_sort(
+ &self,
+ order: &[PhysicalSortExpr],
+ ) -> Result<SortOrderPushdownResult<Arc<dyn DataSource>>> {
+ let current_ordering = match self.output_ordering.first() {
+ Some(ordering) => ordering.as_ref(),
+ None => return Ok(SortOrderPushdownResult::Unsupported),
+ };
+
+ // Only reverse ordering pushdown is supported for now
+ if !is_reverse_ordering(order, current_ordering) {
+ return Ok(SortOrderPushdownResult::Unsupported);
+ }
+
+ // Ask the file source if it can handle the sort pushdown
+ let pushdown_result = self.file_source.try_pushdown_sort(order)?;
+
+ // Extract the new file source and determine result type
+ let (new_file_source, is_exact) = match pushdown_result {
+ SortOrderPushdownResult::Exact { inner } => (inner, true),
+ SortOrderPushdownResult::Inexact { inner } => (inner, false),
+ SortOrderPushdownResult::Unsupported => {
+ return Ok(SortOrderPushdownResult::Unsupported);
+ }
+ };
+
+ let mut new_config = self.clone();
+
+ // Reverse file groups: when scanning in reverse, we need to read files
+ // in reverse order to maintain the correct global ordering
+ new_config.file_groups = new_config
+ .file_groups
+ .into_iter()
+ .map(|group| {
+ let mut files = group.into_inner();
+ files.reverse();
+ files.into()
+ })
+ .collect();
+
+ // Phase 1: Make output_ordering unknown since we're only reversing row groups,
Review Comment:
I think this should be a function of `is_exact` -- if `is_exact` is returned, then the ordering can be reversed; otherwise it cannot be
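For instance, a sketch of that logic (`reverse_lex_ordering` is a hypothetical helper that flips the `SortOptions` on every expression in an ordering):
```rust
// Sketch only: claim a (reversed) output ordering only when the file
// source reported an Exact pushdown; otherwise the ordering is unknown
// because rows within row groups keep their original order.
new_config.output_ordering = if is_exact {
    self.output_ordering.iter().map(reverse_lex_ordering).collect()
} else {
    vec![]
};
```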
##########
datafusion/physical-plan/src/coalesce_partitions.rs:
##########
@@ -284,6 +286,47 @@ impl ExecutionPlan for CoalescePartitionsExec {
) -> Result<FilterDescription> {
FilterDescription::from_children(parent_filters, &self.children())
}
+
+ fn try_pushdown_sort(
+ &self,
+ order: &[PhysicalSortExpr],
+ ) -> Result<SortOrderPushdownResult<Arc<dyn ExecutionPlan>>> {
+ // CoalescePartitionsExec merges multiple partitions into one, which loses
+ // global ordering. However, we can still push the sort requirement down
+ // to optimize individual partitions - the Sort operator above will handle
+ // the global ordering.
+ //
+ // Note: The result will always be at most Inexact (never Exact) when there
+ // are multiple partitions, because merging destroys global ordering.
+ let result = self.input.try_pushdown_sort(order)?;
+
+ // If we have multiple partitions, we can't return Exact even if the
+ // underlying source claims Exact - merging destroys global ordering
+ let has_multiple_partitions =
+ self.input.output_partitioning().partition_count() > 1;
+
+ result
+ .try_map(|new_input| {
+ Ok(
+ Arc::new(
+ CoalescePartitionsExec::new(new_input).with_fetch(self.fetch),
+ ) as Arc<dyn ExecutionPlan>,
+ )
+ })
+ .map(|r| {
+ if has_multiple_partitions {
+ // Downgrade Exact to Inexact when merging multiple partitions
Review Comment:
This might be a nice method to add to `SortOrderPushdownResult`, like
```rust
r.into_inexact()
```
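For instance, a minimal sketch given the variants this PR defines (`Exact`, `Inexact`, `Unsupported`):
```rust
impl<T> SortOrderPushdownResult<T> {
    /// Downgrade `Exact` to `Inexact`, e.g. after an operator that
    /// destroys global ordering such as merging multiple partitions.
    pub fn into_inexact(self) -> Self {
        match self {
            Self::Exact { inner } => Self::Inexact { inner },
            other => other,
        }
    }
}
```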
##########
datafusion/datasource/src/source.rs:
##########
@@ -190,6 +191,25 @@ pub trait DataSource: Send + Sync + Debug {
vec![PushedDown::No; filters.len()],
))
}
+
+ /// Try to create a new DataSource that produces data in the specified sort order.
+ ///
+ /// # Arguments
+ /// * `order` - The desired output ordering
+ ///
+ /// # Returns
+ /// * `Ok(SortOrderPushdownResult::Exact { .. })` - Created a source that guarantees exact ordering
+ /// * `Ok(SortOrderPushdownResult::Inexact { .. })` - Created a source optimized for the ordering
+ /// * `Ok(SortOrderPushdownResult::Unsupported)` - Cannot optimize for this ordering
+ /// * `Err(e)` - Error occurred
+ ///
+ /// Default implementation returns `Unsupported`.
+ fn try_pushdown_sort(
Review Comment:
As above, I don't think we should add try_pushdown_sort here (I recommend calling this try_reverse)
##########
datafusion/datasource/src/file_scan_config.rs:
##########
@@ -844,6 +845,114 @@ impl DataSource for FileScanConfig {
}
}
}
+
+ fn try_pushdown_sort(
Review Comment:
This API is 👍
##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -112,6 +113,7 @@ pub(super) struct ParquetOpener {
/// Maximum size of the predicate cache, in bytes. If none, uses
/// the arrow-rs default.
pub max_predicate_cache_size: Option<usize>,
+ pub reverse_scan_inexact: bool,
Review Comment:
Actually, after reading this more I think a more accurate name for this flag would be `reverse_row_groups` or something to make it clear it just controls which row groups are reversed
##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -434,10 +436,90 @@ impl FileOpener for ParquetOpener {
}
let row_group_indexes = access_plan.row_group_indexes();
- if let Some(row_selection) =
- access_plan.into_overall_row_selection(rg_metadata)?
- {
- builder = builder.with_row_selection(row_selection);
+
+ // Extract row selection before potentially reversing
+ let row_selection_opt =
Review Comment:
We could potentially simplify this code a bit more like this (and ensure we don't change anything if reverse_scan_inexact is not specified):
```rust
let mut access_plan = ...
if reverse_scan_inexact {
    access_plan = reverse_access_plan(access_plan)
}
```
##########
datafusion/datasource-parquet/src/sort.rs:
##########
@@ -0,0 +1,318 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Sort-related utilities for Parquet scanning
+
+use datafusion_common::Result;
+use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
+use parquet::file::metadata::ParquetMetaData;
+use std::collections::HashMap;
+
+/// Reverse a row selection to match reversed row group order.
+///
+/// When scanning row groups in reverse order, we need to adjust the row selection
+/// to account for the new ordering. This function:
+/// 1. Maps each selection to its corresponding row group
+/// 2. Reverses the order of row groups
+/// 3. Reconstructs the row selection for the new order
+///
+/// # Arguments
+/// * `row_selection` - Original row selection
+/// * `parquet_metadata` - Metadata containing row group information
+///
+/// # Returns
+/// A new `RowSelection` adjusted for reversed row group order
+pub fn reverse_row_selection(
+ row_selection: &RowSelection,
+ parquet_metadata: &ParquetMetaData,
+) -> Result<RowSelection> {
+ let rg_metadata = parquet_metadata.row_groups();
+
+ // Build a mapping of row group index to its row range in the file
+ let mut rg_row_ranges: Vec<(usize, usize, usize)> =
+ Vec::with_capacity(rg_metadata.len());
+ let mut current_row = 0;
+ for (rg_idx, rg) in rg_metadata.iter().enumerate() {
+ let num_rows = rg.num_rows() as usize;
+ rg_row_ranges.push((rg_idx, current_row, current_row + num_rows));
+ current_row += num_rows;
+ }
+
+ // Map selections to row groups
+ let mut rg_selections: HashMap<usize, Vec<RowSelector>> = HashMap::new();
+
+ let mut current_file_row = 0;
+ for selector in row_selection.iter() {
+ let selector_end = current_file_row + selector.row_count;
+
+ // Find which row groups this selector spans
+ for (rg_idx, rg_start, rg_end) in rg_row_ranges.iter() {
+ if current_file_row < *rg_end && selector_end > *rg_start {
+ // This selector overlaps with this row group
+ let overlap_start = current_file_row.max(*rg_start);
+ let overlap_end = selector_end.min(*rg_end);
+ let overlap_count = overlap_end - overlap_start;
+
+ if overlap_count > 0 {
+ let entry = rg_selections.entry(*rg_idx).or_default();
+ if selector.skip {
+ entry.push(RowSelector::skip(overlap_count));
+ } else {
+ entry.push(RowSelector::select(overlap_count));
+ }
+ }
+ }
+ }
+
+ current_file_row = selector_end;
+ }
+
+ // Build new selection for reversed row group order
+ let mut reversed_selectors = Vec::new();
+ for rg_idx in (0..rg_metadata.len()).rev() {
+ if let Some(selectors) = rg_selections.get(&rg_idx) {
+ reversed_selectors.extend(selectors.iter().cloned());
+ } else {
+ // No specific selection for this row group means select all
Review Comment:
this code doesn't appear to be covered by tests. I think we should add test coverage
```shell
cargo llvm-cov --html --all-features -p datafusion-datasource-parquet
```
<img width="991" height="998" alt="Image" src="https://github.com/user-attachments/assets/a126efd4-55b7-479f-9710-7ac9a1168cad" />
##########
datafusion/core/tests/physical_optimizer/pushdown_sort.rs:
##########
@@ -0,0 +1,778 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Tests for sort pushdown optimizer rule (Phase 1)
+//!
+//! Phase 1 tests verify that:
+//! 1. Reverse scan is enabled (reverse_scan_inexact=true)
+//! 2. SortExec is kept (because ordering is inexact)
+//! 3. output_ordering remains unchanged
+//! 4. Early termination is enabled for TopK queries
+//! 5. Prefix matching works correctly
+
+use arrow::compute::SortOptions;
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use datafusion_physical_optimizer::pushdown_sort::PushdownSort;
+use datafusion_physical_optimizer::PhysicalOptimizerRule;
+
+use crate::physical_optimizer::test_utils::{
+ coalesce_batches_exec, coalesce_partitions_exec, parquet_exec,
+ parquet_exec_with_sort, repartition_exec, schema, sort_exec, sort_exec_with_fetch,
+ sort_expr, sort_expr_options, OptimizationTest,
+};
+
+#[test]
+fn test_sort_pushdown_disabled() {
+ // When pushdown is disabled, plan should remain unchanged
+ let schema = schema();
+ let source = parquet_exec(schema.clone());
+ let sort_exprs = LexOrdering::new(vec![sort_expr("a", &schema)]).unwrap();
+ let plan = sort_exec(sort_exprs, source);
+
+ insta::assert_snapshot!(
+ OptimizationTest::new(plan, PushdownSort::new(), false),
+ @r###"
+ OptimizationTest:
+ input:
+ - SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
+ output:
+ Ok:
+ - SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
+ "###
+ );
+}
+
+#[test]
+fn test_sort_pushdown_basic_phase1() {
+ // Phase 1: Reverse scan enabled, Sort kept, output_ordering unchanged
+ let schema = schema();
+
+ // Source has ASC NULLS LAST ordering (default)
+ let source_ordering = LexOrdering::new(vec![sort_expr("a", &schema)]).unwrap();
+ let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]);
+
+ // Request DESC NULLS LAST ordering (exact reverse)
+ let desc_ordering = LexOrdering::new(vec![sort_expr_options(
+ "a",
+ &schema,
+ SortOptions {
+ descending: true,
+ nulls_first: false,
+ },
+ )])
+ .unwrap();
+ let plan = sort_exec(desc_ordering, source);
+
+ insta::assert_snapshot!(
+ OptimizationTest::new(plan, PushdownSort::new(), true),
+ @r"
+ OptimizationTest:
+ input:
+ - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
+ output:
+ Ok:
+ - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_scan_inexact=true
+ "
+ );
+}
+
+#[test]
+fn test_sort_with_limit_phase1() {
+ // Phase 1: Sort with fetch enables early termination but keeps Sort
+ let schema = schema();
+
+ // Source has ASC ordering
+ let source_ordering = LexOrdering::new(vec![sort_expr("a", &schema)]).unwrap();
+ let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]);
+
+ // Request DESC ordering with limit
+ let desc_ordering = LexOrdering::new(vec![sort_expr_options(
+ "a",
+ &schema,
+ SortOptions {
+ descending: true,
+ nulls_first: false,
+ },
+ )])
+ .unwrap();
+ let plan = sort_exec_with_fetch(desc_ordering, Some(10), source);
+
+ insta::assert_snapshot!(
+ OptimizationTest::new(plan, PushdownSort::new(), true),
+ @r"
+ OptimizationTest:
+ input:
+ - SortExec: TopK(fetch=10), expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
+ output:
+ Ok:
+ - SortExec: TopK(fetch=10), expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_scan_inexact=true
+ "
+ );
+}
+
+#[test]
+fn test_sort_multiple_columns_phase1() {
+ // Phase 1: Sort on multiple columns - reverse multi-column ordering
+ let schema = schema();
+
+ // Source has [a DESC NULLS LAST, b ASC] ordering
+ let source_ordering = LexOrdering::new(vec![
+ sort_expr_options(
+ "a",
+ &schema,
+ SortOptions {
+ descending: true,
+ nulls_first: false,
+ },
+ ),
+ sort_expr("b", &schema),
+ ])
+ .unwrap();
+ let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]);
+
+ // Request [a ASC NULLS FIRST, b DESC] ordering (exact reverse)
+ let reverse_ordering = LexOrdering::new(vec![
+ sort_expr_options(
+ "a",
+ &schema,
+ SortOptions {
+ descending: false,
+ nulls_first: true,
+ },
+ ),
+ sort_expr_options(
+ "b",
+ &schema,
+ SortOptions {
+ descending: true,
+ nulls_first: false,
+ },
+ ),
+ ])
+ .unwrap();
+ let plan = sort_exec(reverse_ordering, source);
+
+ insta::assert_snapshot!(
+ OptimizationTest::new(plan, PushdownSort::new(), true),
+ @r"
+ OptimizationTest:
+ input:
+ - SortExec: expr=[a@0 ASC, b@1 DESC NULLS LAST], preserve_partitioning=[false]
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 DESC NULLS LAST, b@1 ASC], file_type=parquet
+ output:
+ Ok:
+ - SortExec: expr=[a@0 ASC, b@1 DESC NULLS LAST], preserve_partitioning=[false]
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_scan_inexact=true
+ "
+ );
+}
+
+// ============================================================================
+// PREFIX MATCHING TESTS
+// ============================================================================
+
+#[test]
+fn test_prefix_match_single_column() {
+ // Test prefix matching: source has [a DESC, b ASC], query needs [a ASC]
+ // After reverse: [a ASC, b DESC] which satisfies [a ASC] prefix
+ let schema = schema();
+
+ // Source has [a DESC NULLS LAST, b ASC NULLS LAST] ordering
+ let source_ordering = LexOrdering::new(vec![
+ sort_expr_options(
+ "a",
+ &schema,
+ SortOptions {
+ descending: true,
+ nulls_first: false,
+ },
+ ),
+ sort_expr("b", &schema),
+ ])
+ .unwrap();
+ let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]);
+
+ // Request only [a ASC NULLS FIRST] - a prefix of the reversed ordering
+ let prefix_ordering = LexOrdering::new(vec![sort_expr_options(
+ "a",
+ &schema,
+ SortOptions {
+ descending: false,
+ nulls_first: true,
+ },
+ )])
+ .unwrap();
+ let plan = sort_exec(prefix_ordering, source);
+
+ insta::assert_snapshot!(
+ OptimizationTest::new(plan, PushdownSort::new(), true),
+ @r"
+ OptimizationTest:
+ input:
+ - SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 DESC NULLS LAST, b@1 ASC], file_type=parquet
+ output:
+ Ok:
+ - SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_scan_inexact=true
+ "
+ );
+}
+
+#[test]
+fn test_prefix_match_with_limit() {
+ // Test prefix matching with LIMIT - important for TopK optimization
+ let schema = schema();
+
+ // Source has [a ASC, b DESC, c ASC] ordering
+ let source_ordering = LexOrdering::new(vec![
+ sort_expr("a", &schema),
+ sort_expr_options(
+ "b",
+ &schema,
+ SortOptions {
+ descending: true,
+ nulls_first: false,
+ },
+ ),
+ sort_expr("c", &schema),
+ ])
+ .unwrap();
+ let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]);
+
+ // Request [a DESC NULLS LAST, b ASC NULLS FIRST] with LIMIT 100
+ // This is a prefix (2 columns) of the reversed 3-column ordering
+ let prefix_ordering = LexOrdering::new(vec![
+ sort_expr_options(
+ "a",
+ &schema,
+ SortOptions {
+ descending: true,
+ nulls_first: false,
+ },
+ ),
+ sort_expr_options(
+ "b",
+ &schema,
+ SortOptions {
+ descending: false,
+ nulls_first: true,
+ },
+ ),
+ ])
+ .unwrap();
+ let plan = sort_exec_with_fetch(prefix_ordering, Some(100), source);
+
+ insta::assert_snapshot!(
+ OptimizationTest::new(plan, PushdownSort::new(), true),
+ @r"
+ OptimizationTest:
+ input:
+ - SortExec: TopK(fetch=100), expr=[a@0 DESC NULLS LAST, b@1 ASC], preserve_partitioning=[false]
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC, b@1 DESC NULLS LAST, c@2 ASC], file_type=parquet
+ output:
+ Ok:
+ - SortExec: TopK(fetch=100), expr=[a@0 DESC NULLS LAST, b@1 ASC], preserve_partitioning=[false]
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_scan_inexact=true
+ "
+ );
+}
+
+#[test]
+fn test_prefix_match_through_transparent_nodes() {
+ // Test prefix matching works through transparent nodes
+ let schema = schema();
+
+ // Source has [a DESC NULLS LAST, b ASC, c DESC] ordering
+ let source_ordering = LexOrdering::new(vec![
+ sort_expr_options(
+ "a",
+ &schema,
+ SortOptions {
+ descending: true,
+ nulls_first: false,
+ },
+ ),
+ sort_expr("b", &schema),
+ sort_expr_options(
+ "c",
+ &schema,
+ SortOptions {
+ descending: true,
+ nulls_first: false,
+ },
+ ),
+ ])
+ .unwrap();
+ let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]);
+ let coalesce = coalesce_batches_exec(source, 1024);
+ let repartition = repartition_exec(coalesce);
+
+ // Request only [a ASC NULLS FIRST] - prefix of reversed ordering
+ let prefix_ordering = LexOrdering::new(vec![sort_expr_options(
+ "a",
+ &schema,
+ SortOptions {
+ descending: false,
+ nulls_first: true,
+ },
+ )])
+ .unwrap();
+ let plan = sort_exec(prefix_ordering, repartition);
+
+ insta::assert_snapshot!(
+ OptimizationTest::new(plan, PushdownSort::new(), true),
+ @r"
+ OptimizationTest:
+ input:
+ - SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
+ - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true
+ - CoalesceBatchesExec: target_batch_size=1024
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 DESC NULLS LAST, b@1 ASC, c@2 DESC NULLS LAST], file_type=parquet
+ output:
+ Ok:
+ - SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
+ - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
+ - CoalesceBatchesExec: target_batch_size=1024
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_scan_inexact=true
+ "
+ );
+}
+
+#[test]
+fn test_no_prefix_match_wrong_direction() {
+ // Test that prefix matching does NOT work if the direction is wrong
+ let schema = schema();
+
+ // Source has [a DESC, b ASC] ordering
+ let source_ordering = LexOrdering::new(vec![
+ sort_expr_options(
+ "a",
+ &schema,
+ SortOptions {
+ descending: true,
+ nulls_first: false,
+ },
+ ),
+ sort_expr("b", &schema),
+ ])
+ .unwrap();
+ let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]);
+
+ // Request [a DESC] - same direction as source, NOT a reverse prefix
+ let same_direction = LexOrdering::new(vec![sort_expr_options(
+ "a",
+ &schema,
+ SortOptions {
+ descending: true,
+ nulls_first: false,
+ },
+ )])
+ .unwrap();
+ let plan = sort_exec(same_direction, source);
+
+ insta::assert_snapshot!(
+ OptimizationTest::new(plan, PushdownSort::new(), true),
+ @r###"
+ OptimizationTest:
+ input:
+ - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 DESC NULLS LAST, b@1 ASC], file_type=parquet
+ output:
+ Ok:
+ - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 DESC NULLS LAST, b@1 ASC], file_type=parquet
+ "###
+ );
+}
+
+#[test]
+fn test_no_prefix_match_longer_than_source() {
+ // Test that prefix matching does NOT work if requested is longer than source
+ let schema = schema();
+
+ // Source has [a DESC] ordering (single column)
+ let source_ordering = LexOrdering::new(vec![sort_expr_options(
+ "a",
+ &schema,
+ SortOptions {
+ descending: true,
+ nulls_first: false,
+ },
+ )])
+ .unwrap();
+ let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]);
+
+ // Request [a ASC, b DESC] - longer than source, can't be a prefix
+ let longer_ordering = LexOrdering::new(vec![
+ sort_expr_options(
+ "a",
+ &schema,
+ SortOptions {
+ descending: false,
+ nulls_first: true,
+ },
+ ),
+ sort_expr_options(
+ "b",
+ &schema,
+ SortOptions {
+ descending: true,
+ nulls_first: false,
+ },
+ ),
+ ])
+ .unwrap();
+ let plan = sort_exec(longer_ordering, source);
+
+ insta::assert_snapshot!(
+ OptimizationTest::new(plan, PushdownSort::new(), true),
+ @r###"
+ OptimizationTest:
+ input:
+ - SortExec: expr=[a@0 ASC, b@1 DESC NULLS LAST], preserve_partitioning=[false]
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 DESC NULLS LAST], file_type=parquet
+ output:
+ Ok:
+ - SortExec: expr=[a@0 ASC, b@1 DESC NULLS LAST], preserve_partitioning=[false]
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 DESC NULLS LAST], file_type=parquet
+ "###
+ );
+}
+
+// ============================================================================
+// ORIGINAL TESTS
+// ============================================================================
+
+#[test]
+fn test_sort_through_coalesce_batches() {
+ // Sort pushes through CoalesceBatchesExec
+ let schema = schema();
+ let source_ordering = LexOrdering::new(vec![sort_expr("a", &schema)]).unwrap();
+ let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]);
+ let coalesce = coalesce_batches_exec(source, 1024);
+
+ let desc_ordering = LexOrdering::new(vec![sort_expr_options(
+ "a",
+ &schema,
+ SortOptions {
+ descending: true,
+ nulls_first: false,
+ },
+ )])
+ .unwrap();
+ let plan = sort_exec(desc_ordering, coalesce);
+
+ insta::assert_snapshot!(
+ OptimizationTest::new(plan, PushdownSort::new(), true),
+ @r"
+ OptimizationTest:
+ input:
+ - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
+ - CoalesceBatchesExec: target_batch_size=1024
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
+ output:
+ Ok:
+ - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
+ - CoalesceBatchesExec: target_batch_size=1024
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_scan_inexact=true
+ "
+ );
+}
+
+#[test]
+fn test_sort_through_repartition() {
+ // Sort should push through RepartitionExec
+ let schema = schema();
+ let source_ordering = LexOrdering::new(vec![sort_expr("a", &schema)]).unwrap();
+ let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]);
+ let repartition = repartition_exec(source);
+
+ let desc_ordering = LexOrdering::new(vec![sort_expr_options(
+ "a",
+ &schema,
+ SortOptions {
+ descending: true,
+ nulls_first: false,
+ },
+ )])
+ .unwrap();
+ let plan = sort_exec(desc_ordering, repartition);
+
+ insta::assert_snapshot!(
+ OptimizationTest::new(plan, PushdownSort::new(), true),
+ @r"
+ OptimizationTest:
+ input:
+ - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
+ - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
+ output:
+ Ok:
+ - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
+ - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_scan_inexact=true
+ "
+ );
+}
+
+#[test]
+fn test_nested_sorts() {
+ // Nested sort operations - only innermost can be optimized
+ let schema = schema();
+ let source_ordering = LexOrdering::new(vec![sort_expr("a", &schema)]).unwrap();
+ let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]);
+
+ let desc_ordering = LexOrdering::new(vec![sort_expr_options(
+ "a",
+ &schema,
+ SortOptions {
+ descending: true,
+ nulls_first: false,
+ },
+ )])
+ .unwrap();
+ let inner_sort = sort_exec(desc_ordering, source);
+
+ let sort_exprs2 = LexOrdering::new(vec![sort_expr("b", &schema)]).unwrap();
+ let plan = sort_exec(sort_exprs2, inner_sort);
+
+ insta::assert_snapshot!(
+ OptimizationTest::new(plan, PushdownSort::new(), true),
+ @r"
+ OptimizationTest:
+ input:
+ - SortExec: expr=[b@1 ASC], preserve_partitioning=[false]
+ - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
+ output:
+ Ok:
+ - SortExec: expr=[b@1 ASC], preserve_partitioning=[false]
+ - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_scan_inexact=true
+ "
+ );
+}
+
+#[test]
+fn test_non_sort_plans_unchanged() {
+ // Plans without SortExec should pass through unchanged
+ let schema = schema();
+ let source = parquet_exec(schema.clone());
+ let plan = coalesce_batches_exec(source, 1024);
+
+ insta::assert_snapshot!(
+ OptimizationTest::new(plan, PushdownSort::new(), true),
+ @r###"
+ OptimizationTest:
+ input:
+ - CoalesceBatchesExec: target_batch_size=1024
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
+ output:
+ Ok:
+ - CoalesceBatchesExec: target_batch_size=1024
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
+ "###
+ );
+}
+
+#[test]
+fn test_optimizer_properties() {
+ // Test optimizer metadata
+ let optimizer = PushdownSort::new();
+
+ assert_eq!(optimizer.name(), "PushdownSort");
+ assert!(optimizer.schema_check());
+}
+
+#[test]
+fn test_sort_through_coalesce_partitions() {
+ // Sort should push through CoalescePartitionsExec
+ let schema = schema();
+ let source_ordering = LexOrdering::new(vec![sort_expr("a", &schema)]).unwrap();
+ let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]);
+ let repartition = repartition_exec(source);
+ let coalesce_parts = coalesce_partitions_exec(repartition);
+
+ let desc_ordering = LexOrdering::new(vec![sort_expr_options(
+ "a",
+ &schema,
+ SortOptions {
+ descending: true,
+ nulls_first: false,
+ },
+ )])
+ .unwrap();
+ let plan = sort_exec(desc_ordering, coalesce_parts);
+
+ insta::assert_snapshot!(
+ OptimizationTest::new(plan, PushdownSort::new(), true),
+ @r"
+ OptimizationTest:
+ input:
+ - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
+ - CoalescePartitionsExec
+ - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
+ output:
+ Ok:
+ - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
+ - CoalescePartitionsExec
+ - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_scan_inexact=true
+ "
+ );
+}
+
+#[test]
+fn test_complex_plan_with_multiple_operators() {
+ // Test a complex plan with multiple operators between sort and source
+ let schema = schema();
+ let source_ordering = LexOrdering::new(vec![sort_expr("a", &schema)]).unwrap();
+ let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]);
+ let coalesce_batches = coalesce_batches_exec(source, 1024);
+ let repartition = repartition_exec(coalesce_batches);
+ let coalesce_parts = coalesce_partitions_exec(repartition);
+
+ let desc_ordering = LexOrdering::new(vec![sort_expr_options(
+ "a",
+ &schema,
+ SortOptions {
+ descending: true,
+ nulls_first: false,
+ },
+ )])
+ .unwrap();
+ let plan = sort_exec(desc_ordering, coalesce_parts);
+
+ insta::assert_snapshot!(
+ OptimizationTest::new(plan, PushdownSort::new(), true),
+ @r"
+ OptimizationTest:
+ input:
+ - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
+ - CoalescePartitionsExec
+ - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true
Review Comment:
I don't think a real plan would have maintains_sort_order here (as the output is then fed into a SortExec)
##########
datafusion/datasource-parquet/src/source.rs:
##########
@@ -638,6 +664,12 @@ impl FileSource for ParquetSource {
if let Some(predicate) = self.filter() {
writeln!(f, "predicate={}", fmt_sql(predicate.as_ref()))?;
}
+
+ // Add reverse_scan info if enabled
+ if self.reverse_scan_inexact {
Review Comment:
I suggest not adding this detail to the tree explain format
##########
datafusion/core/tests/physical_optimizer/pushdown_sort.rs:
##########
@@ -0,0 +1,778 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Tests for sort pushdown optimizer rule (Phase 1)
+//!
+//! Phase 1 tests verify that:
+//! 1. Reverse scan is enabled (reverse_scan_inexact=true)
+//! 2. SortExec is kept (because ordering is inexact)
+//! 3. output_ordering remains unchanged
+//! 4. Early termination is enabled for TopK queries
+//! 5. Prefix matching works correctly
+
+use arrow::compute::SortOptions;
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use datafusion_physical_optimizer::pushdown_sort::PushdownSort;
+use datafusion_physical_optimizer::PhysicalOptimizerRule;
+
+use crate::physical_optimizer::test_utils::{
Review Comment:
These tests cover the code well
Interestingly, it seems the only path that is not actually tested is when the sort can be pushed down exactly
Running
```shell
cargo llvm-cov --html --all-features --test core_integration -- pushdown_sort
```
You can see the only red line is here
<img width="1047" height="540" alt="Image" src="https://github.com/user-attachments/assets/0b698b87-71ba-4564-a1db-33654302d113" />
##########
datafusion/datasource-parquet/src/source.rs:
##########
@@ -287,6 +289,11 @@ pub struct ParquetSource {
pub(crate) projection: ProjectionExprs,
#[cfg(feature = "parquet_encryption")]
pub(crate) encryption_factory: Option<Arc<dyn EncryptionFactory>>,
+ /// If true, read files in reverse order and reverse row groups within files.
+ /// But it's not guaranteed that rows within row groups are in reverse order,
Review Comment:
I suggest calling this `reverse_row_group` or `reverse_row_group_scans` (and updating the relevant method names too)
##########
datafusion/datasource/src/file_scan_config.rs:
##########
@@ -844,6 +845,114 @@ impl DataSource for FileScanConfig {
}
}
}
+
+ fn try_pushdown_sort(
+ &self,
+ order: &[PhysicalSortExpr],
+ ) -> Result<SortOrderPushdownResult<Arc<dyn DataSource>>> {
+ let current_ordering = match self.output_ordering.first() {
+ Some(ordering) => ordering.as_ref(),
+ None => return Ok(SortOrderPushdownResult::Unsupported),
+ };
+
+ // Only reverse ordering pushdown is supported for now
+ if !is_reverse_ordering(order, current_ordering) {
+ return Ok(SortOrderPushdownResult::Unsupported);
+ }
+
+ // Ask the file source if it can handle the sort pushdown
+ let pushdown_result = self.file_source.try_pushdown_sort(order)?;
+
+ // Extract the new file source and determine result type
+ let (new_file_source, is_exact) = match pushdown_result {
+ SortOrderPushdownResult::Exact { inner } => (inner, true),
+ SortOrderPushdownResult::Inexact { inner } => (inner, false),
+ SortOrderPushdownResult::Unsupported => {
+ return Ok(SortOrderPushdownResult::Unsupported);
+ }
+ };
+
+ let mut new_config = self.clone();
+
+ // Reverse file groups: when scanning in reverse, we need to read files
+ // in reverse order to maintain the correct global ordering
+ new_config.file_groups = new_config
+ .file_groups
+ .into_iter()
+ .map(|group| {
+ let mut files = group.into_inner();
+ files.reverse();
+ files.into()
+ })
+ .collect();
+
+ // Phase 1: Make output_ordering unknown since we're only reversing row groups,
+ // not guaranteeing perfect ordering. The rows within row groups are not reversed.
+ // This is correct because:
+ // 1. We're only reversing row group read order
+ // 2. Rows within each row group maintain their original order
+ // 3. This provides approximate ordering, not guaranteed ordering
+ new_config.output_ordering = vec![];
+
+ new_config.file_source = new_file_source;
+
+ let new_config: Arc<dyn DataSource> = Arc::new(new_config);
+ if is_exact {
+ Ok(SortOrderPushdownResult::Exact { inner: new_config })
+ } else {
+ Ok(SortOrderPushdownResult::Inexact { inner: new_config })
+ }
+ }
+}
+
+/// Check if the requested ordering can be satisfied by reversing the current ordering.
+///
+/// This function supports **prefix matching**: if the file has ordering [A DESC, B ASC]
+/// and we need [A ASC], reversing the scan gives us [A ASC, B DESC], which satisfies
+/// the requirement since [A ASC] is a prefix.
+///
+/// # Arguments
+/// * `requested` - The ordering required by the query
+/// * `current` - The natural ordering of the data source (e.g., from file metadata)
+///
+/// # Returns
+/// `true` if reversing the current ordering would satisfy the requested ordering
+///
+/// # Example
+/// ```text
+/// Current: [number DESC, letter ASC]
+/// Requested: [number ASC]
+/// Reversed: [number ASC, letter DESC] ✓ Prefix match!
+/// ```
+fn is_reverse_ordering(
+ requested: &[PhysicalSortExpr],
+ current: &[PhysicalSortExpr],
+) -> bool {
+ // Allow prefix matching - we can satisfy a prefix of the current ordering
+ // by reversing the scan
+ if requested.len() > current.len() {
+ return false;
+ }
+
+ requested.iter().zip(current.iter()).all(|(req, cur)| {
+ // Check if the expressions are semantically equivalent using PhysicalExpr::eq
+ // This is more robust than string comparison as it handles:
+ // - Expression equivalence (not just string representation)
+ // - Complex expressions that might have different string forms but same semantics
+ let exprs_match = req.expr.eq(&cur.expr);
+
+ // Now check if the sort options are exactly reversed
+ // For a valid reverse scan:
+ // - descending must be opposite: ASC ↔ DESC
+ // - nulls_first must be opposite: NULLS FIRST ↔ NULLS LAST
+ let options_reversed = req.options.descending != cur.options.descending
Review Comment:
🤔 it would be nice to have a `SortOptions::reversed()` method (no action required)
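For reference, a free-function sketch of the same idea (a real `reversed()` method would live in arrow, since that is where `SortOptions` is defined):
```rust
use arrow::compute::SortOptions;

/// Hypothetical helper until something like `SortOptions::reversed()`
/// exists upstream: flip both the direction and the null ordering.
fn reversed(opts: &SortOptions) -> SortOptions {
    SortOptions {
        descending: !opts.descending,
        nulls_first: !opts.nulls_first,
    }
}
```
The check above could then read `req.options == reversed(&cur.options)`.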
##########
datafusion/physical-optimizer/src/pushdown_sort.rs:
##########
@@ -0,0 +1,128 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Sort Pushdown Optimization
+//!
+//! This optimizer attempts to push sort requirements down through the execution plan
+//! tree to data sources that can natively handle them (e.g., by scanning files in
+//! reverse order).
+//!
+//! ## How it works
+//!
+//! 1. Detects `SortExec` nodes in the plan
+//! 2. Calls `try_pushdown_sort()` on the input to recursively push the sort requirement
+//! 3. Each node type defines its own pushdown behavior:
+//! - **Transparent nodes** (CoalesceBatchesExec, RepartitionExec, etc.) delegate to
+//! their children and wrap the result
+//! - **Data sources** (DataSourceExec) check if they can optimize for the ordering
+//! - **Blocking nodes** return `Unsupported` to stop pushdown
+//! 4. Based on the result:
+//! - `Exact`: Remove the Sort operator (data source guarantees perfect ordering)
+//! - `Inexact`: Keep Sort but use optimized input (enables early termination for TopK)
+//! - `Unsupported`: No change
+//!
+//! ## Current capabilities (Phase 1)
+//!
+//! - Reverse scan optimization: when required sort is the reverse of the data source's
+//! natural ordering, enable reverse scanning (reading row groups in reverse order)
+//! - Supports prefix matching: if data has ordering [A DESC, B ASC] and query needs
+//! [A ASC], reversing gives [A ASC, B DESC] which satisfies the requirement
+//!
+//! ## Future enhancements (Phase 2)
+//!
+//! - File reordering based on statistics
+//! - Return `Exact` when files are known to be perfectly sorted
+//! - Complete Sort elimination when ordering is guaranteed
+
+use crate::PhysicalOptimizerRule;
+use datafusion_common::config::ConfigOptions;
+use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
+use datafusion_common::Result;
+use datafusion_physical_plan::sorts::sort::SortExec;
+use datafusion_physical_plan::ExecutionPlan;
+use datafusion_physical_plan::SortOrderPushdownResult;
+use std::sync::Arc;
+
+/// A PhysicalOptimizerRule that attempts to push down sort requirements to
data sources.
+///
+/// See module-level documentation for details.
+#[derive(Debug, Clone, Default)]
+pub struct PushdownSort;
+
+impl PushdownSort {
+ pub fn new() -> Self {
+ Self {}
+ }
+}
+
+impl PhysicalOptimizerRule for PushdownSort {
+ fn optimize(
+ &self,
+ plan: Arc<dyn ExecutionPlan>,
+ config: &ConfigOptions,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ // Check if sort pushdown optimization is enabled
+ if !config.execution.parquet.enable_sort_pushdown {
Review Comment:
This rule is not parquet specific (though I do realize parquet is the only file source that uses it) so I think it makes sense to make this a normal [optimizer](https://datafusion.apache.org/user-guide/configs.html) flag, perhaps something like `datafusion.optimizer.enable_window_limits`.
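A sketch of what that might look like, reusing the `config_namespace!` pattern from `config.rs` (placement and wording are illustrative):
```rust
/// When true, attempt to push sort requirements down to data sources
/// that can natively satisfy them (e.g. by reversing their scan order).
pub enable_sort_pushdown: bool, default = true
```
The check here would then become `config.optimizer.enable_sort_pushdown`.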
##########
datafusion/datasource/src/file_scan_config.rs:
##########
@@ -844,6 +845,114 @@ impl DataSource for FileScanConfig {
}
}
}
+
+ fn try_pushdown_sort(
+ &self,
+ order: &[PhysicalSortExpr],
+ ) -> Result<SortOrderPushdownResult<Arc<dyn DataSource>>> {
+ let current_ordering = match self.output_ordering.first() {
+ Some(ordering) => ordering.as_ref(),
+ None => return Ok(SortOrderPushdownResult::Unsupported),
+ };
+
+ // Only reverse ordering pushdown is supported for now
+ if !is_reverse_ordering(order, current_ordering) {
+ return Ok(SortOrderPushdownResult::Unsupported);
+ }
+
+ // Ask the file source if it can handle the sort pushdown
+ let pushdown_result = self.file_source.try_pushdown_sort(order)?;
+
+ // Extract the new file source and determine result type
+ let (new_file_source, is_exact) = match pushdown_result {
+ SortOrderPushdownResult::Exact { inner } => (inner, true),
+ SortOrderPushdownResult::Inexact { inner } => (inner, false),
+ SortOrderPushdownResult::Unsupported => {
+ return Ok(SortOrderPushdownResult::Unsupported);
+ }
+ };
+
+ let mut new_config = self.clone();
+
+ // Reverse file groups: when scanning in reverse, we need to read files
+ // in reverse order to maintain the correct global ordering
+ new_config.file_groups = new_config
+ .file_groups
+ .into_iter()
+ .map(|group| {
+ let mut files = group.into_inner();
+ files.reverse();
+ files.into()
+ })
+ .collect();
+
+ // Phase 1: Make output_ordering unknown since we're only reversing row groups,
+ // not guaranteeing perfect ordering. The rows within row groups are not reversed.
+ // This is correct because:
+ // 1. We're only reversing row group read order
+ // 2. Rows within each row group maintain their original order
+ // 3. This provides approximate ordering, not guaranteed ordering
+ new_config.output_ordering = vec![];
+
+ new_config.file_source = new_file_source;
+
+ let new_config: Arc<dyn DataSource> = Arc::new(new_config);
+ if is_exact {
+ Ok(SortOrderPushdownResult::Exact { inner: new_config })
+ } else {
+ Ok(SortOrderPushdownResult::Inexact { inner: new_config })
+ }
+ }
+}
+
+/// Check if the requested ordering can be satisfied by reversing the current ordering.
+///
+/// This function supports **prefix matching**: if the file has ordering [A DESC, B ASC]
+/// and we need [A ASC], reversing the scan gives us [A ASC, B DESC], which satisfies
+/// the requirement since [A ASC] is a prefix.
+///
+/// # Arguments
+/// * `requested` - The ordering required by the query
+/// * `current` - The natural ordering of the data source (e.g., from file metadata)
+///
+/// # Returns
+/// `true` if reversing the current ordering would satisfy the requested ordering
+///
+/// # Example
+/// ```text
+/// Current: [number DESC, letter ASC]
+/// Requested: [number ASC]
+/// Reversed: [number ASC, letter DESC] ✓ Prefix match!
+/// ```
+fn is_reverse_ordering(
+ requested: &[PhysicalSortExpr],
+ current: &[PhysicalSortExpr],
+) -> bool {
+ // Allow prefix matching - we can satisfy a prefix of the current ordering
+ // by reversing the scan
+ if requested.len() > current.len() {
+ return false;
+ }
+
+ requested.iter().zip(current.iter()).all(|(req, cur)| {
+ // Check if the expressions are semantically equivalent using PhysicalExpr::eq
+ // This is more robust than string comparison as it handles:
+ // - Expression equivalence (not just string representation)
+ // - Complex expressions that might have different string forms but same semantics
+ let exprs_match = req.expr.eq(&cur.expr);
+
+ // Now check if the sort options are exactly reversed
+ // For a valid reverse scan:
+ // - descending must be opposite: ASC ↔ DESC
+ // - nulls_first must be opposite: NULLS FIRST ↔ NULLS LAST
+ let options_reversed = req.options.descending != cur.options.descending
+ && req.options.nulls_first != cur.options.nulls_first;
+
+ // Both conditions must be true:
+ // 1. Expressions are semantically equivalent
+ // 2. Completely reversed sort options
Review Comment:
I think this comment just repeats the code, so doesn't add a lot of extra value
##########
datafusion/datasource/src/file_scan_config.rs:
##########
@@ -844,6 +845,114 @@ impl DataSource for FileScanConfig {
}
}
}
+
+ fn try_pushdown_sort(
+ &self,
+ order: &[PhysicalSortExpr],
+ ) -> Result<SortOrderPushdownResult<Arc<dyn DataSource>>> {
+ let current_ordering = match self.output_ordering.first() {
+ Some(ordering) => ordering.as_ref(),
+ None => return Ok(SortOrderPushdownResult::Unsupported),
+ };
+
+ // Only reverse ordering pushdown is supported for now
+ if !is_reverse_ordering(order, current_ordering) {
+ return Ok(SortOrderPushdownResult::Unsupported);
+ }
+
+ // Ask the file source if it can handle the sort pushdown
+ let pushdown_result = self.file_source.try_pushdown_sort(order)?;
+
+ // Extract the new file source and determine result type
+ let (new_file_source, is_exact) = match pushdown_result {
+ SortOrderPushdownResult::Exact { inner } => (inner, true),
+ SortOrderPushdownResult::Inexact { inner } => (inner, false),
+ SortOrderPushdownResult::Unsupported => {
+ return Ok(SortOrderPushdownResult::Unsupported);
+ }
+ };
+
+ let mut new_config = self.clone();
+
+ // Reverse file groups: when scanning in reverse, we need to read files
+ // in reverse order to maintain the correct global ordering
+ new_config.file_groups = new_config
+ .file_groups
+ .into_iter()
+ .map(|group| {
+ let mut files = group.into_inner();
+ files.reverse();
+ files.into()
+ })
+ .collect();
+
+ // Phase 1: Make output_ordering unknown since we're only reversing row groups,
+ // not guaranteeing perfect ordering. The rows within row groups are not reversed.
+ // This is correct because:
+ // 1. We're only reversing row group read order
+ // 2. Rows within each row group maintain their original order
+ // 3. This provides approximate ordering, not guaranteed ordering
+ new_config.output_ordering = vec![];
+
+ new_config.file_source = new_file_source;
+
+ let new_config: Arc<dyn DataSource> = Arc::new(new_config);
+ if is_exact {
+ Ok(SortOrderPushdownResult::Exact { inner: new_config })
+ } else {
+ Ok(SortOrderPushdownResult::Inexact { inner: new_config })
+ }
+ }
+}
+
+/// Check if the requested ordering can be satisfied by reversing the current ordering.
+///
+/// This function supports **prefix matching**: if the file has ordering [A DESC, B ASC]
+/// and we need [A ASC], reversing the scan gives us [A ASC, B DESC], which satisfies
+/// the requirement since [A ASC] is a prefix.
+///
+/// # Arguments
+/// * `requested` - The ordering required by the query
+/// * `current` - The natural ordering of the data source (e.g., from file metadata)
+///
+/// # Returns
+/// `true` if reversing the current ordering would satisfy the requested ordering
+///
+/// # Example
+/// ```text
+/// Current: [number DESC, letter ASC]
+/// Requested: [number ASC]
+/// Reversed: [number ASC, letter DESC] ✓ Prefix match!
+/// ```
+fn is_reverse_ordering(
+ requested: &[PhysicalSortExpr],
+ current: &[PhysicalSortExpr],
+) -> bool {
+ // Allow prefix matching - we can satisfy a prefix of the current ordering
+ // by reversing the scan
+ if requested.len() > current.len() {
+ return false;
+ }
+
+ requested.iter().zip(current.iter()).all(|(req, cur)| {
+ // Check if the expressions are semantically equivalent using PhysicalExpr::eq
+ // This is more robust than string comparison as it handles:
Review Comment:
I don't think we need to justify why we are comparing two expressions using `eq` so we can probably avoid these comments
##########
datafusion/physical-optimizer/src/pushdown_sort.rs:
##########
@@ -0,0 +1,128 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Sort Pushdown Optimization
+//!
+//! This optimizer attempts to push sort requirements down through the execution plan
+//! tree to data sources that can natively handle them (e.g., by scanning files in
+//! reverse order).
+//!
+//! ## How it works
+//!
+//! 1. Detects `SortExec` nodes in the plan
+//! 2. Calls `try_pushdown_sort()` on the input to recursively push the sort requirement
+//! 3. Each node type defines its own pushdown behavior:
+//! - **Transparent nodes** (CoalesceBatchesExec, RepartitionExec, etc.) delegate to
+//! their children and wrap the result
+//! - **Data sources** (DataSourceExec) check if they can optimize for the ordering
+//! - **Blocking nodes** return `Unsupported` to stop pushdown
+//! 4. Based on the result:
+//! - `Exact`: Remove the Sort operator (data source guarantees perfect ordering)
+//! - `Inexact`: Keep Sort but use optimized input (enables early termination for TopK)
+//! - `Unsupported`: No change
+//!
+//! ## Current capabilities (Phase 1)
Review Comment:
If we are going to leave references to phase 2 in the code, I think we should file a ticket and leave a link to that new ticket for future readers. If we leave it as a comment it will be harder to find and track the current status of, I fear
##########
datafusion/datasource/src/file_scan_config.rs:
##########
@@ -844,6 +845,114 @@ impl DataSource for FileScanConfig {
}
}
}
+
+ fn try_pushdown_sort(
+ &self,
+ order: &[PhysicalSortExpr],
+ ) -> Result<SortOrderPushdownResult<Arc<dyn DataSource>>> {
+ let current_ordering = match self.output_ordering.first() {
+ Some(ordering) => ordering.as_ref(),
+ None => return Ok(SortOrderPushdownResult::Unsupported),
+ };
+
+ // Only reverse ordering pushdown is supported for now
+ if !is_reverse_ordering(order, current_ordering) {
+ return Ok(SortOrderPushdownResult::Unsupported);
+ }
+
+ // Ask the file source if it can handle the sort pushdown
+ let pushdown_result = self.file_source.try_pushdown_sort(order)?;
+
+ // Extract the new file source and determine result type
+ let (new_file_source, is_exact) = match pushdown_result {
+ SortOrderPushdownResult::Exact { inner } => (inner, true),
+ SortOrderPushdownResult::Inexact { inner } => (inner, false),
+ SortOrderPushdownResult::Unsupported => {
+ return Ok(SortOrderPushdownResult::Unsupported);
+ }
+ };
+
+ let mut new_config = self.clone();
+
+ // Reverse file groups: when scanning in reverse, we need to read files
+ // in reverse order to maintain the correct global ordering
+ new_config.file_groups = new_config
+ .file_groups
+ .into_iter()
+ .map(|group| {
+ let mut files = group.into_inner();
+ files.reverse();
+ files.into()
+ })
+ .collect();
+
+ // Phase 1: Make output_ordering unknown since we're only reversing row groups,
+ // not guaranteeing perfect ordering. The rows within row groups are not reversed.
+ // This is correct because:
+ // 1. We're only reversing row group read order
+ // 2. Rows within each row group maintain their original order
+ // 3. This provides approximate ordering, not guaranteed ordering
+ new_config.output_ordering = vec![];
+
+ new_config.file_source = new_file_source;
+
+ let new_config: Arc<dyn DataSource> = Arc::new(new_config);
+ if is_exact {
+ Ok(SortOrderPushdownResult::Exact { inner: new_config })
+ } else {
+ Ok(SortOrderPushdownResult::Inexact { inner: new_config })
+ }
+ }
+}
+
+/// Check if the requested ordering can be satisfied by reversing the current ordering.
+///
+/// This function supports **prefix matching**: if the file has ordering [A DESC, B ASC]
+/// and we need [A ASC], reversing the scan gives us [A ASC, B DESC], which satisfies
+/// the requirement since [A ASC] is a prefix.
+///
+/// # Arguments
+/// * `requested` - The ordering required by the query
+/// * `current` - The natural ordering of the data source (e.g., from file metadata)
+///
+/// # Returns
+/// `true` if reversing the current ordering would satisfy the requested ordering
+///
+/// # Example
+/// ```text
+/// Current: [number DESC, letter ASC]
+/// Requested: [number ASC]
+/// Reversed: [number ASC, letter DESC] ✓ Prefix match!
+/// ```
+fn is_reverse_ordering(
Review Comment:
Minor: I suggest making this a method on `LexOrdering` rather than a free
function -- perhaps like `LexOrdering::is_reverse(&self, other: &LexOrdering)`
-- that way it will be easier to find if other people want to use it.
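For illustration, a minimal standalone sketch of what such a prefix-aware reverse check could look like (hypothetical, not from the PR: toy types stand in for `PhysicalSortExpr`/`LexOrdering`, and the real method would compare `Arc<dyn PhysicalExpr>`s rather than column names):
```rust
/// Toy stand-ins for PhysicalSortExpr/LexOrdering, only to show the check itself.
#[derive(Clone, Copy)]
struct SortOptions {
    descending: bool,
    nulls_first: bool,
}

struct SortExpr {
    column: String, // the real code compares physical expressions, not names
    options: SortOptions,
}

/// True if reversing `current` yields an ordering that has `requested` as a
/// prefix: every column matches and every sort option is flipped.
fn is_reverse(requested: &[SortExpr], current: &[SortExpr]) -> bool {
    requested.len() <= current.len()
        && requested.iter().zip(current).all(|(req, cur)| {
            req.column == cur.column
                && req.options.descending != cur.options.descending
                && req.options.nulls_first != cur.options.nulls_first
        })
}

fn main() {
    let desc = SortOptions { descending: true, nulls_first: false };
    let asc = SortOptions { descending: false, nulls_first: true };
    // Current: [number DESC, letter ASC]; requested: [number ASC] -> prefix match
    let current = vec![
        SortExpr { column: "number".into(), options: desc },
        SortExpr { column: "letter".into(), options: asc },
    ];
    let requested = vec![SortExpr { column: "number".into(), options: asc }];
    assert!(is_reverse(&requested, &current));
}
```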
##########
datafusion/datasource-parquet/src/source.rs:
##########
@@ -710,6 +742,34 @@ impl FileSource for ParquetSource {
)
.with_updated_node(source))
}
+
+    /// When pushing a sort operation down to the Parquet source is possible,
+    /// create a new ParquetSource with reverse_scan enabled.
+    ///
+    /// # Phase 1 Behavior (Current)
+    /// Returns `Inexact` because we're only reversing the scan direction and reordering
+    /// files/row groups. We still need to verify ordering at a higher level.
+    ///
+    /// # Phase 2 (Future)
+    /// Could return `Exact` when we can guarantee that the scan order matches the
+    /// requested order, and we can remove any higher-level sort operations.
+    ///
+    /// TODO: support more policies in addition to reversing the scan.
+ fn try_pushdown_sort(
Review Comment:
This API does not seem right to me -- specifically, since the ParquetSource
doesn't actually know how it is sorted (the sort ordering is on the
FileScanConfig,
[here](https://github.com/apache/datafusion/blob/eb39e540f8373ea921fb4f9f1cd57eaa59bedd00/datafusion/datasource/src/file_scan_config.rs#L159-L160))
it will never be able to 'push down the sort'.
This may be the same comment as @adriangb is making below
What I would suggest instead is a more general API tied to what the
ParquetSource **can** do -- namely reverse or partially reverse its output.
Maybe a method like
```rust
// tries to reverse the output order of rows
fn try_reverse_output(&self) -> SortOrderPushdownResult<Arc<dyn FileSource>> {
...
}
```
This could then be called from the SortPushdown optimizer rule when it is
known the file is sorted in the opposite order from the one the query desires
🤔
I actually do think there are use cases for `try_pushdown_sort` into other
data sources (e.g. when reading from some other database that could actually
move the sort there); however, that is already handled at the ExecutionPlan level.
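To make the intended division of labor concrete, here is a toy model of that control flow (the enum and function are mocked locally so the sketch compiles standalone; it is not the PR's actual code):
```rust
/// Local mock of the result enum, mirroring its three variants.
enum SortOrderPushdownResult<T> {
    Exact { inner: T },
    Inexact { inner: T },
    Unsupported,
}

/// Stand-in for the suggested FileSource::try_reverse_output(): the source only
/// knows whether it can (partially) reverse itself, not how it is sorted.
fn try_reverse_output(reversible: bool) -> SortOrderPushdownResult<String> {
    if reversible {
        // Phase 1: only row-group order is reversed, so ordering stays inexact
        SortOrderPushdownResult::Inexact { inner: "reversed parquet scan".to_string() }
    } else {
        SortOrderPushdownResult::Unsupported
    }
}

fn main() {
    // The optimizer rule (which *does* know the FileScanConfig ordering) decides
    // the ordering question, and only asks the source to reverse itself:
    let reverse_would_satisfy_query = true; // e.g. via an is_reverse()-style check
    if reverse_would_satisfy_query {
        match try_reverse_output(true) {
            SortOrderPushdownResult::Exact { inner } => println!("drop Sort, use {inner}"),
            SortOrderPushdownResult::Inexact { inner } => println!("keep Sort over {inner}"),
            SortOrderPushdownResult::Unsupported => println!("leave plan unchanged"),
        }
    }
}
```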
##########
datafusion/physical-plan/src/execution_plan.rs:
##########
@@ -682,6 +685,29 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
) -> Option<Arc<dyn ExecutionPlan>> {
None
}
+
+ /// Try to push down sort ordering requirements to this node.
Review Comment:
👍
##########
datafusion/physical-optimizer/src/pushdown_sort.rs:
##########
@@ -0,0 +1,128 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Sort Pushdown Optimization
+//!
+//! This optimizer attempts to push sort requirements down through the execution plan
+//! tree to data sources that can natively handle them (e.g., by scanning files in
+//! reverse order).
+//!
+//! ## How it works
+//!
+//! 1. Detects `SortExec` nodes in the plan
+//! 2. Calls `try_pushdown_sort()` on the input to recursively push the sort requirement
+//! 3. Each node type defines its own pushdown behavior:
+//!    - **Transparent nodes** (CoalesceBatchesExec, RepartitionExec, etc.) delegate to
+//!      their children and wrap the result
+//!    - **Data sources** (DataSourceExec) check if they can optimize for the ordering
+//!    - **Blocking nodes** return `Unsupported` to stop pushdown
+//! 4. Based on the result:
+//!    - `Exact`: Remove the Sort operator (data source guarantees perfect ordering)
+//!    - `Inexact`: Keep Sort but use optimized input (enables early termination for TopK)
+//!    - `Unsupported`: No change
+//!
+//! ## Current capabilities (Phase 1)
+//!
+//! - Reverse scan optimization: when required sort is the reverse of the data source's
+//!   natural ordering, enable reverse scanning (reading row groups in reverse order)
+//! - Supports prefix matching: if data has ordering [A DESC, B ASC] and query needs
+//!   [A ASC], reversing gives [A ASC, B DESC] which satisfies the requirement
+//!
+//! ## Future enhancements (Phase 2)
+//!
+//! - File reordering based on statistics
+//! - Return `Exact` when files are known to be perfectly sorted
+//! - Complete Sort elimination when ordering is guaranteed
+
+use crate::PhysicalOptimizerRule;
+use datafusion_common::config::ConfigOptions;
+use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
+use datafusion_common::Result;
+use datafusion_physical_plan::sorts::sort::SortExec;
+use datafusion_physical_plan::ExecutionPlan;
+use datafusion_physical_plan::SortOrderPushdownResult;
+use std::sync::Arc;
+
+/// A PhysicalOptimizerRule that attempts to push down sort requirements to data sources.
+///
+/// See module-level documentation for details.
+#[derive(Debug, Clone, Default)]
+pub struct PushdownSort;
+
+impl PushdownSort {
+ pub fn new() -> Self {
+ Self {}
+ }
+}
+
+impl PhysicalOptimizerRule for PushdownSort {
+ fn optimize(
+ &self,
+ plan: Arc<dyn ExecutionPlan>,
+ config: &ConfigOptions,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ // Check if sort pushdown optimization is enabled
+ if !config.execution.parquet.enable_sort_pushdown {
+ return Ok(plan);
+ }
+
+        // Use transform_down to find and optimize all SortExec nodes (including nested ones)
+        plan.transform_down(|plan: Arc<dyn ExecutionPlan>| {
+            // Check if this is a SortExec
+            let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() else {
+                return Ok(Transformed::no(plan));
+            };
+
+            let sort_input = Arc::clone(sort_exec.input());
+            let required_ordering = sort_exec.expr();
+
+            // Try to push the sort requirement down through the plan tree
+            // Each node type defines its own pushdown behavior via try_pushdown_sort()
+            match sort_input.try_pushdown_sort(required_ordering)? {
+                SortOrderPushdownResult::Exact { inner } => {
+                    // Data source guarantees perfect ordering - remove the Sort operator
Review Comment:
This is very cool
##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -1344,4 +1397,268 @@ mod test {
assert_eq!(num_batches, 0);
assert_eq!(num_rows, 0);
}
+
+ #[tokio::test]
+ async fn test_reverse_scan_row_groups() {
+ use parquet::file::properties::WriterProperties;
+
+ let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+
+ // Create multiple batches to ensure multiple row groups
+        let batch1 =
+            record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap();
+        let batch2 =
+            record_batch!(("a", Int32, vec![Some(4), Some(5), Some(6)])).unwrap();
+        let batch3 =
+            record_batch!(("a", Int32, vec![Some(7), Some(8), Some(9)])).unwrap();
+
+ // Write parquet file with multiple row groups
+ // Force small row groups by setting max_row_group_size
+ let props = WriterProperties::builder()
+            .set_max_row_group_size(3) // Force each batch into its own row group
+ .build();
+
+ let mut out = BytesMut::new().writer();
+ {
+ let mut writer =
+                ArrowWriter::try_new(&mut out, batch1.schema(), Some(props)).unwrap();
+ writer.write(&batch1).unwrap();
+ writer.write(&batch2).unwrap();
+ writer.write(&batch3).unwrap();
+ writer.finish().unwrap();
+ }
+ let data = out.into_inner().freeze();
+ let data_len = data.len();
+ store
+ .put(&Path::from("test.parquet"), data.into())
+ .await
+ .unwrap();
+
+ let schema = batch1.schema();
+ let file = PartitionedFile::new(
+ "test.parquet".to_string(),
+ u64::try_from(data_len).unwrap(),
+ );
+
+ let make_opener = |reverse_scan: bool| ParquetOpener {
+ partition_index: 0,
+ projection: ProjectionExprs::from_indices(&[0], &schema),
+ batch_size: 1024,
+ limit: None,
+ predicate: None,
+ table_schema: TableSchema::from_file_schema(Arc::clone(&schema)),
+ metadata_size_hint: None,
+ metrics: ExecutionPlanMetricsSet::new(),
+            parquet_file_reader_factory: Arc::new(DefaultParquetFileReaderFactory::new(
+ Arc::clone(&store),
+ )),
+ pushdown_filters: false,
+ reorder_filters: false,
+ force_filter_selections: false,
+ enable_page_index: false,
+ enable_bloom_filter: false,
+ enable_row_group_stats_pruning: false,
+ coerce_int96: None,
+ #[cfg(feature = "parquet_encryption")]
+ file_decryption_properties: None,
+ expr_adapter_factory: Arc::new(DefaultPhysicalExprAdapterFactory),
+ #[cfg(feature = "parquet_encryption")]
+ encryption_factory: None,
+ max_predicate_cache_size: None,
+ reverse_scan_inexact: reverse_scan,
+ };
+
+ // Test normal scan (forward)
+ let opener = make_opener(false);
+ let stream = opener.open(file.clone()).unwrap().await.unwrap();
+ let forward_values = collect_int32_values(stream).await;
+
+ // Test reverse scan
+ let opener = make_opener(true);
+ let stream = opener.open(file.clone()).unwrap().await.unwrap();
+ let reverse_values = collect_int32_values(stream).await;
+
+ // The forward scan should return data in the order written
+ assert_eq!(forward_values, vec![1, 2, 3, 4, 5, 6, 7, 8, 9]);
+
+ // With reverse scan, row groups are reversed, so we expect:
+        // Row group 3 (7,8,9), then row group 2 (4,5,6), then row group 1 (1,2,3)
+ assert_eq!(reverse_values, vec![7, 8, 9, 4, 5, 6, 1, 2, 3]);
+ }
+
+ #[tokio::test]
+ async fn test_reverse_scan_single_row_group() {
+ let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+
+ // Create a single batch (single row group)
+        let batch = record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap();
+        let data_size =
+            write_parquet(Arc::clone(&store), "test.parquet", batch.clone()).await;
+
+ let schema = batch.schema();
+ let file = PartitionedFile::new(
+ "test.parquet".to_string(),
+ u64::try_from(data_size).unwrap(),
+ );
+
+ let make_opener = |reverse_scan: bool| ParquetOpener {
+ partition_index: 0,
+ projection: ProjectionExprs::from_indices(&[0], &schema),
+ batch_size: 1024,
+ limit: None,
+ predicate: None,
+ table_schema: TableSchema::from_file_schema(Arc::clone(&schema)),
+ metadata_size_hint: None,
+ metrics: ExecutionPlanMetricsSet::new(),
+            parquet_file_reader_factory: Arc::new(DefaultParquetFileReaderFactory::new(
+ Arc::clone(&store),
+ )),
+ pushdown_filters: false,
+ reorder_filters: false,
+ force_filter_selections: false,
+ enable_page_index: false,
+ enable_bloom_filter: false,
+ enable_row_group_stats_pruning: false,
+ coerce_int96: None,
+ #[cfg(feature = "parquet_encryption")]
+ file_decryption_properties: None,
+ expr_adapter_factory: Arc::new(DefaultPhysicalExprAdapterFactory),
+ #[cfg(feature = "parquet_encryption")]
+ encryption_factory: None,
+ max_predicate_cache_size: None,
+ reverse_scan_inexact: reverse_scan,
+ };
+
+ // With a single row group, forward and reverse should be the same
+ // (only the row group order is reversed, not the rows within)
+ let opener_forward = make_opener(false);
+        let stream_forward = opener_forward.open(file.clone()).unwrap().await.unwrap();
+        let (batches_forward, _) = count_batches_and_rows(stream_forward).await;
+
+ let opener_reverse = make_opener(true);
+ let stream_reverse = opener_reverse.open(file).unwrap().await.unwrap();
+        let (batches_reverse, _) = count_batches_and_rows(stream_reverse).await;
+
+        // Both should have the same number of batches since there's only one row group
+ assert_eq!(batches_forward, batches_reverse);
+ }
+
+ #[tokio::test]
+ async fn test_reverse_scan_with_row_selection() {
+ use parquet::file::properties::WriterProperties;
+
+ let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+
+ // Create 3 batches with DIFFERENT selection patterns
+        let batch1 =
+            record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3), Some(4)]))
+                .unwrap(); // 4 rows
+        let batch2 =
+            record_batch!(("a", Int32, vec![Some(5), Some(6), Some(7), Some(8)]))
+                .unwrap(); // 4 rows
+        let batch3 =
+            record_batch!(("a", Int32, vec![Some(9), Some(10), Some(11), Some(12)]))
+                .unwrap(); // 4 rows
+
+ let props = WriterProperties::builder()
+ .set_max_row_group_size(4)
+ .build();
+
+ let mut out = BytesMut::new().writer();
+ {
+ let mut writer =
+                ArrowWriter::try_new(&mut out, batch1.schema(), Some(props)).unwrap();
+ writer.write(&batch1).unwrap();
+ writer.write(&batch2).unwrap();
+ writer.write(&batch3).unwrap();
+ writer.finish().unwrap();
+ }
+ let data = out.into_inner().freeze();
+ let data_len = data.len();
+ store
+ .put(&Path::from("test.parquet"), data.into())
+ .await
+ .unwrap();
+
+ let schema = batch1.schema();
+
+ use crate::ParquetAccessPlan;
+ use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
+
+ let mut access_plan = ParquetAccessPlan::new_all(3);
+ // Row group 0: skip first 2, select last 2 (should get: 3, 4)
+ access_plan.scan_selection(
+ 0,
+            RowSelection::from(vec![RowSelector::skip(2), RowSelector::select(2)]),
+ );
+ // Row group 1: select all (should get: 5, 6, 7, 8)
+ // Row group 2: select first 2, skip last 2 (should get: 9, 10)
+ access_plan.scan_selection(
+ 2,
+            RowSelection::from(vec![RowSelector::select(2), RowSelector::skip(2)]),
+ );
+
+ let file = PartitionedFile::new(
+ "test.parquet".to_string(),
+ u64::try_from(data_len).unwrap(),
+ )
+ .with_extensions(Arc::new(access_plan));
+
+ let make_opener = |reverse_scan: bool| ParquetOpener {
Review Comment:
I know these tests are just following the pattern in this file, but maybe as
a follow-on PR we can refactor this to reduce duplication (and make it clear
what parts are important).
For example we could make a `ParquetOpenerBuilder` or something 🤔
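A rough shape for such a builder, as a standalone toy (hypothetical: the name, fields, and defaults are invented here, and a real `build()` would assemble the full `ParquetOpener` literal from these fields in one place):
```rust
/// Hypothetical test-only builder: each test overrides just the fields it
/// cares about instead of repeating the full ParquetOpener literal.
#[derive(Default)]
struct ParquetOpenerBuilder {
    batch_size: Option<usize>,
    pushdown_filters: bool,
    reverse_scan_inexact: bool,
}

impl ParquetOpenerBuilder {
    fn new() -> Self {
        Self::default()
    }

    fn with_batch_size(mut self, n: usize) -> Self {
        self.batch_size = Some(n);
        self
    }

    fn with_reverse_scan_inexact(mut self, v: bool) -> Self {
        self.reverse_scan_inexact = v;
        self
    }

    // build() would fill the remaining ParquetOpener fields (stores, metrics,
    // schema adapters, ...) with test defaults exactly once, in one place.
}

fn main() {
    // A test body would then shrink to the one setting under test:
    let builder = ParquetOpenerBuilder::new().with_reverse_scan_inexact(true);
    assert!(builder.reverse_scan_inexact);
    assert!(!builder.pushdown_filters); // defaults stay out of each test body
    assert_eq!(builder.batch_size.unwrap_or(1024), 1024);
}
```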
##########
datafusion/core/tests/physical_optimizer/pushdown_sort.rs:
##########
@@ -0,0 +1,778 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Tests for sort pushdown optimizer rule (Phase 1)
+//!
+//! Phase 1 tests verify that:
+//! 1. Reverse scan is enabled (reverse_scan_inexact=true)
+//! 2. SortExec is kept (because ordering is inexact)
+//! 3. output_ordering remains unchanged
+//! 4. Early termination is enabled for TopK queries
+//! 5. Prefix matching works correctly
+
+use arrow::compute::SortOptions;
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use datafusion_physical_optimizer::pushdown_sort::PushdownSort;
+use datafusion_physical_optimizer::PhysicalOptimizerRule;
+
+use crate::physical_optimizer::test_utils::{
+ coalesce_batches_exec, coalesce_partitions_exec, parquet_exec,
+    parquet_exec_with_sort, repartition_exec, schema, sort_exec, sort_exec_with_fetch,
+    sort_expr, sort_expr_options, OptimizationTest,
+};
+
+#[test]
+fn test_sort_pushdown_disabled() {
+ // When pushdown is disabled, plan should remain unchanged
+ let schema = schema();
+ let source = parquet_exec(schema.clone());
+ let sort_exprs = LexOrdering::new(vec![sort_expr("a", &schema)]).unwrap();
+ let plan = sort_exec(sort_exprs, source);
+
+ insta::assert_snapshot!(
+ OptimizationTest::new(plan, PushdownSort::new(), false),
+ @r###"
+ OptimizationTest:
+ input:
+ - SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
+    - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
+ output:
+ Ok:
+ - SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
+        - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
+ "###
+ );
+}
+
+#[test]
+fn test_sort_pushdown_basic_phase1() {
+ // Phase 1: Reverse scan enabled, Sort kept, output_ordering unchanged
+ let schema = schema();
+
+ // Source has ASC NULLS LAST ordering (default)
+    let source_ordering = LexOrdering::new(vec![sort_expr("a", &schema)]).unwrap();
+ let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]);
+
+ // Request DESC NULLS LAST ordering (exact reverse)
+ let desc_ordering = LexOrdering::new(vec![sort_expr_options(
Review Comment:
I think you could reduce a lot of boilerplate if you used the physical
sort expr methods like
```rust
let a = sort_expr("a", &schema);
let desc_ordering = LexOrdering::new(vec![a.desc()]).unwrap();
```