This is an automated email from the ASF dual-hosted git repository. alamb pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push: new 2510e34f1c fix: LimitPushdown rule uncorrect remove some GlobalLimitExec (#14245) 2510e34f1c is described below commit 2510e34f1c9c068f25ebf24fb40950e4974c8a70 Author: Qi Zhu <821684...@qq.com> AuthorDate: Thu Jan 30 04:48:23 2025 +0800 fix: LimitPushdown rule uncorrect remove some GlobalLimitExec (#14245) * fix: LimitPushdown rule uncorrect remove some GlobalLimitExec * Fix some logic for maybe fetch * Fix test * Address comments * Address comments * Add comments * Address comments --- datafusion/core/tests/dataframe/mod.rs | 5 -- .../physical-optimizer/src/limit_pushdown.rs | 11 +++- datafusion/sqllogictest/test_files/limit.slt | 61 ++++++++++++++++++++++ 3 files changed, 70 insertions(+), 7 deletions(-) diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index 954c46ab27..1ebbf92c73 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -2217,11 +2217,6 @@ async fn write_parquet_with_order() -> Result<()> { let df = ctx.sql("SELECT * FROM data").await?; let results = df.collect().await?; - let df_explain = ctx.sql("explain SELECT a FROM data").await?; - let explain_result = df_explain.collect().await?; - - println!("explain_result {:?}", explain_result); - assert_batches_eq!( &[ "+---+---+", diff --git a/datafusion/physical-optimizer/src/limit_pushdown.rs b/datafusion/physical-optimizer/src/limit_pushdown.rs index 1c7e4d3d4c..5887cb51a7 100644 --- a/datafusion/physical-optimizer/src/limit_pushdown.rs +++ b/datafusion/physical-optimizer/src/limit_pushdown.rs @@ -31,7 +31,6 @@ use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties}; - /// This rule inspects [`ExecutionPlan`]'s and pushes down the fetch limit from /// the parent to the child if applicable. #[derive(Default, Debug)] @@ -248,7 +247,15 @@ pub fn pushdown_limit_helper( } } else { // Add fetch or a `LimitExec`: - global_state.satisfied = true; + // If the plan's children have limit and the child's limit < parent's limit, we shouldn't change the global state to true, + // because the children limit will be overridden if the global state is changed. + if !pushdown_plan + .children() + .iter() + .any(|&child| extract_limit(child).is_some()) + { + global_state.satisfied = true; + } pushdown_plan = if let Some(plan_with_fetch) = maybe_fetchable { if global_skip > 0 { add_global_limit(plan_with_fetch, global_skip, Some(global_fetch)) diff --git a/datafusion/sqllogictest/test_files/limit.slt b/datafusion/sqllogictest/test_files/limit.slt index 5b98392f1a..308e759fa9 100644 --- a/datafusion/sqllogictest/test_files/limit.slt +++ b/datafusion/sqllogictest/test_files/limit.slt @@ -711,3 +711,64 @@ OFFSET 3 LIMIT 2; statement ok drop table ordered_table; + +# Test issue: https://github.com/apache/datafusion/issues/14204 +# Test limit pushdown with subquery +statement ok +create table testSubQueryLimit (a int, b int) as values (1,2), (2,3), (3,4); + +query IIII +select * from testSubQueryLimit as t1 join (select * from testSubQueryLimit limit 1) limit 10; +---- +1 2 1 2 +2 3 1 2 +3 4 1 2 + +query TT +explain select * from testSubQueryLimit as t1 join (select * from testSubQueryLimit limit 1) limit 10; +---- +logical_plan +01)Limit: skip=0, fetch=10 +02)--Cross Join: +03)----SubqueryAlias: t1 +04)------Limit: skip=0, fetch=10 +05)--------TableScan: testsubquerylimit projection=[a, b], fetch=10 +06)----Limit: skip=0, fetch=1 +07)------TableScan: testsubquerylimit projection=[a, b], fetch=1 +physical_plan +01)ProjectionExec: expr=[a@2 as a, b@3 as b, a@0 as a, b@1 as b] +02)--GlobalLimitExec: skip=0, fetch=10 +03)----CrossJoinExec +04)------GlobalLimitExec: skip=0, fetch=1 +05)--------MemoryExec: partitions=1, partition_sizes=[1] +06)------GlobalLimitExec: skip=0, fetch=10 +07)--------MemoryExec: partitions=1, partition_sizes=[1] + + +query IIII +select * from testSubQueryLimit as t1 join (select * from testSubQueryLimit limit 10) limit 2; +---- +1 2 1 2 +1 2 2 3 + +query TT +explain select * from testSubQueryLimit as t1 join (select * from testSubQueryLimit limit 10) limit 2; +---- +logical_plan +01)Limit: skip=0, fetch=2 +02)--Cross Join: +03)----SubqueryAlias: t1 +04)------Limit: skip=0, fetch=2 +05)--------TableScan: testsubquerylimit projection=[a, b], fetch=2 +06)----Limit: skip=0, fetch=2 +07)------TableScan: testsubquerylimit projection=[a, b], fetch=2 +physical_plan +01)GlobalLimitExec: skip=0, fetch=2 +02)--CrossJoinExec +03)----GlobalLimitExec: skip=0, fetch=2 +04)------MemoryExec: partitions=1, partition_sizes=[1] +05)----GlobalLimitExec: skip=0, fetch=2 +06)------MemoryExec: partitions=1, partition_sizes=[1] + +statement ok +drop table testSubQueryLimit; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org