This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 94646acbf Eliminate multi limit-offset nodes to emptyRelation (#2823)
94646acbf is described below
commit 94646acbfb167951ad58406e9be6b468a1a12b64
Author: AssHero <[email protected]>
AuthorDate: Wed Jul 6 20:56:26 2022 +0800
Eliminate multi limit-offset nodes to emptyRelation (#2823)
* eliminate multi limit-offset
* refine the code
* add more test cases
---
datafusion/optimizer/src/eliminate_limit.rs | 234 ++++++++++++++++++++++++----
1 file changed, 208 insertions(+), 26 deletions(-)
diff --git a/datafusion/optimizer/src/eliminate_limit.rs b/datafusion/optimizer/src/eliminate_limit.rs
index 931e7204c..b4fc06215 100644
--- a/datafusion/optimizer/src/eliminate_limit.rs
+++ b/datafusion/optimizer/src/eliminate_limit.rs
@@ -15,7 +15,9 @@
// specific language governing permissions and limitations
// under the License.
-//! Optimizer rule to replace `LIMIT 0` on a plan with an empty relation.
+//! Optimizer rule to replace `LIMIT 0` or
+//! `LIMIT whose ancestor LIMIT's skip is greater than or equal to current's fetch`
+//! on a plan with an empty relation.
//! This saves time in planning and executing the query.
use crate::{OptimizerConfig, OptimizerRule};
use datafusion_common::Result;
@@ -24,7 +26,9 @@ use datafusion_expr::{
utils::from_plan,
};
-/// Optimization rule that replaces LIMIT 0 with an [LogicalPlan::EmptyRelation]
+/// Optimization rule that replaces LIMIT 0 or
+/// LIMIT whose ancestor LIMIT's skip is greater than or equal to current's fetch
+/// with an [LogicalPlan::EmptyRelation]
#[derive(Default)]
pub struct EliminateLimit;
@@ -35,35 +39,98 @@ impl EliminateLimit {
}
}
+/// Ancestor indicates the current ancestor in the LogicalPlan tree
+/// when traversing down related to "eliminate limit".
+enum Ancestor {
+ /// Limit
+ FromLimit { skip: Option<usize> },
+ /// Other nodes that don't affect the adjustment of "Limit"
+ NotRelevant,
+}
+
+/// replaces LIMIT 0 with an [LogicalPlan::EmptyRelation]
+/// replaces LIMIT node whose ancestor LIMIT's skip is greater than or equal to current's fetch
+/// with an [LogicalPlan::EmptyRelation]
+fn eliminate_limit(
+ _optimizer: &EliminateLimit,
+ ancestor: &Ancestor,
+ plan: &LogicalPlan,
+ _optimizer_config: &OptimizerConfig,
+) -> Result<LogicalPlan> {
+ match plan {
+ LogicalPlan::Limit(Limit {
+ skip, fetch, input, ..
+ }) => {
+ let ancestor_skip = match ancestor {
+ Ancestor::FromLimit { skip, .. } => skip.unwrap_or(0),
+ _ => 0,
+ };
+ // If ancestor's skip is equal or greater than current's fetch,
+ // replaces with an [LogicalPlan::EmptyRelation].
+ // For such query, the inner query(select * from xxx limit 5) should be optimized as an EmptyRelation:
+ // select * from (select * from xxx limit 5) a limit 2 offset 5;
+ match fetch {
+ Some(fetch) => {
+ if *fetch == 0 || ancestor_skip >= *fetch {
+ return Ok(LogicalPlan::EmptyRelation(EmptyRelation {
+ produce_one_row: false,
+ schema: input.schema().clone(),
+ }));
+ }
+ }
+ None => {}
+ }
+
+ let expr = plan.expressions();
+
+ // apply the optimization to all inputs of the plan
+ let inputs = plan.inputs();
+ let new_inputs = inputs
+ .iter()
+ .map(|plan| {
+ eliminate_limit(
+ _optimizer,
+ &Ancestor::FromLimit { skip: *skip },
+ plan,
+ _optimizer_config,
+ )
+ })
+ .collect::<Result<Vec<_>>>()?;
+
+ from_plan(plan, &expr, &new_inputs)
+ }
+ // Rest: recurse and find possible LIMIT 0/Multi LIMIT OFFSET nodes
+ _ => {
+ // For those plans(projection/sort/..) which do not affect the output rows of sub-plans, we still use ancestor;
+ // otherwise, use NotRelevant instead.
+ let ancestor = match plan {
+ LogicalPlan::Projection { .. } | LogicalPlan::Sort { .. } => ancestor,
+ _ => &Ancestor::NotRelevant,
+ };
+
+ let expr = plan.expressions();
+
+ // apply the optimization to all inputs of the plan
+ let inputs = plan.inputs();
+ let new_inputs = inputs
+ .iter()
+ .map(|plan| {
+ eliminate_limit(_optimizer, ancestor, plan, _optimizer_config)
+ })
+ .collect::<Result<Vec<_>>>()?;
+
+ from_plan(plan, &expr, &new_inputs)
+ }
+ }
+}
+
impl OptimizerRule for EliminateLimit {
fn optimize(
&self,
plan: &LogicalPlan,
optimizer_config: &OptimizerConfig,
) -> Result<LogicalPlan> {
- match plan {
- LogicalPlan::Limit(Limit {
- fetch: Some(0),
- input,
- ..
- }) => Ok(LogicalPlan::EmptyRelation(EmptyRelation {
- produce_one_row: false,
- schema: input.schema().clone(),
- })),
- // Rest: recurse and find possible LIMIT 0 nodes
- _ => {
- let expr = plan.expressions();
-
- // apply the optimization to all inputs of the plan
- let inputs = plan.inputs();
- let new_inputs = inputs
- .iter()
- .map(|plan| self.optimize(plan, optimizer_config))
- .collect::<Result<Vec<_>>>()?;
-
- from_plan(plan, &expr, &new_inputs)
- }
- }
+ eliminate_limit(self, &Ancestor::NotRelevant, plan, optimizer_config)
}
fn name(&self) -> &str {
@@ -75,7 +142,12 @@ impl OptimizerRule for EliminateLimit {
mod tests {
use super::*;
use crate::test::*;
- use datafusion_expr::{col, logical_plan::builder::LogicalPlanBuilder, sum};
+ use datafusion_common::Column;
+ use datafusion_expr::{
+ col,
+ logical_plan::{builder::LogicalPlanBuilder, JoinType},
+ sum,
+ };
fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
let rule = EliminateLimit::new();
@@ -128,4 +200,114 @@ mod tests {
\n TableScan: test";
assert_optimized_plan_eq(&plan, expected);
}
+
+ #[test]
+ fn limit_fetch_with_ancestor_limit_skip() {
+ let table_scan = test_table_scan().unwrap();
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .aggregate(vec![col("a")], vec![sum(col("b"))])
+ .unwrap()
+ .limit(None, Some(2))
+ .unwrap()
+ .limit(Some(2), None)
+ .unwrap()
+ .build()
+ .unwrap();
+
+ // No aggregate / scan / limit
+ let expected = "Limit: skip=2, fetch=None\
+ \n EmptyRelation";
+ assert_optimized_plan_eq(&plan, expected);
+ }
+
+ #[test]
+ fn multi_limit_offset_sort_eliminate() {
+ let table_scan = test_table_scan().unwrap();
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .aggregate(vec![col("a")], vec![sum(col("b"))])
+ .unwrap()
+ .limit(None, Some(2))
+ .unwrap()
+ .sort(vec![col("a")])
+ .unwrap()
+ .limit(Some(2), Some(1))
+ .unwrap()
+ .build()
+ .unwrap();
+
+ let expected = "Limit: skip=2, fetch=1\
+ \n Sort: #test.a\
+ \n EmptyRelation";
+ assert_optimized_plan_eq(&plan, expected);
+ }
+
+ #[test]
+ fn limit_fetch_with_ancestor_limit_fetch() {
+ let table_scan = test_table_scan().unwrap();
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .aggregate(vec![col("a")], vec![sum(col("b"))])
+ .unwrap()
+ .limit(None, Some(2))
+ .unwrap()
+ .sort(vec![col("a")])
+ .unwrap()
+ .limit(None, Some(1))
+ .unwrap()
+ .build()
+ .unwrap();
+
+ let expected = "Limit: skip=None, fetch=1\
+ \n Sort: #test.a\
+ \n Limit: skip=None, fetch=2\
+ \n Aggregate: groupBy=[[#test.a]], aggr=[[SUM(#test.b)]]\
+ \n TableScan: test";
+ assert_optimized_plan_eq(&plan, expected);
+ }
+
+ #[test]
+ fn limit_with_ancestor_limit() {
+ let table_scan = test_table_scan().unwrap();
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .aggregate(vec![col("a")], vec![sum(col("b"))])
+ .unwrap()
+ .limit(Some(2), Some(1))
+ .unwrap()
+ .sort(vec![col("a")])
+ .unwrap()
+ .limit(Some(3), Some(1))
+ .unwrap()
+ .build()
+ .unwrap();
+
+ let expected = "Limit: skip=3, fetch=1\
+ \n Sort: #test.a\
+ \n EmptyRelation";
+ assert_optimized_plan_eq(&plan, expected);
+ }
+
+ #[test]
+ fn limit_join_with_ancestor_limit() {
+ let table_scan = test_table_scan().unwrap();
+ let table_scan_inner = test_table_scan_with_name("test1").unwrap();
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .limit(Some(2), Some(1))
+ .unwrap()
+ .join_using(
+ &table_scan_inner,
+ JoinType::Inner,
+ vec![Column::from_name("a".to_string())],
+ )
+ .unwrap()
+ .limit(Some(3), Some(1))
+ .unwrap()
+ .build()
+ .unwrap();
+
+ let expected = "Limit: skip=3, fetch=1\
+ \n Inner Join: Using #test.a = #test1.a\
+ \n Limit: skip=2, fetch=1\
+ \n TableScan: test\
+ \n TableScan: test1";
+ assert_optimized_plan_eq(&plan, expected);
+ }
}