This is an automated email from the ASF dual-hosted git repository.

adriangb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 06e5bbe250 Fix TopK Sort incorrectly pushed down past operators that 
do not accept limit pushdown (#16641)
06e5bbe250 is described below

commit 06e5bbe25016856064946a13a2f557ece6eb0e5a
Author: Qi Zhu <[email protected]>
AuthorDate: Fri Jul 4 00:30:27 2025 +0800

    Fix TopK Sort incorrectly pushed down past operators that do not accept 
limit pushdown (#16641)
    
    Co-authored-by: Adrian Garcia Badaracco 
<[email protected]>
---
 .../src/enforce_sorting/sort_pushdown.rs           | 36 ++++++++++++
 datafusion/sqllogictest/test_files/joins.slt       | 68 ++++++++++++++++++++++
 datafusion/sqllogictest/test_files/limit.slt       | 11 ++--
 datafusion/sqllogictest/test_files/order.slt       |  9 ++-
 datafusion/sqllogictest/test_files/topk.slt        |  2 +-
 datafusion/sqllogictest/test_files/union.slt       | 11 ++--
 6 files changed, 119 insertions(+), 18 deletions(-)

diff --git a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs 
b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs
index a9c0e4cb28..6e4e784866 100644
--- a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs
+++ b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs
@@ -35,6 +35,7 @@ use datafusion_physical_expr_common::sort_expr::{
     LexOrdering, LexRequirement, OrderingRequirements, PhysicalSortExpr,
     PhysicalSortRequirement,
 };
+use datafusion_physical_plan::execution_plan::CardinalityEffect;
 use datafusion_physical_plan::filter::FilterExec;
 use datafusion_physical_plan::joins::utils::{
     calculate_join_output_ordering, ColumnIndex,
@@ -190,6 +191,7 @@ fn pushdown_sorts_helper(
     } else if let Some(adjusted) = pushdown_requirement_to_children(
         &sort_push_down.plan,
         parent_requirement.clone(),
+        parent_fetch,
     )? {
         // For operators that can take a sort pushdown, continue with updated
         // requirements:
@@ -216,7 +218,41 @@ fn pushdown_sorts_helper(
 fn pushdown_requirement_to_children(
     plan: &Arc<dyn ExecutionPlan>,
     parent_required: OrderingRequirements,
+    parent_fetch: Option<usize>,
 ) -> Result<Option<Vec<Option<OrderingRequirements>>>> {
+    // If there is a limit on the parent plan we cannot push it down through 
operators that change the cardinality.
+    // E.g. consider if LIMIT 2 is applied below a FilterExec that filters out 
1/2 of the rows we'll end up with 1 row instead of 2.
+    // If the LIMIT is applied after the FilterExec and the FilterExec returns 
> 2 rows we'll end up with 2 rows (correct).
+    if parent_fetch.is_some() && !plan.supports_limit_pushdown() {
+        return Ok(None);
+    }
+    // Note: we still need to check the cardinality effect of the plan here, 
because the
+    // limit pushdown is not always safe, even if the plan supports it. Here's 
an example:
+    //
+    // UnionExec advertises `supports_limit_pushdown() == true` because it can
+    // forward a LIMIT k to each of its children—i.e. apply “LIMIT k” 
separately
+    // on each branch before merging them together.
+    //
+    // However, UnionExec’s `cardinality_effect() == GreaterEqual` (it sums up
+    // all child row counts), so pushing a global TopK/LIMIT through it would
+    // break the semantics of “take the first k rows of the combined result.”
+    //
+    // For example, with two branches A and B and k = 3:
+    //   — Global LIMIT: take the first 3 rows from (A ∪ B) after merging.
+    //   — Pushed down: take 3 from A, 3 from B, then merge → up to 6 rows!
+    //
+    // That’s why we still block on cardinality: even though UnionExec can
+    // push a LIMIT to its children, its GreaterEqual effect means it cannot
+    // preserve the global TopK semantics.
+    if parent_fetch.is_some() {
+        match plan.cardinality_effect() {
+            CardinalityEffect::Equal => {
+                // safe: only true sources (e.g. CoalesceBatchesExec, 
ProjectionExec) pass
+            }
+            _ => return Ok(None),
+        }
+    }
+
     let maintains_input_order = plan.maintains_input_order();
     if is_window(plan) {
         let mut required_input_ordering = plan.required_input_ordering();
diff --git a/datafusion/sqllogictest/test_files/joins.slt 
b/datafusion/sqllogictest/test_files/joins.slt
index 3be5c1b1c3..b4fec26772 100644
--- a/datafusion/sqllogictest/test_files/joins.slt
+++ b/datafusion/sqllogictest/test_files/joins.slt
@@ -4790,3 +4790,71 @@ DROP TABLE compound_field_table_t;
 
 statement ok
 DROP TABLE compound_field_table_u;
+
+
+statement ok
+CREATE TABLE t1 (k INT, v INT);
+
+statement ok
+CREATE TABLE t2 (k INT, v INT);
+
+statement ok
+INSERT INTO t1
+  SELECT value AS k, value AS v
+  FROM range(1, 10001) AS t(value);
+
+statement ok
+INSERT INTO t2 VALUES (1, 1);
+
+## The TopK(Sort with fetch) should not be pushed down to the hash join
+query TT
+explain
+SELECT *
+FROM t1
+LEFT ANTI JOIN t2 ON t1.k = t2.k
+ORDER BY t1.k
+LIMIT 2;
+----
+logical_plan
+01)Sort: t1.k ASC NULLS LAST, fetch=2
+02)--LeftAnti Join: t1.k = t2.k
+03)----TableScan: t1 projection=[k, v]
+04)----TableScan: t2 projection=[k]
+physical_plan
+01)SortExec: TopK(fetch=2), expr=[k@0 ASC NULLS LAST], 
preserve_partitioning=[false]
+02)--CoalesceBatchesExec: target_batch_size=3
+03)----HashJoinExec: mode=CollectLeft, join_type=RightAnti, on=[(k@0, k@0)]
+04)------DataSourceExec: partitions=1, partition_sizes=[1]
+05)------DataSourceExec: partitions=1, partition_sizes=[3334]
+
+
+query II
+SELECT *
+FROM t1
+LEFT ANTI JOIN t2 ON t1.k = t2.k
+ORDER BY t1.k
+LIMIT 2;
+----
+2 2
+3 3
+
+
+## Test left anti join without limit, we should support push down sort to the 
left side
+query TT
+explain
+SELECT *
+FROM t1
+LEFT ANTI JOIN t2 ON t1.k = t2.k
+ORDER BY t1.k;
+----
+logical_plan
+01)Sort: t1.k ASC NULLS LAST
+02)--LeftAnti Join: t1.k = t2.k
+03)----TableScan: t1 projection=[k, v]
+04)----TableScan: t2 projection=[k]
+physical_plan
+01)CoalesceBatchesExec: target_batch_size=3
+02)--HashJoinExec: mode=CollectLeft, join_type=RightAnti, on=[(k@0, k@0)]
+03)----DataSourceExec: partitions=1, partition_sizes=[1]
+04)----SortExec: expr=[k@0 ASC NULLS LAST], preserve_partitioning=[false]
+05)------DataSourceExec: partitions=1, partition_sizes=[3334]
diff --git a/datafusion/sqllogictest/test_files/limit.slt 
b/datafusion/sqllogictest/test_files/limit.slt
index b46d15cb96..77850c6ae7 100644
--- a/datafusion/sqllogictest/test_files/limit.slt
+++ b/datafusion/sqllogictest/test_files/limit.slt
@@ -663,15 +663,14 @@ logical_plan
 physical_plan
 01)GlobalLimitExec: skip=4, fetch=10
 02)--SortPreservingMergeExec: [c@0 DESC], fetch=14
-03)----UnionExec
-04)------SortExec: TopK(fetch=14), expr=[c@0 DESC], 
preserve_partitioning=[true]
+03)----SortExec: TopK(fetch=14), expr=[c@0 DESC], preserve_partitioning=[true]
+04)------UnionExec
 05)--------ProjectionExec: expr=[CAST(c@0 AS Int64) as c]
 06)----------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
 07)------------DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c], 
output_ordering=[c@0 ASC NULLS LAST], file_type=csv, has_header=true
-08)------SortExec: TopK(fetch=14), expr=[c@0 DESC], 
preserve_partitioning=[true]
-09)--------ProjectionExec: expr=[CAST(d@0 AS Int64) as c]
-10)----------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
-11)------------DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[d], 
file_type=csv, has_header=true
+08)--------ProjectionExec: expr=[CAST(d@0 AS Int64) as c]
+09)----------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
+10)------------DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[d], 
file_type=csv, has_header=true
 
 # Applying LIMIT & OFFSET to subquery.
 query III
diff --git a/datafusion/sqllogictest/test_files/order.slt 
b/datafusion/sqllogictest/test_files/order.slt
index 3fc90a6459..e3bcfcdbda 100644
--- a/datafusion/sqllogictest/test_files/order.slt
+++ b/datafusion/sqllogictest/test_files/order.slt
@@ -1258,13 +1258,12 @@ logical_plan
 08)--------TableScan: ordered_table projection=[a0, b, c, d]
 physical_plan
 01)SortPreservingMergeExec: [d@4 ASC NULLS LAST, c@1 ASC NULLS LAST, a@2 ASC 
NULLS LAST, a0@3 ASC NULLS LAST, b@0 ASC NULLS LAST], fetch=2
-02)--UnionExec
-03)----SortExec: TopK(fetch=2), expr=[d@4 ASC NULLS LAST, c@1 ASC NULLS LAST, 
a@2 ASC NULLS LAST, b@0 ASC NULLS LAST], preserve_partitioning=[false]
+02)--SortExec: TopK(fetch=2), expr=[d@4 ASC NULLS LAST, c@1 ASC NULLS LAST, 
a@2 ASC NULLS LAST, a0@3 ASC NULLS LAST, b@0 ASC NULLS LAST], 
preserve_partitioning=[true]
+03)----UnionExec
 04)------ProjectionExec: expr=[b@1 as b, c@2 as c, a@0 as a, NULL as a0, d@3 
as d]
 05)--------DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, 
c, d], output_ordering=[c@2 ASC NULLS LAST], file_type=csv, has_header=true
-06)----SortExec: TopK(fetch=2), expr=[d@4 ASC NULLS LAST, c@1 ASC NULLS LAST, 
a0@3 ASC NULLS LAST, b@0 ASC NULLS LAST], preserve_partitioning=[false]
-07)------ProjectionExec: expr=[b@1 as b, c@2 as c, NULL as a, a0@0 as a0, d@3 
as d]
-08)--------DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, b, 
c, d], output_ordering=[c@2 ASC NULLS LAST], file_type=csv, has_header=true
+06)------ProjectionExec: expr=[b@1 as b, c@2 as c, NULL as a, a0@0 as a0, d@3 
as d]
+07)--------DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, b, 
c, d], output_ordering=[c@2 ASC NULLS LAST], file_type=csv, has_header=true
 
 # Test: run the query from above
 query IIIII
diff --git a/datafusion/sqllogictest/test_files/topk.slt 
b/datafusion/sqllogictest/test_files/topk.slt
index 9ff382d32a..afa78e43de 100644
--- a/datafusion/sqllogictest/test_files/topk.slt
+++ b/datafusion/sqllogictest/test_files/topk.slt
@@ -53,7 +53,7 @@ query I
 select * from (select * from topk limit 8) order by x limit 3;
 ----
 0
-1
+2
 2
 
 
diff --git a/datafusion/sqllogictest/test_files/union.slt 
b/datafusion/sqllogictest/test_files/union.slt
index f901a4d373..45b592ee4c 100644
--- a/datafusion/sqllogictest/test_files/union.slt
+++ b/datafusion/sqllogictest/test_files/union.slt
@@ -413,15 +413,14 @@ logical_plan
 06)------TableScan: aggregate_test_100 projection=[c1, c3]
 physical_plan
 01)SortPreservingMergeExec: [c9@1 DESC], fetch=5
-02)--UnionExec
-03)----SortExec: TopK(fetch=5), expr=[c9@1 DESC], preserve_partitioning=[true]
+02)--SortExec: TopK(fetch=5), expr=[c9@1 DESC], preserve_partitioning=[true]
+03)----UnionExec
 04)------ProjectionExec: expr=[c1@0 as c1, CAST(c9@1 AS Decimal128(20, 0)) as 
c9]
 05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
 06)----------DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, 
c9], file_type=csv, has_header=true
-07)----SortExec: TopK(fetch=5), expr=[c9@1 DESC], preserve_partitioning=[true]
-08)------ProjectionExec: expr=[c1@0 as c1, CAST(c3@1 AS Decimal128(20, 0)) as 
c9]
-09)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-10)----------DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, 
c3], file_type=csv, has_header=true
+07)------ProjectionExec: expr=[c1@0 as c1, CAST(c3@1 AS Decimal128(20, 0)) as 
c9]
+08)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+09)----------DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, 
c3], file_type=csv, has_header=true
 
 query TR
 SELECT c1, c9 FROM aggregate_test_100 UNION ALL SELECT c1, c3 FROM 
aggregate_test_100 ORDER BY c9 DESC LIMIT 5


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to