This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch branch-52
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/branch-52 by this push:
     new 738812004a [branch-52] Fix incorrect `SortExec` removal before 
`AggregateExec` (#20247) (#20507)
738812004a is described below

commit 738812004aca72d38877303eb327943a156f0979
Author: Andrew Lamb <[email protected]>
AuthorDate: Wed Feb 25 17:30:35 2026 -0500

    [branch-52] Fix incorrect `SortExec` removal before `AggregateExec` 
(#20247) (#20507)
    
    ## Which issue does this PR close?
    - part of https://github.com/apache/datafusion/issues/20287
    - Fixes https://github.com/apache/datafusion/issues/20287 on branch-52
    ## Rationale for this change
    See issues
    
    ## What changes are included in this PR?
    
    - backports https://github.com/apache/datafusion/pull/20247
    
    ## Are these changes tested?
    
    By CI
    ## Are there any user-facing changes?
    
    <!--
    If there are user-facing changes then we may require documentation to be
    updated before approving the PR.
    -->
    
    <!--
    If there are any breaking changes to public APIs, please add the `api
    change` label.
    -->
    
    ---------
    
    Co-authored-by: Mustafa Akur <[email protected]>
---
 .../src/enforce_sorting/sort_pushdown.rs           |  74 ++++++++
 .../sqllogictest/test_files/sort_pushdown.slt      | 210 +++++++++++++++++++++
 2 files changed, 284 insertions(+)

diff --git a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs 
b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs
index 698fdea8e7..267faeda0c 100644
--- a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs
+++ b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs
@@ -35,6 +35,7 @@ use datafusion_physical_expr_common::sort_expr::{
     LexOrdering, LexRequirement, OrderingRequirements, PhysicalSortExpr,
     PhysicalSortRequirement,
 };
+use datafusion_physical_plan::aggregates::AggregateExec;
 use datafusion_physical_plan::execution_plan::CardinalityEffect;
 use datafusion_physical_plan::filter::FilterExec;
 use datafusion_physical_plan::joins::utils::{
@@ -353,6 +354,8 @@ fn pushdown_requirement_to_children(
                 Ok(None)
             }
         }
+    } else if let Some(aggregate_exec) = 
plan.as_any().downcast_ref::<AggregateExec>() {
+        handle_aggregate_pushdown(aggregate_exec, parent_required)
     } else if maintains_input_order.is_empty()
         || !maintains_input_order.iter().any(|o| *o)
         || plan.as_any().is::<RepartitionExec>()
@@ -388,6 +391,77 @@ fn pushdown_requirement_to_children(
     // TODO: Add support for Projection push down
 }
 
+/// Try to push sorting through [`AggregateExec`]
+///
+/// `AggregateExec` only preserves the input order of its group by columns
+/// (not aggregates in general, which are formed from arbitrary expressions 
over
+/// input)
+///
+/// This function rewrites the parent required ordering in terms of the
+/// aggregate input if possible. This rewritten requirement represents the
+/// ordering of the `AggregateExec`'s **input** that would also satisfy the
+/// **parent** ordering.
+///
+/// If no such mapping is possible (e.g. because the sort references aggregate
+/// columns), returns None.
+fn handle_aggregate_pushdown(
+    aggregate_exec: &AggregateExec,
+    parent_required: OrderingRequirements,
+) -> Result<Option<Vec<Option<OrderingRequirements>>>> {
+    if !aggregate_exec
+        .maintains_input_order()
+        .into_iter()
+        .any(|o| o)
+    {
+        return Ok(None);
+    }
+
+    let group_expr = aggregate_exec.group_expr();
+    // GROUPING SETS introduce additional output columns and NULL 
substitutions;
+    // skip pushdown until we can map those cases safely.
+    if group_expr.has_grouping_set() {
+        return Ok(None);
+    }
+
+    let group_input_exprs = group_expr.input_exprs();
+    let parent_requirement = parent_required.into_single();
+    let mut child_requirement = Vec::with_capacity(parent_requirement.len());
+
+    for req in parent_requirement {
+        // Sort above AggregateExec should reference its output columns. Map 
each
+        // output group-by column to its original input expression.
+        let Some(column) = req.expr.as_any().downcast_ref::<Column>() else {
+            return Ok(None);
+        };
+        if column.index() >= group_input_exprs.len() {
+            // AggregateExec does not produce output that is sorted on 
aggregate
+            // columns, so those cannot be pushed through.
+            return Ok(None);
+        }
+        child_requirement.push(PhysicalSortRequirement::new(
+            Arc::clone(&group_input_exprs[column.index()]),
+            req.options,
+        ));
+    }
+
+    let Some(child_requirement) = LexRequirement::new(child_requirement) else {
+        return Ok(None);
+    };
+
+    // Keep sort above aggregate unless input ordering already satisfies the
+    // mapped requirement.
+    if aggregate_exec
+        .input()
+        .equivalence_properties()
+        .ordering_satisfy_requirement(child_requirement.iter().cloned())?
+    {
+        let child_requirements = OrderingRequirements::new(child_requirement);
+        Ok(Some(vec![Some(child_requirements)]))
+    } else {
+        Ok(None)
+    }
+}
+
 /// Return true if pushing the sort requirements through a node would violate
 /// the input sorting requirements for the plan
 fn pushdown_would_violate_requirements(
diff --git a/datafusion/sqllogictest/test_files/sort_pushdown.slt 
b/datafusion/sqllogictest/test_files/sort_pushdown.slt
index 7b741579cb..99f26b66d4 100644
--- a/datafusion/sqllogictest/test_files/sort_pushdown.slt
+++ b/datafusion/sqllogictest/test_files/sort_pushdown.slt
@@ -851,6 +851,210 @@ LIMIT 3;
 5 4
 2 -3
 
+# Test 3.7: Aggregate ORDER BY expression should keep SortExec
+# Source pattern declared on parquet scan: [x ASC, y ASC].
+# Requested pattern in ORDER BY: [x ASC, CAST(y AS BIGINT) % 2 ASC].
+# Example for x=1 input y order 1,2,3 gives bucket order 1,0,1, which does not
+# match requested bucket ASC order. SortExec is required above AggregateExec.
+statement ok
+SET datafusion.execution.target_partitions = 1;
+
+statement ok
+CREATE TABLE agg_expr_data(x INT, y INT, v INT) AS VALUES
+(1, 1, 10),
+(1, 2, 20),
+(1, 3, 30),
+(2, 1, 40),
+(2, 2, 50),
+(2, 3, 60);
+
+query I
+COPY (SELECT * FROM agg_expr_data ORDER BY x, y)
+TO 'test_files/scratch/sort_pushdown/agg_expr_sorted.parquet';
+----
+6
+
+statement ok
+CREATE EXTERNAL TABLE agg_expr_parquet(x INT, y INT, v INT)
+STORED AS PARQUET
+LOCATION 'test_files/scratch/sort_pushdown/agg_expr_sorted.parquet'
+WITH ORDER (x ASC, y ASC);
+
+query TT
+EXPLAIN SELECT
+  x,
+  CAST(y AS BIGINT) % 2,
+  SUM(v)
+FROM agg_expr_parquet
+GROUP BY x, CAST(y AS BIGINT) % 2
+ORDER BY x, CAST(y AS BIGINT) % 2;
+----
+logical_plan
+01)Sort: agg_expr_parquet.x ASC NULLS LAST, agg_expr_parquet.y % Int64(2) ASC 
NULLS LAST
+02)--Aggregate: groupBy=[[agg_expr_parquet.x, CAST(agg_expr_parquet.y AS 
Int64) % Int64(2)]], aggr=[[sum(CAST(agg_expr_parquet.v AS Int64))]]
+03)----TableScan: agg_expr_parquet projection=[x, y, v]
+physical_plan
+01)SortExec: expr=[x@0 ASC NULLS LAST, agg_expr_parquet.y % Int64(2)@1 ASC 
NULLS LAST], preserve_partitioning=[false]
+02)--AggregateExec: mode=Single, gby=[x@0 as x, CAST(y@1 AS Int64) % 2 as 
agg_expr_parquet.y % Int64(2)], aggr=[sum(agg_expr_parquet.v)], 
ordering_mode=PartiallySorted([0])
+03)----DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/agg_expr_sorted.parquet]]},
 projection=[x, y, v], output_ordering=[x@0 ASC NULLS LAST, y@1 ASC NULLS 
LAST], file_type=parquet
+
+# Expected output pattern from ORDER BY [x, bucket]:
+# rows grouped by x, and within each x bucket appears as 0 then 1.
+query III
+SELECT
+  x,
+  CAST(y AS BIGINT) % 2,
+  SUM(v)
+FROM agg_expr_parquet
+GROUP BY x, CAST(y AS BIGINT) % 2
+ORDER BY x, CAST(y AS BIGINT) % 2;
+----
+1 0 20
+1 1 40
+2 0 50
+2 1 100
+
+# Test 3.8: Aggregate ORDER BY monotonic expression can push down (no SortExec)
+query TT
+EXPLAIN SELECT
+  x,
+  CAST(y AS BIGINT),
+  SUM(v)
+FROM agg_expr_parquet
+GROUP BY x, CAST(y AS BIGINT)
+ORDER BY x, CAST(y AS BIGINT);
+----
+logical_plan
+01)Sort: agg_expr_parquet.x ASC NULLS LAST, agg_expr_parquet.y ASC NULLS LAST
+02)--Aggregate: groupBy=[[agg_expr_parquet.x, CAST(agg_expr_parquet.y AS 
Int64)]], aggr=[[sum(CAST(agg_expr_parquet.v AS Int64))]]
+03)----TableScan: agg_expr_parquet projection=[x, y, v]
+physical_plan
+01)AggregateExec: mode=Single, gby=[x@0 as x, CAST(y@1 AS Int64) as 
agg_expr_parquet.y], aggr=[sum(agg_expr_parquet.v)], ordering_mode=Sorted
+02)--DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/agg_expr_sorted.parquet]]},
 projection=[x, y, v], output_ordering=[x@0 ASC NULLS LAST, y@1 ASC NULLS 
LAST], file_type=parquet
+
+query III
+SELECT
+  x,
+  CAST(y AS BIGINT),
+  SUM(v)
+FROM agg_expr_parquet
+GROUP BY x, CAST(y AS BIGINT)
+ORDER BY x, CAST(y AS BIGINT);
+----
+1 1 10
+1 2 20
+1 3 30
+2 1 40
+2 2 50
+2 3 60
+
+# Test 3.9: Aggregate ORDER BY aggregate output should keep SortExec
+query TT
+EXPLAIN SELECT x, SUM(v)
+FROM agg_expr_parquet
+GROUP BY x
+ORDER BY SUM(v);
+----
+logical_plan
+01)Sort: sum(agg_expr_parquet.v) ASC NULLS LAST
+02)--Aggregate: groupBy=[[agg_expr_parquet.x]], 
aggr=[[sum(CAST(agg_expr_parquet.v AS Int64))]]
+03)----TableScan: agg_expr_parquet projection=[x, v]
+physical_plan
+01)SortExec: expr=[sum(agg_expr_parquet.v)@1 ASC NULLS LAST], 
preserve_partitioning=[false]
+02)--AggregateExec: mode=Single, gby=[x@0 as x], 
aggr=[sum(agg_expr_parquet.v)], ordering_mode=Sorted
+03)----DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/agg_expr_sorted.parquet]]},
 projection=[x, v], output_ordering=[x@0 ASC NULLS LAST], file_type=parquet
+
+query II
+SELECT x, SUM(v)
+FROM agg_expr_parquet
+GROUP BY x
+ORDER BY SUM(v);
+----
+1 60
+2 150
+
+# Test 3.10: Aggregate with non-preserved input order should keep SortExec
+# v is not part of the order by
+query TT
+EXPLAIN SELECT v, SUM(y)
+FROM agg_expr_parquet
+GROUP BY v
+ORDER BY v;
+----
+logical_plan
+01)Sort: agg_expr_parquet.v ASC NULLS LAST
+02)--Aggregate: groupBy=[[agg_expr_parquet.v]], 
aggr=[[sum(CAST(agg_expr_parquet.y AS Int64))]]
+03)----TableScan: agg_expr_parquet projection=[y, v]
+physical_plan
+01)SortExec: expr=[v@0 ASC NULLS LAST], preserve_partitioning=[false]
+02)--AggregateExec: mode=Single, gby=[v@1 as v], aggr=[sum(agg_expr_parquet.y)]
+03)----DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/agg_expr_sorted.parquet]]},
 projection=[y, v], file_type=parquet
+
+query II
+SELECT v, SUM(y)
+FROM agg_expr_parquet
+GROUP BY v
+ORDER BY v;
+----
+10 1
+20 2
+30 3
+40 1
+50 2
+60 3
+
+# Test 3.11: Aggregate ORDER BY non-column expression (unsatisfied) keeps 
SortExec
+# (though note in theory DataFusion could figure out that data sorted by x 
will also be sorted by x+1)
+query TT
+EXPLAIN SELECT x, SUM(v)
+FROM agg_expr_parquet
+GROUP BY x
+ORDER BY x + 1 DESC;
+----
+logical_plan
+01)Sort: CAST(agg_expr_parquet.x AS Int64) + Int64(1) DESC NULLS FIRST
+02)--Aggregate: groupBy=[[agg_expr_parquet.x]], 
aggr=[[sum(CAST(agg_expr_parquet.v AS Int64))]]
+03)----TableScan: agg_expr_parquet projection=[x, v]
+physical_plan
+01)SortExec: expr=[CAST(x@0 AS Int64) + 1 DESC], preserve_partitioning=[false]
+02)--AggregateExec: mode=Single, gby=[x@0 as x], 
aggr=[sum(agg_expr_parquet.v)], ordering_mode=Sorted
+03)----DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/agg_expr_sorted.parquet]]},
 projection=[x, v], output_ordering=[x@0 ASC NULLS LAST], file_type=parquet
+
+query II
+SELECT x, SUM(v)
+FROM agg_expr_parquet
+GROUP BY x
+ORDER BY x + 1 DESC;
+----
+2 150
+1 60
+
+# Test 3.12: Aggregate ORDER BY non-column expression (unsatisfied) keeps 
SortExec
+# (though note in theory DataFusion could figure out that data sorted by x 
will also be sorted by x+1)
+query TT
+EXPLAIN SELECT x, SUM(v)
+FROM agg_expr_parquet
+GROUP BY x
+ORDER BY 2 * x ASC;
+----
+logical_plan
+01)Sort: Int64(2) * CAST(agg_expr_parquet.x AS Int64) ASC NULLS LAST
+02)--Aggregate: groupBy=[[agg_expr_parquet.x]], 
aggr=[[sum(CAST(agg_expr_parquet.v AS Int64))]]
+03)----TableScan: agg_expr_parquet projection=[x, v]
+physical_plan
+01)SortExec: expr=[2 * CAST(x@0 AS Int64) ASC NULLS LAST], 
preserve_partitioning=[false]
+02)--AggregateExec: mode=Single, gby=[x@0 as x], 
aggr=[sum(agg_expr_parquet.v)], ordering_mode=Sorted
+03)----DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/agg_expr_sorted.parquet]]},
 projection=[x, v], output_ordering=[x@0 ASC NULLS LAST], file_type=parquet
+
+query II
+SELECT x, SUM(v)
+FROM agg_expr_parquet
+GROUP BY x
+ORDER BY 2 * x ASC;
+----
+1 60
+2 150
+
 # Test 4: Reversed filesystem order with inferred ordering
 # Create 3 parquet files with non-overlapping id ranges, named so filesystem
 # order is OPPOSITE to data order. Each file is internally sorted by id ASC.
@@ -1420,5 +1624,11 @@ DROP TABLE signed_data;
 statement ok
 DROP TABLE signed_parquet;
 
+statement ok
+DROP TABLE agg_expr_data;
+
+statement ok
+DROP TABLE agg_expr_parquet;
+
 statement ok
 SET datafusion.optimizer.enable_sort_pushdown = true;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to