This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 66f5d19  Extract Aggregate, Sort, and Join to struct from 
AggregatePlan (#1326)
66f5d19 is described below

commit 66f5d1906992dcf4f5b7ab787a39a4105c2f5b42
Author: Matthew Turner <[email protected]>
AuthorDate: Tue Nov 23 03:41:13 2021 -0800

    Extract Aggregate, Sort, and Join to struct from AggregatePlan (#1326)
    
    * First updates for AggregatePlan
    
    * Finished updating AggregatePlan
    
    * Rebase and update style
    
    * Rebase and update style
    
    * Cleanup errors on aggregate
    
    * Fix tests
    
    * Update sort
    
    * Update join
    
    * Update tests
    
    * Update filter push down
    
    * Style updates
---
 .../rust/core/src/serde/logical_plan/to_proto.rs   |  16 +--
 datafusion/src/logical_plan/builder.rs             |  20 ++--
 datafusion/src/logical_plan/plan.rs                | 126 +++++++++++----------
 .../src/optimizer/common_subexpr_eliminate.rs      |  22 ++--
 datafusion/src/optimizer/filter_push_down.rs       |  12 +-
 datafusion/src/optimizer/projection_push_down.rs   |  25 ++--
 .../src/optimizer/single_distinct_to_groupby.rs    |  18 +--
 datafusion/src/optimizer/utils.rs                  |  22 ++--
 datafusion/src/physical_plan/planner.rs            |  16 +--
 datafusion/tests/sql.rs                            |   4 +-
 datafusion/tests/user_defined_plan.rs              |   6 +-
 11 files changed, 154 insertions(+), 133 deletions(-)

diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs 
b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
index 0e0b989..897558c 100644
--- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
@@ -30,7 +30,9 @@ use datafusion::datasource::TableProvider;
 
 use datafusion::datasource::file_format::parquet::ParquetFormat;
 use datafusion::datasource::listing::ListingTable;
-use datafusion::logical_plan::plan::{EmptyRelation, Filter, Projection, 
Window};
+use datafusion::logical_plan::plan::{
+    Aggregate, EmptyRelation, Filter, Join, Projection, Sort, Window,
+};
 use datafusion::logical_plan::{
     exprlist_to_fields,
     window_frames::{WindowFrame, WindowFrameBound, WindowFrameUnits},
@@ -823,12 +825,12 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
                     ))),
                 })
             }
-            LogicalPlan::Aggregate {
-                input,
+            LogicalPlan::Aggregate(Aggregate {
                 group_expr,
                 aggr_expr,
+                input,
                 ..
-            } => {
+            }) => {
                 let input: protobuf::LogicalPlanNode = 
input.as_ref().try_into()?;
                 Ok(protobuf::LogicalPlanNode {
                     logical_plan_type: 
Some(LogicalPlanType::Aggregate(Box::new(
@@ -846,7 +848,7 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
                     ))),
                 })
             }
-            LogicalPlan::Join {
+            LogicalPlan::Join(Join {
                 left,
                 right,
                 on,
@@ -854,7 +856,7 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
                 join_constraint,
                 null_equals_null,
                 ..
-            } => {
+            }) => {
                 let left: protobuf::LogicalPlanNode = 
left.as_ref().try_into()?;
                 let right: protobuf::LogicalPlanNode = 
right.as_ref().try_into()?;
                 let (left_join_column, right_join_column) =
@@ -887,7 +889,7 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
                     ))),
                 })
             }
-            LogicalPlan::Sort { input, expr } => {
+            LogicalPlan::Sort(Sort { input, expr }) => {
                 let input: protobuf::LogicalPlanNode = 
input.as_ref().try_into()?;
                 let selection_expr: Vec<protobuf::LogicalExprNode> = expr
                     .iter()
diff --git a/datafusion/src/logical_plan/builder.rs 
b/datafusion/src/logical_plan/builder.rs
index 2eb0091..e7f3441 100644
--- a/datafusion/src/logical_plan/builder.rs
+++ b/datafusion/src/logical_plan/builder.rs
@@ -26,8 +26,8 @@ use crate::datasource::{
 };
 use crate::error::{DataFusionError, Result};
 use crate::logical_plan::plan::{
-    AnalyzePlan, EmptyRelation, ExplainPlan, Filter, Projection, TableScanPlan,
-    ToStringifiedPlan, Union, Window,
+    Aggregate, AnalyzePlan, EmptyRelation, ExplainPlan, Filter, Join, 
Projection, Sort,
+    TableScanPlan, ToStringifiedPlan, Union, Window,
 };
 use crate::prelude::*;
 use crate::scalar::ScalarValue;
@@ -468,10 +468,10 @@ impl LogicalPlanBuilder {
 
     /// Apply a sort
     pub fn sort(&self, exprs: impl IntoIterator<Item = impl Into<Expr>>) -> 
Result<Self> {
-        Ok(Self::from(LogicalPlan::Sort {
+        Ok(Self::from(LogicalPlan::Sort(Sort {
             expr: normalize_cols(exprs, &self.plan)?,
             input: Arc::new(self.plan.clone()),
-        }))
+        })))
     }
 
     /// Apply a union
@@ -587,7 +587,7 @@ impl LogicalPlanBuilder {
         let join_schema =
             build_join_schema(self.plan.schema(), right.schema(), &join_type)?;
 
-        Ok(Self::from(LogicalPlan::Join {
+        Ok(Self::from(LogicalPlan::Join(Join {
             left: Arc::new(self.plan.clone()),
             right: Arc::new(right.clone()),
             on,
@@ -595,7 +595,7 @@ impl LogicalPlanBuilder {
             join_constraint: JoinConstraint::On,
             schema: DFSchemaRef::new(join_schema),
             null_equals_null,
-        }))
+        })))
     }
 
     /// Apply a join with using constraint, which duplicates all join columns 
in output schema.
@@ -619,7 +619,7 @@ impl LogicalPlanBuilder {
         let join_schema =
             build_join_schema(self.plan.schema(), right.schema(), &join_type)?;
 
-        Ok(Self::from(LogicalPlan::Join {
+        Ok(Self::from(LogicalPlan::Join(Join {
             left: Arc::new(self.plan.clone()),
             right: Arc::new(right.clone()),
             on,
@@ -627,7 +627,7 @@ impl LogicalPlanBuilder {
             join_constraint: JoinConstraint::Using,
             schema: DFSchemaRef::new(join_schema),
             null_equals_null: false,
-        }))
+        })))
     }
 
     /// Apply a cross join
@@ -680,12 +680,12 @@ impl LogicalPlanBuilder {
         validate_unique_names("Aggregations", all_expr.clone(), 
self.plan.schema())?;
         let aggr_schema =
             DFSchema::new(exprlist_to_fields(all_expr, self.plan.schema())?)?;
-        Ok(Self::from(LogicalPlan::Aggregate {
+        Ok(Self::from(LogicalPlan::Aggregate(Aggregate {
             input: Arc::new(self.plan.clone()),
             group_expr,
             aggr_expr,
             schema: DFSchemaRef::new(aggr_schema),
-        }))
+        })))
     }
 
     /// Create an expression to represent the explanation of the plan
diff --git a/datafusion/src/logical_plan/plan.rs 
b/datafusion/src/logical_plan/plan.rs
index a7aac90..88bc7a2 100644
--- a/datafusion/src/logical_plan/plan.rs
+++ b/datafusion/src/logical_plan/plan.rs
@@ -243,6 +243,47 @@ pub struct Values {
     pub values: Vec<Vec<Expr>>,
 }
 
+/// Aggregates its input based on a set of grouping and aggregate
+/// expressions (e.g. SUM).
+#[derive(Clone)]
+pub struct Aggregate {
+    /// The incoming logical plan
+    pub input: Arc<LogicalPlan>,
+    /// Grouping expressions
+    pub group_expr: Vec<Expr>,
+    /// Aggregate expressions
+    pub aggr_expr: Vec<Expr>,
+    /// The schema description of the aggregate output
+    pub schema: DFSchemaRef,
+}
+
+/// Sorts its input according to a list of sort expressions.
+#[derive(Clone)]
+pub struct Sort {
+    /// The sort expressions
+    pub expr: Vec<Expr>,
+    /// The incoming logical plan
+    pub input: Arc<LogicalPlan>,
+}
+
+/// Join two logical plans on one or more join columns
+#[derive(Clone)]
+pub struct Join {
+    /// Left input
+    pub left: Arc<LogicalPlan>,
+    /// Right input
+    pub right: Arc<LogicalPlan>,
+    /// Equijoin clause expressed as pairs of (left, right) join columns
+    pub on: Vec<(Column, Column)>,
+    /// Join type
+    pub join_type: JoinType,
+    /// Join constraint
+    pub join_constraint: JoinConstraint,
+    /// The output schema, containing fields from the left and right inputs
+    pub schema: DFSchemaRef,
+    /// If null_equals_null is true, null == null else null != null
+    pub null_equals_null: bool,
+}
 /// A LogicalPlan represents the different types of relational
 /// operators (such as Projection, Filter, etc) and can be created by
 /// the SQL query planner and the DataFrame API.
@@ -269,40 +310,11 @@ pub enum LogicalPlan {
     Window(Window),
     /// Aggregates its input based on a set of grouping and aggregate
     /// expressions (e.g. SUM).
-    Aggregate {
-        /// The incoming logical plan
-        input: Arc<LogicalPlan>,
-        /// Grouping expressions
-        group_expr: Vec<Expr>,
-        /// Aggregate expressions
-        aggr_expr: Vec<Expr>,
-        /// The schema description of the aggregate output
-        schema: DFSchemaRef,
-    },
+    Aggregate(Aggregate),
     /// Sorts its input according to a list of sort expressions.
-    Sort {
-        /// The sort expressions
-        expr: Vec<Expr>,
-        /// The incoming logical plan
-        input: Arc<LogicalPlan>,
-    },
+    Sort(Sort),
     /// Join two logical plans on one or more join columns
-    Join {
-        /// Left input
-        left: Arc<LogicalPlan>,
-        /// Right input
-        right: Arc<LogicalPlan>,
-        /// Equijoin clause expressed as pairs of (left, right) join columns
-        on: Vec<(Column, Column)>,
-        /// Join type
-        join_type: JoinType,
-        /// Join constraint
-        join_constraint: JoinConstraint,
-        /// The output schema, containing fields from the left and right inputs
-        schema: DFSchemaRef,
-        /// If null_equals_null is true, null == null else null != null
-        null_equals_null: bool,
-    },
+    Join(Join),
     /// Apply Cross Join to two logical plans
     CrossJoin(CrossJoin),
     /// Repartition the plan based on a partitioning scheme.
@@ -347,9 +359,9 @@ impl LogicalPlan {
             LogicalPlan::Projection(Projection { schema, .. }) => schema,
             LogicalPlan::Filter(Filter { input, .. }) => input.schema(),
             LogicalPlan::Window(Window { schema, .. }) => schema,
-            LogicalPlan::Aggregate { schema, .. } => schema,
-            LogicalPlan::Sort { input, .. } => input.schema(),
-            LogicalPlan::Join { schema, .. } => schema,
+            LogicalPlan::Aggregate(Aggregate { schema, .. }) => schema,
+            LogicalPlan::Sort(Sort { input, .. }) => input.schema(),
+            LogicalPlan::Join(Join { schema, .. }) => schema,
             LogicalPlan::CrossJoin(CrossJoin { schema, .. }) => schema,
             LogicalPlan::Repartition(Repartition { input, .. }) => 
input.schema(),
             LogicalPlan::Limit(Limit { input, .. }) => input.schema(),
@@ -375,18 +387,18 @@ impl LogicalPlan {
             }) => vec![projected_schema],
             LogicalPlan::Values(Values { schema, .. }) => vec![schema],
             LogicalPlan::Window(Window { input, schema, .. })
-            | LogicalPlan::Aggregate { input, schema, .. }
-            | LogicalPlan::Projection(Projection { input, schema, .. }) => {
+            | LogicalPlan::Projection(Projection { input, schema, .. })
+            | LogicalPlan::Aggregate(Aggregate { input, schema, .. }) => {
                 let mut schemas = input.all_schemas();
                 schemas.insert(0, schema);
                 schemas
             }
-            LogicalPlan::Join {
+            LogicalPlan::Join(Join {
                 left,
                 right,
                 schema,
                 ..
-            }
+            })
             | LogicalPlan::CrossJoin(CrossJoin {
                 left,
                 right,
@@ -409,7 +421,7 @@ impl LogicalPlan {
             }
             LogicalPlan::Limit(Limit { input, .. })
             | LogicalPlan::Repartition(Repartition { input, .. })
-            | LogicalPlan::Sort { input, .. }
+            | LogicalPlan::Sort(Sort { input, .. })
             | LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. })
             | LogicalPlan::Filter(Filter { input, .. }) => input.all_schemas(),
             LogicalPlan::DropTable(_) => vec![],
@@ -442,16 +454,16 @@ impl LogicalPlan {
                 _ => vec![],
             },
             LogicalPlan::Window(Window { window_expr, .. }) => 
window_expr.clone(),
-            LogicalPlan::Aggregate {
+            LogicalPlan::Aggregate(Aggregate {
                 group_expr,
                 aggr_expr,
                 ..
-            } => group_expr.iter().chain(aggr_expr.iter()).cloned().collect(),
-            LogicalPlan::Join { on, .. } => on
+            }) => group_expr.iter().chain(aggr_expr.iter()).cloned().collect(),
+            LogicalPlan::Join(Join { on, .. }) => on
                 .iter()
                 .flat_map(|(l, r)| vec![Expr::Column(l.clone()), 
Expr::Column(r.clone())])
                 .collect(),
-            LogicalPlan::Sort { expr, .. } => expr.clone(),
+            LogicalPlan::Sort(Sort { expr, .. }) => expr.clone(),
             LogicalPlan::Extension(extension) => extension.node.expressions(),
             // plans without expressions
             LogicalPlan::TableScan { .. }
@@ -477,9 +489,9 @@ impl LogicalPlan {
             LogicalPlan::Filter(Filter { input, .. }) => vec![input],
             LogicalPlan::Repartition(Repartition { input, .. }) => vec![input],
             LogicalPlan::Window(Window { input, .. }) => vec![input],
-            LogicalPlan::Aggregate { input, .. } => vec![input],
-            LogicalPlan::Sort { input, .. } => vec![input],
-            LogicalPlan::Join { left, right, .. } => vec![left, right],
+            LogicalPlan::Aggregate(Aggregate { input, .. }) => vec![input],
+            LogicalPlan::Sort(Sort { input, .. }) => vec![input],
+            LogicalPlan::Join(Join { left, right, .. }) => vec![left, right],
             LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => 
vec![left, right],
             LogicalPlan::Limit(Limit { input, .. }) => vec![input],
             LogicalPlan::Extension(extension) => extension.node.inputs(),
@@ -508,11 +520,11 @@ impl LogicalPlan {
             type Error = DataFusionError;
 
             fn pre_visit(&mut self, plan: &LogicalPlan) -> Result<bool, 
Self::Error> {
-                if let LogicalPlan::Join {
+                if let LogicalPlan::Join(Join {
                     join_constraint: JoinConstraint::Using,
                     on,
                     ..
-                } = plan
+                }) = plan
                 {
                     self.using_columns.push(
                         on.iter()
@@ -614,9 +626,9 @@ impl LogicalPlan {
                 input.accept(visitor)?
             }
             LogicalPlan::Window(Window { input, .. }) => 
input.accept(visitor)?,
-            LogicalPlan::Aggregate { input, .. } => input.accept(visitor)?,
-            LogicalPlan::Sort { input, .. } => input.accept(visitor)?,
-            LogicalPlan::Join { left, right, .. }
+            LogicalPlan::Aggregate(Aggregate { input, .. }) => 
input.accept(visitor)?,
+            LogicalPlan::Sort(Sort { input, .. }) => input.accept(visitor)?,
+            LogicalPlan::Join(Join { left, right, .. })
             | LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => {
                 left.accept(visitor)? && right.accept(visitor)?
             }
@@ -900,16 +912,16 @@ impl LogicalPlan {
                     }) => {
                         write!(f, "WindowAggr: windowExpr=[{:?}]", window_expr)
                     }
-                    LogicalPlan::Aggregate {
+                    LogicalPlan::Aggregate(Aggregate {
                         ref group_expr,
                         ref aggr_expr,
                         ..
-                    } => write!(
+                    }) => write!(
                         f,
                         "Aggregate: groupBy=[{:?}], aggr=[{:?}]",
                         group_expr, aggr_expr
                     ),
-                    LogicalPlan::Sort { ref expr, .. } => {
+                    LogicalPlan::Sort(Sort { expr, .. }) => {
                         write!(f, "Sort: ")?;
                         for (i, expr_item) in expr.iter().enumerate() {
                             if i > 0 {
@@ -919,11 +931,11 @@ impl LogicalPlan {
                         }
                         Ok(())
                     }
-                    LogicalPlan::Join {
+                    LogicalPlan::Join(Join {
                         on: ref keys,
                         join_constraint,
                         ..
-                    } => {
+                    }) => {
                         let join_expr: Vec<String> =
                             keys.iter().map(|(l, r)| format!("{} = {}", l, 
r)).collect();
                         match join_constraint {
diff --git a/datafusion/src/optimizer/common_subexpr_eliminate.rs 
b/datafusion/src/optimizer/common_subexpr_eliminate.rs
index ced2c0c..233d112 100644
--- a/datafusion/src/optimizer/common_subexpr_eliminate.rs
+++ b/datafusion/src/optimizer/common_subexpr_eliminate.rs
@@ -21,8 +21,10 @@ use crate::error::Result;
 use crate::execution::context::ExecutionProps;
 use crate::logical_plan::plan::{Filter, Projection, Window};
 use crate::logical_plan::{
-    col, DFField, DFSchema, Expr, ExprRewriter, ExpressionVisitor, LogicalPlan,
-    Recursion, RewriteRecursion,
+    col,
+    plan::{Aggregate, Sort},
+    DFField, DFSchema, Expr, ExprRewriter, ExpressionVisitor, LogicalPlan, 
Recursion,
+    RewriteRecursion,
 };
 use crate::optimizer::optimizer::OptimizerRule;
 use crate::optimizer::utils;
@@ -150,12 +152,12 @@ fn optimize(plan: &LogicalPlan, execution_props: 
&ExecutionProps) -> Result<Logi
                 schema: schema.clone(),
             }))
         }
-        LogicalPlan::Aggregate {
-            input,
+        LogicalPlan::Aggregate(Aggregate {
             group_expr,
             aggr_expr,
+            input,
             schema,
-        } => {
+        }) => {
             let group_arrays = to_arrays(group_expr, input, &mut expr_set)?;
             let aggr_arrays = to_arrays(aggr_expr, input, &mut expr_set)?;
 
@@ -171,14 +173,14 @@ fn optimize(plan: &LogicalPlan, execution_props: 
&ExecutionProps) -> Result<Logi
             let new_aggr_expr = new_expr.pop().unwrap();
             let new_group_expr = new_expr.pop().unwrap();
 
-            Ok(LogicalPlan::Aggregate {
+            Ok(LogicalPlan::Aggregate(Aggregate {
                 input: Arc::new(new_input),
                 group_expr: new_group_expr,
                 aggr_expr: new_aggr_expr,
                 schema: schema.clone(),
-            })
+            }))
         }
-        LogicalPlan::Sort { expr, input } => {
+        LogicalPlan::Sort(Sort { expr, input }) => {
             let arrays = to_arrays(expr, input, &mut expr_set)?;
 
             let (mut new_expr, new_input) = rewrite_expr(
@@ -190,10 +192,10 @@ fn optimize(plan: &LogicalPlan, execution_props: 
&ExecutionProps) -> Result<Logi
                 execution_props,
             )?;
 
-            Ok(LogicalPlan::Sort {
+            Ok(LogicalPlan::Sort(Sort {
                 expr: new_expr.pop().unwrap(),
                 input: Arc::new(new_input),
-            })
+            }))
         }
         LogicalPlan::Join { .. }
         | LogicalPlan::CrossJoin(_)
diff --git a/datafusion/src/optimizer/filter_push_down.rs 
b/datafusion/src/optimizer/filter_push_down.rs
index 4214a57..2c70297 100644
--- a/datafusion/src/optimizer/filter_push_down.rs
+++ b/datafusion/src/optimizer/filter_push_down.rs
@@ -16,7 +16,7 @@
 
 use crate::datasource::datasource::TableProviderFilterPushDown;
 use crate::execution::context::ExecutionProps;
-use crate::logical_plan::plan::{Filter, Projection};
+use crate::logical_plan::plan::{Aggregate, Filter, Join, Projection};
 use crate::logical_plan::{
     and, replace_col, Column, CrossJoin, Limit, LogicalPlan, TableScanPlan,
 };
@@ -354,9 +354,9 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> 
Result<LogicalPlan> {
 
             utils::from_plan(plan, expr, &[new_input])
         }
-        LogicalPlan::Aggregate {
-            input, aggr_expr, ..
-        } => {
+        LogicalPlan::Aggregate(Aggregate {
+            aggr_expr, input, ..
+        }) => {
             // An aggregate's aggreagate columns are _not_ filter-commutable 
=> collect these:
             // * columns whose aggregation expression depends on
             // * the aggregation columns themselves
@@ -394,9 +394,9 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> 
Result<LogicalPlan> {
         LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => {
             optimize_join(state, plan, left, right)
         }
-        LogicalPlan::Join {
+        LogicalPlan::Join(Join {
             left, right, on, ..
-        } => {
+        }) => {
             // duplicate filters for joined columns so filters can be pushed 
down to both sides.
             // Take the following query as an example:
             //
diff --git a/datafusion/src/optimizer/projection_push_down.rs 
b/datafusion/src/optimizer/projection_push_down.rs
index 5b1f3ea..1b331ba 100644
--- a/datafusion/src/optimizer/projection_push_down.rs
+++ b/datafusion/src/optimizer/projection_push_down.rs
@@ -20,7 +20,9 @@
 
 use crate::error::{DataFusionError, Result};
 use crate::execution::context::ExecutionProps;
-use crate::logical_plan::plan::{AnalyzePlan, Projection, TableScanPlan, 
Window};
+use crate::logical_plan::plan::{
+    Aggregate, AnalyzePlan, Join, Projection, TableScanPlan, Window,
+};
 use crate::logical_plan::{
     build_join_schema, Column, DFField, DFSchema, DFSchemaRef, LogicalPlan,
     LogicalPlanBuilder, ToDFSchema, Union,
@@ -190,7 +192,7 @@ fn optimize_plan(
                 }))
             }
         }
-        LogicalPlan::Join {
+        LogicalPlan::Join(Join {
             left,
             right,
             on,
@@ -198,7 +200,7 @@ fn optimize_plan(
             join_constraint,
             null_equals_null,
             ..
-        } => {
+        }) => {
             for (l, r) in on {
                 new_required_columns.insert(l.clone());
                 new_required_columns.insert(r.clone());
@@ -226,7 +228,7 @@ fn optimize_plan(
                 join_type,
             )?;
 
-            Ok(LogicalPlan::Join {
+            Ok(LogicalPlan::Join(Join {
                 left: optimized_left,
                 right: optimized_right,
                 join_type: *join_type,
@@ -234,7 +236,7 @@ fn optimize_plan(
                 on: on.clone(),
                 schema: DFSchemaRef::new(schema),
                 null_equals_null: *null_equals_null,
-            })
+            }))
         }
         LogicalPlan::Window(Window {
             schema,
@@ -275,13 +277,12 @@ fn optimize_plan(
             .window(new_window_expr)?
             .build()
         }
-        LogicalPlan::Aggregate {
-            schema,
-            input,
+        LogicalPlan::Aggregate(Aggregate {
             group_expr,
             aggr_expr,
-            ..
-        } => {
+            schema,
+            input,
+        }) => {
             // aggregate:
             // * remove any aggregate expression that is not required
             // * construct the new set of required columns
@@ -314,7 +315,7 @@ fn optimize_plan(
                     .collect(),
             )?;
 
-            Ok(LogicalPlan::Aggregate {
+            Ok(LogicalPlan::Aggregate(Aggregate {
                 group_expr: group_expr.clone(),
                 aggr_expr: new_aggr_expr,
                 input: Arc::new(optimize_plan(
@@ -325,7 +326,7 @@ fn optimize_plan(
                     execution_props,
                 )?),
                 schema: DFSchemaRef::new(new_schema),
-            })
+            }))
         }
         // scans:
         // * remove un-used columns from the scan projection
diff --git a/datafusion/src/optimizer/single_distinct_to_groupby.rs 
b/datafusion/src/optimizer/single_distinct_to_groupby.rs
index 358444d..3232fa0 100644
--- a/datafusion/src/optimizer/single_distinct_to_groupby.rs
+++ b/datafusion/src/optimizer/single_distinct_to_groupby.rs
@@ -19,7 +19,7 @@
 
 use crate::error::Result;
 use crate::execution::context::ExecutionProps;
-use crate::logical_plan::plan::Projection;
+use crate::logical_plan::plan::{Aggregate, Projection};
 use crate::logical_plan::{columnize_expr, DFSchema, Expr, LogicalPlan};
 use crate::optimizer::optimizer::OptimizerRule;
 use crate::optimizer::utils;
@@ -51,12 +51,12 @@ impl SingleDistinctToGroupBy {
 
 fn optimize(plan: &LogicalPlan) -> Result<LogicalPlan> {
     match plan {
-        LogicalPlan::Aggregate {
+        LogicalPlan::Aggregate(Aggregate {
             input,
             aggr_expr,
             schema,
             group_expr,
-        } => {
+        }) => {
             if is_single_distinct_agg(plan) {
                 let mut group_fields_set = HashSet::new();
                 let mut all_group_args = group_expr.clone();
@@ -87,12 +87,12 @@ fn optimize(plan: &LogicalPlan) -> Result<LogicalPlan> {
                     .collect::<Vec<_>>();
 
                 let grouped_schema = DFSchema::new(all_field).unwrap();
-                let grouped_agg = LogicalPlan::Aggregate {
+                let grouped_agg = LogicalPlan::Aggregate(Aggregate {
                     input: input.clone(),
                     group_expr: all_group_args,
                     aggr_expr: Vec::new(),
                     schema: Arc::new(grouped_schema.clone()),
-                };
+                });
                 let grouped_agg = optimize_children(&grouped_agg);
                 let final_agg_schema = Arc::new(
                     DFSchema::new(
@@ -105,12 +105,12 @@ fn optimize(plan: &LogicalPlan) -> Result<LogicalPlan> {
                     .unwrap(),
                 );
 
-                let final_agg = LogicalPlan::Aggregate {
+                let final_agg = LogicalPlan::Aggregate(Aggregate {
                     input: Arc::new(grouped_agg.unwrap()),
                     group_expr: group_expr.clone(),
                     aggr_expr: new_aggr_expr,
                     schema: final_agg_schema.clone(),
-                };
+                });
 
                 //so the aggregates are displayed in the same way even after 
the rewrite
                 let mut alias_expr: Vec<Expr> = Vec::new();
@@ -151,9 +151,9 @@ fn optimize_children(plan: &LogicalPlan) -> 
Result<LogicalPlan> {
 
 fn is_single_distinct_agg(plan: &LogicalPlan) -> bool {
     match plan {
-        LogicalPlan::Aggregate {
+        LogicalPlan::Aggregate(Aggregate {
             input, aggr_expr, ..
-        } => {
+        }) => {
             let mut fields_set = HashSet::new();
             aggr_expr
                 .iter()
diff --git a/datafusion/src/optimizer/utils.rs 
b/datafusion/src/optimizer/utils.rs
index aa559cd..60a81f0 100644
--- a/datafusion/src/optimizer/utils.rs
+++ b/datafusion/src/optimizer/utils.rs
@@ -23,7 +23,9 @@ use arrow::record_batch::RecordBatch;
 
 use super::optimizer::OptimizerRule;
 use crate::execution::context::{ExecutionContextState, ExecutionProps};
-use crate::logical_plan::plan::{AnalyzePlan, ExtensionPlan, Filter, 
Projection, Window};
+use crate::logical_plan::plan::{
+    Aggregate, AnalyzePlan, ExtensionPlan, Filter, Join, Projection, Sort, 
Window,
+};
 use crate::logical_plan::{
     build_join_schema, Column, CreateMemoryTable, DFSchema, DFSchemaRef, Expr,
     ExprRewriter, Limit, LogicalPlan, LogicalPlanBuilder, Operator, 
Partitioning,
@@ -188,28 +190,28 @@ pub fn from_plan(
             window_expr: expr[0..window_expr.len()].to_vec(),
             schema: schema.clone(),
         })),
-        LogicalPlan::Aggregate {
+        LogicalPlan::Aggregate(Aggregate {
             group_expr, schema, ..
-        } => Ok(LogicalPlan::Aggregate {
+        }) => Ok(LogicalPlan::Aggregate(Aggregate {
             group_expr: expr[0..group_expr.len()].to_vec(),
             aggr_expr: expr[group_expr.len()..].to_vec(),
             input: Arc::new(inputs[0].clone()),
             schema: schema.clone(),
-        }),
-        LogicalPlan::Sort { .. } => Ok(LogicalPlan::Sort {
+        })),
+        LogicalPlan::Sort(Sort { .. }) => Ok(LogicalPlan::Sort(Sort {
             expr: expr.to_vec(),
             input: Arc::new(inputs[0].clone()),
-        }),
-        LogicalPlan::Join {
+        })),
+        LogicalPlan::Join(Join {
             join_type,
             join_constraint,
             on,
             null_equals_null,
             ..
-        } => {
+        }) => {
             let schema =
                 build_join_schema(inputs[0].schema(), inputs[1].schema(), 
join_type)?;
-            Ok(LogicalPlan::Join {
+            Ok(LogicalPlan::Join(Join {
                 left: Arc::new(inputs[0].clone()),
                 right: Arc::new(inputs[1].clone()),
                 join_type: *join_type,
@@ -217,7 +219,7 @@ pub fn from_plan(
                 on: on.clone(),
                 schema: DFSchemaRef::new(schema),
                 null_equals_null: *null_equals_null,
-            })
+            }))
         }
         LogicalPlan::CrossJoin(_) => {
             let left = inputs[0].clone();
diff --git a/datafusion/src/physical_plan/planner.rs 
b/datafusion/src/physical_plan/planner.rs
index a7ba64a..f2f526d 100644
--- a/datafusion/src/physical_plan/planner.rs
+++ b/datafusion/src/physical_plan/planner.rs
@@ -23,13 +23,15 @@ use super::{
     hash_join::PartitionMode, udaf, union::UnionExec, values::ValuesExec, 
windows,
 };
 use crate::execution::context::ExecutionContextState;
-use crate::logical_plan::plan::{EmptyRelation, Filter, Projection, Window};
+use crate::logical_plan::plan::{
+    Aggregate, EmptyRelation, Filter, Join, Projection, Sort, TableScanPlan, 
Window,
+};
 use crate::logical_plan::{
     unalias, unnormalize_cols, CrossJoin, DFSchema, Expr, LogicalPlan, 
Operator,
     Partitioning as LogicalPartitioning, PlanType, Repartition, 
ToStringifiedPlan, Union,
     UserDefinedLogicalNode,
 };
-use crate::logical_plan::{Limit, TableScanPlan, Values};
+use crate::logical_plan::{Limit, Values};
 use crate::physical_optimizer::optimizer::PhysicalOptimizerRule;
 use crate::physical_plan::cross_join::CrossJoinExec;
 use crate::physical_plan::explain::ExplainExec;
@@ -479,12 +481,12 @@ impl DefaultPhysicalPlanner {
                         physical_input_schema,
                     )?) )
                 }
-                LogicalPlan::Aggregate {
+                LogicalPlan::Aggregate(Aggregate {
                     input,
                     group_expr,
                     aggr_expr,
                     ..
-                } => {
+                }) => {
                     // Initially need to perform the aggregate and then merge 
the partitions
                     let input_exec = self.create_initial_plan(input, 
ctx_state).await?;
                     let physical_input_schema = input_exec.schema();
@@ -676,7 +678,7 @@ impl DefaultPhysicalPlanner {
                         physical_partitioning,
                     )?) )
                 }
-                LogicalPlan::Sort { expr, input, .. } => {
+                LogicalPlan::Sort(Sort { expr, input, .. }) => {
                     let physical_input = self.create_initial_plan(input, 
ctx_state).await?;
                     let input_schema = physical_input.as_ref().schema();
                     let input_dfschema = input.as_ref().schema();
@@ -704,14 +706,14 @@ impl DefaultPhysicalPlanner {
                         .collect::<Result<Vec<_>>>()?;
                     Ok(Arc::new(SortExec::try_new(sort_expr, physical_input)?) 
)
                 }
-                LogicalPlan::Join {
+                LogicalPlan::Join(Join {
                     left,
                     right,
                     on: keys,
                     join_type,
                     null_equals_null,
                     ..
-                } => {
+                }) => {
                     let left_df_schema = left.schema();
                     let physical_left = self.create_initial_plan(left, 
ctx_state).await?;
                     let right_df_schema = right.schema();
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index 5ea008c..809018b 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -36,7 +36,7 @@ use datafusion::assert_batches_eq;
 use datafusion::assert_batches_sorted_eq;
 use datafusion::assert_contains;
 use datafusion::assert_not_contains;
-use datafusion::logical_plan::plan::Projection;
+use datafusion::logical_plan::plan::{Aggregate, Projection};
 use datafusion::logical_plan::LogicalPlan;
 use datafusion::logical_plan::TableScanPlan;
 use datafusion::physical_plan::functions::Volatility;
@@ -92,7 +92,7 @@ async fn nyc() -> Result<()> {
 
     match &optimized_plan {
         LogicalPlan::Projection(Projection { input, .. }) => match 
input.as_ref() {
-            LogicalPlan::Aggregate { input, .. } => match input.as_ref() {
+            LogicalPlan::Aggregate(Aggregate { input, .. }) => match 
input.as_ref() {
                 LogicalPlan::TableScan(TableScanPlan {
                     ref projected_schema,
                     ..
diff --git a/datafusion/tests/user_defined_plan.rs 
b/datafusion/tests/user_defined_plan.rs
index a63ef2a..d931985 100644
--- a/datafusion/tests/user_defined_plan.rs
+++ b/datafusion/tests/user_defined_plan.rs
@@ -86,7 +86,7 @@ use std::{any::Any, collections::BTreeMap, fmt, sync::Arc};
 
 use async_trait::async_trait;
 use datafusion::execution::context::ExecutionProps;
-use datafusion::logical_plan::plan::ExtensionPlan;
+use datafusion::logical_plan::plan::{ExtensionPlan, Sort};
 use datafusion::logical_plan::{DFSchemaRef, Limit};
 
 /// Execute the specified sql and return the resulting record batches
@@ -289,10 +289,10 @@ impl OptimizerRule for TopKOptimizerRule {
         // Sort and replaces it by a TopK node. It does not handle many
         // edge cases (e.g multiple sort columns, sort ASC / DESC), etc.
         if let LogicalPlan::Limit(Limit { ref n, ref input }) = plan {
-            if let LogicalPlan::Sort {
+            if let LogicalPlan::Sort(Sort {
                 ref expr,
                 ref input,
-            } = **input
+            }) = **input
             {
                 if expr.len() == 1 {
                     // we found a sort with a single sort expr, replace with a 
a TopK

Reply via email to