This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 59130f438c Support `LIMIT` Push-down logical plan optimization for
`Extension` nodes (#12685)
59130f438c is described below
commit 59130f438c374a84a48d83feaae51251776203a0
Author: Austin Liu <[email protected]>
AuthorDate: Thu Oct 3 06:05:48 2024 +0800
Support `LIMIT` Push-down logical plan optimization for `Extension` nodes
(#12685)
* Update trait `UserDefinedLogicalNodeCore`
Signed-off-by: Austin Liu <[email protected]>
* Update corresponding interface
Signed-off-by: Austin Liu <[email protected]>
Add rewrite rule for `push-down-limit` for `Extension`
Signed-off-by: Austin Liu <[email protected]>
* Add rewrite rule for `push-down-limit` for `Extension` and tests
Signed-off-by: Austin Liu <[email protected]>
* Update corresponding interface
Signed-off-by: Austin Liu <[email protected]>
* Reorganize to match guard
Signed-off-by: Austin Liu <[email protected]>
* Clean up
Signed-off-by: Austin Liu <[email protected]>
Clean up
Signed-off-by: Austin Liu <[email protected]>
---------
Signed-off-by: Austin Liu <[email protected]>
---
datafusion/core/src/physical_planner.rs | 4 +
.../core/tests/user_defined/user_defined_plan.rs | 4 +
datafusion/expr/src/logical_plan/extension.rs | 24 ++
datafusion/optimizer/src/analyzer/subquery.rs | 4 +
.../optimizer/src/optimize_projections/mod.rs | 8 +
datafusion/optimizer/src/push_down_filter.rs | 4 +
datafusion/optimizer/src/push_down_limit.rs | 249 ++++++++++++++++++++-
datafusion/optimizer/src/test/user_defined.rs | 4 +
.../proto/tests/cases/roundtrip_logical_plan.rs | 4 +
.../tests/cases/roundtrip_logical_plan.rs | 4 +
10 files changed, 308 insertions(+), 1 deletion(-)
diff --git a/datafusion/core/src/physical_planner.rs
b/datafusion/core/src/physical_planner.rs
index 520392c9f0..78c70606bf 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -2557,6 +2557,10 @@ mod tests {
) -> Result<Self> {
unimplemented!("NoOp");
}
+
+ fn supports_limit_pushdown(&self) -> bool {
+ false // Disallow limit push-down by default
+ }
}
#[derive(Debug)]
diff --git a/datafusion/core/tests/user_defined/user_defined_plan.rs
b/datafusion/core/tests/user_defined/user_defined_plan.rs
index e51adbc4dd..2b45d0ed60 100644
--- a/datafusion/core/tests/user_defined/user_defined_plan.rs
+++ b/datafusion/core/tests/user_defined/user_defined_plan.rs
@@ -443,6 +443,10 @@ impl UserDefinedLogicalNodeCore for TopKPlanNode {
expr: replace_sort_expression(self.expr.clone(),
exprs.swap_remove(0)),
})
}
+
+ fn supports_limit_pushdown(&self) -> bool {
+ false // Disallow limit push-down by default
+ }
}
/// Physical planner for TopK nodes
diff --git a/datafusion/expr/src/logical_plan/extension.rs
b/datafusion/expr/src/logical_plan/extension.rs
index d49c85fb6f..19d4cb3db9 100644
--- a/datafusion/expr/src/logical_plan/extension.rs
+++ b/datafusion/expr/src/logical_plan/extension.rs
@@ -195,6 +195,16 @@ pub trait UserDefinedLogicalNode: fmt::Debug + Send + Sync
{
/// directly because it must remain object safe.
fn dyn_eq(&self, other: &dyn UserDefinedLogicalNode) -> bool;
fn dyn_ord(&self, other: &dyn UserDefinedLogicalNode) -> Option<Ordering>;
+
+ /// Returns `true` if a limit can be safely pushed down through this
+ /// `UserDefinedLogicalNode` node.
+ ///
+ /// If this method returns `true`, and the query plan contains a limit at
+ /// the output of this node, DataFusion will push the limit to the input
+ /// of this node.
+ fn supports_limit_pushdown(&self) -> bool {
+ false
+ }
}
impl Hash for dyn UserDefinedLogicalNode {
@@ -295,6 +305,16 @@ pub trait UserDefinedLogicalNodeCore:
) -> Option<Vec<Vec<usize>>> {
None
}
+
+ /// Returns `true` if a limit can be safely pushed down through this
+ /// `UserDefinedLogicalNode` node.
+ ///
+ /// If this method returns `true`, and the query plan contains a limit at
+ /// the output of this node, DataFusion will push the limit to the input
+ /// of this node.
+ fn supports_limit_pushdown(&self) -> bool {
+ false // Disallow limit push-down by default
+ }
}
/// Automatically derive UserDefinedLogicalNode to `UserDefinedLogicalNode`
@@ -361,6 +381,10 @@ impl<T: UserDefinedLogicalNodeCore> UserDefinedLogicalNode
for T {
.downcast_ref::<Self>()
.and_then(|other| self.partial_cmp(other))
}
+
+ fn supports_limit_pushdown(&self) -> bool {
+ self.supports_limit_pushdown()
+ }
}
fn get_all_columns_from_schema(schema: &DFSchema) -> HashSet<String> {
diff --git a/datafusion/optimizer/src/analyzer/subquery.rs
b/datafusion/optimizer/src/analyzer/subquery.rs
index c771f31a58..aabc549de5 100644
--- a/datafusion/optimizer/src/analyzer/subquery.rs
+++ b/datafusion/optimizer/src/analyzer/subquery.rs
@@ -385,6 +385,10 @@ mod test {
empty_schema: Arc::clone(&self.empty_schema),
})
}
+
+ fn supports_limit_pushdown(&self) -> bool {
+ false // Disallow limit push-down by default
+ }
}
#[test]
diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs
b/datafusion/optimizer/src/optimize_projections/mod.rs
index 5ab427a316..b5d581f391 100644
--- a/datafusion/optimizer/src/optimize_projections/mod.rs
+++ b/datafusion/optimizer/src/optimize_projections/mod.rs
@@ -895,6 +895,10 @@ mod tests {
// Since schema is same. Output columns requires their
corresponding version in the input columns.
Some(vec![output_columns.to_vec()])
}
+
+ fn supports_limit_pushdown(&self) -> bool {
+ false // Disallow limit push-down by default
+ }
}
#[derive(Debug, Hash, PartialEq, Eq)]
@@ -991,6 +995,10 @@ mod tests {
}
Some(vec![left_reqs, right_reqs])
}
+
+ fn supports_limit_pushdown(&self) -> bool {
+ false // Disallow limit push-down by default
+ }
}
#[test]
diff --git a/datafusion/optimizer/src/push_down_filter.rs
b/datafusion/optimizer/src/push_down_filter.rs
index 4e36cc6258..6e2cc0cbdb 100644
--- a/datafusion/optimizer/src/push_down_filter.rs
+++ b/datafusion/optimizer/src/push_down_filter.rs
@@ -1499,6 +1499,10 @@ mod tests {
schema: Arc::clone(&self.schema),
})
}
+
+ fn supports_limit_pushdown(&self) -> bool {
+ false // Disallow limit push-down by default
+ }
}
#[test]
diff --git a/datafusion/optimizer/src/push_down_limit.rs
b/datafusion/optimizer/src/push_down_limit.rs
index 158c7592df..8b5e483001 100644
--- a/datafusion/optimizer/src/push_down_limit.rs
+++ b/datafusion/optimizer/src/push_down_limit.rs
@@ -153,6 +153,29 @@ impl OptimizerRule for PushDownLimit {
subquery_alias.input = Arc::new(new_limit);
Ok(Transformed::yes(LogicalPlan::SubqueryAlias(subquery_alias)))
}
+ LogicalPlan::Extension(extension_plan)
+ if extension_plan.node.supports_limit_pushdown() =>
+ {
+ let new_children = extension_plan
+ .node
+ .inputs()
+ .into_iter()
+ .map(|child| {
+ LogicalPlan::Limit(Limit {
+ skip: 0,
+ fetch: Some(fetch + skip),
+ input: Arc::new(child.clone()),
+ })
+ })
+ .collect::<Vec<_>>();
+
+ // Create a new extension node with updated inputs
+ let child_plan = LogicalPlan::Extension(extension_plan);
+ let new_extension =
+ child_plan.with_new_exprs(child_plan.expressions(),
new_children)?;
+
+ transformed_limit(skip, fetch, new_extension)
+ }
input => original_limit(skip, fetch, input),
}
}
@@ -258,17 +281,241 @@ fn push_down_join(mut join: Join, limit: usize) ->
Transformed<Join> {
#[cfg(test)]
mod test {
+ use std::cmp::Ordering;
+ use std::fmt::{Debug, Formatter};
use std::vec;
use super::*;
use crate::test::*;
- use datafusion_expr::{col, exists,
logical_plan::builder::LogicalPlanBuilder};
+
+ use datafusion_common::DFSchemaRef;
+ use datafusion_expr::{
+ col, exists, logical_plan::builder::LogicalPlanBuilder, Expr,
Extension,
+ UserDefinedLogicalNodeCore,
+ };
use datafusion_functions_aggregate::expr_fn::max;
fn assert_optimized_plan_equal(plan: LogicalPlan, expected: &str) ->
Result<()> {
assert_optimized_plan_eq(Arc::new(PushDownLimit::new()), plan,
expected)
}
+ #[derive(Debug, PartialEq, Eq, Hash)]
+ pub struct NoopPlan {
+ input: Vec<LogicalPlan>,
+ schema: DFSchemaRef,
+ }
+
+ // Manual implementation needed because of `schema` field. Comparison
excludes this field.
+ impl PartialOrd for NoopPlan {
+ fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+ self.input.partial_cmp(&other.input)
+ }
+ }
+
+ impl UserDefinedLogicalNodeCore for NoopPlan {
+ fn name(&self) -> &str {
+ "NoopPlan"
+ }
+
+ fn inputs(&self) -> Vec<&LogicalPlan> {
+ self.input.iter().collect()
+ }
+
+ fn schema(&self) -> &DFSchemaRef {
+ &self.schema
+ }
+
+ fn expressions(&self) -> Vec<Expr> {
+ self.input
+ .iter()
+ .flat_map(|child| child.expressions())
+ .collect()
+ }
+
+ fn fmt_for_explain(&self, f: &mut Formatter) -> std::fmt::Result {
+ write!(f, "NoopPlan")
+ }
+
+ fn with_exprs_and_inputs(
+ &self,
+ _exprs: Vec<Expr>,
+ inputs: Vec<LogicalPlan>,
+ ) -> Result<Self> {
+ Ok(Self {
+ input: inputs,
+ schema: Arc::clone(&self.schema),
+ })
+ }
+
+ fn supports_limit_pushdown(&self) -> bool {
+ true // Allow limit push-down
+ }
+ }
+
+ #[derive(Debug, PartialEq, Eq, Hash)]
+ struct NoLimitNoopPlan {
+ input: Vec<LogicalPlan>,
+ schema: DFSchemaRef,
+ }
+
+ // Manual implementation needed because of `schema` field. Comparison
excludes this field.
+ impl PartialOrd for NoLimitNoopPlan {
+ fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+ self.input.partial_cmp(&other.input)
+ }
+ }
+
+ impl UserDefinedLogicalNodeCore for NoLimitNoopPlan {
+ fn name(&self) -> &str {
+ "NoLimitNoopPlan"
+ }
+
+ fn inputs(&self) -> Vec<&LogicalPlan> {
+ self.input.iter().collect()
+ }
+
+ fn schema(&self) -> &DFSchemaRef {
+ &self.schema
+ }
+
+ fn expressions(&self) -> Vec<Expr> {
+ self.input
+ .iter()
+ .flat_map(|child| child.expressions())
+ .collect()
+ }
+
+ fn fmt_for_explain(&self, f: &mut Formatter) -> std::fmt::Result {
+ write!(f, "NoLimitNoopPlan")
+ }
+
+ fn with_exprs_and_inputs(
+ &self,
+ _exprs: Vec<Expr>,
+ inputs: Vec<LogicalPlan>,
+ ) -> Result<Self> {
+ Ok(Self {
+ input: inputs,
+ schema: Arc::clone(&self.schema),
+ })
+ }
+
+ fn supports_limit_pushdown(&self) -> bool {
+ false // Disallow limit push-down by default
+ }
+ }
+ #[test]
+ fn limit_pushdown_basic() -> Result<()> {
+ let table_scan = test_table_scan()?;
+ let noop_plan = LogicalPlan::Extension(Extension {
+ node: Arc::new(NoopPlan {
+ input: vec![table_scan.clone()],
+ schema: Arc::clone(table_scan.schema()),
+ }),
+ });
+
+ let plan = LogicalPlanBuilder::from(noop_plan)
+ .limit(0, Some(1000))?
+ .build()?;
+
+ let expected = "Limit: skip=0, fetch=1000\
+ \n NoopPlan\
+ \n Limit: skip=0, fetch=1000\
+ \n TableScan: test, fetch=1000";
+
+ assert_optimized_plan_equal(plan, expected)
+ }
+
+ #[test]
+ fn limit_pushdown_with_skip() -> Result<()> {
+ let table_scan = test_table_scan()?;
+ let noop_plan = LogicalPlan::Extension(Extension {
+ node: Arc::new(NoopPlan {
+ input: vec![table_scan.clone()],
+ schema: Arc::clone(table_scan.schema()),
+ }),
+ });
+
+ let plan = LogicalPlanBuilder::from(noop_plan)
+ .limit(10, Some(1000))?
+ .build()?;
+
+ let expected = "Limit: skip=10, fetch=1000\
+ \n NoopPlan\
+ \n Limit: skip=0, fetch=1010\
+ \n TableScan: test, fetch=1010";
+
+ assert_optimized_plan_equal(plan, expected)
+ }
+
+ #[test]
+ fn limit_pushdown_multiple_limits() -> Result<()> {
+ let table_scan = test_table_scan()?;
+ let noop_plan = LogicalPlan::Extension(Extension {
+ node: Arc::new(NoopPlan {
+ input: vec![table_scan.clone()],
+ schema: Arc::clone(table_scan.schema()),
+ }),
+ });
+
+ let plan = LogicalPlanBuilder::from(noop_plan)
+ .limit(10, Some(1000))?
+ .limit(20, Some(500))?
+ .build()?;
+
+ let expected = "Limit: skip=30, fetch=500\
+ \n NoopPlan\
+ \n Limit: skip=0, fetch=530\
+ \n TableScan: test, fetch=530";
+
+ assert_optimized_plan_equal(plan, expected)
+ }
+
+ #[test]
+ fn limit_pushdown_multiple_inputs() -> Result<()> {
+ let table_scan = test_table_scan()?;
+ let noop_plan = LogicalPlan::Extension(Extension {
+ node: Arc::new(NoopPlan {
+ input: vec![table_scan.clone(), table_scan.clone()],
+ schema: Arc::clone(table_scan.schema()),
+ }),
+ });
+
+ let plan = LogicalPlanBuilder::from(noop_plan)
+ .limit(0, Some(1000))?
+ .build()?;
+
+ let expected = "Limit: skip=0, fetch=1000\
+ \n NoopPlan\
+ \n Limit: skip=0, fetch=1000\
+ \n TableScan: test, fetch=1000\
+ \n Limit: skip=0, fetch=1000\
+ \n TableScan: test, fetch=1000";
+
+ assert_optimized_plan_equal(plan, expected)
+ }
+
+ #[test]
+ fn limit_pushdown_disallowed_noop_plan() -> Result<()> {
+ let table_scan = test_table_scan()?;
+ let no_limit_noop_plan = LogicalPlan::Extension(Extension {
+ node: Arc::new(NoLimitNoopPlan {
+ input: vec![table_scan.clone()],
+ schema: Arc::clone(table_scan.schema()),
+ }),
+ });
+
+ let plan = LogicalPlanBuilder::from(no_limit_noop_plan)
+ .limit(0, Some(1000))?
+ .build()?;
+
+ let expected = "Limit: skip=0, fetch=1000\
+ \n NoLimitNoopPlan\
+ \n TableScan: test";
+
+ assert_optimized_plan_equal(plan, expected)
+ }
+
#[test]
fn limit_pushdown_projection_table_provider() -> Result<()> {
let table_scan = test_table_scan()?;
diff --git a/datafusion/optimizer/src/test/user_defined.rs
b/datafusion/optimizer/src/test/user_defined.rs
index 814cd0c0cd..a39f90b5da 100644
--- a/datafusion/optimizer/src/test/user_defined.rs
+++ b/datafusion/optimizer/src/test/user_defined.rs
@@ -76,4 +76,8 @@ impl UserDefinedLogicalNodeCore for TestUserDefinedPlanNode {
input: inputs.swap_remove(0),
})
}
+
+ fn supports_limit_pushdown(&self) -> bool {
+ false // Disallow limit push-down by default
+ }
}
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index 8a94f90581..cd789e06dc 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -1060,6 +1060,10 @@ impl UserDefinedLogicalNodeCore for TopKPlanNode {
expr: exprs.swap_remove(0),
})
}
+
+ fn supports_limit_pushdown(&self) -> bool {
+ false // Disallow limit push-down by default
+ }
}
#[derive(Debug)]
diff --git a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
index f7686bec54..3b7d0fd296 100644
--- a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
@@ -149,6 +149,10 @@ impl UserDefinedLogicalNode for MockUserDefinedLogicalPlan
{
fn dyn_ord(&self, _: &dyn UserDefinedLogicalNode) -> Option<Ordering> {
unimplemented!()
}
+
+ fn supports_limit_pushdown(&self) -> bool {
+ false // Disallow limit push-down by default
+ }
}
impl MockUserDefinedLogicalPlan {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]