This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 30dba587f Add LogicalPlanSignature and use in the optimizer loop
(#5623)
30dba587f is described below
commit 30dba587f4749327605a2eecb7ae9c0c41769c58
Author: Michał Słapek <[email protected]>
AuthorDate: Tue Mar 21 20:38:28 2023 +0100
Add LogicalPlanSignature and use in the optimizer loop (#5623)
* Add LogicalPlanSignature and use in the optimizer loop
* CR fix
---
datafusion/optimizer/src/lib.rs | 2 +
datafusion/optimizer/src/optimizer.rs | 173 +++++++++++++++++++++++++----
datafusion/optimizer/src/plan_signature.rs | 132 ++++++++++++++++++++++
3 files changed, 284 insertions(+), 23 deletions(-)
diff --git a/datafusion/optimizer/src/lib.rs b/datafusion/optimizer/src/lib.rs
index 3fa199527..2a97c96e7 100644
--- a/datafusion/optimizer/src/lib.rs
+++ b/datafusion/optimizer/src/lib.rs
@@ -51,3 +51,5 @@ pub mod test;
pub use optimizer::{OptimizerConfig, OptimizerContext, OptimizerRule};
pub use utils::optimize_children;
+
+mod plan_signature;
diff --git a/datafusion/optimizer/src/optimizer.rs
b/datafusion/optimizer/src/optimizer.rs
index 01e945119..35557b125 100644
--- a/datafusion/optimizer/src/optimizer.rs
+++ b/datafusion/optimizer/src/optimizer.rs
@@ -31,6 +31,7 @@ use
crate::extract_equijoin_predicate::ExtractEquijoinPredicate;
use crate::filter_null_join_keys::FilterNullJoinKeys;
use crate::inline_table_scan::InlineTableScan;
use crate::merge_projection::MergeProjection;
+use crate::plan_signature::LogicalPlanSignature;
use crate::propagate_empty_relation::PropagateEmptyRelation;
use crate::push_down_filter::PushDownFilter;
use crate::push_down_limit::PushDownLimit;
@@ -47,7 +48,7 @@ use datafusion_common::config::ConfigOptions;
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::logical_plan::LogicalPlan;
use log::{debug, trace, warn};
-use std::borrow::Cow;
+use std::collections::HashSet;
use std::sync::Arc;
use std::time::Instant;
@@ -267,10 +268,14 @@ impl Optimizer {
F: FnMut(&LogicalPlan, &dyn OptimizerRule),
{
let options = config.options();
- let analyzed_plan = Analyzer::default().execute_and_check(plan,
options)?;
+ // execute_and_check has its own timer
+ let mut new_plan = Analyzer::default().execute_and_check(plan,
options)?;
+
let start_time = Instant::now();
- let mut old_plan = Cow::Borrowed(&analyzed_plan);
- let mut new_plan = analyzed_plan.clone();
+
+ let mut previous_plans = HashSet::with_capacity(16);
+ previous_plans.insert(LogicalPlanSignature::new(&new_plan));
+
let mut i = 0;
while i < options.optimizer.max_passes {
log_plan(&format!("Optimizer input (pass {i})"), &new_plan);
@@ -280,18 +285,7 @@ impl Optimizer {
match result {
Ok(Some(plan)) => {
- if
!plan.schema().equivalent_names_and_types(new_plan.schema()) {
- let e = DataFusionError::Internal(format!(
- "Optimizer rule '{}' failed, due to generate a
different schema, original schema: {:?}, new schema: {:?}",
- rule.name(),
- new_plan.schema(),
- plan.schema()
- ));
- return Err(DataFusionError::Context(
- rule.name().to_string(),
- Box::new(e),
- ));
- }
+ assert_schema_is_the_same(rule.name(), &new_plan,
&plan)?;
new_plan = plan;
observer(&new_plan, rule.as_ref());
log_plan(rule.name(), &new_plan);
@@ -330,15 +324,14 @@ impl Optimizer {
}
log_plan(&format!("Optimized plan (pass {i})"), &new_plan);
- // TODO this is an expensive way to see if the optimizer did
anything and
- // it would be better to change the OptimizerRule trait to return
an Option
- // instead
- if old_plan.as_ref() == &new_plan {
+ // HashSet::insert returns whether the value was newly inserted.
+ let plan_is_fresh =
+ previous_plans.insert(LogicalPlanSignature::new(&new_plan));
+ if !plan_is_fresh {
// plan did not change, so no need to continue trying to
optimize
debug!("optimizer pass {} did not make changes", i);
break;
}
- old_plan = Cow::Owned(new_plan.clone());
i += 1;
}
log_plan("Final optimized plan", &new_plan);
@@ -419,6 +412,34 @@ impl Optimizer {
}
}
+/// Returns an error if plans have different schemas.
+///
+/// It ignores metadata and nullability.
+fn assert_schema_is_the_same(
+ rule_name: &str,
+ prev_plan: &LogicalPlan,
+ new_plan: &LogicalPlan,
+) -> Result<()> {
+ let equivalent = new_plan
+ .schema()
+ .equivalent_names_and_types(prev_plan.schema());
+
+ if !equivalent {
+ let e = DataFusionError::Internal(format!(
+ "Optimizer rule '{}' failed, due to generate a different schema,
original schema: {:?}, new schema: {:?}",
+ rule_name,
+ prev_plan.schema(),
+ new_plan.schema()
+ ));
+ Err(DataFusionError::Context(
+ String::from(rule_name),
+ Box::new(e),
+ ))
+ } else {
+ Ok(())
+ }
+}
+
/// Log the plan in debug/tracing mode after some part of the optimizer runs
fn log_plan(description: &str, plan: &LogicalPlan) {
debug!("{description}:\n{}\n", plan.display_indent());
@@ -432,8 +453,10 @@ mod tests {
use crate::{OptimizerConfig, OptimizerContext, OptimizerRule};
use datafusion_common::{DFField, DFSchema, DFSchemaRef, DataFusionError,
Result};
use datafusion_expr::logical_plan::EmptyRelation;
- use datafusion_expr::{col, LogicalPlan, LogicalPlanBuilder, Projection};
- use std::sync::Arc;
+ use datafusion_expr::{col, lit, LogicalPlan, LogicalPlanBuilder,
Projection};
+ use std::sync::{Arc, Mutex};
+
+ use super::ApplyOrder;
#[test]
fn skip_failing_rule() {
@@ -512,6 +535,58 @@ mod tests {
Ok(())
}
+ #[test]
+ fn optimizer_detects_plan_equal_to_the_initial() -> Result<()> {
+ // Run a goofy optimizer, which rotates projection columns
+ // [1, 2, 3] -> [2, 3, 1] -> [3, 1, 2] -> [1, 2, 3]
+
+ let opt =
Optimizer::with_rules(vec![Arc::new(RotateProjectionRule::new(false))]);
+ let config = OptimizerContext::new().with_max_passes(16);
+
+ let initial_plan = LogicalPlanBuilder::empty(false)
+ .project([lit(1), lit(2), lit(3)])?
+ .project([lit(100)])? // to not trigger changed schema error
+ .build()?;
+
+ let mut plans: Vec<LogicalPlan> = Vec::new();
+ let final_plan =
+ opt.optimize(&initial_plan, &config, |p, _|
plans.push(p.clone()))?;
+
+ // initial_plan is not observed, so we have 3 plans
+ assert_eq!(3, plans.len());
+
+ // we got the initial_plan with [1, 2, 3] again
+ assert_eq!(initial_plan, final_plan);
+
+ Ok(())
+ }
+
+ #[test]
+ fn optimizer_detects_plan_equal_to_a_non_initial() -> Result<()> {
+ // Run a goofy optimizer, which reverses and rotates projection columns
+ // [1, 2, 3] -> [3, 2, 1] -> [2, 1, 3] -> [1, 3, 2] -> [3, 2, 1]
+
+ let opt =
Optimizer::with_rules(vec![Arc::new(RotateProjectionRule::new(true))]);
+ let config = OptimizerContext::new().with_max_passes(16);
+
+ let initial_plan = LogicalPlanBuilder::empty(false)
+ .project([lit(1), lit(2), lit(3)])?
+ .project([lit(100)])? // to not trigger changed schema error
+ .build()?;
+
+ let mut plans: Vec<LogicalPlan> = Vec::new();
+ let final_plan =
+ opt.optimize(&initial_plan, &config, |p, _|
plans.push(p.clone()))?;
+
+ // initial_plan is not observed, so we have 4 plans
+ assert_eq!(4, plans.len());
+
+ // we got the plan with [3, 2, 1] again
+ assert_eq!(plans[0], final_plan);
+
+ Ok(())
+ }
+
fn add_metadata_to_fields(schema: &DFSchema) -> DFSchemaRef {
let new_fields = schema
.fields()
@@ -569,4 +644,56 @@ mod tests {
"get table_scan rule"
}
}
+
+ /// A goofy rule doing rotation of columns in all projections.
+ ///
+ /// Useful to test cycle detection.
+ struct RotateProjectionRule {
+ // reverse exprs instead of rotating on the first pass
+ reverse_on_first_pass: Mutex<bool>,
+ }
+
+ impl RotateProjectionRule {
+ fn new(reverse_on_first_pass: bool) -> Self {
+ Self {
+ reverse_on_first_pass: Mutex::new(reverse_on_first_pass),
+ }
+ }
+ }
+
+ impl OptimizerRule for RotateProjectionRule {
+ fn try_optimize(
+ &self,
+ plan: &LogicalPlan,
+ _: &dyn OptimizerConfig,
+ ) -> Result<Option<LogicalPlan>> {
+ let projection = match plan {
+ LogicalPlan::Projection(p) if p.expr.len() >= 2 => p,
+ _ => return Ok(None),
+ };
+
+ let mut exprs = projection.expr.clone();
+
+ let mut reverse = self.reverse_on_first_pass.lock().unwrap();
+ if *reverse {
+ exprs.reverse();
+ *reverse = false;
+ } else {
+ exprs.rotate_left(1);
+ }
+
+ Ok(Some(LogicalPlan::Projection(Projection::try_new(
+ exprs,
+ projection.input.clone(),
+ )?)))
+ }
+
+ fn apply_order(&self) -> Option<ApplyOrder> {
+ Some(ApplyOrder::TopDown)
+ }
+
+ fn name(&self) -> &str {
+ "rotate_projection"
+ }
+ }
}
diff --git a/datafusion/optimizer/src/plan_signature.rs
b/datafusion/optimizer/src/plan_signature.rs
new file mode 100644
index 000000000..64c06835a
--- /dev/null
+++ b/datafusion/optimizer/src/plan_signature.rs
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{
+ collections::hash_map::DefaultHasher,
+ convert::Infallible,
+ hash::{Hash, Hasher},
+ num::NonZeroUsize,
+};
+
+use datafusion_expr::{LogicalPlan, PlanVisitor};
+
+/// Non-unique identifier of a [`LogicalPlan`].
+///
+/// See [`LogicalPlanSignature::new`] for details.
+#[derive(Clone, Copy, PartialEq, Eq, Hash)]
+pub struct LogicalPlanSignature {
+ node_number: NonZeroUsize,
+ plan_hash: u64,
+}
+
+impl LogicalPlanSignature {
+ /// Returns [`LogicalPlanSignature`] of the given [`LogicalPlan`].
+ ///
+ /// It is a kind of [`LogicalPlan`] hashing with stronger guarantees.
+ ///
+ /// # Guarantees
+ ///
+ /// Consider two [`LogicalPlan`]s `p1` and `p2`.
+ ///
+ /// If `p1` and `p2` have a different number of [`LogicalPlan`]s, then
+ /// they will have different [`LogicalPlanSignature`]s.
+ ///
+ /// If `p1` and `p2` have a different [`Hash`], then
+ /// they will have different [`LogicalPlanSignature`]s.
+ ///
+ /// # Caveats
+ ///
+ /// The intention of [`LogicalPlanSignature`] is to have a lower chance
+ /// of hash collisions.
+ ///
+ /// There exist different [`LogicalPlan`]s with the same
+ /// [`LogicalPlanSignature`].
+ ///
+ /// When two [`LogicalPlan`]s differ only in metadata, then they will have
+ /// the same [`LogicalPlanSignature`]s (due to hash implementation in
+ /// [`LogicalPlan`]).
+ pub fn new(plan: &LogicalPlan) -> Self {
+ let mut hasher = DefaultHasher::new();
+ plan.hash(&mut hasher);
+
+ Self {
+ node_number: get_node_number(plan),
+ plan_hash: hasher.finish(),
+ }
+ }
+}
+
+/// Get total number of [`LogicalPlan`]s in the plan.
+fn get_node_number(plan: &LogicalPlan) -> NonZeroUsize {
+ struct Visitor {
+ node_number: usize,
+ }
+
+ impl PlanVisitor for Visitor {
+ type Error = Infallible;
+
+ fn pre_visit(&mut self, _: &LogicalPlan) -> Result<bool, Self::Error> {
+ self.node_number += 1;
+ Ok(true)
+ }
+ }
+
+ let mut v = Visitor { node_number: 0 };
+ plan.accept(&mut v).unwrap(); // Infallible
+
+ // Visitor must have at least visited the root,
+ // so v.node_number is at least 1.
+ NonZeroUsize::new(v.node_number).unwrap()
+}
+
+#[cfg(test)]
+mod tests {
+ use std::sync::Arc;
+
+ use datafusion_common::{DFSchema, Result};
+ use datafusion_expr::{self, lit, LogicalPlan};
+
+ use crate::plan_signature::get_node_number;
+
+ #[test]
+ fn node_number_for_some_plan() -> Result<()> {
+ let schema = Arc::new(DFSchema::empty());
+
+ let one_node_plan =
+ Arc::new(LogicalPlan::EmptyRelation(datafusion_expr::EmptyRelation
{
+ produce_one_row: false,
+ schema: schema.clone(),
+ }));
+
+ assert_eq!(1, get_node_number(&one_node_plan).get());
+
+ let two_node_plan = Arc::new(LogicalPlan::Projection(
+ datafusion_expr::Projection::try_new(vec![lit(1), lit(2)],
one_node_plan)?,
+ ));
+
+ assert_eq!(2, get_node_number(&two_node_plan).get());
+
+ let five_node_plan =
Arc::new(LogicalPlan::Union(datafusion_expr::Union {
+ inputs: vec![two_node_plan.clone(), two_node_plan],
+ schema,
+ }));
+
+ assert_eq!(5, get_node_number(&five_node_plan).get());
+
+ Ok(())
+ }
+}