This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 44d154e62 Move `LogicalPlanBuilder` to `datafusion-expr` crate (#2576)
44d154e62 is described below

commit 44d154e62b8f1574639b61ad9b880034d3393708
Author: Andy Grove <[email protected]>
AuthorDate: Sun May 22 09:34:16 2022 -0600

    Move `LogicalPlanBuilder` to `datafusion-expr` crate (#2576)
---
 datafusion/core/src/logical_plan/expr.rs           |  99 +-----
 datafusion/core/src/logical_plan/mod.rs            |  26 +-
 .../core/src/optimizer/common_subexpr_eliminate.rs |   4 +-
 datafusion/core/src/optimizer/eliminate_filter.rs  |   4 +-
 datafusion/core/src/optimizer/eliminate_limit.rs   |   4 +-
 datafusion/core/src/optimizer/filter_push_down.rs  |   8 +-
 datafusion/core/src/optimizer/limit_push_down.rs   |   4 +-
 .../core/src/optimizer/projection_push_down.rs     |  14 +-
 .../core/src/optimizer/simplify_expressions.rs     |   4 +-
 .../src/optimizer/single_distinct_to_groupby.rs    |   6 +-
 datafusion/core/src/optimizer/utils.rs             | 204 +-----------
 datafusion/core/src/sql/planner.rs                 |  10 +-
 datafusion/core/src/sql/utils.rs                   |  57 +---
 datafusion/expr/src/lib.rs                         |   2 +-
 .../{core => expr}/src/logical_plan/builder.rs     |  89 ++++--
 datafusion/expr/src/logical_plan/mod.rs            |   2 +
 datafusion/expr/src/utils.rs                       | 347 ++++++++++++++++++++-
 17 files changed, 454 insertions(+), 430 deletions(-)

diff --git a/datafusion/core/src/logical_plan/expr.rs b/datafusion/core/src/logical_plan/expr.rs
index 6d90c78f1..f8d7f46c6 100644
--- a/datafusion/core/src/logical_plan/expr.rs
+++ b/datafusion/core/src/logical_plan/expr.rs
@@ -20,18 +20,14 @@
 
 pub use super::Operator;
 use crate::error::Result;
-use crate::logical_plan::ExprSchemable;
-use crate::logical_plan::{DFField, DFSchema};
-use crate::sql::utils::find_columns_referenced_by_expr;
 use arrow::datatypes::DataType;
 pub use datafusion_common::{Column, ExprSchema};
 pub use datafusion_expr::expr_fn::*;
-use datafusion_expr::logical_plan::Aggregate;
+use datafusion_expr::AccumulatorFunctionImplementation;
 use datafusion_expr::BuiltinScalarFunction;
 pub use datafusion_expr::Expr;
 use datafusion_expr::StateTypeFunction;
 pub use datafusion_expr::{lit, lit_timestamp_nano, Literal};
-use datafusion_expr::{AccumulatorFunctionImplementation, LogicalPlan};
 use datafusion_expr::{AggregateUDF, ScalarUDF};
 use datafusion_expr::{
     ReturnTypeFunction, ScalarFunctionImplementation, Signature, Volatility,
@@ -52,39 +48,6 @@ pub fn combine_filters(filters: &[Expr]) -> Option<Expr> {
     Some(combined_filter)
 }
 
-/// Convert an expression into Column expression if it's already provided as input plan.
-///
-/// For example, it rewrites:
-///
-/// ```text
-/// .aggregate(vec![col("c1")], vec![sum(col("c2"))])?
-/// .project(vec![col("c1"), sum(col("c2"))?
-/// ```
-///
-/// Into:
-///
-/// ```text
-/// .aggregate(vec![col("c1")], vec![sum(col("c2"))])?
-/// .project(vec![col("c1"), col("SUM(#c2)")?
-/// ```
-pub fn columnize_expr(e: Expr, input_schema: &DFSchema) -> Expr {
-    match e {
-        Expr::Column(_) => e,
-        Expr::Alias(inner_expr, name) => {
-            Expr::Alias(Box::new(columnize_expr(*inner_expr, input_schema)), name)
-        }
-        Expr::ScalarSubquery(_) => e.clone(),
-        _ => match e.name(input_schema) {
-            Ok(name) => match input_schema.field_with_unqualified_name(&name) {
-                Ok(field) => Expr::Column(field.qualified_column()),
-                // expression not provided as input, do not convert to a column reference
-                Err(_) => e,
-            },
-            Err(_) => e,
-        },
-    }
-}
-
 /// Recursively un-alias an expressions
 #[inline]
 pub fn unalias(expr: Expr) -> Expr {
@@ -137,66 +100,6 @@ pub fn create_udaf(
     )
 }
 
-/// Find all columns referenced from an aggregate query
-fn agg_cols(agg: &Aggregate) -> Result<Vec<Column>> {
-    Ok(agg
-        .aggr_expr
-        .iter()
-        .chain(&agg.group_expr)
-        .flat_map(find_columns_referenced_by_expr)
-        .collect())
-}
-
-fn exprlist_to_fields_aggregate(
-    exprs: &[Expr],
-    plan: &LogicalPlan,
-    agg: &Aggregate,
-) -> Result<Vec<DFField>> {
-    let agg_cols = agg_cols(agg)?;
-    let mut fields = vec![];
-    for expr in exprs {
-        match expr {
-            Expr::Column(c) if agg_cols.iter().any(|x| x == c) => {
-                // resolve against schema of input to aggregate
-                fields.push(expr.to_field(agg.input.schema())?);
-            }
-            _ => fields.push(expr.to_field(plan.schema())?),
-        }
-    }
-    Ok(fields)
-}
-
-/// Create field meta-data from an expression, for use in a result set schema
-pub fn exprlist_to_fields<'a>(
-    expr: impl IntoIterator<Item = &'a Expr>,
-    plan: &LogicalPlan,
-) -> Result<Vec<DFField>> {
-    let exprs: Vec<Expr> = expr.into_iter().cloned().collect();
-    // when dealing with aggregate plans we cannot simply look in the aggregate output schema
-    // because it will contain columns representing complex expressions (such a column named
-    // `#GROUPING(person.state)` so in order to resolve `person.state` in this case we need to
-    // look at the input to the aggregate instead.
-    let fields = match plan {
-        LogicalPlan::Aggregate(agg) => {
-            Some(exprlist_to_fields_aggregate(&exprs, plan, agg))
-        }
-        LogicalPlan::Window(window) => match window.input.as_ref() {
-            LogicalPlan::Aggregate(agg) => {
-                Some(exprlist_to_fields_aggregate(&exprs, plan, agg))
-            }
-            _ => None,
-        },
-        _ => None,
-    };
-    if let Some(fields) = fields {
-        fields
-    } else {
-        // look for exact match in plan's output schema
-        let input_schema = &plan.schema();
-        exprs.iter().map(|e| e.to_field(input_schema)).collect()
-    }
-}
-
 /// Calls a named built in function
 /// ```
 /// use datafusion::logical_plan::*;
diff --git a/datafusion/core/src/logical_plan/mod.rs b/datafusion/core/src/logical_plan/mod.rs
index e9496c8f2..0a9731441 100644
--- a/datafusion/core/src/logical_plan/mod.rs
+++ b/datafusion/core/src/logical_plan/mod.rs
@@ -21,35 +21,33 @@
 //! Logical query plans can then be optimized and executed directly, or translated into
 //! physical query plans and executed.
 
-pub(crate) mod builder;
 mod expr;
 mod expr_simplier;
 pub mod plan;
 mod registry;
 pub mod window_frames;
-pub use builder::{
-    build_join_schema, union_with_alias, LogicalPlanBuilder, UNNAMED_TABLE,
-};
 pub use datafusion_common::{DFField, DFSchema, DFSchemaRef, ToDFSchema};
 pub use datafusion_expr::{
     expr_fn::binary_expr,
     expr_rewriter,
     expr_visitor::{ExprVisitable, ExpressionVisitor, Recursion},
+    logical_plan::builder::{
+        build_join_schema, union_with_alias, LogicalPlanBuilder, UNNAMED_TABLE,
+    },
     ExprSchemable, Operator,
 };
 pub use expr::{
     abs, acos, and, approx_distinct, approx_percentile_cont, array, ascii, asin, atan,
     avg, bit_length, btrim, call_fn, case, ceil, character_length, chr, coalesce, col,
-    columnize_expr, combine_filters, concat, concat_expr, concat_ws, concat_ws_expr, cos,
-    count, count_distinct, create_udaf, create_udf, date_part, date_trunc, digest,
-    exists, exp, exprlist_to_fields, floor, in_list, in_subquery, initcap, left, length,
-    lit, lit_timestamp_nano, ln, log10, log2, lower, lpad, ltrim, max, md5, min,
-    not_exists, not_in_subquery, now, now_expr, nullif, octet_length, or, power, random,
-    regexp_match, regexp_replace, repeat, replace, reverse, right, round, rpad, rtrim,
-    scalar_subquery, sha224, sha256, sha384, sha512, signum, sin, split_part, sqrt,
-    starts_with, strpos, substr, sum, tan, to_hex, to_timestamp_micros,
-    to_timestamp_millis, to_timestamp_seconds, translate, trim, trunc, unalias, upper,
-    when, Column, Expr, ExprSchema, Literal,
+    combine_filters, concat, concat_expr, concat_ws, concat_ws_expr, cos, count,
+    count_distinct, create_udaf, create_udf, date_part, date_trunc, digest, exists, exp,
+    floor, in_list, in_subquery, initcap, left, length, lit, lit_timestamp_nano, ln,
+    log10, log2, lower, lpad, ltrim, max, md5, min, not_exists, not_in_subquery, now,
+    now_expr, nullif, octet_length, or, power, random, regexp_match, regexp_replace,
+    repeat, replace, reverse, right, round, rpad, rtrim, scalar_subquery, sha224, sha256,
+    sha384, sha512, signum, sin, split_part, sqrt, starts_with, strpos, substr, sum, tan,
+    to_hex, to_timestamp_micros, to_timestamp_millis, to_timestamp_seconds, translate,
+    trim, trunc, unalias, upper, when, Column, Expr, ExprSchema, Literal,
 };
 pub use expr_rewriter::{
     normalize_col, normalize_col_with_schemas, normalize_cols, replace_col,
diff --git a/datafusion/core/src/optimizer/common_subexpr_eliminate.rs b/datafusion/core/src/optimizer/common_subexpr_eliminate.rs
index af0eea663..81183f56d 100644
--- a/datafusion/core/src/optimizer/common_subexpr_eliminate.rs
+++ b/datafusion/core/src/optimizer/common_subexpr_eliminate.rs
@@ -27,9 +27,9 @@ use crate::logical_plan::{
     ExpressionVisitor, LogicalPlan, Recursion, RewriteRecursion,
 };
 use crate::optimizer::optimizer::OptimizerRule;
-use crate::optimizer::utils;
 use arrow::datatypes::DataType;
 use datafusion_expr::expr::GroupingSet;
+use datafusion_expr::utils::from_plan;
 use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
 
@@ -238,7 +238,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
                 .map(|input_plan| optimize(input_plan, execution_props))
                 .collect::<Result<Vec<_>>>()?;
 
-            utils::from_plan(plan, &expr, &new_inputs)
+            from_plan(plan, &expr, &new_inputs)
         }
     }
 }
diff --git a/datafusion/core/src/optimizer/eliminate_filter.rs b/datafusion/core/src/optimizer/eliminate_filter.rs
index 800963ef5..fb99a9798 100644
--- a/datafusion/core/src/optimizer/eliminate_filter.rs
+++ b/datafusion/core/src/optimizer/eliminate_filter.rs
@@ -19,6 +19,7 @@
 //! This saves time in planning and executing the query.
 //! Note that this rule should be applied after simplify expressions optimizer rule.
 use datafusion_common::ScalarValue;
+use datafusion_expr::utils::from_plan;
 use datafusion_expr::Expr;
 
 use crate::error::Result;
@@ -26,7 +27,6 @@ use crate::logical_plan::plan::Filter;
 use crate::logical_plan::{EmptyRelation, LogicalPlan};
 use crate::optimizer::optimizer::OptimizerRule;
 
-use super::utils;
 use crate::execution::context::ExecutionProps;
 
 /// Optimization rule that elimanate the scalar value (true/false) filter with an [LogicalPlan::EmptyRelation]
@@ -68,7 +68,7 @@ impl OptimizerRule for EliminateFilter {
                     .map(|plan| self.optimize(plan, execution_props))
                     .collect::<Result<Vec<_>>>()?;
 
-                utils::from_plan(plan, &plan.expressions(), &new_inputs)
+                from_plan(plan, &plan.expressions(), &new_inputs)
             }
         }
     }
diff --git a/datafusion/core/src/optimizer/eliminate_limit.rs b/datafusion/core/src/optimizer/eliminate_limit.rs
index c1fc2068d..a7acf7ca6 100644
--- a/datafusion/core/src/optimizer/eliminate_limit.rs
+++ b/datafusion/core/src/optimizer/eliminate_limit.rs
@@ -20,8 +20,8 @@
 use crate::error::Result;
 use crate::logical_plan::{EmptyRelation, Limit, LogicalPlan};
 use crate::optimizer::optimizer::OptimizerRule;
+use datafusion_expr::utils::from_plan;
 
-use super::utils;
 use crate::execution::context::ExecutionProps;
 
 /// Optimization rule that replaces LIMIT 0 with an [LogicalPlan::EmptyRelation]
@@ -59,7 +59,7 @@ impl OptimizerRule for EliminateLimit {
                     .map(|plan| self.optimize(plan, execution_props))
                     .collect::<Result<Vec<_>>>()?;
 
-                utils::from_plan(plan, &expr, &new_inputs)
+                from_plan(plan, &expr, &new_inputs)
             }
         }
     }
diff --git a/datafusion/core/src/optimizer/filter_push_down.rs b/datafusion/core/src/optimizer/filter_push_down.rs
index 7a0383463..45b1fde36 100644
--- a/datafusion/core/src/optimizer/filter_push_down.rs
+++ b/datafusion/core/src/optimizer/filter_push_down.rs
@@ -24,7 +24,7 @@ use crate::logical_plan::{
 use crate::logical_plan::{DFSchema, Expr};
 use crate::optimizer::optimizer::OptimizerRule;
 use crate::optimizer::utils;
-use datafusion_expr::utils::{expr_to_columns, exprlist_to_columns};
+use datafusion_expr::utils::{expr_to_columns, exprlist_to_columns, from_plan};
 use std::collections::{HashMap, HashSet};
 
 /// Filter Push Down optimizer rule pushes filter clauses down the plan
@@ -90,7 +90,7 @@ fn push_down(state: &State, plan: &LogicalPlan) -> Result<LogicalPlan> {
         .collect::<Result<Vec<_>>>()?;
 
     let expr = plan.expressions();
-    utils::from_plan(plan, &expr, &new_inputs)
+    from_plan(plan, &expr, &new_inputs)
 }
 
 // remove all filters from `filters` that are in `predicate_columns`
@@ -246,7 +246,7 @@ fn optimize_join(
 
     // create a new Join with the new `left` and `right`
     let expr = plan.expressions();
-    let plan = utils::from_plan(plan, &expr, &[left, right])?;
+    let plan = from_plan(plan, &expr, &[left, right])?;
 
     if to_keep.0.is_empty() {
         Ok(plan)
@@ -334,7 +334,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
             // optimize inner
             let new_input = optimize(input, state)?;
 
-            utils::from_plan(plan, expr, &[new_input])
+            from_plan(plan, expr, &[new_input])
         }
         LogicalPlan::Aggregate(Aggregate {
             aggr_expr, input, ..
diff --git a/datafusion/core/src/optimizer/limit_push_down.rs b/datafusion/core/src/optimizer/limit_push_down.rs
index a52fd40df..087ac53c8 100644
--- a/datafusion/core/src/optimizer/limit_push_down.rs
+++ b/datafusion/core/src/optimizer/limit_push_down.rs
@@ -17,7 +17,6 @@
 
 //! Optimizer rule to push down LIMIT in the query plan
 //! It will push down through projection, limits (taking the smaller limit)
-use super::utils;
 use crate::error::Result;
 use crate::execution::context::ExecutionProps;
 use crate::logical_plan::plan::Projection;
@@ -25,6 +24,7 @@ use crate::logical_plan::{Limit, TableScan};
 use crate::logical_plan::{LogicalPlan, Union};
 use crate::optimizer::optimizer::OptimizerRule;
 use datafusion_expr::logical_plan::Offset;
+use datafusion_expr::utils::from_plan;
 use std::sync::Arc;
 
 /// Optimization rule that tries pushes down LIMIT n
@@ -171,7 +171,7 @@ fn limit_push_down(
                 })
                 .collect::<Result<Vec<_>>>()?;
 
-            utils::from_plan(plan, &expr, &new_inputs)
+            from_plan(plan, &expr, &new_inputs)
         }
     }
 }
diff --git a/datafusion/core/src/optimizer/projection_push_down.rs b/datafusion/core/src/optimizer/projection_push_down.rs
index cf14adcd1..5a127df6d 100644
--- a/datafusion/core/src/optimizer/projection_push_down.rs
+++ b/datafusion/core/src/optimizer/projection_push_down.rs
@@ -28,10 +28,11 @@ use crate::logical_plan::{
     LogicalPlanBuilder, ToDFSchema, Union,
 };
 use crate::optimizer::optimizer::OptimizerRule;
-use crate::optimizer::utils;
 use arrow::datatypes::{Field, Schema};
 use arrow::error::Result as ArrowResult;
-use datafusion_expr::utils::{expr_to_columns, exprlist_to_columns, find_sort_exprs};
+use datafusion_expr::utils::{
+    expr_to_columns, exprlist_to_columns, find_sort_exprs, from_plan,
+};
 use datafusion_expr::Expr;
 use std::{
     collections::{BTreeSet, HashSet},
@@ -458,7 +459,7 @@ fn optimize_plan(
                         _execution_props,
                     )?];
                     let expr = vec![];
-                    utils::from_plan(plan, &expr, &new_inputs)
+                    from_plan(plan, &expr, &new_inputs)
                 }
                 _ => Err(DataFusionError::Plan(
                     "SubqueryAlias should only wrap TableScan".to_string(),
@@ -502,7 +503,7 @@ fn optimize_plan(
                 })
                 .collect::<Result<Vec<_>>>()?;
 
-            utils::from_plan(plan, &expr, &new_inputs)
+            from_plan(plan, &expr, &new_inputs)
         }
     }
 }
@@ -513,12 +514,11 @@ mod tests {
     use std::collections::HashMap;
 
     use super::*;
-    use crate::logical_plan::{
-        col, exprlist_to_fields, lit, max, min, Expr, JoinType, LogicalPlanBuilder,
-    };
+    use crate::logical_plan::{col, lit, max, min, Expr, JoinType, LogicalPlanBuilder};
     use crate::test::*;
     use crate::test_util::scan_empty;
     use arrow::datatypes::DataType;
+    use datafusion_expr::utils::exprlist_to_fields;
 
     #[test]
     fn aggregate_no_group_by() -> Result<()> {
diff --git a/datafusion/core/src/optimizer/simplify_expressions.rs b/datafusion/core/src/optimizer/simplify_expressions.rs
index 216af223a..8c628906a 100644
--- a/datafusion/core/src/optimizer/simplify_expressions.rs
+++ b/datafusion/core/src/optimizer/simplify_expressions.rs
@@ -25,13 +25,13 @@ use crate::logical_plan::{
     LogicalPlan, RewriteRecursion, SimplifyInfo,
 };
 use crate::optimizer::optimizer::OptimizerRule;
-use crate::optimizer::utils;
 use crate::physical_plan::planner::create_physical_expr;
 use crate::scalar::ScalarValue;
 use crate::{error::Result, logical_plan::Operator};
 use arrow::array::new_null_array;
 use arrow::datatypes::{DataType, Field, Schema};
 use arrow::record_batch::RecordBatch;
+use datafusion_expr::utils::from_plan;
 use datafusion_expr::Volatility;
 
 /// Provides simplification information based on schema and properties
@@ -234,7 +234,7 @@ impl OptimizerRule for SimplifyExpressions {
             })
             .collect::<Result<Vec<_>>>()?;
 
-        utils::from_plan(plan, &expr, &new_inputs)
+        from_plan(plan, &expr, &new_inputs)
     }
 }
 
diff --git a/datafusion/core/src/optimizer/single_distinct_to_groupby.rs b/datafusion/core/src/optimizer/single_distinct_to_groupby.rs
index dfbefa63a..65ff56556 100644
--- a/datafusion/core/src/optimizer/single_distinct_to_groupby.rs
+++ b/datafusion/core/src/optimizer/single_distinct_to_groupby.rs
@@ -21,9 +21,9 @@ use crate::error::Result;
 use crate::execution::context::ExecutionProps;
 use crate::logical_plan::plan::{Aggregate, Projection};
 use crate::logical_plan::ExprSchemable;
-use crate::logical_plan::{col, columnize_expr, DFSchema, Expr, LogicalPlan};
+use crate::logical_plan::{col, DFSchema, Expr, LogicalPlan};
 use crate::optimizer::optimizer::OptimizerRule;
-use crate::optimizer::utils;
+use datafusion_expr::utils::{columnize_expr, from_plan};
 use hashbrown::HashSet;
 use std::sync::Arc;
 
@@ -155,7 +155,7 @@ fn optimize_children(plan: &LogicalPlan) -> Result<LogicalPlan> {
         .iter()
         .map(|plan| optimize(plan))
         .collect::<Result<Vec<_>>>()?;
-    utils::from_plan(plan, &expr, &new_inputs)
+    from_plan(plan, &expr, &new_inputs)
 }
 
 fn is_single_distinct_agg(plan: &LogicalPlan) -> bool {
diff --git a/datafusion/core/src/optimizer/utils.rs b/datafusion/core/src/optimizer/utils.rs
index 96ba969c4..81d4b76a2 100644
--- a/datafusion/core/src/optimizer/utils.rs
+++ b/datafusion/core/src/optimizer/utils.rs
@@ -19,21 +19,14 @@
 
 use super::optimizer::OptimizerRule;
 use crate::execution::context::ExecutionProps;
-use datafusion_expr::logical_plan::{
-    Aggregate, Analyze, Extension, Filter, Join, Projection, Sort, Subquery,
-    SubqueryAlias, Window,
-};
+use datafusion_expr::logical_plan::Filter;
 
 use crate::error::{DataFusionError, Result};
-use crate::logical_plan::{
-    and, build_join_schema, CreateMemoryTable, CreateView, DFSchemaRef, Expr, Limit,
-    LogicalPlan, LogicalPlanBuilder, Offset, Operator, Partitioning, Repartition, Union,
-    Values,
-};
+use crate::logical_plan::{and, Expr, LogicalPlan, Operator};
 use crate::prelude::lit;
 use crate::scalar::ScalarValue;
-use datafusion_common::DFSchema;
 use datafusion_expr::expr::GroupingSet;
+use datafusion_expr::utils::from_plan;
 use std::sync::Arc;
 
 const CASE_EXPR_MARKER: &str = "__DATAFUSION_CASE_EXPR__";
@@ -61,197 +54,6 @@ pub fn optimize_children(
     from_plan(plan, &new_exprs, &new_inputs)
 }
 
-/// Returns a new logical plan based on the original one with inputs
-/// and expressions replaced.
-///
-/// The exprs correspond to the same order of expressions returned by
-/// `LogicalPlan::expressions`. This function is used in optimizers in
-/// the following way:
-///
-/// ```text
-/// let new_inputs = optimize_children(..., plan, props);
-///
-/// // get the plans expressions to optimize
-/// let exprs = plan.expressions();
-///
-/// // potentially rewrite plan expressions
-/// let rewritten_exprs = rewrite_exprs(exprs);
-///
-/// // create new plan using rewritten_exprs in same position
-/// let new_plan = from_plan(&plan, rewritten_exprs, new_inputs);
-/// ```
-pub fn from_plan(
-    plan: &LogicalPlan,
-    expr: &[Expr],
-    inputs: &[LogicalPlan],
-) -> Result<LogicalPlan> {
-    match plan {
-        LogicalPlan::Projection(Projection { schema, alias, .. }) => {
-            Ok(LogicalPlan::Projection(Projection {
-                expr: expr.to_vec(),
-                input: Arc::new(inputs[0].clone()),
-                schema: schema.clone(),
-                alias: alias.clone(),
-            }))
-        }
-        LogicalPlan::Values(Values { schema, .. }) => Ok(LogicalPlan::Values(Values {
-            schema: schema.clone(),
-            values: expr
-                .chunks_exact(schema.fields().len())
-                .map(|s| s.to_vec())
-                .collect::<Vec<_>>(),
-        })),
-        LogicalPlan::Filter { .. } => Ok(LogicalPlan::Filter(Filter {
-            predicate: expr[0].clone(),
-            input: Arc::new(inputs[0].clone()),
-        })),
-        LogicalPlan::Repartition(Repartition {
-            partitioning_scheme,
-            ..
-        }) => match partitioning_scheme {
-            Partitioning::RoundRobinBatch(n) => {
-                Ok(LogicalPlan::Repartition(Repartition {
-                    partitioning_scheme: Partitioning::RoundRobinBatch(*n),
-                    input: Arc::new(inputs[0].clone()),
-                }))
-            }
-            Partitioning::Hash(_, n) => Ok(LogicalPlan::Repartition(Repartition {
-                partitioning_scheme: Partitioning::Hash(expr.to_owned(), *n),
-                input: Arc::new(inputs[0].clone()),
-            })),
-        },
-        LogicalPlan::Window(Window {
-            window_expr,
-            schema,
-            ..
-        }) => Ok(LogicalPlan::Window(Window {
-            input: Arc::new(inputs[0].clone()),
-            window_expr: expr[0..window_expr.len()].to_vec(),
-            schema: schema.clone(),
-        })),
-        LogicalPlan::Aggregate(Aggregate {
-            group_expr, schema, ..
-        }) => Ok(LogicalPlan::Aggregate(Aggregate {
-            group_expr: expr[0..group_expr.len()].to_vec(),
-            aggr_expr: expr[group_expr.len()..].to_vec(),
-            input: Arc::new(inputs[0].clone()),
-            schema: schema.clone(),
-        })),
-        LogicalPlan::Sort(Sort { .. }) => Ok(LogicalPlan::Sort(Sort {
-            expr: expr.to_vec(),
-            input: Arc::new(inputs[0].clone()),
-        })),
-        LogicalPlan::Join(Join {
-            join_type,
-            join_constraint,
-            on,
-            null_equals_null,
-            ..
-        }) => {
-            let schema =
-                build_join_schema(inputs[0].schema(), inputs[1].schema(), join_type)?;
-            Ok(LogicalPlan::Join(Join {
-                left: Arc::new(inputs[0].clone()),
-                right: Arc::new(inputs[1].clone()),
-                join_type: *join_type,
-                join_constraint: *join_constraint,
-                on: on.clone(),
-                schema: DFSchemaRef::new(schema),
-                null_equals_null: *null_equals_null,
-            }))
-        }
-        LogicalPlan::CrossJoin(_) => {
-            let left = inputs[0].clone();
-            let right = &inputs[1];
-            LogicalPlanBuilder::from(left).cross_join(right)?.build()
-        }
-        LogicalPlan::Subquery(_) => {
-            let subquery = LogicalPlanBuilder::from(inputs[0].clone()).build()?;
-            Ok(LogicalPlan::Subquery(Subquery {
-                subquery: Arc::new(subquery),
-            }))
-        }
-        LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => {
-            let schema = inputs[0].schema().as_ref().clone().into();
-            let schema =
-                DFSchemaRef::new(DFSchema::try_from_qualified_schema(alias, &schema)?);
-            Ok(LogicalPlan::SubqueryAlias(SubqueryAlias {
-                alias: alias.clone(),
-                input: Arc::new(inputs[0].clone()),
-                schema,
-            }))
-        }
-        LogicalPlan::Limit(Limit { n, .. }) => Ok(LogicalPlan::Limit(Limit {
-            n: *n,
-            input: Arc::new(inputs[0].clone()),
-        })),
-        LogicalPlan::Offset(Offset { offset, .. }) => Ok(LogicalPlan::Offset(Offset {
-            offset: *offset,
-            input: Arc::new(inputs[0].clone()),
-        })),
-        LogicalPlan::CreateMemoryTable(CreateMemoryTable {
-            name,
-            if_not_exists,
-            ..
-        }) => Ok(LogicalPlan::CreateMemoryTable(CreateMemoryTable {
-            input: Arc::new(inputs[0].clone()),
-            name: name.clone(),
-            if_not_exists: *if_not_exists,
-        })),
-        LogicalPlan::CreateView(CreateView {
-            name, or_replace, ..
-        }) => Ok(LogicalPlan::CreateView(CreateView {
-            input: Arc::new(inputs[0].clone()),
-            name: name.clone(),
-            or_replace: *or_replace,
-        })),
-        LogicalPlan::Extension(e) => Ok(LogicalPlan::Extension(Extension {
-            node: e.node.from_template(expr, inputs),
-        })),
-        LogicalPlan::Union(Union { schema, alias, .. }) => {
-            Ok(LogicalPlan::Union(Union {
-                inputs: inputs.to_vec(),
-                schema: schema.clone(),
-                alias: alias.clone(),
-            }))
-        }
-        LogicalPlan::Analyze(a) => {
-            assert!(expr.is_empty());
-            assert_eq!(inputs.len(), 1);
-            Ok(LogicalPlan::Analyze(Analyze {
-                verbose: a.verbose,
-                schema: a.schema.clone(),
-                input: Arc::new(inputs[0].clone()),
-            }))
-        }
-        LogicalPlan::Explain(_) => {
-            // Explain should be handled specially in the optimizers;
-            // If this assert fails it means some optimizer pass is
-            // trying to optimize Explain directly
-            assert!(
-                expr.is_empty(),
-                "Explain can not be created from utils::from_expr"
-            );
-            assert!(
-                inputs.is_empty(),
-                "Explain can not be created from utils::from_expr"
-            );
-            Ok(plan.clone())
-        }
-        LogicalPlan::EmptyRelation(_)
-        | LogicalPlan::TableScan { .. }
-        | LogicalPlan::CreateExternalTable(_)
-        | LogicalPlan::DropTable(_)
-        | LogicalPlan::CreateCatalogSchema(_)
-        | LogicalPlan::CreateCatalog(_) => {
-            // All of these plan types have no inputs / exprs so should not be called
-            assert!(expr.is_empty(), "{:?} should have no exprs", plan);
-            assert!(inputs.is_empty(), "{:?}  should have no inputs", plan);
-            Ok(plan.clone())
-        }
-    }
-}
-
 /// Returns all direct children `Expression`s of `expr`.
 /// E.g. if the expression is "(a + 1) + 1", it returns ["a + 1", "1"] (as Expr objects)
 pub fn expr_sub_expressions(expr: &Expr) -> Result<Vec<Expr>> {
diff --git a/datafusion/core/src/sql/planner.rs b/datafusion/core/src/sql/planner.rs
index 3d5e8da93..b2dad55f2 100644
--- a/datafusion/core/src/sql/planner.rs
+++ b/datafusion/core/src/sql/planner.rs
@@ -46,13 +46,15 @@ use crate::{
 };
 use arrow::datatypes::*;
 use datafusion_expr::utils::{
-    exprlist_to_columns, find_aggregate_exprs, find_window_exprs,
+    expr_as_column_expr, exprlist_to_columns, find_aggregate_exprs, find_column_exprs,
+    find_window_exprs,
 };
 use datafusion_expr::{window_function::WindowFunction, BuiltinScalarFunction};
 use hashbrown::HashMap;
 
 use datafusion_common::field_not_found;
 use datafusion_expr::expr::GroupingSet;
+use datafusion_expr::logical_plan::builder::project_with_alias;
 use datafusion_expr::logical_plan::{Filter, Subquery};
 use sqlparser::ast::{
     BinaryOperator, DataType as SQLDataType, DateTimeField, Expr as SQLExpr, FunctionArg,
@@ -68,12 +70,10 @@ use sqlparser::parser::ParserError::ParserError;
 use super::{
     parser::DFParser,
     utils::{
-        check_columns_satisfy_exprs, expr_as_column_expr, extract_aliases,
-        find_column_exprs, rebase_expr, resolve_aliases_to_exprs,
-        resolve_positions_to_exprs,
+        check_columns_satisfy_exprs, extract_aliases, rebase_expr,
+        resolve_aliases_to_exprs, resolve_positions_to_exprs,
     },
 };
-use crate::logical_plan::builder::project_with_alias;
 use crate::logical_plan::plan::{Analyze, Explain};
 
 /// The ContextProvider trait allows the query planner to obtain meta-data about tables and
diff --git a/datafusion/core/src/sql/utils.rs b/datafusion/core/src/sql/utils.rs
index 0b8e8d3a6..7a2523e03 100644
--- a/datafusion/core/src/sql/utils.rs
+++ b/datafusion/core/src/sql/utils.rs
@@ -20,66 +20,13 @@
 use arrow::datatypes::{DataType, DECIMAL_MAX_PRECISION};
 use sqlparser::ast::Ident;
 
-use crate::logical_plan::ExprVisitable;
+use crate::error::{DataFusionError, Result};
 use crate::logical_plan::{Expr, LogicalPlan};
 use crate::scalar::ScalarValue;
-use crate::{
-    error::{DataFusionError, Result},
-    logical_plan::{Column, ExpressionVisitor, Recursion},
-};
 use datafusion_expr::expr::GroupingSet;
+use datafusion_expr::utils::{expr_as_column_expr, find_column_exprs};
 use std::collections::HashMap;
 
-/// Collect all deeply nested `Expr::Column`'s. They are returned in order of
-/// appearance (depth first), and may contain duplicates.
-pub(crate) fn find_column_exprs(exprs: &[Expr]) -> Vec<Expr> {
-    exprs
-        .iter()
-        .flat_map(find_columns_referenced_by_expr)
-        .map(Expr::Column)
-        .collect()
-}
-
-/// Recursively find all columns referenced by an expression
-#[derive(Debug, Default)]
-struct ColumnCollector {
-    exprs: Vec<Column>,
-}
-
-impl ExpressionVisitor for ColumnCollector {
-    fn pre_visit(mut self, expr: &Expr) -> Result<Recursion<Self>> {
-        if let Expr::Column(c) = expr {
-            self.exprs.push(c.clone())
-        }
-        Ok(Recursion::Continue(self))
-    }
-}
-
-pub(crate) fn find_columns_referenced_by_expr(e: &Expr) -> Vec<Column> {
-    // As the `ExpressionVisitor` impl above always returns Ok, this
-    // "can't" error
-    let ColumnCollector { exprs } = e
-        .accept(ColumnCollector::default())
-        .expect("Unexpected error");
-    exprs
-}
-
-/// Convert any `Expr` to an `Expr::Column`.
-pub(crate) fn expr_as_column_expr(expr: &Expr, plan: &LogicalPlan) -> Result<Expr> {
-    match expr {
-        Expr::Column(col) => {
-            let field = plan.schema().field_from_column(col)?;
-            Ok(Expr::Column(field.qualified_column()))
-        }
-        _ => {
-            // we should not be trying to create a name for the expression
-            // based on the input schema but this is the current behavior
-            // see https://github.com/apache/arrow-datafusion/issues/2456
-            Ok(Expr::Column(Column::from_name(expr.name(plan.schema())?)))
-        }
-    }
-}
-
 /// Make a best-effort attempt at resolving all columns in the expression tree
 pub(crate) fn resolve_columns(expr: &Expr, plan: &LogicalPlan) -> Result<Expr> {
     clone_with_replacement(expr, &|nested_expr| {
diff --git a/datafusion/expr/src/lib.rs b/datafusion/expr/src/lib.rs
index 87bb77067..f71243610 100644
--- a/datafusion/expr/src/lib.rs
+++ b/datafusion/expr/src/lib.rs
@@ -65,7 +65,7 @@ pub use function::{
     StateTypeFunction,
 };
 pub use literal::{lit, lit_timestamp_nano, Literal, TimestampLiteral};
-pub use logical_plan::{LogicalPlan, PlanVisitor};
+pub use logical_plan::{LogicalPlan, LogicalPlanBuilder, PlanVisitor};
 pub use nullif::SUPPORTED_NULLIF_TYPES;
 pub use operator::Operator;
 pub use signature::{Signature, TypeSignature, Volatility};
diff --git a/datafusion/core/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
similarity index 95%
rename from datafusion/core/src/logical_plan/builder.rs
rename to datafusion/expr/src/logical_plan/builder.rs
index cad1b0a6a..9778e2fe9 100644
--- a/datafusion/core/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -17,18 +17,25 @@
 
 //! This module provides a builder for creating LogicalPlans
 
-use crate::error::{DataFusionError, Result};
-use crate::logical_expr::ExprSchemable;
-use crate::logical_plan::plan::{
-    Aggregate, Analyze, EmptyRelation, Explain, Filter, Join, Projection, Sort,
-    SubqueryAlias, TableScan, ToStringifiedPlan, Union, Window,
+use crate::expr_rewriter::{normalize_col, normalize_cols, rewrite_sort_cols_by_aggs};
+use crate::utils::{columnize_expr, exprlist_to_fields, from_plan};
+use crate::{
+    logical_plan::{
+        Aggregate, Analyze, CrossJoin, EmptyRelation, Explain, Filter, Join,
+        JoinConstraint, JoinType, Limit, LogicalPlan, Offset, Partitioning, PlanType,
+        Projection, Repartition, Sort, SubqueryAlias, TableScan, ToStringifiedPlan,
+        Union, Values, Window,
+    },
+    utils::{
+        expand_qualified_wildcard, expand_wildcard, expr_to_columns,
+        group_window_expr_by_sort_keys,
+    },
+    Expr, ExprSchemable, TableSource,
 };
-use crate::optimizer::utils;
-use crate::scalar::ScalarValue;
 use arrow::datatypes::{DataType, Schema};
-use datafusion_expr::utils::{
-    expand_qualified_wildcard, expand_wildcard, expr_to_columns,
-    group_window_expr_by_sort_keys,
+use datafusion_common::{
+    Column, DFField, DFSchema, DFSchemaRef, DataFusionError, Result, ScalarValue,
+    ToDFSchema,
 };
 use std::convert::TryFrom;
 use std::iter;
@@ -37,16 +44,6 @@ use std::{
     sync::Arc,
 };
 
-use super::{Expr, JoinConstraint, JoinType, LogicalPlan, PlanType};
-use crate::logical_plan::{
-    columnize_expr, exprlist_to_fields, normalize_col, normalize_cols,
-    rewrite_sort_cols_by_aggs, Column, CrossJoin, DFField, DFSchema, DFSchemaRef, Limit,
-    Offset, Partitioning, Repartition, Values,
-};
-
-use datafusion_common::ToDFSchema;
-use datafusion_expr::TableSource;
-
 /// Default table name for unnamed table
 pub const UNNAMED_TABLE: &str = "?table?";
 
@@ -240,8 +237,9 @@ impl LogicalPlanBuilder {
         });
         Ok(Self::from(table_scan))
     }
+
     /// Wrap a plan in a window
-    pub(crate) fn window_plan(
+    pub fn window_plan(
         input: LogicalPlan,
         window_exprs: Vec<Expr>,
     ) -> Result<LogicalPlan> {
@@ -368,7 +366,7 @@ impl LogicalPlanBuilder {
                     .collect::<Result<Vec<_>>>()?;
 
                 let expr = curr_plan.expressions();
-                utils::from_plan(&curr_plan, &expr, &new_inputs)
+                from_plan(&curr_plan, &expr, &new_inputs)
             }
         }
     }
@@ -704,7 +702,7 @@ impl LogicalPlanBuilder {
     }
 
     /// Process intersect set operator
-    pub(crate) fn intersect(
+    pub fn intersect(
         left_plan: LogicalPlan,
         right_plan: LogicalPlan,
         is_all: bool,
@@ -718,7 +716,7 @@ impl LogicalPlanBuilder {
     }
 
     /// Process except set operator
-    pub(crate) fn except(
+    pub fn except(
         left_plan: LogicalPlan,
         right_plan: LogicalPlan,
         is_all: bool,
@@ -938,17 +936,15 @@ pub fn project_with_alias(
 
 #[cfg(test)]
 mod tests {
-    use arrow::datatypes::{DataType, Field};
+    use crate::expr_fn::exists;
+    use arrow::datatypes::{DataType, Field, SchemaRef};
     use datafusion_common::SchemaError;
-    use datafusion_expr::expr_fn::exists;
+    use std::any::Any;
 
     use crate::logical_plan::StringifiedPlan;
-    use crate::prelude::*;
-    use crate::test::test_table_scan_with_name;
-    use crate::test_util::scan_empty;
 
-    use super::super::{col, lit, sum};
     use super::*;
+    use crate::{col, in_subquery, lit, scalar_subquery, sum};
 
     #[test]
     fn plan_builder_simple() -> Result<()> {
@@ -1239,4 +1235,37 @@ mod tests {
         assert!(stringified_plan.should_display(true));
         assert!(!stringified_plan.should_display(false));
     }
+
+    fn test_table_scan_with_name(name: &str) -> Result<LogicalPlan> {
+        let schema = Schema::new(vec![
+            Field::new("a", DataType::UInt32, false),
+            Field::new("b", DataType::UInt32, false),
+            Field::new("c", DataType::UInt32, false),
+        ]);
+        scan_empty(Some(name), &schema, None)?.build()
+    }
+
+    fn scan_empty(
+        name: Option<&str>,
+        table_schema: &Schema,
+        projection: Option<Vec<usize>>,
+    ) -> Result<LogicalPlanBuilder> {
+        let table_schema = Arc::new(table_schema.clone());
+        let table_source = Arc::new(EmptyTable { table_schema });
+        LogicalPlanBuilder::scan(name.unwrap_or(UNNAMED_TABLE), table_source, projection)
+    }
+
+    struct EmptyTable {
+        table_schema: SchemaRef,
+    }
+
+    impl TableSource for EmptyTable {
+        fn as_any(&self) -> &dyn Any {
+            self
+        }
+
+        fn schema(&self) -> SchemaRef {
+            self.table_schema.clone()
+        }
+    }
 }
diff --git a/datafusion/expr/src/logical_plan/mod.rs b/datafusion/expr/src/logical_plan/mod.rs
index e9a90cd7f..f96e4320f 100644
--- a/datafusion/expr/src/logical_plan/mod.rs
+++ b/datafusion/expr/src/logical_plan/mod.rs
@@ -15,10 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
+pub mod builder;
 pub mod display;
 mod extension;
 mod plan;
 
+pub use builder::LogicalPlanBuilder;
 pub use plan::{
     Aggregate, Analyze, CreateCatalog, CreateCatalogSchema, CreateExternalTable,
     CreateMemoryTable, CreateView, CrossJoin, DropTable, EmptyRelation, Explain,
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index 709a3eee2..a7aed1799 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -18,9 +18,18 @@
 //! Expression utilities
 
 use crate::expr_visitor::{ExprVisitable, ExpressionVisitor, Recursion};
-use crate::{Expr, LogicalPlan};
-use datafusion_common::{Column, DFField, DFSchema, DataFusionError, Result};
+use crate::logical_plan::builder::build_join_schema;
+use crate::logical_plan::{
+    Aggregate, Analyze, CreateMemoryTable, CreateView, Extension, Filter, Join, Limit,
+    Offset, Partitioning, Projection, Repartition, Sort, Subquery, SubqueryAlias, Union,
+    Values, Window,
+};
+use crate::{Expr, ExprSchemable, LogicalPlan, LogicalPlanBuilder};
+use datafusion_common::{
+    Column, DFField, DFSchema, DFSchemaRef, DataFusionError, Result,
+};
 use std::collections::HashSet;
+use std::sync::Arc;
 
 /// Recursively walk a list of expression trees, collecting the unique set of columns
 /// referenced in the expression
@@ -285,6 +294,340 @@ where
     }
 }
 
+/// Returns a new logical plan based on the original one with inputs
+/// and expressions replaced.
+///
+/// The exprs correspond to the same order of expressions returned by
+/// `LogicalPlan::expressions`. This function is used in optimizers in
+/// the following way:
+///
+/// ```text
+/// let new_inputs = optimize_children(..., plan, props);
+///
+/// // get the plans expressions to optimize
+/// let exprs = plan.expressions();
+///
+/// // potentially rewrite plan expressions
+/// let rewritten_exprs = rewrite_exprs(exprs);
+///
+/// // create new plan using rewritten_exprs in same position
+/// let new_plan = from_plan(&plan, rewritten_exprs, new_inputs);
+/// ```
+pub fn from_plan(
+    plan: &LogicalPlan,
+    expr: &[Expr],
+    inputs: &[LogicalPlan],
+) -> Result<LogicalPlan> {
+    match plan {
+        LogicalPlan::Projection(Projection { schema, alias, .. }) => {
+            Ok(LogicalPlan::Projection(Projection {
+                expr: expr.to_vec(),
+                input: Arc::new(inputs[0].clone()),
+                schema: schema.clone(),
+                alias: alias.clone(),
+            }))
+        }
+        LogicalPlan::Values(Values { schema, .. }) => Ok(LogicalPlan::Values(Values {
+            schema: schema.clone(),
+            values: expr
+                .chunks_exact(schema.fields().len())
+                .map(|s| s.to_vec())
+                .collect::<Vec<_>>(),
+        })),
+        LogicalPlan::Filter { .. } => Ok(LogicalPlan::Filter(Filter {
+            predicate: expr[0].clone(),
+            input: Arc::new(inputs[0].clone()),
+        })),
+        LogicalPlan::Repartition(Repartition {
+            partitioning_scheme,
+            ..
+        }) => match partitioning_scheme {
+            Partitioning::RoundRobinBatch(n) => {
+                Ok(LogicalPlan::Repartition(Repartition {
+                    partitioning_scheme: Partitioning::RoundRobinBatch(*n),
+                    input: Arc::new(inputs[0].clone()),
+                }))
+            }
+            Partitioning::Hash(_, n) => Ok(LogicalPlan::Repartition(Repartition {
+                partitioning_scheme: Partitioning::Hash(expr.to_owned(), *n),
+                input: Arc::new(inputs[0].clone()),
+            })),
+        },
+        LogicalPlan::Window(Window {
+            window_expr,
+            schema,
+            ..
+        }) => Ok(LogicalPlan::Window(Window {
+            input: Arc::new(inputs[0].clone()),
+            window_expr: expr[0..window_expr.len()].to_vec(),
+            schema: schema.clone(),
+        })),
+        LogicalPlan::Aggregate(Aggregate {
+            group_expr, schema, ..
+        }) => Ok(LogicalPlan::Aggregate(Aggregate {
+            group_expr: expr[0..group_expr.len()].to_vec(),
+            aggr_expr: expr[group_expr.len()..].to_vec(),
+            input: Arc::new(inputs[0].clone()),
+            schema: schema.clone(),
+        })),
+        LogicalPlan::Sort(Sort { .. }) => Ok(LogicalPlan::Sort(Sort {
+            expr: expr.to_vec(),
+            input: Arc::new(inputs[0].clone()),
+        })),
+        LogicalPlan::Join(Join {
+            join_type,
+            join_constraint,
+            on,
+            null_equals_null,
+            ..
+        }) => {
+            let schema =
+                build_join_schema(inputs[0].schema(), inputs[1].schema(), join_type)?;
+            Ok(LogicalPlan::Join(Join {
+                left: Arc::new(inputs[0].clone()),
+                right: Arc::new(inputs[1].clone()),
+                join_type: *join_type,
+                join_constraint: *join_constraint,
+                on: on.clone(),
+                schema: DFSchemaRef::new(schema),
+                null_equals_null: *null_equals_null,
+            }))
+        }
+        LogicalPlan::CrossJoin(_) => {
+            let left = inputs[0].clone();
+            let right = &inputs[1];
+            LogicalPlanBuilder::from(left).cross_join(right)?.build()
+        }
+        LogicalPlan::Subquery(_) => {
+            let subquery = LogicalPlanBuilder::from(inputs[0].clone()).build()?;
+            Ok(LogicalPlan::Subquery(Subquery {
+                subquery: Arc::new(subquery),
+            }))
+        }
+        LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => {
+            let schema = inputs[0].schema().as_ref().clone().into();
+            let schema =
+                DFSchemaRef::new(DFSchema::try_from_qualified_schema(alias, &schema)?);
+            Ok(LogicalPlan::SubqueryAlias(SubqueryAlias {
+                alias: alias.clone(),
+                input: Arc::new(inputs[0].clone()),
+                schema,
+            }))
+        }
+        LogicalPlan::Limit(Limit { n, .. }) => Ok(LogicalPlan::Limit(Limit {
+            n: *n,
+            input: Arc::new(inputs[0].clone()),
+        })),
+        LogicalPlan::Offset(Offset { offset, .. }) => Ok(LogicalPlan::Offset(Offset {
+            offset: *offset,
+            input: Arc::new(inputs[0].clone()),
+        })),
+        LogicalPlan::CreateMemoryTable(CreateMemoryTable {
+            name,
+            if_not_exists,
+            ..
+        }) => Ok(LogicalPlan::CreateMemoryTable(CreateMemoryTable {
+            input: Arc::new(inputs[0].clone()),
+            name: name.clone(),
+            if_not_exists: *if_not_exists,
+        })),
+        LogicalPlan::CreateView(CreateView {
+            name, or_replace, ..
+        }) => Ok(LogicalPlan::CreateView(CreateView {
+            input: Arc::new(inputs[0].clone()),
+            name: name.clone(),
+            or_replace: *or_replace,
+        })),
+        LogicalPlan::Extension(e) => Ok(LogicalPlan::Extension(Extension {
+            node: e.node.from_template(expr, inputs),
+        })),
+        LogicalPlan::Union(Union { schema, alias, .. }) => {
+            Ok(LogicalPlan::Union(Union {
+                inputs: inputs.to_vec(),
+                schema: schema.clone(),
+                alias: alias.clone(),
+            }))
+        }
+        LogicalPlan::Analyze(a) => {
+            assert!(expr.is_empty());
+            assert_eq!(inputs.len(), 1);
+            Ok(LogicalPlan::Analyze(Analyze {
+                verbose: a.verbose,
+                schema: a.schema.clone(),
+                input: Arc::new(inputs[0].clone()),
+            }))
+        }
+        LogicalPlan::Explain(_) => {
+            // Explain should be handled specially in the optimizers;
+            // If this assert fails it means some optimizer pass is
+            // trying to optimize Explain directly
+            assert!(
+                expr.is_empty(),
+                "Explain can not be created from utils::from_expr"
+            );
+            assert!(
+                inputs.is_empty(),
+                "Explain can not be created from utils::from_expr"
+            );
+            Ok(plan.clone())
+        }
+        LogicalPlan::EmptyRelation(_)
+        | LogicalPlan::TableScan { .. }
+        | LogicalPlan::CreateExternalTable(_)
+        | LogicalPlan::DropTable(_)
+        | LogicalPlan::CreateCatalogSchema(_)
+        | LogicalPlan::CreateCatalog(_) => {
+            // All of these plan types have no inputs / exprs so should not be called
+            assert!(expr.is_empty(), "{:?} should have no exprs", plan);
+            assert!(inputs.is_empty(), "{:?}  should have no inputs", plan);
+            Ok(plan.clone())
+        }
+    }
+}
+
+/// Find all columns referenced from an aggregate query
+fn agg_cols(agg: &Aggregate) -> Result<Vec<Column>> {
+    Ok(agg
+        .aggr_expr
+        .iter()
+        .chain(&agg.group_expr)
+        .flat_map(find_columns_referenced_by_expr)
+        .collect())
+}
+
+fn exprlist_to_fields_aggregate(
+    exprs: &[Expr],
+    plan: &LogicalPlan,
+    agg: &Aggregate,
+) -> Result<Vec<DFField>> {
+    let agg_cols = agg_cols(agg)?;
+    let mut fields = vec![];
+    for expr in exprs {
+        match expr {
+            Expr::Column(c) if agg_cols.iter().any(|x| x == c) => {
+                // resolve against schema of input to aggregate
+                fields.push(expr.to_field(agg.input.schema())?);
+            }
+            _ => fields.push(expr.to_field(plan.schema())?),
+        }
+    }
+    Ok(fields)
+}
+
+/// Create field meta-data from an expression, for use in a result set schema
+pub fn exprlist_to_fields<'a>(
+    expr: impl IntoIterator<Item = &'a Expr>,
+    plan: &LogicalPlan,
+) -> Result<Vec<DFField>> {
+    let exprs: Vec<Expr> = expr.into_iter().cloned().collect();
+    // when dealing with aggregate plans we cannot simply look in the aggregate output schema
+    // because it will contain columns representing complex expressions (such a column named
+    // `#GROUPING(person.state)` so in order to resolve `person.state` in this case we need to
+    // look at the input to the aggregate instead.
+    let fields = match plan {
+        LogicalPlan::Aggregate(agg) => {
+            Some(exprlist_to_fields_aggregate(&exprs, plan, agg))
+        }
+        LogicalPlan::Window(window) => match window.input.as_ref() {
+            LogicalPlan::Aggregate(agg) => {
+                Some(exprlist_to_fields_aggregate(&exprs, plan, agg))
+            }
+            _ => None,
+        },
+        _ => None,
+    };
+    if let Some(fields) = fields {
+        fields
+    } else {
+        // look for exact match in plan's output schema
+        let input_schema = &plan.schema();
+        exprs.iter().map(|e| e.to_field(input_schema)).collect()
+    }
+}
+
+/// Convert an expression into Column expression if it's already provided as input plan.
+///
+/// For example, it rewrites:
+///
+/// ```text
+/// .aggregate(vec![col("c1")], vec![sum(col("c2"))])?
+/// .project(vec![col("c1"), sum(col("c2"))?
+/// ```
+///
+/// Into:
+///
+/// ```text
+/// .aggregate(vec![col("c1")], vec![sum(col("c2"))])?
+/// .project(vec![col("c1"), col("SUM(#c2)")?
+/// ```
+pub fn columnize_expr(e: Expr, input_schema: &DFSchema) -> Expr {
+    match e {
+        Expr::Column(_) => e,
+        Expr::Alias(inner_expr, name) => {
+            Expr::Alias(Box::new(columnize_expr(*inner_expr, input_schema)), name)
+        }
+        Expr::ScalarSubquery(_) => e.clone(),
+        _ => match e.name(input_schema) {
+            Ok(name) => match input_schema.field_with_unqualified_name(&name) {
+                Ok(field) => Expr::Column(field.qualified_column()),
+                // expression not provided as input, do not convert to a column reference
+                Err(_) => e,
+            },
+            Err(_) => e,
+        },
+    }
+}
+
+/// Collect all deeply nested `Expr::Column`'s. They are returned in order of
+/// appearance (depth first), and may contain duplicates.
+pub fn find_column_exprs(exprs: &[Expr]) -> Vec<Expr> {
+    exprs
+        .iter()
+        .flat_map(find_columns_referenced_by_expr)
+        .map(Expr::Column)
+        .collect()
+}
+
+/// Recursively find all columns referenced by an expression
+#[derive(Debug, Default)]
+struct ColumnCollector {
+    exprs: Vec<Column>,
+}
+
+impl ExpressionVisitor for ColumnCollector {
+    fn pre_visit(mut self, expr: &Expr) -> Result<Recursion<Self>> {
+        if let Expr::Column(c) = expr {
+            self.exprs.push(c.clone())
+        }
+        Ok(Recursion::Continue(self))
+    }
+}
+
+pub(crate) fn find_columns_referenced_by_expr(e: &Expr) -> Vec<Column> {
+    // As the `ExpressionVisitor` impl above always returns Ok, this
+    // "can't" error
+    let ColumnCollector { exprs } = e
+        .accept(ColumnCollector::default())
+        .expect("Unexpected error");
+    exprs
+}
+
+/// Convert any `Expr` to an `Expr::Column`.
+pub fn expr_as_column_expr(expr: &Expr, plan: &LogicalPlan) -> Result<Expr> {
+    match expr {
+        Expr::Column(col) => {
+            let field = plan.schema().field_from_column(col)?;
+            Ok(Expr::Column(field.qualified_column()))
+        }
+        _ => {
+            // we should not be trying to create a name for the expression
+            // based on the input schema but this is the current behavior
+            // see https://github.com/apache/arrow-datafusion/issues/2456
+            Ok(Expr::Column(Column::from_name(expr.name(plan.schema())?)))
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;

Reply via email to