This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new d9cb6e677e Implement PartialOrd for Expr and sub fields/structs
without using hash values (#12481)
d9cb6e677e is described below
commit d9cb6e677ec437a9c6ff2cc0900f902804da39bb
Author: ngli-me <[email protected]>
AuthorDate: Sat Sep 21 06:54:23 2024 -0400
Implement PartialOrd for Expr and sub fields/structs without using hash
values (#12481)
* Derived PartialOrd for WindowFrameBound, and added partial_cmp for
ScalarUDF based on the equals fn.
* Derived PartialOrd where possible for structs not using DFSchema/Schema.
Otherwise, implemented PartialOrd, ignoring the schema field.
* Added additional tests to verify partial ord for Expr, DdlStatement, and
LogicalPlan.
* Formatting.
* Added PartialOrd implementations where necessary, otherwise derived for
structs implementing UserDefinedLogicalNodeCore.
* Added borrowing `Comparable*` helper structs (deriving `PartialOrd`) for structs with more than 3 comparable fields.
* Added additional comments, specifying which fields caused a manual
implementation of `PartialOrd` to be necessary.
---------
Co-authored-by: nglime <[email protected]>
---
datafusion/common/src/display/mod.rs | 4 +-
datafusion/common/src/functional_dependencies.rs | 4 +-
datafusion/common/src/join_type.rs | 4 +-
datafusion/core/src/physical_planner.rs | 9 +
.../core/tests/user_defined/user_defined_plan.rs | 2 +-
datafusion/expr/src/expr.rs | 71 ++---
datafusion/expr/src/logical_plan/ddl.rs | 223 +++++++++++++-
datafusion/expr/src/logical_plan/dml.rs | 34 ++-
datafusion/expr/src/logical_plan/extension.rs | 19 +-
datafusion/expr/src/logical_plan/plan.rs | 334 ++++++++++++++++++++-
datafusion/expr/src/logical_plan/statement.rs | 41 ++-
datafusion/expr/src/udf.rs | 11 +
datafusion/expr/src/window_frame.rs | 4 +-
datafusion/optimizer/src/analyzer/subquery.rs | 7 +
.../optimizer/src/optimize_projections/mod.rs | 28 ++
datafusion/optimizer/src/push_down_filter.rs | 8 +
datafusion/optimizer/src/test/user_defined.rs | 2 +-
.../proto/tests/cases/roundtrip_logical_plan.rs | 2 +-
.../tests/cases/roundtrip_logical_plan.rs | 16 +
19 files changed, 744 insertions(+), 79 deletions(-)
diff --git a/datafusion/common/src/display/mod.rs
b/datafusion/common/src/display/mod.rs
index 2345c0e4c4..c12e7419e4 100644
--- a/datafusion/common/src/display/mod.rs
+++ b/datafusion/common/src/display/mod.rs
@@ -27,7 +27,7 @@ use std::{
/// Represents which type of plan, when storing multiple
/// for use in EXPLAIN plans
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum PlanType {
/// The initial LogicalPlan provided to DataFusion
InitialLogicalPlan,
@@ -96,7 +96,7 @@ impl Display for PlanType {
}
/// Represents some sort of execution plan, in String form
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct StringifiedPlan {
/// An identifier of what type of plan this string represents
pub plan_type: PlanType,
diff --git a/datafusion/common/src/functional_dependencies.rs
b/datafusion/common/src/functional_dependencies.rs
index 666ea73027..90f4e6e7e3 100644
--- a/datafusion/common/src/functional_dependencies.rs
+++ b/datafusion/common/src/functional_dependencies.rs
@@ -30,7 +30,7 @@ use crate::{DFSchema, DFSchemaRef, DataFusionError, JoinType,
Result};
use sqlparser::ast::TableConstraint;
/// This object defines a constraint on a table.
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum Constraint {
/// Columns with the given indices form a composite primary key (they are
/// jointly unique and not nullable):
@@ -40,7 +40,7 @@ pub enum Constraint {
}
/// This object encapsulates a list of functional constraints:
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Constraints {
inner: Vec<Constraint>,
}
diff --git a/datafusion/common/src/join_type.rs
b/datafusion/common/src/join_type.rs
index 0a00a57ba4..fbdae1c50a 100644
--- a/datafusion/common/src/join_type.rs
+++ b/datafusion/common/src/join_type.rs
@@ -26,7 +26,7 @@ use crate::error::_not_impl_err;
use crate::{DataFusionError, Result};
/// Join type
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Hash)]
pub enum JoinType {
/// Inner Join
Inner,
@@ -88,7 +88,7 @@ impl FromStr for JoinType {
}
/// Join constraint
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Hash)]
pub enum JoinConstraint {
/// Join ON
On,
diff --git a/datafusion/core/src/physical_planner.rs
b/datafusion/core/src/physical_planner.rs
index 2010a5c664..84d285fc25 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -1974,6 +1974,7 @@ fn tuple_err<T, R>(value: (Result<T>, Result<R>)) ->
Result<(T, R)> {
#[cfg(test)]
mod tests {
use std::any::Any;
+ use std::cmp::Ordering;
use std::fmt::{self, Debug};
use std::ops::{BitAnd, Not};
@@ -2528,6 +2529,14 @@ mod tests {
}
}
+ // Implementation needed for `UserDefinedLogicalNodeCore`. Since the only field is
+ // a schema, `PartialOrd` can't be derived and there is no meaningful ordering to compute.
+ impl PartialOrd for NoOpExtensionNode {
+ fn partial_cmp(&self, _other: &Self) -> Option<Ordering> {
+ None
+ }
+ }
+
impl UserDefinedLogicalNodeCore for NoOpExtensionNode {
fn name(&self) -> &str {
"NoOp"
diff --git a/datafusion/core/tests/user_defined/user_defined_plan.rs
b/datafusion/core/tests/user_defined/user_defined_plan.rs
index 56edeab443..101e676484 100644
--- a/datafusion/core/tests/user_defined/user_defined_plan.rs
+++ b/datafusion/core/tests/user_defined/user_defined_plan.rs
@@ -387,7 +387,7 @@ impl OptimizerRule for TopKOptimizerRule {
}
}
-#[derive(PartialEq, Eq, Hash)]
+#[derive(PartialEq, Eq, PartialOrd, Hash)]
struct TopKPlanNode {
k: usize,
input: LogicalPlan,
diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs
index db0bfd6b1b..8cb759b881 100644
--- a/datafusion/expr/src/expr.rs
+++ b/datafusion/expr/src/expr.rs
@@ -27,11 +27,11 @@ use std::sync::Arc;
use crate::expr_fn::binary_expr;
use crate::logical_plan::Subquery;
use crate::utils::expr_to_columns;
+use crate::Volatility;
use crate::{
built_in_window_function, udaf, BuiltInWindowFunction, ExprSchemable,
Operator,
Signature, WindowFrame, WindowUDF,
};
-use crate::{window_frame, Volatility};
use arrow::datatypes::{DataType, FieldRef};
use datafusion_common::tree_node::{
@@ -193,7 +193,7 @@ use sqlparser::ast::{
/// }
/// // The return value controls whether to continue visiting the tree
/// Ok(TreeNodeRecursion::Continue)
-/// }).unwrap();;
+/// }).unwrap();
/// // All subtrees have been visited and literals found
/// assert_eq!(scalars.len(), 2);
/// assert!(scalars.contains(&ScalarValue::Int32(Some(5))));
@@ -223,7 +223,7 @@ use sqlparser::ast::{
/// assert!(rewritten.transformed);
/// // to 42 = 5 AND b = 6
/// assert_eq!(rewritten.data, lit(42).eq(lit(5)).and(col("b").eq(lit(6))));
-#[derive(Clone, PartialEq, Eq, Hash, Debug)]
+#[derive(Clone, PartialEq, Eq, PartialOrd, Hash, Debug)]
pub enum Expr {
/// An expression with a specific name.
Alias(Alias),
@@ -354,7 +354,7 @@ impl<'a> From<(Option<&'a TableReference>, &'a FieldRef)>
for Expr {
}
/// UNNEST expression.
-#[derive(Clone, PartialEq, Eq, Hash, Debug)]
+#[derive(Clone, PartialEq, Eq, PartialOrd, Hash, Debug)]
pub struct Unnest {
pub expr: Box<Expr>,
}
@@ -374,7 +374,7 @@ impl Unnest {
}
/// Alias expression
-#[derive(Clone, PartialEq, Eq, Hash, Debug)]
+#[derive(Clone, PartialEq, Eq, PartialOrd, Hash, Debug)]
pub struct Alias {
pub expr: Box<Expr>,
pub relation: Option<TableReference>,
@@ -397,7 +397,7 @@ impl Alias {
}
/// Binary expression
-#[derive(Clone, PartialEq, Eq, Hash, Debug)]
+#[derive(Clone, PartialEq, Eq, PartialOrd, Hash, Debug)]
pub struct BinaryExpr {
/// Left-hand side of the expression
pub left: Box<Expr>,
@@ -448,7 +448,7 @@ impl Display for BinaryExpr {
}
/// CASE expression
-#[derive(Clone, Debug, PartialEq, Eq, Hash)]
+#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Hash)]
pub struct Case {
/// Optional base expression that can be compared to literal values in the
"when" expressions
pub expr: Option<Box<Expr>>,
@@ -474,7 +474,7 @@ impl Case {
}
/// LIKE expression
-#[derive(Clone, PartialEq, Eq, Hash, Debug)]
+#[derive(Clone, PartialEq, Eq, PartialOrd, Hash, Debug)]
pub struct Like {
pub negated: bool,
pub expr: Box<Expr>,
@@ -504,7 +504,7 @@ impl Like {
}
/// BETWEEN expression
-#[derive(Clone, PartialEq, Eq, Hash, Debug)]
+#[derive(Clone, PartialEq, Eq, PartialOrd, Hash, Debug)]
pub struct Between {
/// The value to compare
pub expr: Box<Expr>,
@@ -529,7 +529,7 @@ impl Between {
}
/// ScalarFunction expression invokes a built-in scalar function
-#[derive(Clone, PartialEq, Eq, Hash, Debug)]
+#[derive(Clone, PartialEq, Eq, PartialOrd, Hash, Debug)]
pub struct ScalarFunction {
/// The function
pub func: Arc<crate::ScalarUDF>,
@@ -567,7 +567,7 @@ pub enum GetFieldAccess {
}
/// Cast expression
-#[derive(Clone, PartialEq, Eq, Hash, Debug)]
+#[derive(Clone, PartialEq, Eq, PartialOrd, Hash, Debug)]
pub struct Cast {
/// The expression being cast
pub expr: Box<Expr>,
@@ -583,7 +583,7 @@ impl Cast {
}
/// TryCast Expression
-#[derive(Clone, PartialEq, Eq, Hash, Debug)]
+#[derive(Clone, PartialEq, Eq, PartialOrd, Hash, Debug)]
pub struct TryCast {
/// The expression being cast
pub expr: Box<Expr>,
@@ -599,7 +599,7 @@ impl TryCast {
}
/// SORT expression
-#[derive(Clone, PartialEq, Eq, Hash, Debug)]
+#[derive(Clone, PartialEq, Eq, PartialOrd, Hash, Debug)]
pub struct Sort {
/// The expression to sort on
pub expr: Expr,
@@ -651,7 +651,7 @@ impl Display for Sort {
/// See also [`ExprFunctionExt`] to set these fields on `Expr`
///
/// [`ExprFunctionExt`]: crate::expr_fn::ExprFunctionExt
-#[derive(Clone, PartialEq, Eq, Hash, Debug)]
+#[derive(Clone, PartialEq, Eq, PartialOrd, Hash, Debug)]
pub struct AggregateFunction {
/// Name of the function
pub func: Arc<crate::AggregateUDF>,
@@ -789,7 +789,7 @@ impl From<Arc<WindowUDF>> for WindowFunctionDefinition {
/// .build()
/// .unwrap();
/// ```
-#[derive(Clone, PartialEq, Eq, Hash, Debug)]
+#[derive(Clone, PartialEq, Eq, PartialOrd, Hash, Debug)]
pub struct WindowFunction {
/// Name of the function
pub fun: WindowFunctionDefinition,
@@ -800,7 +800,7 @@ pub struct WindowFunction {
/// List of order by expressions
pub order_by: Vec<Sort>,
/// Window frame
- pub window_frame: window_frame::WindowFrame,
+ pub window_frame: WindowFrame,
/// Specifies how NULL value is treated: ignore or respect
pub null_treatment: Option<NullTreatment>,
}
@@ -840,7 +840,7 @@ pub fn find_df_window_func(name: &str) ->
Option<WindowFunctionDefinition> {
}
/// EXISTS expression
-#[derive(Clone, PartialEq, Eq, Hash, Debug)]
+#[derive(Clone, PartialEq, Eq, PartialOrd, Hash, Debug)]
pub struct Exists {
/// subquery that will produce a single column of data
pub subquery: Subquery,
@@ -888,7 +888,7 @@ impl AggregateUDF {
}
/// InList expression
-#[derive(Clone, PartialEq, Eq, Hash, Debug)]
+#[derive(Clone, PartialEq, Eq, PartialOrd, Hash, Debug)]
pub struct InList {
/// The expression to compare
pub expr: Box<Expr>,
@@ -910,7 +910,7 @@ impl InList {
}
/// IN subquery
-#[derive(Clone, PartialEq, Eq, Hash, Debug)]
+#[derive(Clone, PartialEq, Eq, PartialOrd, Hash, Debug)]
pub struct InSubquery {
/// The expression to compare
pub expr: Box<Expr>,
@@ -935,7 +935,7 @@ impl InSubquery {
///
/// The type of these parameters is inferred using
[`Expr::infer_placeholder_types`]
/// or can be specified directly using `PREPARE` statements.
-#[derive(Clone, PartialEq, Eq, Hash, Debug)]
+#[derive(Clone, PartialEq, Eq, PartialOrd, Hash, Debug)]
pub struct Placeholder {
/// The identifier of the parameter, including the leading `$` (e.g,
`"$1"` or `"$foo"`)
pub id: String,
@@ -956,7 +956,7 @@ impl Placeholder {
/// for Postgres definition.
/// See
<https://spark.apache.org/docs/latest/sql-ref-syntax-qry-select-groupby.html>
/// for Apache Spark definition.
-#[derive(Clone, PartialEq, Eq, Hash, Debug)]
+#[derive(Clone, PartialEq, Eq, PartialOrd, Hash, Debug)]
pub enum GroupingSet {
/// Rollup grouping sets
Rollup(Vec<Expr>),
@@ -989,7 +989,7 @@ impl GroupingSet {
}
/// Additional options for wildcards, e.g. Snowflake `EXCLUDE`/`RENAME` and
Bigquery `EXCEPT`.
-#[derive(Clone, PartialEq, Eq, Hash, Debug, Default)]
+#[derive(Clone, PartialEq, Eq, PartialOrd, Hash, Debug, Default)]
pub struct WildcardOptions {
/// `[ILIKE...]`.
/// Snowflake syntax:
<https://docs.snowflake.com/en/sql-reference/sql/select#parameters>
@@ -1045,7 +1045,7 @@ impl Display for WildcardOptions {
}
/// The planned expressions for `REPLACE`
-#[derive(Clone, PartialEq, Eq, Hash, Debug, Default)]
+#[derive(Clone, PartialEq, Eq, PartialOrd, Hash, Debug, Default)]
pub struct PlannedReplaceSelectItem {
/// The original ast nodes
pub items: Vec<ReplaceSelectElement>,
@@ -1071,18 +1071,6 @@ impl PlannedReplaceSelectItem {
}
}
-/// Fixed seed for the hashing so that Ords are consistent across runs
-const SEED: ahash::RandomState = ahash::RandomState::with_seeds(0, 0, 0, 0);
-
-impl PartialOrd for Expr {
- fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
- let s = SEED.hash_one(self);
- let o = SEED.hash_one(other);
-
- Some(s.cmp(&o))
- }
-}
-
impl Expr {
#[deprecated(since = "40.0.0", note = "use schema_name instead")]
pub fn display_name(&self) -> Result<String> {
@@ -2432,20 +2420,15 @@ mod test {
#[test]
fn test_partial_ord() {
- // Test validates that partial ord is defined for Expr using hashes,
not
+ // Test validates that partial ord is defined for Expr, not
// intended to exhaustively test all possibilities
let exp1 = col("a") + lit(1);
let exp2 = col("a") + lit(2);
let exp3 = !(col("a") + lit(2));
- // Since comparisons are done using hash value of the expression
- // expr < expr2 may return false, or true. There is no guaranteed
result.
- // The only guarantee is "<" operator should have the opposite result
of ">=" operator
- let greater_or_equal = exp1 >= exp2;
- assert_eq!(exp1 < exp2, !greater_or_equal);
-
- let greater_or_equal = exp3 >= exp2;
- assert_eq!(exp3 < exp2, !greater_or_equal);
+ assert!(exp1 < exp2);
+ assert!(exp3 > exp2);
+ assert!(exp1 < exp3)
}
#[test]
diff --git a/datafusion/expr/src/logical_plan/ddl.rs
b/datafusion/expr/src/logical_plan/ddl.rs
index 3fc43200ef..9aaa5c9803 100644
--- a/datafusion/expr/src/logical_plan/ddl.rs
+++ b/datafusion/expr/src/logical_plan/ddl.rs
@@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.
+use crate::{Expr, LogicalPlan, SortExpr, Volatility};
+use std::cmp::Ordering;
use std::collections::HashMap;
use std::sync::Arc;
use std::{
@@ -22,15 +24,13 @@ use std::{
hash::{Hash, Hasher},
};
-use crate::{Expr, LogicalPlan, SortExpr, Volatility};
-
use crate::expr::Sort;
use arrow::datatypes::DataType;
use datafusion_common::{Constraints, DFSchemaRef, SchemaReference,
TableReference};
use sqlparser::ast::Ident;
/// Various types of DDL (CREATE / DROP) catalog manipulation
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum DdlStatement {
/// Creates an external table.
CreateExternalTable(CreateExternalTable),
@@ -232,8 +232,59 @@ impl Hash for CreateExternalTable {
}
}
+// Manual implementation needed because of `schema`, `options`, and
`column_defaults` fields.
+// Comparison excludes these fields.
+impl PartialOrd for CreateExternalTable {
+ fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+ #[derive(PartialEq, PartialOrd)]
+ struct ComparableCreateExternalTable<'a> {
+ /// The table name
+ pub name: &'a TableReference,
+ /// The physical location
+ pub location: &'a String,
+ /// The file type of physical file
+ pub file_type: &'a String,
+ /// Partition Columns
+ pub table_partition_cols: &'a Vec<String>,
+ /// Option to not error if table already exists
+ pub if_not_exists: &'a bool,
+ /// SQL used to create the table, if available
+ pub definition: &'a Option<String>,
+ /// Order expressions supplied by user
+ pub order_exprs: &'a Vec<Vec<Sort>>,
+ /// Whether the table is an infinite stream
+ pub unbounded: &'a bool,
+ /// The list of constraints in the schema, such as primary key,
unique, etc.
+ pub constraints: &'a Constraints,
+ }
+ let comparable_self = ComparableCreateExternalTable {
+ name: &self.name,
+ location: &self.location,
+ file_type: &self.file_type,
+ table_partition_cols: &self.table_partition_cols,
+ if_not_exists: &self.if_not_exists,
+ definition: &self.definition,
+ order_exprs: &self.order_exprs,
+ unbounded: &self.unbounded,
+ constraints: &self.constraints,
+ };
+ let comparable_other = ComparableCreateExternalTable {
+ name: &other.name,
+ location: &other.location,
+ file_type: &other.file_type,
+ table_partition_cols: &other.table_partition_cols,
+ if_not_exists: &other.if_not_exists,
+ definition: &other.definition,
+ order_exprs: &other.order_exprs,
+ unbounded: &other.unbounded,
+ constraints: &other.constraints,
+ };
+ comparable_self.partial_cmp(&comparable_other)
+ }
+}
+
/// Creates an in memory table.
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct CreateMemoryTable {
/// The table name
pub name: TableReference,
@@ -250,7 +301,7 @@ pub struct CreateMemoryTable {
}
/// Creates a view.
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Hash)]
pub struct CreateView {
/// The table name
pub name: TableReference,
@@ -273,6 +324,16 @@ pub struct CreateCatalog {
pub schema: DFSchemaRef,
}
+// Manual implementation needed because of `schema` field. Comparison excludes
this field.
+impl PartialOrd for CreateCatalog {
+ fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+ match self.catalog_name.partial_cmp(&other.catalog_name) {
+ Some(Ordering::Equal) =>
self.if_not_exists.partial_cmp(&other.if_not_exists),
+ cmp => cmp,
+ }
+ }
+}
+
/// Creates a schema.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CreateCatalogSchema {
@@ -284,6 +345,16 @@ pub struct CreateCatalogSchema {
pub schema: DFSchemaRef,
}
+// Manual implementation needed because of `schema` field. Comparison excludes
this field.
+impl PartialOrd for CreateCatalogSchema {
+ fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+ match self.schema_name.partial_cmp(&other.schema_name) {
+ Some(Ordering::Equal) =>
self.if_not_exists.partial_cmp(&other.if_not_exists),
+ cmp => cmp,
+ }
+ }
+}
+
/// Drops a table.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DropTable {
@@ -295,6 +366,16 @@ pub struct DropTable {
pub schema: DFSchemaRef,
}
+// Manual implementation needed because of `schema` field. Comparison excludes
this field.
+impl PartialOrd for DropTable {
+ fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+ match self.name.partial_cmp(&other.name) {
+ Some(Ordering::Equal) =>
self.if_exists.partial_cmp(&other.if_exists),
+ cmp => cmp,
+ }
+ }
+}
+
/// Drops a view.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DropView {
@@ -306,6 +387,16 @@ pub struct DropView {
pub schema: DFSchemaRef,
}
+// Manual implementation needed because of `schema` field. Comparison excludes
this field.
+impl PartialOrd for DropView {
+ fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+ match self.name.partial_cmp(&other.name) {
+ Some(Ordering::Equal) =>
self.if_exists.partial_cmp(&other.if_exists),
+ cmp => cmp,
+ }
+ }
+}
+
/// Drops a schema
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DropCatalogSchema {
@@ -319,6 +410,19 @@ pub struct DropCatalogSchema {
pub schema: DFSchemaRef,
}
+// Manual implementation needed because of `schema` field. Comparison excludes
this field.
+impl PartialOrd for DropCatalogSchema {
+ fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+ match self.name.partial_cmp(&other.name) {
+ Some(Ordering::Equal) => match
self.if_exists.partial_cmp(&other.if_exists) {
+ Some(Ordering::Equal) =>
self.cascade.partial_cmp(&other.cascade),
+ cmp => cmp,
+ },
+ cmp => cmp,
+ }
+ }
+}
+
/// Arguments passed to `CREATE FUNCTION`
///
/// Note this meant to be the same as from sqlparser's
[`sqlparser::ast::Statement::CreateFunction`]
@@ -336,7 +440,40 @@ pub struct CreateFunction {
/// Dummy schema
pub schema: DFSchemaRef,
}
-#[derive(Clone, PartialEq, Eq, Hash, Debug)]
+
+// Manual implementation needed because of `schema` field. Comparison excludes
this field.
+impl PartialOrd for CreateFunction {
+ fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+ #[derive(PartialEq, PartialOrd)]
+ struct ComparableCreateFunction<'a> {
+ pub or_replace: &'a bool,
+ pub temporary: &'a bool,
+ pub name: &'a String,
+ pub args: &'a Option<Vec<OperateFunctionArg>>,
+ pub return_type: &'a Option<DataType>,
+ pub params: &'a CreateFunctionBody,
+ }
+ let comparable_self = ComparableCreateFunction {
+ or_replace: &self.or_replace,
+ temporary: &self.temporary,
+ name: &self.name,
+ args: &self.args,
+ return_type: &self.return_type,
+ params: &self.params,
+ };
+ let comparable_other = ComparableCreateFunction {
+ or_replace: &other.or_replace,
+ temporary: &other.temporary,
+ name: &other.name,
+ args: &other.args,
+ return_type: &other.return_type,
+ params: &other.params,
+ };
+ comparable_self.partial_cmp(&comparable_other)
+ }
+}
+
+#[derive(Clone, PartialEq, Eq, PartialOrd, Hash, Debug)]
pub struct OperateFunctionArg {
// TODO: figure out how to support mode
// pub mode: Option<ArgMode>,
@@ -344,7 +481,7 @@ pub struct OperateFunctionArg {
pub data_type: DataType,
pub default_expr: Option<Expr>,
}
-#[derive(Clone, PartialEq, Eq, Hash, Debug)]
+#[derive(Clone, PartialEq, Eq, PartialOrd, Hash, Debug)]
pub struct CreateFunctionBody {
/// LANGUAGE lang_name
pub language: Option<Ident>,
@@ -361,6 +498,15 @@ pub struct DropFunction {
pub schema: DFSchemaRef,
}
+impl PartialOrd for DropFunction {
+ fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+ match self.name.partial_cmp(&other.name) {
+ Some(Ordering::Equal) =>
self.if_exists.partial_cmp(&other.if_exists),
+ cmp => cmp,
+ }
+ }
+}
+
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct CreateIndex {
pub name: Option<String>,
@@ -371,3 +517,66 @@ pub struct CreateIndex {
pub if_not_exists: bool,
pub schema: DFSchemaRef,
}
+
+// Manual implementation needed because of `schema` field. Comparison excludes
this field.
+impl PartialOrd for CreateIndex {
+ fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+ #[derive(PartialEq, PartialOrd)]
+ struct ComparableCreateIndex<'a> {
+ pub name: &'a Option<String>,
+ pub table: &'a TableReference,
+ pub using: &'a Option<String>,
+ pub columns: &'a Vec<SortExpr>,
+ pub unique: &'a bool,
+ pub if_not_exists: &'a bool,
+ }
+ let comparable_self = ComparableCreateIndex {
+ name: &self.name,
+ table: &self.table,
+ using: &self.using,
+ columns: &self.columns,
+ unique: &self.unique,
+ if_not_exists: &self.if_not_exists,
+ };
+ let comparable_other = ComparableCreateIndex {
+ name: &other.name,
+ table: &other.table,
+ using: &other.using,
+ columns: &other.columns,
+ unique: &other.unique,
+ if_not_exists: &other.if_not_exists,
+ };
+ comparable_self.partial_cmp(&comparable_other)
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use crate::{CreateCatalog, DdlStatement, DropView};
+ use datafusion_common::{DFSchema, DFSchemaRef, TableReference};
+ use std::cmp::Ordering;
+
+ #[test]
+ fn test_partial_ord() {
+ let catalog = DdlStatement::CreateCatalog(CreateCatalog {
+ catalog_name: "name".to_string(),
+ if_not_exists: false,
+ schema: DFSchemaRef::new(DFSchema::empty()),
+ });
+ let catalog_2 = DdlStatement::CreateCatalog(CreateCatalog {
+ catalog_name: "name".to_string(),
+ if_not_exists: true,
+ schema: DFSchemaRef::new(DFSchema::empty()),
+ });
+
+ assert_eq!(catalog.partial_cmp(&catalog_2), Some(Ordering::Less));
+
+ let drop_view = DdlStatement::DropView(DropView {
+ name: TableReference::from("table"),
+ if_exists: false,
+ schema: DFSchemaRef::new(DFSchema::empty()),
+ });
+
+ assert_eq!(drop_view.partial_cmp(&catalog), Some(Ordering::Greater));
+ }
+}
diff --git a/datafusion/expr/src/logical_plan/dml.rs
b/datafusion/expr/src/logical_plan/dml.rs
index 025bb7b289..c2ed9dc078 100644
--- a/datafusion/expr/src/logical_plan/dml.rs
+++ b/datafusion/expr/src/logical_plan/dml.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+use std::cmp::Ordering;
use std::collections::HashMap;
use std::fmt::{self, Debug, Display, Formatter};
use std::hash::{Hash, Hasher};
@@ -63,6 +64,23 @@ impl PartialEq for CopyTo {
// Implement Eq (no need for additional logic over PartialEq)
impl Eq for CopyTo {}
+// Manual implementation needed because of `file_type` and `options` fields.
+// Comparison excludes these fields.
+impl PartialOrd for CopyTo {
+ fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+ match self.input.partial_cmp(&other.input) {
+ Some(Ordering::Equal) => match
self.output_url.partial_cmp(&other.output_url)
+ {
+ Some(Ordering::Equal) => {
+ self.partition_by.partial_cmp(&other.partition_by)
+ }
+ cmp => cmp,
+ },
+ cmp => cmp,
+ }
+ }
+}
+
// Implement Hash manually
impl Hash for CopyTo {
fn hash<H: Hasher>(&self, state: &mut H) {
@@ -112,7 +130,21 @@ impl DmlStatement {
}
}
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+// Manual implementation needed because of `table_schema` and `output_schema`
fields.
+// Comparison excludes these fields.
+impl PartialOrd for DmlStatement {
+ fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+ match self.table_name.partial_cmp(&other.table_name) {
+ Some(Ordering::Equal) => match self.op.partial_cmp(&other.op) {
+ Some(Ordering::Equal) => self.input.partial_cmp(&other.input),
+ cmp => cmp,
+ },
+ cmp => cmp,
+ }
+ }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum WriteOp {
InsertOverwrite,
InsertInto,
diff --git a/datafusion/expr/src/logical_plan/extension.rs
b/datafusion/expr/src/logical_plan/extension.rs
index 5514ec2925..d49c85fb6f 100644
--- a/datafusion/expr/src/logical_plan/extension.rs
+++ b/datafusion/expr/src/logical_plan/extension.rs
@@ -18,6 +18,7 @@
//! This module defines the interface for logical nodes
use crate::{Expr, LogicalPlan};
use datafusion_common::{DFSchema, DFSchemaRef, Result};
+use std::cmp::Ordering;
use std::hash::{Hash, Hasher};
use std::{any::Any, collections::HashSet, fmt, sync::Arc};
@@ -193,6 +194,7 @@ pub trait UserDefinedLogicalNode: fmt::Debug + Send + Sync {
/// Note: [`UserDefinedLogicalNode`] is not constrained by [`Eq`]
/// directly because it must remain object safe.
fn dyn_eq(&self, other: &dyn UserDefinedLogicalNode) -> bool;
+ fn dyn_ord(&self, other: &dyn UserDefinedLogicalNode) -> Option<Ordering>;
}
impl Hash for dyn UserDefinedLogicalNode {
@@ -201,12 +203,18 @@ impl Hash for dyn UserDefinedLogicalNode {
}
}
-impl std::cmp::PartialEq for dyn UserDefinedLogicalNode {
+impl PartialEq for dyn UserDefinedLogicalNode {
fn eq(&self, other: &Self) -> bool {
self.dyn_eq(other)
}
}
+impl PartialOrd for dyn UserDefinedLogicalNode {
+ fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+ self.dyn_ord(other)
+ }
+}
+
impl Eq for dyn UserDefinedLogicalNode {}
/// This trait facilitates implementation of the [`UserDefinedLogicalNode`].
@@ -215,7 +223,7 @@ impl Eq for dyn UserDefinedLogicalNode {}
///
[user_defined_plan.rs](https://github.com/apache/datafusion/blob/main/datafusion/core/tests/user_defined/user_defined_plan.rs)
/// file for an example of how to use this extension API.
pub trait UserDefinedLogicalNodeCore:
- fmt::Debug + Eq + Hash + Sized + Send + Sync + 'static
+ fmt::Debug + Eq + PartialOrd + Hash + Sized + Send + Sync + 'static
{
/// Return the plan's name.
fn name(&self) -> &str;
@@ -346,6 +354,13 @@ impl<T: UserDefinedLogicalNodeCore> UserDefinedLogicalNode
for T {
None => false,
}
}
+
+ fn dyn_ord(&self, other: &dyn UserDefinedLogicalNode) -> Option<Ordering> {
+ other
+ .as_any()
+ .downcast_ref::<Self>()
+ .and_then(|other| self.partial_cmp(other))
+ }
}
fn get_all_columns_from_schema(schema: &DFSchema) -> HashSet<String> {
diff --git a/datafusion/expr/src/logical_plan/plan.rs
b/datafusion/expr/src/logical_plan/plan.rs
index b3f9b26fa4..6a88382061 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -17,6 +17,7 @@
//! Logical plan types
+use std::cmp::Ordering;
use std::collections::{HashMap, HashSet};
use std::fmt::{self, Debug, Display, Formatter};
use std::hash::{Hash, Hasher};
@@ -190,7 +191,7 @@ pub use datafusion_common::{JoinConstraint, JoinType};
/// # }
/// ```
///
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum LogicalPlan {
/// Evaluates an arbitrary list of expressions (essentially a
/// SELECT with an expression list) on its input.
@@ -2012,6 +2013,13 @@ pub struct EmptyRelation {
pub schema: DFSchemaRef,
}
+// Manual implementation needed because of `schema` field. Comparison excludes
this field.
+impl PartialOrd for EmptyRelation {
+ fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+ self.produce_one_row.partial_cmp(&other.produce_one_row)
+ }
+}
+
/// A variadic query operation, Recursive CTE.
///
/// # Recursive Query Evaluation
@@ -2034,7 +2042,7 @@ pub struct EmptyRelation {
/// intermediate table, then empty the intermediate table.
///
/// [Postgres Docs]:
https://www.postgresql.org/docs/current/queries-with.html#QUERIES-WITH-RECURSIVE
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct RecursiveQuery {
/// Name of the query
pub name: String,
@@ -2059,6 +2067,13 @@ pub struct Values {
pub values: Vec<Vec<Expr>>,
}
+// Manual implementation needed because of `schema` field. Comparison excludes
this field.
+impl PartialOrd for Values {
+ fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+ self.values.partial_cmp(&other.values)
+ }
+}
+
/// Evaluates an arbitrary list of expressions (essentially a
/// SELECT with an expression list) on its input.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
@@ -2073,6 +2088,16 @@ pub struct Projection {
pub schema: DFSchemaRef,
}
+// Manual implementation needed because of `schema` field. Comparison excludes
this field.
+impl PartialOrd for Projection {
+ fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+ match self.expr.partial_cmp(&other.expr) {
+ Some(Ordering::Equal) => self.input.partial_cmp(&other.input),
+ cmp => cmp,
+ }
+ }
+}
+
impl Projection {
/// Create a new Projection
pub fn try_new(expr: Vec<Expr>, input: Arc<LogicalPlan>) -> Result<Self> {
@@ -2172,6 +2197,16 @@ impl SubqueryAlias {
}
}
+// Manual implementation needed because of `schema` field. Comparison excludes
this field.
+impl PartialOrd for SubqueryAlias {
+ fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+ match self.input.partial_cmp(&other.input) {
+ Some(Ordering::Equal) => self.alias.partial_cmp(&other.alias),
+ cmp => cmp,
+ }
+ }
+}
+
/// Filters rows from its input that do not match an
/// expression (essentially a WHERE clause with a predicate
/// expression).
@@ -2183,7 +2218,7 @@ impl SubqueryAlias {
///
/// Filter should not be created directly but instead use `try_new()`
/// and that these fields are only pub to support pattern matching
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
#[non_exhaustive]
pub struct Filter {
/// The predicate expression, which must have Boolean type.
@@ -2409,6 +2444,16 @@ impl Window {
}
}
+// Manual implementation needed because of `schema` field. Comparison excludes this field.
+//
+// Ordering is lexicographic over (`input`, `window_expr`).
+impl PartialOrd for Window {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        let lhs = (&self.input, &self.window_expr);
+        let rhs = (&other.input, &other.window_expr);
+        lhs.partial_cmp(&rhs)
+    }
+}
+
/// Produces rows from a table provider by reference or from the context
#[derive(Clone)]
pub struct TableScan {
@@ -2451,6 +2496,37 @@ impl PartialEq for TableScan {
impl Eq for TableScan {}
+// Manual implementation needed because of `source` and `projected_schema` fields.
+// Comparison excludes these fields.
+impl PartialOrd for TableScan {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        // Borrowed view of the fields that take part in the ordering; the derived
+        // `PartialOrd` compares them lexicographically in declaration order.
+        #[derive(PartialEq, PartialOrd)]
+        struct ComparableTableScan<'a> {
+            /// The name of the table
+            pub table_name: &'a TableReference,
+            /// Optional column indices to use as a projection
+            pub projection: &'a Option<Vec<usize>>,
+            /// Optional expressions to be used as filters by the table provider
+            pub filters: &'a Vec<Expr>,
+            /// Optional number of rows to read
+            pub fetch: &'a Option<usize>,
+        }
+
+        // Projects a `TableScan` onto its comparable fields.
+        fn comparable(scan: &TableScan) -> ComparableTableScan<'_> {
+            ComparableTableScan {
+                table_name: &scan.table_name,
+                projection: &scan.projection,
+                filters: &scan.filters,
+                fetch: &scan.fetch,
+            }
+        }
+
+        comparable(self).partial_cmp(&comparable(other))
+    }
+}
+
impl Hash for TableScan {
fn hash<H: Hasher>(&self, state: &mut H) {
self.table_name.hash(state);
@@ -2526,8 +2602,18 @@ pub struct CrossJoin {
pub schema: DFSchemaRef,
}
+// Manual implementation needed because of `schema` field. Comparison excludes this field.
+//
+// Ordering is lexicographic over (`left`, `right`).
+impl PartialOrd for CrossJoin {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        (&self.left, &self.right).partial_cmp(&(&other.left, &other.right))
+    }
+}
+
/// Repartition the plan based on a partitioning scheme.
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Repartition {
/// The incoming logical plan
pub input: Arc<LogicalPlan>,
@@ -2544,9 +2630,16 @@ pub struct Union {
pub schema: DFSchemaRef,
}
+// Manual implementation needed because of `schema` field. Comparison excludes this field.
+//
+// Only the list of `inputs` takes part in the ordering.
+impl PartialOrd for Union {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        let (lhs, rhs) = (&self.inputs, &other.inputs);
+        lhs.partial_cmp(rhs)
+    }
+}
+
/// Prepare a statement but do not execute it. Prepare statements can have 0
or more
/// `Expr::Placeholder` expressions that are filled in during execution
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Prepare {
/// The name of the statement
pub name: String,
@@ -2586,6 +2679,15 @@ pub struct DescribeTable {
pub output_schema: DFSchemaRef,
}
+// Manual implementation of `PartialOrd`, returning `None` since there are no comparable
+// types in `DescribeTable`. This allows `LogicalPlan` to derive `PartialOrd`.
+//
+// NOTE(review): this yields `None` even for two values that compare equal under
+// `PartialEq` (presumably derived — confirm). `std::cmp::PartialOrd` requires
+// `a == b` iff `partial_cmp(a, b) == Some(Equal)`. `test_plan_partial_ord`
+// currently asserts the `None` result, so any change needs a matching test update.
+impl PartialOrd for DescribeTable {
+    fn partial_cmp(&self, _rhs: &Self) -> Option<Ordering> {
+        // Schemas have no meaningful ordering, so every pair is incomparable.
+        None
+    }
+}
+
/// Produces a relation with string representations of
/// various parts of the plan
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
@@ -2602,6 +2704,36 @@ pub struct Explain {
pub logical_optimization_succeeded: bool,
}
+// Manual implementation needed because of `schema` field. Comparison excludes this field.
+impl PartialOrd for Explain {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        // Borrowed view of the fields that take part in the ordering; the derived
+        // `PartialOrd` compares them lexicographically in declaration order.
+        #[derive(PartialEq, PartialOrd)]
+        struct ComparableExplain<'a> {
+            /// Should extra (detailed, intermediate plans) be included?
+            pub verbose: &'a bool,
+            /// The logical plan that is being EXPLAIN'd
+            pub plan: &'a Arc<LogicalPlan>,
+            /// Represent the various stages plans have gone through
+            pub stringified_plans: &'a Vec<StringifiedPlan>,
+            /// Used by physical planner to check if should proceed with planning
+            pub logical_optimization_succeeded: &'a bool,
+        }
+
+        // Projects an `Explain` onto its comparable fields.
+        fn comparable(explain: &Explain) -> ComparableExplain<'_> {
+            ComparableExplain {
+                verbose: &explain.verbose,
+                plan: &explain.plan,
+                stringified_plans: &explain.stringified_plans,
+                logical_optimization_succeeded: &explain.logical_optimization_succeeded,
+            }
+        }
+
+        comparable(self).partial_cmp(&comparable(other))
+    }
+}
+
/// Runs the actual plan, and then prints the physical plan with
/// with execution metrics.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
@@ -2614,6 +2746,16 @@ pub struct Analyze {
pub schema: DFSchemaRef,
}
+// Manual implementation needed because of `schema` field. Comparison excludes this field.
+//
+// Ordering is lexicographic over (`verbose`, `input`).
+impl PartialOrd for Analyze {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        (&self.verbose, &self.input).partial_cmp(&(&other.verbose, &other.input))
+    }
+}
+
/// Extension operator defined outside of DataFusion
// TODO(clippy): This clippy `allow` should be removed if
// the manual `PartialEq` is removed in favor of a derive.
@@ -2634,8 +2776,14 @@ impl PartialEq for Extension {
}
}
+// Delegates entirely to the `PartialOrd` implementation of the wrapped `node`.
+impl PartialOrd for Extension {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        let (lhs, rhs) = (&self.node, &other.node);
+        lhs.partial_cmp(rhs)
+    }
+}
+
/// Produces the first `n` tuples from its input and discards the rest.
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Limit {
/// Number of rows to skip before fetch
pub skip: usize,
@@ -2647,7 +2795,7 @@ pub struct Limit {
}
/// Removes duplicate rows from the input
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum Distinct {
/// Plain `DISTINCT` referencing all selection expressions
All(Arc<LogicalPlan>),
@@ -2745,6 +2893,38 @@ impl DistinctOn {
}
}
+// Manual implementation needed because of `schema` field. Comparison excludes this field.
+impl PartialOrd for DistinctOn {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        // Borrowed view of the fields that take part in the ordering; the derived
+        // `PartialOrd` compares them lexicographically in declaration order.
+        #[derive(PartialEq, PartialOrd)]
+        struct ComparableDistinctOn<'a> {
+            /// The `DISTINCT ON` clause expression list
+            pub on_expr: &'a Vec<Expr>,
+            /// The selected projection expression list
+            pub select_expr: &'a Vec<Expr>,
+            /// The `ORDER BY` clause, whose initial expressions must match those of the `ON`
+            /// clause when present. Note that those matching expressions actually wrap the
+            /// `ON` expressions with additional info pertaining to the sorting procedure
+            /// (i.e. ASC/DESC, and NULLS FIRST/LAST).
+            pub sort_expr: &'a Option<Vec<SortExpr>>,
+            /// The logical plan that is being DISTINCT'd
+            pub input: &'a Arc<LogicalPlan>,
+        }
+
+        // Projects a `DistinctOn` onto its comparable fields.
+        fn comparable(distinct: &DistinctOn) -> ComparableDistinctOn<'_> {
+            ComparableDistinctOn {
+                on_expr: &distinct.on_expr,
+                select_expr: &distinct.select_expr,
+                sort_expr: &distinct.sort_expr,
+                input: &distinct.input,
+            }
+        }
+
+        comparable(self).partial_cmp(&comparable(other))
+    }
+}
+
/// Aggregates its input based on a set of grouping and aggregate
/// expressions (e.g. SUM).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
@@ -2849,6 +3029,21 @@ impl Aggregate {
}
}
+// Manual implementation needed because of `schema` field. Comparison excludes this field.
+//
+// Ordering is lexicographic over (`input`, `group_expr`, `aggr_expr`): each later
+// field is only consulted when all earlier ones compare equal.
+impl PartialOrd for Aggregate {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        let lhs = (&self.input, &self.group_expr, &self.aggr_expr);
+        let rhs = (&other.input, &other.group_expr, &other.aggr_expr);
+        lhs.partial_cmp(&rhs)
+    }
+}
+
/// Checks whether any expression in `group_expr` contains `Expr::GroupingSet`.
fn contains_grouping_set(group_expr: &[Expr]) -> bool {
group_expr
@@ -2947,7 +3142,7 @@ fn calc_func_dependencies_for_project(
}
/// Sorts its input according to a list of sort expressions.
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Sort {
/// The sort expressions
pub expr: Vec<SortExpr>,
@@ -3013,8 +3208,50 @@ impl Join {
}
}
+// Manual implementation needed because of `schema` field. Comparison excludes this field.
+impl PartialOrd for Join {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        // Borrowed view of the fields that take part in the ordering; the derived
+        // `PartialOrd` compares them lexicographically in declaration order.
+        #[derive(PartialEq, PartialOrd)]
+        struct ComparableJoin<'a> {
+            /// Left input
+            pub left: &'a Arc<LogicalPlan>,
+            /// Right input
+            pub right: &'a Arc<LogicalPlan>,
+            /// Equijoin clause expressed as pairs of (left, right) join expressions
+            pub on: &'a Vec<(Expr, Expr)>,
+            /// Filters applied during join (non-equi conditions)
+            pub filter: &'a Option<Expr>,
+            /// Join type
+            pub join_type: &'a JoinType,
+            /// Join constraint
+            pub join_constraint: &'a JoinConstraint,
+            /// If null_equals_null is true, null == null else null != null
+            pub null_equals_null: &'a bool,
+        }
+
+        // Projects a `Join` onto its comparable fields.
+        fn comparable(join: &Join) -> ComparableJoin<'_> {
+            ComparableJoin {
+                left: &join.left,
+                right: &join.right,
+                on: &join.on,
+                filter: &join.filter,
+                join_type: &join.join_type,
+                join_constraint: &join.join_constraint,
+                null_equals_null: &join.null_equals_null,
+            }
+        }
+
+        comparable(self).partial_cmp(&comparable(other))
+    }
+}
+
/// Subquery
-#[derive(Clone, PartialEq, Eq, Hash)]
+#[derive(Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Subquery {
/// The subquery
pub subquery: Arc<LogicalPlan>,
@@ -3050,7 +3287,7 @@ impl Debug for Subquery {
/// See [`Partitioning`] for more details on partitioning
///
/// [`Partitioning`]:
https://docs.rs/datafusion/latest/datafusion/physical_expr/enum.Partitioning.html#
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum Partitioning {
/// Allocate batches using a round-robin algorithm and the specified
number of partitions
RoundRobinBatch(usize),
@@ -3084,6 +3321,47 @@ pub struct Unnest {
pub options: UnnestOptions,
}
+// Manual implementation needed because of `schema` field. Comparison excludes this field.
+impl PartialOrd for Unnest {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        // Borrowed view of the fields that take part in the ordering; the derived
+        // `PartialOrd` compares them lexicographically in declaration order.
+        #[derive(PartialEq, PartialOrd)]
+        struct ComparableUnnest<'a> {
+            /// The incoming logical plan
+            pub input: &'a Arc<LogicalPlan>,
+            /// Columns to run unnest on, can be a list of (List/Struct) columns
+            pub exec_columns: &'a Vec<Column>,
+            /// refer to the indices(in the input schema) of columns
+            /// that have type list to run unnest on
+            pub list_type_columns: &'a Vec<usize>,
+            /// refer to the indices (in the input schema) of columns
+            /// that have type struct to run unnest on
+            pub struct_type_columns: &'a Vec<usize>,
+            /// Having items aligned with the output columns
+            /// representing which column in the input schema each output column depends on
+            pub dependency_indices: &'a Vec<usize>,
+            /// Options
+            pub options: &'a UnnestOptions,
+        }
+
+        // Projects an `Unnest` onto its comparable fields.
+        fn comparable(unnest: &Unnest) -> ComparableUnnest<'_> {
+            ComparableUnnest {
+                input: &unnest.input,
+                exec_columns: &unnest.exec_columns,
+                list_type_columns: &unnest.list_type_columns,
+                struct_type_columns: &unnest.struct_type_columns,
+                dependency_indices: &unnest.dependency_indices,
+                options: &unnest.options,
+            }
+        }
+
+        comparable(self).partial_cmp(&comparable(other))
+    }
+}
+
#[cfg(test)]
mod tests {
@@ -3696,4 +3974,40 @@ digraph {
let actual = format!("{}", plan.display_indent());
assert_eq!(expected.to_string(), actual)
}
+
+    #[test]
+    fn test_plan_partial_ord() {
+        let empty_relation = LogicalPlan::EmptyRelation(EmptyRelation {
+            produce_one_row: false,
+            schema: Arc::new(DFSchema::empty()),
+        });
+
+        // Builds a fresh, structurally identical `DescribeTable` plan on every call.
+        let new_describe_table = || {
+            LogicalPlan::DescribeTable(DescribeTable {
+                schema: Arc::new(Schema::new(vec![Field::new(
+                    "foo",
+                    DataType::Int32,
+                    false,
+                )])),
+                output_schema: DFSchemaRef::new(DFSchema::empty()),
+            })
+        };
+        let describe_table = new_describe_table();
+        let describe_table_clone = new_describe_table();
+
+        // Plans of different variants are ordered by the derived `PartialOrd` on
+        // `LogicalPlan`, i.e. by variant declaration order.
+        assert_eq!(
+            empty_relation.partial_cmp(&describe_table),
+            Some(Ordering::Less)
+        );
+        assert_eq!(
+            describe_table.partial_cmp(&empty_relation),
+            Some(Ordering::Greater)
+        );
+        // `DescribeTable` never compares, even against an identical value.
+        assert_eq!(describe_table.partial_cmp(&describe_table_clone), None);
+    }
}
diff --git a/datafusion/expr/src/logical_plan/statement.rs
b/datafusion/expr/src/logical_plan/statement.rs
index 21ff8dbd8e..ed06375157 100644
--- a/datafusion/expr/src/logical_plan/statement.rs
+++ b/datafusion/expr/src/logical_plan/statement.rs
@@ -16,6 +16,7 @@
// under the License.
use datafusion_common::DFSchemaRef;
+use std::cmp::Ordering;
use std::fmt::{self, Display};
/// Various types of Statements.
@@ -25,7 +26,7 @@ use std::fmt::{self, Display};
/// While DataFusion does not offer support transactions, it provides
/// [`LogicalPlan`](crate::LogicalPlan) support to assist building
/// database systems using DataFusion
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum Statement {
// Begin a transaction
TransactionStart(TransactionStart),
@@ -92,21 +93,21 @@ impl Statement {
}
/// Indicates if a transaction was committed or aborted
-#[derive(Clone, PartialEq, Eq, Hash, Debug)]
+// `PartialOrd` is required by the manual `PartialOrd for TransactionEnd`, which compares
+// `conclusion`; the derived order follows variant declaration order (Commit < Rollback).
+#[derive(Clone, PartialEq, Eq, PartialOrd, Hash, Debug)]
pub enum TransactionConclusion {
Commit,
Rollback,
}
/// Indicates if this transaction is allowed to write
-#[derive(Clone, PartialEq, Eq, Hash, Debug)]
+// `PartialOrd` is required by the manual `PartialOrd for TransactionStart`, which compares
+// `access_mode` first; the derived order follows declaration order (ReadOnly < ReadWrite).
+#[derive(Clone, PartialEq, Eq, PartialOrd, Hash, Debug)]
pub enum TransactionAccessMode {
ReadOnly,
ReadWrite,
}
/// Indicates ANSI transaction isolation level
-#[derive(Clone, PartialEq, Eq, Hash, Debug)]
+#[derive(Clone, PartialEq, Eq, PartialOrd, Hash, Debug)]
pub enum TransactionIsolationLevel {
ReadUncommitted,
ReadCommitted,
@@ -125,6 +126,18 @@ pub struct TransactionStart {
pub schema: DFSchemaRef,
}
+// Manual implementation needed because of `schema` field. Comparison excludes this field.
+//
+// Ordering is lexicographic over (`access_mode`, `isolation_level`).
+impl PartialOrd for TransactionStart {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        let lhs = (&self.access_mode, &self.isolation_level);
+        let rhs = (&other.access_mode, &other.isolation_level);
+        lhs.partial_cmp(&rhs)
+    }
+}
+
/// Indicator that any current transaction should be terminated
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct TransactionEnd {
@@ -136,6 +149,16 @@ pub struct TransactionEnd {
pub schema: DFSchemaRef,
}
+// Manual implementation needed because of `schema` field. Comparison excludes this field.
+//
+// Ordering is lexicographic over (`conclusion`, `chain`).
+impl PartialOrd for TransactionEnd {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        (&self.conclusion, &self.chain).partial_cmp(&(&other.conclusion, &other.chain))
+    }
+}
+
/// Set a Variable's value -- value in
/// [`ConfigOptions`](datafusion_common::config::ConfigOptions)
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
@@ -147,3 +170,13 @@ pub struct SetVariable {
/// Dummy schema
pub schema: DFSchemaRef,
}
+
+// Manual implementation needed because of `schema` field. Comparison excludes this field.
+//
+// Ordering is lexicographic over (`variable`, `value`).
+impl PartialOrd for SetVariable {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        // Compare like with like: `variable` against `other.variable`. Comparing it
+        // against `other.value` made `x.partial_cmp(&x)` differ from `Some(Equal)`
+        // whenever a variable's name and value differ, contradicting `PartialEq`.
+        match self.variable.partial_cmp(&other.variable) {
+            Some(Ordering::Equal) => self.value.partial_cmp(&other.value),
+            cmp => cmp,
+        }
+    }
+}
diff --git a/datafusion/expr/src/udf.rs b/datafusion/expr/src/udf.rs
index 598607ae5e..938e1181d8 100644
--- a/datafusion/expr/src/udf.rs
+++ b/datafusion/expr/src/udf.rs
@@ -25,6 +25,7 @@ use arrow::datatypes::DataType;
use datafusion_common::{not_impl_err, ExprSchema, Result};
use datafusion_expr_common::interval_arithmetic::Interval;
use std::any::Any;
+use std::cmp::Ordering;
use std::fmt::Debug;
use std::hash::{DefaultHasher, Hash, Hasher};
use std::sync::Arc;
@@ -62,6 +63,16 @@ impl PartialEq for ScalarUDF {
}
}
+// Manual implementation based on `ScalarUDFImpl::equals`: ordering is lexicographic
+// over (`name()`, `signature()`).
+//
+// NOTE(review): if an implementation overrides `ScalarUDFImpl::equals` to compare more
+// than name and signature, this may report `Some(Equal)` for UDFs that `PartialEq`
+// treats as different — confirm.
+impl PartialOrd for ScalarUDF {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        (self.name(), self.signature()).partial_cmp(&(other.name(), other.signature()))
+    }
+}
+
impl Eq for ScalarUDF {}
impl Hash for ScalarUDF {
diff --git a/datafusion/expr/src/window_frame.rs
b/datafusion/expr/src/window_frame.rs
index 6c935cdcd1..b2e8268aa3 100644
--- a/datafusion/expr/src/window_frame.rs
+++ b/datafusion/expr/src/window_frame.rs
@@ -36,7 +36,7 @@ use sqlparser::parser::ParserError::ParserError;
/// window function. The ending frame boundary can be omitted if the `BETWEEN`
/// and `AND` keywords that surround the starting frame boundary are also
omitted,
/// in which case the ending frame boundary defaults to `CURRENT ROW`.
-#[derive(Clone, PartialEq, Eq, Hash)]
+#[derive(Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct WindowFrame {
/// Frame type - either `ROWS`, `RANGE` or `GROUPS`
pub units: WindowFrameUnits,
@@ -300,7 +300,7 @@ impl WindowFrame {
/// 4. `<expr>` FOLLOWING
/// 5. UNBOUNDED FOLLOWING
///
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum WindowFrameBound {
/// 1. UNBOUNDED PRECEDING
/// The frame boundary is the first row in the partition.
diff --git a/datafusion/optimizer/src/analyzer/subquery.rs
b/datafusion/optimizer/src/analyzer/subquery.rs
index 996dc23885..c771f31a58 100644
--- a/datafusion/optimizer/src/analyzer/subquery.rs
+++ b/datafusion/optimizer/src/analyzer/subquery.rs
@@ -336,6 +336,7 @@ fn check_mixed_out_refer_in_window(window: &Window) ->
Result<()> {
#[cfg(test)]
mod test {
+ use std::cmp::Ordering;
use std::sync::Arc;
use datafusion_common::{DFSchema, DFSchemaRef};
@@ -348,6 +349,12 @@ mod test {
empty_schema: DFSchemaRef,
}
+    // Test mock: every pair of nodes is reported as incomparable.
+    impl PartialOrd for MockUserDefinedLogicalPlan {
+        fn partial_cmp(&self, _rhs: &Self) -> Option<Ordering> {
+            None
+        }
+    }
+
impl UserDefinedLogicalNodeCore for MockUserDefinedLogicalPlan {
fn name(&self) -> &str {
"MockUserDefinedLogicalPlan"
diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs
b/datafusion/optimizer/src/optimize_projections/mod.rs
index 65db164c6e..8c61d45cc8 100644
--- a/datafusion/optimizer/src/optimize_projections/mod.rs
+++ b/datafusion/optimizer/src/optimize_projections/mod.rs
@@ -785,6 +785,7 @@ fn is_projection_unnecessary(input: &LogicalPlan,
proj_exprs: &[Expr]) -> Result
#[cfg(test)]
mod tests {
+ use std::cmp::Ordering;
use std::collections::HashMap;
use std::fmt::Formatter;
use std::ops::Add;
@@ -844,6 +845,16 @@ mod tests {
}
}
+    // Manual implementation needed because of `schema` field. Comparison excludes this field.
+    //
+    // Ordering is lexicographic over (`exprs`, `input`).
+    impl PartialOrd for NoOpUserDefined {
+        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+            (&self.exprs, &self.input).partial_cmp(&(&other.exprs, &other.input))
+        }
+    }
+
impl UserDefinedLogicalNodeCore for NoOpUserDefined {
fn name(&self) -> &str {
"NoOpUserDefined"
@@ -910,6 +921,23 @@ mod tests {
}
}
+    // Manual implementation needed because of `schema` field. Comparison excludes this field.
+    //
+    // Ordering is lexicographic over (`exprs`, `left_child`, `right_child`).
+    impl PartialOrd for UserDefinedCrossJoin {
+        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+            let lhs = (&self.exprs, &self.left_child, &self.right_child);
+            let rhs = (&other.exprs, &other.left_child, &other.right_child);
+            lhs.partial_cmp(&rhs)
+        }
+    }
+
impl UserDefinedLogicalNodeCore for UserDefinedCrossJoin {
fn name(&self) -> &str {
"UserDefinedCrossJoin"
diff --git a/datafusion/optimizer/src/push_down_filter.rs
b/datafusion/optimizer/src/push_down_filter.rs
index 6f0a64b85c..a306ff7d2d 100644
--- a/datafusion/optimizer/src/push_down_filter.rs
+++ b/datafusion/optimizer/src/push_down_filter.rs
@@ -1195,6 +1195,7 @@ fn contain(e: &Expr, check_map: &HashMap<String, Expr>)
-> bool {
#[cfg(test)]
mod tests {
use std::any::Any;
+ use std::cmp::Ordering;
use std::fmt::{Debug, Formatter};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
@@ -1451,6 +1452,13 @@ mod tests {
schema: DFSchemaRef,
}
+    // Manual implementation needed because of `schema` field. Comparison excludes this field.
+    //
+    // Only `input` takes part in the ordering.
+    impl PartialOrd for NoopPlan {
+        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+            let (lhs, rhs) = (&self.input, &other.input);
+            lhs.partial_cmp(rhs)
+        }
+    }
+
impl UserDefinedLogicalNodeCore for NoopPlan {
fn name(&self) -> &str {
"NoopPlan"
diff --git a/datafusion/optimizer/src/test/user_defined.rs
b/datafusion/optimizer/src/test/user_defined.rs
index d040fa2bae..814cd0c0cd 100644
--- a/datafusion/optimizer/src/test/user_defined.rs
+++ b/datafusion/optimizer/src/test/user_defined.rs
@@ -33,7 +33,7 @@ pub fn new(input: LogicalPlan) -> LogicalPlan {
LogicalPlan::Extension(Extension { node })
}
-#[derive(PartialEq, Eq, Hash)]
+// `PartialOrd` can be derived here: the only field, `input`, is itself comparable.
+#[derive(PartialEq, Eq, PartialOrd, Hash)]
struct TestUserDefinedPlanNode {
input: LogicalPlan,
}
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index 71c8dbe6ec..133c38ab8c 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -978,7 +978,7 @@ pub mod proto {
}
}
-#[derive(PartialEq, Eq, Hash)]
+#[derive(PartialEq, Eq, PartialOrd, Hash)]
struct TopKPlanNode {
k: usize,
input: LogicalPlan,
diff --git a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
index cc353ab36d..ea85092f7a 100644
--- a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
@@ -21,6 +21,7 @@ use datafusion::scalar::ScalarValue;
use datafusion_substrait::logical_plan::{
consumer::from_substrait_plan, producer::to_substrait_plan,
};
+use std::cmp::Ordering;
use datafusion::arrow::datatypes::{DataType, Field, IntervalUnit, Schema,
TimeUnit};
use datafusion::common::{not_impl_err, plan_err, DFSchema, DFSchemaRef};
@@ -84,6 +85,17 @@ struct MockUserDefinedLogicalPlan {
empty_schema: DFSchemaRef,
}
+// `PartialOrd` needed for `UserDefinedLogicalNodeCore`, manual implementation necessary due to
+// the `empty_schema` field.
+//
+// Ordering is lexicographic over (`validation_bytes`, `inputs`).
+impl PartialOrd for MockUserDefinedLogicalPlan {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        let lhs = (&self.validation_bytes, &self.inputs);
+        let rhs = (&other.validation_bytes, &other.inputs);
+        lhs.partial_cmp(&rhs)
+    }
+}
+
impl UserDefinedLogicalNode for MockUserDefinedLogicalPlan {
fn as_any(&self) -> &dyn std::any::Any {
self
@@ -132,6 +144,10 @@ impl UserDefinedLogicalNode for MockUserDefinedLogicalPlan
{
fn dyn_eq(&self, _: &dyn UserDefinedLogicalNode) -> bool {
unimplemented!()
}
+
+    // Left unimplemented like `dyn_eq` above: calling it panics.
+    fn dyn_ord(&self, _: &dyn UserDefinedLogicalNode) -> Option<Ordering> {
+        unimplemented!()
+    }
}
impl MockUserDefinedLogicalPlan {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]