This is an automated email from the ASF dual-hosted git repository.

liukun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 48f0f3a78 optimize limit push for join case (#4411)
48f0f3a78 is described below

commit 48f0f3a78f7dfd6f175905fc31abbeecfd51766e
Author: Kun Liu <[email protected]>
AuthorDate: Thu Dec 1 10:35:20 2022 +0800

    optimize limit push for join case (#4411)
    
    * optimize limit push for join case
    
    * address comments
---
 datafusion/optimizer/src/limit_push_down.rs | 158 ++++++++++++++++++++++++++++
 1 file changed, 158 insertions(+)

diff --git a/datafusion/optimizer/src/limit_push_down.rs b/datafusion/optimizer/src/limit_push_down.rs
index 28b868fd6..8dcacb03d 100644
--- a/datafusion/optimizer/src/limit_push_down.rs
+++ b/datafusion/optimizer/src/limit_push_down.rs
@@ -38,6 +38,10 @@ impl LimitPushDown {
     }
 }
 
+/// Returns `true` when `join` carries no join condition at all: its `on`
+/// list is empty and it has no `filter`.
+fn is_no_join_condition(join: &Join) -> bool {
+    let has_keys = !join.on.is_empty();
+    let has_filter = join.filter.is_some();
+    !(has_keys || has_filter)
+}
+
 fn push_down_join(
     join: &Join,
     left_limit: Option<usize>,
@@ -190,6 +194,24 @@ impl OptimizerRule for LimitPushDown {
             LogicalPlan::Join(join) => {
                 let limit = fetch + skip;
                 let new_join = match join.join_type {
+                    JoinType::Left | JoinType::Right | JoinType::Full
+                        if is_no_join_condition(join) =>
+                    {
+                        // push left and right
+                        push_down_join(join, Some(limit), Some(limit))
+                    }
+                    JoinType::LeftSemi | JoinType::LeftAnti
+                        if is_no_join_condition(join) =>
+                    {
+                        // push left
+                        push_down_join(join, Some(limit), None)
+                    }
+                    JoinType::RightSemi | JoinType::RightAnti
+                        if is_no_join_condition(join) =>
+                    {
+                        // push right
+                        push_down_join(join, None, Some(limit))
+                    }
                     JoinType::Left => push_down_join(join, Some(limit), None),
                     JoinType::Right => push_down_join(join, None, Some(limit)),
                     _ => push_down_join(join, None, None),
@@ -604,6 +626,142 @@ mod test {
         assert_optimized_plan_eq(&outer_query, expected)
     }
 
+    /// Joins built with no keys and no filter: the expected plans show the
+    /// limit pushed to both inputs for Left/Right/Full joins, to the left
+    /// input only for LeftSemi/LeftAnti, and to the right input only for
+    /// RightSemi/RightAnti.
+    #[test]
+    fn limit_should_push_down_join_without_condition() -> Result<()> {
+        let left_scan = test_table_scan()?;
+        let right_scan = test_table_scan_with_name("test2")?;
+
+        // Each entry pairs a join type with the plan text expected after
+        // optimization; cases run in the listed order.
+        let cases = [
+            (
+                JoinType::Left,
+                "Limit: skip=0, fetch=1000\
+                \n  Left Join: \
+                \n    Limit: skip=0, fetch=1000\
+                \n      TableScan: test, fetch=1000\
+                \n    Limit: skip=0, fetch=1000\
+                \n      TableScan: test2, fetch=1000",
+            ),
+            (
+                JoinType::Right,
+                "Limit: skip=0, fetch=1000\
+                \n  Right Join: \
+                \n    Limit: skip=0, fetch=1000\
+                \n      TableScan: test, fetch=1000\
+                \n    Limit: skip=0, fetch=1000\
+                \n      TableScan: test2, fetch=1000",
+            ),
+            (
+                JoinType::Full,
+                "Limit: skip=0, fetch=1000\
+                \n  Full Join: \
+                \n    Limit: skip=0, fetch=1000\
+                \n      TableScan: test, fetch=1000\
+                \n    Limit: skip=0, fetch=1000\
+                \n      TableScan: test2, fetch=1000",
+            ),
+            (
+                JoinType::LeftSemi,
+                "Limit: skip=0, fetch=1000\
+                \n  LeftSemi Join: \
+                \n    Limit: skip=0, fetch=1000\
+                \n      TableScan: test, fetch=1000\
+                \n    TableScan: test2",
+            ),
+            (
+                JoinType::LeftAnti,
+                "Limit: skip=0, fetch=1000\
+                \n  LeftAnti Join: \
+                \n    Limit: skip=0, fetch=1000\
+                \n      TableScan: test, fetch=1000\
+                \n    TableScan: test2",
+            ),
+            (
+                JoinType::RightSemi,
+                "Limit: skip=0, fetch=1000\
+                \n  RightSemi Join: \
+                \n    TableScan: test\
+                \n    Limit: skip=0, fetch=1000\
+                \n      TableScan: test2, fetch=1000",
+            ),
+            (
+                JoinType::RightAnti,
+                "Limit: skip=0, fetch=1000\
+                \n  RightAnti Join: \
+                \n    TableScan: test\
+                \n    Limit: skip=0, fetch=1000\
+                \n      TableScan: test2, fetch=1000",
+            ),
+        ];
+
+        for (join_type, expected) in cases {
+            // Empty key lists plus a `None` filter give a join with no condition.
+            let no_left_keys: Vec<&str> = vec![];
+            let no_right_keys: Vec<&str> = vec![];
+            let right_plan = LogicalPlanBuilder::from(right_scan.clone()).build()?;
+
+            let plan = LogicalPlanBuilder::from(left_scan.clone())
+                .join(&right_plan, join_type, (no_left_keys, no_right_keys), None)?
+                .limit(0, Some(1000))?
+                .build()?;
+
+            assert_optimized_plan_eq(&plan, expected)?;
+        }
+
+        Ok(())
+    }
+
     #[test]
     fn limit_should_push_down_left_outer_join() -> Result<()> {
         let table_scan_1 = test_table_scan()?;

Reply via email to