This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 3a41cc6078 Establish the high level API for sort pushdown and the optimizer rule and support reverse files and row groups (#19064)
3a41cc6078 is described below
commit 3a41cc6078ba71821d3ccceb944e9a5eee16774e
Author: Qi Zhu <[email protected]>
AuthorDate: Wed Dec 17 10:06:21 2025 +0800
Establish the high level API for sort pushdown and the optimizer rule and support reverse files and row groups (#19064)
## Which issue does this PR close?
Establishes the high-level API for sort pushdown and the optimizer rule.
Phase 1 only re-arranges files and row groups and returns an Inexact
ordering; it supports the reverse-order case, and since this implementation
caches nothing, it adds no memory overhead.
Combined with dynamic TopK pushdown to skip row groups, it delivers a large
performance improvement, as the sketch below illustrates.
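For illustration, a minimal sketch (not part of this commit) of the kind of
query that benefits; the table name `events`, the file path, and the sort
column `ts` are hypothetical:

    use datafusion::error::Result;
    use datafusion::prelude::*;

    async fn topk_example() -> Result<()> {
        let ctx = SessionContext::new();
        // Assume a Parquet file whose rows are written in ascending `ts` order.
        ctx.register_parquet("events", "events.parquet", ParquetReadOptions::default())
            .await?;
        // With sort pushdown enabled (the default), the scan reads row groups
        // in reverse order, so the TopK operator can stop early instead of
        // consuming the whole file; the SortExec is kept because the resulting
        // ordering is inexact.
        ctx.sql("SELECT * FROM events ORDER BY ts DESC LIMIT 10")
            .await?
            .show()
            .await?;
        Ok(())
    }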
Details:
Performance results on ClickBench sorted data: 13ms vs the 300ms baseline
(23x faster), close to the aggressive caching approach (9.8ms) but with much
better memory stability.
[details](https://github.com/apache/datafusion/pull/18817#issuecomment-3605978882)
- Closes https://github.com/apache/datafusion/issues/19059
- Closes https://github.com/apache/datafusion/issues/10433
---------
Co-authored-by: Copilot <[email protected]>
Co-authored-by: Adrian Garcia Badaracco <[email protected]>
Co-authored-by: Claude Opus 4.5 <[email protected]>
---
datafusion/common/src/config.rs | 15 +
datafusion/core/tests/physical_optimizer/mod.rs | 1 +
.../core/tests/physical_optimizer/pushdown_sort.rs | 672 +++++++++++++++++++++
.../core/tests/physical_optimizer/test_utils.rs | 78 ++-
datafusion/datasource-parquet/src/mod.rs | 1 +
datafusion/datasource-parquet/src/opener.rs | 375 +++++++++++-
datafusion/datasource-parquet/src/sort.rs | 407 +++++++++++++
datafusion/datasource-parquet/src/source.rs | 182 ++++++
datafusion/datasource/src/file.rs | 35 ++
datafusion/datasource/src/file_scan_config.rs | 82 ++-
datafusion/datasource/src/source.rs | 35 +-
datafusion/execution/src/config.rs | 7 +
datafusion/physical-expr-common/src/sort_expr.rs | 103 ++++
datafusion/physical-optimizer/src/lib.rs | 1 +
datafusion/physical-optimizer/src/optimizer.rs | 3 +
datafusion/physical-optimizer/src/pushdown_sort.rs | 129 ++++
datafusion/physical-plan/src/coalesce_batches.rs | 16 +
.../physical-plan/src/coalesce_partitions.rs | 38 ++
datafusion/physical-plan/src/execution_plan.rs | 28 +-
datafusion/physical-plan/src/lib.rs | 2 +
datafusion/physical-plan/src/repartition/mod.rs | 23 +
datafusion/physical-plan/src/sort_pushdown.rs | 120 ++++
datafusion/proto-common/src/generated/pbjson.rs | 9 +-
.../test_files/create_external_table.slt | 2 +-
.../test_files/dynamic_filter_pushdown_config.slt | 326 ++++++++++
datafusion/sqllogictest/test_files/explain.slt | 4 +
.../sqllogictest/test_files/information_schema.slt | 2 +
datafusion/sqllogictest/test_files/topk.slt | 2 +-
docs/source/user-guide/configs.md | 1 +
29 files changed, 2670 insertions(+), 29 deletions(-)
diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index 6539911cb9..c8ed491ef4 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -1079,6 +1079,21 @@ config_namespace! {
/// then the output will be coerced to a non-view.
/// Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`.
pub expand_views_at_output: bool, default = false
+
+ /// Enable sort pushdown optimization.
+ /// When enabled, attempts to push sort requirements down to data sources
+ /// that can natively handle them (e.g., by reversing file/row group read order).
+ ///
+ /// Returns **inexact ordering**: Sort operator is kept for correctness,
+ /// but optimized input enables early termination for TopK queries (ORDER BY ... LIMIT N),
+ /// providing significant speedup.
+ ///
+ /// Memory: No additional overhead (only changes read order).
+ ///
+ /// Future: Will add option to detect perfectly sorted data and eliminate Sort completely.
+ ///
+ /// Default: true
+ pub enable_sort_pushdown: bool, default = true
}
}
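As a usage note (not part of this diff), the new flag lives under the
optimizer options and can be flipped programmatically; a minimal sketch
using the same `ConfigOptions` path the new tests use:

    use datafusion_common::config::ConfigOptions;

    let mut config = ConfigOptions::new();
    // On by default; set to false to keep the original file/row-group read order.
    config.optimizer.enable_sort_pushdown = false;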
diff --git a/datafusion/core/tests/physical_optimizer/mod.rs b/datafusion/core/tests/physical_optimizer/mod.rs
index fe9db1975d..d11322cd26 100644
--- a/datafusion/core/tests/physical_optimizer/mod.rs
+++ b/datafusion/core/tests/physical_optimizer/mod.rs
@@ -32,6 +32,7 @@ mod limit_pushdown;
mod limited_distinct_aggregation;
mod partition_statistics;
mod projection_pushdown;
+mod pushdown_sort;
mod replace_with_order_preserving_variants;
mod sanity_checker;
#[expect(clippy::needless_pass_by_value)]
diff --git a/datafusion/core/tests/physical_optimizer/pushdown_sort.rs b/datafusion/core/tests/physical_optimizer/pushdown_sort.rs
new file mode 100644
index 0000000000..f26ed2905b
--- /dev/null
+++ b/datafusion/core/tests/physical_optimizer/pushdown_sort.rs
@@ -0,0 +1,672 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Tests for sort pushdown optimizer rule (Phase 1)
+//!
+//! Phase 1 tests verify that:
+//! 1. Reverse scan is enabled (reverse_row_groups=true)
+//! 2. SortExec is kept (because ordering is inexact)
+//! 3. output_ordering remains unchanged
+//! 4. Early termination is enabled for TopK queries
+//! 5. Prefix matching works correctly
+
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use datafusion_physical_optimizer::pushdown_sort::PushdownSort;
+use datafusion_physical_optimizer::PhysicalOptimizerRule;
+
+use crate::physical_optimizer::test_utils::{
+ coalesce_batches_exec, coalesce_partitions_exec, parquet_exec,
+ parquet_exec_with_sort, repartition_exec, schema, sort_exec, sort_exec_with_fetch,
+ sort_expr, OptimizationTest,
+};
+
+#[test]
+fn test_sort_pushdown_disabled() {
+ // When pushdown is disabled, plan should remain unchanged
+ let schema = schema();
+ let source = parquet_exec(schema.clone());
+ let sort_exprs = LexOrdering::new(vec![sort_expr("a", &schema)]).unwrap();
+ let plan = sort_exec(sort_exprs, source);
+
+ insta::assert_snapshot!(
+ OptimizationTest::new(plan, PushdownSort::new(), false),
+ @r###"
+ OptimizationTest:
+ input:
+ - SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
+ output:
+ Ok:
+ - SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
+ "###
+ );
+}
+
+#[test]
+fn test_sort_pushdown_basic_phase1() {
+ // Phase 1: Reverse scan enabled, Sort kept, output_ordering unchanged
+ let schema = schema();
+
+ // Source has ASC NULLS LAST ordering (default)
+ let a = sort_expr("a", &schema);
+ let source_ordering = LexOrdering::new(vec![a.clone()]).unwrap();
+ let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]);
+
+ // Request DESC NULLS LAST ordering (exact reverse)
+ let desc_ordering = LexOrdering::new(vec![a.reverse()]).unwrap();
+ let plan = sort_exec(desc_ordering, source);
+
+ insta::assert_snapshot!(
+ OptimizationTest::new(plan, PushdownSort::new(), true),
+ @r"
+ OptimizationTest:
+ input:
+ - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
+ output:
+ Ok:
+ - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
+ "
+ );
+}
+
+#[test]
+fn test_sort_with_limit_phase1() {
+ // Phase 1: Sort with fetch enables early termination but keeps Sort
+ let schema = schema();
+
+ // Source has ASC ordering
+ let a = sort_expr("a", &schema);
+ let source_ordering = LexOrdering::new(vec![a.clone()]).unwrap();
+ let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]);
+
+ // Request DESC ordering with limit
+ let desc_ordering = LexOrdering::new(vec![a.reverse()]).unwrap();
+ let plan = sort_exec_with_fetch(desc_ordering, Some(10), source);
+
+ insta::assert_snapshot!(
+ OptimizationTest::new(plan, PushdownSort::new(), true),
+ @r"
+ OptimizationTest:
+ input:
+ - SortExec: TopK(fetch=10), expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
+ output:
+ Ok:
+ - SortExec: TopK(fetch=10), expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
+ "
+ );
+}
+
+#[test]
+fn test_sort_multiple_columns_phase1() {
+ // Phase 1: Sort on multiple columns - reverse multi-column ordering
+ let schema = schema();
+
+ // Source has [a DESC NULLS LAST, b ASC] ordering
+ let a = sort_expr("a", &schema);
+ let b = sort_expr("b", &schema);
+ let source_ordering = LexOrdering::new(vec![a.clone().reverse(), b.clone()]).unwrap();
+ let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]);
+
+ // Request [a ASC NULLS FIRST, b DESC] ordering (exact reverse)
+ let reverse_ordering =
+ LexOrdering::new(vec![a.clone().asc().nulls_first(), b.reverse()]).unwrap();
+ let plan = sort_exec(reverse_ordering, source);
+
+ insta::assert_snapshot!(
+ OptimizationTest::new(plan, PushdownSort::new(), true),
+ @r"
+ OptimizationTest:
+ input:
+ - SortExec: expr=[a@0 ASC, b@1 DESC NULLS LAST], preserve_partitioning=[false]
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 DESC NULLS LAST, b@1 ASC], file_type=parquet
+ output:
+ Ok:
+ - SortExec: expr=[a@0 ASC, b@1 DESC NULLS LAST], preserve_partitioning=[false]
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
+ "
+ );
+}
+
+// ============================================================================
+// PREFIX MATCHING TESTS
+// ============================================================================
+
+#[test]
+fn test_prefix_match_single_column() {
+ // Test prefix matching: source has [a DESC, b ASC], query needs [a ASC]
+ // After reverse: [a ASC, b DESC] which satisfies [a ASC] prefix
+ let schema = schema();
+
+ // Source has [a DESC NULLS LAST, b ASC NULLS LAST] ordering
+ let a = sort_expr("a", &schema);
+ let b = sort_expr("b", &schema);
+ let source_ordering = LexOrdering::new(vec![a.clone().reverse(), b]).unwrap();
+ let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]);
+
+ // Request only [a ASC NULLS FIRST] - a prefix of the reversed ordering
+ let prefix_ordering = LexOrdering::new(vec![a.clone().asc().nulls_first()]).unwrap();
+ let plan = sort_exec(prefix_ordering, source);
+
+ insta::assert_snapshot!(
+ OptimizationTest::new(plan, PushdownSort::new(), true),
+ @r"
+ OptimizationTest:
+ input:
+ - SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 DESC NULLS LAST, b@1 ASC], file_type=parquet
+ output:
+ Ok:
+ - SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
+ "
+ );
+}
+
+#[test]
+fn test_prefix_match_with_limit() {
+ // Test prefix matching with LIMIT - important for TopK optimization
+ let schema = schema();
+
+ // Source has [a ASC, b DESC, c ASC] ordering
+ let a = sort_expr("a", &schema);
+ let b = sort_expr("b", &schema);
+ let c = sort_expr("c", &schema);
+ let source_ordering =
+ LexOrdering::new(vec![a.clone(), b.clone().reverse(), c]).unwrap();
+ let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]);
+
+ // Request [a DESC NULLS LAST, b ASC NULLS FIRST] with LIMIT 100
+ // This is a prefix (2 columns) of the reversed 3-column ordering
+ let prefix_ordering =
+ LexOrdering::new(vec![a.reverse(), b.clone().asc().nulls_first()]).unwrap();
+ let plan = sort_exec_with_fetch(prefix_ordering, Some(100), source);
+
+ insta::assert_snapshot!(
+ OptimizationTest::new(plan, PushdownSort::new(), true),
+ @r"
+ OptimizationTest:
+ input:
+ - SortExec: TopK(fetch=100), expr=[a@0 DESC NULLS LAST, b@1 ASC], preserve_partitioning=[false]
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC, b@1 DESC NULLS LAST, c@2 ASC], file_type=parquet
+ output:
+ Ok:
+ - SortExec: TopK(fetch=100), expr=[a@0 DESC NULLS LAST, b@1 ASC], preserve_partitioning=[false]
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
+ "
+ );
+}
+
+#[test]
+fn test_prefix_match_through_transparent_nodes() {
+ // Test prefix matching works through transparent nodes
+ let schema = schema();
+
+ // Source has [a DESC NULLS LAST, b ASC, c DESC] ordering
+ let a = sort_expr("a", &schema);
+ let b = sort_expr("b", &schema);
+ let c = sort_expr("c", &schema);
+ let source_ordering =
+ LexOrdering::new(vec![a.clone().reverse(), b, c.reverse()]).unwrap();
+ let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]);
+ let coalesce = coalesce_batches_exec(source, 1024);
+ let repartition = repartition_exec(coalesce);
+
+ // Request only [a ASC NULLS FIRST] - prefix of reversed ordering
+ let prefix_ordering = LexOrdering::new(vec![a.clone().asc().nulls_first()]).unwrap();
+ let plan = sort_exec(prefix_ordering, repartition);
+
+ insta::assert_snapshot!(
+ OptimizationTest::new(plan, PushdownSort::new(), true),
+ @r"
+ OptimizationTest:
+ input:
+ - SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
+ - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true
+ - CoalesceBatchesExec: target_batch_size=1024
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 DESC NULLS LAST, b@1 ASC, c@2 DESC NULLS LAST], file_type=parquet
+ output:
+ Ok:
+ - SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
+ - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
+ - CoalesceBatchesExec: target_batch_size=1024
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
+ "
+ );
+}
+
+#[test]
+fn test_no_prefix_match_wrong_direction() {
+ // Test that prefix matching does NOT work if the direction is wrong
+ let schema = schema();
+
+ // Source has [a DESC, b ASC] ordering
+ let a = sort_expr("a", &schema);
+ let b = sort_expr("b", &schema);
+ let source_ordering = LexOrdering::new(vec![a.clone().reverse(), b]).unwrap();
+ let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]);
+
+ // Request [a DESC] - same direction as source, NOT a reverse prefix
+ let same_direction = LexOrdering::new(vec![a.clone().reverse()]).unwrap();
+ let plan = sort_exec(same_direction, source);
+
+ insta::assert_snapshot!(
+ OptimizationTest::new(plan, PushdownSort::new(), true),
+ @r###"
+ OptimizationTest:
+ input:
+ - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 DESC NULLS LAST, b@1 ASC], file_type=parquet
+ output:
+ Ok:
+ - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 DESC NULLS LAST, b@1 ASC], file_type=parquet
+ "###
+ );
+}
+
+#[test]
+fn test_no_prefix_match_longer_than_source() {
+ // Test that prefix matching does NOT work if requested is longer than source
+ let schema = schema();
+
+ // Source has [a DESC] ordering (single column)
+ let a = sort_expr("a", &schema);
+ let b = sort_expr("b", &schema);
+ let source_ordering = LexOrdering::new(vec![a.clone().reverse()]).unwrap();
+ let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]);
+
+ // Request [a ASC, b DESC] - longer than source, can't be a prefix
+ let longer_ordering =
+ LexOrdering::new(vec![a.clone().asc().nulls_first(), b.reverse()]).unwrap();
+ let plan = sort_exec(longer_ordering, source);
+
+ insta::assert_snapshot!(
+ OptimizationTest::new(plan, PushdownSort::new(), true),
+ @r###"
+ OptimizationTest:
+ input:
+ - SortExec: expr=[a@0 ASC, b@1 DESC NULLS LAST], preserve_partitioning=[false]
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 DESC NULLS LAST], file_type=parquet
+ output:
+ Ok:
+ - SortExec: expr=[a@0 ASC, b@1 DESC NULLS LAST], preserve_partitioning=[false]
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 DESC NULLS LAST], file_type=parquet
+ "###
+ );
+}
+
+// ============================================================================
+// ORIGINAL TESTS
+// ============================================================================
+
+#[test]
+fn test_sort_through_coalesce_batches() {
+ // Sort pushes through CoalesceBatchesExec
+ let schema = schema();
+ let a = sort_expr("a", &schema);
+ let source_ordering = LexOrdering::new(vec![a.clone()]).unwrap();
+ let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]);
+ let coalesce = coalesce_batches_exec(source, 1024);
+
+ let desc_ordering = LexOrdering::new(vec![a.reverse()]).unwrap();
+ let plan = sort_exec(desc_ordering, coalesce);
+
+ insta::assert_snapshot!(
+ OptimizationTest::new(plan, PushdownSort::new(), true),
+ @r"
+ OptimizationTest:
+ input:
+ - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
+ - CoalesceBatchesExec: target_batch_size=1024
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
+ output:
+ Ok:
+ - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
+ - CoalesceBatchesExec: target_batch_size=1024
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
+ "
+ );
+}
+
+#[test]
+fn test_sort_through_repartition() {
+ // Sort should push through RepartitionExec
+ let schema = schema();
+ let a = sort_expr("a", &schema);
+ let source_ordering = LexOrdering::new(vec![a.clone()]).unwrap();
+ let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]);
+ let repartition = repartition_exec(source);
+
+ let desc_ordering = LexOrdering::new(vec![a.reverse()]).unwrap();
+ let plan = sort_exec(desc_ordering, repartition);
+
+ insta::assert_snapshot!(
+ OptimizationTest::new(plan, PushdownSort::new(), true),
+ @r"
+ OptimizationTest:
+ input:
+ - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
+ - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
+ output:
+ Ok:
+ - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
+ - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
+ "
+ );
+}
+
+#[test]
+fn test_nested_sorts() {
+ // Nested sort operations - only innermost can be optimized
+ let schema = schema();
+ let a = sort_expr("a", &schema);
+ let b = sort_expr("b", &schema);
+ let source_ordering = LexOrdering::new(vec![a.clone()]).unwrap();
+ let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]);
+
+ let desc_ordering = LexOrdering::new(vec![a.reverse()]).unwrap();
+ let inner_sort = sort_exec(desc_ordering, source);
+
+ let sort_exprs2 = LexOrdering::new(vec![b]).unwrap();
+ let plan = sort_exec(sort_exprs2, inner_sort);
+
+ insta::assert_snapshot!(
+ OptimizationTest::new(plan, PushdownSort::new(), true),
+ @r"
+ OptimizationTest:
+ input:
+ - SortExec: expr=[b@1 ASC], preserve_partitioning=[false]
+ - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
+ output:
+ Ok:
+ - SortExec: expr=[b@1 ASC], preserve_partitioning=[false]
+ - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
+ "
+ );
+}
+
+#[test]
+fn test_non_sort_plans_unchanged() {
+ // Plans without SortExec should pass through unchanged
+ let schema = schema();
+ let source = parquet_exec(schema.clone());
+ let plan = coalesce_batches_exec(source, 1024);
+
+ insta::assert_snapshot!(
+ OptimizationTest::new(plan, PushdownSort::new(), true),
+ @r###"
+ OptimizationTest:
+ input:
+ - CoalesceBatchesExec: target_batch_size=1024
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
+ output:
+ Ok:
+ - CoalesceBatchesExec: target_batch_size=1024
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
+ "###
+ );
+}
+
+#[test]
+fn test_optimizer_properties() {
+ // Test optimizer metadata
+ let optimizer = PushdownSort::new();
+
+ assert_eq!(optimizer.name(), "PushdownSort");
+ assert!(optimizer.schema_check());
+}
+
+#[test]
+fn test_sort_through_coalesce_partitions() {
+ // Sort should push through CoalescePartitionsExec
+ let schema = schema();
+ let a = sort_expr("a", &schema);
+ let source_ordering = LexOrdering::new(vec![a.clone()]).unwrap();
+ let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]);
+ let repartition = repartition_exec(source);
+ let coalesce_parts = coalesce_partitions_exec(repartition);
+
+ let desc_ordering = LexOrdering::new(vec![a.reverse()]).unwrap();
+ let plan = sort_exec(desc_ordering, coalesce_parts);
+
+ insta::assert_snapshot!(
+ OptimizationTest::new(plan, PushdownSort::new(), true),
+ @r"
+ OptimizationTest:
+ input:
+ - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
+ - CoalescePartitionsExec
+ - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
+ output:
+ Ok:
+ - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
+ - CoalescePartitionsExec
+ - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
+ "
+ );
+}
+
+#[test]
+fn test_complex_plan_with_multiple_operators() {
+ // Test a complex plan with multiple operators between sort and source
+ let schema = schema();
+ let a = sort_expr("a", &schema);
+ let source_ordering = LexOrdering::new(vec![a.clone()]).unwrap();
+ let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]);
+ let coalesce_batches = coalesce_batches_exec(source, 1024);
+ let repartition = repartition_exec(coalesce_batches);
+ let coalesce_parts = coalesce_partitions_exec(repartition);
+
+ let desc_ordering = LexOrdering::new(vec![a.reverse()]).unwrap();
+ let plan = sort_exec(desc_ordering, coalesce_parts);
+
+ insta::assert_snapshot!(
+ OptimizationTest::new(plan, PushdownSort::new(), true),
+ @r"
+ OptimizationTest:
+ input:
+ - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
+ - CoalescePartitionsExec
+ - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true
+ - CoalesceBatchesExec: target_batch_size=1024
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
+ output:
+ Ok:
+ - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
+ - CoalescePartitionsExec
+ - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
+ - CoalesceBatchesExec: target_batch_size=1024
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
+ "
+ );
+}
+
+#[test]
+fn test_multiple_sorts_different_columns() {
+ // Test nested sorts on different columns - only innermost can optimize
+ let schema = schema();
+ let a = sort_expr("a", &schema);
+ let c = sort_expr("c", &schema);
+ let source_ordering = LexOrdering::new(vec![a.clone()]).unwrap();
+ let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]);
+
+ // First sort by column 'a' DESC (reverse of source)
+ let desc_ordering = LexOrdering::new(vec![a.reverse()]).unwrap();
+ let sort1 = sort_exec(desc_ordering, source);
+
+ // Then sort by column 'c' (different column, can't optimize)
+ let sort_exprs2 = LexOrdering::new(vec![c]).unwrap();
+ let plan = sort_exec(sort_exprs2, sort1);
+
+ insta::assert_snapshot!(
+ OptimizationTest::new(plan, PushdownSort::new(), true),
+ @r"
+ OptimizationTest:
+ input:
+ - SortExec: expr=[c@2 ASC], preserve_partitioning=[false]
+ - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
+ output:
+ Ok:
+ - SortExec: expr=[c@2 ASC], preserve_partitioning=[false]
+ - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
+ "
+ );
+}
+
+#[test]
+fn test_no_pushdown_for_unordered_source() {
+ // Verify pushdown does NOT happen for sources without ordering
+ let schema = schema();
+ let source = parquet_exec(schema.clone()); // No output_ordering
+ let sort_exprs = LexOrdering::new(vec![sort_expr("a", &schema)]).unwrap();
+ let plan = sort_exec(sort_exprs, source);
+
+ insta::assert_snapshot!(
+ OptimizationTest::new(plan, PushdownSort::new(), true),
+ @r###"
+ OptimizationTest:
+ input:
+ - SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
+ output:
+ Ok:
+ - SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
+ "###
+ );
+}
+
+#[test]
+fn test_no_pushdown_for_non_reverse_sort() {
+ // Verify pushdown does NOT happen when sort doesn't reverse source ordering
+ let schema = schema();
+
+ // Source sorted by 'a' ASC
+ let a = sort_expr("a", &schema);
+ let b = sort_expr("b", &schema);
+ let source_ordering = LexOrdering::new(vec![a]).unwrap();
+ let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]);
+
+ // Request sort by 'b' (different column)
+ let sort_exprs = LexOrdering::new(vec![b]).unwrap();
+ let plan = sort_exec(sort_exprs, source);
+
+ insta::assert_snapshot!(
+ OptimizationTest::new(plan, PushdownSort::new(), true),
+ @r###"
+ OptimizationTest:
+ input:
+ - SortExec: expr=[b@1 ASC], preserve_partitioning=[false]
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
+ output:
+ Ok:
+ - SortExec: expr=[b@1 ASC], preserve_partitioning=[false]
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
+ "###
+ );
+}
+
+#[test]
+fn test_pushdown_through_blocking_node() {
+ // Test that pushdown works for inner sort even when outer sort is blocked
+ // Structure: Sort -> Aggregate (blocks pushdown) -> Sort -> Scan
+ // The outer sort can't push through aggregate, but the inner sort should still optimize
+ use datafusion_functions_aggregate::count::count_udaf;
+ use datafusion_physical_expr::aggregate::AggregateExprBuilder;
+ use datafusion_physical_plan::aggregates::{
+ AggregateExec, AggregateMode, PhysicalGroupBy,
+ };
+ use std::sync::Arc;
+
+ let schema = schema();
+
+ // Bottom: DataSource with [a ASC NULLS LAST] ordering
+ let a = sort_expr("a", &schema);
+ let source_ordering = LexOrdering::new(vec![a.clone()]).unwrap();
+ let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]);
+
+ // Inner Sort: [a DESC NULLS FIRST] - exact reverse, CAN push down to source
+ let inner_sort_ordering = LexOrdering::new(vec![a.clone().reverse()]).unwrap();
+ let inner_sort = sort_exec(inner_sort_ordering, source);
+
+ // Middle: Aggregate (blocks pushdown from outer sort)
+ // GROUP BY a, COUNT(b)
+ let group_by = PhysicalGroupBy::new_single(vec![(
+ Arc::new(datafusion_physical_expr::expressions::Column::new("a", 0))
as _,
+ "a".to_string(),
+ )]);
+
+ let count_expr = Arc::new(
+ AggregateExprBuilder::new(
+ count_udaf(),
+ vec![
+ Arc::new(datafusion_physical_expr::expressions::Column::new("b", 1)) as _,
+ ],
+ )
+ .schema(Arc::clone(&schema))
+ .alias("COUNT(b)")
+ .build()
+ .unwrap(),
+ );
+
+ let aggregate = Arc::new(
+ AggregateExec::try_new(
+ AggregateMode::Final,
+ group_by,
+ vec![count_expr],
+ vec![None],
+ inner_sort,
+ Arc::clone(&schema),
+ )
+ .unwrap(),
+ );
+
+ // Outer Sort: [a ASC] - this CANNOT push down through aggregate
+ let outer_sort_ordering = LexOrdering::new(vec![a.clone()]).unwrap();
+ let plan = sort_exec(outer_sort_ordering, aggregate);
+
+ insta::assert_snapshot!(
+ OptimizationTest::new(plan, PushdownSort::new(), true),
+ @r"
+ OptimizationTest:
+ input:
+ - SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
+ - AggregateExec: mode=Final, gby=[a@0 as a], aggr=[COUNT(b)], ordering_mode=Sorted
+ - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
+ output:
+ Ok:
+ - SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
+ - AggregateExec: mode=Final, gby=[a@0 as a], aggr=[COUNT(b)], ordering_mode=Sorted
+ - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
+ "
+ );
+}
diff --git a/datafusion/core/tests/physical_optimizer/test_utils.rs b/datafusion/core/tests/physical_optimizer/test_utils.rs
index 1561ddf440..fa3e860ad3 100644
--- a/datafusion/core/tests/physical_optimizer/test_utils.rs
+++ b/datafusion/core/tests/physical_optimizer/test_utils.rs
@@ -18,7 +18,7 @@
//! Test utilities for physical optimizer tests
use std::any::Any;
-use std::fmt::Formatter;
+use std::fmt::{Display, Formatter};
use std::sync::{Arc, LazyLock};
use arrow::array::Int32Array;
@@ -33,7 +33,9 @@ use datafusion_common::config::ConfigOptions;
use datafusion_common::stats::Precision;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::utils::expr::COUNT_STAR_EXPANSION;
-use datafusion_common::{ColumnStatistics, JoinType, NullEquality, Result, Statistics};
+use datafusion_common::{
+ internal_err, ColumnStatistics, JoinType, NullEquality, Result, Statistics,
+};
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
@@ -700,3 +702,75 @@ impl TestAggregate {
}
}
}
+
+/// A harness for testing physical optimizers.
+#[derive(Debug)]
+pub struct OptimizationTest {
+ input: Vec<String>,
+ output: Result<Vec<String>, String>,
+}
+
+impl OptimizationTest {
+ pub fn new<O>(
+ input_plan: Arc<dyn ExecutionPlan>,
+ opt: O,
+ enable_sort_pushdown: bool,
+ ) -> Self
+ where
+ O: PhysicalOptimizerRule,
+ {
+ let input = format_execution_plan(&input_plan);
+ let input_schema = input_plan.schema();
+
+ let mut config = ConfigOptions::new();
+ config.optimizer.enable_sort_pushdown = enable_sort_pushdown;
+ let output_result = opt.optimize(input_plan, &config);
+ let output = output_result
+ .and_then(|plan| {
+ if opt.schema_check() && (plan.schema() != input_schema) {
+ internal_err!(
+ "Schema mismatch:\n\nBefore:\n{:?}\n\nAfter:\n{:?}",
+ input_schema,
+ plan.schema()
+ )
+ } else {
+ Ok(plan)
+ }
+ })
+ .map(|plan| format_execution_plan(&plan))
+ .map_err(|e| e.to_string());
+
+ Self { input, output }
+ }
+}
+
+impl Display for OptimizationTest {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ writeln!(f, "OptimizationTest:")?;
+ writeln!(f, " input:")?;
+ for line in &self.input {
+ writeln!(f, " - {line}")?;
+ }
+ writeln!(f, " output:")?;
+ match &self.output {
+ Ok(output) => {
+ writeln!(f, " Ok:")?;
+ for line in output {
+ writeln!(f, " - {line}")?;
+ }
+ }
+ Err(err) => {
+ writeln!(f, " Err: {err}")?;
+ }
+ }
+ Ok(())
+ }
+}
+
+pub fn format_execution_plan(plan: &Arc<dyn ExecutionPlan>) -> Vec<String> {
+ format_lines(&displayable(plan.as_ref()).indent(false).to_string())
+}
+
+fn format_lines(s: &str) -> Vec<String> {
+ s.trim().split('\n').map(|s| s.to_string()).collect()
+}
diff --git a/datafusion/datasource-parquet/src/mod.rs b/datafusion/datasource-parquet/src/mod.rs
index 53ee597bd1..eb4cc9e9ad 100644
--- a/datafusion/datasource-parquet/src/mod.rs
+++ b/datafusion/datasource-parquet/src/mod.rs
@@ -30,6 +30,7 @@ mod page_filter;
mod reader;
mod row_filter;
mod row_group_filter;
+mod sort;
pub mod source;
mod writer;
diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs
index 7b4db9e772..f1ecc86ce8 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -51,6 +51,7 @@ use datafusion_physical_plan::metrics::{
};
use datafusion_pruning::{FilePruner, PruningPredicate, build_pruning_predicate};
+use crate::sort::reverse_row_selection;
#[cfg(feature = "parquet_encryption")]
use datafusion_common::config::EncryptionFactoryOptions;
#[cfg(feature = "parquet_encryption")]
@@ -63,7 +64,7 @@ use parquet::arrow::arrow_reader::{
};
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
-use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader};
+use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader, RowGroupMetaData};
/// Implements [`FileOpener`] for a parquet file
pub(super) struct ParquetOpener {
@@ -115,6 +116,60 @@ pub(super) struct ParquetOpener {
/// Maximum size of the predicate cache, in bytes. If none, uses
/// the arrow-rs default.
pub max_predicate_cache_size: Option<usize>,
+ /// Whether to read row groups in reverse order
+ pub reverse_row_groups: bool,
+}
+
+/// Represents a prepared access plan with optional row selection
+struct PreparedAccessPlan {
+ /// Row group indexes to read
+ row_group_indexes: Vec<usize>,
+ /// Optional row selection for filtering within row groups
+ row_selection: Option<parquet::arrow::arrow_reader::RowSelection>,
+}
+
+impl PreparedAccessPlan {
+ /// Create a new prepared access plan from a ParquetAccessPlan
+ fn from_access_plan(
+ access_plan: ParquetAccessPlan,
+ rg_metadata: &[RowGroupMetaData],
+ ) -> Result<Self> {
+ let row_group_indexes = access_plan.row_group_indexes();
+ let row_selection = access_plan.into_overall_row_selection(rg_metadata)?;
+
+ Ok(Self {
+ row_group_indexes,
+ row_selection,
+ })
+ }
+
+ /// Reverse the access plan for reverse scanning
+ fn reverse(
+ mut self,
+ file_metadata: &parquet::file::metadata::ParquetMetaData,
+ ) -> Result<Self> {
+ // Reverse the row group indexes
+ self.row_group_indexes = self.row_group_indexes.into_iter().rev().collect();
+
+ // If we have a row selection, reverse it to match the new row group order
+ if let Some(row_selection) = self.row_selection {
+ self.row_selection =
+ Some(reverse_row_selection(&row_selection, file_metadata)?);
+ }
+
+ Ok(self)
+ }
+
+ /// Apply this access plan to a ParquetRecordBatchStreamBuilder
+ fn apply_to_builder(
+ self,
+ mut builder: ParquetRecordBatchStreamBuilder<Box<dyn AsyncFileReader>>,
+ ) -> ParquetRecordBatchStreamBuilder<Box<dyn AsyncFileReader>> {
+ if let Some(row_selection) = self.row_selection {
+ builder = builder.with_row_selection(row_selection);
+ }
+ builder.with_row_groups(self.row_group_indexes)
+ }
}
impl FileOpener for ParquetOpener {
@@ -212,6 +267,7 @@ impl FileOpener for ParquetOpener {
let encryption_context = self.get_encryption_context();
let max_predicate_cache_size = self.max_predicate_cache_size;
+ let reverse_row_groups = self.reverse_row_groups;
Ok(Box::pin(async move {
#[cfg(feature = "parquet_encryption")]
let file_decryption_properties = encryption_context
@@ -479,13 +535,18 @@ impl FileOpener for ParquetOpener {
);
}
- let row_group_indexes = access_plan.row_group_indexes();
- if let Some(row_selection) =
- access_plan.into_overall_row_selection(rg_metadata)?
- {
- builder = builder.with_row_selection(row_selection);
+ // Prepare the access plan (extract row groups and row selection)
+ let mut prepared_plan =
+ PreparedAccessPlan::from_access_plan(access_plan, rg_metadata)?;
+
+ // If reverse scanning is enabled, reverse the prepared plan
+ if reverse_row_groups {
+ prepared_plan = prepared_plan.reverse(file_metadata.as_ref())?;
}
+ // Apply the prepared plan to the builder
+ builder = prepared_plan.apply_to_builder(builder);
+
if let Some(limit) = limit {
builder = builder.with_limit(limit)
}
@@ -500,7 +561,6 @@ impl FileOpener for ParquetOpener {
let stream = builder
.with_projection(mask)
.with_batch_size(batch_size)
- .with_row_groups(row_group_indexes)
.with_metrics(arrow_reader_metrics.clone())
.build()?;
@@ -904,6 +964,7 @@ mod test {
use std::sync::Arc;
use super::{ConstantColumns, constant_columns_from_stats};
+ use crate::{DefaultParquetFileReaderFactory, opener::ParquetOpener};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use bytes::{BufMut, BytesMut};
use datafusion_common::{
@@ -925,8 +986,7 @@ mod test {
use futures::{Stream, StreamExt};
use object_store::{ObjectStore, memory::InMemory, path::Path};
use parquet::arrow::ArrowWriter;
-
- use crate::{DefaultParquetFileReaderFactory, opener::ParquetOpener};
+ use parquet::file::properties::WriterProperties;
fn constant_int_stats() -> (Statistics, SchemaRef) {
let schema = Arc::new(Schema::new(vec![
@@ -1028,16 +1088,54 @@ mod test {
(num_batches, num_rows)
}
+ /// Helper to collect all int32 values from the first column of batches
+ async fn collect_int32_values(
+ mut stream: std::pin::Pin<
+ Box<
+ dyn Stream<Item = Result<arrow::array::RecordBatch, DataFusionError>>
+ + Send,
+ >,
+ >,
+ ) -> Vec<i32> {
+ use arrow::array::Array;
+ let mut values = vec![];
+ while let Some(Ok(batch)) = stream.next().await {
+ let array = batch
+ .column(0)
+ .as_any()
+ .downcast_ref::<arrow::array::Int32Array>()
+ .unwrap();
+ for i in 0..array.len() {
+ if !array.is_null(i) {
+ values.push(array.value(i));
+ }
+ }
+ }
+ values
+ }
+
async fn write_parquet(
store: Arc<dyn ObjectStore>,
filename: &str,
batch: arrow::record_batch::RecordBatch,
+ ) -> usize {
+ write_parquet_batches(store, filename, vec![batch], None).await
+ }
+
+ /// Write multiple batches to a parquet file with optional writer properties
+ async fn write_parquet_batches(
+ store: Arc<dyn ObjectStore>,
+ filename: &str,
+ batches: Vec<arrow::record_batch::RecordBatch>,
+ props: Option<WriterProperties>,
) -> usize {
let mut out = BytesMut::new().writer();
{
- let mut writer =
- ArrowWriter::try_new(&mut out, batch.schema(), None).unwrap();
- writer.write(&batch).unwrap();
+ let schema = batches[0].schema();
+ let mut writer = ArrowWriter::try_new(&mut out, schema, props).unwrap();
+ for batch in batches {
+ writer.write(&batch).unwrap();
+ }
writer.finish().unwrap();
}
let data = out.into_inner().freeze();
@@ -1108,6 +1206,7 @@ mod test {
#[cfg(feature = "parquet_encryption")]
encryption_factory: None,
max_predicate_cache_size: None,
+ reverse_row_groups: false,
}
};
@@ -1179,6 +1278,7 @@ mod test {
#[cfg(feature = "parquet_encryption")]
encryption_factory: None,
max_predicate_cache_size: None,
+ reverse_row_groups: false,
}
};
@@ -1266,6 +1366,7 @@ mod test {
#[cfg(feature = "parquet_encryption")]
encryption_factory: None,
max_predicate_cache_size: None,
+ reverse_row_groups: false,
}
};
@@ -1356,6 +1457,7 @@ mod test {
#[cfg(feature = "parquet_encryption")]
encryption_factory: None,
max_predicate_cache_size: None,
+ reverse_row_groups: false,
}
};
@@ -1454,6 +1556,7 @@ mod test {
#[cfg(feature = "parquet_encryption")]
encryption_factory: None,
max_predicate_cache_size: None,
+ reverse_row_groups: false,
}
};
@@ -1495,4 +1598,252 @@ mod test {
assert_eq!(num_batches, 0);
assert_eq!(num_rows, 0);
}
+
+ #[tokio::test]
+ async fn test_reverse_scan_row_groups() {
+ use parquet::file::properties::WriterProperties;
+
+ let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+
+ // Create multiple batches to ensure multiple row groups
+ let batch1 =
+ record_batch!(("a", Int32, vec![Some(1), Some(2),
Some(3)])).unwrap();
+ let batch2 =
+ record_batch!(("a", Int32, vec![Some(4), Some(5),
Some(6)])).unwrap();
+ let batch3 =
+ record_batch!(("a", Int32, vec![Some(7), Some(8),
Some(9)])).unwrap();
+
+ // Write parquet file with multiple row groups
+ // Force small row groups by setting max_row_group_size
+ let props = WriterProperties::builder()
+ .set_max_row_group_size(3) // Force each batch into its own row group
+ .build();
+
+ let data_len = write_parquet_batches(
+ Arc::clone(&store),
+ "test.parquet",
+ vec![batch1.clone(), batch2, batch3],
+ Some(props),
+ )
+ .await;
+
+ let schema = batch1.schema();
+ let file = PartitionedFile::new(
+ "test.parquet".to_string(),
+ u64::try_from(data_len).unwrap(),
+ );
+
+ let make_opener = |reverse_scan: bool| ParquetOpener {
+ partition_index: 0,
+ projection: ProjectionExprs::from_indices(&[0], &schema),
+ batch_size: 1024,
+ limit: None,
+ predicate: None,
+ table_schema: TableSchema::from_file_schema(Arc::clone(&schema)),
+ metadata_size_hint: None,
+ metrics: ExecutionPlanMetricsSet::new(),
+ parquet_file_reader_factory: Arc::new(DefaultParquetFileReaderFactory::new(
+ Arc::clone(&store),
+ )),
+ pushdown_filters: false,
+ reorder_filters: false,
+ force_filter_selections: false,
+ enable_page_index: false,
+ enable_bloom_filter: false,
+ enable_row_group_stats_pruning: false,
+ coerce_int96: None,
+ #[cfg(feature = "parquet_encryption")]
+ file_decryption_properties: None,
+ expr_adapter_factory: Arc::new(DefaultPhysicalExprAdapterFactory),
+ #[cfg(feature = "parquet_encryption")]
+ encryption_factory: None,
+ max_predicate_cache_size: None,
+ reverse_row_groups: reverse_scan,
+ };
+
+ // Test normal scan (forward)
+ let opener = make_opener(false);
+ let stream = opener.open(file.clone()).unwrap().await.unwrap();
+ let forward_values = collect_int32_values(stream).await;
+
+ // Test reverse scan
+ let opener = make_opener(true);
+ let stream = opener.open(file.clone()).unwrap().await.unwrap();
+ let reverse_values = collect_int32_values(stream).await;
+
+ // The forward scan should return data in the order written
+ assert_eq!(forward_values, vec![1, 2, 3, 4, 5, 6, 7, 8, 9]);
+
+ // With reverse scan, row groups are reversed, so we expect:
+ // Row group 3 (7,8,9), then row group 2 (4,5,6), then row group 1 (1,2,3)
+ assert_eq!(reverse_values, vec![7, 8, 9, 4, 5, 6, 1, 2, 3]);
+ }
+
+ #[tokio::test]
+ async fn test_reverse_scan_single_row_group() {
+ let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+
+ // Create a single batch (single row group)
+ let batch = record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap();
+ let data_size =
+ write_parquet(Arc::clone(&store), "test.parquet", batch.clone()).await;
+
+ let schema = batch.schema();
+ let file = PartitionedFile::new(
+ "test.parquet".to_string(),
+ u64::try_from(data_size).unwrap(),
+ );
+
+ let make_opener = |reverse_scan: bool| ParquetOpener {
+ partition_index: 0,
+ projection: ProjectionExprs::from_indices(&[0], &schema),
+ batch_size: 1024,
+ limit: None,
+ predicate: None,
+ table_schema: TableSchema::from_file_schema(Arc::clone(&schema)),
+ metadata_size_hint: None,
+ metrics: ExecutionPlanMetricsSet::new(),
+ parquet_file_reader_factory: Arc::new(DefaultParquetFileReaderFactory::new(
+ Arc::clone(&store),
+ )),
+ pushdown_filters: false,
+ reorder_filters: false,
+ force_filter_selections: false,
+ enable_page_index: false,
+ enable_bloom_filter: false,
+ enable_row_group_stats_pruning: false,
+ coerce_int96: None,
+ #[cfg(feature = "parquet_encryption")]
+ file_decryption_properties: None,
+ expr_adapter_factory: Arc::new(DefaultPhysicalExprAdapterFactory),
+ #[cfg(feature = "parquet_encryption")]
+ encryption_factory: None,
+ max_predicate_cache_size: None,
+ reverse_row_groups: reverse_scan,
+ };
+
+ // With a single row group, forward and reverse should be the same
+ // (only the row group order is reversed, not the rows within)
+ let opener_forward = make_opener(false);
+ let stream_forward = opener_forward.open(file.clone()).unwrap().await.unwrap();
+ let (batches_forward, _) = count_batches_and_rows(stream_forward).await;
+
+ let opener_reverse = make_opener(true);
+ let stream_reverse = opener_reverse.open(file).unwrap().await.unwrap();
+ let (batches_reverse, _) = count_batches_and_rows(stream_reverse).await;
+
+ // Both should have the same number of batches since there's only one row group
+ assert_eq!(batches_forward, batches_reverse);
+ }
+
+ #[tokio::test]
+ async fn test_reverse_scan_with_row_selection() {
+ use parquet::file::properties::WriterProperties;
+
+ let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+
+ // Create 3 batches with DIFFERENT selection patterns
+ let batch1 =
+ record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3),
Some(4)]))
+ .unwrap(); // 4 rows
+ let batch2 =
+ record_batch!(("a", Int32, vec![Some(5), Some(6), Some(7),
Some(8)]))
+ .unwrap(); // 4 rows
+ let batch3 =
+ record_batch!(("a", Int32, vec![Some(9), Some(10), Some(11),
Some(12)]))
+ .unwrap(); // 4 rows
+
+ let props = WriterProperties::builder()
+ .set_max_row_group_size(4)
+ .build();
+
+ let data_len = write_parquet_batches(
+ Arc::clone(&store),
+ "test.parquet",
+ vec![batch1.clone(), batch2, batch3],
+ Some(props),
+ )
+ .await;
+
+ let schema = batch1.schema();
+
+ use crate::ParquetAccessPlan;
+ use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
+
+ let mut access_plan = ParquetAccessPlan::new_all(3);
+ // Row group 0: skip first 2, select last 2 (should get: 3, 4)
+ access_plan.scan_selection(
+ 0,
+ RowSelection::from(vec![RowSelector::skip(2), RowSelector::select(2)]),
+ );
+ // Row group 1: select all (should get: 5, 6, 7, 8)
+ // Row group 2: select first 2, skip last 2 (should get: 9, 10)
+ access_plan.scan_selection(
+ 2,
+ RowSelection::from(vec![RowSelector::select(2), RowSelector::skip(2)]),
+ );
+
+ let file = PartitionedFile::new(
+ "test.parquet".to_string(),
+ u64::try_from(data_len).unwrap(),
+ )
+ .with_extensions(Arc::new(access_plan));
+
+ let make_opener = |reverse_scan: bool| ParquetOpener {
+ partition_index: 0,
+ projection: ProjectionExprs::from_indices(&[0], &schema),
+ batch_size: 1024,
+ limit: None,
+ predicate: None,
+ table_schema: TableSchema::from_file_schema(Arc::clone(&schema)),
+ metadata_size_hint: None,
+ metrics: ExecutionPlanMetricsSet::new(),
+ parquet_file_reader_factory: Arc::new(DefaultParquetFileReaderFactory::new(
+ Arc::clone(&store),
+ )),
+ pushdown_filters: false,
+ reorder_filters: false,
+ force_filter_selections: false,
+ enable_page_index: false,
+ enable_bloom_filter: false,
+ enable_row_group_stats_pruning: false,
+ coerce_int96: None,
+ #[cfg(feature = "parquet_encryption")]
+ file_decryption_properties: None,
+ expr_adapter_factory: Arc::new(DefaultPhysicalExprAdapterFactory),
+ #[cfg(feature = "parquet_encryption")]
+ encryption_factory: None,
+ max_predicate_cache_size: None,
+ reverse_row_groups: reverse_scan,
+ };
+
+ // Forward scan: RG0(3,4), RG1(5,6,7,8), RG2(9,10)
+ let opener = make_opener(false);
+ let stream = opener.open(file.clone()).unwrap().await.unwrap();
+ let forward_values = collect_int32_values(stream).await;
+
+ // Forward scan should produce: RG0(3,4), RG1(5,6,7,8), RG2(9,10)
+ assert_eq!(
+ forward_values,
+ vec![3, 4, 5, 6, 7, 8, 9, 10],
+ "Forward scan should select correct rows based on RowSelection"
+ );
+
+ // Reverse scan
+ // CORRECT behavior: reverse row groups AND their corresponding selections
+ // - RG2 is read first, WITH RG2's selection (select 2, skip 2) -> 9, 10
+ // - RG1 is read second, WITH RG1's selection (select all) -> 5, 6, 7, 8
+ // - RG0 is read third, WITH RG0's selection (skip 2, select 2) -> 3, 4
+ let opener = make_opener(true);
+ let stream = opener.open(file).unwrap().await.unwrap();
+ let reverse_values = collect_int32_values(stream).await;
+
+ // Correct expected result: row groups reversed but each keeps its own selection
+ // RG2 with its selection (9,10), RG1 with its selection (5,6,7,8), RG0 with its selection (3,4)
+ assert_eq!(
+ reverse_values,
+ vec![9, 10, 5, 6, 7, 8, 3, 4],
+ "Reverse scan should reverse row group order while maintaining
correct RowSelection for each group"
+ );
+ }
}
diff --git a/datafusion/datasource-parquet/src/sort.rs b/datafusion/datasource-parquet/src/sort.rs
new file mode 100644
index 0000000000..4255d4d696
--- /dev/null
+++ b/datafusion/datasource-parquet/src/sort.rs
@@ -0,0 +1,407 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Sort-related utilities for Parquet scanning
+
+use datafusion_common::Result;
+use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
+use parquet::file::metadata::ParquetMetaData;
+use std::collections::HashMap;
+
+/// Reverse a row selection to match reversed row group order.
+///
+/// When scanning row groups in reverse order, we need to adjust the row selection
+/// to account for the new ordering. This function:
+/// 1. Maps each selection to its corresponding row group
+/// 2. Reverses the order of row groups
+/// 3. Reconstructs the row selection for the new order
+///
+/// # Arguments
+/// * `row_selection` - Original row selection
+/// * `parquet_metadata` - Metadata containing row group information
+///
+/// # Returns
+/// A new `RowSelection` adjusted for reversed row group order
+pub fn reverse_row_selection(
+ row_selection: &RowSelection,
+ parquet_metadata: &ParquetMetaData,
+) -> Result<RowSelection> {
+ let rg_metadata = parquet_metadata.row_groups();
+
+ // Build a mapping of row group index to its row range in the file
+ let mut rg_row_ranges: Vec<(usize, usize, usize)> = Vec::with_capacity(rg_metadata.len());
+ let mut current_row = 0;
+ for (rg_idx, rg) in rg_metadata.iter().enumerate() {
+ let num_rows = rg.num_rows() as usize;
+ rg_row_ranges.push((rg_idx, current_row, current_row + num_rows));
+ current_row += num_rows;
+ }
+
+ // Map selections to row groups
+ let mut rg_selections: HashMap<usize, Vec<RowSelector>> = HashMap::new();
+
+ let mut current_file_row = 0;
+ for selector in row_selection.iter() {
+ let selector_end = current_file_row + selector.row_count;
+
+ // Find which row groups this selector spans
+ for (rg_idx, rg_start, rg_end) in rg_row_ranges.iter() {
+ if current_file_row < *rg_end && selector_end > *rg_start {
+ // This selector overlaps with this row group
+ let overlap_start = current_file_row.max(*rg_start);
+ let overlap_end = selector_end.min(*rg_end);
+ let overlap_count = overlap_end - overlap_start;
+
+ if overlap_count > 0 {
+ let entry = rg_selections.entry(*rg_idx).or_default();
+ if selector.skip {
+ entry.push(RowSelector::skip(overlap_count));
+ } else {
+ entry.push(RowSelector::select(overlap_count));
+ }
+ }
+ }
+ }
+
+ current_file_row = selector_end;
+ }
+
+ // Build new selection for reversed row group order
+ let mut reversed_selectors = Vec::new();
+ for rg_idx in (0..rg_metadata.len()).rev() {
+ if let Some(selectors) = rg_selections.get(&rg_idx) {
+ reversed_selectors.extend(selectors.iter().cloned());
+ } else {
+ // No specific selection for this row group means select all
+ if let Some((_, start, end)) = rg_row_ranges.iter().find(|(idx, _, _)| *idx == rg_idx) {
+ reversed_selectors.push(RowSelector::select(end - start));
+ }
+ }
+ }
+
+ Ok(RowSelection::from(reversed_selectors))
+}
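
For illustration (not part of the diff), a worked example of the mapping this
function performs, consistent with the multi-row-group test below: with three
row groups of 100 rows each and the original selection

    [skip(50), select(100), skip(150)]

the per-row-group pieces are RG0 = [skip(50), select(50)], RG1 = [select(50),
skip(50)], RG2 = [skip(100)]; reversing the row-group order (RG2, RG1, RG0)
and concatenating gives

    [skip(100), select(50), skip(50), skip(50), select(50)]

which is equivalent to [skip(100), select(50), skip(100), select(50)].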
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use arrow::datatypes::{DataType, Field, Schema};
+ use bytes::Bytes;
+ use parquet::arrow::ArrowWriter;
+ use parquet::file::reader::FileReader;
+ use parquet::file::serialized_reader::SerializedFileReader;
+ use std::sync::Arc;
+
+ /// Helper function to create a ParquetMetaData with specified row group sizes
+ /// by actually writing a parquet file in memory
+ fn create_test_metadata(row_group_sizes: Vec<i64>) -> ParquetMetaData {
+ // Create a simple schema
+ let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
+
+ // Create in-memory parquet file with the specified row groups
+ let mut buffer = Vec::new();
+ {
+ let props = parquet::file::properties::WriterProperties::builder()
+ .set_max_row_group_size(row_group_sizes[0] as usize)
+ .build();
+
+ let mut writer =
+ ArrowWriter::try_new(&mut buffer, schema.clone(), Some(props)).unwrap();
+
+ for &size in &row_group_sizes {
+ // Create a batch with the specified number of rows
+ let array = arrow::array::Int32Array::from(vec![1; size as usize]);
+ let batch = arrow::record_batch::RecordBatch::try_new(
+ schema.clone(),
+ vec![Arc::new(array)],
+ )
+ .unwrap();
+ writer.write(&batch).unwrap();
+ }
+ writer.close().unwrap();
+ }
+
+ // Read back the metadata
+ let bytes = Bytes::from(buffer);
+ let reader = SerializedFileReader::new(bytes).unwrap();
+ reader.metadata().clone()
+ }
+
+ #[test]
+ fn test_reverse_simple_selection() {
+ // 3 row groups with 100 rows each
+ let metadata = create_test_metadata(vec![100, 100, 100]);
+
+ // Select first 50 rows from first row group
+ let selection =
+ RowSelection::from(vec![RowSelector::select(50), RowSelector::skip(250)]);
+
+ let reversed = reverse_row_selection(&selection, &metadata).unwrap();
+
+ // Verify total selected rows remain the same
+ let original_selected: usize = selection
+ .iter()
+ .filter(|s| !s.skip)
+ .map(|s| s.row_count)
+ .sum();
+ let reversed_selected: usize = reversed
+ .iter()
+ .filter(|s| !s.skip)
+ .map(|s| s.row_count)
+ .sum();
+
+ assert_eq!(original_selected, reversed_selected);
+ }
+
+ #[test]
+ fn test_reverse_multi_row_group_selection() {
+ let metadata = create_test_metadata(vec![100, 100, 100]);
+
+ // Select rows spanning multiple row groups
+ let selection = RowSelection::from(vec![
+ RowSelector::skip(50),
+ RowSelector::select(100), // Spans RG0 and RG1
+ RowSelector::skip(150),
+ ]);
+
+ let reversed = reverse_row_selection(&selection, &metadata).unwrap();
+
+ // Verify total selected rows remain the same
+ let original_selected: usize = selection
+ .iter()
+ .filter(|s| !s.skip)
+ .map(|s| s.row_count)
+ .sum();
+ let reversed_selected: usize = reversed
+ .iter()
+ .filter(|s| !s.skip)
+ .map(|s| s.row_count)
+ .sum();
+
+ assert_eq!(original_selected, reversed_selected);
+ }
+
+ #[test]
+ fn test_reverse_full_selection() {
+ let metadata = create_test_metadata(vec![100, 100, 100]);
+
+ // Select all rows
+ let selection = RowSelection::from(vec![RowSelector::select(300)]);
+
+ let reversed = reverse_row_selection(&selection, &metadata).unwrap();
+
+ // Should still select all rows, just in reversed row group order
+ let total_selected: usize = reversed
+ .iter()
+ .filter(|s| !s.skip)
+ .map(|s| s.row_count)
+ .sum();
+
+ assert_eq!(total_selected, 300);
+ }
+
+ #[test]
+ fn test_reverse_empty_selection() {
+ let metadata = create_test_metadata(vec![100, 100, 100]);
+
+ // Skip all rows
+ let selection = RowSelection::from(vec![RowSelector::skip(300)]);
+
+ let reversed = reverse_row_selection(&selection, &metadata).unwrap();
+
+ // Should still skip all rows
+ let total_selected: usize = reversed
+ .iter()
+ .filter(|s| !s.skip)
+ .map(|s| s.row_count)
+ .sum();
+
+ assert_eq!(total_selected, 0);
+ }
+
+ #[test]
+ fn test_reverse_with_different_row_group_sizes() {
+ let metadata = create_test_metadata(vec![50, 150, 100]);
+
+ let selection = RowSelection::from(vec![
+ RowSelector::skip(25),
+ RowSelector::select(200), // Spans all row groups
+ RowSelector::skip(75),
+ ]);
+
+ let reversed = reverse_row_selection(&selection, &metadata).unwrap();
+
+ let original_selected: usize = selection
+ .iter()
+ .filter(|s| !s.skip)
+ .map(|s| s.row_count)
+ .sum();
+ let reversed_selected: usize = reversed
+ .iter()
+ .filter(|s| !s.skip)
+ .map(|s| s.row_count)
+ .sum();
+
+ assert_eq!(original_selected, reversed_selected);
+ }
+
+ #[test]
+ fn test_reverse_single_row_group() {
+ let metadata = create_test_metadata(vec![100]);
+
+ let selection =
+ RowSelection::from(vec![RowSelector::select(50),
RowSelector::skip(50)]);
+
+ let reversed = reverse_row_selection(&selection, &metadata).unwrap();
+
+        // With a single row group, the selection should remain the same
+ let original_selected: usize = selection
+ .iter()
+ .filter(|s| !s.skip)
+ .map(|s| s.row_count)
+ .sum();
+ let reversed_selected: usize = reversed
+ .iter()
+ .filter(|s| !s.skip)
+ .map(|s| s.row_count)
+ .sum();
+
+ assert_eq!(original_selected, reversed_selected);
+ }
+
+ #[test]
+ fn test_reverse_complex_pattern() {
+ let metadata = create_test_metadata(vec![100, 100, 100]);
+
+ // Complex pattern: select some, skip some, select some more
+ let selection = RowSelection::from(vec![
+ RowSelector::select(30),
+ RowSelector::skip(40),
+ RowSelector::select(80),
+ RowSelector::skip(50),
+ RowSelector::select(100),
+ ]);
+
+ let reversed = reverse_row_selection(&selection, &metadata).unwrap();
+
+ let original_selected: usize = selection
+ .iter()
+ .filter(|s| !s.skip)
+ .map(|s| s.row_count)
+ .sum();
+ let reversed_selected: usize = reversed
+ .iter()
+ .filter(|s| !s.skip)
+ .map(|s| s.row_count)
+ .sum();
+
+ assert_eq!(original_selected, reversed_selected);
+ assert_eq!(original_selected, 210); // 30 + 80 + 100
+ }
+
+ #[test]
+ fn test_reverse_with_skipped_row_group() {
+        // This test covers the "no specific selection" fallback in reverse_row_selection
+ let metadata = create_test_metadata(vec![100, 100, 100]);
+
+ // Select only from first and third row groups, skip middle one
entirely
+ let selection = RowSelection::from(vec![
+ RowSelector::select(50), // First 50 of RG0
+ RowSelector::skip(150), // Rest of RG0 + all of RG1 + half of RG2
+ RowSelector::select(50), // Last 50 of RG2
+ ]);
+
+ let reversed = reverse_row_selection(&selection, &metadata).unwrap();
+
+ // Verify total selected rows remain the same
+ let original_selected: usize = selection
+ .iter()
+ .filter(|s| !s.skip)
+ .map(|s| s.row_count)
+ .sum();
+ let reversed_selected: usize = reversed
+ .iter()
+ .filter(|s| !s.skip)
+ .map(|s| s.row_count)
+ .sum();
+
+ assert_eq!(original_selected, reversed_selected);
+ assert_eq!(original_selected, 100); // 50 + 50
+ }
+
+ #[test]
+ fn test_reverse_middle_row_group_only() {
+ // Another test to ensure skipped row groups are handled correctly
+ let metadata = create_test_metadata(vec![100, 100, 100]);
+
+ // Select only middle row group
+ let selection = RowSelection::from(vec![
+ RowSelector::skip(100), // Skip RG0
+ RowSelector::select(100), // Select all of RG1
+ RowSelector::skip(100), // Skip RG2
+ ]);
+
+ let reversed = reverse_row_selection(&selection, &metadata).unwrap();
+
+ let original_selected: usize = selection
+ .iter()
+ .filter(|s| !s.skip)
+ .map(|s| s.row_count)
+ .sum();
+ let reversed_selected: usize = reversed
+ .iter()
+ .filter(|s| !s.skip)
+ .map(|s| s.row_count)
+ .sum();
+
+ assert_eq!(original_selected, reversed_selected);
+ assert_eq!(original_selected, 100);
+ }
+
+ #[test]
+ fn test_reverse_alternating_row_groups() {
+ // Test with more complex skipping pattern
+ let metadata = create_test_metadata(vec![100, 100, 100, 100]);
+
+ // Select first and third row groups, skip second and fourth
+ let selection = RowSelection::from(vec![
+ RowSelector::select(100), // RG0
+ RowSelector::skip(100), // RG1
+ RowSelector::select(100), // RG2
+ RowSelector::skip(100), // RG3
+ ]);
+
+ let reversed = reverse_row_selection(&selection, &metadata).unwrap();
+
+ let original_selected: usize = selection
+ .iter()
+ .filter(|s| !s.skip)
+ .map(|s| s.row_count)
+ .sum();
+ let reversed_selected: usize = reversed
+ .iter()
+ .filter(|s| !s.skip)
+ .map(|s| s.row_count)
+ .sum();
+
+ assert_eq!(original_selected, reversed_selected);
+ assert_eq!(original_selected, 200);
+ }
+}
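[Editorial note, not part of the patch: a worked example of reverse_row_selection,
assuming three row groups of 100 rows each and the file-level selection
[select(50), skip(250)]:

    per row group:   RG0 = [select(50), skip(50)]   RG1 = [skip(100)]   RG2 = [skip(100)]
    reversed output: [skip(100), skip(100), select(50), skip(50)]

The selected row count (50) is preserved; only the emission order of the row
groups changes, which is exactly what the tests above assert.]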
diff --git a/datafusion/datasource-parquet/src/source.rs
b/datafusion/datasource-parquet/src/source.rs
index 5caaa1c474..4956f83eff 100644
--- a/datafusion/datasource-parquet/src/source.rs
+++ b/datafusion/datasource-parquet/src/source.rs
@@ -44,6 +44,7 @@ use
datafusion_physical_expr_adapter::DefaultPhysicalExprAdapterFactory;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_expr_common::physical_expr::fmt_sql;
use datafusion_physical_plan::DisplayFormatType;
+use datafusion_physical_plan::SortOrderPushdownResult;
use datafusion_physical_plan::filter_pushdown::PushedDown;
use datafusion_physical_plan::filter_pushdown::{
FilterPushdownPropagation, PushedDownPredicate,
@@ -53,6 +54,7 @@ use
datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
#[cfg(feature = "parquet_encryption")]
use datafusion_execution::parquet_encryption::EncryptionFactory;
+use datafusion_physical_expr_common::sort_expr::{LexOrdering,
PhysicalSortExpr};
use itertools::Itertools;
use object_store::ObjectStore;
#[cfg(feature = "parquet_encryption")]
@@ -287,6 +289,14 @@ pub struct ParquetSource {
pub(crate) projection: ProjectionExprs,
#[cfg(feature = "parquet_encryption")]
pub(crate) encryption_factory: Option<Arc<dyn EncryptionFactory>>,
+    /// The ordering of data within the files.
+    /// This is set by FileScanConfig when it knows the file ordering.
+    file_ordering: Option<LexOrdering>,
+    /// If true, read files in reverse order and reverse the row groups within
+    /// each file. Rows within a row group are not guaranteed to be in reverse
+    /// order, so the output must still be sorted after reading; the reverse
+    /// scan is therefore inexact. Used to optimize ORDER BY ... DESC on
+    /// sorted data.
+    reverse_row_groups: bool,
}
impl ParquetSource {
@@ -311,6 +321,8 @@ impl ParquetSource {
metadata_size_hint: None,
#[cfg(feature = "parquet_encryption")]
encryption_factory: None,
+ file_ordering: None,
+ reverse_row_groups: false,
}
}
@@ -386,6 +398,12 @@ impl ParquetSource {
self
}
+ /// If set, indicates the ordering of data within the files being read.
+ pub fn with_file_ordering(mut self, ordering: Option<LexOrdering>) -> Self
{
+ self.file_ordering = ordering;
+ self
+ }
+
/// Return the value described in [`Self::with_pushdown_filters`]
pub(crate) fn pushdown_filters(&self) -> bool {
self.table_parquet_options.global.pushdown_filters
@@ -465,6 +483,15 @@ impl ParquetSource {
)),
}
}
+
+ pub(crate) fn with_reverse_row_groups(mut self, reverse_row_groups: bool)
-> Self {
+ self.reverse_row_groups = reverse_row_groups;
+ self
+ }
+ #[cfg(test)]
+ pub(crate) fn reverse_row_groups(&self) -> bool {
+ self.reverse_row_groups
+ }
}
/// Parses datafusion.common.config.ParquetOptions.coerce_int96 String to a
arrow_schema.datatype.TimeUnit
@@ -550,6 +577,7 @@ impl FileSource for ParquetSource {
#[cfg(feature = "parquet_encryption")]
encryption_factory: self.get_encryption_factory_with_config(),
max_predicate_cache_size: self.max_predicate_cache_size(),
+ reverse_row_groups: self.reverse_row_groups,
});
Ok(opener)
}
@@ -603,6 +631,11 @@ impl FileSource for ParquetSource {
write!(f, "{predicate_string}")?;
+        // Add reverse_row_groups info if enabled
+ if self.reverse_row_groups {
+ write!(f, ", reverse_row_groups=true")?;
+ }
+
        // Try to build the pruning predicates.
// These are only generated here because it's useful to have
*some*
// idea of what pushdown is happening when viewing plans.
@@ -710,6 +743,72 @@ impl FileSource for ParquetSource {
)
.with_updated_node(source))
}
+
+ /// Try to optimize the scan to produce data in the requested sort order.
+ ///
+ /// This method receives:
+ /// 1. The query's required ordering (`order` parameter)
+ /// 2. The file's natural ordering (via `self.file_ordering`, set by
FileScanConfig)
+ ///
+ /// With both pieces of information, ParquetSource can decide what
optimizations to apply.
+ ///
+ /// # Phase 1 Behavior (Current)
+ /// Returns `Inexact` when reversing the row group scan order would help
satisfy the
+ /// requested ordering. We still need a Sort operator at a higher level
because:
+ /// - We only reverse row group read order, not rows within row groups
+ /// - This provides approximate ordering that benefits limit pushdown
+ ///
+ /// # Phase 2 (Future)
+ /// Could return `Exact` when we can guarantee perfect ordering through
techniques like:
+ /// - File reordering based on statistics
+ /// - Detecting already-sorted data
+ /// This would allow removing the Sort operator entirely.
+ ///
+ /// # Returns
+ /// - `Inexact`: Created an optimized source (e.g., reversed scan) that
approximates the order
+ /// - `Unsupported`: Cannot optimize for this ordering
+ fn try_reverse_output(
+ &self,
+ order: &[PhysicalSortExpr],
+ ) -> datafusion_common::Result<SortOrderPushdownResult<Arc<dyn
FileSource>>> {
+ // Check if we have file ordering information
+ let file_ordering = match &self.file_ordering {
+ Some(ordering) => ordering,
+ None => return Ok(SortOrderPushdownResult::Unsupported),
+ };
+
+ // Create a LexOrdering from the requested order to use the is_reverse
method
+ let Some(requested_ordering) = LexOrdering::new(order.to_vec()) else {
+ // Empty ordering requested, cannot optimize
+ return Ok(SortOrderPushdownResult::Unsupported);
+ };
+
+ // Check if reversing the file ordering would satisfy the requested
ordering
+ if file_ordering.is_reverse(&requested_ordering) {
+ // Phase 1: Enable reverse row group scanning
+ let new_source = self.clone().with_reverse_row_groups(true);
+
+ // Return Inexact because we're only reversing row group order,
+ // not guaranteeing perfect row-level ordering
+ return Ok(SortOrderPushdownResult::Inexact {
+ inner: Arc::new(new_source) as Arc<dyn FileSource>,
+ });
+ }
+
+ // TODO Phase 2: Add support for other optimizations:
+ // - File reordering based on min/max statistics
+ // - Detection of exact ordering (return Exact to remove Sort operator)
+ // - Partial sort pushdown for prefix matches
+
+ Ok(SortOrderPushdownResult::Unsupported)
+ }
+
+ fn with_file_ordering_info(
+ &self,
+ ordering: Option<LexOrdering>,
+ ) -> datafusion_common::Result<Arc<dyn FileSource>> {
+ Ok(Arc::new(self.clone().with_file_ordering(ordering)))
+ }
}
#[cfg(test)]
@@ -728,4 +827,87 @@ mod tests {
// same value. but filter() call Arc::clone internally
assert_eq!(parquet_source.predicate(),
parquet_source.filter().as_ref());
}
+
+ #[test]
+ fn test_reverse_scan_default_value() {
+ use arrow::datatypes::Schema;
+
+ let schema = Arc::new(Schema::empty());
+ let source = ParquetSource::new(schema);
+
+ assert!(!source.reverse_row_groups());
+ }
+
+ #[test]
+ fn test_reverse_scan_with_setter() {
+ use arrow::datatypes::Schema;
+
+ let schema = Arc::new(Schema::empty());
+
+ let source =
ParquetSource::new(schema.clone()).with_reverse_row_groups(true);
+ assert!(source.reverse_row_groups());
+
+ let source = source.with_reverse_row_groups(false);
+ assert!(!source.reverse_row_groups());
+ }
+
+ #[test]
+ fn test_reverse_scan_clone_preserves_value() {
+ use arrow::datatypes::Schema;
+
+ let schema = Arc::new(Schema::empty());
+
+ let source = ParquetSource::new(schema).with_reverse_row_groups(true);
+ let cloned = source.clone();
+
+ assert!(cloned.reverse_row_groups());
+ assert_eq!(source.reverse_row_groups(), cloned.reverse_row_groups());
+ }
+
+ #[test]
+ fn test_reverse_scan_with_other_options() {
+ use arrow::datatypes::Schema;
+ use datafusion_common::config::TableParquetOptions;
+
+ let schema = Arc::new(Schema::empty());
+ let options = TableParquetOptions::default();
+
+ let source = ParquetSource::new(schema)
+ .with_table_parquet_options(options)
+ .with_metadata_size_hint(8192)
+ .with_reverse_row_groups(true);
+
+ assert!(source.reverse_row_groups());
+ assert_eq!(source.metadata_size_hint, Some(8192));
+ }
+
+ #[test]
+ fn test_reverse_scan_builder_pattern() {
+ use arrow::datatypes::Schema;
+
+ let schema = Arc::new(Schema::empty());
+
+ let source = ParquetSource::new(schema)
+ .with_reverse_row_groups(true)
+ .with_reverse_row_groups(false)
+ .with_reverse_row_groups(true);
+
+ assert!(source.reverse_row_groups());
+ }
+
+ #[test]
+ fn test_reverse_scan_independent_of_predicate() {
+ use arrow::datatypes::Schema;
+ use datafusion_physical_expr::expressions::lit;
+
+ let schema = Arc::new(Schema::empty());
+ let predicate = lit(true);
+
+ let source = ParquetSource::new(schema)
+ .with_predicate(predicate)
+ .with_reverse_row_groups(true);
+
+ assert!(source.reverse_row_groups());
+ assert!(source.filter().is_some());
+ }
}
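[Editorial sketch, not part of the patch: how the pieces above combine for a
file sorted ascending when the query wants descending order. This assumes the
APIs added in this commit plus DataFusion's existing `Column` and
`PhysicalSortExpr` constructors; exact crate paths may differ slightly.

use std::sync::Arc;

use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_datasource::file::FileSource;
use datafusion_datasource_parquet::ParquetSource;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
use datafusion_physical_plan::SortOrderPushdownResult;

fn main() -> datafusion_common::Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));

    // The file is known to be sorted by `id ASC NULLS LAST`
    let asc = PhysicalSortExpr::new(
        Arc::new(Column::new("id", 0)),
        SortOptions { descending: false, nulls_first: false },
    );
    let source = ParquetSource::new(schema)
        .with_file_ordering(LexOrdering::new(vec![asc]));

    // The query asks for the exact reverse: `id DESC NULLS FIRST`
    let desc = PhysicalSortExpr::new(
        Arc::new(Column::new("id", 0)),
        SortOptions { descending: true, nulls_first: true },
    );

    // Phase 1 yields Inexact: row groups are read back-to-front, but a
    // Sort/TopK operator above the scan is still required
    match source.try_reverse_output(&[desc])? {
        SortOrderPushdownResult::Inexact { .. } => println!("reverse scan enabled"),
        _ => println!("no optimization applied"),
    }
    Ok(())
}]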
diff --git a/datafusion/datasource/src/file.rs
b/datafusion/datasource/src/file.rs
index e25a2e889e..2c69987f91 100644
--- a/datafusion/datasource/src/file.rs
+++ b/datafusion/datasource/src/file.rs
@@ -31,9 +31,11 @@ use datafusion_common::{Result, not_impl_err};
use datafusion_physical_expr::projection::ProjectionExprs;
use datafusion_physical_expr::{LexOrdering, PhysicalExpr};
use datafusion_physical_plan::DisplayFormatType;
+use datafusion_physical_plan::SortOrderPushdownResult;
use datafusion_physical_plan::filter_pushdown::{FilterPushdownPropagation,
PushedDown};
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
+use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
use object_store::ObjectStore;
/// Helper function to convert any type implementing FileSource to Arc<dyn
FileSource>
@@ -129,6 +131,21 @@ pub trait FileSource: Send + Sync {
))
}
+ /// Try to create a new FileSource that can produce data in the specified
sort order.
+ ///
+ /// # Returns
+ /// * `Exact` - Created a source that guarantees perfect ordering
+ /// * `Inexact` - Created a source optimized for ordering (e.g., reordered
files) but not perfectly sorted
+ /// * `Unsupported` - Cannot optimize for this ordering
+ ///
+ /// Default implementation returns `Unsupported`.
+ fn try_reverse_output(
+ &self,
+ _order: &[PhysicalSortExpr],
+ ) -> Result<SortOrderPushdownResult<Arc<dyn FileSource>>> {
+ Ok(SortOrderPushdownResult::Unsupported)
+ }
+
    /// Try to push down a projection into this FileSource.
///
/// `FileSource` implementations that support projection pushdown should
@@ -183,4 +200,22 @@ pub trait FileSource: Send + Sync {
fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
None
}
+
+ /// Set the file ordering information
+ ///
+ /// This allows the file source to know how the files are sorted,
+ /// enabling it to make informed decisions about sort pushdown.
+ ///
+ /// # Default Implementation
+ ///
+ /// Returns `not_impl_err!`. FileSource implementations that support
+ /// sort optimization should override this method.
+ fn with_file_ordering_info(
+ &self,
+ _ordering: Option<LexOrdering>,
+ ) -> Result<Arc<dyn FileSource>> {
+        // Default: not implemented; implementations that support sort
+        // pushdown (such as ParquetSource) override this method
+        not_impl_err!("with_file_ordering_info not implemented for this FileSource")
+ }
}
diff --git a/datafusion/datasource/src/file_scan_config.rs
b/datafusion/datasource/src/file_scan_config.rs
index ad89406014..16a010cf27 100644
--- a/datafusion/datasource/src/file_scan_config.rs
+++ b/datafusion/datasource/src/file_scan_config.rs
@@ -34,25 +34,26 @@ use datafusion_execution::{
SendableRecordBatchStream, TaskContext, object_store::ObjectStoreUrl,
};
use datafusion_expr::Operator;
+
+use datafusion_physical_expr::equivalence::project_orderings;
use datafusion_physical_expr::expressions::{BinaryExpr, Column};
use datafusion_physical_expr::projection::ProjectionExprs;
use datafusion_physical_expr::utils::reassign_expr_columns;
use datafusion_physical_expr::{EquivalenceProperties, Partitioning,
split_conjunction};
use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
-use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use datafusion_physical_expr_common::sort_expr::{LexOrdering,
PhysicalSortExpr};
+use datafusion_physical_plan::SortOrderPushdownResult;
+use datafusion_physical_plan::coop::cooperative;
+use datafusion_physical_plan::execution_plan::SchedulingType;
use datafusion_physical_plan::{
DisplayAs, DisplayFormatType,
display::{ProjectSchemaDisplay, display_orderings},
filter_pushdown::FilterPushdownPropagation,
metrics::ExecutionPlanMetricsSet,
};
-use std::{any::Any, fmt::Debug, fmt::Formatter, fmt::Result as FmtResult,
sync::Arc};
-
-use datafusion_physical_expr::equivalence::project_orderings;
-use datafusion_physical_plan::coop::cooperative;
-use datafusion_physical_plan::execution_plan::SchedulingType;
use log::{debug, warn};
+use std::{any::Any, fmt::Debug, fmt::Formatter, fmt::Result as FmtResult,
sync::Arc};
/// The base configurations for a [`DataSourceExec`], the physical plan for
/// any given file format.
@@ -845,6 +846,45 @@ impl DataSource for FileScanConfig {
}
}
}
+
+ fn try_pushdown_sort(
+ &self,
+ order: &[PhysicalSortExpr],
+ ) -> Result<SortOrderPushdownResult<Arc<dyn DataSource>>> {
+ let file_ordering = self.output_ordering.first().cloned();
+
+ if file_ordering.is_none() {
+ return Ok(SortOrderPushdownResult::Unsupported);
+ }
+
+ // Use the trait method instead of downcasting
+ // Try to provide file ordering info to the source
+ // If not supported (e.g., CsvSource), fall back to original source
+ let file_source_with_ordering = self
+ .file_source
+ .with_file_ordering_info(file_ordering)
+ .unwrap_or_else(|_| Arc::clone(&self.file_source));
+
+        // Try to reverse the data source given the ordering info; currently
+        // only ParquetSource supports this, via an inexact reverse of row groups
+        let pushdown_result = file_source_with_ordering.try_reverse_output(order)?;
+
+ match pushdown_result {
+ SortOrderPushdownResult::Exact { inner } => {
+ Ok(SortOrderPushdownResult::Exact {
+ inner: self.rebuild_with_source(inner, true)?,
+ })
+ }
+ SortOrderPushdownResult::Inexact { inner } => {
+ Ok(SortOrderPushdownResult::Inexact {
+ inner: self.rebuild_with_source(inner, false)?,
+ })
+ }
+ SortOrderPushdownResult::Unsupported => {
+ Ok(SortOrderPushdownResult::Unsupported)
+ }
+ }
+ }
}
impl FileScanConfig {
@@ -1107,6 +1147,36 @@ impl FileScanConfig {
pub fn file_source(&self) -> &Arc<dyn FileSource> {
&self.file_source
}
+
+ /// Helper: Rebuild FileScanConfig with new file source
+ fn rebuild_with_source(
+ &self,
+ new_file_source: Arc<dyn FileSource>,
+ is_exact: bool,
+ ) -> Result<Arc<dyn DataSource>> {
+ let mut new_config = self.clone();
+
+ // Reverse file groups (FileScanConfig's responsibility)
+ new_config.file_groups = new_config
+ .file_groups
+ .into_iter()
+ .map(|group| {
+ let mut files = group.into_inner();
+ files.reverse();
+ files.into()
+ })
+ .collect();
+
+ new_config.file_source = new_file_source;
+
+ // Phase 1: Clear output_ordering for Inexact
+ // (we're only reversing row groups, not guaranteeing perfect ordering)
+ if !is_exact {
+ new_config.output_ordering = vec![];
+ }
+
+ Ok(Arc::new(new_config))
+ }
}
impl Debug for FileScanConfig {
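[Editorial note, not part of the patch: what rebuild_with_source changes for a
single-group table whose files each cover ascending key ranges:

    before: file_groups = [[f1, f2, f3]]   (f1 < f2 < f3 by sort key)
    after : file_groups = [[f3, f2, f1]]   (files reversed here)
            row groups inside each file are reversed by ParquetSource,
            and output_ordering is cleared because the result is Inexact]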
diff --git a/datafusion/datasource/src/source.rs
b/datafusion/datasource/src/source.rs
index 0945ffc94c..a3892dfac9 100644
--- a/datafusion/datasource/src/source.rs
+++ b/datafusion/datasource/src/source.rs
@@ -40,7 +40,8 @@ use datafusion_common::config::ConfigOptions;
use datafusion_common::{Constraints, Result, Statistics};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_expr::{EquivalenceProperties, Partitioning,
PhysicalExpr};
-use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use datafusion_physical_expr_common::sort_expr::{LexOrdering,
PhysicalSortExpr};
+use datafusion_physical_plan::SortOrderPushdownResult;
use datafusion_physical_plan::filter_pushdown::{
ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation,
PushedDown,
};
@@ -190,6 +191,25 @@ pub trait DataSource: Send + Sync + Debug {
vec![PushedDown::No; filters.len()],
))
}
+
+ /// Try to create a new DataSource that produces data in the specified
sort order.
+ ///
+ /// # Arguments
+ /// * `order` - The desired output ordering
+ ///
+ /// # Returns
+ /// * `Ok(SortOrderPushdownResult::Exact { .. })` - Created a source that
guarantees exact ordering
+ /// * `Ok(SortOrderPushdownResult::Inexact { .. })` - Created a source
optimized for the ordering
+ /// * `Ok(SortOrderPushdownResult::Unsupported)` - Cannot optimize for
this ordering
+ /// * `Err(e)` - Error occurred
+ ///
+ /// Default implementation returns `Unsupported`.
+ fn try_pushdown_sort(
+ &self,
+ _order: &[PhysicalSortExpr],
+ ) -> Result<SortOrderPushdownResult<Arc<dyn DataSource>>> {
+ Ok(SortOrderPushdownResult::Unsupported)
+ }
}
/// [`ExecutionPlan`] that reads one or more files
@@ -360,6 +380,19 @@ impl ExecutionPlan for DataSourceExec {
}),
}
}
+
+ fn try_pushdown_sort(
+ &self,
+ order: &[PhysicalSortExpr],
+ ) -> Result<SortOrderPushdownResult<Arc<dyn ExecutionPlan>>> {
+ // Delegate to the data source and wrap result with DataSourceExec
+ self.data_source
+ .try_pushdown_sort(order)?
+ .try_map(|new_data_source| {
+ let new_exec = self.clone().with_data_source(new_data_source);
+ Ok(Arc::new(new_exec) as Arc<dyn ExecutionPlan>)
+ })
+ }
}
impl DataSourceExec {
diff --git a/datafusion/execution/src/config.rs
b/datafusion/execution/src/config.rs
index 5dcdab1552..30ba7de76a 100644
--- a/datafusion/execution/src/config.rs
+++ b/datafusion/execution/src/config.rs
@@ -424,6 +424,13 @@ impl SessionConfig {
self.options.optimizer.enable_round_robin_repartition
}
+    /// Enables or disables the sort pushdown optimization. Currently this
+    /// only applies to the Parquet data source.
+ pub fn with_enable_sort_pushdown(mut self, enabled: bool) -> Self {
+ self.options_mut().optimizer.enable_sort_pushdown = enabled;
+ self
+ }
+
/// Set the size of [`sort_spill_reservation_bytes`] to control
/// memory pre-reservation
///
diff --git a/datafusion/physical-expr-common/src/sort_expr.rs
b/datafusion/physical-expr-common/src/sort_expr.rs
index 1a49db3d58..e8558c7643 100644
--- a/datafusion/physical-expr-common/src/sort_expr.rs
+++ b/datafusion/physical-expr-common/src/sort_expr.rs
@@ -426,6 +426,62 @@ impl LexOrdering {
self.exprs.truncate(len);
true
}
+
+ /// Check if reversing this ordering would satisfy another ordering
requirement.
+ ///
+ /// This supports **prefix matching**: if this ordering is `[A DESC, B
ASC]`
+ /// and `other` is `[A ASC]`, reversing this gives `[A ASC, B DESC]`, which
+ /// satisfies `other` since `[A ASC]` is a prefix.
+ ///
+ /// # Arguments
+ /// * `other` - The ordering requirement to check against
+ ///
+ /// # Returns
+ /// `true` if reversing this ordering would satisfy `other`
+ ///
+ /// # Example
+ /// ```text
+ /// self: [number DESC, letter ASC]
+ /// other: [number ASC]
+ /// After reversing self: [number ASC, letter DESC] ✓ Prefix match!
+ /// ```
+ pub fn is_reverse(&self, other: &LexOrdering) -> bool {
+ let self_exprs = self.as_ref();
+ let other_exprs = other.as_ref();
+
+ if other_exprs.len() > self_exprs.len() {
+ return false;
+ }
+
+ other_exprs.iter().zip(self_exprs.iter()).all(|(req, cur)| {
+ req.expr.eq(&cur.expr) && is_reversed_sort_options(&req.options,
&cur.options)
+ })
+ }
+}
+
+/// Check if two SortOptions represent reversed orderings.
+///
+/// Returns `true` if both `descending` and `nulls_first` are opposite.
+///
+/// # Example
+/// ```
+/// use arrow::compute::SortOptions;
+/// # use datafusion_physical_expr_common::sort_expr::is_reversed_sort_options;
+///
+/// let asc_nulls_last = SortOptions {
+/// descending: false,
+/// nulls_first: false,
+/// };
+/// let desc_nulls_first = SortOptions {
+/// descending: true,
+/// nulls_first: true,
+/// };
+///
+/// assert!(is_reversed_sort_options(&asc_nulls_last, &desc_nulls_first));
+/// assert!(is_reversed_sort_options(&desc_nulls_first, &asc_nulls_last));
+/// ```
+pub fn is_reversed_sort_options(lhs: &SortOptions, rhs: &SortOptions) -> bool {
+ lhs.descending != rhs.descending && lhs.nulls_first != rhs.nulls_first
}
impl PartialEq for LexOrdering {
@@ -732,3 +788,50 @@ impl DerefMut for OrderingRequirements {
}
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_is_reversed_sort_options() {
+ // Test basic reversal: ASC NULLS LAST ↔ DESC NULLS FIRST
+ let asc_nulls_last = SortOptions {
+ descending: false,
+ nulls_first: false,
+ };
+ let desc_nulls_first = SortOptions {
+ descending: true,
+ nulls_first: true,
+ };
+ assert!(is_reversed_sort_options(&asc_nulls_last, &desc_nulls_first));
+ assert!(is_reversed_sort_options(&desc_nulls_first, &asc_nulls_last));
+
+ // Test another reversal: ASC NULLS FIRST ↔ DESC NULLS LAST
+ let asc_nulls_first = SortOptions {
+ descending: false,
+ nulls_first: true,
+ };
+ let desc_nulls_last = SortOptions {
+ descending: true,
+ nulls_first: false,
+ };
+ assert!(is_reversed_sort_options(&asc_nulls_first, &desc_nulls_last));
+ assert!(is_reversed_sort_options(&desc_nulls_last, &asc_nulls_first));
+
+ // Test non-reversal: same options
+ assert!(!is_reversed_sort_options(&asc_nulls_last, &asc_nulls_last));
+ assert!(!is_reversed_sort_options(
+ &desc_nulls_first,
+ &desc_nulls_first
+ ));
+
+ // Test non-reversal: only descending differs
+ assert!(!is_reversed_sort_options(&asc_nulls_last, &desc_nulls_last));
+ assert!(!is_reversed_sort_options(&desc_nulls_last, &asc_nulls_last));
+
+ // Test non-reversal: only nulls_first differs
+ assert!(!is_reversed_sort_options(&asc_nulls_last, &asc_nulls_first));
+ assert!(!is_reversed_sort_options(&asc_nulls_first, &asc_nulls_last));
+ }
+}
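[Editorial sketch, not part of the patch: the prefix-matching behavior of
`is_reverse` as runnable code, assuming `Column` from datafusion_physical_expr
and the constructors used elsewhere in this commit. The `sort` helper is
hypothetical, introduced only for brevity.

use std::sync::Arc;

use arrow::compute::SortOptions;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};

/// Build a sort expression; nulls_first follows descending so that reversing
/// flips both flags, as is_reversed_sort_options requires
fn sort(name: &str, idx: usize, descending: bool) -> PhysicalSortExpr {
    PhysicalSortExpr::new(
        Arc::new(Column::new(name, idx)),
        SortOptions { descending, nulls_first: descending },
    )
}

fn main() {
    // File order: [number DESC, letter ASC]
    let file = LexOrdering::new(vec![
        sort("number", 0, true),
        sort("letter", 1, false),
    ])
    .unwrap();

    // Query needs [number ASC]: a prefix of the reversed file order
    let query = LexOrdering::new(vec![sort("number", 0, false)]).unwrap();
    assert!(file.is_reverse(&query));

    // A requirement longer than the file ordering can never be satisfied
    let longer = LexOrdering::new(vec![
        sort("number", 0, false),
        sort("letter", 1, true),
        sort("extra", 2, false),
    ])
    .unwrap();
    assert!(!file.is_reverse(&longer));
}]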
diff --git a/datafusion/physical-optimizer/src/lib.rs
b/datafusion/physical-optimizer/src/lib.rs
index cf8cf33cea..1b45f02ebd 100644
--- a/datafusion/physical-optimizer/src/lib.rs
+++ b/datafusion/physical-optimizer/src/lib.rs
@@ -42,6 +42,7 @@ pub mod optimizer;
pub mod output_requirements;
pub mod projection_pushdown;
pub use datafusion_pruning as pruning;
+pub mod pushdown_sort;
pub mod sanity_checker;
pub mod topk_aggregation;
pub mod update_aggr_exprs;
diff --git a/datafusion/physical-optimizer/src/optimizer.rs
b/datafusion/physical-optimizer/src/optimizer.rs
index f8e2e9950a..aa1975d98d 100644
--- a/datafusion/physical-optimizer/src/optimizer.rs
+++ b/datafusion/physical-optimizer/src/optimizer.rs
@@ -37,6 +37,7 @@ use crate::topk_aggregation::TopKAggregation;
use crate::update_aggr_exprs::OptimizeAggregateOrder;
use crate::limit_pushdown_past_window::LimitPushPastWindows;
+use crate::pushdown_sort::PushdownSort;
use datafusion_common::Result;
use datafusion_common::config::ConfigOptions;
use datafusion_physical_plan::ExecutionPlan;
@@ -145,6 +146,8 @@ impl PhysicalOptimizer {
// are not present, the load of executors such as join or union
will be
// reduced by narrowing their input tables.
Arc::new(ProjectionPushdown::new()),
+ // PushdownSort: Detect sorts that can be pushed down to data
sources.
+ Arc::new(PushdownSort::new()),
Arc::new(EnsureCooperative::new()),
// This FilterPushdown handles dynamic filters that may have
references to the source ExecutionPlan.
// Therefore it should be run at the end of the optimization
process since any changes to the plan may break the dynamic filter's references.
diff --git a/datafusion/physical-optimizer/src/pushdown_sort.rs
b/datafusion/physical-optimizer/src/pushdown_sort.rs
new file mode 100644
index 0000000000..1fa15492d2
--- /dev/null
+++ b/datafusion/physical-optimizer/src/pushdown_sort.rs
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Sort Pushdown Optimization
+//!
+//! This optimizer attempts to push sort requirements down through the
execution plan
+//! tree to data sources that can natively handle them (e.g., by scanning
files in
+//! reverse order).
+//!
+//! ## How it works
+//!
+//! 1. Detects `SortExec` nodes in the plan
+//! 2. Calls `try_pushdown_sort()` on the input to recursively push the sort
requirement
+//! 3. Each node type defines its own pushdown behavior:
+//! - **Transparent nodes** (CoalesceBatchesExec, RepartitionExec, etc.)
delegate to
+//! their children and wrap the result
+//! - **Data sources** (DataSourceExec) check if they can optimize for the
ordering
+//! - **Blocking nodes** return `Unsupported` to stop pushdown
+//! 4. Based on the result:
+//! - `Exact`: Remove the Sort operator (data source guarantees perfect
ordering)
+//! - `Inexact`: Keep Sort but use optimized input (enables early
termination for TopK)
+//! - `Unsupported`: No change
+//!
+//! ## Current capabilities (Phase 1)
+//!
+//! - Reverse scan optimization: when required sort is the reverse of the data
source's
+//! natural ordering, enable reverse scanning (reading row groups in reverse
order)
+//! - Supports prefix matching: if data has ordering [A DESC, B ASC] and query
needs
+//! [A ASC], reversing gives [A ASC, B DESC] which satisfies the requirement
+//!
+//! ## Future enhancements (Phase 2)
+//!
+//! Tracking issue: <https://github.com/apache/datafusion/issues/19329>
+//!
+//! - File reordering based on statistics
+//! - Return `Exact` when files are known to be perfectly sorted
+//! - Complete Sort elimination when ordering is guaranteed
+
+use crate::PhysicalOptimizerRule;
+use datafusion_common::Result;
+use datafusion_common::config::ConfigOptions;
+use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
+use datafusion_physical_plan::ExecutionPlan;
+use datafusion_physical_plan::SortOrderPushdownResult;
+use datafusion_physical_plan::sorts::sort::SortExec;
+use std::sync::Arc;
+
+/// A PhysicalOptimizerRule that attempts to push down sort requirements to
data sources.
+///
+/// See module-level documentation for details.
+#[derive(Debug, Clone, Default)]
+pub struct PushdownSort;
+
+impl PushdownSort {
+ pub fn new() -> Self {
+ Self {}
+ }
+}
+
+impl PhysicalOptimizerRule for PushdownSort {
+ fn optimize(
+ &self,
+ plan: Arc<dyn ExecutionPlan>,
+ config: &ConfigOptions,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ // Check if sort pushdown optimization is enabled
+ if !config.optimizer.enable_sort_pushdown {
+ return Ok(plan);
+ }
+
+ // Use transform_down to find and optimize all SortExec nodes
(including nested ones)
+ plan.transform_down(|plan: Arc<dyn ExecutionPlan>| {
+ // Check if this is a SortExec
+ let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>()
else {
+ return Ok(Transformed::no(plan));
+ };
+
+ let sort_input = Arc::clone(sort_exec.input());
+ let required_ordering = sort_exec.expr();
+
+ // Try to push the sort requirement down through the plan tree
+ // Each node type defines its own pushdown behavior via
try_pushdown_sort()
+ match sort_input.try_pushdown_sort(required_ordering)? {
+ SortOrderPushdownResult::Exact { inner } => {
+ // Data source guarantees perfect ordering - remove the
Sort operator
+ Ok(Transformed::yes(inner))
+ }
+ SortOrderPushdownResult::Inexact { inner } => {
+ // Data source is optimized for the ordering but not
perfectly sorted
+ // Keep the Sort operator but use the optimized input
+ // Benefits: TopK queries can terminate early, better
cache locality
+ Ok(Transformed::yes(Arc::new(
+ SortExec::new(required_ordering.clone(), inner)
+ .with_fetch(sort_exec.fetch())
+ .with_preserve_partitioning(
+ sort_exec.preserve_partitioning(),
+ ),
+ )))
+ }
+ SortOrderPushdownResult::Unsupported => {
+ // Cannot optimize for this ordering - no change
+ Ok(Transformed::no(plan))
+ }
+ }
+ })
+ .data()
+ }
+
+ fn name(&self) -> &str {
+ "PushdownSort"
+ }
+
+ fn schema_check(&self) -> bool {
+ true
+ }
+}
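[Editorial sketch, not part of the patch: invoking the rule directly, e.g. from
a test. In normal operation it runs as part of the default optimizer pass list
registered above; the sketch assumes the crate paths in this commit.

use std::sync::Arc;

use datafusion_common::Result;
use datafusion_common::config::ConfigOptions;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_optimizer::pushdown_sort::PushdownSort;
use datafusion_physical_plan::ExecutionPlan;

fn push_sorts_down(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>> {
    // The rule honors datafusion.optimizer.enable_sort_pushdown; the
    // sqllogictests below show it enabled in the default configuration
    let config = ConfigOptions::default();
    PushdownSort::new().optimize(plan, &config)
}]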
diff --git a/datafusion/physical-plan/src/coalesce_batches.rs
b/datafusion/physical-plan/src/coalesce_batches.rs
index eb3c3b5bef..494b5d60fb 100644
--- a/datafusion/physical-plan/src/coalesce_batches.rs
+++ b/datafusion/physical-plan/src/coalesce_batches.rs
@@ -40,7 +40,9 @@ use crate::filter_pushdown::{
ChildPushdownResult, FilterDescription, FilterPushdownPhase,
FilterPushdownPropagation,
};
+use crate::sort_pushdown::SortOrderPushdownResult;
use datafusion_common::config::ConfigOptions;
+use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
use futures::ready;
use futures::stream::{Stream, StreamExt};
@@ -241,6 +243,20 @@ impl ExecutionPlan for CoalesceBatchesExec {
) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
Ok(FilterPushdownPropagation::if_all(child_pushdown_result))
}
+
+ fn try_pushdown_sort(
+ &self,
+ order: &[PhysicalSortExpr],
+ ) -> Result<SortOrderPushdownResult<Arc<dyn ExecutionPlan>>> {
+ // CoalesceBatchesExec is transparent for sort ordering - it preserves
order
+ // Delegate to the child and wrap with a new CoalesceBatchesExec
+ self.input.try_pushdown_sort(order)?.try_map(|new_input| {
+ Ok(Arc::new(
+ CoalesceBatchesExec::new(new_input, self.target_batch_size)
+ .with_fetch(self.fetch),
+ ) as Arc<dyn ExecutionPlan>)
+ })
+ }
}
/// Stream for [`CoalesceBatchesExec`]. See [`CoalesceBatchesExec`] for more
details.
diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs
b/datafusion/physical-plan/src/coalesce_partitions.rs
index 7f207d7f1e..d83f90eb3d 100644
--- a/datafusion/physical-plan/src/coalesce_partitions.rs
+++ b/datafusion/physical-plan/src/coalesce_partitions.rs
@@ -30,7 +30,9 @@ use super::{
use crate::execution_plan::{CardinalityEffect, EvaluationType, SchedulingType};
use crate::filter_pushdown::{FilterDescription, FilterPushdownPhase};
use crate::projection::{ProjectionExec, make_with_child};
+use crate::sort_pushdown::SortOrderPushdownResult;
use crate::{DisplayFormatType, ExecutionPlan, Partitioning};
+use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
use datafusion_common::config::ConfigOptions;
use datafusion_common::{Result, assert_eq_or_internal_err, internal_err};
@@ -284,6 +286,42 @@ impl ExecutionPlan for CoalescePartitionsExec {
) -> Result<FilterDescription> {
FilterDescription::from_children(parent_filters, &self.children())
}
+
+ fn try_pushdown_sort(
+ &self,
+ order: &[PhysicalSortExpr],
+ ) -> Result<SortOrderPushdownResult<Arc<dyn ExecutionPlan>>> {
+ // CoalescePartitionsExec merges multiple partitions into one, which
loses
+ // global ordering. However, we can still push the sort requirement
down
+ // to optimize individual partitions - the Sort operator above will
handle
+ // the global ordering.
+ //
+ // Note: The result will always be at most Inexact (never Exact) when
there
+ // are multiple partitions, because merging destroys global ordering.
+ let result = self.input.try_pushdown_sort(order)?;
+
+ // If we have multiple partitions, we can't return Exact even if the
+ // underlying source claims Exact - merging destroys global ordering
+ let has_multiple_partitions =
+ self.input.output_partitioning().partition_count() > 1;
+
+ result
+ .try_map(|new_input| {
+ Ok(
+ Arc::new(
+
CoalescePartitionsExec::new(new_input).with_fetch(self.fetch),
+ ) as Arc<dyn ExecutionPlan>,
+ )
+ })
+ .map(|r| {
+ if has_multiple_partitions {
+ // Downgrade Exact to Inexact when merging multiple
partitions
+ r.into_inexact()
+ } else {
+ r
+ }
+ })
+ }
}
#[cfg(test)]
diff --git a/datafusion/physical-plan/src/execution_plan.rs
b/datafusion/physical-plan/src/execution_plan.rs
index b7967bb7bb..06da0b8933 100644
--- a/datafusion/physical-plan/src/execution_plan.rs
+++ b/datafusion/physical-plan/src/execution_plan.rs
@@ -22,6 +22,7 @@ use crate::filter_pushdown::{
};
pub use crate::metrics::Metric;
pub use crate::ordering::InputOrderMode;
+use crate::sort_pushdown::SortOrderPushdownResult;
pub use crate::stream::EmptyRecordBatchStream;
pub use datafusion_common::hash_utils;
@@ -54,7 +55,9 @@ use datafusion_common::{
use datafusion_common_runtime::JoinSet;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::EquivalenceProperties;
-use datafusion_physical_expr_common::sort_expr::{LexOrdering,
OrderingRequirements};
+use datafusion_physical_expr_common::sort_expr::{
+ LexOrdering, OrderingRequirements, PhysicalSortExpr,
+};
use futures::stream::{StreamExt, TryStreamExt};
@@ -682,6 +685,29 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
) -> Option<Arc<dyn ExecutionPlan>> {
None
}
+
+ /// Try to push down sort ordering requirements to this node.
+ ///
+ /// This method is called during sort pushdown optimization to determine
if this
+ /// node can optimize for a requested sort ordering. Implementations
should:
+ ///
+ /// - Return [`SortOrderPushdownResult::Exact`] if the node can guarantee
the exact
+ /// ordering (allowing the Sort operator to be removed)
+ /// - Return [`SortOrderPushdownResult::Inexact`] if the node can optimize
for the
+ /// ordering but cannot guarantee perfect sorting (Sort operator is kept)
+ /// - Return [`SortOrderPushdownResult::Unsupported`] if the node cannot
optimize
+ /// for the ordering
+ ///
+ /// For transparent nodes (that preserve ordering), implement this to
delegate to
+ /// children and wrap the result with a new instance of this node.
+ ///
+ /// Default implementation returns `Unsupported`.
+ fn try_pushdown_sort(
+ &self,
+ _order: &[PhysicalSortExpr],
+ ) -> Result<SortOrderPushdownResult<Arc<dyn ExecutionPlan>>> {
+ Ok(SortOrderPushdownResult::Unsupported)
+ }
}
/// [`ExecutionPlan`] Invariant Level
diff --git a/datafusion/physical-plan/src/lib.rs
b/datafusion/physical-plan/src/lib.rs
index 849b34e703..ec8e154cae 100644
--- a/datafusion/physical-plan/src/lib.rs
+++ b/datafusion/physical-plan/src/lib.rs
@@ -50,6 +50,7 @@ pub use crate::execution_plan::{
};
pub use crate::metrics::Metric;
pub use crate::ordering::InputOrderMode;
+pub use crate::sort_pushdown::SortOrderPushdownResult;
pub use crate::stream::EmptyRecordBatchStream;
pub use crate::topk::TopK;
pub use crate::visitor::{ExecutionPlanVisitor, accept, visit_execution_plan};
@@ -83,6 +84,7 @@ pub mod placeholder_row;
pub mod projection;
pub mod recursive_query;
pub mod repartition;
+pub mod sort_pushdown;
pub mod sorts;
pub mod spill;
pub mod stream;
diff --git a/datafusion/physical-plan/src/repartition/mod.rs
b/datafusion/physical-plan/src/repartition/mod.rs
index 9d437dbcf6..5c9472182b 100644
--- a/datafusion/physical-plan/src/repartition/mod.rs
+++ b/datafusion/physical-plan/src/repartition/mod.rs
@@ -61,6 +61,8 @@ use crate::filter_pushdown::{
ChildPushdownResult, FilterDescription, FilterPushdownPhase,
FilterPushdownPropagation,
};
+use crate::sort_pushdown::SortOrderPushdownResult;
+use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays;
use futures::stream::Stream;
use futures::{FutureExt, StreamExt, TryStreamExt, ready};
@@ -1094,6 +1096,27 @@ impl ExecutionPlan for RepartitionExec {
Ok(FilterPushdownPropagation::if_all(child_pushdown_result))
}
+ fn try_pushdown_sort(
+ &self,
+ order: &[PhysicalSortExpr],
+ ) -> Result<SortOrderPushdownResult<Arc<dyn ExecutionPlan>>> {
+ // RepartitionExec only maintains input order if preserve_order is set
+ // or if there's only one partition
+ if !self.maintains_input_order()[0] {
+ return Ok(SortOrderPushdownResult::Unsupported);
+ }
+
+ // Delegate to the child and wrap with a new RepartitionExec
+ self.input.try_pushdown_sort(order)?.try_map(|new_input| {
+ let mut new_repartition =
+ RepartitionExec::try_new(new_input,
self.partitioning().clone())?;
+ if self.preserve_order {
+ new_repartition = new_repartition.with_preserve_order();
+ }
+ Ok(Arc::new(new_repartition) as Arc<dyn ExecutionPlan>)
+ })
+ }
+
fn repartitioned(
&self,
target_partitions: usize,
diff --git a/datafusion/physical-plan/src/sort_pushdown.rs
b/datafusion/physical-plan/src/sort_pushdown.rs
new file mode 100644
index 0000000000..8432fd5dab
--- /dev/null
+++ b/datafusion/physical-plan/src/sort_pushdown.rs
@@ -0,0 +1,120 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Sort pushdown types for physical execution plans.
+//!
+//! This module provides types used for pushing sort ordering requirements
+//! down through the execution plan tree to data sources.
+
+/// Result of attempting to push down sort ordering to a node.
+///
+/// Used by [`ExecutionPlan::try_pushdown_sort`] to communicate
+/// whether and how sort ordering was successfully pushed down.
+///
+/// [`ExecutionPlan::try_pushdown_sort`]:
crate::ExecutionPlan::try_pushdown_sort
+#[derive(Debug, Clone)]
+pub enum SortOrderPushdownResult<T> {
+ /// The source can guarantee exact ordering (data is perfectly sorted).
+ ///
+ /// When this is returned, the optimizer can safely remove the Sort
operator
+ /// entirely since the data source guarantees the requested ordering.
+ Exact {
+ /// The optimized node that provides exact ordering
+ inner: T,
+ },
+ /// The source has optimized for the ordering but cannot guarantee perfect
sorting.
+ ///
+ /// This indicates the data source has been optimized (e.g., reordered
files/row groups
+ /// based on statistics, enabled reverse scanning) but the data may not be
perfectly
+ /// sorted. The optimizer should keep the Sort operator but benefits from
the
+ /// optimization (e.g., faster TopK queries due to early termination).
+ Inexact {
+ /// The optimized node that provides approximate ordering
+ inner: T,
+ },
+ /// The source cannot optimize for this ordering.
+ ///
+ /// The data source does not support the requested sort ordering and no
+ /// optimization was applied.
+ Unsupported,
+}
+
+impl<T> SortOrderPushdownResult<T> {
+ /// Extract the inner value if present
+ pub fn into_inner(self) -> Option<T> {
+ match self {
+ Self::Exact { inner } | Self::Inexact { inner } => Some(inner),
+ Self::Unsupported => None,
+ }
+ }
+
+ /// Map the inner value to a different type while preserving the variant.
+ pub fn map<U, F: FnOnce(T) -> U>(self, f: F) -> SortOrderPushdownResult<U>
{
+ match self {
+ Self::Exact { inner } => SortOrderPushdownResult::Exact { inner:
f(inner) },
+ Self::Inexact { inner } => {
+ SortOrderPushdownResult::Inexact { inner: f(inner) }
+ }
+ Self::Unsupported => SortOrderPushdownResult::Unsupported,
+ }
+ }
+
+ /// Try to map the inner value, returning an error if the function fails.
+ pub fn try_map<U, E, F: FnOnce(T) -> Result<U, E>>(
+ self,
+ f: F,
+ ) -> Result<SortOrderPushdownResult<U>, E> {
+ match self {
+ Self::Exact { inner } => {
+ Ok(SortOrderPushdownResult::Exact { inner: f(inner)? })
+ }
+ Self::Inexact { inner } => {
+ Ok(SortOrderPushdownResult::Inexact { inner: f(inner)? })
+ }
+ Self::Unsupported => Ok(SortOrderPushdownResult::Unsupported),
+ }
+ }
+
+ /// Convert this result to `Inexact`, downgrading `Exact` if present.
+ ///
+ /// This is useful when an operation (like merging multiple partitions)
+ /// cannot guarantee exact ordering even if the input provides it.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use datafusion_physical_plan::SortOrderPushdownResult;
+ /// let exact = SortOrderPushdownResult::Exact { inner: 42 };
+ /// let inexact = exact.into_inexact();
+ /// assert!(matches!(inexact, SortOrderPushdownResult::Inexact { inner: 42
}));
+ ///
+ /// let already_inexact = SortOrderPushdownResult::Inexact { inner: 42 };
+ /// let still_inexact = already_inexact.into_inexact();
+ /// assert!(matches!(still_inexact, SortOrderPushdownResult::Inexact {
inner: 42 }));
+ ///
+ /// let unsupported = SortOrderPushdownResult::<i32>::Unsupported;
+ /// let still_unsupported = unsupported.into_inexact();
+ /// assert!(matches!(still_unsupported,
SortOrderPushdownResult::Unsupported));
+ /// ```
+ pub fn into_inexact(self) -> Self {
+ match self {
+ Self::Exact { inner } => Self::Inexact { inner },
+ Self::Inexact { inner } => Self::Inexact { inner },
+ Self::Unsupported => Self::Unsupported,
+ }
+ }
+}
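[Editorial sketch, not part of the patch: how a transparent operator threads a
child result through `try_map`, mirroring the CoalesceBatchesExec
implementation above, with a plain `String` standing in for
`Arc<dyn ExecutionPlan>`.

use datafusion_common::Result;
use datafusion_physical_plan::SortOrderPushdownResult;

/// Re-wrap an optimized child in this node's own constructor; an
/// `Unsupported` child passes through untouched and an error aborts
fn rewrap(child: SortOrderPushdownResult<String>) -> Result<SortOrderPushdownResult<String>> {
    child.try_map(|inner| Ok(format!("Coalesce({inner})")))
}

fn main() -> Result<()> {
    let exact = SortOrderPushdownResult::Exact { inner: "scan".to_string() };
    assert!(matches!(rewrap(exact)?, SortOrderPushdownResult::Exact { .. }));

    let none = SortOrderPushdownResult::<String>::Unsupported;
    assert!(matches!(rewrap(none)?, SortOrderPushdownResult::Unsupported));
    Ok(())
}]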
diff --git a/datafusion/proto-common/src/generated/pbjson.rs
b/datafusion/proto-common/src/generated/pbjson.rs
index 7c08aaad98..0bf87203ac 100644
--- a/datafusion/proto-common/src/generated/pbjson.rs
+++ b/datafusion/proto-common/src/generated/pbjson.rs
@@ -1111,7 +1111,7 @@ impl serde::Serialize for ColumnStats {
struct_ser.serialize_field("distinctCount", v)?;
}
if let Some(v) = self.byte_size.as_ref() {
- struct_ser.serialize_field("ByteSize", v)?;
+ struct_ser.serialize_field("byteSize", v)?;
}
struct_ser.end()
}
@@ -1134,7 +1134,7 @@ impl<'de> serde::Deserialize<'de> for ColumnStats {
"distinct_count",
"distinctCount",
"byte_size",
- "ByteSize",
+ "byteSize",
];
#[allow(clippy::enum_variant_names)]
@@ -1144,7 +1144,6 @@ impl<'de> serde::Deserialize<'de> for ColumnStats {
SumValue,
NullCount,
DistinctCount,
-
ByteSize,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
@@ -1172,7 +1171,7 @@ impl<'de> serde::Deserialize<'de> for ColumnStats {
"sumValue" | "sum_value" =>
Ok(GeneratedField::SumValue),
"nullCount" | "null_count" =>
Ok(GeneratedField::NullCount),
"distinctCount" | "distinct_count" =>
Ok(GeneratedField::DistinctCount),
- "ByteSize" | "byte_size" =>
Ok(GeneratedField::ByteSize),
+ "byteSize" | "byte_size" =>
Ok(GeneratedField::ByteSize),
_ => Err(serde::de::Error::unknown_field(value,
FIELDS)),
}
}
@@ -1232,7 +1231,7 @@ impl<'de> serde::Deserialize<'de> for ColumnStats {
}
GeneratedField::ByteSize => {
if byte_size__.is_some() {
- return
Err(serde::de::Error::duplicate_field("ByteSize"));
+ return
Err(serde::de::Error::duplicate_field("byteSize"));
}
byte_size__ = map_.next_value()?;
}
diff --git a/datafusion/sqllogictest/test_files/create_external_table.slt
b/datafusion/sqllogictest/test_files/create_external_table.slt
index 1e6183f48b..0b15a7f8ec 100644
--- a/datafusion/sqllogictest/test_files/create_external_table.slt
+++ b/datafusion/sqllogictest/test_files/create_external_table.slt
@@ -264,7 +264,7 @@ logical_plan
02)--TableScan: t projection=[id]
physical_plan
01)SortExec: expr=[id@0 DESC], preserve_partitioning=[false]
-02)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]},
projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet
+02)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]},
projection=[id], file_type=parquet, reverse_row_groups=true
statement ok
DROP TABLE t;
diff --git
a/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt
b/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt
index 3e403171e0..3d08cdf751 100644
--- a/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt
+++ b/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt
@@ -426,3 +426,329 @@ SET
datafusion.optimizer.enable_aggregate_dynamic_filter_pushdown = true;
statement ok
SET datafusion.optimizer.enable_dynamic_filter_pushdown = true;
+
+# Test 6: Sort Pushdown for ordered Parquet files
+
+# Create a sorted dataset
+statement ok
+CREATE TABLE sorted_data(id INT, value INT, name VARCHAR) AS VALUES
+(1, 100, 'a'),
+(2, 200, 'b'),
+(3, 300, 'c'),
+(4, 400, 'd'),
+(5, 500, 'e'),
+(6, 600, 'f'),
+(7, 700, 'g'),
+(8, 800, 'h'),
+(9, 900, 'i'),
+(10, 1000, 'j');
+
+# Copy to parquet with sorting
+query I
+COPY (SELECT * FROM sorted_data ORDER BY id ASC)
+TO 'test_files/scratch/dynamic_filter_pushdown_config/sorted_data.parquet';
+----
+10
+
+statement ok
+CREATE EXTERNAL TABLE sorted_parquet(id INT, value INT, name VARCHAR)
+STORED AS PARQUET
+LOCATION
'test_files/scratch/dynamic_filter_pushdown_config/sorted_data.parquet'
+WITH ORDER (id ASC);
+
+# Test 6.1: Sort pushdown with DESC (opposite of ASC)
+# Should show reverse_row_groups=true
+query TT
+EXPLAIN SELECT * FROM sorted_parquet ORDER BY id DESC LIMIT 3;
+----
+logical_plan
+01)Sort: sorted_parquet.id DESC NULLS FIRST, fetch=3
+02)--TableScan: sorted_parquet projection=[id, value, name]
+physical_plan
+01)SortExec: TopK(fetch=3), expr=[id@0 DESC], preserve_partitioning=[false]
+02)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/sorted_data.parquet]]},
projection=[id, value, name], file_type=parquet, predicate=DynamicFilter [
empty ], reverse_row_groups=true
+
+# Test 6.2: Verify results are correct
+query IIT
+SELECT * FROM sorted_parquet ORDER BY id DESC LIMIT 3;
+----
+10 1000 j
+9 900 i
+8 800 h
+
+# Test 6.3: Should NOT apply for ASC (same direction)
+query TT
+EXPLAIN SELECT * FROM sorted_parquet ORDER BY id ASC LIMIT 3;
+----
+logical_plan
+01)Sort: sorted_parquet.id ASC NULLS LAST, fetch=3
+02)--TableScan: sorted_parquet projection=[id, value, name]
+physical_plan DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/sorted_data.parquet]]},
projection=[id, value, name], limit=3, output_ordering=[id@0 ASC NULLS LAST],
file_type=parquet
+
+# Test 6.4: Disable sort pushdown
+statement ok
+SET datafusion.optimizer.enable_sort_pushdown = false;
+
+query TT
+EXPLAIN SELECT * FROM sorted_parquet ORDER BY id DESC LIMIT 3;
+----
+logical_plan
+01)Sort: sorted_parquet.id DESC NULLS FIRST, fetch=3
+02)--TableScan: sorted_parquet projection=[id, value, name]
+physical_plan
+01)SortExec: TopK(fetch=3), expr=[id@0 DESC], preserve_partitioning=[false]
+02)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/sorted_data.parquet]]},
projection=[id, value, name], output_ordering=[id@0 ASC NULLS LAST],
file_type=parquet, predicate=DynamicFilter [ empty ]
+
+# Re-enable
+statement ok
+SET datafusion.optimizer.enable_sort_pushdown = true;
+
+# Test 6.5: With OFFSET
+query TT
+EXPLAIN SELECT * FROM sorted_parquet ORDER BY id DESC LIMIT 3 OFFSET 2;
+----
+logical_plan
+01)Limit: skip=2, fetch=3
+02)--Sort: sorted_parquet.id DESC NULLS FIRST, fetch=5
+03)----TableScan: sorted_parquet projection=[id, value, name]
+physical_plan
+01)GlobalLimitExec: skip=2, fetch=3
+02)--SortExec: TopK(fetch=5), expr=[id@0 DESC], preserve_partitioning=[false]
+03)----DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/sorted_data.parquet]]},
projection=[id, value, name], file_type=parquet, predicate=DynamicFilter [
empty ], reverse_row_groups=true
+
+query IIT
+SELECT * FROM sorted_parquet ORDER BY id DESC LIMIT 3 OFFSET 2;
+----
+8 800 h
+7 700 g
+6 600 f
+
+# Test 6.6: Reverse scan with row selection (page index pruning)
+# This tests that when reverse_row_groups=true, the RowSelection is also
properly reversed
+
+# Create a dataset with multiple row groups and enable page index
+statement ok
+CREATE TABLE multi_rg_data(id INT, category VARCHAR, value INT) AS VALUES
+(1, 'alpha', 10),
+(2, 'alpha', 20),
+(3, 'beta', 30),
+(4, 'beta', 40),
+(5, 'gamma', 50),
+(6, 'gamma', 60),
+(7, 'delta', 70),
+(8, 'delta', 80);
+
+# Write with small row groups (2 rows each = 4 row groups)
+statement ok
+SET datafusion.execution.parquet.max_row_group_size = 2;
+
+query I
+COPY (SELECT * FROM multi_rg_data ORDER BY id ASC)
+TO 'test_files/scratch/dynamic_filter_pushdown_config/multi_rg_sorted.parquet';
+----
+8
+
+# Reset row group size
+statement ok
+SET datafusion.execution.parquet.max_row_group_size = 1048576;
+
+statement ok
+CREATE EXTERNAL TABLE multi_rg_sorted(id INT, category VARCHAR, value INT)
+STORED AS PARQUET
+LOCATION
'test_files/scratch/dynamic_filter_pushdown_config/multi_rg_sorted.parquet'
+WITH ORDER (id ASC);
+
+# Enable page index for better pruning
+statement ok
+SET datafusion.execution.parquet.enable_page_index = true;
+
+statement ok
+SET datafusion.execution.parquet.pushdown_filters = true;
+
+# Test with reverse scan and filter that prunes some row groups
+# This will create a RowSelection with partial row group scans
+query TT
+EXPLAIN SELECT * FROM multi_rg_sorted
+WHERE category IN ('alpha', 'gamma')
+ORDER BY id DESC LIMIT 5;
+----
+logical_plan
+01)Sort: multi_rg_sorted.id DESC NULLS FIRST, fetch=5
+02)--Filter: multi_rg_sorted.category = Utf8View("alpha") OR
multi_rg_sorted.category = Utf8View("gamma")
+03)----TableScan: multi_rg_sorted projection=[id, category, value],
partial_filters=[multi_rg_sorted.category = Utf8View("alpha") OR
multi_rg_sorted.category = Utf8View("gamma")]
+physical_plan
+01)SortExec: TopK(fetch=5), expr=[id@0 DESC], preserve_partitioning=[false]
+02)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/multi_rg_sorted.parquet]]},
projection=[id, category, value], file_type=parquet, predicate=(category@1 =
alpha OR category@1 = gamma) AND DynamicFilter [ empty ],
reverse_row_groups=true, pruning_predicate=category_null_count@2 != row_count@3
AND category_min@0 <= alpha AND alpha <= category_max@1 OR
category_null_count@2 != row_count@3 AND category_min@0 [...]
+
+# Verify the results are correct despite reverse scanning with row selection
+# Expected: gamma values (6, 5) then alpha values (2, 1), in DESC order by id
+query ITI
+SELECT * FROM multi_rg_sorted
+WHERE category IN ('alpha', 'gamma')
+ORDER BY id DESC LIMIT 5;
+----
+6 gamma 60
+5 gamma 50
+2 alpha 20
+1 alpha 10
+
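[Editor's note on why these four rows, in this order: with max_row_group_size = 2
the file has four row groups (ids 1-2 'alpha', 3-4 'beta', 5-6 'gamma', 7-8
'delta'); the predicate keeps only the alpha and gamma groups, and the reverse
scan then reads the survivors back-to-front. A small sketch of that selection
arithmetic; the layout is assumed from the COPY above, not read from the file:

    fn main() {
        // (row_group_index, category, ids) per the assumed 2-rows-per-group layout
        let groups = [
            (0, "alpha", [1, 2]),
            (1, "beta", [3, 4]),
            (2, "gamma", [5, 6]),
            (3, "delta", [7, 8]),
        ];

        // Pruning keeps only matching groups; the reverse scan flips their order.
        let read_order: Vec<_> = groups
            .iter()
            .filter(|(_, cat, _)| *cat == "alpha" || *cat == "gamma")
            .rev()
            .collect();

        // gamma (ids 5, 6) is read before alpha (ids 1, 2); the TopK sort then
        // emits ids descending within what it sees: 6, 5, 2, 1.
        assert_eq!(read_order[0].1, "gamma");
        assert_eq!(read_order[1].1, "alpha");
    }
]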
+# Test with more complex selection pattern
+query ITI
+SELECT * FROM multi_rg_sorted
+WHERE category IN ('beta', 'delta')
+ORDER BY id DESC;
+----
+8 delta 80
+7 delta 70
+4 beta 40
+3 beta 30
+
+# Test forward scan for comparison (should return the same matching rows, in ASC order)
+query ITI
+SELECT * FROM multi_rg_sorted
+WHERE category IN ('alpha', 'gamma')
+ORDER BY id ASC;
+----
+1 alpha 10
+2 alpha 20
+5 gamma 50
+6 gamma 60
+
+# Disable sort pushdown and verify results are still correct without the reverse scan
+statement ok
+SET datafusion.optimizer.enable_sort_pushdown = false;
+
+query ITI
+SELECT * FROM multi_rg_sorted
+WHERE category IN ('alpha', 'gamma')
+ORDER BY id DESC LIMIT 5;
+----
+6 gamma 60
+5 gamma 50
+2 alpha 20
+1 alpha 10
+
+# Re-enable
+statement ok
+SET datafusion.optimizer.enable_sort_pushdown = true;
+
+# Test 6.7: Sort pushdown with more than one partition
+# Create multiple parquet files to produce multiple input partitions (see the sketch below)
+
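[Editor's note on what pushdown means for a multi-file group: reversing the
required ordering means visiting files in reverse order within each group, and
row groups in reverse order within each file. A minimal sketch with simplified
stand-in types, not the actual FileGroup/PartitionedFile structs:

    /// Simplified stand-in for a per-file scan plan (illustration only).
    #[derive(Debug, PartialEq)]
    struct FileScan {
        path: &'static str,
        row_groups: Vec<usize>,
    }

    /// Reverse a file group: last file first, and within each file the last
    /// row group first. No data is buffered; only the read order changes.
    fn reverse_group(mut files: Vec<FileScan>) -> Vec<FileScan> {
        files.reverse();
        for f in &mut files {
            f.row_groups.reverse();
        }
        files
    }

    fn main() {
        let group = vec![
            FileScan { path: "part1.parquet", row_groups: vec![0, 1] },
            FileScan { path: "part2.parquet", row_groups: vec![0] },
        ];
        let reversed = reverse_group(group);
        assert_eq!(reversed[0].path, "part2.parquet");
        assert_eq!(reversed[1].row_groups, vec![1, 0]);
    }
]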
+# Split data into multiple files
+statement ok
+CREATE TABLE sorted_data_part1(id INT, value INT, name VARCHAR) AS VALUES
+(1, 100, 'a'),
+(2, 200, 'b'),
+(3, 300, 'c');
+
+statement ok
+CREATE TABLE sorted_data_part2(id INT, value INT, name VARCHAR) AS VALUES
+(4, 400, 'd'),
+(5, 500, 'e'),
+(6, 600, 'f');
+
+statement ok
+CREATE TABLE sorted_data_part3(id INT, value INT, name VARCHAR) AS VALUES
+(7, 700, 'g'),
+(8, 800, 'h'),
+(9, 900, 'i'),
+(10, 1000, 'j');
+
+# Create directory for multi-file parquet
+query I
+COPY (SELECT * FROM sorted_data_part1 ORDER BY id ASC)
+TO
'test_files/scratch/dynamic_filter_pushdown_config/sorted_multi/part1.parquet';
+----
+3
+
+query I
+COPY (SELECT * FROM sorted_data_part2 ORDER BY id ASC)
+TO
'test_files/scratch/dynamic_filter_pushdown_config/sorted_multi/part2.parquet';
+----
+3
+
+query I
+COPY (SELECT * FROM sorted_data_part3 ORDER BY id ASC)
+TO
'test_files/scratch/dynamic_filter_pushdown_config/sorted_multi/part3.parquet';
+----
+4
+
+# Create external table pointing to directory with multiple files
+statement ok
+CREATE EXTERNAL TABLE sorted_parquet_multi(id INT, value INT, name VARCHAR)
+STORED AS PARQUET
+LOCATION 'test_files/scratch/dynamic_filter_pushdown_config/sorted_multi/'
+WITH ORDER (id ASC);
+
+# Enable multiple partitions
+statement ok
+SET datafusion.execution.target_partitions = 4;
+
+# Now we should see SortPreservingMergeExec because we have 3 input partitions (3 files)
+query TT
+EXPLAIN SELECT * FROM sorted_parquet_multi ORDER BY id DESC LIMIT 3;
+----
+logical_plan
+01)Sort: sorted_parquet_multi.id DESC NULLS FIRST, fetch=3
+02)--TableScan: sorted_parquet_multi projection=[id, value, name]
+physical_plan
+01)SortPreservingMergeExec: [id@0 DESC], fetch=3
+02)--SortExec: TopK(fetch=3), expr=[id@0 DESC], preserve_partitioning=[true]
+03)----DataSourceExec: file_groups={3 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/sorted_multi/part1.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/sorted_multi/part2.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/sorted_multi/part3.parquet]]},
projection=[id, value, name], file_type=parquet, predicate=DynamicFilter [
empty ], re [...]
+
+# Verify correctness with repartitioning and multiple files
+query IIT
+SELECT * FROM sorted_parquet_multi ORDER BY id DESC LIMIT 3;
+----
+10 1000 j
+9 900 i
+8 800 h
+
+# Test ASC order (should not trigger reverse scan)
+query IIT
+SELECT * FROM sorted_parquet_multi ORDER BY id ASC LIMIT 3;
+----
+1 100 a
+2 200 b
+3 300 c
+
+# Cleanup
+statement ok
+DROP TABLE sorted_data_part1;
+
+statement ok
+DROP TABLE sorted_data_part2;
+
+statement ok
+DROP TABLE sorted_data_part3;
+
+statement ok
+DROP TABLE sorted_parquet_multi;
+
+# Reset to default
+statement ok
+SET datafusion.execution.target_partitions = 4;
+
+# Cleanup
+statement ok
+DROP TABLE multi_rg_data;
+
+statement ok
+DROP TABLE multi_rg_sorted;
+
+statement ok
+SET datafusion.execution.parquet.enable_page_index = false;
+
+statement ok
+SET datafusion.execution.parquet.pushdown_filters = false;
+
+# Cleanup
+statement ok
+DROP TABLE sorted_data;
+
+statement ok
+DROP TABLE sorted_parquet;
+
+statement ok
+SET datafusion.optimizer.enable_sort_pushdown = true;
diff --git a/datafusion/sqllogictest/test_files/explain.slt
b/datafusion/sqllogictest/test_files/explain.slt
index 9f8e264ee8..9087aee56d 100644
--- a/datafusion/sqllogictest/test_files/explain.slt
+++ b/datafusion/sqllogictest/test_files/explain.slt
@@ -240,6 +240,7 @@ physical_plan after LimitAggregation SAME TEXT AS ABOVE
physical_plan after LimitPushPastWindows SAME TEXT AS ABOVE
physical_plan after LimitPushdown SAME TEXT AS ABOVE
physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
+physical_plan after PushdownSort SAME TEXT AS ABOVE
physical_plan after EnsureCooperative SAME TEXT AS ABOVE
physical_plan after FilterPushdown(Post) SAME TEXT AS ABOVE
physical_plan after SanityCheckPlan SAME TEXT AS ABOVE
@@ -320,6 +321,7 @@ physical_plan after LimitAggregation SAME TEXT AS ABOVE
physical_plan after LimitPushPastWindows SAME TEXT AS ABOVE
physical_plan after LimitPushdown DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]},
projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col,
float_col, double_col, date_string_col, string_col, timestamp_col], limit=10,
file_type=parquet, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:
ScanBytes=Exact(32)),(Col[1]: ScanBytes=Inexact(24)),(Col[2]:
ScanBytes=Exact(32)),(Col[3]: ScanBytes=Exact(32)),(Col[4]: ScanBytes=E [...]
physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
+physical_plan after PushdownSort SAME TEXT AS ABOVE
physical_plan after EnsureCooperative SAME TEXT AS ABOVE
physical_plan after FilterPushdown(Post) SAME TEXT AS ABOVE
physical_plan after SanityCheckPlan SAME TEXT AS ABOVE
@@ -364,6 +366,7 @@ physical_plan after LimitAggregation SAME TEXT AS ABOVE
physical_plan after LimitPushPastWindows SAME TEXT AS ABOVE
physical_plan after LimitPushdown DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]},
projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col,
float_col, double_col, date_string_col, string_col, timestamp_col], limit=10,
file_type=parquet
physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
+physical_plan after PushdownSort SAME TEXT AS ABOVE
physical_plan after EnsureCooperative SAME TEXT AS ABOVE
physical_plan after FilterPushdown(Post) SAME TEXT AS ABOVE
physical_plan after SanityCheckPlan SAME TEXT AS ABOVE
@@ -599,6 +602,7 @@ physical_plan after LimitAggregation SAME TEXT AS ABOVE
physical_plan after LimitPushPastWindows SAME TEXT AS ABOVE
physical_plan after LimitPushdown SAME TEXT AS ABOVE
physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
+physical_plan after PushdownSort SAME TEXT AS ABOVE
physical_plan after EnsureCooperative SAME TEXT AS ABOVE
physical_plan after FilterPushdown(Post) SAME TEXT AS ABOVE
physical_plan after SanityCheckPlan SAME TEXT AS ABOVE
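[Editor's note: the new PushdownSort entry above is a standard physical optimizer
pass. A skeleton of how such a rule plugs into the pipeline, sketched against the
PhysicalOptimizerRule trait; the body is a placeholder, not the actual rule logic
from this commit, and the enable_sort_pushdown field is the flag this commit adds:

    use std::sync::Arc;

    use datafusion_common::config::ConfigOptions;
    use datafusion_common::Result;
    use datafusion_physical_optimizer::PhysicalOptimizerRule;
    use datafusion_physical_plan::ExecutionPlan;

    #[derive(Debug, Default)]
    struct PushdownSortSketch;

    impl PhysicalOptimizerRule for PushdownSortSketch {
        fn optimize(
            &self,
            plan: Arc<dyn ExecutionPlan>,
            config: &ConfigOptions,
        ) -> Result<Arc<dyn ExecutionPlan>> {
            // Honor the flag; when disabled the plan passes through untouched.
            if !config.optimizer.enable_sort_pushdown {
                return Ok(plan);
            }
            // Real rule: walk the tree, find a SortExec over a DataSourceExec
            // that can satisfy the (possibly reversed) ordering, and rewrite
            // the scan. Placeholder: return the plan unchanged.
            Ok(plan)
        }

        fn name(&self) -> &str {
            "PushdownSort"
        }

        fn schema_check(&self) -> bool {
            // Only the read order changes, so the schema is preserved.
            true
        }
    }
]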
diff --git a/datafusion/sqllogictest/test_files/information_schema.slt
b/datafusion/sqllogictest/test_files/information_schema.slt
index b5429d68c9..8c11667d97 100644
--- a/datafusion/sqllogictest/test_files/information_schema.slt
+++ b/datafusion/sqllogictest/test_files/information_schema.slt
@@ -297,6 +297,7 @@ datafusion.optimizer.enable_dynamic_filter_pushdown true
datafusion.optimizer.enable_join_dynamic_filter_pushdown true
datafusion.optimizer.enable_piecewise_merge_join false
datafusion.optimizer.enable_round_robin_repartition true
+datafusion.optimizer.enable_sort_pushdown true
datafusion.optimizer.enable_topk_aggregation true
datafusion.optimizer.enable_topk_dynamic_filter_pushdown true
datafusion.optimizer.enable_window_limits true
@@ -430,6 +431,7 @@ datafusion.optimizer.enable_dynamic_filter_pushdown true
When set to true attemp
datafusion.optimizer.enable_join_dynamic_filter_pushdown true When set to
true, the optimizer will attempt to push down Join dynamic filters into the
file scan phase.
datafusion.optimizer.enable_piecewise_merge_join false When set to true,
piecewise merge join is enabled. PiecewiseMergeJoin is currently experimental.
Physical planner will opt for PiecewiseMergeJoin when there is only one range
filter.
datafusion.optimizer.enable_round_robin_repartition true When set to true, the
physical plan optimizer will try to add round robin repartitioning to increase
parallelism to leverage more CPU cores
+datafusion.optimizer.enable_sort_pushdown true Enable sort pushdown
optimization. When enabled, attempts to push sort requirements down to data
sources that can natively handle them (e.g., by reversing file/row group read
order). Returns **inexact ordering**: Sort operator is kept for correctness,
but optimized input enables early termination for TopK queries (ORDER BY ...
LIMIT N), providing significant speedup. Memory: No additional overhead (only
changes read order). Future: Will add [...]
datafusion.optimizer.enable_topk_aggregation true When set to true, the
optimizer will attempt to perform limit operations during aggregations, if
possible
datafusion.optimizer.enable_topk_dynamic_filter_pushdown true When set to
true, the optimizer will attempt to push down TopK dynamic filters into the
file scan phase.
datafusion.optimizer.enable_window_limits true When set to true, the optimizer
will attempt to push limit operations past window functions, if possible
diff --git a/datafusion/sqllogictest/test_files/topk.slt
b/datafusion/sqllogictest/test_files/topk.slt
index 7364fccd8e..aba468d21f 100644
--- a/datafusion/sqllogictest/test_files/topk.slt
+++ b/datafusion/sqllogictest/test_files/topk.slt
@@ -340,7 +340,7 @@ explain select number, letter, age from partial_sorted
order by number asc limit
----
physical_plan
01)SortExec: TopK(fetch=3), expr=[number@0 ASC NULLS LAST],
preserve_partitioning=[false]
-02)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]},
projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC
NULLS LAST], file_type=parquet, predicate=DynamicFilter [ empty ]
+02)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]},
projection=[number, letter, age], file_type=parquet, predicate=DynamicFilter [
empty ], reverse_row_groups=true
query TT
explain select number, letter, age from partial_sorted order by letter asc,
number desc limit 3;
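[Editor's note: the change above is the interesting case: the file is sorted by
number DESC, the query asks for number ASC, and the scan flips to
reverse_row_groups=true instead of advertising the stored ordering. Detecting
that situation amounts to checking that the required ordering is the column-wise
inverse of a prefix of the stored one. A minimal sketch with assumed stand-in
types; DataFusion's real check works on PhysicalSortExpr:

    /// Assumed stand-in for a physical sort key (illustration only).
    struct SortKey {
        column: usize,
        descending: bool,
        nulls_first: bool,
    }

    /// True when `required` is a prefix-wise inverse of the stored ordering:
    /// same columns, with direction and null placement both flipped.
    fn is_reverse_of(required: &[SortKey], stored: &[SortKey]) -> bool {
        required.len() <= stored.len()
            && required.iter().zip(stored).all(|(r, s)| {
                r.column == s.column
                    && r.descending != s.descending
                    && r.nulls_first != s.nulls_first
            })
    }

    fn main() {
        // File stored as [number DESC NULLS FIRST]; query wants number ASC NULLS LAST.
        let stored = [SortKey { column: 0, descending: true, nulls_first: true }];
        let required = [SortKey { column: 0, descending: false, nulls_first: false }];
        assert!(is_reverse_of(&required, &stored));
    }
]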
diff --git a/docs/source/user-guide/configs.md
b/docs/source/user-guide/configs.md
index 156df1d9d7..750543d0b5 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -161,6 +161,7 @@ The following configuration settings are available:
| datafusion.optimizer.default_filter_selectivity | 20
| The default filter selectivity used by Filter
Statistics when an exact selectivity cannot be determined. Valid values are
between 0 (no selectivity) and 100 (all rows are selected).
[...]
| datafusion.optimizer.prefer_existing_union |
false | When set to true, the optimizer will not attempt to
convert Union to Interleave
[...]
| datafusion.optimizer.expand_views_at_output |
false | When set to true, if the returned type is a view
type then the output will be coerced to a non-view. Coerces `Utf8View` to
`LargeUtf8`, and `BinaryView` to `LargeBinary`.
[...]
+| datafusion.optimizer.enable_sort_pushdown |
true | Enable sort pushdown optimization. When enabled,
attempts to push sort requirements down to data sources that can natively
handle them (e.g., by reversing file/row group read order). Returns **inexact
ordering**: Sort operator is kept for correctness, but optimized input enables
early termination for TopK queries (ORDER BY ... LIMIT N), providing
significant speedup. Memory: No additio [...]
| datafusion.explain.logical_plan_only |
false | When set to true, the explain statement will only
print logical plans
[...]
| datafusion.explain.physical_plan_only |
false | When set to true, the explain statement will only
print physical plans
[...]
| datafusion.explain.show_statistics |
false | When set to true, the explain statement will print
operator statistics for physical plans
[...]
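[Editor's note: the flag documented above can also be toggled programmatically
when building a session. A minimal sketch using the string key, mirroring the SQL
SET statements in the tests:

    use datafusion::prelude::{SessionConfig, SessionContext};

    fn main() {
        // Disable the new rule for one session; the default is true.
        let config = SessionConfig::new()
            .set_bool("datafusion.optimizer.enable_sort_pushdown", false);
        let _ctx = SessionContext::new_with_config(config);
    }
]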
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]