This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 66f5d19 Extract Aggregate, Sort, and Join to struct from
AggregatePlan (#1326)
66f5d19 is described below
commit 66f5d1906992dcf4f5b7ab787a39a4105c2f5b42
Author: Matthew Turner <[email protected]>
AuthorDate: Tue Nov 23 03:41:13 2021 -0800
Extract Aggregate, Sort, and Join to struct from AggregatePlan (#1326)
* First updates for AggregatePlan
* Finished updating AggregatePlan
* Rebase and update style
* Rebase and update style
* Cleanup errors on aggregate
* Fix tests
* Update sort
* Update join
* Update tests
* Update filter push down
* Style updates
---
.../rust/core/src/serde/logical_plan/to_proto.rs | 16 +--
datafusion/src/logical_plan/builder.rs | 20 ++--
datafusion/src/logical_plan/plan.rs | 126 +++++++++++----------
.../src/optimizer/common_subexpr_eliminate.rs | 22 ++--
datafusion/src/optimizer/filter_push_down.rs | 12 +-
datafusion/src/optimizer/projection_push_down.rs | 25 ++--
.../src/optimizer/single_distinct_to_groupby.rs | 18 +--
datafusion/src/optimizer/utils.rs | 22 ++--
datafusion/src/physical_plan/planner.rs | 16 +--
datafusion/tests/sql.rs | 4 +-
datafusion/tests/user_defined_plan.rs | 6 +-
11 files changed, 154 insertions(+), 133 deletions(-)
diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs
b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
index 0e0b989..897558c 100644
--- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
@@ -30,7 +30,9 @@ use datafusion::datasource::TableProvider;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::ListingTable;
-use datafusion::logical_plan::plan::{EmptyRelation, Filter, Projection,
Window};
+use datafusion::logical_plan::plan::{
+ Aggregate, EmptyRelation, Filter, Join, Projection, Sort, Window,
+};
use datafusion::logical_plan::{
exprlist_to_fields,
window_frames::{WindowFrame, WindowFrameBound, WindowFrameUnits},
@@ -823,12 +825,12 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
))),
})
}
- LogicalPlan::Aggregate {
- input,
+ LogicalPlan::Aggregate(Aggregate {
group_expr,
aggr_expr,
+ input,
..
- } => {
+ }) => {
let input: protobuf::LogicalPlanNode =
input.as_ref().try_into()?;
Ok(protobuf::LogicalPlanNode {
logical_plan_type:
Some(LogicalPlanType::Aggregate(Box::new(
@@ -846,7 +848,7 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
))),
})
}
- LogicalPlan::Join {
+ LogicalPlan::Join(Join {
left,
right,
on,
@@ -854,7 +856,7 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
join_constraint,
null_equals_null,
..
- } => {
+ }) => {
let left: protobuf::LogicalPlanNode =
left.as_ref().try_into()?;
let right: protobuf::LogicalPlanNode =
right.as_ref().try_into()?;
let (left_join_column, right_join_column) =
@@ -887,7 +889,7 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
))),
})
}
- LogicalPlan::Sort { input, expr } => {
+ LogicalPlan::Sort(Sort { input, expr }) => {
let input: protobuf::LogicalPlanNode =
input.as_ref().try_into()?;
let selection_expr: Vec<protobuf::LogicalExprNode> = expr
.iter()
diff --git a/datafusion/src/logical_plan/builder.rs
b/datafusion/src/logical_plan/builder.rs
index 2eb0091..e7f3441 100644
--- a/datafusion/src/logical_plan/builder.rs
+++ b/datafusion/src/logical_plan/builder.rs
@@ -26,8 +26,8 @@ use crate::datasource::{
};
use crate::error::{DataFusionError, Result};
use crate::logical_plan::plan::{
- AnalyzePlan, EmptyRelation, ExplainPlan, Filter, Projection, TableScanPlan,
- ToStringifiedPlan, Union, Window,
+ Aggregate, AnalyzePlan, EmptyRelation, ExplainPlan, Filter, Join,
Projection, Sort,
+ TableScanPlan, ToStringifiedPlan, Union, Window,
};
use crate::prelude::*;
use crate::scalar::ScalarValue;
@@ -468,10 +468,10 @@ impl LogicalPlanBuilder {
/// Apply a sort
pub fn sort(&self, exprs: impl IntoIterator<Item = impl Into<Expr>>) ->
Result<Self> {
- Ok(Self::from(LogicalPlan::Sort {
+ Ok(Self::from(LogicalPlan::Sort(Sort {
expr: normalize_cols(exprs, &self.plan)?,
input: Arc::new(self.plan.clone()),
- }))
+ })))
}
/// Apply a union
@@ -587,7 +587,7 @@ impl LogicalPlanBuilder {
let join_schema =
build_join_schema(self.plan.schema(), right.schema(), &join_type)?;
- Ok(Self::from(LogicalPlan::Join {
+ Ok(Self::from(LogicalPlan::Join(Join {
left: Arc::new(self.plan.clone()),
right: Arc::new(right.clone()),
on,
@@ -595,7 +595,7 @@ impl LogicalPlanBuilder {
join_constraint: JoinConstraint::On,
schema: DFSchemaRef::new(join_schema),
null_equals_null,
- }))
+ })))
}
/// Apply a join with using constraint, which duplicates all join columns
in output schema.
@@ -619,7 +619,7 @@ impl LogicalPlanBuilder {
let join_schema =
build_join_schema(self.plan.schema(), right.schema(), &join_type)?;
- Ok(Self::from(LogicalPlan::Join {
+ Ok(Self::from(LogicalPlan::Join(Join {
left: Arc::new(self.plan.clone()),
right: Arc::new(right.clone()),
on,
@@ -627,7 +627,7 @@ impl LogicalPlanBuilder {
join_constraint: JoinConstraint::Using,
schema: DFSchemaRef::new(join_schema),
null_equals_null: false,
- }))
+ })))
}
/// Apply a cross join
@@ -680,12 +680,12 @@ impl LogicalPlanBuilder {
validate_unique_names("Aggregations", all_expr.clone(),
self.plan.schema())?;
let aggr_schema =
DFSchema::new(exprlist_to_fields(all_expr, self.plan.schema())?)?;
- Ok(Self::from(LogicalPlan::Aggregate {
+ Ok(Self::from(LogicalPlan::Aggregate(Aggregate {
input: Arc::new(self.plan.clone()),
group_expr,
aggr_expr,
schema: DFSchemaRef::new(aggr_schema),
- }))
+ })))
}
/// Create an expression to represent the explanation of the plan
diff --git a/datafusion/src/logical_plan/plan.rs
b/datafusion/src/logical_plan/plan.rs
index a7aac90..88bc7a2 100644
--- a/datafusion/src/logical_plan/plan.rs
+++ b/datafusion/src/logical_plan/plan.rs
@@ -243,6 +243,47 @@ pub struct Values {
pub values: Vec<Vec<Expr>>,
}
+/// Aggregates its input based on a set of grouping and aggregate
+/// expressions (e.g. SUM).
+#[derive(Clone)]
+pub struct Aggregate {
+ /// The incoming logical plan
+ pub input: Arc<LogicalPlan>,
+ /// Grouping expressions
+ pub group_expr: Vec<Expr>,
+ /// Aggregate expressions
+ pub aggr_expr: Vec<Expr>,
+ /// The schema description of the aggregate output
+ pub schema: DFSchemaRef,
+}
+
+/// Sorts its input according to a list of sort expressions.
+#[derive(Clone)]
+pub struct Sort {
+ /// The sort expressions
+ pub expr: Vec<Expr>,
+ /// The incoming logical plan
+ pub input: Arc<LogicalPlan>,
+}
+
+/// Join two logical plans on one or more join columns
+#[derive(Clone)]
+pub struct Join {
+ /// Left input
+ pub left: Arc<LogicalPlan>,
+ /// Right input
+ pub right: Arc<LogicalPlan>,
+ /// Equijoin clause expressed as pairs of (left, right) join columns
+ pub on: Vec<(Column, Column)>,
+ /// Join type
+ pub join_type: JoinType,
+ /// Join constraint
+ pub join_constraint: JoinConstraint,
+ /// The output schema, containing fields from the left and right inputs
+ pub schema: DFSchemaRef,
+ /// If null_equals_null is true, null == null else null != null
+ pub null_equals_null: bool,
+}
/// A LogicalPlan represents the different types of relational
/// operators (such as Projection, Filter, etc) and can be created by
/// the SQL query planner and the DataFrame API.
@@ -269,40 +310,11 @@ pub enum LogicalPlan {
Window(Window),
/// Aggregates its input based on a set of grouping and aggregate
/// expressions (e.g. SUM).
- Aggregate {
- /// The incoming logical plan
- input: Arc<LogicalPlan>,
- /// Grouping expressions
- group_expr: Vec<Expr>,
- /// Aggregate expressions
- aggr_expr: Vec<Expr>,
- /// The schema description of the aggregate output
- schema: DFSchemaRef,
- },
+ Aggregate(Aggregate),
/// Sorts its input according to a list of sort expressions.
- Sort {
- /// The sort expressions
- expr: Vec<Expr>,
- /// The incoming logical plan
- input: Arc<LogicalPlan>,
- },
+ Sort(Sort),
/// Join two logical plans on one or more join columns
- Join {
- /// Left input
- left: Arc<LogicalPlan>,
- /// Right input
- right: Arc<LogicalPlan>,
- /// Equijoin clause expressed as pairs of (left, right) join columns
- on: Vec<(Column, Column)>,
- /// Join type
- join_type: JoinType,
- /// Join constraint
- join_constraint: JoinConstraint,
- /// The output schema, containing fields from the left and right inputs
- schema: DFSchemaRef,
- /// If null_equals_null is true, null == null else null != null
- null_equals_null: bool,
- },
+ Join(Join),
/// Apply Cross Join to two logical plans
CrossJoin(CrossJoin),
/// Repartition the plan based on a partitioning scheme.
@@ -347,9 +359,9 @@ impl LogicalPlan {
LogicalPlan::Projection(Projection { schema, .. }) => schema,
LogicalPlan::Filter(Filter { input, .. }) => input.schema(),
LogicalPlan::Window(Window { schema, .. }) => schema,
- LogicalPlan::Aggregate { schema, .. } => schema,
- LogicalPlan::Sort { input, .. } => input.schema(),
- LogicalPlan::Join { schema, .. } => schema,
+ LogicalPlan::Aggregate(Aggregate { schema, .. }) => schema,
+ LogicalPlan::Sort(Sort { input, .. }) => input.schema(),
+ LogicalPlan::Join(Join { schema, .. }) => schema,
LogicalPlan::CrossJoin(CrossJoin { schema, .. }) => schema,
LogicalPlan::Repartition(Repartition { input, .. }) =>
input.schema(),
LogicalPlan::Limit(Limit { input, .. }) => input.schema(),
@@ -375,18 +387,18 @@ impl LogicalPlan {
}) => vec![projected_schema],
LogicalPlan::Values(Values { schema, .. }) => vec![schema],
LogicalPlan::Window(Window { input, schema, .. })
- | LogicalPlan::Aggregate { input, schema, .. }
- | LogicalPlan::Projection(Projection { input, schema, .. }) => {
+ | LogicalPlan::Projection(Projection { input, schema, .. })
+ | LogicalPlan::Aggregate(Aggregate { input, schema, .. }) => {
let mut schemas = input.all_schemas();
schemas.insert(0, schema);
schemas
}
- LogicalPlan::Join {
+ LogicalPlan::Join(Join {
left,
right,
schema,
..
- }
+ })
| LogicalPlan::CrossJoin(CrossJoin {
left,
right,
@@ -409,7 +421,7 @@ impl LogicalPlan {
}
LogicalPlan::Limit(Limit { input, .. })
| LogicalPlan::Repartition(Repartition { input, .. })
- | LogicalPlan::Sort { input, .. }
+ | LogicalPlan::Sort(Sort { input, .. })
| LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. })
| LogicalPlan::Filter(Filter { input, .. }) => input.all_schemas(),
LogicalPlan::DropTable(_) => vec![],
@@ -442,16 +454,16 @@ impl LogicalPlan {
_ => vec![],
},
LogicalPlan::Window(Window { window_expr, .. }) =>
window_expr.clone(),
- LogicalPlan::Aggregate {
+ LogicalPlan::Aggregate(Aggregate {
group_expr,
aggr_expr,
..
- } => group_expr.iter().chain(aggr_expr.iter()).cloned().collect(),
- LogicalPlan::Join { on, .. } => on
+ }) => group_expr.iter().chain(aggr_expr.iter()).cloned().collect(),
+ LogicalPlan::Join(Join { on, .. }) => on
.iter()
.flat_map(|(l, r)| vec![Expr::Column(l.clone()),
Expr::Column(r.clone())])
.collect(),
- LogicalPlan::Sort { expr, .. } => expr.clone(),
+ LogicalPlan::Sort(Sort { expr, .. }) => expr.clone(),
LogicalPlan::Extension(extension) => extension.node.expressions(),
// plans without expressions
LogicalPlan::TableScan { .. }
@@ -477,9 +489,9 @@ impl LogicalPlan {
LogicalPlan::Filter(Filter { input, .. }) => vec![input],
LogicalPlan::Repartition(Repartition { input, .. }) => vec![input],
LogicalPlan::Window(Window { input, .. }) => vec![input],
- LogicalPlan::Aggregate { input, .. } => vec![input],
- LogicalPlan::Sort { input, .. } => vec![input],
- LogicalPlan::Join { left, right, .. } => vec![left, right],
+ LogicalPlan::Aggregate(Aggregate { input, .. }) => vec![input],
+ LogicalPlan::Sort(Sort { input, .. }) => vec![input],
+ LogicalPlan::Join(Join { left, right, .. }) => vec![left, right],
LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) =>
vec![left, right],
LogicalPlan::Limit(Limit { input, .. }) => vec![input],
LogicalPlan::Extension(extension) => extension.node.inputs(),
@@ -508,11 +520,11 @@ impl LogicalPlan {
type Error = DataFusionError;
fn pre_visit(&mut self, plan: &LogicalPlan) -> Result<bool,
Self::Error> {
- if let LogicalPlan::Join {
+ if let LogicalPlan::Join(Join {
join_constraint: JoinConstraint::Using,
on,
..
- } = plan
+ }) = plan
{
self.using_columns.push(
on.iter()
@@ -614,9 +626,9 @@ impl LogicalPlan {
input.accept(visitor)?
}
LogicalPlan::Window(Window { input, .. }) =>
input.accept(visitor)?,
- LogicalPlan::Aggregate { input, .. } => input.accept(visitor)?,
- LogicalPlan::Sort { input, .. } => input.accept(visitor)?,
- LogicalPlan::Join { left, right, .. }
+ LogicalPlan::Aggregate(Aggregate { input, .. }) =>
input.accept(visitor)?,
+ LogicalPlan::Sort(Sort { input, .. }) => input.accept(visitor)?,
+ LogicalPlan::Join(Join { left, right, .. })
| LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => {
left.accept(visitor)? && right.accept(visitor)?
}
@@ -900,16 +912,16 @@ impl LogicalPlan {
}) => {
write!(f, "WindowAggr: windowExpr=[{:?}]", window_expr)
}
- LogicalPlan::Aggregate {
+ LogicalPlan::Aggregate(Aggregate {
ref group_expr,
ref aggr_expr,
..
- } => write!(
+ }) => write!(
f,
"Aggregate: groupBy=[{:?}], aggr=[{:?}]",
group_expr, aggr_expr
),
- LogicalPlan::Sort { ref expr, .. } => {
+ LogicalPlan::Sort(Sort { expr, .. }) => {
write!(f, "Sort: ")?;
for (i, expr_item) in expr.iter().enumerate() {
if i > 0 {
@@ -919,11 +931,11 @@ impl LogicalPlan {
}
Ok(())
}
- LogicalPlan::Join {
+ LogicalPlan::Join(Join {
on: ref keys,
join_constraint,
..
- } => {
+ }) => {
let join_expr: Vec<String> =
keys.iter().map(|(l, r)| format!("{} = {}", l,
r)).collect();
match join_constraint {
diff --git a/datafusion/src/optimizer/common_subexpr_eliminate.rs
b/datafusion/src/optimizer/common_subexpr_eliminate.rs
index ced2c0c..233d112 100644
--- a/datafusion/src/optimizer/common_subexpr_eliminate.rs
+++ b/datafusion/src/optimizer/common_subexpr_eliminate.rs
@@ -21,8 +21,10 @@ use crate::error::Result;
use crate::execution::context::ExecutionProps;
use crate::logical_plan::plan::{Filter, Projection, Window};
use crate::logical_plan::{
- col, DFField, DFSchema, Expr, ExprRewriter, ExpressionVisitor, LogicalPlan,
- Recursion, RewriteRecursion,
+ col,
+ plan::{Aggregate, Sort},
+ DFField, DFSchema, Expr, ExprRewriter, ExpressionVisitor, LogicalPlan,
Recursion,
+ RewriteRecursion,
};
use crate::optimizer::optimizer::OptimizerRule;
use crate::optimizer::utils;
@@ -150,12 +152,12 @@ fn optimize(plan: &LogicalPlan, execution_props:
&ExecutionProps) -> Result<Logi
schema: schema.clone(),
}))
}
- LogicalPlan::Aggregate {
- input,
+ LogicalPlan::Aggregate(Aggregate {
group_expr,
aggr_expr,
+ input,
schema,
- } => {
+ }) => {
let group_arrays = to_arrays(group_expr, input, &mut expr_set)?;
let aggr_arrays = to_arrays(aggr_expr, input, &mut expr_set)?;
@@ -171,14 +173,14 @@ fn optimize(plan: &LogicalPlan, execution_props:
&ExecutionProps) -> Result<Logi
let new_aggr_expr = new_expr.pop().unwrap();
let new_group_expr = new_expr.pop().unwrap();
- Ok(LogicalPlan::Aggregate {
+ Ok(LogicalPlan::Aggregate(Aggregate {
input: Arc::new(new_input),
group_expr: new_group_expr,
aggr_expr: new_aggr_expr,
schema: schema.clone(),
- })
+ }))
}
- LogicalPlan::Sort { expr, input } => {
+ LogicalPlan::Sort(Sort { expr, input }) => {
let arrays = to_arrays(expr, input, &mut expr_set)?;
let (mut new_expr, new_input) = rewrite_expr(
@@ -190,10 +192,10 @@ fn optimize(plan: &LogicalPlan, execution_props:
&ExecutionProps) -> Result<Logi
execution_props,
)?;
- Ok(LogicalPlan::Sort {
+ Ok(LogicalPlan::Sort(Sort {
expr: new_expr.pop().unwrap(),
input: Arc::new(new_input),
- })
+ }))
}
LogicalPlan::Join { .. }
| LogicalPlan::CrossJoin(_)
diff --git a/datafusion/src/optimizer/filter_push_down.rs
b/datafusion/src/optimizer/filter_push_down.rs
index 4214a57..2c70297 100644
--- a/datafusion/src/optimizer/filter_push_down.rs
+++ b/datafusion/src/optimizer/filter_push_down.rs
@@ -16,7 +16,7 @@
use crate::datasource::datasource::TableProviderFilterPushDown;
use crate::execution::context::ExecutionProps;
-use crate::logical_plan::plan::{Filter, Projection};
+use crate::logical_plan::plan::{Aggregate, Filter, Join, Projection};
use crate::logical_plan::{
and, replace_col, Column, CrossJoin, Limit, LogicalPlan, TableScanPlan,
};
@@ -354,9 +354,9 @@ fn optimize(plan: &LogicalPlan, mut state: State) ->
Result<LogicalPlan> {
utils::from_plan(plan, expr, &[new_input])
}
- LogicalPlan::Aggregate {
- input, aggr_expr, ..
- } => {
+ LogicalPlan::Aggregate(Aggregate {
+ aggr_expr, input, ..
+ }) => {
// An aggregate's aggregate columns are _not_ filter-commutable
=> collect these:
// * columns whose aggregation expression depends on
// * the aggregation columns themselves
@@ -394,9 +394,9 @@ fn optimize(plan: &LogicalPlan, mut state: State) ->
Result<LogicalPlan> {
LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => {
optimize_join(state, plan, left, right)
}
- LogicalPlan::Join {
+ LogicalPlan::Join(Join {
left, right, on, ..
- } => {
+ }) => {
// duplicate filters for joined columns so filters can be pushed
down to both sides.
// Take the following query as an example:
//
diff --git a/datafusion/src/optimizer/projection_push_down.rs
b/datafusion/src/optimizer/projection_push_down.rs
index 5b1f3ea..1b331ba 100644
--- a/datafusion/src/optimizer/projection_push_down.rs
+++ b/datafusion/src/optimizer/projection_push_down.rs
@@ -20,7 +20,9 @@
use crate::error::{DataFusionError, Result};
use crate::execution::context::ExecutionProps;
-use crate::logical_plan::plan::{AnalyzePlan, Projection, TableScanPlan,
Window};
+use crate::logical_plan::plan::{
+ Aggregate, AnalyzePlan, Join, Projection, TableScanPlan, Window,
+};
use crate::logical_plan::{
build_join_schema, Column, DFField, DFSchema, DFSchemaRef, LogicalPlan,
LogicalPlanBuilder, ToDFSchema, Union,
@@ -190,7 +192,7 @@ fn optimize_plan(
}))
}
}
- LogicalPlan::Join {
+ LogicalPlan::Join(Join {
left,
right,
on,
@@ -198,7 +200,7 @@ fn optimize_plan(
join_constraint,
null_equals_null,
..
- } => {
+ }) => {
for (l, r) in on {
new_required_columns.insert(l.clone());
new_required_columns.insert(r.clone());
@@ -226,7 +228,7 @@ fn optimize_plan(
join_type,
)?;
- Ok(LogicalPlan::Join {
+ Ok(LogicalPlan::Join(Join {
left: optimized_left,
right: optimized_right,
join_type: *join_type,
@@ -234,7 +236,7 @@ fn optimize_plan(
on: on.clone(),
schema: DFSchemaRef::new(schema),
null_equals_null: *null_equals_null,
- })
+ }))
}
LogicalPlan::Window(Window {
schema,
@@ -275,13 +277,12 @@ fn optimize_plan(
.window(new_window_expr)?
.build()
}
- LogicalPlan::Aggregate {
- schema,
- input,
+ LogicalPlan::Aggregate(Aggregate {
group_expr,
aggr_expr,
- ..
- } => {
+ schema,
+ input,
+ }) => {
// aggregate:
// * remove any aggregate expression that is not required
// * construct the new set of required columns
@@ -314,7 +315,7 @@ fn optimize_plan(
.collect(),
)?;
- Ok(LogicalPlan::Aggregate {
+ Ok(LogicalPlan::Aggregate(Aggregate {
group_expr: group_expr.clone(),
aggr_expr: new_aggr_expr,
input: Arc::new(optimize_plan(
@@ -325,7 +326,7 @@ fn optimize_plan(
execution_props,
)?),
schema: DFSchemaRef::new(new_schema),
- })
+ }))
}
// scans:
// * remove un-used columns from the scan projection
diff --git a/datafusion/src/optimizer/single_distinct_to_groupby.rs
b/datafusion/src/optimizer/single_distinct_to_groupby.rs
index 358444d..3232fa0 100644
--- a/datafusion/src/optimizer/single_distinct_to_groupby.rs
+++ b/datafusion/src/optimizer/single_distinct_to_groupby.rs
@@ -19,7 +19,7 @@
use crate::error::Result;
use crate::execution::context::ExecutionProps;
-use crate::logical_plan::plan::Projection;
+use crate::logical_plan::plan::{Aggregate, Projection};
use crate::logical_plan::{columnize_expr, DFSchema, Expr, LogicalPlan};
use crate::optimizer::optimizer::OptimizerRule;
use crate::optimizer::utils;
@@ -51,12 +51,12 @@ impl SingleDistinctToGroupBy {
fn optimize(plan: &LogicalPlan) -> Result<LogicalPlan> {
match plan {
- LogicalPlan::Aggregate {
+ LogicalPlan::Aggregate(Aggregate {
input,
aggr_expr,
schema,
group_expr,
- } => {
+ }) => {
if is_single_distinct_agg(plan) {
let mut group_fields_set = HashSet::new();
let mut all_group_args = group_expr.clone();
@@ -87,12 +87,12 @@ fn optimize(plan: &LogicalPlan) -> Result<LogicalPlan> {
.collect::<Vec<_>>();
let grouped_schema = DFSchema::new(all_field).unwrap();
- let grouped_agg = LogicalPlan::Aggregate {
+ let grouped_agg = LogicalPlan::Aggregate(Aggregate {
input: input.clone(),
group_expr: all_group_args,
aggr_expr: Vec::new(),
schema: Arc::new(grouped_schema.clone()),
- };
+ });
let grouped_agg = optimize_children(&grouped_agg);
let final_agg_schema = Arc::new(
DFSchema::new(
@@ -105,12 +105,12 @@ fn optimize(plan: &LogicalPlan) -> Result<LogicalPlan> {
.unwrap(),
);
- let final_agg = LogicalPlan::Aggregate {
+ let final_agg = LogicalPlan::Aggregate(Aggregate {
input: Arc::new(grouped_agg.unwrap()),
group_expr: group_expr.clone(),
aggr_expr: new_aggr_expr,
schema: final_agg_schema.clone(),
- };
+ });
//so the aggregates are displayed in the same way even after
the rewrite
let mut alias_expr: Vec<Expr> = Vec::new();
@@ -151,9 +151,9 @@ fn optimize_children(plan: &LogicalPlan) ->
Result<LogicalPlan> {
fn is_single_distinct_agg(plan: &LogicalPlan) -> bool {
match plan {
- LogicalPlan::Aggregate {
+ LogicalPlan::Aggregate(Aggregate {
input, aggr_expr, ..
- } => {
+ }) => {
let mut fields_set = HashSet::new();
aggr_expr
.iter()
diff --git a/datafusion/src/optimizer/utils.rs
b/datafusion/src/optimizer/utils.rs
index aa559cd..60a81f0 100644
--- a/datafusion/src/optimizer/utils.rs
+++ b/datafusion/src/optimizer/utils.rs
@@ -23,7 +23,9 @@ use arrow::record_batch::RecordBatch;
use super::optimizer::OptimizerRule;
use crate::execution::context::{ExecutionContextState, ExecutionProps};
-use crate::logical_plan::plan::{AnalyzePlan, ExtensionPlan, Filter,
Projection, Window};
+use crate::logical_plan::plan::{
+ Aggregate, AnalyzePlan, ExtensionPlan, Filter, Join, Projection, Sort,
Window,
+};
use crate::logical_plan::{
build_join_schema, Column, CreateMemoryTable, DFSchema, DFSchemaRef, Expr,
ExprRewriter, Limit, LogicalPlan, LogicalPlanBuilder, Operator,
Partitioning,
@@ -188,28 +190,28 @@ pub fn from_plan(
window_expr: expr[0..window_expr.len()].to_vec(),
schema: schema.clone(),
})),
- LogicalPlan::Aggregate {
+ LogicalPlan::Aggregate(Aggregate {
group_expr, schema, ..
- } => Ok(LogicalPlan::Aggregate {
+ }) => Ok(LogicalPlan::Aggregate(Aggregate {
group_expr: expr[0..group_expr.len()].to_vec(),
aggr_expr: expr[group_expr.len()..].to_vec(),
input: Arc::new(inputs[0].clone()),
schema: schema.clone(),
- }),
- LogicalPlan::Sort { .. } => Ok(LogicalPlan::Sort {
+ })),
+ LogicalPlan::Sort(Sort { .. }) => Ok(LogicalPlan::Sort(Sort {
expr: expr.to_vec(),
input: Arc::new(inputs[0].clone()),
- }),
- LogicalPlan::Join {
+ })),
+ LogicalPlan::Join(Join {
join_type,
join_constraint,
on,
null_equals_null,
..
- } => {
+ }) => {
let schema =
build_join_schema(inputs[0].schema(), inputs[1].schema(),
join_type)?;
- Ok(LogicalPlan::Join {
+ Ok(LogicalPlan::Join(Join {
left: Arc::new(inputs[0].clone()),
right: Arc::new(inputs[1].clone()),
join_type: *join_type,
@@ -217,7 +219,7 @@ pub fn from_plan(
on: on.clone(),
schema: DFSchemaRef::new(schema),
null_equals_null: *null_equals_null,
- })
+ }))
}
LogicalPlan::CrossJoin(_) => {
let left = inputs[0].clone();
diff --git a/datafusion/src/physical_plan/planner.rs
b/datafusion/src/physical_plan/planner.rs
index a7ba64a..f2f526d 100644
--- a/datafusion/src/physical_plan/planner.rs
+++ b/datafusion/src/physical_plan/planner.rs
@@ -23,13 +23,15 @@ use super::{
hash_join::PartitionMode, udaf, union::UnionExec, values::ValuesExec,
windows,
};
use crate::execution::context::ExecutionContextState;
-use crate::logical_plan::plan::{EmptyRelation, Filter, Projection, Window};
+use crate::logical_plan::plan::{
+ Aggregate, EmptyRelation, Filter, Join, Projection, Sort, TableScanPlan,
Window,
+};
use crate::logical_plan::{
unalias, unnormalize_cols, CrossJoin, DFSchema, Expr, LogicalPlan,
Operator,
Partitioning as LogicalPartitioning, PlanType, Repartition,
ToStringifiedPlan, Union,
UserDefinedLogicalNode,
};
-use crate::logical_plan::{Limit, TableScanPlan, Values};
+use crate::logical_plan::{Limit, Values};
use crate::physical_optimizer::optimizer::PhysicalOptimizerRule;
use crate::physical_plan::cross_join::CrossJoinExec;
use crate::physical_plan::explain::ExplainExec;
@@ -479,12 +481,12 @@ impl DefaultPhysicalPlanner {
physical_input_schema,
)?) )
}
- LogicalPlan::Aggregate {
+ LogicalPlan::Aggregate(Aggregate {
input,
group_expr,
aggr_expr,
..
- } => {
+ }) => {
// Initially need to perform the aggregate and then merge
the partitions
let input_exec = self.create_initial_plan(input,
ctx_state).await?;
let physical_input_schema = input_exec.schema();
@@ -676,7 +678,7 @@ impl DefaultPhysicalPlanner {
physical_partitioning,
)?) )
}
- LogicalPlan::Sort { expr, input, .. } => {
+ LogicalPlan::Sort(Sort { expr, input, .. }) => {
let physical_input = self.create_initial_plan(input,
ctx_state).await?;
let input_schema = physical_input.as_ref().schema();
let input_dfschema = input.as_ref().schema();
@@ -704,14 +706,14 @@ impl DefaultPhysicalPlanner {
.collect::<Result<Vec<_>>>()?;
Ok(Arc::new(SortExec::try_new(sort_expr, physical_input)?)
)
}
- LogicalPlan::Join {
+ LogicalPlan::Join(Join {
left,
right,
on: keys,
join_type,
null_equals_null,
..
- } => {
+ }) => {
let left_df_schema = left.schema();
let physical_left = self.create_initial_plan(left,
ctx_state).await?;
let right_df_schema = right.schema();
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index 5ea008c..809018b 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -36,7 +36,7 @@ use datafusion::assert_batches_eq;
use datafusion::assert_batches_sorted_eq;
use datafusion::assert_contains;
use datafusion::assert_not_contains;
-use datafusion::logical_plan::plan::Projection;
+use datafusion::logical_plan::plan::{Aggregate, Projection};
use datafusion::logical_plan::LogicalPlan;
use datafusion::logical_plan::TableScanPlan;
use datafusion::physical_plan::functions::Volatility;
@@ -92,7 +92,7 @@ async fn nyc() -> Result<()> {
match &optimized_plan {
LogicalPlan::Projection(Projection { input, .. }) => match
input.as_ref() {
- LogicalPlan::Aggregate { input, .. } => match input.as_ref() {
+ LogicalPlan::Aggregate(Aggregate { input, .. }) => match
input.as_ref() {
LogicalPlan::TableScan(TableScanPlan {
ref projected_schema,
..
diff --git a/datafusion/tests/user_defined_plan.rs
b/datafusion/tests/user_defined_plan.rs
index a63ef2a..d931985 100644
--- a/datafusion/tests/user_defined_plan.rs
+++ b/datafusion/tests/user_defined_plan.rs
@@ -86,7 +86,7 @@ use std::{any::Any, collections::BTreeMap, fmt, sync::Arc};
use async_trait::async_trait;
use datafusion::execution::context::ExecutionProps;
-use datafusion::logical_plan::plan::ExtensionPlan;
+use datafusion::logical_plan::plan::{ExtensionPlan, Sort};
use datafusion::logical_plan::{DFSchemaRef, Limit};
/// Execute the specified sql and return the resulting record batches
@@ -289,10 +289,10 @@ impl OptimizerRule for TopKOptimizerRule {
// Sort and replaces it by a TopK node. It does not handle many
// edge cases (e.g multiple sort columns, sort ASC / DESC), etc.
if let LogicalPlan::Limit(Limit { ref n, ref input }) = plan {
- if let LogicalPlan::Sort {
+ if let LogicalPlan::Sort(Sort {
ref expr,
ref input,
- } = **input
+ }) = **input
{
if expr.len() == 1 {
// we found a sort with a single sort expr, replace with a
TopK