This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 44d154e62 Move `LogicalPlanBuilder` to `datafusion-expr` crate (#2576)
44d154e62 is described below
commit 44d154e62b8f1574639b61ad9b880034d3393708
Author: Andy Grove <[email protected]>
AuthorDate: Sun May 22 09:34:16 2022 -0600
Move `LogicalPlanBuilder` to `datafusion-expr` crate (#2576)
---
datafusion/core/src/logical_plan/expr.rs | 99 +-----
datafusion/core/src/logical_plan/mod.rs | 26 +-
.../core/src/optimizer/common_subexpr_eliminate.rs | 4 +-
datafusion/core/src/optimizer/eliminate_filter.rs | 4 +-
datafusion/core/src/optimizer/eliminate_limit.rs | 4 +-
datafusion/core/src/optimizer/filter_push_down.rs | 8 +-
datafusion/core/src/optimizer/limit_push_down.rs | 4 +-
.../core/src/optimizer/projection_push_down.rs | 14 +-
.../core/src/optimizer/simplify_expressions.rs | 4 +-
.../src/optimizer/single_distinct_to_groupby.rs | 6 +-
datafusion/core/src/optimizer/utils.rs | 204 +-----------
datafusion/core/src/sql/planner.rs | 10 +-
datafusion/core/src/sql/utils.rs | 57 +---
datafusion/expr/src/lib.rs | 2 +-
.../{core => expr}/src/logical_plan/builder.rs | 89 ++++--
datafusion/expr/src/logical_plan/mod.rs | 2 +
datafusion/expr/src/utils.rs | 347 ++++++++++++++++++++-
17 files changed, 454 insertions(+), 430 deletions(-)
diff --git a/datafusion/core/src/logical_plan/expr.rs
b/datafusion/core/src/logical_plan/expr.rs
index 6d90c78f1..f8d7f46c6 100644
--- a/datafusion/core/src/logical_plan/expr.rs
+++ b/datafusion/core/src/logical_plan/expr.rs
@@ -20,18 +20,14 @@
pub use super::Operator;
use crate::error::Result;
-use crate::logical_plan::ExprSchemable;
-use crate::logical_plan::{DFField, DFSchema};
-use crate::sql::utils::find_columns_referenced_by_expr;
use arrow::datatypes::DataType;
pub use datafusion_common::{Column, ExprSchema};
pub use datafusion_expr::expr_fn::*;
-use datafusion_expr::logical_plan::Aggregate;
+use datafusion_expr::AccumulatorFunctionImplementation;
use datafusion_expr::BuiltinScalarFunction;
pub use datafusion_expr::Expr;
use datafusion_expr::StateTypeFunction;
pub use datafusion_expr::{lit, lit_timestamp_nano, Literal};
-use datafusion_expr::{AccumulatorFunctionImplementation, LogicalPlan};
use datafusion_expr::{AggregateUDF, ScalarUDF};
use datafusion_expr::{
ReturnTypeFunction, ScalarFunctionImplementation, Signature, Volatility,
@@ -52,39 +48,6 @@ pub fn combine_filters(filters: &[Expr]) -> Option<Expr> {
Some(combined_filter)
}
-/// Convert an expression into Column expression if it's already provided as
input plan.
-///
-/// For example, it rewrites:
-///
-/// ```text
-/// .aggregate(vec![col("c1")], vec![sum(col("c2"))])?
-/// .project(vec![col("c1"), sum(col("c2"))?
-/// ```
-///
-/// Into:
-///
-/// ```text
-/// .aggregate(vec![col("c1")], vec![sum(col("c2"))])?
-/// .project(vec![col("c1"), col("SUM(#c2)")?
-/// ```
-pub fn columnize_expr(e: Expr, input_schema: &DFSchema) -> Expr {
- match e {
- Expr::Column(_) => e,
- Expr::Alias(inner_expr, name) => {
- Expr::Alias(Box::new(columnize_expr(*inner_expr, input_schema)),
name)
- }
- Expr::ScalarSubquery(_) => e.clone(),
- _ => match e.name(input_schema) {
- Ok(name) => match input_schema.field_with_unqualified_name(&name) {
- Ok(field) => Expr::Column(field.qualified_column()),
- // expression not provided as input, do not convert to a
column reference
- Err(_) => e,
- },
- Err(_) => e,
- },
- }
-}
-
/// Recursively un-alias an expressions
#[inline]
pub fn unalias(expr: Expr) -> Expr {
@@ -137,66 +100,6 @@ pub fn create_udaf(
)
}
-/// Find all columns referenced from an aggregate query
-fn agg_cols(agg: &Aggregate) -> Result<Vec<Column>> {
- Ok(agg
- .aggr_expr
- .iter()
- .chain(&agg.group_expr)
- .flat_map(find_columns_referenced_by_expr)
- .collect())
-}
-
-fn exprlist_to_fields_aggregate(
- exprs: &[Expr],
- plan: &LogicalPlan,
- agg: &Aggregate,
-) -> Result<Vec<DFField>> {
- let agg_cols = agg_cols(agg)?;
- let mut fields = vec![];
- for expr in exprs {
- match expr {
- Expr::Column(c) if agg_cols.iter().any(|x| x == c) => {
- // resolve against schema of input to aggregate
- fields.push(expr.to_field(agg.input.schema())?);
- }
- _ => fields.push(expr.to_field(plan.schema())?),
- }
- }
- Ok(fields)
-}
-
-/// Create field meta-data from an expression, for use in a result set schema
-pub fn exprlist_to_fields<'a>(
- expr: impl IntoIterator<Item = &'a Expr>,
- plan: &LogicalPlan,
-) -> Result<Vec<DFField>> {
- let exprs: Vec<Expr> = expr.into_iter().cloned().collect();
- // when dealing with aggregate plans we cannot simply look in the
aggregate output schema
- // because it will contain columns representing complex expressions (such
a column named
- // `#GROUPING(person.state)` so in order to resolve `person.state` in this
case we need to
- // look at the input to the aggregate instead.
- let fields = match plan {
- LogicalPlan::Aggregate(agg) => {
- Some(exprlist_to_fields_aggregate(&exprs, plan, agg))
- }
- LogicalPlan::Window(window) => match window.input.as_ref() {
- LogicalPlan::Aggregate(agg) => {
- Some(exprlist_to_fields_aggregate(&exprs, plan, agg))
- }
- _ => None,
- },
- _ => None,
- };
- if let Some(fields) = fields {
- fields
- } else {
- // look for exact match in plan's output schema
- let input_schema = &plan.schema();
- exprs.iter().map(|e| e.to_field(input_schema)).collect()
- }
-}
-
/// Calls a named built in function
/// ```
/// use datafusion::logical_plan::*;
diff --git a/datafusion/core/src/logical_plan/mod.rs
b/datafusion/core/src/logical_plan/mod.rs
index e9496c8f2..0a9731441 100644
--- a/datafusion/core/src/logical_plan/mod.rs
+++ b/datafusion/core/src/logical_plan/mod.rs
@@ -21,35 +21,33 @@
//! Logical query plans can then be optimized and executed directly, or
translated into
//! physical query plans and executed.
-pub(crate) mod builder;
mod expr;
mod expr_simplier;
pub mod plan;
mod registry;
pub mod window_frames;
-pub use builder::{
- build_join_schema, union_with_alias, LogicalPlanBuilder, UNNAMED_TABLE,
-};
pub use datafusion_common::{DFField, DFSchema, DFSchemaRef, ToDFSchema};
pub use datafusion_expr::{
expr_fn::binary_expr,
expr_rewriter,
expr_visitor::{ExprVisitable, ExpressionVisitor, Recursion},
+ logical_plan::builder::{
+ build_join_schema, union_with_alias, LogicalPlanBuilder, UNNAMED_TABLE,
+ },
ExprSchemable, Operator,
};
pub use expr::{
abs, acos, and, approx_distinct, approx_percentile_cont, array, ascii,
asin, atan,
avg, bit_length, btrim, call_fn, case, ceil, character_length, chr,
coalesce, col,
- columnize_expr, combine_filters, concat, concat_expr, concat_ws,
concat_ws_expr, cos,
- count, count_distinct, create_udaf, create_udf, date_part, date_trunc,
digest,
- exists, exp, exprlist_to_fields, floor, in_list, in_subquery, initcap,
left, length,
- lit, lit_timestamp_nano, ln, log10, log2, lower, lpad, ltrim, max, md5,
min,
- not_exists, not_in_subquery, now, now_expr, nullif, octet_length, or,
power, random,
- regexp_match, regexp_replace, repeat, replace, reverse, right, round,
rpad, rtrim,
- scalar_subquery, sha224, sha256, sha384, sha512, signum, sin, split_part,
sqrt,
- starts_with, strpos, substr, sum, tan, to_hex, to_timestamp_micros,
- to_timestamp_millis, to_timestamp_seconds, translate, trim, trunc,
unalias, upper,
- when, Column, Expr, ExprSchema, Literal,
+ combine_filters, concat, concat_expr, concat_ws, concat_ws_expr, cos,
count,
+ count_distinct, create_udaf, create_udf, date_part, date_trunc, digest,
exists, exp,
+ floor, in_list, in_subquery, initcap, left, length, lit,
lit_timestamp_nano, ln,
+ log10, log2, lower, lpad, ltrim, max, md5, min, not_exists,
not_in_subquery, now,
+ now_expr, nullif, octet_length, or, power, random, regexp_match,
regexp_replace,
+ repeat, replace, reverse, right, round, rpad, rtrim, scalar_subquery,
sha224, sha256,
+ sha384, sha512, signum, sin, split_part, sqrt, starts_with, strpos,
substr, sum, tan,
+ to_hex, to_timestamp_micros, to_timestamp_millis, to_timestamp_seconds,
translate,
+ trim, trunc, unalias, upper, when, Column, Expr, ExprSchema, Literal,
};
pub use expr_rewriter::{
normalize_col, normalize_col_with_schemas, normalize_cols, replace_col,
diff --git a/datafusion/core/src/optimizer/common_subexpr_eliminate.rs
b/datafusion/core/src/optimizer/common_subexpr_eliminate.rs
index af0eea663..81183f56d 100644
--- a/datafusion/core/src/optimizer/common_subexpr_eliminate.rs
+++ b/datafusion/core/src/optimizer/common_subexpr_eliminate.rs
@@ -27,9 +27,9 @@ use crate::logical_plan::{
ExpressionVisitor, LogicalPlan, Recursion, RewriteRecursion,
};
use crate::optimizer::optimizer::OptimizerRule;
-use crate::optimizer::utils;
use arrow::datatypes::DataType;
use datafusion_expr::expr::GroupingSet;
+use datafusion_expr::utils::from_plan;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
@@ -238,7 +238,7 @@ fn optimize(plan: &LogicalPlan, execution_props:
&ExecutionProps) -> Result<Logi
.map(|input_plan| optimize(input_plan, execution_props))
.collect::<Result<Vec<_>>>()?;
- utils::from_plan(plan, &expr, &new_inputs)
+ from_plan(plan, &expr, &new_inputs)
}
}
}
diff --git a/datafusion/core/src/optimizer/eliminate_filter.rs
b/datafusion/core/src/optimizer/eliminate_filter.rs
index 800963ef5..fb99a9798 100644
--- a/datafusion/core/src/optimizer/eliminate_filter.rs
+++ b/datafusion/core/src/optimizer/eliminate_filter.rs
@@ -19,6 +19,7 @@
//! This saves time in planning and executing the query.
//! Note that this rule should be applied after simplify expressions optimizer
rule.
use datafusion_common::ScalarValue;
+use datafusion_expr::utils::from_plan;
use datafusion_expr::Expr;
use crate::error::Result;
@@ -26,7 +27,6 @@ use crate::logical_plan::plan::Filter;
use crate::logical_plan::{EmptyRelation, LogicalPlan};
use crate::optimizer::optimizer::OptimizerRule;
-use super::utils;
use crate::execution::context::ExecutionProps;
/// Optimization rule that elimanate the scalar value (true/false) filter with
an [LogicalPlan::EmptyRelation]
@@ -68,7 +68,7 @@ impl OptimizerRule for EliminateFilter {
.map(|plan| self.optimize(plan, execution_props))
.collect::<Result<Vec<_>>>()?;
- utils::from_plan(plan, &plan.expressions(), &new_inputs)
+ from_plan(plan, &plan.expressions(), &new_inputs)
}
}
}
diff --git a/datafusion/core/src/optimizer/eliminate_limit.rs
b/datafusion/core/src/optimizer/eliminate_limit.rs
index c1fc2068d..a7acf7ca6 100644
--- a/datafusion/core/src/optimizer/eliminate_limit.rs
+++ b/datafusion/core/src/optimizer/eliminate_limit.rs
@@ -20,8 +20,8 @@
use crate::error::Result;
use crate::logical_plan::{EmptyRelation, Limit, LogicalPlan};
use crate::optimizer::optimizer::OptimizerRule;
+use datafusion_expr::utils::from_plan;
-use super::utils;
use crate::execution::context::ExecutionProps;
/// Optimization rule that replaces LIMIT 0 with an
[LogicalPlan::EmptyRelation]
@@ -59,7 +59,7 @@ impl OptimizerRule for EliminateLimit {
.map(|plan| self.optimize(plan, execution_props))
.collect::<Result<Vec<_>>>()?;
- utils::from_plan(plan, &expr, &new_inputs)
+ from_plan(plan, &expr, &new_inputs)
}
}
}
diff --git a/datafusion/core/src/optimizer/filter_push_down.rs
b/datafusion/core/src/optimizer/filter_push_down.rs
index 7a0383463..45b1fde36 100644
--- a/datafusion/core/src/optimizer/filter_push_down.rs
+++ b/datafusion/core/src/optimizer/filter_push_down.rs
@@ -24,7 +24,7 @@ use crate::logical_plan::{
use crate::logical_plan::{DFSchema, Expr};
use crate::optimizer::optimizer::OptimizerRule;
use crate::optimizer::utils;
-use datafusion_expr::utils::{expr_to_columns, exprlist_to_columns};
+use datafusion_expr::utils::{expr_to_columns, exprlist_to_columns, from_plan};
use std::collections::{HashMap, HashSet};
/// Filter Push Down optimizer rule pushes filter clauses down the plan
@@ -90,7 +90,7 @@ fn push_down(state: &State, plan: &LogicalPlan) ->
Result<LogicalPlan> {
.collect::<Result<Vec<_>>>()?;
let expr = plan.expressions();
- utils::from_plan(plan, &expr, &new_inputs)
+ from_plan(plan, &expr, &new_inputs)
}
// remove all filters from `filters` that are in `predicate_columns`
@@ -246,7 +246,7 @@ fn optimize_join(
// create a new Join with the new `left` and `right`
let expr = plan.expressions();
- let plan = utils::from_plan(plan, &expr, &[left, right])?;
+ let plan = from_plan(plan, &expr, &[left, right])?;
if to_keep.0.is_empty() {
Ok(plan)
@@ -334,7 +334,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) ->
Result<LogicalPlan> {
// optimize inner
let new_input = optimize(input, state)?;
- utils::from_plan(plan, expr, &[new_input])
+ from_plan(plan, expr, &[new_input])
}
LogicalPlan::Aggregate(Aggregate {
aggr_expr, input, ..
diff --git a/datafusion/core/src/optimizer/limit_push_down.rs
b/datafusion/core/src/optimizer/limit_push_down.rs
index a52fd40df..087ac53c8 100644
--- a/datafusion/core/src/optimizer/limit_push_down.rs
+++ b/datafusion/core/src/optimizer/limit_push_down.rs
@@ -17,7 +17,6 @@
//! Optimizer rule to push down LIMIT in the query plan
//! It will push down through projection, limits (taking the smaller limit)
-use super::utils;
use crate::error::Result;
use crate::execution::context::ExecutionProps;
use crate::logical_plan::plan::Projection;
@@ -25,6 +24,7 @@ use crate::logical_plan::{Limit, TableScan};
use crate::logical_plan::{LogicalPlan, Union};
use crate::optimizer::optimizer::OptimizerRule;
use datafusion_expr::logical_plan::Offset;
+use datafusion_expr::utils::from_plan;
use std::sync::Arc;
/// Optimization rule that tries pushes down LIMIT n
@@ -171,7 +171,7 @@ fn limit_push_down(
})
.collect::<Result<Vec<_>>>()?;
- utils::from_plan(plan, &expr, &new_inputs)
+ from_plan(plan, &expr, &new_inputs)
}
}
}
diff --git a/datafusion/core/src/optimizer/projection_push_down.rs
b/datafusion/core/src/optimizer/projection_push_down.rs
index cf14adcd1..5a127df6d 100644
--- a/datafusion/core/src/optimizer/projection_push_down.rs
+++ b/datafusion/core/src/optimizer/projection_push_down.rs
@@ -28,10 +28,11 @@ use crate::logical_plan::{
LogicalPlanBuilder, ToDFSchema, Union,
};
use crate::optimizer::optimizer::OptimizerRule;
-use crate::optimizer::utils;
use arrow::datatypes::{Field, Schema};
use arrow::error::Result as ArrowResult;
-use datafusion_expr::utils::{expr_to_columns, exprlist_to_columns,
find_sort_exprs};
+use datafusion_expr::utils::{
+ expr_to_columns, exprlist_to_columns, find_sort_exprs, from_plan,
+};
use datafusion_expr::Expr;
use std::{
collections::{BTreeSet, HashSet},
@@ -458,7 +459,7 @@ fn optimize_plan(
_execution_props,
)?];
let expr = vec![];
- utils::from_plan(plan, &expr, &new_inputs)
+ from_plan(plan, &expr, &new_inputs)
}
_ => Err(DataFusionError::Plan(
"SubqueryAlias should only wrap TableScan".to_string(),
@@ -502,7 +503,7 @@ fn optimize_plan(
})
.collect::<Result<Vec<_>>>()?;
- utils::from_plan(plan, &expr, &new_inputs)
+ from_plan(plan, &expr, &new_inputs)
}
}
}
@@ -513,12 +514,11 @@ mod tests {
use std::collections::HashMap;
use super::*;
- use crate::logical_plan::{
- col, exprlist_to_fields, lit, max, min, Expr, JoinType,
LogicalPlanBuilder,
- };
+ use crate::logical_plan::{col, lit, max, min, Expr, JoinType,
LogicalPlanBuilder};
use crate::test::*;
use crate::test_util::scan_empty;
use arrow::datatypes::DataType;
+ use datafusion_expr::utils::exprlist_to_fields;
#[test]
fn aggregate_no_group_by() -> Result<()> {
diff --git a/datafusion/core/src/optimizer/simplify_expressions.rs
b/datafusion/core/src/optimizer/simplify_expressions.rs
index 216af223a..8c628906a 100644
--- a/datafusion/core/src/optimizer/simplify_expressions.rs
+++ b/datafusion/core/src/optimizer/simplify_expressions.rs
@@ -25,13 +25,13 @@ use crate::logical_plan::{
LogicalPlan, RewriteRecursion, SimplifyInfo,
};
use crate::optimizer::optimizer::OptimizerRule;
-use crate::optimizer::utils;
use crate::physical_plan::planner::create_physical_expr;
use crate::scalar::ScalarValue;
use crate::{error::Result, logical_plan::Operator};
use arrow::array::new_null_array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
+use datafusion_expr::utils::from_plan;
use datafusion_expr::Volatility;
/// Provides simplification information based on schema and properties
@@ -234,7 +234,7 @@ impl OptimizerRule for SimplifyExpressions {
})
.collect::<Result<Vec<_>>>()?;
- utils::from_plan(plan, &expr, &new_inputs)
+ from_plan(plan, &expr, &new_inputs)
}
}
diff --git a/datafusion/core/src/optimizer/single_distinct_to_groupby.rs
b/datafusion/core/src/optimizer/single_distinct_to_groupby.rs
index dfbefa63a..65ff56556 100644
--- a/datafusion/core/src/optimizer/single_distinct_to_groupby.rs
+++ b/datafusion/core/src/optimizer/single_distinct_to_groupby.rs
@@ -21,9 +21,9 @@ use crate::error::Result;
use crate::execution::context::ExecutionProps;
use crate::logical_plan::plan::{Aggregate, Projection};
use crate::logical_plan::ExprSchemable;
-use crate::logical_plan::{col, columnize_expr, DFSchema, Expr, LogicalPlan};
+use crate::logical_plan::{col, DFSchema, Expr, LogicalPlan};
use crate::optimizer::optimizer::OptimizerRule;
-use crate::optimizer::utils;
+use datafusion_expr::utils::{columnize_expr, from_plan};
use hashbrown::HashSet;
use std::sync::Arc;
@@ -155,7 +155,7 @@ fn optimize_children(plan: &LogicalPlan) ->
Result<LogicalPlan> {
.iter()
.map(|plan| optimize(plan))
.collect::<Result<Vec<_>>>()?;
- utils::from_plan(plan, &expr, &new_inputs)
+ from_plan(plan, &expr, &new_inputs)
}
fn is_single_distinct_agg(plan: &LogicalPlan) -> bool {
diff --git a/datafusion/core/src/optimizer/utils.rs
b/datafusion/core/src/optimizer/utils.rs
index 96ba969c4..81d4b76a2 100644
--- a/datafusion/core/src/optimizer/utils.rs
+++ b/datafusion/core/src/optimizer/utils.rs
@@ -19,21 +19,14 @@
use super::optimizer::OptimizerRule;
use crate::execution::context::ExecutionProps;
-use datafusion_expr::logical_plan::{
- Aggregate, Analyze, Extension, Filter, Join, Projection, Sort, Subquery,
- SubqueryAlias, Window,
-};
+use datafusion_expr::logical_plan::Filter;
use crate::error::{DataFusionError, Result};
-use crate::logical_plan::{
- and, build_join_schema, CreateMemoryTable, CreateView, DFSchemaRef, Expr,
Limit,
- LogicalPlan, LogicalPlanBuilder, Offset, Operator, Partitioning,
Repartition, Union,
- Values,
-};
+use crate::logical_plan::{and, Expr, LogicalPlan, Operator};
use crate::prelude::lit;
use crate::scalar::ScalarValue;
-use datafusion_common::DFSchema;
use datafusion_expr::expr::GroupingSet;
+use datafusion_expr::utils::from_plan;
use std::sync::Arc;
const CASE_EXPR_MARKER: &str = "__DATAFUSION_CASE_EXPR__";
@@ -61,197 +54,6 @@ pub fn optimize_children(
from_plan(plan, &new_exprs, &new_inputs)
}
-/// Returns a new logical plan based on the original one with inputs
-/// and expressions replaced.
-///
-/// The exprs correspond to the same order of expressions returned by
-/// `LogicalPlan::expressions`. This function is used in optimizers in
-/// the following way:
-///
-/// ```text
-/// let new_inputs = optimize_children(..., plan, props);
-///
-/// // get the plans expressions to optimize
-/// let exprs = plan.expressions();
-///
-/// // potentially rewrite plan expressions
-/// let rewritten_exprs = rewrite_exprs(exprs);
-///
-/// // create new plan using rewritten_exprs in same position
-/// let new_plan = from_plan(&plan, rewritten_exprs, new_inputs);
-/// ```
-pub fn from_plan(
- plan: &LogicalPlan,
- expr: &[Expr],
- inputs: &[LogicalPlan],
-) -> Result<LogicalPlan> {
- match plan {
- LogicalPlan::Projection(Projection { schema, alias, .. }) => {
- Ok(LogicalPlan::Projection(Projection {
- expr: expr.to_vec(),
- input: Arc::new(inputs[0].clone()),
- schema: schema.clone(),
- alias: alias.clone(),
- }))
- }
- LogicalPlan::Values(Values { schema, .. }) =>
Ok(LogicalPlan::Values(Values {
- schema: schema.clone(),
- values: expr
- .chunks_exact(schema.fields().len())
- .map(|s| s.to_vec())
- .collect::<Vec<_>>(),
- })),
- LogicalPlan::Filter { .. } => Ok(LogicalPlan::Filter(Filter {
- predicate: expr[0].clone(),
- input: Arc::new(inputs[0].clone()),
- })),
- LogicalPlan::Repartition(Repartition {
- partitioning_scheme,
- ..
- }) => match partitioning_scheme {
- Partitioning::RoundRobinBatch(n) => {
- Ok(LogicalPlan::Repartition(Repartition {
- partitioning_scheme: Partitioning::RoundRobinBatch(*n),
- input: Arc::new(inputs[0].clone()),
- }))
- }
- Partitioning::Hash(_, n) =>
Ok(LogicalPlan::Repartition(Repartition {
- partitioning_scheme: Partitioning::Hash(expr.to_owned(), *n),
- input: Arc::new(inputs[0].clone()),
- })),
- },
- LogicalPlan::Window(Window {
- window_expr,
- schema,
- ..
- }) => Ok(LogicalPlan::Window(Window {
- input: Arc::new(inputs[0].clone()),
- window_expr: expr[0..window_expr.len()].to_vec(),
- schema: schema.clone(),
- })),
- LogicalPlan::Aggregate(Aggregate {
- group_expr, schema, ..
- }) => Ok(LogicalPlan::Aggregate(Aggregate {
- group_expr: expr[0..group_expr.len()].to_vec(),
- aggr_expr: expr[group_expr.len()..].to_vec(),
- input: Arc::new(inputs[0].clone()),
- schema: schema.clone(),
- })),
- LogicalPlan::Sort(Sort { .. }) => Ok(LogicalPlan::Sort(Sort {
- expr: expr.to_vec(),
- input: Arc::new(inputs[0].clone()),
- })),
- LogicalPlan::Join(Join {
- join_type,
- join_constraint,
- on,
- null_equals_null,
- ..
- }) => {
- let schema =
- build_join_schema(inputs[0].schema(), inputs[1].schema(),
join_type)?;
- Ok(LogicalPlan::Join(Join {
- left: Arc::new(inputs[0].clone()),
- right: Arc::new(inputs[1].clone()),
- join_type: *join_type,
- join_constraint: *join_constraint,
- on: on.clone(),
- schema: DFSchemaRef::new(schema),
- null_equals_null: *null_equals_null,
- }))
- }
- LogicalPlan::CrossJoin(_) => {
- let left = inputs[0].clone();
- let right = &inputs[1];
- LogicalPlanBuilder::from(left).cross_join(right)?.build()
- }
- LogicalPlan::Subquery(_) => {
- let subquery =
LogicalPlanBuilder::from(inputs[0].clone()).build()?;
- Ok(LogicalPlan::Subquery(Subquery {
- subquery: Arc::new(subquery),
- }))
- }
- LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => {
- let schema = inputs[0].schema().as_ref().clone().into();
- let schema =
- DFSchemaRef::new(DFSchema::try_from_qualified_schema(alias,
&schema)?);
- Ok(LogicalPlan::SubqueryAlias(SubqueryAlias {
- alias: alias.clone(),
- input: Arc::new(inputs[0].clone()),
- schema,
- }))
- }
- LogicalPlan::Limit(Limit { n, .. }) => Ok(LogicalPlan::Limit(Limit {
- n: *n,
- input: Arc::new(inputs[0].clone()),
- })),
- LogicalPlan::Offset(Offset { offset, .. }) =>
Ok(LogicalPlan::Offset(Offset {
- offset: *offset,
- input: Arc::new(inputs[0].clone()),
- })),
- LogicalPlan::CreateMemoryTable(CreateMemoryTable {
- name,
- if_not_exists,
- ..
- }) => Ok(LogicalPlan::CreateMemoryTable(CreateMemoryTable {
- input: Arc::new(inputs[0].clone()),
- name: name.clone(),
- if_not_exists: *if_not_exists,
- })),
- LogicalPlan::CreateView(CreateView {
- name, or_replace, ..
- }) => Ok(LogicalPlan::CreateView(CreateView {
- input: Arc::new(inputs[0].clone()),
- name: name.clone(),
- or_replace: *or_replace,
- })),
- LogicalPlan::Extension(e) => Ok(LogicalPlan::Extension(Extension {
- node: e.node.from_template(expr, inputs),
- })),
- LogicalPlan::Union(Union { schema, alias, .. }) => {
- Ok(LogicalPlan::Union(Union {
- inputs: inputs.to_vec(),
- schema: schema.clone(),
- alias: alias.clone(),
- }))
- }
- LogicalPlan::Analyze(a) => {
- assert!(expr.is_empty());
- assert_eq!(inputs.len(), 1);
- Ok(LogicalPlan::Analyze(Analyze {
- verbose: a.verbose,
- schema: a.schema.clone(),
- input: Arc::new(inputs[0].clone()),
- }))
- }
- LogicalPlan::Explain(_) => {
- // Explain should be handled specially in the optimizers;
- // If this assert fails it means some optimizer pass is
- // trying to optimize Explain directly
- assert!(
- expr.is_empty(),
- "Explain can not be created from utils::from_expr"
- );
- assert!(
- inputs.is_empty(),
- "Explain can not be created from utils::from_expr"
- );
- Ok(plan.clone())
- }
- LogicalPlan::EmptyRelation(_)
- | LogicalPlan::TableScan { .. }
- | LogicalPlan::CreateExternalTable(_)
- | LogicalPlan::DropTable(_)
- | LogicalPlan::CreateCatalogSchema(_)
- | LogicalPlan::CreateCatalog(_) => {
- // All of these plan types have no inputs / exprs so should not be
called
- assert!(expr.is_empty(), "{:?} should have no exprs", plan);
- assert!(inputs.is_empty(), "{:?} should have no inputs", plan);
- Ok(plan.clone())
- }
- }
-}
-
/// Returns all direct children `Expression`s of `expr`.
/// E.g. if the expression is "(a + 1) + 1", it returns ["a + 1", "1"] (as
Expr objects)
pub fn expr_sub_expressions(expr: &Expr) -> Result<Vec<Expr>> {
diff --git a/datafusion/core/src/sql/planner.rs
b/datafusion/core/src/sql/planner.rs
index 3d5e8da93..b2dad55f2 100644
--- a/datafusion/core/src/sql/planner.rs
+++ b/datafusion/core/src/sql/planner.rs
@@ -46,13 +46,15 @@ use crate::{
};
use arrow::datatypes::*;
use datafusion_expr::utils::{
- exprlist_to_columns, find_aggregate_exprs, find_window_exprs,
+ expr_as_column_expr, exprlist_to_columns, find_aggregate_exprs,
find_column_exprs,
+ find_window_exprs,
};
use datafusion_expr::{window_function::WindowFunction, BuiltinScalarFunction};
use hashbrown::HashMap;
use datafusion_common::field_not_found;
use datafusion_expr::expr::GroupingSet;
+use datafusion_expr::logical_plan::builder::project_with_alias;
use datafusion_expr::logical_plan::{Filter, Subquery};
use sqlparser::ast::{
BinaryOperator, DataType as SQLDataType, DateTimeField, Expr as SQLExpr,
FunctionArg,
@@ -68,12 +70,10 @@ use sqlparser::parser::ParserError::ParserError;
use super::{
parser::DFParser,
utils::{
- check_columns_satisfy_exprs, expr_as_column_expr, extract_aliases,
- find_column_exprs, rebase_expr, resolve_aliases_to_exprs,
- resolve_positions_to_exprs,
+ check_columns_satisfy_exprs, extract_aliases, rebase_expr,
+ resolve_aliases_to_exprs, resolve_positions_to_exprs,
},
};
-use crate::logical_plan::builder::project_with_alias;
use crate::logical_plan::plan::{Analyze, Explain};
/// The ContextProvider trait allows the query planner to obtain meta-data
about tables and
diff --git a/datafusion/core/src/sql/utils.rs b/datafusion/core/src/sql/utils.rs
index 0b8e8d3a6..7a2523e03 100644
--- a/datafusion/core/src/sql/utils.rs
+++ b/datafusion/core/src/sql/utils.rs
@@ -20,66 +20,13 @@
use arrow::datatypes::{DataType, DECIMAL_MAX_PRECISION};
use sqlparser::ast::Ident;
-use crate::logical_plan::ExprVisitable;
+use crate::error::{DataFusionError, Result};
use crate::logical_plan::{Expr, LogicalPlan};
use crate::scalar::ScalarValue;
-use crate::{
- error::{DataFusionError, Result},
- logical_plan::{Column, ExpressionVisitor, Recursion},
-};
use datafusion_expr::expr::GroupingSet;
+use datafusion_expr::utils::{expr_as_column_expr, find_column_exprs};
use std::collections::HashMap;
-/// Collect all deeply nested `Expr::Column`'s. They are returned in order of
-/// appearance (depth first), and may contain duplicates.
-pub(crate) fn find_column_exprs(exprs: &[Expr]) -> Vec<Expr> {
- exprs
- .iter()
- .flat_map(find_columns_referenced_by_expr)
- .map(Expr::Column)
- .collect()
-}
-
-/// Recursively find all columns referenced by an expression
-#[derive(Debug, Default)]
-struct ColumnCollector {
- exprs: Vec<Column>,
-}
-
-impl ExpressionVisitor for ColumnCollector {
- fn pre_visit(mut self, expr: &Expr) -> Result<Recursion<Self>> {
- if let Expr::Column(c) = expr {
- self.exprs.push(c.clone())
- }
- Ok(Recursion::Continue(self))
- }
-}
-
-pub(crate) fn find_columns_referenced_by_expr(e: &Expr) -> Vec<Column> {
- // As the `ExpressionVisitor` impl above always returns Ok, this
- // "can't" error
- let ColumnCollector { exprs } = e
- .accept(ColumnCollector::default())
- .expect("Unexpected error");
- exprs
-}
-
-/// Convert any `Expr` to an `Expr::Column`.
-pub(crate) fn expr_as_column_expr(expr: &Expr, plan: &LogicalPlan) ->
Result<Expr> {
- match expr {
- Expr::Column(col) => {
- let field = plan.schema().field_from_column(col)?;
- Ok(Expr::Column(field.qualified_column()))
- }
- _ => {
- // we should not be trying to create a name for the expression
- // based on the input schema but this is the current behavior
- // see https://github.com/apache/arrow-datafusion/issues/2456
- Ok(Expr::Column(Column::from_name(expr.name(plan.schema())?)))
- }
- }
-}
-
/// Make a best-effort attempt at resolving all columns in the expression tree
pub(crate) fn resolve_columns(expr: &Expr, plan: &LogicalPlan) -> Result<Expr>
{
clone_with_replacement(expr, &|nested_expr| {
diff --git a/datafusion/expr/src/lib.rs b/datafusion/expr/src/lib.rs
index 87bb77067..f71243610 100644
--- a/datafusion/expr/src/lib.rs
+++ b/datafusion/expr/src/lib.rs
@@ -65,7 +65,7 @@ pub use function::{
StateTypeFunction,
};
pub use literal::{lit, lit_timestamp_nano, Literal, TimestampLiteral};
-pub use logical_plan::{LogicalPlan, PlanVisitor};
+pub use logical_plan::{LogicalPlan, LogicalPlanBuilder, PlanVisitor};
pub use nullif::SUPPORTED_NULLIF_TYPES;
pub use operator::Operator;
pub use signature::{Signature, TypeSignature, Volatility};
diff --git a/datafusion/core/src/logical_plan/builder.rs
b/datafusion/expr/src/logical_plan/builder.rs
similarity index 95%
rename from datafusion/core/src/logical_plan/builder.rs
rename to datafusion/expr/src/logical_plan/builder.rs
index cad1b0a6a..9778e2fe9 100644
--- a/datafusion/core/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -17,18 +17,25 @@
//! This module provides a builder for creating LogicalPlans
-use crate::error::{DataFusionError, Result};
-use crate::logical_expr::ExprSchemable;
-use crate::logical_plan::plan::{
- Aggregate, Analyze, EmptyRelation, Explain, Filter, Join, Projection, Sort,
- SubqueryAlias, TableScan, ToStringifiedPlan, Union, Window,
+use crate::expr_rewriter::{normalize_col, normalize_cols,
rewrite_sort_cols_by_aggs};
+use crate::utils::{columnize_expr, exprlist_to_fields, from_plan};
+use crate::{
+ logical_plan::{
+ Aggregate, Analyze, CrossJoin, EmptyRelation, Explain, Filter, Join,
+ JoinConstraint, JoinType, Limit, LogicalPlan, Offset, Partitioning,
PlanType,
+ Projection, Repartition, Sort, SubqueryAlias, TableScan,
ToStringifiedPlan,
+ Union, Values, Window,
+ },
+ utils::{
+ expand_qualified_wildcard, expand_wildcard, expr_to_columns,
+ group_window_expr_by_sort_keys,
+ },
+ Expr, ExprSchemable, TableSource,
};
-use crate::optimizer::utils;
-use crate::scalar::ScalarValue;
use arrow::datatypes::{DataType, Schema};
-use datafusion_expr::utils::{
- expand_qualified_wildcard, expand_wildcard, expr_to_columns,
- group_window_expr_by_sort_keys,
+use datafusion_common::{
+ Column, DFField, DFSchema, DFSchemaRef, DataFusionError, Result,
ScalarValue,
+ ToDFSchema,
};
use std::convert::TryFrom;
use std::iter;
@@ -37,16 +44,6 @@ use std::{
sync::Arc,
};
-use super::{Expr, JoinConstraint, JoinType, LogicalPlan, PlanType};
-use crate::logical_plan::{
- columnize_expr, exprlist_to_fields, normalize_col, normalize_cols,
- rewrite_sort_cols_by_aggs, Column, CrossJoin, DFField, DFSchema,
DFSchemaRef, Limit,
- Offset, Partitioning, Repartition, Values,
-};
-
-use datafusion_common::ToDFSchema;
-use datafusion_expr::TableSource;
-
/// Default table name for unnamed table
pub const UNNAMED_TABLE: &str = "?table?";
@@ -240,8 +237,9 @@ impl LogicalPlanBuilder {
});
Ok(Self::from(table_scan))
}
+
/// Wrap a plan in a window
- pub(crate) fn window_plan(
+ pub fn window_plan(
input: LogicalPlan,
window_exprs: Vec<Expr>,
) -> Result<LogicalPlan> {
@@ -368,7 +366,7 @@ impl LogicalPlanBuilder {
.collect::<Result<Vec<_>>>()?;
let expr = curr_plan.expressions();
- utils::from_plan(&curr_plan, &expr, &new_inputs)
+ from_plan(&curr_plan, &expr, &new_inputs)
}
}
}
@@ -704,7 +702,7 @@ impl LogicalPlanBuilder {
}
/// Process intersect set operator
- pub(crate) fn intersect(
+ pub fn intersect(
left_plan: LogicalPlan,
right_plan: LogicalPlan,
is_all: bool,
@@ -718,7 +716,7 @@ impl LogicalPlanBuilder {
}
/// Process except set operator
- pub(crate) fn except(
+ pub fn except(
left_plan: LogicalPlan,
right_plan: LogicalPlan,
is_all: bool,
@@ -938,17 +936,15 @@ pub fn project_with_alias(
#[cfg(test)]
mod tests {
- use arrow::datatypes::{DataType, Field};
+ use crate::expr_fn::exists;
+ use arrow::datatypes::{DataType, Field, SchemaRef};
use datafusion_common::SchemaError;
- use datafusion_expr::expr_fn::exists;
+ use std::any::Any;
use crate::logical_plan::StringifiedPlan;
- use crate::prelude::*;
- use crate::test::test_table_scan_with_name;
- use crate::test_util::scan_empty;
- use super::super::{col, lit, sum};
use super::*;
+ use crate::{col, in_subquery, lit, scalar_subquery, sum};
#[test]
fn plan_builder_simple() -> Result<()> {
@@ -1239,4 +1235,37 @@ mod tests {
assert!(stringified_plan.should_display(true));
assert!(!stringified_plan.should_display(false));
}
+
+ fn test_table_scan_with_name(name: &str) -> Result<LogicalPlan> {
+ let schema = Schema::new(vec![
+ Field::new("a", DataType::UInt32, false),
+ Field::new("b", DataType::UInt32, false),
+ Field::new("c", DataType::UInt32, false),
+ ]);
+ scan_empty(Some(name), &schema, None)?.build()
+ }
+
+ fn scan_empty(
+ name: Option<&str>,
+ table_schema: &Schema,
+ projection: Option<Vec<usize>>,
+ ) -> Result<LogicalPlanBuilder> {
+ let table_schema = Arc::new(table_schema.clone());
+ let table_source = Arc::new(EmptyTable { table_schema });
+        LogicalPlanBuilder::scan(name.unwrap_or(UNNAMED_TABLE), table_source, projection)
+ }
+
+ struct EmptyTable {
+ table_schema: SchemaRef,
+ }
+
+ impl TableSource for EmptyTable {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn schema(&self) -> SchemaRef {
+ self.table_schema.clone()
+ }
+ }
}
diff --git a/datafusion/expr/src/logical_plan/mod.rs b/datafusion/expr/src/logical_plan/mod.rs
index e9a90cd7f..f96e4320f 100644
--- a/datafusion/expr/src/logical_plan/mod.rs
+++ b/datafusion/expr/src/logical_plan/mod.rs
@@ -15,10 +15,12 @@
// specific language governing permissions and limitations
// under the License.
+pub mod builder;
pub mod display;
mod extension;
mod plan;
+pub use builder::LogicalPlanBuilder;
pub use plan::{
     Aggregate, Analyze, CreateCatalog, CreateCatalogSchema, CreateExternalTable,
     CreateMemoryTable, CreateView, CrossJoin, DropTable, EmptyRelation, Explain,
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index 709a3eee2..a7aed1799 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -18,9 +18,18 @@
//! Expression utilities
use crate::expr_visitor::{ExprVisitable, ExpressionVisitor, Recursion};
-use crate::{Expr, LogicalPlan};
-use datafusion_common::{Column, DFField, DFSchema, DataFusionError, Result};
+use crate::logical_plan::builder::build_join_schema;
+use crate::logical_plan::{
+    Aggregate, Analyze, CreateMemoryTable, CreateView, Extension, Filter, Join, Limit,
+    Offset, Partitioning, Projection, Repartition, Sort, Subquery, SubqueryAlias, Union,
+    Values, Window,
+};
+use crate::{Expr, ExprSchemable, LogicalPlan, LogicalPlanBuilder};
+use datafusion_common::{
+ Column, DFField, DFSchema, DFSchemaRef, DataFusionError, Result,
+};
use std::collections::HashSet;
+use std::sync::Arc;
 /// Recursively walk a list of expression trees, collecting the unique set of columns
/// referenced in the expression
@@ -285,6 +294,340 @@ where
}
}
+/// Returns a new logical plan based on the original one with inputs
+/// and expressions replaced.
+///
+/// The exprs correspond to the same order of expressions returned by
+/// `LogicalPlan::expressions`. This function is used in optimizers in
+/// the following way:
+///
+/// ```text
+/// let new_inputs = optimize_children(..., plan, props);
+///
+/// // get the plans expressions to optimize
+/// let exprs = plan.expressions();
+///
+/// // potentially rewrite plan expressions
+/// let rewritten_exprs = rewrite_exprs(exprs);
+///
+/// // create new plan using rewritten_exprs in same position
+/// let new_plan = from_plan(&plan, rewritten_exprs, new_inputs);
+/// ```
+pub fn from_plan(
+ plan: &LogicalPlan,
+ expr: &[Expr],
+ inputs: &[LogicalPlan],
+) -> Result<LogicalPlan> {
+ match plan {
+ LogicalPlan::Projection(Projection { schema, alias, .. }) => {
+ Ok(LogicalPlan::Projection(Projection {
+ expr: expr.to_vec(),
+ input: Arc::new(inputs[0].clone()),
+ schema: schema.clone(),
+ alias: alias.clone(),
+ }))
+ }
+        LogicalPlan::Values(Values { schema, .. }) => Ok(LogicalPlan::Values(Values {
+ schema: schema.clone(),
+ values: expr
+ .chunks_exact(schema.fields().len())
+ .map(|s| s.to_vec())
+ .collect::<Vec<_>>(),
+ })),
+ LogicalPlan::Filter { .. } => Ok(LogicalPlan::Filter(Filter {
+ predicate: expr[0].clone(),
+ input: Arc::new(inputs[0].clone()),
+ })),
+ LogicalPlan::Repartition(Repartition {
+ partitioning_scheme,
+ ..
+ }) => match partitioning_scheme {
+ Partitioning::RoundRobinBatch(n) => {
+ Ok(LogicalPlan::Repartition(Repartition {
+ partitioning_scheme: Partitioning::RoundRobinBatch(*n),
+ input: Arc::new(inputs[0].clone()),
+ }))
+ }
+            Partitioning::Hash(_, n) => Ok(LogicalPlan::Repartition(Repartition {
+ partitioning_scheme: Partitioning::Hash(expr.to_owned(), *n),
+ input: Arc::new(inputs[0].clone()),
+ })),
+ },
+ LogicalPlan::Window(Window {
+ window_expr,
+ schema,
+ ..
+ }) => Ok(LogicalPlan::Window(Window {
+ input: Arc::new(inputs[0].clone()),
+ window_expr: expr[0..window_expr.len()].to_vec(),
+ schema: schema.clone(),
+ })),
+ LogicalPlan::Aggregate(Aggregate {
+ group_expr, schema, ..
+ }) => Ok(LogicalPlan::Aggregate(Aggregate {
+ group_expr: expr[0..group_expr.len()].to_vec(),
+ aggr_expr: expr[group_expr.len()..].to_vec(),
+ input: Arc::new(inputs[0].clone()),
+ schema: schema.clone(),
+ })),
+ LogicalPlan::Sort(Sort { .. }) => Ok(LogicalPlan::Sort(Sort {
+ expr: expr.to_vec(),
+ input: Arc::new(inputs[0].clone()),
+ })),
+ LogicalPlan::Join(Join {
+ join_type,
+ join_constraint,
+ on,
+ null_equals_null,
+ ..
+ }) => {
+ let schema =
+                build_join_schema(inputs[0].schema(), inputs[1].schema(), join_type)?;
+ Ok(LogicalPlan::Join(Join {
+ left: Arc::new(inputs[0].clone()),
+ right: Arc::new(inputs[1].clone()),
+ join_type: *join_type,
+ join_constraint: *join_constraint,
+ on: on.clone(),
+ schema: DFSchemaRef::new(schema),
+ null_equals_null: *null_equals_null,
+ }))
+ }
+ LogicalPlan::CrossJoin(_) => {
+ let left = inputs[0].clone();
+ let right = &inputs[1];
+ LogicalPlanBuilder::from(left).cross_join(right)?.build()
+ }
+ LogicalPlan::Subquery(_) => {
+            let subquery = LogicalPlanBuilder::from(inputs[0].clone()).build()?;
+ Ok(LogicalPlan::Subquery(Subquery {
+ subquery: Arc::new(subquery),
+ }))
+ }
+ LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => {
+ let schema = inputs[0].schema().as_ref().clone().into();
+ let schema =
+                DFSchemaRef::new(DFSchema::try_from_qualified_schema(alias, &schema)?);
+ Ok(LogicalPlan::SubqueryAlias(SubqueryAlias {
+ alias: alias.clone(),
+ input: Arc::new(inputs[0].clone()),
+ schema,
+ }))
+ }
+ LogicalPlan::Limit(Limit { n, .. }) => Ok(LogicalPlan::Limit(Limit {
+ n: *n,
+ input: Arc::new(inputs[0].clone()),
+ })),
+        LogicalPlan::Offset(Offset { offset, .. }) => Ok(LogicalPlan::Offset(Offset {
+ offset: *offset,
+ input: Arc::new(inputs[0].clone()),
+ })),
+ LogicalPlan::CreateMemoryTable(CreateMemoryTable {
+ name,
+ if_not_exists,
+ ..
+ }) => Ok(LogicalPlan::CreateMemoryTable(CreateMemoryTable {
+ input: Arc::new(inputs[0].clone()),
+ name: name.clone(),
+ if_not_exists: *if_not_exists,
+ })),
+ LogicalPlan::CreateView(CreateView {
+ name, or_replace, ..
+ }) => Ok(LogicalPlan::CreateView(CreateView {
+ input: Arc::new(inputs[0].clone()),
+ name: name.clone(),
+ or_replace: *or_replace,
+ })),
+ LogicalPlan::Extension(e) => Ok(LogicalPlan::Extension(Extension {
+ node: e.node.from_template(expr, inputs),
+ })),
+ LogicalPlan::Union(Union { schema, alias, .. }) => {
+ Ok(LogicalPlan::Union(Union {
+ inputs: inputs.to_vec(),
+ schema: schema.clone(),
+ alias: alias.clone(),
+ }))
+ }
+ LogicalPlan::Analyze(a) => {
+ assert!(expr.is_empty());
+ assert_eq!(inputs.len(), 1);
+ Ok(LogicalPlan::Analyze(Analyze {
+ verbose: a.verbose,
+ schema: a.schema.clone(),
+ input: Arc::new(inputs[0].clone()),
+ }))
+ }
+ LogicalPlan::Explain(_) => {
+ // Explain should be handled specially in the optimizers;
+ // If this assert fails it means some optimizer pass is
+ // trying to optimize Explain directly
+ assert!(
+ expr.is_empty(),
+ "Explain can not be created from utils::from_expr"
+ );
+ assert!(
+ inputs.is_empty(),
+ "Explain can not be created from utils::from_expr"
+ );
+ Ok(plan.clone())
+ }
+ LogicalPlan::EmptyRelation(_)
+ | LogicalPlan::TableScan { .. }
+ | LogicalPlan::CreateExternalTable(_)
+ | LogicalPlan::DropTable(_)
+ | LogicalPlan::CreateCatalogSchema(_)
+ | LogicalPlan::CreateCatalog(_) => {
+            // All of these plan types have no inputs / exprs so should not be called
+ assert!(expr.is_empty(), "{:?} should have no exprs", plan);
+ assert!(inputs.is_empty(), "{:?} should have no inputs", plan);
+ Ok(plan.clone())
+ }
+ }
+}
+
+/// Find all columns referenced from an aggregate query
+fn agg_cols(agg: &Aggregate) -> Result<Vec<Column>> {
+ Ok(agg
+ .aggr_expr
+ .iter()
+ .chain(&agg.group_expr)
+ .flat_map(find_columns_referenced_by_expr)
+ .collect())
+}
+
+fn exprlist_to_fields_aggregate(
+ exprs: &[Expr],
+ plan: &LogicalPlan,
+ agg: &Aggregate,
+) -> Result<Vec<DFField>> {
+ let agg_cols = agg_cols(agg)?;
+ let mut fields = vec![];
+ for expr in exprs {
+ match expr {
+ Expr::Column(c) if agg_cols.iter().any(|x| x == c) => {
+ // resolve against schema of input to aggregate
+ fields.push(expr.to_field(agg.input.schema())?);
+ }
+ _ => fields.push(expr.to_field(plan.schema())?),
+ }
+ }
+ Ok(fields)
+}
+
+/// Create field meta-data from an expression, for use in a result set schema
+pub fn exprlist_to_fields<'a>(
+ expr: impl IntoIterator<Item = &'a Expr>,
+ plan: &LogicalPlan,
+) -> Result<Vec<DFField>> {
+ let exprs: Vec<Expr> = expr.into_iter().cloned().collect();
+    // when dealing with aggregate plans we cannot simply look in the aggregate output schema
+    // because it will contain columns representing complex expressions (such a column named
+    // `#GROUPING(person.state)` so in order to resolve `person.state` in this case we need to
+ // look at the input to the aggregate instead.
+ let fields = match plan {
+ LogicalPlan::Aggregate(agg) => {
+ Some(exprlist_to_fields_aggregate(&exprs, plan, agg))
+ }
+ LogicalPlan::Window(window) => match window.input.as_ref() {
+ LogicalPlan::Aggregate(agg) => {
+ Some(exprlist_to_fields_aggregate(&exprs, plan, agg))
+ }
+ _ => None,
+ },
+ _ => None,
+ };
+ if let Some(fields) = fields {
+ fields
+ } else {
+ // look for exact match in plan's output schema
+ let input_schema = &plan.schema();
+ exprs.iter().map(|e| e.to_field(input_schema)).collect()
+ }
+}
+
+/// Convert an expression into Column expression if it's already provided as input plan.
+///
+/// For example, it rewrites:
+///
+/// ```text
+/// .aggregate(vec![col("c1")], vec![sum(col("c2"))])?
+/// .project(vec![col("c1"), sum(col("c2"))?
+/// ```
+///
+/// Into:
+///
+/// ```text
+/// .aggregate(vec![col("c1")], vec![sum(col("c2"))])?
+/// .project(vec![col("c1"), col("SUM(#c2)")?
+/// ```
+pub fn columnize_expr(e: Expr, input_schema: &DFSchema) -> Expr {
+ match e {
+ Expr::Column(_) => e,
+ Expr::Alias(inner_expr, name) => {
+            Expr::Alias(Box::new(columnize_expr(*inner_expr, input_schema)), name)
+ }
+ Expr::ScalarSubquery(_) => e.clone(),
+ _ => match e.name(input_schema) {
+ Ok(name) => match input_schema.field_with_unqualified_name(&name) {
+ Ok(field) => Expr::Column(field.qualified_column()),
+                // expression not provided as input, do not convert to a column reference
+ Err(_) => e,
+ },
+ Err(_) => e,
+ },
+ }
+}
+
+/// Collect all deeply nested `Expr::Column`'s. They are returned in order of
+/// appearance (depth first), and may contain duplicates.
+pub fn find_column_exprs(exprs: &[Expr]) -> Vec<Expr> {
+ exprs
+ .iter()
+ .flat_map(find_columns_referenced_by_expr)
+ .map(Expr::Column)
+ .collect()
+}
+
+/// Recursively find all columns referenced by an expression
+#[derive(Debug, Default)]
+struct ColumnCollector {
+ exprs: Vec<Column>,
+}
+
+impl ExpressionVisitor for ColumnCollector {
+ fn pre_visit(mut self, expr: &Expr) -> Result<Recursion<Self>> {
+ if let Expr::Column(c) = expr {
+ self.exprs.push(c.clone())
+ }
+ Ok(Recursion::Continue(self))
+ }
+}
+
+pub(crate) fn find_columns_referenced_by_expr(e: &Expr) -> Vec<Column> {
+ // As the `ExpressionVisitor` impl above always returns Ok, this
+ // "can't" error
+ let ColumnCollector { exprs } = e
+ .accept(ColumnCollector::default())
+ .expect("Unexpected error");
+ exprs
+}
+
+/// Convert any `Expr` to an `Expr::Column`.
+pub fn expr_as_column_expr(expr: &Expr, plan: &LogicalPlan) -> Result<Expr> {
+ match expr {
+ Expr::Column(col) => {
+ let field = plan.schema().field_from_column(col)?;
+ Ok(Expr::Column(field.qualified_column()))
+ }
+ _ => {
+ // we should not be trying to create a name for the expression
+ // based on the input schema but this is the current behavior
+ // see https://github.com/apache/arrow-datafusion/issues/2456
+ Ok(Expr::Column(Column::from_name(expr.name(plan.schema())?)))
+ }
+ }
+}
+
#[cfg(test)]
mod tests {
use super::*;