alamb commented on code in PR #21057:
URL: https://github.com/apache/datafusion/pull/21057#discussion_r2969441662
##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -1392,6 +1392,40 @@ impl LogicalPlan {
}
}
+ /// Returns the fetch (limit) of this plan node, if it has one.
+ ///
+ /// Only [`LogicalPlan::Sort`] and [`LogicalPlan::TableScan`] carry a fetch
+ /// value directly; all other variants return `None`.
+ pub fn fetch(&self) -> Option<usize> {
+ match self {
+ LogicalPlan::Sort(Sort { fetch, .. }) => *fetch,
+ LogicalPlan::TableScan(TableScan { fetch, .. }) => *fetch,
+ LogicalPlan::Projection(_) => None,
+ LogicalPlan::Filter(_) => None,
+ LogicalPlan::Window(_) => None,
+ LogicalPlan::Aggregate(_) => None,
+ LogicalPlan::Join(_) => None,
+ LogicalPlan::Repartition(_) => None,
+ LogicalPlan::Union(_) => None,
+ LogicalPlan::EmptyRelation(_) => None,
+ LogicalPlan::Subquery(_) => None,
+ LogicalPlan::SubqueryAlias(_) => None,
+ LogicalPlan::Limit(_) => None,
Review Comment:
I am surprised LogicalPlan::Limit does not return a value here as well.
I looked into the code a bit to try to understand why Limit was excluded.
Initially I thought it was because the following code claims that we only push
down Filters *after* Limits:
https://github.com/apache/datafusion/blob/ee24f3c3cd320b88c5ea6a985cbc17d3a5b6b37b/datafusion/optimizer/src/optimizer.rs#L297-L299
However, at some point we started running multiple optimizer passes, so
`PushDownFilter` can (effectively) run after `PushDownLimit`:
https://github.com/apache/datafusion/blob/ee24f3c3cd320b88c5ea6a985cbc17d3a5b6b37b/datafusion/optimizer/src/optimizer.rs#L386-L402
So this bug was probably introduced when we started running multiple
optimizer passes.
##########
datafusion/optimizer/src/push_down_filter.rs:
##########
@@ -796,6 +796,12 @@ impl OptimizerRule for PushDownFilter {
filter.predicate = new_predicate;
}
+ // If the child has a fetch (limit), pushing a filter below it would
Review Comment:
👍
##########
datafusion/sqllogictest/test_files/push_down_filter_sort_fetch.slt:
##########
@@ -0,0 +1,55 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Tests for filter pushdown behavior with Sort + LIMIT (fetch).
Review Comment:
I recommend adding this to an existing test rather than an entirely new .slt
file to make it easier to discover in the future.
Perhaps `datafusion/sqllogictest/test_files/limit.slt`
##########
datafusion/optimizer/src/push_down_filter.rs:
##########
@@ -796,6 +796,12 @@ impl OptimizerRule for PushDownFilter {
filter.predicate = new_predicate;
}
+ // If the child has a fetch (limit), pushing a filter below it would
+ // change semantics: the limit should apply before the filter, not after.
+ if filter.input.fetch().is_some() {
Review Comment:
I think we should also either explicitly check here for Limit, or (better
yet) change `fetch()` to also return Some for LogicalPlan::Limit, for the
reasons I explain above.
##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -2863,4 +2875,111 @@ mod tests {
drop(contender);
Ok(())
}
+
+ #[test]
+ fn test_sort_with_fetch_blocks_filter_pushdown() -> Result<()> {
+ let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
Review Comment:
These tests are currently quite repetitive and mostly boilerplate -- not
only does this expand the diff unnecessarily, I think it also makes it
harder to read and understand what is different between the tests (and thus
to validate that they are testing the correct thing).
I suggest making a helper function with the common setup so it is clear what
is different between the tests.
##########
datafusion/sqllogictest/test_files/window.slt:
##########
@@ -3198,16 +3198,17 @@ EXPLAIN SELECT * FROM (SELECT *, ROW_NUMBER() OVER(ORDER BY a ASC) as rn1
----
logical_plan
01)Sort: rn1 ASC NULLS LAST
-02)--Sort: rn1 ASC NULLS LAST, fetch=5
-03)----Projection: annotated_data_infinite2.a0, annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.c, annotated_data_infinite2.d, row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn1
-04)------Filter: row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW < UInt64(50)
Review Comment:
it definitely had the filter too low 👍
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]