This is an automated email from the ASF dual-hosted git repository.
adriangb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 06e5bbe250 Fix TopK Sort incorrectly pushed down past operators that
do not accept limit pushdown (#16641)
06e5bbe250 is described below
commit 06e5bbe25016856064946a13a2f557ece6eb0e5a
Author: Qi Zhu <[email protected]>
AuthorDate: Fri Jul 4 00:30:27 2025 +0800
Fix TopK Sort incorrectly pushed down past operators that do not accept
limit pushdown (#16641)
Co-authored-by: Adrian Garcia Badaracco
<[email protected]>
---
.../src/enforce_sorting/sort_pushdown.rs | 36 ++++++++++++
datafusion/sqllogictest/test_files/joins.slt | 68 ++++++++++++++++++++++
datafusion/sqllogictest/test_files/limit.slt | 11 ++--
datafusion/sqllogictest/test_files/order.slt | 9 ++-
datafusion/sqllogictest/test_files/topk.slt | 2 +-
datafusion/sqllogictest/test_files/union.slt | 11 ++--
6 files changed, 119 insertions(+), 18 deletions(-)
diff --git a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs
b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs
index a9c0e4cb28..6e4e784866 100644
--- a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs
+++ b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs
@@ -35,6 +35,7 @@ use datafusion_physical_expr_common::sort_expr::{
LexOrdering, LexRequirement, OrderingRequirements, PhysicalSortExpr,
PhysicalSortRequirement,
};
+use datafusion_physical_plan::execution_plan::CardinalityEffect;
use datafusion_physical_plan::filter::FilterExec;
use datafusion_physical_plan::joins::utils::{
calculate_join_output_ordering, ColumnIndex,
@@ -190,6 +191,7 @@ fn pushdown_sorts_helper(
} else if let Some(adjusted) = pushdown_requirement_to_children(
&sort_push_down.plan,
parent_requirement.clone(),
+ parent_fetch,
)? {
// For operators that can take a sort pushdown, continue with updated
// requirements:
@@ -216,7 +218,41 @@ fn pushdown_sorts_helper(
fn pushdown_requirement_to_children(
plan: &Arc<dyn ExecutionPlan>,
parent_required: OrderingRequirements,
+ parent_fetch: Option<usize>,
) -> Result<Option<Vec<Option<OrderingRequirements>>>> {
+ // If there is a limit on the parent plan we cannot push it down through
operators that change the cardinality.
+ // E.g. consider if LIMIT 2 is applied below a FilterExec that filters out
1/2 of the rows: we'll end up with 1 row instead of 2.
+ // If the LIMIT is applied after the FilterExec and the FilterExec returns
> 2 rows we'll end up with 2 rows (correct).
+ if parent_fetch.is_some() && !plan.supports_limit_pushdown() {
+ return Ok(None);
+ }
+ // Note: we still need to check the cardinality effect of the plan here,
because the
+ // limit pushdown is not always safe, even if the plan supports it. Here's
an example:
+ //
+ // UnionExec advertises `supports_limit_pushdown() == true` because it can
+ // forward a LIMIT k to each of its children—i.e. apply “LIMIT k”
separately
+ // on each branch before merging them together.
+ //
+ // However, UnionExec’s `cardinality_effect() == GreaterEqual` (it sums up
+ // all child row counts), so pushing a global TopK/LIMIT through it would
+ // break the semantics of “take the first k rows of the combined result.”
+ //
+ // For example, with two branches A and B and k = 3:
+ // — Global LIMIT: take the first 3 rows from (A ∪ B) after merging.
+ // — Pushed down: take 3 from A, 3 from B, then merge → up to 6 rows!
+ //
+ // That’s why we still block on cardinality: even though UnionExec can
+ // push a LIMIT to its children, its GreaterEqual effect means it cannot
+ // preserve the global TopK semantics.
+ if parent_fetch.is_some() {
+ match plan.cardinality_effect() {
+ CardinalityEffect::Equal => {
+ // safe: only operators that preserve the row count (e.g. CoalesceBatchesExec,
ProjectionExec) pass
+ }
+ _ => return Ok(None),
+ }
+ }
+
let maintains_input_order = plan.maintains_input_order();
if is_window(plan) {
let mut required_input_ordering = plan.required_input_ordering();
diff --git a/datafusion/sqllogictest/test_files/joins.slt
b/datafusion/sqllogictest/test_files/joins.slt
index 3be5c1b1c3..b4fec26772 100644
--- a/datafusion/sqllogictest/test_files/joins.slt
+++ b/datafusion/sqllogictest/test_files/joins.slt
@@ -4790,3 +4790,71 @@ DROP TABLE compound_field_table_t;
statement ok
DROP TABLE compound_field_table_u;
+
+
+statement ok
+CREATE TABLE t1 (k INT, v INT);
+
+statement ok
+CREATE TABLE t2 (k INT, v INT);
+
+statement ok
+INSERT INTO t1
+ SELECT value AS k, value AS v
+ FROM range(1, 10001) AS t(value);
+
+statement ok
+INSERT INTO t2 VALUES (1, 1);
+
+## The TopK(Sort with fetch) should not be pushed down to the hash join
+query TT
+explain
+SELECT *
+FROM t1
+LEFT ANTI JOIN t2 ON t1.k = t2.k
+ORDER BY t1.k
+LIMIT 2;
+----
+logical_plan
+01)Sort: t1.k ASC NULLS LAST, fetch=2
+02)--LeftAnti Join: t1.k = t2.k
+03)----TableScan: t1 projection=[k, v]
+04)----TableScan: t2 projection=[k]
+physical_plan
+01)SortExec: TopK(fetch=2), expr=[k@0 ASC NULLS LAST],
preserve_partitioning=[false]
+02)--CoalesceBatchesExec: target_batch_size=3
+03)----HashJoinExec: mode=CollectLeft, join_type=RightAnti, on=[(k@0, k@0)]
+04)------DataSourceExec: partitions=1, partition_sizes=[1]
+05)------DataSourceExec: partitions=1, partition_sizes=[3334]
+
+
+query II
+SELECT *
+FROM t1
+LEFT ANTI JOIN t2 ON t1.k = t2.k
+ORDER BY t1.k
+LIMIT 2;
+----
+2 2
+3 3
+
+
+## Test left anti join without limit: we should still support pushing the sort down to the
left side
+query TT
+explain
+SELECT *
+FROM t1
+LEFT ANTI JOIN t2 ON t1.k = t2.k
+ORDER BY t1.k;
+----
+logical_plan
+01)Sort: t1.k ASC NULLS LAST
+02)--LeftAnti Join: t1.k = t2.k
+03)----TableScan: t1 projection=[k, v]
+04)----TableScan: t2 projection=[k]
+physical_plan
+01)CoalesceBatchesExec: target_batch_size=3
+02)--HashJoinExec: mode=CollectLeft, join_type=RightAnti, on=[(k@0, k@0)]
+03)----DataSourceExec: partitions=1, partition_sizes=[1]
+04)----SortExec: expr=[k@0 ASC NULLS LAST], preserve_partitioning=[false]
+05)------DataSourceExec: partitions=1, partition_sizes=[3334]
diff --git a/datafusion/sqllogictest/test_files/limit.slt
b/datafusion/sqllogictest/test_files/limit.slt
index b46d15cb96..77850c6ae7 100644
--- a/datafusion/sqllogictest/test_files/limit.slt
+++ b/datafusion/sqllogictest/test_files/limit.slt
@@ -663,15 +663,14 @@ logical_plan
physical_plan
01)GlobalLimitExec: skip=4, fetch=10
02)--SortPreservingMergeExec: [c@0 DESC], fetch=14
-03)----UnionExec
-04)------SortExec: TopK(fetch=14), expr=[c@0 DESC],
preserve_partitioning=[true]
+03)----SortExec: TopK(fetch=14), expr=[c@0 DESC], preserve_partitioning=[true]
+04)------UnionExec
05)--------ProjectionExec: expr=[CAST(c@0 AS Int64) as c]
06)----------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
07)------------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c],
output_ordering=[c@0 ASC NULLS LAST], file_type=csv, has_header=true
-08)------SortExec: TopK(fetch=14), expr=[c@0 DESC],
preserve_partitioning=[true]
-09)--------ProjectionExec: expr=[CAST(d@0 AS Int64) as c]
-10)----------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
-11)------------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[d],
file_type=csv, has_header=true
+08)--------ProjectionExec: expr=[CAST(d@0 AS Int64) as c]
+09)----------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
+10)------------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[d],
file_type=csv, has_header=true
# Applying LIMIT & OFFSET to subquery.
query III
diff --git a/datafusion/sqllogictest/test_files/order.slt
b/datafusion/sqllogictest/test_files/order.slt
index 3fc90a6459..e3bcfcdbda 100644
--- a/datafusion/sqllogictest/test_files/order.slt
+++ b/datafusion/sqllogictest/test_files/order.slt
@@ -1258,13 +1258,12 @@ logical_plan
08)--------TableScan: ordered_table projection=[a0, b, c, d]
physical_plan
01)SortPreservingMergeExec: [d@4 ASC NULLS LAST, c@1 ASC NULLS LAST, a@2 ASC
NULLS LAST, a0@3 ASC NULLS LAST, b@0 ASC NULLS LAST], fetch=2
-02)--UnionExec
-03)----SortExec: TopK(fetch=2), expr=[d@4 ASC NULLS LAST, c@1 ASC NULLS LAST,
a@2 ASC NULLS LAST, b@0 ASC NULLS LAST], preserve_partitioning=[false]
+02)--SortExec: TopK(fetch=2), expr=[d@4 ASC NULLS LAST, c@1 ASC NULLS LAST,
a@2 ASC NULLS LAST, a0@3 ASC NULLS LAST, b@0 ASC NULLS LAST],
preserve_partitioning=[true]
+03)----UnionExec
04)------ProjectionExec: expr=[b@1 as b, c@2 as c, a@0 as a, NULL as a0, d@3
as d]
05)--------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b,
c, d], output_ordering=[c@2 ASC NULLS LAST], file_type=csv, has_header=true
-06)----SortExec: TopK(fetch=2), expr=[d@4 ASC NULLS LAST, c@1 ASC NULLS LAST,
a0@3 ASC NULLS LAST, b@0 ASC NULLS LAST], preserve_partitioning=[false]
-07)------ProjectionExec: expr=[b@1 as b, c@2 as c, NULL as a, a0@0 as a0, d@3
as d]
-08)--------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, b,
c, d], output_ordering=[c@2 ASC NULLS LAST], file_type=csv, has_header=true
+06)------ProjectionExec: expr=[b@1 as b, c@2 as c, NULL as a, a0@0 as a0, d@3
as d]
+07)--------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, b,
c, d], output_ordering=[c@2 ASC NULLS LAST], file_type=csv, has_header=true
# Test: run the query from above
query IIIII
diff --git a/datafusion/sqllogictest/test_files/topk.slt
b/datafusion/sqllogictest/test_files/topk.slt
index 9ff382d32a..afa78e43de 100644
--- a/datafusion/sqllogictest/test_files/topk.slt
+++ b/datafusion/sqllogictest/test_files/topk.slt
@@ -53,7 +53,7 @@ query I
select * from (select * from topk limit 8) order by x limit 3;
----
0
-1
+2
2
diff --git a/datafusion/sqllogictest/test_files/union.slt
b/datafusion/sqllogictest/test_files/union.slt
index f901a4d373..45b592ee4c 100644
--- a/datafusion/sqllogictest/test_files/union.slt
+++ b/datafusion/sqllogictest/test_files/union.slt
@@ -413,15 +413,14 @@ logical_plan
06)------TableScan: aggregate_test_100 projection=[c1, c3]
physical_plan
01)SortPreservingMergeExec: [c9@1 DESC], fetch=5
-02)--UnionExec
-03)----SortExec: TopK(fetch=5), expr=[c9@1 DESC], preserve_partitioning=[true]
+02)--SortExec: TopK(fetch=5), expr=[c9@1 DESC], preserve_partitioning=[true]
+03)----UnionExec
04)------ProjectionExec: expr=[c1@0 as c1, CAST(c9@1 AS Decimal128(20, 0)) as
c9]
05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
06)----------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1,
c9], file_type=csv, has_header=true
-07)----SortExec: TopK(fetch=5), expr=[c9@1 DESC], preserve_partitioning=[true]
-08)------ProjectionExec: expr=[c1@0 as c1, CAST(c3@1 AS Decimal128(20, 0)) as
c9]
-09)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-10)----------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1,
c3], file_type=csv, has_header=true
+07)------ProjectionExec: expr=[c1@0 as c1, CAST(c3@1 AS Decimal128(20, 0)) as
c9]
+08)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+09)----------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1,
c3], file_type=csv, has_header=true
query TR
SELECT c1, c9 FROM aggregate_test_100 UNION ALL SELECT c1, c3 FROM
aggregate_test_100 ORDER BY c9 DESC LIMIT 5
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]