This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 94646acbf Eliminate multi limit-offset nodes to emptyRelation  (#2823)
94646acbf is described below

commit 94646acbfb167951ad58406e9be6b468a1a12b64
Author: AssHero <[email protected]>
AuthorDate: Wed Jul 6 20:56:26 2022 +0800

    Eliminate multi limit-offset nodes to emptyRelation  (#2823)
    
    * eliminate multi limit-offset
    
    * refine the code
    
    * add more test cases
---
 datafusion/optimizer/src/eliminate_limit.rs | 234 ++++++++++++++++++++++++----
 1 file changed, 208 insertions(+), 26 deletions(-)

diff --git a/datafusion/optimizer/src/eliminate_limit.rs b/datafusion/optimizer/src/eliminate_limit.rs
index 931e7204c..b4fc06215 100644
--- a/datafusion/optimizer/src/eliminate_limit.rs
+++ b/datafusion/optimizer/src/eliminate_limit.rs
@@ -15,7 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Optimizer rule to replace `LIMIT 0` on a plan with an empty relation.
+//! Optimizer rule to replace `LIMIT 0` or
+//! `LIMIT whose ancestor LIMIT's skip is greater than or equal to current's fetch`
+//! on a plan with an empty relation.
 //! This saves time in planning and executing the query.
 use crate::{OptimizerConfig, OptimizerRule};
 use datafusion_common::Result;
@@ -24,7 +26,9 @@ use datafusion_expr::{
     utils::from_plan,
 };
 
-/// Optimization rule that replaces LIMIT 0 with an [LogicalPlan::EmptyRelation]
+/// Optimization rule that replaces LIMIT 0 or
+/// LIMIT whose ancestor LIMIT's skip is greater than or equal to current's fetch
+/// with an [LogicalPlan::EmptyRelation]
 #[derive(Default)]
 pub struct EliminateLimit;
 
@@ -35,35 +39,98 @@ impl EliminateLimit {
     }
 }
 
+/// Ancestor indicates the current ancestor in the LogicalPlan tree
+/// when traversing down related to "eliminate limit".
+enum Ancestor {
+    /// Limit
+    FromLimit { skip: Option<usize> },
+    /// Other nodes that don't affect the adjustment of "Limit"
+    NotRelevant,
+}
+
+/// replaces LIMIT 0 with an [LogicalPlan::EmptyRelation]
+/// replaces LIMIT node whose ancestor LIMIT's skip is greater than or equal to current's fetch
+/// with an [LogicalPlan::EmptyRelation]
+fn eliminate_limit(
+    _optimizer: &EliminateLimit,
+    ancestor: &Ancestor,
+    plan: &LogicalPlan,
+    _optimizer_config: &OptimizerConfig,
+) -> Result<LogicalPlan> {
+    match plan {
+        LogicalPlan::Limit(Limit {
+            skip, fetch, input, ..
+        }) => {
+            let ancestor_skip = match ancestor {
+                Ancestor::FromLimit { skip, .. } => skip.unwrap_or(0),
+                _ => 0,
+            };
+            // If ancestor's skip is equal or greater than current's fetch,
+            // replaces with an [LogicalPlan::EmptyRelation].
+            // For such query, the inner query(select * from xxx limit 5) should be optimized as an EmptyRelation:
+            // select * from (select * from xxx limit 5) a limit 2 offset 5;
+            match fetch {
+                Some(fetch) => {
+                    if *fetch == 0 || ancestor_skip >= *fetch {
+                        return Ok(LogicalPlan::EmptyRelation(EmptyRelation {
+                            produce_one_row: false,
+                            schema: input.schema().clone(),
+                        }));
+                    }
+                }
+                None => {}
+            }
+
+            let expr = plan.expressions();
+
+            // apply the optimization to all inputs of the plan
+            let inputs = plan.inputs();
+            let new_inputs = inputs
+                .iter()
+                .map(|plan| {
+                    eliminate_limit(
+                        _optimizer,
+                        &Ancestor::FromLimit { skip: *skip },
+                        plan,
+                        _optimizer_config,
+                    )
+                })
+                .collect::<Result<Vec<_>>>()?;
+
+            from_plan(plan, &expr, &new_inputs)
+        }
+        // Rest: recurse and find possible LIMIT 0/Multi LIMIT OFFSET nodes
+        _ => {
+            // For those plans(projection/sort/..) which do not affect the output rows of sub-plans, we still use ancestor;
+            // otherwise, use NotRelevant instead.
+            let ancestor = match plan {
+                LogicalPlan::Projection { .. } | LogicalPlan::Sort { .. } => ancestor,
+                _ => &Ancestor::NotRelevant,
+            };
+
+            let expr = plan.expressions();
+
+            // apply the optimization to all inputs of the plan
+            let inputs = plan.inputs();
+            let new_inputs = inputs
+                .iter()
+                .map(|plan| {
+                    eliminate_limit(_optimizer, ancestor, plan, _optimizer_config)
+                })
+                .collect::<Result<Vec<_>>>()?;
+
+            from_plan(plan, &expr, &new_inputs)
+        }
+    }
+}
+
 impl OptimizerRule for EliminateLimit {
     fn optimize(
         &self,
         plan: &LogicalPlan,
         optimizer_config: &OptimizerConfig,
     ) -> Result<LogicalPlan> {
-        match plan {
-            LogicalPlan::Limit(Limit {
-                fetch: Some(0),
-                input,
-                ..
-            }) => Ok(LogicalPlan::EmptyRelation(EmptyRelation {
-                produce_one_row: false,
-                schema: input.schema().clone(),
-            })),
-            // Rest: recurse and find possible LIMIT 0 nodes
-            _ => {
-                let expr = plan.expressions();
-
-                // apply the optimization to all inputs of the plan
-                let inputs = plan.inputs();
-                let new_inputs = inputs
-                    .iter()
-                    .map(|plan| self.optimize(plan, optimizer_config))
-                    .collect::<Result<Vec<_>>>()?;
-
-                from_plan(plan, &expr, &new_inputs)
-            }
-        }
+        eliminate_limit(self, &Ancestor::NotRelevant, plan, optimizer_config)
     }
 
     fn name(&self) -> &str {
@@ -75,7 +142,12 @@ impl OptimizerRule for EliminateLimit {
 mod tests {
     use super::*;
     use crate::test::*;
-    use datafusion_expr::{col, logical_plan::builder::LogicalPlanBuilder, sum};
+    use datafusion_common::Column;
+    use datafusion_expr::{
+        col,
+        logical_plan::{builder::LogicalPlanBuilder, JoinType},
+        sum,
+    };
 
     fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
         let rule = EliminateLimit::new();
@@ -128,4 +200,114 @@ mod tests {
             \n    TableScan: test";
         assert_optimized_plan_eq(&plan, expected);
     }
+
+    #[test]
+    fn limit_fetch_with_ancestor_limit_skip() {
+        let table_scan = test_table_scan().unwrap();
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .aggregate(vec![col("a")], vec![sum(col("b"))])
+            .unwrap()
+            .limit(None, Some(2))
+            .unwrap()
+            .limit(Some(2), None)
+            .unwrap()
+            .build()
+            .unwrap();
+
+        // No aggregate / scan / limit
+        let expected = "Limit: skip=2, fetch=None\
+            \n  EmptyRelation";
+        assert_optimized_plan_eq(&plan, expected);
+    }
+
+    #[test]
+    fn multi_limit_offset_sort_eliminate() {
+        let table_scan = test_table_scan().unwrap();
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .aggregate(vec![col("a")], vec![sum(col("b"))])
+            .unwrap()
+            .limit(None, Some(2))
+            .unwrap()
+            .sort(vec![col("a")])
+            .unwrap()
+            .limit(Some(2), Some(1))
+            .unwrap()
+            .build()
+            .unwrap();
+
+        let expected = "Limit: skip=2, fetch=1\
+            \n  Sort: #test.a\
+            \n    EmptyRelation";
+        assert_optimized_plan_eq(&plan, expected);
+    }
+
+    #[test]
+    fn limit_fetch_with_ancestor_limit_fetch() {
+        let table_scan = test_table_scan().unwrap();
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .aggregate(vec![col("a")], vec![sum(col("b"))])
+            .unwrap()
+            .limit(None, Some(2))
+            .unwrap()
+            .sort(vec![col("a")])
+            .unwrap()
+            .limit(None, Some(1))
+            .unwrap()
+            .build()
+            .unwrap();
+
+        let expected = "Limit: skip=None, fetch=1\
+            \n  Sort: #test.a\
+            \n    Limit: skip=None, fetch=2\
+            \n      Aggregate: groupBy=[[#test.a]], aggr=[[SUM(#test.b)]]\
+            \n        TableScan: test";
+        assert_optimized_plan_eq(&plan, expected);
+    }
+
+    #[test]
+    fn limit_with_ancestor_limit() {
+        let table_scan = test_table_scan().unwrap();
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .aggregate(vec![col("a")], vec![sum(col("b"))])
+            .unwrap()
+            .limit(Some(2), Some(1))
+            .unwrap()
+            .sort(vec![col("a")])
+            .unwrap()
+            .limit(Some(3), Some(1))
+            .unwrap()
+            .build()
+            .unwrap();
+
+        let expected = "Limit: skip=3, fetch=1\
+            \n  Sort: #test.a\
+            \n    EmptyRelation";
+        assert_optimized_plan_eq(&plan, expected);
+    }
+
+    #[test]
+    fn limit_join_with_ancestor_limit() {
+        let table_scan = test_table_scan().unwrap();
+        let table_scan_inner = test_table_scan_with_name("test1").unwrap();
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .limit(Some(2), Some(1))
+            .unwrap()
+            .join_using(
+                &table_scan_inner,
+                JoinType::Inner,
+                vec![Column::from_name("a".to_string())],
+            )
+            .unwrap()
+            .limit(Some(3), Some(1))
+            .unwrap()
+            .build()
+            .unwrap();
+
+        let expected = "Limit: skip=3, fetch=1\
+            \n  Inner Join: Using #test.a = #test1.a\
+            \n    Limit: skip=2, fetch=1\
+            \n      TableScan: test\
+            \n    TableScan: test1";
+        assert_optimized_plan_eq(&plan, expected);
+    }
 }

Reply via email to