This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 30dba587f Add LogicalPlanSignature and use in the optimizer loop 
(#5623)
30dba587f is described below

commit 30dba587f4749327605a2eecb7ae9c0c41769c58
Author: Michał Słapek <[email protected]>
AuthorDate: Tue Mar 21 20:38:28 2023 +0100

    Add LogicalPlanSignature and use in the optimizer loop (#5623)
    
    * Add LogicalPlanSignature and use in the optimizer loop
    
    * CR fix
---
 datafusion/optimizer/src/lib.rs            |   2 +
 datafusion/optimizer/src/optimizer.rs      | 173 +++++++++++++++++++++++++----
 datafusion/optimizer/src/plan_signature.rs | 132 ++++++++++++++++++++++
 3 files changed, 284 insertions(+), 23 deletions(-)

diff --git a/datafusion/optimizer/src/lib.rs b/datafusion/optimizer/src/lib.rs
index 3fa199527..2a97c96e7 100644
--- a/datafusion/optimizer/src/lib.rs
+++ b/datafusion/optimizer/src/lib.rs
@@ -51,3 +51,5 @@ pub mod test;
 
 pub use optimizer::{OptimizerConfig, OptimizerContext, OptimizerRule};
 pub use utils::optimize_children;
+
+mod plan_signature;
diff --git a/datafusion/optimizer/src/optimizer.rs 
b/datafusion/optimizer/src/optimizer.rs
index 01e945119..35557b125 100644
--- a/datafusion/optimizer/src/optimizer.rs
+++ b/datafusion/optimizer/src/optimizer.rs
@@ -31,6 +31,7 @@ use 
crate::extract_equijoin_predicate::ExtractEquijoinPredicate;
 use crate::filter_null_join_keys::FilterNullJoinKeys;
 use crate::inline_table_scan::InlineTableScan;
 use crate::merge_projection::MergeProjection;
+use crate::plan_signature::LogicalPlanSignature;
 use crate::propagate_empty_relation::PropagateEmptyRelation;
 use crate::push_down_filter::PushDownFilter;
 use crate::push_down_limit::PushDownLimit;
@@ -47,7 +48,7 @@ use datafusion_common::config::ConfigOptions;
 use datafusion_common::{DataFusionError, Result};
 use datafusion_expr::logical_plan::LogicalPlan;
 use log::{debug, trace, warn};
-use std::borrow::Cow;
+use std::collections::HashSet;
 use std::sync::Arc;
 use std::time::Instant;
 
@@ -267,10 +268,14 @@ impl Optimizer {
         F: FnMut(&LogicalPlan, &dyn OptimizerRule),
     {
         let options = config.options();
-        let analyzed_plan = Analyzer::default().execute_and_check(plan, 
options)?;
+        // execute_and_check has its own timer
+        let mut new_plan = Analyzer::default().execute_and_check(plan, 
options)?;
+
         let start_time = Instant::now();
-        let mut old_plan = Cow::Borrowed(&analyzed_plan);
-        let mut new_plan = analyzed_plan.clone();
+
+        let mut previous_plans = HashSet::with_capacity(16);
+        previous_plans.insert(LogicalPlanSignature::new(&new_plan));
+
         let mut i = 0;
         while i < options.optimizer.max_passes {
             log_plan(&format!("Optimizer input (pass {i})"), &new_plan);
@@ -280,18 +285,7 @@ impl Optimizer {
 
                 match result {
                     Ok(Some(plan)) => {
-                        if 
!plan.schema().equivalent_names_and_types(new_plan.schema()) {
-                            let e = DataFusionError::Internal(format!(
-                                "Optimizer rule '{}' failed, due to generate a 
different schema, original schema: {:?}, new schema: {:?}",
-                                rule.name(),
-                                new_plan.schema(),
-                                plan.schema()
-                            ));
-                            return Err(DataFusionError::Context(
-                                rule.name().to_string(),
-                                Box::new(e),
-                            ));
-                        }
+                        assert_schema_is_the_same(rule.name(), &new_plan, 
&plan)?;
                         new_plan = plan;
                         observer(&new_plan, rule.as_ref());
                         log_plan(rule.name(), &new_plan);
@@ -330,15 +324,14 @@ impl Optimizer {
             }
             log_plan(&format!("Optimized plan (pass {i})"), &new_plan);
 
-            // TODO this is an expensive way to see if the optimizer did 
anything and
-            // it would be better to change the OptimizerRule trait to return 
an Option
-            // instead
-            if old_plan.as_ref() == &new_plan {
+            // HashSet::insert returns whether the value was newly inserted.
+            let plan_is_fresh =
+                previous_plans.insert(LogicalPlanSignature::new(&new_plan));
+            if !plan_is_fresh {
                 // plan did not change, so no need to continue trying to 
optimize
                 debug!("optimizer pass {} did not make changes", i);
                 break;
             }
-            old_plan = Cow::Owned(new_plan.clone());
             i += 1;
         }
         log_plan("Final optimized plan", &new_plan);
@@ -419,6 +412,34 @@ impl Optimizer {
     }
 }
 
+/// Returns an error if plans have different schemas.
+///
+/// It ignores metadata and nullability.
+fn assert_schema_is_the_same(
+    rule_name: &str,
+    prev_plan: &LogicalPlan,
+    new_plan: &LogicalPlan,
+) -> Result<()> {
+    let equivalent = new_plan
+        .schema()
+        .equivalent_names_and_types(prev_plan.schema());
+
+    if !equivalent {
+        let e = DataFusionError::Internal(format!(
+            "Optimizer rule '{}' failed, due to generate a different schema, 
original schema: {:?}, new schema: {:?}",
+            rule_name,
+            prev_plan.schema(),
+            new_plan.schema()
+        ));
+        Err(DataFusionError::Context(
+            String::from(rule_name),
+            Box::new(e),
+        ))
+    } else {
+        Ok(())
+    }
+}
+
 /// Log the plan in debug/tracing mode after some part of the optimizer runs
 fn log_plan(description: &str, plan: &LogicalPlan) {
     debug!("{description}:\n{}\n", plan.display_indent());
@@ -432,8 +453,10 @@ mod tests {
     use crate::{OptimizerConfig, OptimizerContext, OptimizerRule};
     use datafusion_common::{DFField, DFSchema, DFSchemaRef, DataFusionError, 
Result};
     use datafusion_expr::logical_plan::EmptyRelation;
-    use datafusion_expr::{col, LogicalPlan, LogicalPlanBuilder, Projection};
-    use std::sync::Arc;
+    use datafusion_expr::{col, lit, LogicalPlan, LogicalPlanBuilder, 
Projection};
+    use std::sync::{Arc, Mutex};
+
+    use super::ApplyOrder;
 
     #[test]
     fn skip_failing_rule() {
@@ -512,6 +535,58 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn optimizer_detects_plan_equal_to_the_initial() -> Result<()> {
+        // Run a goofy optimizer, which rotates projection columns
+        // [1, 2, 3] -> [2, 3, 1] -> [3, 1, 2] -> [1, 2, 3]
+
+        let opt = 
Optimizer::with_rules(vec![Arc::new(RotateProjectionRule::new(false))]);
+        let config = OptimizerContext::new().with_max_passes(16);
+
+        let initial_plan = LogicalPlanBuilder::empty(false)
+            .project([lit(1), lit(2), lit(3)])?
+            .project([lit(100)])? // to not trigger changed schema error
+            .build()?;
+
+        let mut plans: Vec<LogicalPlan> = Vec::new();
+        let final_plan =
+            opt.optimize(&initial_plan, &config, |p, _| 
plans.push(p.clone()))?;
+
+        // initial_plan is not observed, so we have 3 plans
+        assert_eq!(3, plans.len());
+
+        // we got the initial_plan with [1, 2, 3] again
+        assert_eq!(initial_plan, final_plan);
+
+        Ok(())
+    }
+
+    #[test]
+    fn optimizer_detects_plan_equal_to_a_non_initial() -> Result<()> {
+        // Run a goofy optimizer, which reverses and rotates projection columns
+        // [1, 2, 3] -> [3, 2, 1] -> [2, 1, 3] -> [1, 3, 2] -> [3, 2, 1]
+
+        let opt = 
Optimizer::with_rules(vec![Arc::new(RotateProjectionRule::new(true))]);
+        let config = OptimizerContext::new().with_max_passes(16);
+
+        let initial_plan = LogicalPlanBuilder::empty(false)
+            .project([lit(1), lit(2), lit(3)])?
+            .project([lit(100)])? // to not trigger changed schema error
+            .build()?;
+
+        let mut plans: Vec<LogicalPlan> = Vec::new();
+        let final_plan =
+            opt.optimize(&initial_plan, &config, |p, _| 
plans.push(p.clone()))?;
+
+        // initial_plan is not observed, so we have 4 plans
+        assert_eq!(4, plans.len());
+
+        // we got the plan with [3, 2, 1] again
+        assert_eq!(plans[0], final_plan);
+
+        Ok(())
+    }
+
     fn add_metadata_to_fields(schema: &DFSchema) -> DFSchemaRef {
         let new_fields = schema
             .fields()
@@ -569,4 +644,56 @@ mod tests {
             "get table_scan rule"
         }
     }
+
+    /// A goofy rule doing rotation of columns in all projections.
+    ///
+    /// Useful to test cycle detection.
+    struct RotateProjectionRule {
+        // reverse exprs instead of rotating on the first pass
+        reverse_on_first_pass: Mutex<bool>,
+    }
+
+    impl RotateProjectionRule {
+        fn new(reverse_on_first_pass: bool) -> Self {
+            Self {
+                reverse_on_first_pass: Mutex::new(reverse_on_first_pass),
+            }
+        }
+    }
+
+    impl OptimizerRule for RotateProjectionRule {
+        fn try_optimize(
+            &self,
+            plan: &LogicalPlan,
+            _: &dyn OptimizerConfig,
+        ) -> Result<Option<LogicalPlan>> {
+            let projection = match plan {
+                LogicalPlan::Projection(p) if p.expr.len() >= 2 => p,
+                _ => return Ok(None),
+            };
+
+            let mut exprs = projection.expr.clone();
+
+            let mut reverse = self.reverse_on_first_pass.lock().unwrap();
+            if *reverse {
+                exprs.reverse();
+                *reverse = false;
+            } else {
+                exprs.rotate_left(1);
+            }
+
+            Ok(Some(LogicalPlan::Projection(Projection::try_new(
+                exprs,
+                projection.input.clone(),
+            )?)))
+        }
+
+        fn apply_order(&self) -> Option<ApplyOrder> {
+            Some(ApplyOrder::TopDown)
+        }
+
+        fn name(&self) -> &str {
+            "rotate_projection"
+        }
+    }
 }
diff --git a/datafusion/optimizer/src/plan_signature.rs 
b/datafusion/optimizer/src/plan_signature.rs
new file mode 100644
index 000000000..64c06835a
--- /dev/null
+++ b/datafusion/optimizer/src/plan_signature.rs
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{
+    collections::hash_map::DefaultHasher,
+    convert::Infallible,
+    hash::{Hash, Hasher},
+    num::NonZeroUsize,
+};
+
+use datafusion_expr::{LogicalPlan, PlanVisitor};
+
+/// Non-unique identifier of a [`LogicalPlan`].
+///
+/// See [`LogicalPlanSignature::new`] for details.
+#[derive(Clone, Copy, PartialEq, Eq, Hash)]
+pub struct LogicalPlanSignature {
+    node_number: NonZeroUsize,
+    plan_hash: u64,
+}
+
+impl LogicalPlanSignature {
+    /// Returns [`LogicalPlanSignature`] of the given [`LogicalPlan`].
+    ///
+    /// It is a kind of [`LogicalPlan`] hashing with stronger guarantees.
+    ///
+    /// # Guarantees
+    ///
+    /// Consider two [`LogicalPlan`]s `p1` and `p2`.
+    ///
+    /// If `p1` and `p2` contain a different number of [`LogicalPlan`] nodes,
+    /// then they will have different [`LogicalPlanSignature`]s.
+    ///
+    /// If `p1` and `p2` have a different [`Hash`], then
+    /// they will have different [`LogicalPlanSignature`]s.
+    ///
+    /// # Caveats
+    ///
+    /// The intention of [`LogicalPlanSignature`] is to have a lower chance
+    /// of hash collisions.
+    ///
+    /// There exist different [`LogicalPlan`]s with the same
+    /// [`LogicalPlanSignature`].
+    ///
+    /// When two [`LogicalPlan`]s differ only in metadata, then they will have
+    /// the same [`LogicalPlanSignature`]s (due to hash implementation in
+    /// [`LogicalPlan`]).
+    pub fn new(plan: &LogicalPlan) -> Self {
+        let mut hasher = DefaultHasher::new();
+        plan.hash(&mut hasher);
+
+        Self {
+            node_number: get_node_number(plan),
+            plan_hash: hasher.finish(),
+        }
+    }
+}
+
+/// Get the total number of [`LogicalPlan`] nodes in the plan.
+fn get_node_number(plan: &LogicalPlan) -> NonZeroUsize {
+    struct Visitor {
+        node_number: usize,
+    }
+
+    impl PlanVisitor for Visitor {
+        type Error = Infallible;
+
+        fn pre_visit(&mut self, _: &LogicalPlan) -> Result<bool, Self::Error> {
+            self.node_number += 1;
+            Ok(true)
+        }
+    }
+
+    let mut v = Visitor { node_number: 0 };
+    plan.accept(&mut v).unwrap(); // Infallible
+
+    // Visitor must have at least visited the root,
+    // so v.node_number is at least 1.
+    NonZeroUsize::new(v.node_number).unwrap()
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use datafusion_common::{DFSchema, Result};
+    use datafusion_expr::{self, lit, LogicalPlan};
+
+    use crate::plan_signature::get_node_number;
+
+    #[test]
+    fn node_number_for_some_plan() -> Result<()> {
+        let schema = Arc::new(DFSchema::empty());
+
+        let one_node_plan =
+            Arc::new(LogicalPlan::EmptyRelation(datafusion_expr::EmptyRelation 
{
+                produce_one_row: false,
+                schema: schema.clone(),
+            }));
+
+        assert_eq!(1, get_node_number(&one_node_plan).get());
+
+        let two_node_plan = Arc::new(LogicalPlan::Projection(
+            datafusion_expr::Projection::try_new(vec![lit(1), lit(2)], 
one_node_plan)?,
+        ));
+
+        assert_eq!(2, get_node_number(&two_node_plan).get());
+
+        let five_node_plan = 
Arc::new(LogicalPlan::Union(datafusion_expr::Union {
+            inputs: vec![two_node_plan.clone(), two_node_plan],
+            schema,
+        }));
+
+        assert_eq!(5, get_node_number(&five_node_plan).get());
+
+        Ok(())
+    }
+}

Reply via email to