This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new b64d2c6  Extract CrossJoin, Repartition, Union in LogicalPlan (#1322)
b64d2c6 is described below

commit b64d2c62669a89c2d7a913c5f40598c07ab3bece
Author: Kun Liu <[email protected]>
AuthorDate: Fri Nov 19 02:10:56 2021 +0800

    Extract CrossJoin, Repartition, Union in LogicalPlan (#1322)
---
 ballista/rust/core/src/serde/logical_plan/mod.rs   |  13 +--
 .../rust/core/src/serde/logical_plan/to_proto.rs   |  12 +--
 datafusion/src/logical_plan/builder.rs             |  20 ++---
 datafusion/src/logical_plan/mod.rs                 |   5 +-
 datafusion/src/logical_plan/plan.rs                | 100 ++++++++++++---------
 .../src/optimizer/common_subexpr_eliminate.rs      |   6 +-
 datafusion/src/optimizer/constant_folding.rs       |   6 +-
 datafusion/src/optimizer/filter_push_down.rs       |   8 +-
 datafusion/src/optimizer/limit_push_down.rs        |  10 +--
 datafusion/src/optimizer/projection_push_down.rs   |  14 +--
 datafusion/src/optimizer/utils.rs                  |  34 +++----
 datafusion/src/physical_plan/planner.rs            |  12 +--
 12 files changed, 131 insertions(+), 109 deletions(-)

diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs
index 472399b..a5e2aa0 100644
--- a/ballista/rust/core/src/serde/logical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -24,6 +24,7 @@ mod roundtrip_tests {
     use super::super::{super::error::Result, protobuf};
     use crate::error::BallistaError;
     use core::panic;
+    use datafusion::logical_plan::Repartition;
     use datafusion::{
         arrow::datatypes::{DataType, Field, IntervalUnit, Schema, TimeUnit},
         datasource::object_store::local::LocalFileSystem,
@@ -93,28 +94,28 @@ mod roundtrip_tests {
         for batch_size in test_batch_sizes.iter() {
             let rr_repartition = Partitioning::RoundRobinBatch(*batch_size);
 
-            let roundtrip_plan = LogicalPlan::Repartition {
+            let roundtrip_plan = LogicalPlan::Repartition(Repartition {
                 input: plan.clone(),
                 partitioning_scheme: rr_repartition,
-            };
+            });
 
             roundtrip_test!(roundtrip_plan);
 
             let h_repartition = Partitioning::Hash(test_expr.clone(), *batch_size);
 
-            let roundtrip_plan = LogicalPlan::Repartition {
+            let roundtrip_plan = LogicalPlan::Repartition(Repartition {
                 input: plan.clone(),
                 partitioning_scheme: h_repartition,
-            };
+            });
 
             roundtrip_test!(roundtrip_plan);
 
             let no_expr_hrepartition = Partitioning::Hash(Vec::new(), *batch_size);
 
-            let roundtrip_plan = LogicalPlan::Repartition {
+            let roundtrip_plan = LogicalPlan::Repartition(Repartition {
                 input: plan.clone(),
                 partitioning_scheme: no_expr_hrepartition,
-            };
+            });
 
             roundtrip_test!(roundtrip_plan);
         }
diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
index 825a4b9..a870ecd 100644
--- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
@@ -33,8 +33,8 @@ use datafusion::datasource::listing::ListingTable;
 use datafusion::logical_plan::{
     exprlist_to_fields,
     window_frames::{WindowFrame, WindowFrameBound, WindowFrameUnits},
-    Column, CreateExternalTable, Expr, JoinConstraint, JoinType, LogicalPlan,
-    TableScanPlan,
+    Column, CreateExternalTable, CrossJoin, Expr, JoinConstraint, JoinType, LogicalPlan,
+    Repartition, TableScanPlan,
 };
 use datafusion::physical_plan::aggregates::AggregateFunction;
 use datafusion::physical_plan::functions::BuiltinScalarFunction;
@@ -901,10 +901,10 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
                     ))),
                 })
             }
-            LogicalPlan::Repartition {
+            LogicalPlan::Repartition(Repartition {
                 input,
                 partitioning_scheme,
-            } => {
+            }) => {
                 use datafusion::logical_plan::Partitioning;
                 let input: protobuf::LogicalPlanNode = input.as_ref().try_into()?;
 
@@ -996,8 +996,8 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
                 })
             }
             LogicalPlan::Extension { .. } => unimplemented!(),
-            LogicalPlan::Union { .. } => unimplemented!(),
-            LogicalPlan::CrossJoin { left, right, .. } => {
+            LogicalPlan::Union(_) => unimplemented!(),
+            LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => {
                 let left: protobuf::LogicalPlanNode = left.as_ref().try_into()?;
                 let right: protobuf::LogicalPlanNode = right.as_ref().try_into()?;
                 Ok(protobuf::LogicalPlanNode {
diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs
index 64b6c25..8c30579 100644
--- a/datafusion/src/logical_plan/builder.rs
+++ b/datafusion/src/logical_plan/builder.rs
@@ -26,7 +26,7 @@ use crate::datasource::{
 };
 use crate::error::{DataFusionError, Result};
 use crate::logical_plan::plan::{
-    AnalyzePlan, ExplainPlan, TableScanPlan, ToStringifiedPlan,
+    AnalyzePlan, ExplainPlan, TableScanPlan, ToStringifiedPlan, Union,
 };
 use crate::prelude::*;
 use crate::scalar::ScalarValue;
@@ -44,8 +44,8 @@ use std::{
 use super::dfschema::ToDFSchema;
 use super::{exprlist_to_fields, Expr, JoinConstraint, JoinType, LogicalPlan, PlanType};
 use crate::logical_plan::{
-    columnize_expr, normalize_col, normalize_cols, Column, DFField, DFSchema,
-    DFSchemaRef, Partitioning,
+    columnize_expr, normalize_col, normalize_cols, Column, CrossJoin, DFField, DFSchema,
+    DFSchemaRef, Partitioning, Repartition,
 };
 use crate::sql::utils::group_window_expr_by_sort_keys;
 
@@ -632,19 +632,19 @@ impl LogicalPlanBuilder {
     /// Apply a cross join
     pub fn cross_join(&self, right: &LogicalPlan) -> Result<Self> {
         let schema = self.plan.schema().join(right.schema())?;
-        Ok(Self::from(LogicalPlan::CrossJoin {
+        Ok(Self::from(LogicalPlan::CrossJoin(CrossJoin {
             left: Arc::new(self.plan.clone()),
             right: Arc::new(right.clone()),
             schema: DFSchemaRef::new(schema),
-        }))
+        })))
     }
 
     /// Repartition
     pub fn repartition(&self, partitioning_scheme: Partitioning) -> Result<Self> {
-        Ok(Self::from(LogicalPlan::Repartition {
+        Ok(Self::from(LogicalPlan::Repartition(Repartition {
             input: Arc::new(self.plan.clone()),
             partitioning_scheme,
-        }))
+        })))
     }
 
     /// Apply a window functions to extend the schema
@@ -840,7 +840,7 @@ pub fn union_with_alias(
     let inputs = vec![left_plan, right_plan]
         .into_iter()
         .flat_map(|p| match p {
-            LogicalPlan::Union { inputs, .. } => inputs,
+            LogicalPlan::Union(Union { inputs, .. }) => inputs,
             x => vec![x],
         })
         .collect::<Vec<_>>();
@@ -863,11 +863,11 @@ pub fn union_with_alias(
         ));
     }
 
-    Ok(LogicalPlan::Union {
+    Ok(LogicalPlan::Union(Union {
         schema: union_schema,
         inputs,
         alias,
-    })
+    }))
 }
 
 /// Project with optional alias
diff --git a/datafusion/src/logical_plan/mod.rs b/datafusion/src/logical_plan/mod.rs
index 77193eb..bb89542 100644
--- a/datafusion/src/logical_plan/mod.rs
+++ b/datafusion/src/logical_plan/mod.rs
@@ -51,8 +51,9 @@ pub use expr::{
 pub use extension::UserDefinedLogicalNode;
 pub use operators::Operator;
 pub use plan::{
-    CreateExternalTable, CreateMemoryTable, DropTable, JoinConstraint, JoinType,
-    LogicalPlan, Partitioning, PlanType, PlanVisitor, TableScanPlan,
+    CreateExternalTable, CreateMemoryTable, CrossJoin, DropTable, JoinConstraint,
+    JoinType, LogicalPlan, Partitioning, PlanType, PlanVisitor, Repartition,
+    TableScanPlan, Union,
 };
 pub(crate) use plan::{StringifiedPlan, ToStringifiedPlan};
 pub use registry::FunctionRegistry;
diff --git a/datafusion/src/logical_plan/plan.rs b/datafusion/src/logical_plan/plan.rs
index ea56bc0..c12edd9 100644
--- a/datafusion/src/logical_plan/plan.rs
+++ b/datafusion/src/logical_plan/plan.rs
@@ -74,6 +74,37 @@ pub struct TableScanPlan {
     pub limit: Option<usize>,
 }
 
+/// Apply Cross Join to two logical plans
+#[derive(Clone)]
+pub struct CrossJoin {
+    /// Left input
+    pub left: Arc<LogicalPlan>,
+    /// Right input
+    pub right: Arc<LogicalPlan>,
+    /// The output schema, containing fields from the left and right inputs
+    pub schema: DFSchemaRef,
+}
+
+/// Repartition the plan based on a partitioning scheme.
+#[derive(Clone)]
+pub struct Repartition {
+    /// The incoming logical plan
+    pub input: Arc<LogicalPlan>,
+    /// The partitioning scheme
+    pub partitioning_scheme: Partitioning,
+}
+
+/// Union multiple inputs
+#[derive(Clone)]
+pub struct Union {
+    /// Inputs to merge
+    pub inputs: Vec<LogicalPlan>,
+    /// Union schema. Should be the same for all inputs.
+    pub schema: DFSchemaRef,
+    /// Union output relation alias
+    pub alias: Option<String>,
+}
+
 /// Creates an in memory table.
 #[derive(Clone)]
 pub struct CreateMemoryTable {
@@ -224,30 +255,11 @@ pub enum LogicalPlan {
         null_equals_null: bool,
     },
     /// Apply Cross Join to two logical plans
-    CrossJoin {
-        /// Left input
-        left: Arc<LogicalPlan>,
-        /// Right input
-        right: Arc<LogicalPlan>,
-        /// The output schema, containing fields from the left and right inputs
-        schema: DFSchemaRef,
-    },
+    CrossJoin(CrossJoin),
     /// Repartition the plan based on a partitioning scheme.
-    Repartition {
-        /// The incoming logical plan
-        input: Arc<LogicalPlan>,
-        /// The partitioning scheme
-        partitioning_scheme: Partitioning,
-    },
+    Repartition(Repartition),
     /// Union multiple inputs
-    Union {
-        /// Inputs to merge
-        inputs: Vec<LogicalPlan>,
-        /// Union schema. Should be the same for all inputs.
-        schema: DFSchemaRef,
-        /// Union output relation alias
-        alias: Option<String>,
-    },
+    Union(Union),
     /// Produces rows from a table provider by reference or from the context
     TableScan(TableScanPlan),
     /// Produces no rows: An empty relation with an empty schema
@@ -304,8 +316,8 @@ impl LogicalPlan {
             LogicalPlan::Aggregate { schema, .. } => schema,
             LogicalPlan::Sort { input, .. } => input.schema(),
             LogicalPlan::Join { schema, .. } => schema,
-            LogicalPlan::CrossJoin { schema, .. } => schema,
-            LogicalPlan::Repartition { input, .. } => input.schema(),
+            LogicalPlan::CrossJoin(CrossJoin { schema, .. }) => schema,
+            LogicalPlan::Repartition(Repartition { input, .. }) => input.schema(),
             LogicalPlan::Limit { input, .. } => input.schema(),
             LogicalPlan::CreateExternalTable(CreateExternalTable { schema, .. }) => {
                 schema
@@ -313,7 +325,7 @@ impl LogicalPlan {
             LogicalPlan::Explain(explain) => &explain.schema,
             LogicalPlan::Analyze(analyze) => &analyze.schema,
             LogicalPlan::Extension(extension) => extension.node.schema(),
-            LogicalPlan::Union { schema, .. } => schema,
+            LogicalPlan::Union(Union { schema, .. }) => schema,
             LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. }) => {
                 input.schema()
             }
@@ -341,17 +353,17 @@ impl LogicalPlan {
                 schema,
                 ..
             }
-            | LogicalPlan::CrossJoin {
+            | LogicalPlan::CrossJoin(CrossJoin {
                 left,
                 right,
                 schema,
-            } => {
+            }) => {
                 let mut schemas = left.all_schemas();
                 schemas.extend(right.all_schemas());
                 schemas.insert(0, schema);
                 schemas
             }
-            LogicalPlan::Union { schema, .. } => {
+            LogicalPlan::Union(Union { schema, .. }) => {
                 vec![schema]
             }
             LogicalPlan::Extension(extension) => vec![extension.node.schema()],
@@ -362,7 +374,7 @@ impl LogicalPlan {
                 vec![schema]
             }
             LogicalPlan::Limit { input, .. }
-            | LogicalPlan::Repartition { input, .. }
+            | LogicalPlan::Repartition(Repartition { input, .. })
             | LogicalPlan::Sort { input, .. }
             | LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. })
             | LogicalPlan::Filter { input, .. } => input.all_schemas(),
@@ -388,10 +400,10 @@ impl LogicalPlan {
                 values.iter().flatten().cloned().collect()
             }
             LogicalPlan::Filter { predicate, .. } => vec![predicate.clone()],
-            LogicalPlan::Repartition {
+            LogicalPlan::Repartition(Repartition {
                 partitioning_scheme,
                 ..
-            } => match partitioning_scheme {
+            }) => match partitioning_scheme {
                 Partitioning::Hash(expr, _) => expr.clone(),
                 _ => vec![],
             },
@@ -414,10 +426,10 @@ impl LogicalPlan {
             | LogicalPlan::CreateExternalTable(_)
             | LogicalPlan::CreateMemoryTable(_)
             | LogicalPlan::DropTable(_)
-            | LogicalPlan::CrossJoin { .. }
+            | LogicalPlan::CrossJoin(_)
             | LogicalPlan::Analyze { .. }
             | LogicalPlan::Explain { .. }
-            | LogicalPlan::Union { .. } => {
+            | LogicalPlan::Union(_) => {
                 vec![]
             }
         }
@@ -429,15 +441,15 @@ impl LogicalPlan {
         match self {
             LogicalPlan::Projection { input, .. } => vec![input],
             LogicalPlan::Filter { input, .. } => vec![input],
-            LogicalPlan::Repartition { input, .. } => vec![input],
+            LogicalPlan::Repartition(Repartition { input, .. }) => vec![input],
             LogicalPlan::Window { input, .. } => vec![input],
             LogicalPlan::Aggregate { input, .. } => vec![input],
             LogicalPlan::Sort { input, .. } => vec![input],
             LogicalPlan::Join { left, right, .. } => vec![left, right],
-            LogicalPlan::CrossJoin { left, right, .. } => vec![left, right],
+            LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => vec![left, right],
             LogicalPlan::Limit { input, .. } => vec![input],
             LogicalPlan::Extension(extension) => extension.node.inputs(),
-            LogicalPlan::Union { inputs, .. } => inputs.iter().collect(),
+            LogicalPlan::Union(Union { inputs, .. }) => inputs.iter().collect(),
             LogicalPlan::Explain(explain) => vec![&explain.plan],
             LogicalPlan::Analyze(analyze) => vec![&analyze.input],
             LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. }) => {
@@ -564,15 +576,17 @@ impl LogicalPlan {
         let recurse = match self {
             LogicalPlan::Projection { input, .. } => input.accept(visitor)?,
             LogicalPlan::Filter { input, .. } => input.accept(visitor)?,
-            LogicalPlan::Repartition { input, .. } => input.accept(visitor)?,
+            LogicalPlan::Repartition(Repartition { input, .. }) => {
+                input.accept(visitor)?
+            }
             LogicalPlan::Window { input, .. } => input.accept(visitor)?,
             LogicalPlan::Aggregate { input, .. } => input.accept(visitor)?,
             LogicalPlan::Sort { input, .. } => input.accept(visitor)?,
             LogicalPlan::Join { left, right, .. }
-            | LogicalPlan::CrossJoin { left, right, .. } => {
+            | LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => {
                 left.accept(visitor)? && right.accept(visitor)?
             }
-            LogicalPlan::Union { inputs, .. } => {
+            LogicalPlan::Union(Union { inputs, .. }) => {
                 for input in inputs {
                     if !input.accept(visitor)? {
                         return Ok(false);
@@ -887,13 +901,13 @@ impl LogicalPlan {
                             }
                         }
                     }
-                    LogicalPlan::CrossJoin { .. } => {
+                    LogicalPlan::CrossJoin(_) => {
                         write!(f, "CrossJoin:")
                     }
-                    LogicalPlan::Repartition {
+                    LogicalPlan::Repartition(Repartition {
                         partitioning_scheme,
                         ..
-                    } => match partitioning_scheme {
+                    }) => match partitioning_scheme {
                         Partitioning::RoundRobinBatch(n) => write!(
                             f,
                             "Repartition: RoundRobinBatch partition_count={}",
@@ -927,7 +941,7 @@ impl LogicalPlan {
                     }
                     LogicalPlan::Explain { .. } => write!(f, "Explain"),
                     LogicalPlan::Analyze { .. } => write!(f, "Analyze"),
-                    LogicalPlan::Union { .. } => write!(f, "Union"),
+                    LogicalPlan::Union(_) => write!(f, "Union"),
                     LogicalPlan::Extension(e) => e.node.fmt_for_explain(f),
                 }
             }
diff --git a/datafusion/src/optimizer/common_subexpr_eliminate.rs b/datafusion/src/optimizer/common_subexpr_eliminate.rs
index 9e7a9dd..1ab87e7 100644
--- a/datafusion/src/optimizer/common_subexpr_eliminate.rs
+++ b/datafusion/src/optimizer/common_subexpr_eliminate.rs
@@ -195,9 +195,9 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
             })
         }
         LogicalPlan::Join { .. }
-        | LogicalPlan::CrossJoin { .. }
-        | LogicalPlan::Repartition { .. }
-        | LogicalPlan::Union { .. }
+        | LogicalPlan::CrossJoin(_)
+        | LogicalPlan::Repartition(_)
+        | LogicalPlan::Union(_)
         | LogicalPlan::TableScan { .. }
         | LogicalPlan::Values { .. }
         | LogicalPlan::EmptyRelation { .. }
diff --git a/datafusion/src/optimizer/constant_folding.rs b/datafusion/src/optimizer/constant_folding.rs
index 06bac6c..ff1e16c 100644
--- a/datafusion/src/optimizer/constant_folding.rs
+++ b/datafusion/src/optimizer/constant_folding.rs
@@ -68,7 +68,7 @@ impl OptimizerRule for ConstantFolding {
             | LogicalPlan::Projection { .. }
             | LogicalPlan::Window { .. }
             | LogicalPlan::Aggregate { .. }
-            | LogicalPlan::Repartition { .. }
+            | LogicalPlan::Repartition(_)
             | LogicalPlan::CreateExternalTable(_)
             | LogicalPlan::CreateMemoryTable(_)
             | LogicalPlan::DropTable(_)
@@ -78,9 +78,9 @@ impl OptimizerRule for ConstantFolding {
             | LogicalPlan::Explain { .. }
             | LogicalPlan::Analyze { .. }
             | LogicalPlan::Limit { .. }
-            | LogicalPlan::Union { .. }
+            | LogicalPlan::Union(_)
             | LogicalPlan::Join { .. }
-            | LogicalPlan::CrossJoin { .. } => {
+            | LogicalPlan::CrossJoin(_) => {
                 // apply the optimization to all inputs of the plan
                 let inputs = plan.inputs();
                 let new_inputs = inputs
diff --git a/datafusion/src/optimizer/filter_push_down.rs b/datafusion/src/optimizer/filter_push_down.rs
index 1bb1a18..cc8f011 100644
--- a/datafusion/src/optimizer/filter_push_down.rs
+++ b/datafusion/src/optimizer/filter_push_down.rs
@@ -16,7 +16,9 @@
 
 use crate::datasource::datasource::TableProviderFilterPushDown;
 use crate::execution::context::ExecutionProps;
-use crate::logical_plan::{and, replace_col, Column, LogicalPlan, TableScanPlan};
+use crate::logical_plan::{
+    and, replace_col, Column, CrossJoin, LogicalPlan, TableScanPlan,
+};
 use crate::logical_plan::{DFSchema, Expr};
 use crate::optimizer::optimizer::OptimizerRule;
 use crate::optimizer::utils;
@@ -374,7 +376,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
             // sort is filter-commutable
             push_down(&state, plan)
         }
-        LogicalPlan::Union { .. } => {
+        LogicalPlan::Union(_) => {
             // union all is filter-commutable
             push_down(&state, plan)
         }
@@ -388,7 +390,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
                 .collect::<HashSet<_>>();
             issue_filters(state, used_columns, plan)
         }
-        LogicalPlan::CrossJoin { left, right, .. } => {
+        LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => {
             optimize_join(state, plan, left, right)
         }
         LogicalPlan::Join {
diff --git a/datafusion/src/optimizer/limit_push_down.rs b/datafusion/src/optimizer/limit_push_down.rs
index 842ca08..ce8b5bc 100644
--- a/datafusion/src/optimizer/limit_push_down.rs
+++ b/datafusion/src/optimizer/limit_push_down.rs
@@ -20,8 +20,8 @@
 use super::utils;
 use crate::error::Result;
 use crate::execution::context::ExecutionProps;
-use crate::logical_plan::LogicalPlan;
 use crate::logical_plan::TableScanPlan;
+use crate::logical_plan::{LogicalPlan, Union};
 use crate::optimizer::optimizer::OptimizerRule;
 use std::sync::Arc;
 
@@ -99,11 +99,11 @@ fn limit_push_down(
             })
         }
         (
-            LogicalPlan::Union {
+            LogicalPlan::Union(Union {
                 inputs,
                 alias,
                 schema,
-            },
+            }),
             Some(upper_limit),
         ) => {
             // Push down limit through UNION
@@ -121,11 +121,11 @@ fn limit_push_down(
                     })
                 })
                 .collect::<Result<_>>()?;
-            Ok(LogicalPlan::Union {
+            Ok(LogicalPlan::Union(Union {
                 inputs: new_inputs,
                 alias: alias.clone(),
                 schema: schema.clone(),
-            })
+            }))
         }
         // For other nodes we can't push down the limit
         // But try to recurse and find other limit nodes to push down
diff --git a/datafusion/src/optimizer/projection_push_down.rs b/datafusion/src/optimizer/projection_push_down.rs
index f3a7c61..b725b1d 100644
--- a/datafusion/src/optimizer/projection_push_down.rs
+++ b/datafusion/src/optimizer/projection_push_down.rs
@@ -23,7 +23,7 @@ use crate::execution::context::ExecutionProps;
 use crate::logical_plan::plan::{AnalyzePlan, TableScanPlan};
 use crate::logical_plan::{
     build_join_schema, Column, DFField, DFSchema, DFSchemaRef, LogicalPlan,
-    LogicalPlanBuilder, ToDFSchema,
+    LogicalPlanBuilder, ToDFSchema, Union,
 };
 use crate::optimizer::optimizer::OptimizerRule;
 use crate::optimizer::utils;
@@ -377,11 +377,11 @@ fn optimize_plan(
                 schema: a.schema.clone(),
             }))
         }
-        LogicalPlan::Union {
+        LogicalPlan::Union(Union {
             inputs,
             schema,
             alias,
-        } => {
+        }) => {
             // UNION inputs will reference the same column with different identifiers, so we need
             // to populate new_required_columns by unqualified column name based on required fields
             // from the resulting UNION output
@@ -419,24 +419,24 @@ fn optimize_plan(
                     .cloned()
                     .collect(),
             )?;
-            Ok(LogicalPlan::Union {
+            Ok(LogicalPlan::Union(Union {
                 inputs: new_inputs,
                 schema: Arc::new(new_schema),
                 alias: alias.clone(),
-            })
+            }))
         }
         // all other nodes: Add any additional columns used by
         // expressions in this node to the list of required columns
         LogicalPlan::Limit { .. }
         | LogicalPlan::Filter { .. }
-        | LogicalPlan::Repartition { .. }
+        | LogicalPlan::Repartition(_)
         | LogicalPlan::EmptyRelation { .. }
         | LogicalPlan::Values { .. }
         | LogicalPlan::Sort { .. }
         | LogicalPlan::CreateExternalTable(_)
         | LogicalPlan::CreateMemoryTable(_)
         | LogicalPlan::DropTable(_)
-        | LogicalPlan::CrossJoin { .. }
+        | LogicalPlan::CrossJoin(_)
         | LogicalPlan::Extension { .. } => {
             let expr = plan.expressions();
             // collect all required columns by this plan
diff --git a/datafusion/src/optimizer/utils.rs b/datafusion/src/optimizer/utils.rs
index fe14442..b9a3b99 100644
--- a/datafusion/src/optimizer/utils.rs
+++ b/datafusion/src/optimizer/utils.rs
@@ -27,7 +27,7 @@ use crate::logical_plan::plan::{AnalyzePlan, ExtensionPlan};
 use crate::logical_plan::{
     build_join_schema, Column, CreateMemoryTable, DFSchema, DFSchemaRef, Expr,
     ExprRewriter, LogicalPlan, LogicalPlanBuilder, Operator, Partitioning, Recursion,
-    RewriteRecursion,
+    Repartition, RewriteRecursion, Union,
 };
 use crate::physical_plan::functions::Volatility;
 use crate::physical_plan::planner::DefaultPhysicalPlanner;
@@ -162,18 +162,20 @@ pub fn from_plan(
             predicate: expr[0].clone(),
             input: Arc::new(inputs[0].clone()),
         }),
-        LogicalPlan::Repartition {
+        LogicalPlan::Repartition(Repartition {
             partitioning_scheme,
             ..
-        } => match partitioning_scheme {
-            Partitioning::RoundRobinBatch(n) => Ok(LogicalPlan::Repartition {
-                partitioning_scheme: Partitioning::RoundRobinBatch(*n),
-                input: Arc::new(inputs[0].clone()),
-            }),
-            Partitioning::Hash(_, n) => Ok(LogicalPlan::Repartition {
+        }) => match partitioning_scheme {
+            Partitioning::RoundRobinBatch(n) => {
+                Ok(LogicalPlan::Repartition(Repartition {
+                    partitioning_scheme: Partitioning::RoundRobinBatch(*n),
+                    input: Arc::new(inputs[0].clone()),
+                }))
+            }
+            Partitioning::Hash(_, n) => Ok(LogicalPlan::Repartition(Repartition {
                 partitioning_scheme: Partitioning::Hash(expr.to_owned(), *n),
                 input: Arc::new(inputs[0].clone()),
-            }),
+            })),
         },
         LogicalPlan::Window {
             window_expr,
@@ -215,7 +217,7 @@ pub fn from_plan(
                 null_equals_null: *null_equals_null,
             })
         }
-        LogicalPlan::CrossJoin { .. } => {
+        LogicalPlan::CrossJoin(_) => {
             let left = inputs[0].clone();
             let right = &inputs[1];
             LogicalPlanBuilder::from(left).cross_join(right)?.build()
@@ -233,11 +235,13 @@ pub fn from_plan(
         LogicalPlan::Extension(e) => Ok(LogicalPlan::Extension(ExtensionPlan {
             node: e.node.from_template(expr, inputs),
         })),
-        LogicalPlan::Union { schema, alias, .. } => Ok(LogicalPlan::Union {
-            inputs: inputs.to_vec(),
-            schema: schema.clone(),
-            alias: alias.clone(),
-        }),
+        LogicalPlan::Union(Union { schema, alias, .. }) => {
+            Ok(LogicalPlan::Union(Union {
+                inputs: inputs.to_vec(),
+                schema: schema.clone(),
+                alias: alias.clone(),
+            }))
+        }
         LogicalPlan::Analyze(a) => {
             assert!(expr.is_empty());
             assert_eq!(inputs.len(), 1);
diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs
index 25ed0e4..ad858be 100644
--- a/datafusion/src/physical_plan/planner.rs
+++ b/datafusion/src/physical_plan/planner.rs
@@ -25,8 +25,8 @@ use super::{
 use crate::execution::context::ExecutionContextState;
 use crate::logical_plan::TableScanPlan;
 use crate::logical_plan::{
-    unnormalize_cols, DFSchema, Expr, LogicalPlan, Operator,
-    Partitioning as LogicalPartitioning, PlanType, ToStringifiedPlan,
+    unnormalize_cols, CrossJoin, DFSchema, Expr, LogicalPlan, Operator,
+    Partitioning as LogicalPartitioning, PlanType, Repartition, ToStringifiedPlan, Union,
     UserDefinedLogicalNode,
 };
 use crate::physical_optimizer::optimizer::PhysicalOptimizerRule;
@@ -643,17 +643,17 @@ impl DefaultPhysicalPlanner {
                     )?;
                     Ok(Arc::new(FilterExec::try_new(runtime_expr, physical_input)?) )
                 }
-                LogicalPlan::Union { inputs, .. } => {
+                LogicalPlan::Union(Union { inputs, .. }) => {
                     let physical_plans = futures::stream::iter(inputs)
                         .then(|lp| self.create_initial_plan(lp, ctx_state))
                         .try_collect::<Vec<_>>()
                         .await?;
                     Ok(Arc::new(UnionExec::new(physical_plans)) )
                 }
-                LogicalPlan::Repartition {
+                LogicalPlan::Repartition(Repartition {
                     input,
                     partitioning_scheme,
-                } => {
+                }) => {
                     let physical_input = self.create_initial_plan(input, ctx_state).await?;
                     let input_schema = physical_input.schema();
                     let input_dfschema = input.as_ref().schema();
@@ -776,7 +776,7 @@ impl DefaultPhysicalPlanner {
                         )?))
                     }
                 }
-                LogicalPlan::CrossJoin { left, right, .. } => {
+                LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => {
                     let left = self.create_initial_plan(left, ctx_state).await?;
                     let right = self.create_initial_plan(right, ctx_state).await?;
                     Ok(Arc::new(CrossJoinExec::try_new(left, right)?))

Reply via email to