This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 414487c4c optimizer: avoid every rule must recursive children in
optimizer (#4618)
414487c4c is described below
commit 414487c4cdb61256f584d9a9dd9587ac90fc0f7c
Author: jakevin <[email protected]>
AuthorDate: Sat Dec 17 04:50:52 2022 +0800
optimizer: avoid every rule must recursive children in optimizer (#4618)
Co-authored-by: Andrew Lamb <[email protected]>
---
datafusion/optimizer/src/eliminate_limit.rs | 84 ++++++++++++--------
datafusion/optimizer/src/optimizer.rs | 119 +++++++++++++++++++++++++++-
2 files changed, 169 insertions(+), 34 deletions(-)
diff --git a/datafusion/optimizer/src/eliminate_limit.rs
b/datafusion/optimizer/src/eliminate_limit.rs
index 840346d85..9e3cbf6fa 100644
--- a/datafusion/optimizer/src/eliminate_limit.rs
+++ b/datafusion/optimizer/src/eliminate_limit.rs
@@ -20,11 +20,11 @@
//! on a plan with an empty relation.
//! This rule also removes OFFSET 0 from the [LogicalPlan]
//! This saves time in planning and executing the query.
+use crate::optimizer::ApplyOrder;
+use crate::{OptimizerConfig, OptimizerRule};
use datafusion_common::Result;
use datafusion_expr::logical_plan::{EmptyRelation, LogicalPlan};
-use crate::{utils, OptimizerConfig, OptimizerRule};
-
/// Optimization rule that eliminate LIMIT 0 or useless LIMIT(skip:0,
fetch:None).
/// It can cooperate with `propagate_empty_relation` and `limit_push_down`.
#[derive(Default)]
@@ -41,54 +41,71 @@ impl OptimizerRule for EliminateLimit {
fn try_optimize(
&self,
plan: &LogicalPlan,
- config: &dyn OptimizerConfig,
+ _config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
- if let LogicalPlan::Limit(limit) = plan {
- match limit.fetch {
- Some(fetch) => {
- if fetch == 0 {
- return
Ok(Some(LogicalPlan::EmptyRelation(EmptyRelation {
- produce_one_row: false,
- schema: limit.input.schema().clone(),
- })));
- }
+ let limit = match plan {
+ LogicalPlan::Limit(limit) => limit,
+ _ => return Ok(None),
+ };
+
+ match limit.fetch {
+ Some(fetch) => {
+ if fetch == 0 {
+ return Ok(Some(LogicalPlan::EmptyRelation(EmptyRelation {
+ produce_one_row: false,
+ schema: limit.input.schema().clone(),
+ })));
}
- None => {
- if limit.skip == 0 {
- let input = &*limit.input;
- return Ok(Some(utils::optimize_children(self, input,
config)?));
- }
+ }
+ None => {
+ if limit.skip == 0 {
+ let input = limit.input.as_ref();
+ // The input can itself be a Limit, so apply this rule to it again.
+ return Ok(Some(
+ self.try_optimize(input, _config)?
+ .unwrap_or_else(|| input.clone()),
+ ));
}
}
}
- Ok(Some(utils::optimize_children(self, plan, config)?))
+ Ok(None)
}
fn name(&self) -> &str {
"eliminate_limit"
}
+
+ fn apply_order(&self) -> Option<ApplyOrder> {
+ Some(ApplyOrder::BottomUp)
+ }
}
#[cfg(test)]
mod tests {
+ use super::*;
+ use crate::optimizer::Optimizer;
+ use crate::test::*;
+ use crate::OptimizerContext;
use datafusion_common::Column;
use datafusion_expr::{
col,
logical_plan::{builder::LogicalPlanBuilder, JoinType},
sum,
};
+ use std::sync::Arc;
- use crate::optimizer::OptimizerContext;
use crate::push_down_limit::PushDownLimit;
- use crate::test::*;
-
- use super::*;
fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) ->
Result<()> {
- let optimized_plan = EliminateLimit::new()
- .try_optimize(plan, &OptimizerContext::new())
- .unwrap()
- .expect("failed to optimize plan");
+ let optimizer =
Optimizer::with_rules(vec![Arc::new(EliminateLimit::new())]);
+ let optimized_plan = optimizer
+ .optimize_recursively(
+ optimizer.rules.get(0).unwrap(),
+ plan,
+ &OptimizerContext::new(),
+ )?
+ .unwrap_or_else(|| plan.clone());
+
let formatted_plan = format!("{:?}", optimized_plan);
assert_eq!(formatted_plan, expected);
assert_eq!(plan.schema(), optimized_plan.schema());
@@ -99,13 +116,14 @@ mod tests {
plan: &LogicalPlan,
expected: &str,
) -> Result<()> {
- let optimized_plan = PushDownLimit::new()
- .try_optimize(plan, &OptimizerContext::new())
- .unwrap()
- .expect("failed to optimize plan");
- let optimized_plan = EliminateLimit::new()
- .try_optimize(&optimized_plan, &OptimizerContext::new())
- .unwrap()
+ fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {}
+ let config = OptimizerContext::new().with_max_passes(1);
+ let optimizer = Optimizer::with_rules(vec![
+ Arc::new(PushDownLimit::new()),
+ Arc::new(EliminateLimit::new()),
+ ]);
+ let optimized_plan = optimizer
+ .optimize(plan, &config, observe)
.expect("failed to optimize plan");
let formatted_plan = format!("{:?}", optimized_plan);
assert_eq!(formatted_plan, expected);
diff --git a/datafusion/optimizer/src/optimizer.rs
b/datafusion/optimizer/src/optimizer.rs
index 0dc651da2..fb9e21069 100644
--- a/datafusion/optimizer/src/optimizer.rs
+++ b/datafusion/optimizer/src/optimizer.rs
@@ -60,6 +60,13 @@ pub trait OptimizerRule {
/// A human readable name for this optimizer rule
fn name(&self) -> &str;
+
+ /// How should the rule be applied by the optimizer? See comments on [`ApplyOrder`] for details.
+ ///
+ /// If a rule uses the default `None`, it should traverse the plan recursively inside itself.
+ fn apply_order(&self) -> Option<ApplyOrder> {
+ None
+ }
}
/// Options to control the DataFusion Optimizer.
@@ -180,6 +187,44 @@ pub struct Optimizer {
pub rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
}
+/// If a rule declares an `ApplyOrder`, the optimizer takes care of traversing the children instead of
+/// the rule handling them recursively itself.
+/// The rule then only needs to handle the subtree pattern it matches.
+///
+/// Note: **sometimes** the result of an optimization can itself be optimized further, so the rule needs to be applied again.
+///
+/// Usage Example: Merge Limit (subtree pattern is: Limit-Limit)
+/// ```rust
+/// use datafusion_expr::{Limit, LogicalPlan, LogicalPlanBuilder};
+/// use datafusion_common::Result;
+/// fn merge_limit(parent: &Limit, child: &Limit) -> LogicalPlan {
+/// // just for run
+/// return parent.input.as_ref().clone();
+/// }
+/// fn try_optimize(plan: &LogicalPlan) -> Result<Option<LogicalPlan>> {
+/// match plan {
+/// LogicalPlan::Limit(limit) => match limit.input.as_ref() {
+/// LogicalPlan::Limit(child_limit) => {
+/// // merge limit ...
+/// let optimized_plan = merge_limit(limit, child_limit);
+/// // optimized_plan may itself be optimizable again,
+/// // for example when the plan is Limit-Limit-Limit
+/// Ok(Some(
+/// try_optimize(&optimized_plan)?
+/// .unwrap_or_else(|| optimized_plan.clone()),
+/// ))
+/// }
+/// _ => Ok(None),
+/// },
+/// _ => Ok(None),
+/// }
+/// }
+/// ```
+pub enum ApplyOrder {
+ TopDown,
+ BottomUp,
+}
+
impl Default for Optimizer {
fn default() -> Self {
Self::new()
@@ -253,8 +298,8 @@ impl Optimizer {
debug!("Skipping rule {} due to optimizer config",
rule.name());
continue;
}
+ let result = self.optimize_recursively(rule, &new_plan,
config);
- let result = rule.try_optimize(&new_plan, config);
match result {
Ok(Some(plan)) => {
if
!plan.schema().equivalent_names_and_types(new_plan.schema()) {
@@ -315,6 +360,78 @@ impl Optimizer {
debug!("Optimizer took {} ms", start_time.elapsed().as_millis());
Ok(new_plan)
}
+
+ fn optimize_node(
+ &self,
+ rule: &Arc<dyn OptimizerRule + Send + Sync>,
+ plan: &LogicalPlan,
+ config: &dyn OptimizerConfig,
+ ) -> Result<Option<LogicalPlan>> {
+ // TODO: future feature: We can do Batch optimize
+ rule.try_optimize(plan, config)
+ }
+
+ fn optimize_inputs(
+ &self,
+ rule: &Arc<dyn OptimizerRule + Send + Sync>,
+ plan: &LogicalPlan,
+ config: &dyn OptimizerConfig,
+ ) -> Result<Option<LogicalPlan>> {
+ let inputs = plan.inputs();
+ let result = inputs
+ .iter()
+ .map(|sub_plan| self.optimize_recursively(rule, sub_plan, config))
+ .collect::<Result<Vec<_>>>()?;
+ if result.is_empty() || result.iter().all(|o| o.is_none()) {
+ return Ok(None);
+ }
+
+ let new_inputs = result
+ .into_iter()
+ .enumerate()
+ .map(|(i, o)| match o {
+ Some(plan) => plan,
+ None => (*(inputs.get(i).unwrap())).clone(),
+ })
+ .collect::<Vec<_>>();
+
+ Ok(Some(plan.with_new_inputs(new_inputs.as_slice())?))
+ }
+
+ /// Use a rule to optimize the whole plan.
+ /// If the rule declares an `ApplyOrder`, it does not need to handle children recursively itself.
+ pub fn optimize_recursively(
+ &self,
+ rule: &Arc<dyn OptimizerRule + Send + Sync>,
+ plan: &LogicalPlan,
+ config: &dyn OptimizerConfig,
+ ) -> Result<Option<LogicalPlan>> {
+ match rule.apply_order() {
+ Some(order) => match order {
+ ApplyOrder::TopDown => {
+ let optimize_self_opt = self.optimize_node(rule, plan,
config)?;
+ let optimize_inputs_opt = match &optimize_self_opt {
+ Some(optimized_plan) => {
+ self.optimize_inputs(rule, optimized_plan, config)?
+ }
+ _ => self.optimize_inputs(rule, plan, config)?,
+ };
+ Ok(optimize_inputs_opt.or(optimize_self_opt))
+ }
+ ApplyOrder::BottomUp => {
+ let optimize_inputs_opt = self.optimize_inputs(rule, plan,
config)?;
+ let optimize_self_opt = match &optimize_inputs_opt {
+ Some(optimized_plan) => {
+ self.optimize_node(rule, optimized_plan, config)?
+ }
+ _ => self.optimize_node(rule, plan, config)?,
+ };
+ Ok(optimize_self_opt.or(optimize_inputs_opt))
+ }
+ },
+ _ => rule.try_optimize(plan, config),
+ }
+ }
}
/// Log the plan in debug/tracing mode after some part of the optimizer runs