This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 414487c4c optimizer: avoid every rule must recursive children in 
optimizer (#4618)
414487c4c is described below

commit 414487c4cdb61256f584d9a9dd9587ac90fc0f7c
Author: jakevin <[email protected]>
AuthorDate: Sat Dec 17 04:50:52 2022 +0800

    optimizer: avoid every rule must recursive children in optimizer (#4618)
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 datafusion/optimizer/src/eliminate_limit.rs |  84 ++++++++++++--------
 datafusion/optimizer/src/optimizer.rs       | 119 +++++++++++++++++++++++++++-
 2 files changed, 169 insertions(+), 34 deletions(-)

diff --git a/datafusion/optimizer/src/eliminate_limit.rs 
b/datafusion/optimizer/src/eliminate_limit.rs
index 840346d85..9e3cbf6fa 100644
--- a/datafusion/optimizer/src/eliminate_limit.rs
+++ b/datafusion/optimizer/src/eliminate_limit.rs
@@ -20,11 +20,11 @@
 //! on a plan with an empty relation.
 //! This rule also removes OFFSET 0 from the [LogicalPlan]
 //! This saves time in planning and executing the query.
+use crate::optimizer::ApplyOrder;
+use crate::{OptimizerConfig, OptimizerRule};
 use datafusion_common::Result;
 use datafusion_expr::logical_plan::{EmptyRelation, LogicalPlan};
 
-use crate::{utils, OptimizerConfig, OptimizerRule};
-
 /// Optimization rule that eliminate LIMIT 0 or useless LIMIT(skip:0, 
fetch:None).
 /// It can cooperate with `propagate_empty_relation` and `limit_push_down`.
 #[derive(Default)]
@@ -41,54 +41,71 @@ impl OptimizerRule for EliminateLimit {
     fn try_optimize(
         &self,
         plan: &LogicalPlan,
-        config: &dyn OptimizerConfig,
+        _config: &dyn OptimizerConfig,
     ) -> Result<Option<LogicalPlan>> {
-        if let LogicalPlan::Limit(limit) = plan {
-            match limit.fetch {
-                Some(fetch) => {
-                    if fetch == 0 {
-                        return 
Ok(Some(LogicalPlan::EmptyRelation(EmptyRelation {
-                            produce_one_row: false,
-                            schema: limit.input.schema().clone(),
-                        })));
-                    }
+        let limit = match plan {
+            LogicalPlan::Limit(limit) => limit,
+            _ => return Ok(None),
+        };
+
+        match limit.fetch {
+            Some(fetch) => {
+                if fetch == 0 {
+                    return Ok(Some(LogicalPlan::EmptyRelation(EmptyRelation {
+                        produce_one_row: false,
+                        schema: limit.input.schema().clone(),
+                    })));
                 }
-                None => {
-                    if limit.skip == 0 {
-                        let input = &*limit.input;
-                        return Ok(Some(utils::optimize_children(self, input, 
config)?));
-                    }
+            }
+            None => {
+                if limit.skip == 0 {
+                    let input = limit.input.as_ref();
+                    // input may itself be a Limit, so apply the rule again.
+                    return Ok(Some(
+                        self.try_optimize(input, _config)?
+                            .unwrap_or_else(|| input.clone()),
+                    ));
                 }
             }
         }
-        Ok(Some(utils::optimize_children(self, plan, config)?))
+        Ok(None)
     }
 
     fn name(&self) -> &str {
         "eliminate_limit"
     }
+
+    fn apply_order(&self) -> Option<ApplyOrder> {
+        Some(ApplyOrder::BottomUp)
+    }
 }
 
 #[cfg(test)]
 mod tests {
+    use super::*;
+    use crate::optimizer::Optimizer;
+    use crate::test::*;
+    use crate::OptimizerContext;
     use datafusion_common::Column;
     use datafusion_expr::{
         col,
         logical_plan::{builder::LogicalPlanBuilder, JoinType},
         sum,
     };
+    use std::sync::Arc;
 
-    use crate::optimizer::OptimizerContext;
     use crate::push_down_limit::PushDownLimit;
-    use crate::test::*;
-
-    use super::*;
 
     fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) -> 
Result<()> {
-        let optimized_plan = EliminateLimit::new()
-            .try_optimize(plan, &OptimizerContext::new())
-            .unwrap()
-            .expect("failed to optimize plan");
+        let optimizer = 
Optimizer::with_rules(vec![Arc::new(EliminateLimit::new())]);
+        let optimized_plan = optimizer
+            .optimize_recursively(
+                optimizer.rules.get(0).unwrap(),
+                plan,
+                &OptimizerContext::new(),
+            )?
+            .unwrap_or_else(|| plan.clone());
+
         let formatted_plan = format!("{:?}", optimized_plan);
         assert_eq!(formatted_plan, expected);
         assert_eq!(plan.schema(), optimized_plan.schema());
@@ -99,13 +116,14 @@ mod tests {
         plan: &LogicalPlan,
         expected: &str,
     ) -> Result<()> {
-        let optimized_plan = PushDownLimit::new()
-            .try_optimize(plan, &OptimizerContext::new())
-            .unwrap()
-            .expect("failed to optimize plan");
-        let optimized_plan = EliminateLimit::new()
-            .try_optimize(&optimized_plan, &OptimizerContext::new())
-            .unwrap()
+        fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {}
+        let config = OptimizerContext::new().with_max_passes(1);
+        let optimizer = Optimizer::with_rules(vec![
+            Arc::new(PushDownLimit::new()),
+            Arc::new(EliminateLimit::new()),
+        ]);
+        let optimized_plan = optimizer
+            .optimize(plan, &config, observe)
             .expect("failed to optimize plan");
         let formatted_plan = format!("{:?}", optimized_plan);
         assert_eq!(formatted_plan, expected);
diff --git a/datafusion/optimizer/src/optimizer.rs 
b/datafusion/optimizer/src/optimizer.rs
index 0dc651da2..fb9e21069 100644
--- a/datafusion/optimizer/src/optimizer.rs
+++ b/datafusion/optimizer/src/optimizer.rs
@@ -60,6 +60,13 @@ pub trait OptimizerRule {
 
     /// A human readable name for this optimizer rule
     fn name(&self) -> &str;
+
+    /// How should the rule be applied by the optimizer? See comments on 
[`ApplyOrder`] for details.
+    ///
+    /// If a rule uses the default `None`, it must recursively traverse the 
plan inside itself.
+    fn apply_order(&self) -> Option<ApplyOrder> {
+        None
+    }
 }
 
 /// Options to control the DataFusion Optimizer.
@@ -180,6 +187,44 @@ pub struct Optimizer {
     pub rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
 }
 
+/// If a rule specifies an `ApplyOrder`, the optimizer handles the traversal 
of children instead of
+/// the rule recursing into them itself.
+/// The rule only needs to handle the subtree pattern itself.
+///
+/// Note: **sometimes** the result of an optimization can be optimized 
further, so the rule needs to be applied again.
+///
+/// Usage Example: Merge Limit (subtree pattern is: Limit-Limit)
+/// ```rust
+/// use datafusion_expr::{Limit, LogicalPlan, LogicalPlanBuilder};
+/// use datafusion_common::Result;
+/// fn merge_limit(parent: &Limit, child: &Limit) -> LogicalPlan {
+///     // stub body so that the example compiles and runs
+///     return parent.input.as_ref().clone();
+/// }
+/// fn try_optimize(plan: &LogicalPlan) -> Result<Option<LogicalPlan>> {
+///     match plan {
+///         LogicalPlan::Limit(limit) => match limit.input.as_ref() {
+///             LogicalPlan::Limit(child_limit) => {
+///                 // merge limit ...
+///                 let optimized_plan = merge_limit(limit, child_limit);
+///                 // optimized_plan may itself be optimizable again,
+///                 // for example when the plan is Limit-Limit-Limit
+///                 Ok(Some(
+///                     try_optimize(&optimized_plan)?
+///                         .unwrap_or_else(|| optimized_plan.clone()),
+///                 ))
+///             }
+///             _ => Ok(None),
+///         },
+///         _ => Ok(None),
+///     }
+/// }
+/// ```
+pub enum ApplyOrder {
+    TopDown,
+    BottomUp,
+}
+
 impl Default for Optimizer {
     fn default() -> Self {
         Self::new()
@@ -253,8 +298,8 @@ impl Optimizer {
                     debug!("Skipping rule {} due to optimizer config", 
rule.name());
                     continue;
                 }
+                let result = self.optimize_recursively(rule, &new_plan, 
config);
 
-                let result = rule.try_optimize(&new_plan, config);
                 match result {
                     Ok(Some(plan)) => {
                         if 
!plan.schema().equivalent_names_and_types(new_plan.schema()) {
@@ -315,6 +360,78 @@ impl Optimizer {
         debug!("Optimizer took {} ms", start_time.elapsed().as_millis());
         Ok(new_plan)
     }
+
+    fn optimize_node(
+        &self,
+        rule: &Arc<dyn OptimizerRule + Send + Sync>,
+        plan: &LogicalPlan,
+        config: &dyn OptimizerConfig,
+    ) -> Result<Option<LogicalPlan>> {
+        // TODO: future feature: We can do Batch optimize
+        rule.try_optimize(plan, config)
+    }
+
+    fn optimize_inputs(
+        &self,
+        rule: &Arc<dyn OptimizerRule + Send + Sync>,
+        plan: &LogicalPlan,
+        config: &dyn OptimizerConfig,
+    ) -> Result<Option<LogicalPlan>> {
+        let inputs = plan.inputs();
+        let result = inputs
+            .iter()
+            .map(|sub_plan| self.optimize_recursively(rule, sub_plan, config))
+            .collect::<Result<Vec<_>>>()?;
+        if result.is_empty() || result.iter().all(|o| o.is_none()) {
+            return Ok(None);
+        }
+
+        let new_inputs = result
+            .into_iter()
+            .enumerate()
+            .map(|(i, o)| match o {
+                Some(plan) => plan,
+                None => (*(inputs.get(i).unwrap())).clone(),
+            })
+            .collect::<Vec<_>>();
+
+        Ok(Some(plan.with_new_inputs(new_inputs.as_slice())?))
+    }
+
+    /// Use a rule to optimize the whole plan.
+    /// If the rule specifies an `ApplyOrder`, it does not need to recursively 
handle children itself.
+    pub fn optimize_recursively(
+        &self,
+        rule: &Arc<dyn OptimizerRule + Send + Sync>,
+        plan: &LogicalPlan,
+        config: &dyn OptimizerConfig,
+    ) -> Result<Option<LogicalPlan>> {
+        match rule.apply_order() {
+            Some(order) => match order {
+                ApplyOrder::TopDown => {
+                    let optimize_self_opt = self.optimize_node(rule, plan, 
config)?;
+                    let optimize_inputs_opt = match &optimize_self_opt {
+                        Some(optimized_plan) => {
+                            self.optimize_inputs(rule, optimized_plan, config)?
+                        }
+                        _ => self.optimize_inputs(rule, plan, config)?,
+                    };
+                    Ok(optimize_inputs_opt.or(optimize_self_opt))
+                }
+                ApplyOrder::BottomUp => {
+                    let optimize_inputs_opt = self.optimize_inputs(rule, plan, 
config)?;
+                    let optimize_self_opt = match &optimize_inputs_opt {
+                        Some(optimized_plan) => {
+                            self.optimize_node(rule, optimized_plan, config)?
+                        }
+                        _ => self.optimize_node(rule, plan, config)?,
+                    };
+                    Ok(optimize_self_opt.or(optimize_inputs_opt))
+                }
+            },
+            _ => rule.try_optimize(plan, config),
+        }
+    }
 }
 
 /// Log the plan in debug/tracing mode after some part of the optimizer runs

Reply via email to