This is an automated email from the ASF dual-hosted git repository.
jakevin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 1a0542acbc refactor: with_inputs() can use original schema to avoid
recompute schema. (#7069)
1a0542acbc is described below
commit 1a0542acbc01e5243471ae0fc3586c2f1f40013b
Author: jakevin <[email protected]>
AuthorDate: Tue Jul 25 10:24:00 2023 +0800
refactor: with_inputs() can use original schema to avoid recompute schema.
(#7069)
* minor: with_inputs() can use original schema.
* remove useless code
---
datafusion/core/src/physical_planner.rs | 5 ++-
datafusion/expr/src/logical_plan/builder.rs | 9 ++---
datafusion/expr/src/logical_plan/plan.rs | 54 +++++++++++++++++------------
3 files changed, 36 insertions(+), 32 deletions(-)
diff --git a/datafusion/core/src/physical_planner.rs
b/datafusion/core/src/physical_planner.rs
index c0e60008bc..913deebddf 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -69,7 +69,7 @@ use datafusion_expr::expr::{
};
use datafusion_expr::expr_rewriter::{unalias, unnormalize_cols};
use
datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary;
-use datafusion_expr::{logical_plan, DmlStatement, StringifiedPlan, WriteOp};
+use datafusion_expr::{DmlStatement, StringifiedPlan, WriteOp};
use datafusion_expr::{WindowFrame, WindowFrameBound};
use datafusion_physical_expr::expressions::Literal;
use datafusion_sql::utils::window_expr_common_partition_keys;
@@ -942,10 +942,9 @@ impl DefaultPhysicalPlanner {
})
.collect::<Vec<_>>();
let projection =
- logical_plan::Projection::try_new_with_schema(
+ Projection::try_new(
final_join_result,
Arc::new(join_plan),
- join_schema.clone(),
)?;
LogicalPlan::Projection(projection)
} else {
diff --git a/datafusion/expr/src/logical_plan/builder.rs
b/datafusion/expr/src/logical_plan/builder.rs
index 9ddf6231c5..cd8940e134 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -24,7 +24,7 @@ use crate::expr_rewriter::{
rewrite_sort_cols_by_aggs,
};
use crate::type_coercion::binary::comparison_coercion;
-use crate::utils::{columnize_expr, compare_sort_expr, exprlist_to_fields};
+use crate::utils::{columnize_expr, compare_sort_expr};
use crate::{and, binary_expr, DmlStatement, Operator, WriteOp};
use crate::{
logical_plan::{
@@ -1239,15 +1239,10 @@ pub fn project(
}
}
validate_unique_names("Projections", projected_expr.iter())?;
- let input_schema = DFSchema::new_with_metadata(
- exprlist_to_fields(&projected_expr, &plan)?,
- plan.schema().metadata().clone(),
- )?;
- Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
+ Ok(LogicalPlan::Projection(Projection::try_new(
projected_expr,
Arc::new(plan.clone()),
- DFSchemaRef::new(input_schema),
)?))
}
diff --git a/datafusion/expr/src/logical_plan/plan.rs
b/datafusion/expr/src/logical_plan/plan.rs
index 0485973364..be8270ddc3 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -482,7 +482,38 @@ impl LogicalPlan {
}
pub fn with_new_inputs(&self, inputs: &[LogicalPlan]) ->
Result<LogicalPlan> {
- from_plan(self, &self.expressions(), inputs)
+ // with_new_inputs reuses the original expressions,
+ // so the schema does not need to be recomputed.
+ match &self {
+ LogicalPlan::Projection(projection) => {
+ Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
+ projection.expr.to_vec(),
+ Arc::new(inputs[0].clone()),
+ projection.schema.clone(),
+ )?))
+ }
+ LogicalPlan::Window(Window {
+ window_expr,
+ schema,
+ ..
+ }) => Ok(LogicalPlan::Window(Window {
+ input: Arc::new(inputs[0].clone()),
+ window_expr: window_expr.to_vec(),
+ schema: schema.clone(),
+ })),
+ LogicalPlan::Aggregate(Aggregate {
+ group_expr,
+ aggr_expr,
+ schema,
+ ..
+ }) => Ok(LogicalPlan::Aggregate(Aggregate::try_new_with_schema(
+ Arc::new(inputs[0].clone()),
+ group_expr.to_vec(),
+ aggr_expr.to_vec(),
+ schema.clone(),
+ )?)),
+ _ => from_plan(self, &self.expressions(), inputs),
+ }
}
/// Convert a prepared [`LogicalPlan`] into its inner logical plan
@@ -1290,13 +1321,6 @@ impl Projection {
schema,
}
}
-
- pub fn try_from_plan(plan: &LogicalPlan) -> Result<&Projection> {
- match plan {
- LogicalPlan::Projection(it) => Ok(it),
- _ => plan_err!("Could not coerce into Projection!"),
- }
- }
}
/// Aliased subquery
@@ -1375,13 +1399,6 @@ impl Filter {
Ok(Self { predicate, input })
}
-
- pub fn try_from_plan(plan: &LogicalPlan) -> Result<&Filter> {
- match plan {
- LogicalPlan::Filter(it) => Ok(it),
- _ => plan_err!("Could not coerce into Filter!"),
- }
- }
}
/// Window its input based on a set of window spec and window function (e.g.
SUM or RANK)
@@ -1632,13 +1649,6 @@ impl Aggregate {
schema,
})
}
-
- pub fn try_from_plan(plan: &LogicalPlan) -> Result<&Aggregate> {
- match plan {
- LogicalPlan::Aggregate(it) => Ok(it),
- _ => plan_err!("Could not coerce into Aggregate!"),
- }
- }
}
/// Sorts its input according to a list of sort expressions.