This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 18aef7c165 Make `LogicalPlan::with_new_exprs,` deprecate `from_plan`
(#7396)
18aef7c165 is described below
commit 18aef7c1650c64f880fe745a04233125a7fc8770
Author: Andrew Lamb <[email protected]>
AuthorDate: Wed Sep 6 12:25:05 2023 -0400
Make `LogicalPlan::with_new_exprs,` deprecate `from_plan` (#7396)
* Make `LogicalPlan::with_new_exprs`, deprecate `from_plan`
* Avoid cloning
* fix docs
---
datafusion/expr/src/logical_plan/plan.rs | 387 ++++++++++++++++++++-
datafusion/expr/src/utils.rs | 340 +-----------------
datafusion/optimizer/src/analyzer/type_coercion.rs | 5 +-
datafusion/optimizer/src/push_down_filter.rs | 6 +-
.../src/simplify_expressions/simplify_exprs.rs | 4 +-
.../optimizer/src/unwrap_cast_in_comparison.rs | 7 +-
6 files changed, 393 insertions(+), 356 deletions(-)
diff --git a/datafusion/expr/src/logical_plan/plan.rs
b/datafusion/expr/src/logical_plan/plan.rs
index 083ee230c7..9f2d90accc 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -20,29 +20,32 @@
use crate::dml::CopyOptions;
use crate::expr::{Alias, Exists, InSubquery, Placeholder};
use crate::expr_rewriter::create_col_from_scalar_expr;
-use crate::expr_vec_fmt;
use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor};
use crate::logical_plan::extension::UserDefinedLogicalNode;
use crate::logical_plan::{DmlStatement, Statement};
use crate::utils::{
- enumerate_grouping_sets, exprlist_to_fields, find_out_reference_exprs,
from_plan,
+ enumerate_grouping_sets, exprlist_to_fields, find_out_reference_exprs,
grouping_set_expr_count, grouping_set_to_exprlist, inspect_expr_pre,
};
use crate::{
build_join_schema, Expr, ExprSchemable, TableProviderFilterPushDown,
TableSource,
};
+use crate::{
+ expr_vec_fmt, BinaryExpr, CreateMemoryTable, CreateView,
LogicalPlanBuilder, Operator,
+};
use super::dml::CopyTo;
use super::DdlStatement;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::tree_node::{
- Transformed, TreeNode, TreeNodeVisitor, VisitRecursion,
+ RewriteRecursion, Transformed, TreeNode, TreeNodeRewriter, TreeNodeVisitor,
+ VisitRecursion,
};
use datafusion_common::{
- aggregate_functional_dependencies, internal_err, plan_err, Column,
DFField, DFSchema,
- DFSchemaRef, DataFusionError, FunctionalDependencies, OwnedTableReference,
Result,
- ScalarValue, UnnestOptions,
+ aggregate_functional_dependencies, internal_err, plan_err, Column,
Constraints,
+ DFField, DFSchema, DFSchemaRef, DataFusionError, FunctionalDependencies,
+ OwnedTableReference, Result, ScalarValue, UnnestOptions,
};
// backwards compatibility
pub use datafusion_common::display::{PlanType, StringifiedPlan,
ToStringifiedPlan};
@@ -513,6 +516,7 @@ impl LogicalPlan {
}
}
+ /// Returns a copy of this `LogicalPlan` with the new inputs
pub fn with_new_inputs(&self, inputs: &[LogicalPlan]) ->
Result<LogicalPlan> {
// with_new_inputs use original expression,
// so we don't need to recompute Schema.
@@ -544,10 +548,377 @@ impl LogicalPlan {
aggr_expr.to_vec(),
schema.clone(),
)?)),
- _ => from_plan(self, &self.expressions(), inputs),
+ _ => self.with_new_exprs(self.expressions(), inputs),
}
}
+ /// Returns a new `LogicalPlan` based on `self` with inputs and
+ /// expressions replaced.
+ ///
+ /// The exprs correspond to the same order of expressions returned
+ /// by [`Self::expressions`]. This function is used by optimizers
+ /// to rewrite plans using the following pattern:
+ ///
+ /// ```text
+ /// let new_inputs = optimize_children(..., plan, props);
+ ///
+ /// // get the plans expressions to optimize
+ /// let exprs = plan.expressions();
+ ///
+ /// // potentially rewrite plan expressions
+ /// let rewritten_exprs = rewrite_exprs(exprs);
+ ///
+ /// // create new plan using rewritten_exprs in same position
+ /// let new_plan = plan.with_new_exprs(rewritten_exprs, new_inputs);
+ /// ```
+ ///
+ /// Note: sometimes [`Self::with_new_exprs`] will use the schema of the
+ /// original plan; it will not change the schema. This applies to
+ /// `Projection`/`Aggregate`/`Window`
+ pub fn with_new_exprs(
+ &self,
+ mut expr: Vec<Expr>,
+ inputs: &[LogicalPlan],
+ ) -> Result<LogicalPlan> {
+ match self {
+ LogicalPlan::Projection(Projection { schema, .. }) => {
+ Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
+ expr,
+ Arc::new(inputs[0].clone()),
+ schema.clone(),
+ )?))
+ }
+ LogicalPlan::Dml(DmlStatement {
+ table_name,
+ table_schema,
+ op,
+ ..
+ }) => Ok(LogicalPlan::Dml(DmlStatement {
+ table_name: table_name.clone(),
+ table_schema: table_schema.clone(),
+ op: op.clone(),
+ input: Arc::new(inputs[0].clone()),
+ })),
+ LogicalPlan::Copy(CopyTo {
+ input: _,
+ output_url,
+ file_format,
+ copy_options,
+ single_file_output,
+ }) => Ok(LogicalPlan::Copy(CopyTo {
+ input: Arc::new(inputs[0].clone()),
+ output_url: output_url.clone(),
+ file_format: file_format.clone(),
+ single_file_output: *single_file_output,
+ copy_options: copy_options.clone(),
+ })),
+ LogicalPlan::Values(Values { schema, .. }) => {
+ Ok(LogicalPlan::Values(Values {
+ schema: schema.clone(),
+ values: expr
+ .chunks_exact(schema.fields().len())
+ .map(|s| s.to_vec())
+ .collect::<Vec<_>>(),
+ }))
+ }
+ LogicalPlan::Filter { .. } => {
+ assert_eq!(1, expr.len());
+ let predicate = expr.pop().unwrap();
+
+ // filter predicates should not contain aliased expressions so
we remove any aliases
+ // before this logic was added we would have aliases within
filters such as for
+ // benchmark q6:
+ //
+ // lineitem.l_shipdate >= Date32(\"8766\")
+ // AND lineitem.l_shipdate < Date32(\"9131\")
+ // AND CAST(lineitem.l_discount AS Decimal128(30, 15)) AS
lineitem.l_discount >=
+ // Decimal128(Some(49999999999999),30,15)
+ // AND CAST(lineitem.l_discount AS Decimal128(30, 15)) AS
lineitem.l_discount <=
+ // Decimal128(Some(69999999999999),30,15)
+ // AND lineitem.l_quantity < Decimal128(Some(2400),15,2)
+
+ struct RemoveAliases {}
+
+ impl TreeNodeRewriter for RemoveAliases {
+ type N = Expr;
+
+ fn pre_visit(&mut self, expr: &Expr) ->
Result<RewriteRecursion> {
+ match expr {
+ Expr::Exists { .. }
+ | Expr::ScalarSubquery(_)
+ | Expr::InSubquery(_) => {
+ // subqueries could contain aliases so we
don't recurse into those
+ Ok(RewriteRecursion::Stop)
+ }
+ Expr::Alias(_) => Ok(RewriteRecursion::Mutate),
+ _ => Ok(RewriteRecursion::Continue),
+ }
+ }
+
+ fn mutate(&mut self, expr: Expr) -> Result<Expr> {
+ Ok(expr.unalias())
+ }
+ }
+
+ let mut remove_aliases = RemoveAliases {};
+ let predicate = predicate.rewrite(&mut remove_aliases)?;
+
+ Ok(LogicalPlan::Filter(Filter::try_new(
+ predicate,
+ Arc::new(inputs[0].clone()),
+ )?))
+ }
+ LogicalPlan::Repartition(Repartition {
+ partitioning_scheme,
+ ..
+ }) => match partitioning_scheme {
+ Partitioning::RoundRobinBatch(n) => {
+ Ok(LogicalPlan::Repartition(Repartition {
+ partitioning_scheme: Partitioning::RoundRobinBatch(*n),
+ input: Arc::new(inputs[0].clone()),
+ }))
+ }
+ Partitioning::Hash(_, n) =>
Ok(LogicalPlan::Repartition(Repartition {
+ partitioning_scheme: Partitioning::Hash(expr, *n),
+ input: Arc::new(inputs[0].clone()),
+ })),
+ Partitioning::DistributeBy(_) => {
+ Ok(LogicalPlan::Repartition(Repartition {
+ partitioning_scheme: Partitioning::DistributeBy(expr),
+ input: Arc::new(inputs[0].clone()),
+ }))
+ }
+ },
+ LogicalPlan::Window(Window {
+ window_expr,
+ schema,
+ ..
+ }) => {
+ assert_eq!(window_expr.len(), expr.len());
+ Ok(LogicalPlan::Window(Window {
+ input: Arc::new(inputs[0].clone()),
+ window_expr: expr,
+ schema: schema.clone(),
+ }))
+ }
+ LogicalPlan::Aggregate(Aggregate {
+ group_expr, schema, ..
+ }) => {
+ // group exprs are the first expressions
+ let agg_expr = expr.split_off(group_expr.len());
+
+ Ok(LogicalPlan::Aggregate(Aggregate::try_new_with_schema(
+ Arc::new(inputs[0].clone()),
+ expr,
+ agg_expr,
+ schema.clone(),
+ )?))
+ }
+ LogicalPlan::Sort(Sort { fetch, .. }) => Ok(LogicalPlan::Sort(Sort
{
+ expr,
+ input: Arc::new(inputs[0].clone()),
+ fetch: *fetch,
+ })),
+ LogicalPlan::Join(Join {
+ join_type,
+ join_constraint,
+ on,
+ null_equals_null,
+ ..
+ }) => {
+ let schema =
+ build_join_schema(inputs[0].schema(), inputs[1].schema(),
join_type)?;
+
+ let equi_expr_count = on.len();
+ assert!(expr.len() >= equi_expr_count);
+
+ // Assume that the last expr, if any,
+ // is the filter_expr (non equality predicate from ON clause)
+ let filter_expr = if expr.len() > equi_expr_count {
+ expr.pop()
+ } else {
+ None
+ };
+
+ // The first part of expr is equi-exprs,
+ // and the struct of each equi-expr is like `left-expr =
right-expr`.
+ assert_eq!(expr.len(), equi_expr_count);
+ let new_on:Vec<(Expr,Expr)> = expr.into_iter().map(|equi_expr|
{
+ // SimplifyExpression rule may add alias to the equi_expr.
+ let unalias_expr = equi_expr.clone().unalias();
+ if let Expr::BinaryExpr(BinaryExpr { left, op:
Operator::Eq, right }) = unalias_expr {
+ Ok((*left, *right))
+ } else {
+ internal_err!(
+ "The front part expressions should be an binary
equality expression, actual:{equi_expr}"
+ )
+ }
+ }).collect::<Result<Vec<(Expr, Expr)>>>()?;
+
+ Ok(LogicalPlan::Join(Join {
+ left: Arc::new(inputs[0].clone()),
+ right: Arc::new(inputs[1].clone()),
+ join_type: *join_type,
+ join_constraint: *join_constraint,
+ on: new_on,
+ filter: filter_expr,
+ schema: DFSchemaRef::new(schema),
+ null_equals_null: *null_equals_null,
+ }))
+ }
+ LogicalPlan::CrossJoin(_) => {
+ let left = inputs[0].clone();
+ let right = inputs[1].clone();
+ LogicalPlanBuilder::from(left).cross_join(right)?.build()
+ }
+ LogicalPlan::Subquery(Subquery {
+ outer_ref_columns, ..
+ }) => {
+ let subquery =
LogicalPlanBuilder::from(inputs[0].clone()).build()?;
+ Ok(LogicalPlan::Subquery(Subquery {
+ subquery: Arc::new(subquery),
+ outer_ref_columns: outer_ref_columns.clone(),
+ }))
+ }
+ LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => {
+ Ok(LogicalPlan::SubqueryAlias(SubqueryAlias::try_new(
+ inputs[0].clone(),
+ alias.clone(),
+ )?))
+ }
+ LogicalPlan::Limit(Limit { skip, fetch, .. }) => {
+ Ok(LogicalPlan::Limit(Limit {
+ skip: *skip,
+ fetch: *fetch,
+ input: Arc::new(inputs[0].clone()),
+ }))
+ }
+ LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(CreateMemoryTable
{
+ name,
+ if_not_exists,
+ or_replace,
+ ..
+ })) => Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
+ CreateMemoryTable {
+ input: Arc::new(inputs[0].clone()),
+ constraints: Constraints::empty(),
+ name: name.clone(),
+ if_not_exists: *if_not_exists,
+ or_replace: *or_replace,
+ },
+ ))),
+ LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
+ name,
+ or_replace,
+ definition,
+ ..
+ })) => Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
+ input: Arc::new(inputs[0].clone()),
+ name: name.clone(),
+ or_replace: *or_replace,
+ definition: definition.clone(),
+ }))),
+ LogicalPlan::Extension(e) => Ok(LogicalPlan::Extension(Extension {
+ node: e.node.from_template(&expr, inputs),
+ })),
+ LogicalPlan::Union(Union { schema, .. }) =>
Ok(LogicalPlan::Union(Union {
+ inputs: inputs.iter().cloned().map(Arc::new).collect(),
+ schema: schema.clone(),
+ })),
+ LogicalPlan::Distinct(Distinct { .. }) => {
+ Ok(LogicalPlan::Distinct(Distinct {
+ input: Arc::new(inputs[0].clone()),
+ }))
+ }
+ LogicalPlan::Analyze(a) => {
+ assert!(expr.is_empty());
+ assert_eq!(inputs.len(), 1);
+ Ok(LogicalPlan::Analyze(Analyze {
+ verbose: a.verbose,
+ schema: a.schema.clone(),
+ input: Arc::new(inputs[0].clone()),
+ }))
+ }
+ LogicalPlan::Explain(_) => {
+ // Explain should be handled specially in the optimizers;
+ // If this check cannot pass it means some optimizer pass is
+ // trying to optimize Explain directly
+ if expr.is_empty() {
+ return plan_err!("Invalid EXPLAIN command. Expression is
empty");
+ }
+
+ if inputs.is_empty() {
+ return plan_err!("Invalid EXPLAIN command. Inputs are
empty");
+ }
+
+ Ok(self.clone())
+ }
+ LogicalPlan::Prepare(Prepare {
+ name, data_types, ..
+ }) => Ok(LogicalPlan::Prepare(Prepare {
+ name: name.clone(),
+ data_types: data_types.clone(),
+ input: Arc::new(inputs[0].clone()),
+ })),
+ LogicalPlan::TableScan(ts) => {
+ assert!(inputs.is_empty(), "{self:?} should have no inputs");
+ Ok(LogicalPlan::TableScan(TableScan {
+ filters: expr,
+ ..ts.clone()
+ }))
+ }
+ LogicalPlan::EmptyRelation(_)
+ | LogicalPlan::Ddl(_)
+ | LogicalPlan::Statement(_) => {
+ // All of these plan types have no inputs / exprs so should
not be called
+ assert!(expr.is_empty(), "{self:?} should have no exprs");
+ assert!(inputs.is_empty(), "{self:?} should have no inputs");
+ Ok(self.clone())
+ }
+ LogicalPlan::DescribeTable(_) => Ok(self.clone()),
+ LogicalPlan::Unnest(Unnest {
+ column,
+ schema,
+ options,
+ ..
+ }) => {
+ // Update schema with unnested column type.
+ let input = Arc::new(inputs[0].clone());
+ let nested_field = input.schema().field_from_column(column)?;
+ let unnested_field = schema.field_from_column(column)?;
+ let fields = input
+ .schema()
+ .fields()
+ .iter()
+ .map(|f| {
+ if f == nested_field {
+ unnested_field.clone()
+ } else {
+ f.clone()
+ }
+ })
+ .collect::<Vec<_>>();
+
+ let schema = Arc::new(
+ DFSchema::new_with_metadata(
+ fields,
+ input.schema().metadata().clone(),
+ )?
+ // We can use the existing functional dependencies as is:
+ .with_functional_dependencies(
+ input.schema().functional_dependencies().clone(),
+ ),
+ );
+
+ Ok(LogicalPlan::Unnest(Unnest {
+ input,
+ column: column.clone(),
+ schema,
+ options: options.clone(),
+ }))
+ }
+ }
+ }
/// Convert a prepared [`LogicalPlan`] into its inner logical plan
/// with all params replaced with their corresponding values
pub fn with_param_values(
@@ -751,7 +1122,7 @@ impl LogicalPlan {
.map(|inp| inp.replace_params_with_values(param_values))
.collect::<Result<Vec<_>>>()?;
- from_plan(self, &new_exprs, &new_inputs_with_values)
+ self.with_new_exprs(new_exprs, &new_inputs_with_values)
}
/// Walk the logical plan, find any `PlaceHolder` tokens, and return a map
of their IDs and DataTypes
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index be48418cb8..cfe751d096 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -17,32 +17,19 @@
//! Expression utilities
-use crate::dml::CopyTo;
use crate::expr::{Alias, Sort, WindowFunction};
-use crate::logical_plan::builder::build_join_schema;
-use crate::logical_plan::{
- Aggregate, Analyze, Distinct, Extension, Filter, Join, Limit,
Partitioning, Prepare,
- Projection, Repartition, Sort as SortPlan, Subquery, SubqueryAlias, Union,
Unnest,
- Values, Window,
-};
+use crate::logical_plan::Aggregate;
use crate::signature::{Signature, TypeSignature};
-use crate::{
- BinaryExpr, Cast, CreateMemoryTable, CreateView, DdlStatement,
DmlStatement, Expr,
- ExprSchemable, GroupingSet, LogicalPlan, LogicalPlanBuilder, Operator,
TableScan,
- TryCast,
-};
+use crate::{Cast, Expr, ExprSchemable, GroupingSet, LogicalPlan, TryCast};
use arrow::datatypes::{DataType, TimeUnit};
-use datafusion_common::tree_node::{
- RewriteRecursion, TreeNode, TreeNodeRewriter, VisitRecursion,
-};
+use datafusion_common::tree_node::{TreeNode, VisitRecursion};
use datafusion_common::{
- internal_err, plan_err, Column, Constraints, DFField, DFSchema,
DFSchemaRef,
- DataFusionError, Result, ScalarValue, TableReference,
+ internal_err, plan_err, Column, DFField, DFSchema, DFSchemaRef,
DataFusionError,
+ Result, ScalarValue, TableReference,
};
use sqlparser::ast::{ExceptSelectItem, ExcludeSelectItem,
WildcardAdditionalOptions};
use std::cmp::Ordering;
use std::collections::HashSet;
-use std::sync::Arc;
/// The value to which `COUNT(*)` is expanded to in
/// `COUNT(<constant>)` expressions
@@ -727,326 +714,13 @@ where
///
/// Notice: sometimes [from_plan] will use schema of original plan, it don't
change schema!
/// Such as `Projection/Aggregate/Window`
+#[deprecated(since = "31.0.0", note = "use LogicalPlan::with_new_exprs
instead")]
pub fn from_plan(
plan: &LogicalPlan,
expr: &[Expr],
inputs: &[LogicalPlan],
) -> Result<LogicalPlan> {
- match plan {
- LogicalPlan::Projection(Projection { schema, .. }) => {
- Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
- expr.to_vec(),
- Arc::new(inputs[0].clone()),
- schema.clone(),
- )?))
- }
- LogicalPlan::Dml(DmlStatement {
- table_name,
- table_schema,
- op,
- ..
- }) => Ok(LogicalPlan::Dml(DmlStatement {
- table_name: table_name.clone(),
- table_schema: table_schema.clone(),
- op: op.clone(),
- input: Arc::new(inputs[0].clone()),
- })),
- LogicalPlan::Copy(CopyTo {
- input: _,
- output_url,
- file_format,
- single_file_output,
- copy_options,
- }) => Ok(LogicalPlan::Copy(CopyTo {
- input: Arc::new(inputs[0].clone()),
- output_url: output_url.clone(),
- file_format: file_format.clone(),
- single_file_output: *single_file_output,
- copy_options: copy_options.clone(),
- })),
- LogicalPlan::Values(Values { schema, .. }) =>
Ok(LogicalPlan::Values(Values {
- schema: schema.clone(),
- values: expr
- .chunks_exact(schema.fields().len())
- .map(|s| s.to_vec())
- .collect::<Vec<_>>(),
- })),
- LogicalPlan::Filter { .. } => {
- assert_eq!(1, expr.len());
- let predicate = expr[0].clone();
-
- // filter predicates should not contain aliased expressions so we
remove any aliases
- // before this logic was added we would have aliases within
filters such as for
- // benchmark q6:
- //
- // lineitem.l_shipdate >= Date32(\"8766\")
- // AND lineitem.l_shipdate < Date32(\"9131\")
- // AND CAST(lineitem.l_discount AS Decimal128(30, 15)) AS
lineitem.l_discount >=
- // Decimal128(Some(49999999999999),30,15)
- // AND CAST(lineitem.l_discount AS Decimal128(30, 15)) AS
lineitem.l_discount <=
- // Decimal128(Some(69999999999999),30,15)
- // AND lineitem.l_quantity < Decimal128(Some(2400),15,2)
-
- struct RemoveAliases {}
-
- impl TreeNodeRewriter for RemoveAliases {
- type N = Expr;
-
- fn pre_visit(&mut self, expr: &Expr) ->
Result<RewriteRecursion> {
- match expr {
- Expr::Exists { .. }
- | Expr::ScalarSubquery(_)
- | Expr::InSubquery(_) => {
- // subqueries could contain aliases so we don't
recurse into those
- Ok(RewriteRecursion::Stop)
- }
- Expr::Alias(_) => Ok(RewriteRecursion::Mutate),
- _ => Ok(RewriteRecursion::Continue),
- }
- }
-
- fn mutate(&mut self, expr: Expr) -> Result<Expr> {
- Ok(expr.unalias())
- }
- }
-
- let mut remove_aliases = RemoveAliases {};
- let predicate = predicate.rewrite(&mut remove_aliases)?;
-
- Ok(LogicalPlan::Filter(Filter::try_new(
- predicate,
- Arc::new(inputs[0].clone()),
- )?))
- }
- LogicalPlan::Repartition(Repartition {
- partitioning_scheme,
- ..
- }) => match partitioning_scheme {
- Partitioning::RoundRobinBatch(n) => {
- Ok(LogicalPlan::Repartition(Repartition {
- partitioning_scheme: Partitioning::RoundRobinBatch(*n),
- input: Arc::new(inputs[0].clone()),
- }))
- }
- Partitioning::Hash(_, n) =>
Ok(LogicalPlan::Repartition(Repartition {
- partitioning_scheme: Partitioning::Hash(expr.to_owned(), *n),
- input: Arc::new(inputs[0].clone()),
- })),
- Partitioning::DistributeBy(_) =>
Ok(LogicalPlan::Repartition(Repartition {
- partitioning_scheme:
Partitioning::DistributeBy(expr.to_owned()),
- input: Arc::new(inputs[0].clone()),
- })),
- },
- LogicalPlan::Window(Window {
- window_expr,
- schema,
- ..
- }) => Ok(LogicalPlan::Window(Window {
- input: Arc::new(inputs[0].clone()),
- window_expr: expr[0..window_expr.len()].to_vec(),
- schema: schema.clone(),
- })),
- LogicalPlan::Aggregate(Aggregate {
- group_expr, schema, ..
- }) => Ok(LogicalPlan::Aggregate(Aggregate::try_new_with_schema(
- Arc::new(inputs[0].clone()),
- expr[0..group_expr.len()].to_vec(),
- expr[group_expr.len()..].to_vec(),
- schema.clone(),
- )?)),
- LogicalPlan::Sort(SortPlan { fetch, .. }) =>
Ok(LogicalPlan::Sort(SortPlan {
- expr: expr.to_vec(),
- input: Arc::new(inputs[0].clone()),
- fetch: *fetch,
- })),
- LogicalPlan::Join(Join {
- join_type,
- join_constraint,
- on,
- null_equals_null,
- ..
- }) => {
- let schema =
- build_join_schema(inputs[0].schema(), inputs[1].schema(),
join_type)?;
-
- let equi_expr_count = on.len();
- assert!(expr.len() >= equi_expr_count);
-
- // The preceding part of expr is equi-exprs,
- // and the struct of each equi-expr is like `left-expr =
right-expr`.
- let new_on:Vec<(Expr,Expr)> =
expr.iter().take(equi_expr_count).map(|equi_expr| {
- // SimplifyExpression rule may add alias to the equi_expr.
- let unalias_expr = equi_expr.clone().unalias();
- if let Expr::BinaryExpr(BinaryExpr { left,
op:Operator::Eq, right }) = unalias_expr {
- Ok((*left, *right))
- } else {
- internal_err!(
- "The front part expressions should be an binary
equiality expression, actual:{equi_expr}"
- )
- }
- }).collect::<Result<Vec<(Expr, Expr)>>>()?;
-
- // Assume that the last expr, if any,
- // is the filter_expr (non equality predicate from ON clause)
- let filter_expr =
- (expr.len() > equi_expr_count).then(|| expr[expr.len() -
1].clone());
-
- Ok(LogicalPlan::Join(Join {
- left: Arc::new(inputs[0].clone()),
- right: Arc::new(inputs[1].clone()),
- join_type: *join_type,
- join_constraint: *join_constraint,
- on: new_on,
- filter: filter_expr,
- schema: DFSchemaRef::new(schema),
- null_equals_null: *null_equals_null,
- }))
- }
- LogicalPlan::CrossJoin(_) => {
- let left = inputs[0].clone();
- let right = inputs[1].clone();
- LogicalPlanBuilder::from(left).cross_join(right)?.build()
- }
- LogicalPlan::Subquery(Subquery {
- outer_ref_columns, ..
- }) => {
- let subquery =
LogicalPlanBuilder::from(inputs[0].clone()).build()?;
- Ok(LogicalPlan::Subquery(Subquery {
- subquery: Arc::new(subquery),
- outer_ref_columns: outer_ref_columns.clone(),
- }))
- }
- LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => {
- Ok(LogicalPlan::SubqueryAlias(SubqueryAlias::try_new(
- inputs[0].clone(),
- alias.clone(),
- )?))
- }
- LogicalPlan::Limit(Limit { skip, fetch, .. }) =>
Ok(LogicalPlan::Limit(Limit {
- skip: *skip,
- fetch: *fetch,
- input: Arc::new(inputs[0].clone()),
- })),
- LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(CreateMemoryTable {
- name,
- if_not_exists,
- or_replace,
- ..
- })) => Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
- CreateMemoryTable {
- input: Arc::new(inputs[0].clone()),
- constraints: Constraints::empty(),
- name: name.clone(),
- if_not_exists: *if_not_exists,
- or_replace: *or_replace,
- },
- ))),
- LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
- name,
- or_replace,
- definition,
- ..
- })) => Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
- input: Arc::new(inputs[0].clone()),
- name: name.clone(),
- or_replace: *or_replace,
- definition: definition.clone(),
- }))),
- LogicalPlan::Extension(e) => Ok(LogicalPlan::Extension(Extension {
- node: e.node.from_template(expr, inputs),
- })),
- LogicalPlan::Union(Union { schema, .. }) =>
Ok(LogicalPlan::Union(Union {
- inputs: inputs.iter().cloned().map(Arc::new).collect(),
- schema: schema.clone(),
- })),
- LogicalPlan::Distinct(Distinct { .. }) =>
Ok(LogicalPlan::Distinct(Distinct {
- input: Arc::new(inputs[0].clone()),
- })),
- LogicalPlan::Analyze(a) => {
- assert!(expr.is_empty());
- assert_eq!(inputs.len(), 1);
- Ok(LogicalPlan::Analyze(Analyze {
- verbose: a.verbose,
- schema: a.schema.clone(),
- input: Arc::new(inputs[0].clone()),
- }))
- }
- LogicalPlan::Explain(_) => {
- // Explain should be handled specially in the optimizers;
- // If this check cannot pass it means some optimizer pass is
- // trying to optimize Explain directly
- if expr.is_empty() {
- return plan_err!("Invalid EXPLAIN command. Expression is
empty");
- }
-
- if inputs.is_empty() {
- return plan_err!("Invalid EXPLAIN command. Inputs are empty");
- }
-
- Ok(plan.clone())
- }
- LogicalPlan::Prepare(Prepare {
- name, data_types, ..
- }) => Ok(LogicalPlan::Prepare(Prepare {
- name: name.clone(),
- data_types: data_types.clone(),
- input: Arc::new(inputs[0].clone()),
- })),
- LogicalPlan::TableScan(ts) => {
- assert!(inputs.is_empty(), "{plan:?} should have no inputs");
- Ok(LogicalPlan::TableScan(TableScan {
- filters: expr.to_vec(),
- ..ts.clone()
- }))
- }
- LogicalPlan::EmptyRelation(_)
- | LogicalPlan::Ddl(_)
- | LogicalPlan::Statement(_) => {
- // All of these plan types have no inputs / exprs so should not be
called
- assert!(expr.is_empty(), "{plan:?} should have no exprs");
- assert!(inputs.is_empty(), "{plan:?} should have no inputs");
- Ok(plan.clone())
- }
- LogicalPlan::DescribeTable(_) => Ok(plan.clone()),
- LogicalPlan::Unnest(Unnest {
- column,
- schema,
- options,
- ..
- }) => {
- // Update schema with unnested column type.
- let input = Arc::new(inputs[0].clone());
- let nested_field = input.schema().field_from_column(column)?;
- let unnested_field = schema.field_from_column(column)?;
- let fields = input
- .schema()
- .fields()
- .iter()
- .map(|f| {
- if f == nested_field {
- unnested_field.clone()
- } else {
- f.clone()
- }
- })
- .collect::<Vec<_>>();
-
- let schema = Arc::new(
- DFSchema::new_with_metadata(fields,
input.schema().metadata().clone())?
- // We can use the existing functional dependencies as is:
- .with_functional_dependencies(
- input.schema().functional_dependencies().clone(),
- ),
- );
-
- Ok(LogicalPlan::Unnest(Unnest {
- input,
- column: column.clone(),
- schema,
- options: options.clone(),
- }))
- }
- }
+ plan.with_new_exprs(expr.to_vec(), inputs)
}
/// Find all columns referenced from an aggregate query
diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs
b/datafusion/optimizer/src/analyzer/type_coercion.rs
index ba4b5d7b17..c61a8d3350 100644
--- a/datafusion/optimizer/src/analyzer/type_coercion.rs
+++ b/datafusion/optimizer/src/analyzer/type_coercion.rs
@@ -42,7 +42,6 @@ use datafusion_expr::type_coercion::other::{
get_coerce_type_for_case_expression, get_coerce_type_for_list,
};
use datafusion_expr::type_coercion::{is_datetime, is_numeric,
is_utf8_or_large_utf8};
-use datafusion_expr::utils::from_plan;
use datafusion_expr::{
is_false, is_not_false, is_not_true, is_not_unknown, is_true, is_unknown,
type_coercion, window_function, AggregateFunction, BuiltinScalarFunction,
Expr,
@@ -112,13 +111,13 @@ fn analyze_internal(
})
.collect::<Result<Vec<_>>>()?;
- // TODO: from_plan can't change the schema, so we need to do this here
+ // TODO: with_new_exprs can't change the schema, so we need to do this here
match &plan {
LogicalPlan::Projection(_) =>
Ok(LogicalPlan::Projection(Projection::try_new(
new_expr,
Arc::new(new_inputs[0].clone()),
)?)),
- _ => from_plan(plan, &new_expr, &new_inputs),
+ _ => plan.with_new_exprs(new_expr, &new_inputs),
}
}
diff --git a/datafusion/optimizer/src/push_down_filter.rs
b/datafusion/optimizer/src/push_down_filter.rs
index d5c7641a13..571e2146c4 100644
--- a/datafusion/optimizer/src/push_down_filter.rs
+++ b/datafusion/optimizer/src/push_down_filter.rs
@@ -24,9 +24,7 @@ use datafusion_expr::{
and,
expr_rewriter::replace_col,
logical_plan::{CrossJoin, Join, JoinType, LogicalPlan, TableScan, Union},
- or,
- utils::from_plan,
- BinaryExpr, Expr, Filter, Operator, TableProviderFilterPushDown,
+ or, BinaryExpr, Expr, Filter, Operator, TableProviderFilterPushDown,
};
use itertools::Itertools;
use std::collections::{HashMap, HashSet};
@@ -457,7 +455,7 @@ fn push_down_all_join(
if !join_conditions.is_empty() {
new_exprs.push(join_conditions.into_iter().reduce(Expr::and).unwrap());
}
- let plan = from_plan(join_plan, &new_exprs, &[left, right])?;
+ let plan = join_plan.with_new_exprs(new_exprs, &[left, right])?;
if keep_predicates.is_empty() {
Ok(plan)
diff --git a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
index c65768bb8b..e6d66720ee 100644
--- a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
+++ b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
@@ -23,7 +23,7 @@ use super::{ExprSimplifier, SimplifyContext};
use crate::utils::merge_schema;
use crate::{OptimizerConfig, OptimizerRule};
use datafusion_common::{DFSchema, DFSchemaRef, Result};
-use datafusion_expr::{logical_plan::LogicalPlan, utils::from_plan};
+use datafusion_expr::logical_plan::LogicalPlan;
use datafusion_physical_expr::execution_props::ExecutionProps;
/// Optimizer Pass that simplifies [`LogicalPlan`]s by rewriting
@@ -93,7 +93,7 @@ impl SimplifyExpressions {
})
.collect::<Result<Vec<_>>>()?;
- from_plan(plan, &expr, &new_inputs)
+ plan.with_new_exprs(expr, &new_inputs)
}
}
diff --git a/datafusion/optimizer/src/unwrap_cast_in_comparison.rs
b/datafusion/optimizer/src/unwrap_cast_in_comparison.rs
index 0be856ca9b..963a3dc06f 100644
--- a/datafusion/optimizer/src/unwrap_cast_in_comparison.rs
+++ b/datafusion/optimizer/src/unwrap_cast_in_comparison.rs
@@ -31,7 +31,6 @@ use datafusion_common::{
};
use datafusion_expr::expr::{BinaryExpr, Cast, InList, TryCast};
use datafusion_expr::expr_rewriter::rewrite_preserving_name;
-use datafusion_expr::utils::from_plan;
use datafusion_expr::{
binary_expr, in_list, lit, Expr, ExprSchemable, LogicalPlan, Operator,
};
@@ -105,11 +104,7 @@ impl OptimizerRule for UnwrapCastInComparison {
.collect::<Result<Vec<_>>>()?;
let inputs: Vec<LogicalPlan> =
plan.inputs().into_iter().cloned().collect();
- Ok(Some(from_plan(
- plan,
- new_exprs.as_slice(),
- inputs.as_slice(),
- )?))
+ Ok(Some(plan.with_new_exprs(new_exprs, inputs.as_slice())?))
}
fn name(&self) -> &str {