This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 61fc51446 Implement/fix Eq and Hash for Expr and LogicalPlan (#5421)
61fc51446 is described below

commit 61fc51446cb06bc6c8de69d50c9e5f79dede08fb
Author: Michał Słapek <[email protected]>
AuthorDate: Fri Mar 3 13:24:03 2023 +0100

    Implement/fix Eq and Hash for Expr and LogicalPlan (#5421)
    
    * Implement/fix Eq and Hash for Expr and LogicalPlan
    
    * CR fix
    
    * Fix merge from main
---
 datafusion/common/src/dfschema.rs             |  12 ++-
 datafusion/common/src/parsers.rs              |   2 +-
 datafusion/common/src/table_reference.rs      |   2 +-
 datafusion/core/src/physical_plan/planner.rs  |  14 +++
 datafusion/core/tests/user_defined_plan.rs    |  14 +++
 datafusion/expr/src/logical_plan/extension.rs |  27 ++++-
 datafusion/expr/src/logical_plan/plan.rs      | 143 ++++++++++++++++----------
 datafusion/expr/src/signature.rs              |   4 +-
 datafusion/optimizer/src/optimizer.rs         |   8 +-
 datafusion/optimizer/src/push_down_filter.rs  |  15 ++-
 datafusion/optimizer/src/test/user_defined.rs |  14 +++
 datafusion/proto/src/logical_plan/mod.rs      |  14 +++
 12 files changed, 200 insertions(+), 69 deletions(-)

diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs
index 71b67175d..7c3319628 100644
--- a/datafusion/common/src/dfschema.rs
+++ b/datafusion/common/src/dfschema.rs
@@ -20,6 +20,7 @@
 
 use std::collections::{HashMap, HashSet};
 use std::convert::TryFrom;
+use std::hash::Hash;
 use std::sync::Arc;
 
 use crate::error::{DataFusionError, Result, SchemaError};
@@ -517,6 +518,15 @@ impl From<DFSchema> for SchemaRef {
     }
 }
 
+// Hashing refers to a subset of fields considered in PartialEq.
+#[allow(clippy::derive_hash_xor_eq)]
+impl Hash for DFSchema {
+    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+        self.fields.hash(state);
+        self.metadata.len().hash(state); // HashMap is not hashable
+    }
+}
+
 /// Convenience trait to convert Schema like things to DFSchema and DFSchemaRef with fewer keystrokes
 pub trait ToDFSchema
 where
@@ -608,7 +618,7 @@ impl ExprSchema for DFSchema {
 }
 
 /// DFField wraps an Arrow field and adds an optional qualifier
-#[derive(Debug, Clone, PartialEq, Eq)]
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
 pub struct DFField {
     /// Optional qualifier (usually a table or relation name)
     qualifier: Option<String>,
diff --git a/datafusion/common/src/parsers.rs b/datafusion/common/src/parsers.rs
index 4aff7c7eb..6a61da970 100644
--- a/datafusion/common/src/parsers.rs
+++ b/datafusion/common/src/parsers.rs
@@ -26,7 +26,7 @@ const SECONDS_PER_HOUR: f64 = 3_600_f64;
 const NANOS_PER_SECOND: f64 = 1_000_000_000_f64;
 
 /// Readable file compression type
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
 pub enum CompressionTypeVariant {
     /// Gzip-ed file
     GZIP,
diff --git a/datafusion/common/src/table_reference.rs b/datafusion/common/src/table_reference.rs
index 4ca41edd9..34656bc11 100644
--- a/datafusion/common/src/table_reference.rs
+++ b/datafusion/common/src/table_reference.rs
@@ -69,7 +69,7 @@ pub enum TableReference<'a> {
 
 /// Represents a path to a table that may require further resolution
 /// that owns the underlying names
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
 pub enum OwnedTableReference {
     /// An unqualified table reference, e.g. "table"
     Bare {
diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index 1d4e7a86c..0a1ac4054 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -2373,6 +2373,7 @@ Internal error: Optimizer rule 'type_coercion' failed due to unexpected error: E
         }
     }
     /// An example extension node that doesn't do anything
+    #[derive(PartialEq, Eq, Hash)]
     struct NoOpExtensionNode {
         schema: DFSchemaRef,
     }
@@ -2425,6 +2426,19 @@ Internal error: Optimizer rule 'type_coercion' failed due to unexpected error: E
         ) -> Arc<dyn UserDefinedLogicalNode> {
             unimplemented!("NoOp");
         }
+
+        fn dyn_eq(&self, other: &dyn UserDefinedLogicalNode) -> bool {
+            match other.as_any().downcast_ref::<Self>() {
+                Some(o) => self == o,
+                None => false,
+            }
+        }
+
+        fn dyn_hash(&self, state: &mut dyn std::hash::Hasher) {
+            use std::hash::Hash;
+            let mut s = state;
+            self.hash(&mut s);
+        }
     }
 
     #[derive(Debug)]
diff --git a/datafusion/core/tests/user_defined_plan.rs b/datafusion/core/tests/user_defined_plan.rs
index 3b1ea76a8..02c8a1664 100644
--- a/datafusion/core/tests/user_defined_plan.rs
+++ b/datafusion/core/tests/user_defined_plan.rs
@@ -324,6 +324,7 @@ impl OptimizerRule for TopKOptimizerRule {
     }
 }
 
+#[derive(PartialEq, Eq, Hash)]
 struct TopKPlanNode {
     k: usize,
     input: LogicalPlan,
@@ -376,6 +377,19 @@ impl UserDefinedLogicalNode for TopKPlanNode {
             expr: exprs[0].clone(),
         })
     }
+
+    fn dyn_eq(&self, other: &dyn UserDefinedLogicalNode) -> bool {
+        match other.as_any().downcast_ref::<Self>() {
+            Some(o) => self == o,
+            None => false,
+        }
+    }
+
+    fn dyn_hash(&self, state: &mut dyn std::hash::Hasher) {
+        use std::hash::Hash;
+        let mut s = state;
+        self.hash(&mut s);
+    }
 }
 
 /// Physical planner for TopK nodes
diff --git a/datafusion/expr/src/logical_plan/extension.rs b/datafusion/expr/src/logical_plan/extension.rs
index fd3274144..57a8cac6b 100644
--- a/datafusion/expr/src/logical_plan/extension.rs
+++ b/datafusion/expr/src/logical_plan/extension.rs
@@ -18,14 +18,15 @@
 //! This module defines the interface for logical nodes
 use crate::{Expr, LogicalPlan};
 use datafusion_common::DFSchemaRef;
-use std::{any::Any, collections::HashSet, fmt, sync::Arc};
+use std::hash::{Hash, Hasher};
+use std::{any::Any, cmp::Eq, collections::HashSet, fmt, sync::Arc};
 
 /// This defines the interface for `LogicalPlan` nodes that can be
 /// used to extend DataFusion with custom relational operators.
 ///
 /// See the example in
 /// [user_defined_plan.rs](../../tests/user_defined_plan.rs) for an
-/// example of how to use this extension API
+/// example of how to use this extension API.
 pub trait UserDefinedLogicalNode: fmt::Debug + Send + Sync {
     /// Return a reference to self as Any, to support dynamic downcasting
     fn as_any(&self) -> &dyn Any;
@@ -77,4 +78,26 @@ pub trait UserDefinedLogicalNode: fmt::Debug + Send + Sync {
         exprs: &[Expr],
         inputs: &[LogicalPlan],
     ) -> Arc<dyn UserDefinedLogicalNode>;
+
+    /// Hashing respecting requirements from [std::hash::Hash].
+    fn dyn_hash(&self, state: &mut dyn Hasher);
+
+    /// Comparison respecting requirements from [std::cmp::Eq].
+    ///
+    /// When `other` has a different type than `self`, the values are *not* equal.
+    fn dyn_eq(&self, other: &dyn UserDefinedLogicalNode) -> bool;
+}
+
+impl Hash for dyn UserDefinedLogicalNode {
+    fn hash<H: Hasher>(&self, state: &mut H) {
+        self.dyn_hash(state);
+    }
 }
+
+impl std::cmp::PartialEq for dyn UserDefinedLogicalNode {
+    fn eq(&self, other: &Self) -> bool {
+        self.dyn_eq(other)
+    }
+}
+
+impl Eq for dyn UserDefinedLogicalNode {}
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index c3ef861eb..f5866d5a9 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -49,7 +49,7 @@ use std::sync::Arc;
 /// an output relation (table) with a (potentially) different
 /// schema. A plan represents a dataflow tree where data flows
 /// from leaves up to the root to produce the query result.
-#[derive(Clone)]
+#[derive(Clone, PartialEq, Eq, Hash)]
 pub enum LogicalPlan {
     /// Evaluates an arbitrary list of expressions (essentially a
     /// SELECT with an expression list) on its input.
@@ -1249,7 +1249,7 @@ impl Display for JoinType {
 }
 
 /// Join constraint
-#[derive(Debug, Clone, Copy)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
 pub enum JoinConstraint {
     /// Join ON
     On,
@@ -1258,7 +1258,7 @@ pub enum JoinConstraint {
 }
 
 /// Creates a catalog (aka "Database").
-#[derive(Clone)]
+#[derive(Clone, PartialEq, Eq, Hash)]
 pub struct CreateCatalog {
     /// The catalog name
     pub catalog_name: String,
@@ -1269,7 +1269,7 @@ pub struct CreateCatalog {
 }
 
 /// Creates a schema.
-#[derive(Clone)]
+#[derive(Clone, PartialEq, Eq, Hash)]
 pub struct CreateCatalogSchema {
     /// The table schema
     pub schema_name: String,
@@ -1280,7 +1280,7 @@ pub struct CreateCatalogSchema {
 }
 
 /// Drops a table.
-#[derive(Clone)]
+#[derive(Clone, PartialEq, Eq, Hash)]
 pub struct DropTable {
     /// The table name
     pub name: OwnedTableReference,
@@ -1291,7 +1291,7 @@ pub struct DropTable {
 }
 
 /// Drops a view.
-#[derive(Clone)]
+#[derive(Clone, PartialEq, Eq, Hash)]
 pub struct DropView {
     /// The view name
     pub name: OwnedTableReference,
@@ -1303,7 +1303,7 @@ pub struct DropView {
 
 /// Set a Variable's value -- value in
 /// [`ConfigOptions`](datafusion_common::config::ConfigOptions)
-#[derive(Clone)]
+#[derive(Clone, PartialEq, Eq, Hash)]
 pub struct SetVariable {
     /// The variable name
     pub variable: String,
@@ -1314,7 +1314,7 @@ pub struct SetVariable {
 }
 
 /// Produces no rows: An empty relation with an empty schema
-#[derive(Clone)]
+#[derive(Clone, PartialEq, Eq, Hash)]
 pub struct EmptyRelation {
     /// Whether to produce a placeholder row
     pub produce_one_row: bool,
@@ -1325,7 +1325,7 @@ pub struct EmptyRelation {
 /// Values expression. See
 /// [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
 /// documentation for more details.
-#[derive(Clone)]
+#[derive(Clone, PartialEq, Eq, Hash)]
 pub struct Values {
     /// The table schema
     pub schema: DFSchemaRef,
@@ -1335,7 +1335,7 @@ pub struct Values {
 
 /// Evaluates an arbitrary list of expressions (essentially a
 /// SELECT with an expression list) on its input.
-#[derive(Clone)]
+#[derive(Clone, PartialEq, Eq, Hash)]
 // mark non_exhaustive to encourage use of try_new/new()
 #[non_exhaustive]
 pub struct Projection {
@@ -1400,7 +1400,7 @@ impl Projection {
 }
 
 /// Aliased subquery
-#[derive(Clone)]
+#[derive(Clone, PartialEq, Eq, Hash)]
 // mark non_exhaustive to encourage use of try_new/new()
 #[non_exhaustive]
 pub struct SubqueryAlias {
@@ -1440,7 +1440,7 @@ impl SubqueryAlias {
 ///
 /// Filter should not be created directly but instead use `try_new()`
 /// and that these fields are only pub to support pattern matching
-#[derive(Clone)]
+#[derive(Clone, PartialEq, Eq, Hash)]
 #[non_exhaustive]
 pub struct Filter {
     /// The predicate expression, which must have Boolean type.
@@ -1488,7 +1488,7 @@ impl Filter {
 }
 
 /// Window its input based on a set of window spec and window function (e.g. SUM or RANK)
-#[derive(Clone)]
+#[derive(Clone, PartialEq, Eq, Hash)]
 pub struct Window {
     /// The incoming logical plan
     pub input: Arc<LogicalPlan>,
@@ -1515,8 +1515,30 @@ pub struct TableScan {
     pub fetch: Option<usize>,
 }
 
+impl PartialEq for TableScan {
+    fn eq(&self, other: &Self) -> bool {
+        self.table_name == other.table_name
+            && self.projection == other.projection
+            && self.projected_schema == other.projected_schema
+            && self.filters == other.filters
+            && self.fetch == other.fetch
+    }
+}
+
+impl Eq for TableScan {}
+
+impl Hash for TableScan {
+    fn hash<H: Hasher>(&self, state: &mut H) {
+        self.table_name.hash(state);
+        self.projection.hash(state);
+        self.projected_schema.hash(state);
+        self.filters.hash(state);
+        self.fetch.hash(state);
+    }
+}
+
 /// Apply Cross Join to two logical plans
-#[derive(Clone)]
+#[derive(Clone, PartialEq, Eq, Hash)]
 pub struct CrossJoin {
     /// Left input
     pub left: Arc<LogicalPlan>,
@@ -1527,7 +1549,7 @@ pub struct CrossJoin {
 }
 
 /// Repartition the plan based on a partitioning scheme.
-#[derive(Clone)]
+#[derive(Clone, PartialEq, Eq, Hash)]
 pub struct Repartition {
     /// The incoming logical plan
     pub input: Arc<LogicalPlan>,
@@ -1536,7 +1558,7 @@ pub struct Repartition {
 }
 
 /// Union multiple inputs
-#[derive(Clone)]
+#[derive(Clone, PartialEq, Eq, Hash)]
 pub struct Union {
     /// Inputs to merge
     pub inputs: Vec<Arc<LogicalPlan>>,
@@ -1545,7 +1567,7 @@ pub struct Union {
 }
 
 /// Creates an in memory table.
-#[derive(Clone)]
+#[derive(Clone, PartialEq, Eq, Hash)]
 pub struct CreateMemoryTable {
     /// The table name
     pub name: OwnedTableReference,
@@ -1558,7 +1580,7 @@ pub struct CreateMemoryTable {
 }
 
 /// Creates a view.
-#[derive(Clone)]
+#[derive(Clone, PartialEq, Eq, Hash)]
 pub struct CreateView {
     /// The table name
     pub name: OwnedTableReference,
@@ -1571,7 +1593,7 @@ pub struct CreateView {
 }
 
 /// Creates an external table.
-#[derive(Clone)]
+#[derive(Clone, PartialEq, Eq)]
 pub struct CreateExternalTable {
     /// The table schema
     pub schema: DFSchemaRef,
@@ -1597,7 +1619,25 @@ pub struct CreateExternalTable {
     pub options: HashMap<String, String>,
 }
 
-#[derive(Clone)]
+// Hashing refers to a subset of fields considered in PartialEq.
+#[allow(clippy::derive_hash_xor_eq)]
+impl Hash for CreateExternalTable {
+    fn hash<H: Hasher>(&self, state: &mut H) {
+        self.schema.hash(state);
+        self.name.hash(state);
+        self.location.hash(state);
+        self.file_type.hash(state);
+        self.has_header.hash(state);
+        self.delimiter.hash(state);
+        self.table_partition_cols.hash(state);
+        self.if_not_exists.hash(state);
+        self.definition.hash(state);
+        self.file_compression_type.hash(state);
+        self.options.len().hash(state); // HashMap is not hashable
+    }
+}
+
+#[derive(Clone, PartialEq, Eq, Hash)]
 pub enum WriteOp {
     Insert,
     Delete,
@@ -1617,7 +1657,7 @@ impl Display for WriteOp {
 }
 
 /// The operator that modifies the content of a database (adapted from substrait WriteRel)
-#[derive(Clone)]
+#[derive(Clone, PartialEq, Eq, Hash)]
 pub struct DmlStatement {
     /// The table name
     pub table_name: OwnedTableReference,
@@ -1631,7 +1671,7 @@ pub struct DmlStatement {
 
 /// Prepare a statement but do not execute it. Prepare statements can have 0 or more
 /// `Expr::Placeholder` expressions that are filled in during execution
-#[derive(Clone)]
+#[derive(Clone, PartialEq, Eq, Hash)]
 pub struct Prepare {
     /// The name of the statement
     pub name: String,
@@ -1642,7 +1682,7 @@ pub struct Prepare {
 }
 
 /// Describe the schema of table
-#[derive(Clone)]
+#[derive(Clone, PartialEq, Eq, Hash)]
 pub struct DescribeTable {
     /// Table schema
     pub schema: Arc<Schema>,
@@ -1652,7 +1692,7 @@ pub struct DescribeTable {
 
 /// Produces a relation with string representations of
 /// various parts of the plan
-#[derive(Clone)]
+#[derive(Clone, PartialEq, Eq, Hash)]
 pub struct Explain {
     /// Should extra (detailed, intermediate plans) be included?
     pub verbose: bool,
@@ -1668,7 +1708,7 @@ pub struct Explain {
 
 /// Runs the actual plan, and then prints the physical plan with
 /// with execution metrics.
-#[derive(Clone)]
+#[derive(Clone, PartialEq, Eq, Hash)]
 pub struct Analyze {
     /// Should extra detail be included?
     pub verbose: bool,
@@ -1679,14 +1719,24 @@ pub struct Analyze {
 }
 
 /// Extension operator defined outside of DataFusion
-#[derive(Clone)]
+#[allow(clippy::derive_hash_xor_eq)] // see impl PartialEq for explanation
+#[derive(Clone, Eq, Hash)]
 pub struct Extension {
     /// The runtime extension operator
     pub node: Arc<dyn UserDefinedLogicalNode>,
 }
 
+impl PartialEq for Extension {
+    #[allow(clippy::op_ref)] // clippy false positive
+    fn eq(&self, other: &Self) -> bool {
+        // must be manually derived due to a bug in #[derive(PartialEq)]
+        // https://github.com/rust-lang/rust/issues/39128
+        &self.node == &other.node
+    }
+}
+
 /// Produces the first `n` tuples from its input and discards the rest.
-#[derive(Clone)]
+#[derive(Clone, PartialEq, Eq, Hash)]
 pub struct Limit {
     /// Number of rows to skip before fetch
     pub skip: usize,
@@ -1698,7 +1748,7 @@ pub struct Limit {
 }
 
 /// Removes duplicate rows from the input
-#[derive(Clone)]
+#[derive(Clone, PartialEq, Eq, Hash)]
 pub struct Distinct {
     /// The logical plan that is being DISTINCT'd
     pub input: Arc<LogicalPlan>,
@@ -1706,7 +1756,7 @@ pub struct Distinct {
 
 /// Aggregates its input based on a set of grouping and aggregate
 /// expressions (e.g. SUM).
-#[derive(Clone)]
+#[derive(Clone, PartialEq, Eq, Hash)]
 // mark non_exhaustive to encourage use of try_new/new()
 #[non_exhaustive]
 pub struct Aggregate {
@@ -1779,7 +1829,7 @@ impl Aggregate {
 }
 
 /// Sorts its input according to a list of sort expressions.
-#[derive(Clone)]
+#[derive(Clone, PartialEq, Eq, Hash)]
 pub struct Sort {
     /// The sort expressions
     pub expr: Vec<Expr>,
@@ -1790,7 +1840,7 @@ pub struct Sort {
 }
 
 /// Join two logical plans on one or more join columns
-#[derive(Clone)]
+#[derive(Clone, PartialEq, Eq, Hash)]
 pub struct Join {
     /// Left input
     pub left: Arc<LogicalPlan>,
@@ -1846,7 +1896,7 @@ impl Join {
 }
 
 /// Subquery
-#[derive(Clone)]
+#[derive(Clone, PartialEq, Eq, Hash)]
 pub struct Subquery {
     /// The subquery
     pub subquery: Arc<LogicalPlan>,
@@ -1873,29 +1923,8 @@ impl Debug for Subquery {
     }
 }
 
-impl Hash for Subquery {
-    fn hash<H: Hasher>(&self, state: &mut H) {
-        state.finish();
-    }
-
-    fn hash_slice<H: Hasher>(_data: &[Self], state: &mut H)
-    where
-        Self: Sized,
-    {
-        state.finish();
-    }
-}
-
-impl PartialEq for Subquery {
-    fn eq(&self, _other: &Self) -> bool {
-        false
-    }
-}
-
-impl Eq for Subquery {}
-
 /// Logical partitioning schemes supported by the repartition operator.
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
 pub enum Partitioning {
     /// Allocate batches using a round-robin algorithm and the specified number of partitions
     RoundRobinBatch(usize),
@@ -1908,7 +1937,7 @@ pub enum Partitioning {
 
 /// Represents which type of plan, when storing multiple
 /// for use in EXPLAIN plans
-#[derive(Debug, Clone, PartialEq, Eq)]
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
 pub enum PlanType {
     /// The initial LogicalPlan provided to DataFusion
     InitialLogicalPlan,
@@ -1948,7 +1977,7 @@ impl Display for PlanType {
 }
 
 /// Represents some sort of execution plan, in String form
-#[derive(Debug, Clone, PartialEq, Eq)]
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
 #[allow(clippy::rc_buffer)]
 pub struct StringifiedPlan {
     /// An identifier of what type of plan this string represents
@@ -1984,7 +2013,7 @@ pub trait ToStringifiedPlan {
 }
 
 /// Unnest a column that contains a nested list type.
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
 pub struct Unnest {
     /// The incoming logical plan
     pub input: Arc<LogicalPlan>,
diff --git a/datafusion/expr/src/signature.rs b/datafusion/expr/src/signature.rs
index 3efe77e43..19909cf2f 100644
--- a/datafusion/expr/src/signature.rs
+++ b/datafusion/expr/src/signature.rs
@@ -37,7 +37,7 @@ pub enum Volatility {
 }
 
 /// A function's type signature, which defines the function's supported argument types.
-#[derive(Debug, Clone, PartialEq, Hash)]
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
 pub enum TypeSignature {
     /// arbitrary number of arguments of an common type out of a list of valid types
     // A function such as `concat` is `Variadic(vec![DataType::Utf8, DataType::LargeUtf8])`
@@ -59,7 +59,7 @@ pub enum TypeSignature {
 }
 
 ///The Signature of a function defines its supported input types as well as its volatility.
-#[derive(Debug, Clone, PartialEq, Hash)]
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
 pub struct Signature {
     /// type_signature - The types that the function accepts. See [TypeSignature] for more information.
     pub type_signature: TypeSignature,
diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs
index 64a1750ee..5077bed90 100644
--- a/datafusion/optimizer/src/optimizer.rs
+++ b/datafusion/optimizer/src/optimizer.rs
@@ -45,6 +45,7 @@ use datafusion_common::config::ConfigOptions;
 use datafusion_common::{DataFusionError, Result};
 use datafusion_expr::logical_plan::LogicalPlan;
 use log::{debug, trace, warn};
+use std::borrow::Cow;
 use std::sync::Arc;
 use std::time::Instant;
 
@@ -264,7 +265,7 @@ impl Optimizer {
     {
         let options = config.options();
         let start_time = Instant::now();
-        let mut plan_str = format!("{}", plan.display_indent());
+        let mut old_plan = Cow::Borrowed(plan);
         let mut new_plan = plan.clone();
         let mut i = 0;
         while i < options.optimizer.max_passes {
@@ -328,13 +329,12 @@ impl Optimizer {
             // TODO this is an expensive way to see if the optimizer did anything and
             // it would be better to change the OptimizerRule trait to return an Option
             // instead
-            let new_plan_str = format!("{}", new_plan.display_indent());
-            if plan_str == new_plan_str {
+            if old_plan.as_ref() == &new_plan {
                 // plan did not change, so no need to continue trying to optimize
                 debug!("optimizer pass {} did not make changes", i);
                 break;
             }
-            plan_str = new_plan_str;
+            old_plan = Cow::Owned(new_plan.clone());
             i += 1;
         }
         log_plan("Final optimized plan", &new_plan);
diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs
index 0d8da5573..f004e3d31 100644
--- a/datafusion/optimizer/src/push_down_filter.rs
+++ b/datafusion/optimizer/src/push_down_filter.rs
@@ -1074,7 +1074,7 @@ mod tests {
         assert_optimized_plan_eq(&plan, expected)
     }
 
-    #[derive(Debug)]
+    #[derive(Debug, PartialEq, Eq, Hash)]
     struct NoopPlan {
         input: Vec<LogicalPlan>,
         schema: DFSchemaRef,
@@ -1118,6 +1118,19 @@ mod tests {
                 schema: self.schema.clone(),
             })
         }
+
+        fn dyn_eq(&self, other: &dyn UserDefinedLogicalNode) -> bool {
+            match other.as_any().downcast_ref::<Self>() {
+                Some(o) => self == o,
+                None => false,
+            }
+        }
+
+        fn dyn_hash(&self, state: &mut dyn std::hash::Hasher) {
+            use std::hash::Hash;
+            let mut s = state;
+            self.hash(&mut s);
+        }
     }
 
     #[test]
diff --git a/datafusion/optimizer/src/test/user_defined.rs b/datafusion/optimizer/src/test/user_defined.rs
index 92b56ee75..a1d5e4623 100644
--- a/datafusion/optimizer/src/test/user_defined.rs
+++ b/datafusion/optimizer/src/test/user_defined.rs
@@ -34,6 +34,7 @@ pub fn new(input: LogicalPlan) -> LogicalPlan {
     LogicalPlan::Extension(Extension { node })
 }
 
+#[derive(PartialEq, Eq, Hash)]
 struct TestUserDefinedPlanNode {
     input: LogicalPlan,
 }
@@ -76,4 +77,17 @@ impl UserDefinedLogicalNode for TestUserDefinedPlanNode {
             input: inputs[0].clone(),
         })
     }
+
+    fn dyn_eq(&self, other: &dyn UserDefinedLogicalNode) -> bool {
+        match other.as_any().downcast_ref::<Self>() {
+            Some(o) => self == o,
+            None => false,
+        }
+    }
+
+    fn dyn_hash(&self, state: &mut dyn std::hash::Hasher) {
+        use std::hash::Hash;
+        let mut s = state;
+        self.hash(&mut s);
+    }
 }
diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs
index bf3c733c0..2f4f83d89 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -1639,6 +1639,7 @@ mod roundtrip_tests {
         }
     }
 
+    #[derive(PartialEq, Eq, Hash)]
     struct TopKPlanNode {
         k: usize,
         input: LogicalPlan,
@@ -1695,6 +1696,19 @@ mod roundtrip_tests {
                 expr: exprs[0].clone(),
             })
         }
+
+        fn dyn_eq(&self, other: &dyn UserDefinedLogicalNode) -> bool {
+            match other.as_any().downcast_ref::<Self>() {
+                Some(o) => self == o,
+                None => false,
+            }
+        }
+
+        fn dyn_hash(&self, state: &mut dyn std::hash::Hasher) {
+            use std::hash::Hash;
+            let mut s = state;
+            self.hash(&mut s);
+        }
     }
 
     #[derive(Debug)]

Reply via email to