This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new b64d2c6 Extract CrossJoin, Repartition, Union in LogicalPlan (#1322)
b64d2c6 is described below
commit b64d2c62669a89c2d7a913c5f40598c07ab3bece
Author: Kun Liu <[email protected]>
AuthorDate: Fri Nov 19 02:10:56 2021 +0800
Extract CrossJoin, Repartition, Union in LogicalPlan (#1322)
---
ballista/rust/core/src/serde/logical_plan/mod.rs | 13 +--
.../rust/core/src/serde/logical_plan/to_proto.rs | 12 +--
datafusion/src/logical_plan/builder.rs | 20 ++---
datafusion/src/logical_plan/mod.rs | 5 +-
datafusion/src/logical_plan/plan.rs | 100 ++++++++++++---------
.../src/optimizer/common_subexpr_eliminate.rs | 6 +-
datafusion/src/optimizer/constant_folding.rs | 6 +-
datafusion/src/optimizer/filter_push_down.rs | 8 +-
datafusion/src/optimizer/limit_push_down.rs | 10 +--
datafusion/src/optimizer/projection_push_down.rs | 14 +--
datafusion/src/optimizer/utils.rs | 34 +++----
datafusion/src/physical_plan/planner.rs | 12 +--
12 files changed, 131 insertions(+), 109 deletions(-)
diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs
index 472399b..a5e2aa0 100644
--- a/ballista/rust/core/src/serde/logical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -24,6 +24,7 @@ mod roundtrip_tests {
use super::super::{super::error::Result, protobuf};
use crate::error::BallistaError;
use core::panic;
+ use datafusion::logical_plan::Repartition;
use datafusion::{
arrow::datatypes::{DataType, Field, IntervalUnit, Schema, TimeUnit},
datasource::object_store::local::LocalFileSystem,
@@ -93,28 +94,28 @@ mod roundtrip_tests {
for batch_size in test_batch_sizes.iter() {
let rr_repartition = Partitioning::RoundRobinBatch(*batch_size);
- let roundtrip_plan = LogicalPlan::Repartition {
+ let roundtrip_plan = LogicalPlan::Repartition(Repartition {
input: plan.clone(),
partitioning_scheme: rr_repartition,
- };
+ });
roundtrip_test!(roundtrip_plan);
let h_repartition = Partitioning::Hash(test_expr.clone(),
*batch_size);
- let roundtrip_plan = LogicalPlan::Repartition {
+ let roundtrip_plan = LogicalPlan::Repartition(Repartition {
input: plan.clone(),
partitioning_scheme: h_repartition,
- };
+ });
roundtrip_test!(roundtrip_plan);
let no_expr_hrepartition = Partitioning::Hash(Vec::new(),
*batch_size);
- let roundtrip_plan = LogicalPlan::Repartition {
+ let roundtrip_plan = LogicalPlan::Repartition(Repartition {
input: plan.clone(),
partitioning_scheme: no_expr_hrepartition,
- };
+ });
roundtrip_test!(roundtrip_plan);
}
diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
index 825a4b9..a870ecd 100644
--- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
@@ -33,8 +33,8 @@ use datafusion::datasource::listing::ListingTable;
use datafusion::logical_plan::{
exprlist_to_fields,
window_frames::{WindowFrame, WindowFrameBound, WindowFrameUnits},
- Column, CreateExternalTable, Expr, JoinConstraint, JoinType, LogicalPlan,
- TableScanPlan,
+ Column, CreateExternalTable, CrossJoin, Expr, JoinConstraint, JoinType,
LogicalPlan,
+ Repartition, TableScanPlan,
};
use datafusion::physical_plan::aggregates::AggregateFunction;
use datafusion::physical_plan::functions::BuiltinScalarFunction;
@@ -901,10 +901,10 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
))),
})
}
- LogicalPlan::Repartition {
+ LogicalPlan::Repartition(Repartition {
input,
partitioning_scheme,
- } => {
+ }) => {
use datafusion::logical_plan::Partitioning;
let input: protobuf::LogicalPlanNode =
input.as_ref().try_into()?;
@@ -996,8 +996,8 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
})
}
LogicalPlan::Extension { .. } => unimplemented!(),
- LogicalPlan::Union { .. } => unimplemented!(),
- LogicalPlan::CrossJoin { left, right, .. } => {
+ LogicalPlan::Union(_) => unimplemented!(),
+ LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => {
let left: protobuf::LogicalPlanNode =
left.as_ref().try_into()?;
let right: protobuf::LogicalPlanNode =
right.as_ref().try_into()?;
Ok(protobuf::LogicalPlanNode {
diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs
index 64b6c25..8c30579 100644
--- a/datafusion/src/logical_plan/builder.rs
+++ b/datafusion/src/logical_plan/builder.rs
@@ -26,7 +26,7 @@ use crate::datasource::{
};
use crate::error::{DataFusionError, Result};
use crate::logical_plan::plan::{
- AnalyzePlan, ExplainPlan, TableScanPlan, ToStringifiedPlan,
+ AnalyzePlan, ExplainPlan, TableScanPlan, ToStringifiedPlan, Union,
};
use crate::prelude::*;
use crate::scalar::ScalarValue;
@@ -44,8 +44,8 @@ use std::{
use super::dfschema::ToDFSchema;
use super::{exprlist_to_fields, Expr, JoinConstraint, JoinType, LogicalPlan,
PlanType};
use crate::logical_plan::{
- columnize_expr, normalize_col, normalize_cols, Column, DFField, DFSchema,
- DFSchemaRef, Partitioning,
+ columnize_expr, normalize_col, normalize_cols, Column, CrossJoin, DFField,
DFSchema,
+ DFSchemaRef, Partitioning, Repartition,
};
use crate::sql::utils::group_window_expr_by_sort_keys;
@@ -632,19 +632,19 @@ impl LogicalPlanBuilder {
/// Apply a cross join
pub fn cross_join(&self, right: &LogicalPlan) -> Result<Self> {
let schema = self.plan.schema().join(right.schema())?;
- Ok(Self::from(LogicalPlan::CrossJoin {
+ Ok(Self::from(LogicalPlan::CrossJoin(CrossJoin {
left: Arc::new(self.plan.clone()),
right: Arc::new(right.clone()),
schema: DFSchemaRef::new(schema),
- }))
+ })))
}
/// Repartition
pub fn repartition(&self, partitioning_scheme: Partitioning) ->
Result<Self> {
- Ok(Self::from(LogicalPlan::Repartition {
+ Ok(Self::from(LogicalPlan::Repartition(Repartition {
input: Arc::new(self.plan.clone()),
partitioning_scheme,
- }))
+ })))
}
/// Apply a window functions to extend the schema
@@ -840,7 +840,7 @@ pub fn union_with_alias(
let inputs = vec![left_plan, right_plan]
.into_iter()
.flat_map(|p| match p {
- LogicalPlan::Union { inputs, .. } => inputs,
+ LogicalPlan::Union(Union { inputs, .. }) => inputs,
x => vec![x],
})
.collect::<Vec<_>>();
@@ -863,11 +863,11 @@ pub fn union_with_alias(
));
}
- Ok(LogicalPlan::Union {
+ Ok(LogicalPlan::Union(Union {
schema: union_schema,
inputs,
alias,
- })
+ }))
}
/// Project with optional alias
diff --git a/datafusion/src/logical_plan/mod.rs b/datafusion/src/logical_plan/mod.rs
index 77193eb..bb89542 100644
--- a/datafusion/src/logical_plan/mod.rs
+++ b/datafusion/src/logical_plan/mod.rs
@@ -51,8 +51,9 @@ pub use expr::{
pub use extension::UserDefinedLogicalNode;
pub use operators::Operator;
pub use plan::{
- CreateExternalTable, CreateMemoryTable, DropTable, JoinConstraint,
JoinType,
- LogicalPlan, Partitioning, PlanType, PlanVisitor, TableScanPlan,
+ CreateExternalTable, CreateMemoryTable, CrossJoin, DropTable,
JoinConstraint,
+ JoinType, LogicalPlan, Partitioning, PlanType, PlanVisitor, Repartition,
+ TableScanPlan, Union,
};
pub(crate) use plan::{StringifiedPlan, ToStringifiedPlan};
pub use registry::FunctionRegistry;
diff --git a/datafusion/src/logical_plan/plan.rs b/datafusion/src/logical_plan/plan.rs
index ea56bc0..c12edd9 100644
--- a/datafusion/src/logical_plan/plan.rs
+++ b/datafusion/src/logical_plan/plan.rs
@@ -74,6 +74,37 @@ pub struct TableScanPlan {
pub limit: Option<usize>,
}
+/// Apply Cross Join to two logical plans
+#[derive(Clone)]
+pub struct CrossJoin {
+ /// Left input
+ pub left: Arc<LogicalPlan>,
+ /// Right input
+ pub right: Arc<LogicalPlan>,
+ /// The output schema, containing fields from the left and right inputs
+ pub schema: DFSchemaRef,
+}
+
+/// Repartition the plan based on a partitioning scheme.
+#[derive(Clone)]
+pub struct Repartition {
+ /// The incoming logical plan
+ pub input: Arc<LogicalPlan>,
+ /// The partitioning scheme
+ pub partitioning_scheme: Partitioning,
+}
+
+/// Union multiple inputs
+#[derive(Clone)]
+pub struct Union {
+ /// Inputs to merge
+ pub inputs: Vec<LogicalPlan>,
+ /// Union schema. Should be the same for all inputs.
+ pub schema: DFSchemaRef,
+ /// Union output relation alias
+ pub alias: Option<String>,
+}
+
/// Creates an in memory table.
#[derive(Clone)]
pub struct CreateMemoryTable {
@@ -224,30 +255,11 @@ pub enum LogicalPlan {
null_equals_null: bool,
},
/// Apply Cross Join to two logical plans
- CrossJoin {
- /// Left input
- left: Arc<LogicalPlan>,
- /// Right input
- right: Arc<LogicalPlan>,
- /// The output schema, containing fields from the left and right inputs
- schema: DFSchemaRef,
- },
+ CrossJoin(CrossJoin),
/// Repartition the plan based on a partitioning scheme.
- Repartition {
- /// The incoming logical plan
- input: Arc<LogicalPlan>,
- /// The partitioning scheme
- partitioning_scheme: Partitioning,
- },
+ Repartition(Repartition),
/// Union multiple inputs
- Union {
- /// Inputs to merge
- inputs: Vec<LogicalPlan>,
- /// Union schema. Should be the same for all inputs.
- schema: DFSchemaRef,
- /// Union output relation alias
- alias: Option<String>,
- },
+ Union(Union),
/// Produces rows from a table provider by reference or from the context
TableScan(TableScanPlan),
/// Produces no rows: An empty relation with an empty schema
@@ -304,8 +316,8 @@ impl LogicalPlan {
LogicalPlan::Aggregate { schema, .. } => schema,
LogicalPlan::Sort { input, .. } => input.schema(),
LogicalPlan::Join { schema, .. } => schema,
- LogicalPlan::CrossJoin { schema, .. } => schema,
- LogicalPlan::Repartition { input, .. } => input.schema(),
+ LogicalPlan::CrossJoin(CrossJoin { schema, .. }) => schema,
+ LogicalPlan::Repartition(Repartition { input, .. }) =>
input.schema(),
LogicalPlan::Limit { input, .. } => input.schema(),
LogicalPlan::CreateExternalTable(CreateExternalTable { schema, ..
}) => {
schema
@@ -313,7 +325,7 @@ impl LogicalPlan {
LogicalPlan::Explain(explain) => &explain.schema,
LogicalPlan::Analyze(analyze) => &analyze.schema,
LogicalPlan::Extension(extension) => extension.node.schema(),
- LogicalPlan::Union { schema, .. } => schema,
+ LogicalPlan::Union(Union { schema, .. }) => schema,
LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. }) =>
{
input.schema()
}
@@ -341,17 +353,17 @@ impl LogicalPlan {
schema,
..
}
- | LogicalPlan::CrossJoin {
+ | LogicalPlan::CrossJoin(CrossJoin {
left,
right,
schema,
- } => {
+ }) => {
let mut schemas = left.all_schemas();
schemas.extend(right.all_schemas());
schemas.insert(0, schema);
schemas
}
- LogicalPlan::Union { schema, .. } => {
+ LogicalPlan::Union(Union { schema, .. }) => {
vec![schema]
}
LogicalPlan::Extension(extension) => vec![extension.node.schema()],
@@ -362,7 +374,7 @@ impl LogicalPlan {
vec![schema]
}
LogicalPlan::Limit { input, .. }
- | LogicalPlan::Repartition { input, .. }
+ | LogicalPlan::Repartition(Repartition { input, .. })
| LogicalPlan::Sort { input, .. }
| LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. })
| LogicalPlan::Filter { input, .. } => input.all_schemas(),
@@ -388,10 +400,10 @@ impl LogicalPlan {
values.iter().flatten().cloned().collect()
}
LogicalPlan::Filter { predicate, .. } => vec![predicate.clone()],
- LogicalPlan::Repartition {
+ LogicalPlan::Repartition(Repartition {
partitioning_scheme,
..
- } => match partitioning_scheme {
+ }) => match partitioning_scheme {
Partitioning::Hash(expr, _) => expr.clone(),
_ => vec![],
},
@@ -414,10 +426,10 @@ impl LogicalPlan {
| LogicalPlan::CreateExternalTable(_)
| LogicalPlan::CreateMemoryTable(_)
| LogicalPlan::DropTable(_)
- | LogicalPlan::CrossJoin { .. }
+ | LogicalPlan::CrossJoin(_)
| LogicalPlan::Analyze { .. }
| LogicalPlan::Explain { .. }
- | LogicalPlan::Union { .. } => {
+ | LogicalPlan::Union(_) => {
vec![]
}
}
@@ -429,15 +441,15 @@ impl LogicalPlan {
match self {
LogicalPlan::Projection { input, .. } => vec![input],
LogicalPlan::Filter { input, .. } => vec![input],
- LogicalPlan::Repartition { input, .. } => vec![input],
+ LogicalPlan::Repartition(Repartition { input, .. }) => vec![input],
LogicalPlan::Window { input, .. } => vec![input],
LogicalPlan::Aggregate { input, .. } => vec![input],
LogicalPlan::Sort { input, .. } => vec![input],
LogicalPlan::Join { left, right, .. } => vec![left, right],
- LogicalPlan::CrossJoin { left, right, .. } => vec![left, right],
+ LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) =>
vec![left, right],
LogicalPlan::Limit { input, .. } => vec![input],
LogicalPlan::Extension(extension) => extension.node.inputs(),
- LogicalPlan::Union { inputs, .. } => inputs.iter().collect(),
+ LogicalPlan::Union(Union { inputs, .. }) =>
inputs.iter().collect(),
LogicalPlan::Explain(explain) => vec![&explain.plan],
LogicalPlan::Analyze(analyze) => vec![&analyze.input],
LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. }) =>
{
@@ -564,15 +576,17 @@ impl LogicalPlan {
let recurse = match self {
LogicalPlan::Projection { input, .. } => input.accept(visitor)?,
LogicalPlan::Filter { input, .. } => input.accept(visitor)?,
- LogicalPlan::Repartition { input, .. } => input.accept(visitor)?,
+ LogicalPlan::Repartition(Repartition { input, .. }) => {
+ input.accept(visitor)?
+ }
LogicalPlan::Window { input, .. } => input.accept(visitor)?,
LogicalPlan::Aggregate { input, .. } => input.accept(visitor)?,
LogicalPlan::Sort { input, .. } => input.accept(visitor)?,
LogicalPlan::Join { left, right, .. }
- | LogicalPlan::CrossJoin { left, right, .. } => {
+ | LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => {
left.accept(visitor)? && right.accept(visitor)?
}
- LogicalPlan::Union { inputs, .. } => {
+ LogicalPlan::Union(Union { inputs, .. }) => {
for input in inputs {
if !input.accept(visitor)? {
return Ok(false);
@@ -887,13 +901,13 @@ impl LogicalPlan {
}
}
}
- LogicalPlan::CrossJoin { .. } => {
+ LogicalPlan::CrossJoin(_) => {
write!(f, "CrossJoin:")
}
- LogicalPlan::Repartition {
+ LogicalPlan::Repartition(Repartition {
partitioning_scheme,
..
- } => match partitioning_scheme {
+ }) => match partitioning_scheme {
Partitioning::RoundRobinBatch(n) => write!(
f,
"Repartition: RoundRobinBatch partition_count={}",
@@ -927,7 +941,7 @@ impl LogicalPlan {
}
LogicalPlan::Explain { .. } => write!(f, "Explain"),
LogicalPlan::Analyze { .. } => write!(f, "Analyze"),
- LogicalPlan::Union { .. } => write!(f, "Union"),
+ LogicalPlan::Union(_) => write!(f, "Union"),
LogicalPlan::Extension(e) => e.node.fmt_for_explain(f),
}
}
diff --git a/datafusion/src/optimizer/common_subexpr_eliminate.rs b/datafusion/src/optimizer/common_subexpr_eliminate.rs
index 9e7a9dd..1ab87e7 100644
--- a/datafusion/src/optimizer/common_subexpr_eliminate.rs
+++ b/datafusion/src/optimizer/common_subexpr_eliminate.rs
@@ -195,9 +195,9 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
})
}
LogicalPlan::Join { .. }
- | LogicalPlan::CrossJoin { .. }
- | LogicalPlan::Repartition { .. }
- | LogicalPlan::Union { .. }
+ | LogicalPlan::CrossJoin(_)
+ | LogicalPlan::Repartition(_)
+ | LogicalPlan::Union(_)
| LogicalPlan::TableScan { .. }
| LogicalPlan::Values { .. }
| LogicalPlan::EmptyRelation { .. }
diff --git a/datafusion/src/optimizer/constant_folding.rs b/datafusion/src/optimizer/constant_folding.rs
index 06bac6c..ff1e16c 100644
--- a/datafusion/src/optimizer/constant_folding.rs
+++ b/datafusion/src/optimizer/constant_folding.rs
@@ -68,7 +68,7 @@ impl OptimizerRule for ConstantFolding {
| LogicalPlan::Projection { .. }
| LogicalPlan::Window { .. }
| LogicalPlan::Aggregate { .. }
- | LogicalPlan::Repartition { .. }
+ | LogicalPlan::Repartition(_)
| LogicalPlan::CreateExternalTable(_)
| LogicalPlan::CreateMemoryTable(_)
| LogicalPlan::DropTable(_)
@@ -78,9 +78,9 @@ impl OptimizerRule for ConstantFolding {
| LogicalPlan::Explain { .. }
| LogicalPlan::Analyze { .. }
| LogicalPlan::Limit { .. }
- | LogicalPlan::Union { .. }
+ | LogicalPlan::Union(_)
| LogicalPlan::Join { .. }
- | LogicalPlan::CrossJoin { .. } => {
+ | LogicalPlan::CrossJoin(_) => {
// apply the optimization to all inputs of the plan
let inputs = plan.inputs();
let new_inputs = inputs
diff --git a/datafusion/src/optimizer/filter_push_down.rs b/datafusion/src/optimizer/filter_push_down.rs
index 1bb1a18..cc8f011 100644
--- a/datafusion/src/optimizer/filter_push_down.rs
+++ b/datafusion/src/optimizer/filter_push_down.rs
@@ -16,7 +16,9 @@
use crate::datasource::datasource::TableProviderFilterPushDown;
use crate::execution::context::ExecutionProps;
-use crate::logical_plan::{and, replace_col, Column, LogicalPlan,
TableScanPlan};
+use crate::logical_plan::{
+ and, replace_col, Column, CrossJoin, LogicalPlan, TableScanPlan,
+};
use crate::logical_plan::{DFSchema, Expr};
use crate::optimizer::optimizer::OptimizerRule;
use crate::optimizer::utils;
@@ -374,7 +376,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
// sort is filter-commutable
push_down(&state, plan)
}
- LogicalPlan::Union { .. } => {
+ LogicalPlan::Union(_) => {
// union all is filter-commutable
push_down(&state, plan)
}
@@ -388,7 +390,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
.collect::<HashSet<_>>();
issue_filters(state, used_columns, plan)
}
- LogicalPlan::CrossJoin { left, right, .. } => {
+ LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => {
optimize_join(state, plan, left, right)
}
LogicalPlan::Join {
diff --git a/datafusion/src/optimizer/limit_push_down.rs b/datafusion/src/optimizer/limit_push_down.rs
index 842ca08..ce8b5bc 100644
--- a/datafusion/src/optimizer/limit_push_down.rs
+++ b/datafusion/src/optimizer/limit_push_down.rs
@@ -20,8 +20,8 @@
use super::utils;
use crate::error::Result;
use crate::execution::context::ExecutionProps;
-use crate::logical_plan::LogicalPlan;
use crate::logical_plan::TableScanPlan;
+use crate::logical_plan::{LogicalPlan, Union};
use crate::optimizer::optimizer::OptimizerRule;
use std::sync::Arc;
@@ -99,11 +99,11 @@ fn limit_push_down(
})
}
(
- LogicalPlan::Union {
+ LogicalPlan::Union(Union {
inputs,
alias,
schema,
- },
+ }),
Some(upper_limit),
) => {
// Push down limit through UNION
@@ -121,11 +121,11 @@ fn limit_push_down(
})
})
.collect::<Result<_>>()?;
- Ok(LogicalPlan::Union {
+ Ok(LogicalPlan::Union(Union {
inputs: new_inputs,
alias: alias.clone(),
schema: schema.clone(),
- })
+ }))
}
// For other nodes we can't push down the limit
// But try to recurse and find other limit nodes to push down
diff --git a/datafusion/src/optimizer/projection_push_down.rs b/datafusion/src/optimizer/projection_push_down.rs
index f3a7c61..b725b1d 100644
--- a/datafusion/src/optimizer/projection_push_down.rs
+++ b/datafusion/src/optimizer/projection_push_down.rs
@@ -23,7 +23,7 @@ use crate::execution::context::ExecutionProps;
use crate::logical_plan::plan::{AnalyzePlan, TableScanPlan};
use crate::logical_plan::{
build_join_schema, Column, DFField, DFSchema, DFSchemaRef, LogicalPlan,
- LogicalPlanBuilder, ToDFSchema,
+ LogicalPlanBuilder, ToDFSchema, Union,
};
use crate::optimizer::optimizer::OptimizerRule;
use crate::optimizer::utils;
@@ -377,11 +377,11 @@ fn optimize_plan(
schema: a.schema.clone(),
}))
}
- LogicalPlan::Union {
+ LogicalPlan::Union(Union {
inputs,
schema,
alias,
- } => {
+ }) => {
// UNION inputs will reference the same column with different
identifiers, so we need
// to populate new_required_columns by unqualified column name
based on required fields
// from the resulting UNION output
@@ -419,24 +419,24 @@ fn optimize_plan(
.cloned()
.collect(),
)?;
- Ok(LogicalPlan::Union {
+ Ok(LogicalPlan::Union(Union {
inputs: new_inputs,
schema: Arc::new(new_schema),
alias: alias.clone(),
- })
+ }))
}
// all other nodes: Add any additional columns used by
// expressions in this node to the list of required columns
LogicalPlan::Limit { .. }
| LogicalPlan::Filter { .. }
- | LogicalPlan::Repartition { .. }
+ | LogicalPlan::Repartition(_)
| LogicalPlan::EmptyRelation { .. }
| LogicalPlan::Values { .. }
| LogicalPlan::Sort { .. }
| LogicalPlan::CreateExternalTable(_)
| LogicalPlan::CreateMemoryTable(_)
| LogicalPlan::DropTable(_)
- | LogicalPlan::CrossJoin { .. }
+ | LogicalPlan::CrossJoin(_)
| LogicalPlan::Extension { .. } => {
let expr = plan.expressions();
// collect all required columns by this plan
diff --git a/datafusion/src/optimizer/utils.rs b/datafusion/src/optimizer/utils.rs
index fe14442..b9a3b99 100644
--- a/datafusion/src/optimizer/utils.rs
+++ b/datafusion/src/optimizer/utils.rs
@@ -27,7 +27,7 @@ use crate::logical_plan::plan::{AnalyzePlan, ExtensionPlan};
use crate::logical_plan::{
build_join_schema, Column, CreateMemoryTable, DFSchema, DFSchemaRef, Expr,
ExprRewriter, LogicalPlan, LogicalPlanBuilder, Operator, Partitioning,
Recursion,
- RewriteRecursion,
+ Repartition, RewriteRecursion, Union,
};
use crate::physical_plan::functions::Volatility;
use crate::physical_plan::planner::DefaultPhysicalPlanner;
@@ -162,18 +162,20 @@ pub fn from_plan(
predicate: expr[0].clone(),
input: Arc::new(inputs[0].clone()),
}),
- LogicalPlan::Repartition {
+ LogicalPlan::Repartition(Repartition {
partitioning_scheme,
..
- } => match partitioning_scheme {
- Partitioning::RoundRobinBatch(n) => Ok(LogicalPlan::Repartition {
- partitioning_scheme: Partitioning::RoundRobinBatch(*n),
- input: Arc::new(inputs[0].clone()),
- }),
- Partitioning::Hash(_, n) => Ok(LogicalPlan::Repartition {
+ }) => match partitioning_scheme {
+ Partitioning::RoundRobinBatch(n) => {
+ Ok(LogicalPlan::Repartition(Repartition {
+ partitioning_scheme: Partitioning::RoundRobinBatch(*n),
+ input: Arc::new(inputs[0].clone()),
+ }))
+ }
+ Partitioning::Hash(_, n) =>
Ok(LogicalPlan::Repartition(Repartition {
partitioning_scheme: Partitioning::Hash(expr.to_owned(), *n),
input: Arc::new(inputs[0].clone()),
- }),
+ })),
},
LogicalPlan::Window {
window_expr,
@@ -215,7 +217,7 @@ pub fn from_plan(
null_equals_null: *null_equals_null,
})
}
- LogicalPlan::CrossJoin { .. } => {
+ LogicalPlan::CrossJoin(_) => {
let left = inputs[0].clone();
let right = &inputs[1];
LogicalPlanBuilder::from(left).cross_join(right)?.build()
@@ -233,11 +235,13 @@ pub fn from_plan(
LogicalPlan::Extension(e) => Ok(LogicalPlan::Extension(ExtensionPlan {
node: e.node.from_template(expr, inputs),
})),
- LogicalPlan::Union { schema, alias, .. } => Ok(LogicalPlan::Union {
- inputs: inputs.to_vec(),
- schema: schema.clone(),
- alias: alias.clone(),
- }),
+ LogicalPlan::Union(Union { schema, alias, .. }) => {
+ Ok(LogicalPlan::Union(Union {
+ inputs: inputs.to_vec(),
+ schema: schema.clone(),
+ alias: alias.clone(),
+ }))
+ }
LogicalPlan::Analyze(a) => {
assert!(expr.is_empty());
assert_eq!(inputs.len(), 1);
diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs
index 25ed0e4..ad858be 100644
--- a/datafusion/src/physical_plan/planner.rs
+++ b/datafusion/src/physical_plan/planner.rs
@@ -25,8 +25,8 @@ use super::{
use crate::execution::context::ExecutionContextState;
use crate::logical_plan::TableScanPlan;
use crate::logical_plan::{
- unnormalize_cols, DFSchema, Expr, LogicalPlan, Operator,
- Partitioning as LogicalPartitioning, PlanType, ToStringifiedPlan,
+ unnormalize_cols, CrossJoin, DFSchema, Expr, LogicalPlan, Operator,
+ Partitioning as LogicalPartitioning, PlanType, Repartition,
ToStringifiedPlan, Union,
UserDefinedLogicalNode,
};
use crate::physical_optimizer::optimizer::PhysicalOptimizerRule;
@@ -643,17 +643,17 @@ impl DefaultPhysicalPlanner {
)?;
Ok(Arc::new(FilterExec::try_new(runtime_expr,
physical_input)?) )
}
- LogicalPlan::Union { inputs, .. } => {
+ LogicalPlan::Union(Union { inputs, .. }) => {
let physical_plans = futures::stream::iter(inputs)
.then(|lp| self.create_initial_plan(lp, ctx_state))
.try_collect::<Vec<_>>()
.await?;
Ok(Arc::new(UnionExec::new(physical_plans)) )
}
- LogicalPlan::Repartition {
+ LogicalPlan::Repartition(Repartition {
input,
partitioning_scheme,
- } => {
+ }) => {
let physical_input = self.create_initial_plan(input,
ctx_state).await?;
let input_schema = physical_input.schema();
let input_dfschema = input.as_ref().schema();
@@ -776,7 +776,7 @@ impl DefaultPhysicalPlanner {
)?))
}
}
- LogicalPlan::CrossJoin { left, right, .. } => {
+ LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => {
let left = self.create_initial_plan(left,
ctx_state).await?;
let right = self.create_initial_plan(right,
ctx_state).await?;
Ok(Arc::new(CrossJoinExec::try_new(left, right)?))