This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 2510e34f1c fix: LimitPushdown rule uncorrect remove some GlobalLimitExec (#14245)
2510e34f1c is described below

commit 2510e34f1c9c068f25ebf24fb40950e4974c8a70
Author: Qi Zhu <821684...@qq.com>
AuthorDate: Thu Jan 30 04:48:23 2025 +0800

    fix: LimitPushdown rule uncorrect remove some GlobalLimitExec (#14245)
    
    * fix: LimitPushdown rule uncorrect remove some GlobalLimitExec
    
    * Fix some logic for maybe fetch
    
    * Fix test
    
    * Address comments
    
    * Address comments
    
    * Add comments
    
    * Address comments
---
 datafusion/core/tests/dataframe/mod.rs             |  5 --
 .../physical-optimizer/src/limit_pushdown.rs       | 11 +++-
 datafusion/sqllogictest/test_files/limit.slt       | 61 ++++++++++++++++++++++
 3 files changed, 70 insertions(+), 7 deletions(-)

diff --git a/datafusion/core/tests/dataframe/mod.rs 
b/datafusion/core/tests/dataframe/mod.rs
index 954c46ab27..1ebbf92c73 100644
--- a/datafusion/core/tests/dataframe/mod.rs
+++ b/datafusion/core/tests/dataframe/mod.rs
@@ -2217,11 +2217,6 @@ async fn write_parquet_with_order() -> Result<()> {
     let df = ctx.sql("SELECT * FROM data").await?;
     let results = df.collect().await?;
 
-    let df_explain = ctx.sql("explain SELECT a FROM data").await?;
-    let explain_result = df_explain.collect().await?;
-
-    println!("explain_result {:?}", explain_result);
-
     assert_batches_eq!(
         &[
             "+---+---+",
diff --git a/datafusion/physical-optimizer/src/limit_pushdown.rs 
b/datafusion/physical-optimizer/src/limit_pushdown.rs
index 1c7e4d3d4c..5887cb51a7 100644
--- a/datafusion/physical-optimizer/src/limit_pushdown.rs
+++ b/datafusion/physical-optimizer/src/limit_pushdown.rs
@@ -31,7 +31,6 @@ use 
datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
 use 
datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
 use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
-
 /// This rule inspects [`ExecutionPlan`]'s and pushes down the fetch limit from
 /// the parent to the child if applicable.
 #[derive(Default, Debug)]
@@ -248,7 +247,15 @@ pub fn pushdown_limit_helper(
             }
         } else {
             // Add fetch or a `LimitExec`:
-            global_state.satisfied = true;
+            // If the plan's children have limit and the child's limit < 
parent's limit, we shouldn't change the global state to true,
+            // because the children limit will be overridden if the global 
state is changed.
+            if !pushdown_plan
+                .children()
+                .iter()
+                .any(|&child| extract_limit(child).is_some())
+            {
+                global_state.satisfied = true;
+            }
             pushdown_plan = if let Some(plan_with_fetch) = maybe_fetchable {
                 if global_skip > 0 {
                     add_global_limit(plan_with_fetch, global_skip, 
Some(global_fetch))
diff --git a/datafusion/sqllogictest/test_files/limit.slt 
b/datafusion/sqllogictest/test_files/limit.slt
index 5b98392f1a..308e759fa9 100644
--- a/datafusion/sqllogictest/test_files/limit.slt
+++ b/datafusion/sqllogictest/test_files/limit.slt
@@ -711,3 +711,64 @@ OFFSET 3 LIMIT 2;
 
 statement ok
 drop table ordered_table;
+
+# Test issue: https://github.com/apache/datafusion/issues/14204
+# Test limit pushdown with subquery
+statement ok
+create table testSubQueryLimit (a int, b int) as values (1,2), (2,3), (3,4);
+
+query IIII
+select * from testSubQueryLimit as t1 join (select * from testSubQueryLimit 
limit 1) limit 10;
+----
+1 2 1 2
+2 3 1 2
+3 4 1 2
+
+query TT
+explain select * from testSubQueryLimit as t1 join (select * from 
testSubQueryLimit limit 1) limit 10;
+----
+logical_plan
+01)Limit: skip=0, fetch=10
+02)--Cross Join: 
+03)----SubqueryAlias: t1
+04)------Limit: skip=0, fetch=10
+05)--------TableScan: testsubquerylimit projection=[a, b], fetch=10
+06)----Limit: skip=0, fetch=1
+07)------TableScan: testsubquerylimit projection=[a, b], fetch=1
+physical_plan
+01)ProjectionExec: expr=[a@2 as a, b@3 as b, a@0 as a, b@1 as b]
+02)--GlobalLimitExec: skip=0, fetch=10
+03)----CrossJoinExec
+04)------GlobalLimitExec: skip=0, fetch=1
+05)--------MemoryExec: partitions=1, partition_sizes=[1]
+06)------GlobalLimitExec: skip=0, fetch=10
+07)--------MemoryExec: partitions=1, partition_sizes=[1]
+
+
+query IIII
+select * from testSubQueryLimit as t1 join (select * from testSubQueryLimit 
limit 10) limit 2;
+----
+1 2 1 2
+1 2 2 3
+
+query TT
+explain select * from testSubQueryLimit as t1 join (select * from 
testSubQueryLimit limit 10) limit 2;
+----
+logical_plan
+01)Limit: skip=0, fetch=2
+02)--Cross Join: 
+03)----SubqueryAlias: t1
+04)------Limit: skip=0, fetch=2
+05)--------TableScan: testsubquerylimit projection=[a, b], fetch=2
+06)----Limit: skip=0, fetch=2
+07)------TableScan: testsubquerylimit projection=[a, b], fetch=2
+physical_plan
+01)GlobalLimitExec: skip=0, fetch=2
+02)--CrossJoinExec
+03)----GlobalLimitExec: skip=0, fetch=2
+04)------MemoryExec: partitions=1, partition_sizes=[1]
+05)----GlobalLimitExec: skip=0, fetch=2
+06)------MemoryExec: partitions=1, partition_sizes=[1]
+
+statement ok
+drop table testSubQueryLimit;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org

Reply via email to