This is an automated email from the ASF dual-hosted git repository.
comphead pushed a commit to branch branch-53
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/branch-53 by this push:
new 2c3a360f1a [branch-53] Fix push_down_filter for children with non-empty fetch fields (#21057) (#21142)
2c3a360f1a is described below
commit 2c3a360f1afe84ae4f8349ddcc2497d4ccd63685
Author: Haresh Khanna <[email protected]>
AuthorDate: Wed Mar 25 21:13:34 2026 +0000
[branch-53] Fix push_down_filter for children with non-empty fetch fields (#21057) (#21142)
## Which issue does this PR close?
<!--
We generally require a GitHub issue to be filed for all bug fixes and
enhancements and this helps us generate change logs for our releases.
You can link an issue to this PR using the GitHub syntax. For example
`Closes #123` indicates that this PR will close issue #123.
-->
- Related to https://github.com/apache/datafusion/issues/21063
- Related to https://github.com/apache/datafusion/issues/21079
## Rationale for this change
<!--
Why are you proposing this change? If this is already explained clearly
in the issue then this section is not needed.
Explaining clearly why changes are proposed helps reviewers understand
your changes and offer better suggestions for fixes.
-->
Currently, if we see a filter with a limit underneath, we don't push the
filter past the limit. However, sort nodes and table scan nodes can have
fetch fields that do essentially the same thing, and we don't stop
filters from being pushed past them. This is a correctness bug that can
produce incorrect query results.
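To see why this is a correctness issue, note that a fetch (limit) and a filter do not commute. A toy sketch in plain Rust (no DataFusion APIs; the function names are illustrative):

```rust
// Toy illustration: applying a filter below vs. above a fetch gives
// different results, so the optimizer must not swap their order.

/// Correct order: the fetch applies first, then the filter.
fn limit_then_filter(rows: &[i32], fetch: usize) -> Vec<i32> {
    rows.iter().take(fetch).copied().filter(|x| *x > 2).collect()
}

/// What happens if the filter is (wrongly) pushed below the fetch.
fn filter_then_limit(rows: &[i32], fetch: usize) -> Vec<i32> {
    rows.iter().copied().filter(|x| *x > 2).take(fetch).collect()
}

fn main() {
    let rows = [1, 2, 3, 4, 5];
    // fetch=3 keeps [1, 2, 3]; filtering x > 2 afterwards leaves [3].
    assert_eq!(limit_then_filter(&rows, 3), vec![3]);
    // Pushing the filter below the fetch instead yields [3, 4, 5].
    assert_eq!(filter_then_limit(&rows, 3), vec![3, 4, 5]);
    println!("limit→filter: {:?}", limit_then_filter(&rows, 3));
    println!("filter→limit: {:?}", filter_then_limit(&rows, 3));
}
```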
I added checks for exactly this condition so we don't push the filter
down. I think the prior expectation was that there would be a limit node
above any of these nodes, but that is not guaranteed. In
`push_down_limit.rs`, there's code that performs this optimisation when a
limit has a sort under it:
```rust
LogicalPlan::Sort(mut sort) => {
let new_fetch = {
let sort_fetch = skip + fetch;
Some(sort.fetch.map(|f| f.min(sort_fetch)).unwrap_or(sort_fetch))
};
if new_fetch == sort.fetch {
if skip > 0 {
original_limit(skip, fetch, LogicalPlan::Sort(sort))
} else {
Ok(Transformed::yes(LogicalPlan::Sort(sort)))
}
} else {
sort.fetch = new_fetch;
limit.input = Arc::new(LogicalPlan::Sort(sort));
Ok(Transformed::yes(LogicalPlan::Limit(limit)))
}
}
```
The first time this runs, it sets the internal fetch of the sort to
`new_fetch`, and on the second optimisation pass it hits the branch where
we just get rid of the limit node altogether, leaving the sort node
exposed to filters, which can then be pushed down into it.
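The fetch absorption in the snippet above reduces to a one-line formula. A minimal sketch (the standalone function name is illustrative, not a DataFusion API):

```rust
// Sketch of the fetch-absorption rule: a Limit(skip, fetch) over a Sort
// folds into the sort's own `fetch` field.
fn absorb_limit_into_sort(sort_fetch: Option<usize>, skip: usize, fetch: usize) -> usize {
    let limit_rows = skip + fetch;
    sort_fetch.map(|f| f.min(limit_rows)).unwrap_or(limit_rows)
}

fn main() {
    // First pass: the sort has no fetch yet, so it absorbs skip + fetch.
    assert_eq!(absorb_limit_into_sort(None, 0, 5), 5);
    // Second pass: new_fetch == sort.fetch, so the Limit node is removed,
    // leaving a bare Sort with fetch=5 that a Filter could slide past.
    assert_eq!(absorb_limit_into_sort(Some(5), 0, 5), 5);
}
```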
There is also a related fix in `SortExec::gather_filters_for_pushdown`,
which does the same thing for physical plan nodes: if a given execution
plan has a non-empty fetch, it should not allow any parent filters to be
pushed down.
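The shape of that guard can be sketched as follows (simplified types, not the real DataFusion signatures):

```rust
// Hypothetical sketch of the SortExec guard: a sort with a fetch behaves
// like a TopK, so parent filters must stay above it; a plain sort
// commutes with filters and can let them through.
#[derive(Debug, Clone, PartialEq)]
enum Pushdown {
    Yes,
    No,
}

fn parent_filter_support(fetch: Option<usize>, n_parent_filters: usize) -> Vec<Pushdown> {
    if fetch.is_some() {
        vec![Pushdown::No; n_parent_filters] // block everything below a TopK
    } else {
        vec![Pushdown::Yes; n_parent_filters] // plain sort: filters may pass
    }
}

fn main() {
    assert_eq!(
        parent_filter_support(Some(10), 2),
        vec![Pushdown::No, Pushdown::No]
    );
    assert_eq!(parent_filter_support(None, 1), vec![Pushdown::Yes]);
}
```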
## What changes are included in this PR?
<!--
There is no need to duplicate the description in the issue here but it
is sometimes worth providing a summary of the individual changes in this
PR.
-->
Added checks in the optimisation rule to avoid pushing filters past
children with built-in limits.
## Are these changes tested?
<!--
We typically require tests for all PRs in order to:
1. Prevent the code from being accidentally broken by subsequent changes
2. Serve as another way to document the expected behavior of the code
If tests are not included in your PR, please explain why (for example,
are they covered by existing tests)?
-->
Yes:
- Unit tests in `push_down_filter.rs`
- Fixed an existing test in `window.slt`
- Unit tests for the physical plan change in `sort.rs`
- New slt test in `push_down_filter_sort_fetch.slt` for this exact
behaviour
## Are there any user-facing changes?
<!--
If there are user-facing changes then we may require documentation to be
updated before approving the PR.
-->
<!--
If there are any breaking changes to public APIs, please add the `api
change` label.
-->
No
Co-authored-by: Shiv Bhatia <[email protected]>
---
datafusion/expr/src/logical_plan/plan.rs | 76 +++++++++++++++++++++++++
datafusion/optimizer/src/push_down_filter.rs | 66 +++++++++++++++++++++
datafusion/physical-plan/src/sorts/sort.rs | 82 ++++++++++++++++++++++++++-
datafusion/sqllogictest/test_files/limit.slt | 39 +++++++++++++
datafusion/sqllogictest/test_files/window.slt | 15 ++---
5 files changed, 269 insertions(+), 9 deletions(-)
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index 1c901f6d4a..99688a52a7 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -1392,6 +1392,82 @@ impl LogicalPlan {
}
}
+ /// Returns the skip (offset) of this plan node, if it has one.
+ ///
+ /// Only [`LogicalPlan::Limit`] carries a skip value; all other variants
+ /// return `Ok(None)`. Returns `Ok(None)` for a zero skip.
+ pub fn skip(&self) -> Result<Option<usize>> {
+ match self {
+ LogicalPlan::Limit(limit) => match limit.get_skip_type()? {
+ SkipType::Literal(0) => Ok(None),
+ SkipType::Literal(n) => Ok(Some(n)),
+ SkipType::UnsupportedExpr => Ok(None),
+ },
+ LogicalPlan::Sort(_) => Ok(None),
+ LogicalPlan::TableScan(_) => Ok(None),
+ LogicalPlan::Projection(_) => Ok(None),
+ LogicalPlan::Filter(_) => Ok(None),
+ LogicalPlan::Window(_) => Ok(None),
+ LogicalPlan::Aggregate(_) => Ok(None),
+ LogicalPlan::Join(_) => Ok(None),
+ LogicalPlan::Repartition(_) => Ok(None),
+ LogicalPlan::Union(_) => Ok(None),
+ LogicalPlan::EmptyRelation(_) => Ok(None),
+ LogicalPlan::Subquery(_) => Ok(None),
+ LogicalPlan::SubqueryAlias(_) => Ok(None),
+ LogicalPlan::Statement(_) => Ok(None),
+ LogicalPlan::Values(_) => Ok(None),
+ LogicalPlan::Explain(_) => Ok(None),
+ LogicalPlan::Analyze(_) => Ok(None),
+ LogicalPlan::Extension(_) => Ok(None),
+ LogicalPlan::Distinct(_) => Ok(None),
+ LogicalPlan::Dml(_) => Ok(None),
+ LogicalPlan::Ddl(_) => Ok(None),
+ LogicalPlan::Copy(_) => Ok(None),
+ LogicalPlan::DescribeTable(_) => Ok(None),
+ LogicalPlan::Unnest(_) => Ok(None),
+ LogicalPlan::RecursiveQuery(_) => Ok(None),
+ }
+ }
+
+ /// Returns the fetch (limit) of this plan node, if it has one.
+ ///
+ /// [`LogicalPlan::Sort`], [`LogicalPlan::TableScan`], and
+ /// [`LogicalPlan::Limit`] may carry a fetch value; all other variants
+ /// return `Ok(None)`.
+ pub fn fetch(&self) -> Result<Option<usize>> {
+ match self {
+ LogicalPlan::Sort(Sort { fetch, .. }) => Ok(*fetch),
+ LogicalPlan::TableScan(TableScan { fetch, .. }) => Ok(*fetch),
+ LogicalPlan::Limit(limit) => match limit.get_fetch_type()? {
+ FetchType::Literal(s) => Ok(s),
+ FetchType::UnsupportedExpr => Ok(None),
+ },
+ LogicalPlan::Projection(_) => Ok(None),
+ LogicalPlan::Filter(_) => Ok(None),
+ LogicalPlan::Window(_) => Ok(None),
+ LogicalPlan::Aggregate(_) => Ok(None),
+ LogicalPlan::Join(_) => Ok(None),
+ LogicalPlan::Repartition(_) => Ok(None),
+ LogicalPlan::Union(_) => Ok(None),
+ LogicalPlan::EmptyRelation(_) => Ok(None),
+ LogicalPlan::Subquery(_) => Ok(None),
+ LogicalPlan::SubqueryAlias(_) => Ok(None),
+ LogicalPlan::Statement(_) => Ok(None),
+ LogicalPlan::Values(_) => Ok(None),
+ LogicalPlan::Explain(_) => Ok(None),
+ LogicalPlan::Analyze(_) => Ok(None),
+ LogicalPlan::Extension(_) => Ok(None),
+ LogicalPlan::Distinct(_) => Ok(None),
+ LogicalPlan::Dml(_) => Ok(None),
+ LogicalPlan::Ddl(_) => Ok(None),
+ LogicalPlan::Copy(_) => Ok(None),
+ LogicalPlan::DescribeTable(_) => Ok(None),
+ LogicalPlan::Unnest(_) => Ok(None),
+ LogicalPlan::RecursiveQuery(_) => Ok(None),
+ }
+ }
+
/// If this node's expressions contains any references to an outer subquery
pub fn contains_outer_reference(&self) -> bool {
let mut contains = false;
diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs
index b1c0960386..d9cbe7cea4 100644
--- a/datafusion/optimizer/src/push_down_filter.rs
+++ b/datafusion/optimizer/src/push_down_filter.rs
@@ -793,6 +793,13 @@ impl OptimizerRule for PushDownFilter {
filter.predicate = new_predicate;
}
+ // If the child has a fetch (limit) or skip (offset), pushing a filter
+ // below it would change semantics: the limit/offset should apply before
+ // the filter, not after.
+ if filter.input.fetch()?.is_some() || filter.input.skip()?.is_some() {
+ return Ok(Transformed::no(LogicalPlan::Filter(filter)));
+ }
+
match Arc::unwrap_or_clone(filter.input) {
LogicalPlan::Filter(child_filter) => {
let parents_predicates = split_conjunction_owned(filter.predicate);
@@ -4296,4 +4303,63 @@ mod tests {
"
)
}
+
+ #[test]
+ fn filter_not_pushed_down_through_table_scan_with_fetch() -> Result<()> {
+ let scan = test_table_scan()?;
+ let scan_with_fetch = match scan {
+ LogicalPlan::TableScan(scan) => LogicalPlan::TableScan(TableScan {
+ fetch: Some(10),
+ ..scan
+ }),
+ _ => unreachable!(),
+ };
+ let plan = LogicalPlanBuilder::from(scan_with_fetch)
+ .filter(col("a").gt(lit(10i64)))?
+ .build()?;
+ // Filter must NOT be pushed into the table scan when it has a fetch (limit)
+ assert_optimized_plan_equal!(
+ plan,
+ @r"
+ Filter: test.a > Int64(10)
+ TableScan: test, fetch=10
+ "
+ )
+ }
+
+ #[test]
+ fn filter_push_down_through_sort_without_fetch() -> Result<()> {
+ let table_scan = test_table_scan()?;
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .sort(vec![col("a").sort(true, true)])?
+ .filter(col("a").gt(lit(10i64)))?
+ .build()?;
+ // Filter should be pushed below the sort
+ assert_optimized_plan_equal!(
+ plan,
+ @r"
+ Sort: test.a ASC NULLS FIRST
+ TableScan: test, full_filters=[test.a > Int64(10)]
+ "
+ )
+ }
+
+ #[test]
+ fn filter_not_pushed_down_through_sort_with_fetch() -> Result<()> {
+ let table_scan = test_table_scan()?;
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .sort_with_limit(vec![col("a").sort(true, true)], Some(5))?
+ .filter(col("a").gt(lit(10i64)))?
+ .build()?;
+ // Filter must NOT be pushed below the sort when it has a fetch (limit),
+ // because the limit should apply before the filter.
+ assert_optimized_plan_equal!(
+ plan,
+ @r"
+ Filter: test.a > Int64(10)
+ Sort: test.a ASC NULLS FIRST, fetch=5
+ TableScan: test
+ "
+ )
+ }
}
diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs
index b1b44cb102..ae881dcd4b 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -1374,11 +1374,22 @@ impl ExecutionPlan for SortExec {
config: &datafusion_common::config::ConfigOptions,
) -> Result<FilterDescription> {
if phase != FilterPushdownPhase::Post {
+ if self.fetch.is_some() {
+ return Ok(FilterDescription::all_unsupported(
+ &parent_filters,
+ &self.children(),
+ ));
+ }
return FilterDescription::from_children(parent_filters, &self.children());
}
- let mut child =
- ChildFilterDescription::from_child(&parent_filters, self.input())?;
+ // In Post phase: block parent filters when fetch is set,
+ // but still push the TopK dynamic filter (self-filter).
+ let mut child = if self.fetch.is_some() {
+ ChildFilterDescription::all_unsupported(&parent_filters)
+ } else {
+ ChildFilterDescription::from_child(&parent_filters, self.input())?
+ };
if let Some(filter) = &self.filter
&& config.optimizer.enable_topk_dynamic_filter_pushdown
@@ -1399,8 +1410,10 @@ mod tests {
use super::*;
use crate::coalesce_partitions::CoalescePartitionsExec;
use crate::collect;
+ use crate::empty::EmptyExec;
use crate::execution_plan::Boundedness;
use crate::expressions::col;
+ use crate::filter_pushdown::{FilterPushdownPhase, PushedDown};
use crate::test;
use crate::test::TestMemoryExec;
use crate::test::exec::{BlockingExec, assert_strong_count_converges_to_zero};
@@ -1410,6 +1423,7 @@ mod tests {
use arrow::compute::SortOptions;
use arrow::datatypes::*;
use datafusion_common::cast::as_primitive_array;
+ use datafusion_common::config::ConfigOptions;
use datafusion_common::test_util::batches_to_string;
use datafusion_common::{DataFusionError, Result, ScalarValue};
use datafusion_execution::RecordBatchStream;
@@ -2691,4 +2705,68 @@ mod tests {
Ok(())
}
+
+ fn make_sort_exec_with_fetch(fetch: Option<usize>) -> SortExec {
+ let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
+ let input = Arc::new(EmptyExec::new(schema));
+ SortExec::new(
+ [PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)))].into(),
+ input,
+ )
+ .with_fetch(fetch)
+ }
+
+ #[test]
+ fn test_sort_with_fetch_blocks_filter_pushdown() -> Result<()> {
+ let sort = make_sort_exec_with_fetch(Some(10));
+ let desc = sort.gather_filters_for_pushdown(
+ FilterPushdownPhase::Pre,
+ vec![Arc::new(Column::new("a", 0))],
+ &ConfigOptions::new(),
+ )?;
+ // Sort with fetch (TopK) must not allow filters to be pushed below it.
+ assert!(matches!(
+ desc.parent_filters()[0][0].discriminant,
+ PushedDown::No
+ ));
+ Ok(())
+ }
+
+ #[test]
+ fn test_sort_without_fetch_allows_filter_pushdown() -> Result<()> {
+ let sort = make_sort_exec_with_fetch(None);
+ let desc = sort.gather_filters_for_pushdown(
+ FilterPushdownPhase::Pre,
+ vec![Arc::new(Column::new("a", 0))],
+ &ConfigOptions::new(),
+ )?;
+ // Plain sort (no fetch) is filter-commutative.
+ assert!(matches!(
+ desc.parent_filters()[0][0].discriminant,
+ PushedDown::Yes
+ ));
+ Ok(())
+ }
+
+ #[test]
+ fn test_sort_with_fetch_allows_topk_self_filter_in_post_phase() -> Result<()> {
+ let sort = make_sort_exec_with_fetch(Some(10));
+ assert!(sort.filter.is_some(), "TopK filter should be created");
+
+ let mut config = ConfigOptions::new();
+ config.optimizer.enable_topk_dynamic_filter_pushdown = true;
+ let desc = sort.gather_filters_for_pushdown(
+ FilterPushdownPhase::Post,
+ vec![Arc::new(Column::new("a", 0))],
+ &config,
+ )?;
+ // Parent filters are still blocked in the Post phase.
+ assert!(matches!(
+ desc.parent_filters()[0][0].discriminant,
+ PushedDown::No
+ ));
+ // But the TopK self-filter should be pushed down.
+ assert_eq!(desc.self_filters()[0].len(), 1);
+ Ok(())
+ }
}
diff --git a/datafusion/sqllogictest/test_files/limit.slt b/datafusion/sqllogictest/test_files/limit.slt
index ff3c49485a..f5ec26d304 100644
--- a/datafusion/sqllogictest/test_files/limit.slt
+++ b/datafusion/sqllogictest/test_files/limit.slt
@@ -869,6 +869,45 @@ limit 1000;
statement ok
DROP TABLE test_limit_with_partitions;
+# Tests for filter pushdown behavior with Sort + LIMIT (fetch).
+
+statement ok
+CREATE TABLE t(id INT, value INT) AS VALUES
+(1, 100),
+(2, 200),
+(3, 300),
+(4, 400),
+(5, 500);
+
+# Take the 3 smallest values (100, 200, 300), then filter value > 200.
+query II
+SELECT * FROM (SELECT * FROM t ORDER BY value LIMIT 3) sub WHERE sub.value > 200;
+----
+3 300
+
+# Take the 3 largest values (500, 400, 300), then filter value < 400.
+query II
+SELECT * FROM (SELECT * FROM t ORDER BY value DESC LIMIT 3) sub WHERE sub.value < 400;
+----
+3 300
+
+# The filter stays above the sort+fetch in the plan.
+query TT
+EXPLAIN SELECT * FROM (SELECT * FROM t ORDER BY value LIMIT 3) sub WHERE sub.value > 200;
+----
+logical_plan
+01)SubqueryAlias: sub
+02)--Filter: t.value > Int32(200)
+03)----Sort: t.value ASC NULLS LAST, fetch=3
+04)------TableScan: t projection=[id, value]
+physical_plan
+01)FilterExec: value@1 > 200
+02)--SortExec: TopK(fetch=3), expr=[value@1 ASC NULLS LAST], preserve_partitioning=[false]
+03)----DataSourceExec: partitions=1, partition_sizes=[1]
+
+statement ok
+DROP TABLE t;
+
# Tear down src_table table:
statement ok
DROP TABLE src_table;
diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt
index 9fc053d38c..d444283aa3 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -3198,16 +3198,17 @@ EXPLAIN SELECT * FROM (SELECT *, ROW_NUMBER() OVER(ORDER BY a ASC) as rn1
----
logical_plan
01)Sort: rn1 ASC NULLS LAST
-02)--Sort: rn1 ASC NULLS LAST, fetch=5
-03)----Projection: annotated_data_infinite2.a0, annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.c, annotated_data_infinite2.d, row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn1
-04)------Filter: row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW < UInt64(50)
+02)--Filter: rn1 < UInt64(50)
+03)----Sort: rn1 ASC NULLS LAST, fetch=5
+04)------Projection: annotated_data_infinite2.a0, annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.c, annotated_data_infinite2.d, row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn1
05)--------WindowAggr: windowExpr=[[row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
06)----------TableScan: annotated_data_infinite2 projection=[a0, a, b, c, d]
physical_plan
-01)ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as rn1]
-02)--FilterExec: row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 < 50, fetch=5
-03)----BoundedWindowAggExec: wdw=[row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
-04)------StreamingTableExec: partition_sizes=1, projection=[a0, a, b, c, d], infinite_source=true, output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST]
+01)FilterExec: rn1@5 < 50
+02)--ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as rn1]
+03)----GlobalLimitExec: skip=0, fetch=5
+04)------BoundedWindowAggExec: wdw=[row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
+05)--------StreamingTableExec: partition_sizes=1, projection=[a0, a, b, c, d], infinite_source=true, output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST]
# Top level sort is pushed down through BoundedWindowAggExec as its SUM result does already satisfy the required
# global order. The existing sort is for the second-term lexicographical ordering requirement, which is being
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]