This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 59130f438c Support `LIMIT` Push-down logical plan optimization for 
`Extension` nodes (#12685)
59130f438c is described below

commit 59130f438c374a84a48d83feaae51251776203a0
Author: Austin Liu <[email protected]>
AuthorDate: Thu Oct 3 06:05:48 2024 +0800

    Support `LIMIT` Push-down logical plan optimization for `Extension` nodes 
(#12685)
    
    * Update trait `UserDefinedLogicalNodeCore`
    
    Signed-off-by: Austin Liu <[email protected]>
    
    * Update corresponding interface
    
    Signed-off-by: Austin Liu <[email protected]>
    
    Add rewrite rule for `push-down-limit` for `Extension`
    
    Signed-off-by: Austin Liu <[email protected]>
    
    * Add rewrite rule for `push-down-limit` for `Extension` and tests
    
    Signed-off-by: Austin Liu <[email protected]>
    
    * Update corresponding interface
    
    Signed-off-by: Austin Liu <[email protected]>
    
    * Reorganize to match guard
    
    Signed-off-by: Austin Liu <[email protected]>
    
    * Clena up
    
    Signed-off-by: Austin Liu <[email protected]>
    
    Clean up
    
    Signed-off-by: Austin Liu <[email protected]>
    
    ---------
    
    Signed-off-by: Austin Liu <[email protected]>
---
 datafusion/core/src/physical_planner.rs            |   4 +
 .../core/tests/user_defined/user_defined_plan.rs   |   4 +
 datafusion/expr/src/logical_plan/extension.rs      |  24 ++
 datafusion/optimizer/src/analyzer/subquery.rs      |   4 +
 .../optimizer/src/optimize_projections/mod.rs      |   8 +
 datafusion/optimizer/src/push_down_filter.rs       |   4 +
 datafusion/optimizer/src/push_down_limit.rs        | 249 ++++++++++++++++++++-
 datafusion/optimizer/src/test/user_defined.rs      |   4 +
 .../proto/tests/cases/roundtrip_logical_plan.rs    |   4 +
 .../tests/cases/roundtrip_logical_plan.rs          |   4 +
 10 files changed, 308 insertions(+), 1 deletion(-)

diff --git a/datafusion/core/src/physical_planner.rs 
b/datafusion/core/src/physical_planner.rs
index 520392c9f0..78c70606bf 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -2557,6 +2557,10 @@ mod tests {
         ) -> Result<Self> {
             unimplemented!("NoOp");
         }
+
+        fn supports_limit_pushdown(&self) -> bool {
+            false // Disallow limit push-down by default
+        }
     }
 
     #[derive(Debug)]
diff --git a/datafusion/core/tests/user_defined/user_defined_plan.rs 
b/datafusion/core/tests/user_defined/user_defined_plan.rs
index e51adbc4dd..2b45d0ed60 100644
--- a/datafusion/core/tests/user_defined/user_defined_plan.rs
+++ b/datafusion/core/tests/user_defined/user_defined_plan.rs
@@ -443,6 +443,10 @@ impl UserDefinedLogicalNodeCore for TopKPlanNode {
             expr: replace_sort_expression(self.expr.clone(), 
exprs.swap_remove(0)),
         })
     }
+
+    fn supports_limit_pushdown(&self) -> bool {
+        false // Disallow limit push-down by default
+    }
 }
 
 /// Physical planner for TopK nodes
diff --git a/datafusion/expr/src/logical_plan/extension.rs 
b/datafusion/expr/src/logical_plan/extension.rs
index d49c85fb6f..19d4cb3db9 100644
--- a/datafusion/expr/src/logical_plan/extension.rs
+++ b/datafusion/expr/src/logical_plan/extension.rs
@@ -195,6 +195,16 @@ pub trait UserDefinedLogicalNode: fmt::Debug + Send + Sync 
{
     /// directly because it must remain object safe.
     fn dyn_eq(&self, other: &dyn UserDefinedLogicalNode) -> bool;
     fn dyn_ord(&self, other: &dyn UserDefinedLogicalNode) -> Option<Ordering>;
+
+    /// Returns `true` if a limit can be safely pushed down through this
+    /// `UserDefinedLogicalNode` node.
+    ///
+    /// If this method returns `true`, and the query plan contains a limit at
+    /// the output of this node, DataFusion will push the limit to the input
+    /// of this node.
+    fn supports_limit_pushdown(&self) -> bool {
+        false
+    }
 }
 
 impl Hash for dyn UserDefinedLogicalNode {
@@ -295,6 +305,16 @@ pub trait UserDefinedLogicalNodeCore:
     ) -> Option<Vec<Vec<usize>>> {
         None
     }
+
+    /// Returns `true` if a limit can be safely pushed down through this
+    /// `UserDefinedLogicalNode` node.
+    ///
+    /// If this method returns `true`, and the query plan contains a limit at
+    /// the output of this node, DataFusion will push the limit to the input
+    /// of this node.
+    fn supports_limit_pushdown(&self) -> bool {
+        false // Disallow limit push-down by default
+    }
 }
 
 /// Automatically derive UserDefinedLogicalNode to `UserDefinedLogicalNode`
@@ -361,6 +381,10 @@ impl<T: UserDefinedLogicalNodeCore> UserDefinedLogicalNode 
for T {
             .downcast_ref::<Self>()
             .and_then(|other| self.partial_cmp(other))
     }
+
+    fn supports_limit_pushdown(&self) -> bool {
+        self.supports_limit_pushdown()
+    }
 }
 
 fn get_all_columns_from_schema(schema: &DFSchema) -> HashSet<String> {
diff --git a/datafusion/optimizer/src/analyzer/subquery.rs 
b/datafusion/optimizer/src/analyzer/subquery.rs
index c771f31a58..aabc549de5 100644
--- a/datafusion/optimizer/src/analyzer/subquery.rs
+++ b/datafusion/optimizer/src/analyzer/subquery.rs
@@ -385,6 +385,10 @@ mod test {
                 empty_schema: Arc::clone(&self.empty_schema),
             })
         }
+
+        fn supports_limit_pushdown(&self) -> bool {
+            false // Disallow limit push-down by default
+        }
     }
 
     #[test]
diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs 
b/datafusion/optimizer/src/optimize_projections/mod.rs
index 5ab427a316..b5d581f391 100644
--- a/datafusion/optimizer/src/optimize_projections/mod.rs
+++ b/datafusion/optimizer/src/optimize_projections/mod.rs
@@ -895,6 +895,10 @@ mod tests {
             // Since schema is same. Output columns requires their 
corresponding version in the input columns.
             Some(vec![output_columns.to_vec()])
         }
+
+        fn supports_limit_pushdown(&self) -> bool {
+            false // Disallow limit push-down by default
+        }
     }
 
     #[derive(Debug, Hash, PartialEq, Eq)]
@@ -991,6 +995,10 @@ mod tests {
             }
             Some(vec![left_reqs, right_reqs])
         }
+
+        fn supports_limit_pushdown(&self) -> bool {
+            false // Disallow limit push-down by default
+        }
     }
 
     #[test]
diff --git a/datafusion/optimizer/src/push_down_filter.rs 
b/datafusion/optimizer/src/push_down_filter.rs
index 4e36cc6258..6e2cc0cbdb 100644
--- a/datafusion/optimizer/src/push_down_filter.rs
+++ b/datafusion/optimizer/src/push_down_filter.rs
@@ -1499,6 +1499,10 @@ mod tests {
                 schema: Arc::clone(&self.schema),
             })
         }
+
+        fn supports_limit_pushdown(&self) -> bool {
+            false // Disallow limit push-down by default
+        }
     }
 
     #[test]
diff --git a/datafusion/optimizer/src/push_down_limit.rs 
b/datafusion/optimizer/src/push_down_limit.rs
index 158c7592df..8b5e483001 100644
--- a/datafusion/optimizer/src/push_down_limit.rs
+++ b/datafusion/optimizer/src/push_down_limit.rs
@@ -153,6 +153,29 @@ impl OptimizerRule for PushDownLimit {
                 subquery_alias.input = Arc::new(new_limit);
                 
Ok(Transformed::yes(LogicalPlan::SubqueryAlias(subquery_alias)))
             }
+            LogicalPlan::Extension(extension_plan)
+                if extension_plan.node.supports_limit_pushdown() =>
+            {
+                let new_children = extension_plan
+                    .node
+                    .inputs()
+                    .into_iter()
+                    .map(|child| {
+                        LogicalPlan::Limit(Limit {
+                            skip: 0,
+                            fetch: Some(fetch + skip),
+                            input: Arc::new(child.clone()),
+                        })
+                    })
+                    .collect::<Vec<_>>();
+
+                // Create a new extension node with updated inputs
+                let child_plan = LogicalPlan::Extension(extension_plan);
+                let new_extension =
+                    child_plan.with_new_exprs(child_plan.expressions(), 
new_children)?;
+
+                transformed_limit(skip, fetch, new_extension)
+            }
             input => original_limit(skip, fetch, input),
         }
     }
@@ -258,17 +281,241 @@ fn push_down_join(mut join: Join, limit: usize) -> 
Transformed<Join> {
 
 #[cfg(test)]
 mod test {
+    use std::cmp::Ordering;
+    use std::fmt::{Debug, Formatter};
     use std::vec;
 
     use super::*;
     use crate::test::*;
-    use datafusion_expr::{col, exists, 
logical_plan::builder::LogicalPlanBuilder};
+
+    use datafusion_common::DFSchemaRef;
+    use datafusion_expr::{
+        col, exists, logical_plan::builder::LogicalPlanBuilder, Expr, 
Extension,
+        UserDefinedLogicalNodeCore,
+    };
     use datafusion_functions_aggregate::expr_fn::max;
 
     fn assert_optimized_plan_equal(plan: LogicalPlan, expected: &str) -> 
Result<()> {
         assert_optimized_plan_eq(Arc::new(PushDownLimit::new()), plan, 
expected)
     }
 
+    #[derive(Debug, PartialEq, Eq, Hash)]
+    pub struct NoopPlan {
+        input: Vec<LogicalPlan>,
+        schema: DFSchemaRef,
+    }
+
+    // Manual implementation needed because of `schema` field. Comparison 
excludes this field.
+    impl PartialOrd for NoopPlan {
+        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+            self.input.partial_cmp(&other.input)
+        }
+    }
+
+    impl UserDefinedLogicalNodeCore for NoopPlan {
+        fn name(&self) -> &str {
+            "NoopPlan"
+        }
+
+        fn inputs(&self) -> Vec<&LogicalPlan> {
+            self.input.iter().collect()
+        }
+
+        fn schema(&self) -> &DFSchemaRef {
+            &self.schema
+        }
+
+        fn expressions(&self) -> Vec<Expr> {
+            self.input
+                .iter()
+                .flat_map(|child| child.expressions())
+                .collect()
+        }
+
+        fn fmt_for_explain(&self, f: &mut Formatter) -> std::fmt::Result {
+            write!(f, "NoopPlan")
+        }
+
+        fn with_exprs_and_inputs(
+            &self,
+            _exprs: Vec<Expr>,
+            inputs: Vec<LogicalPlan>,
+        ) -> Result<Self> {
+            Ok(Self {
+                input: inputs,
+                schema: Arc::clone(&self.schema),
+            })
+        }
+
+        fn supports_limit_pushdown(&self) -> bool {
+            true // Allow limit push-down
+        }
+    }
+
+    #[derive(Debug, PartialEq, Eq, Hash)]
+    struct NoLimitNoopPlan {
+        input: Vec<LogicalPlan>,
+        schema: DFSchemaRef,
+    }
+
+    // Manual implementation needed because of `schema` field. Comparison 
excludes this field.
+    impl PartialOrd for NoLimitNoopPlan {
+        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+            self.input.partial_cmp(&other.input)
+        }
+    }
+
+    impl UserDefinedLogicalNodeCore for NoLimitNoopPlan {
+        fn name(&self) -> &str {
+            "NoLimitNoopPlan"
+        }
+
+        fn inputs(&self) -> Vec<&LogicalPlan> {
+            self.input.iter().collect()
+        }
+
+        fn schema(&self) -> &DFSchemaRef {
+            &self.schema
+        }
+
+        fn expressions(&self) -> Vec<Expr> {
+            self.input
+                .iter()
+                .flat_map(|child| child.expressions())
+                .collect()
+        }
+
+        fn fmt_for_explain(&self, f: &mut Formatter) -> std::fmt::Result {
+            write!(f, "NoLimitNoopPlan")
+        }
+
+        fn with_exprs_and_inputs(
+            &self,
+            _exprs: Vec<Expr>,
+            inputs: Vec<LogicalPlan>,
+        ) -> Result<Self> {
+            Ok(Self {
+                input: inputs,
+                schema: Arc::clone(&self.schema),
+            })
+        }
+
+        fn supports_limit_pushdown(&self) -> bool {
+            false // Disallow limit push-down by default
+        }
+    }
+    #[test]
+    fn limit_pushdown_basic() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let noop_plan = LogicalPlan::Extension(Extension {
+            node: Arc::new(NoopPlan {
+                input: vec![table_scan.clone()],
+                schema: Arc::clone(table_scan.schema()),
+            }),
+        });
+
+        let plan = LogicalPlanBuilder::from(noop_plan)
+            .limit(0, Some(1000))?
+            .build()?;
+
+        let expected = "Limit: skip=0, fetch=1000\
+        \n  NoopPlan\
+        \n    Limit: skip=0, fetch=1000\
+        \n      TableScan: test, fetch=1000";
+
+        assert_optimized_plan_equal(plan, expected)
+    }
+
+    #[test]
+    fn limit_pushdown_with_skip() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let noop_plan = LogicalPlan::Extension(Extension {
+            node: Arc::new(NoopPlan {
+                input: vec![table_scan.clone()],
+                schema: Arc::clone(table_scan.schema()),
+            }),
+        });
+
+        let plan = LogicalPlanBuilder::from(noop_plan)
+            .limit(10, Some(1000))?
+            .build()?;
+
+        let expected = "Limit: skip=10, fetch=1000\
+        \n  NoopPlan\
+        \n    Limit: skip=0, fetch=1010\
+        \n      TableScan: test, fetch=1010";
+
+        assert_optimized_plan_equal(plan, expected)
+    }
+
+    #[test]
+    fn limit_pushdown_multiple_limits() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let noop_plan = LogicalPlan::Extension(Extension {
+            node: Arc::new(NoopPlan {
+                input: vec![table_scan.clone()],
+                schema: Arc::clone(table_scan.schema()),
+            }),
+        });
+
+        let plan = LogicalPlanBuilder::from(noop_plan)
+            .limit(10, Some(1000))?
+            .limit(20, Some(500))?
+            .build()?;
+
+        let expected = "Limit: skip=30, fetch=500\
+        \n  NoopPlan\
+        \n    Limit: skip=0, fetch=530\
+        \n      TableScan: test, fetch=530";
+
+        assert_optimized_plan_equal(plan, expected)
+    }
+
+    #[test]
+    fn limit_pushdown_multiple_inputs() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let noop_plan = LogicalPlan::Extension(Extension {
+            node: Arc::new(NoopPlan {
+                input: vec![table_scan.clone(), table_scan.clone()],
+                schema: Arc::clone(table_scan.schema()),
+            }),
+        });
+
+        let plan = LogicalPlanBuilder::from(noop_plan)
+            .limit(0, Some(1000))?
+            .build()?;
+
+        let expected = "Limit: skip=0, fetch=1000\
+        \n  NoopPlan\
+        \n    Limit: skip=0, fetch=1000\
+        \n      TableScan: test, fetch=1000\
+        \n    Limit: skip=0, fetch=1000\
+        \n      TableScan: test, fetch=1000";
+
+        assert_optimized_plan_equal(plan, expected)
+    }
+
+    #[test]
+    fn limit_pushdown_disallowed_noop_plan() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let no_limit_noop_plan = LogicalPlan::Extension(Extension {
+            node: Arc::new(NoLimitNoopPlan {
+                input: vec![table_scan.clone()],
+                schema: Arc::clone(table_scan.schema()),
+            }),
+        });
+
+        let plan = LogicalPlanBuilder::from(no_limit_noop_plan)
+            .limit(0, Some(1000))?
+            .build()?;
+
+        let expected = "Limit: skip=0, fetch=1000\
+        \n  NoLimitNoopPlan\
+        \n    TableScan: test";
+
+        assert_optimized_plan_equal(plan, expected)
+    }
+
     #[test]
     fn limit_pushdown_projection_table_provider() -> Result<()> {
         let table_scan = test_table_scan()?;
diff --git a/datafusion/optimizer/src/test/user_defined.rs 
b/datafusion/optimizer/src/test/user_defined.rs
index 814cd0c0cd..a39f90b5da 100644
--- a/datafusion/optimizer/src/test/user_defined.rs
+++ b/datafusion/optimizer/src/test/user_defined.rs
@@ -76,4 +76,8 @@ impl UserDefinedLogicalNodeCore for TestUserDefinedPlanNode {
             input: inputs.swap_remove(0),
         })
     }
+
+    fn supports_limit_pushdown(&self) -> bool {
+        false // Disallow limit push-down by default
+    }
 }
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs 
b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index 8a94f90581..cd789e06dc 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -1060,6 +1060,10 @@ impl UserDefinedLogicalNodeCore for TopKPlanNode {
             expr: exprs.swap_remove(0),
         })
     }
+
+    fn supports_limit_pushdown(&self) -> bool {
+        false // Disallow limit push-down by default
+    }
 }
 
 #[derive(Debug)]
diff --git a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs 
b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
index f7686bec54..3b7d0fd296 100644
--- a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
@@ -149,6 +149,10 @@ impl UserDefinedLogicalNode for MockUserDefinedLogicalPlan 
{
     fn dyn_ord(&self, _: &dyn UserDefinedLogicalNode) -> Option<Ordering> {
         unimplemented!()
     }
+
+    fn supports_limit_pushdown(&self) -> bool {
+        false // Disallow limit push-down by default
+    }
 }
 
 impl MockUserDefinedLogicalPlan {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to