This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 034678b67 Add `Projection::try_new` and `Projection::try_new_with_schema` (#2900)
034678b67 is described below
commit 034678b67a154c3ece2b34b687e151ca0c5cfa5e
Author: Andy Grove <[email protected]>
AuthorDate: Thu Jul 14 11:01:32 2022 -0600
Add `Projection::try_new` and `Projection::try_new_with_schema` (#2900)
---
datafusion/expr/src/logical_plan/builder.rs | 50 +++++++-------------
datafusion/expr/src/logical_plan/plan.rs | 54 +++++++++++++++++++++-
datafusion/expr/src/utils.rs | 12 ++---
.../optimizer/src/common_subexpr_eliminate.rs | 24 +++++-----
datafusion/optimizer/src/limit_push_down.rs | 12 ++---
datafusion/optimizer/src/projection_push_down.rs | 33 +++++--------
.../optimizer/src/single_distinct_to_groupby.rs | 12 ++---
7 files changed, 110 insertions(+), 87 deletions(-)
diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
index 05cc842f1..9eb379142 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -335,8 +335,6 @@ impl LogicalPlanBuilder {
.iter()
.all(|c| input.schema().field_from_column(c).is_ok()) =>
{
- let input_schema = input.schema();
-
let missing_exprs = missing_cols
.iter()
.map(|c| normalize_col(Expr::Column(c.clone()), &input))
@@ -344,17 +342,9 @@ impl LogicalPlanBuilder {
expr.extend(missing_exprs);
- let new_schema = DFSchema::new_with_metadata(
- exprlist_to_fields(&expr, &input)?,
- input_schema.metadata().clone(),
- )?;
-
- Ok(LogicalPlan::Projection(Projection {
- expr,
- input,
- schema: DFSchemaRef::new(new_schema),
- alias,
- }))
+ Ok(LogicalPlan::Projection(Projection::try_new(
+ expr, input, alias,
+ )?))
}
_ => {
let new_inputs = curr_plan
@@ -416,17 +406,12 @@ impl LogicalPlanBuilder {
.iter()
.map(|f| Expr::Column(f.qualified_column()))
.collect();
- let new_schema = DFSchema::new_with_metadata(
- exprlist_to_fields(&new_expr, &self.plan)?,
- schema.metadata().clone(),
- )?;
- Ok(Self::from(LogicalPlan::Projection(Projection {
- expr: new_expr,
- input: Arc::new(sort_plan),
- schema: DFSchemaRef::new(new_schema),
- alias: None,
- })))
+ Ok(Self::from(LogicalPlan::Projection(Projection::try_new(
+ new_expr,
+ Arc::new(sort_plan),
+ None,
+ )?)))
}
/// Apply a union, preserving duplicate rows
@@ -884,12 +869,9 @@ pub fn project_with_column_index_alias(
x => x.alias(schema.field(i).name()),
})
.collect::<Vec<_>>();
- Ok(LogicalPlan::Projection(Projection {
- expr: alias_expr,
- input,
- schema,
- alias,
- }))
+ Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
+ alias_expr, input, schema, alias,
+ )?))
}
/// Union two logical plans with an optional alias.
@@ -983,12 +965,12 @@ pub fn project_with_alias(
None => input_schema,
};
- Ok(LogicalPlan::Projection(Projection {
- expr: projected_expr,
- input: Arc::new(plan.clone()),
- schema: DFSchemaRef::new(schema),
+ Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
+ projected_expr,
+ Arc::new(plan.clone()),
+ DFSchemaRef::new(schema),
alias,
- }))
+ )?))
}
/// Create a LogicalPlanBuilder representing a scan of a table with the provided name and schema.
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index b0c011449..93c18f4b9 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -17,9 +17,10 @@
use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor};
use crate::logical_plan::extension::UserDefinedLogicalNode;
+use crate::utils::exprlist_to_fields;
use crate::{Expr, TableProviderFilterPushDown, TableSource};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
-use datafusion_common::{Column, DFSchemaRef, DataFusionError};
+use datafusion_common::{Column, DFSchema, DFSchemaRef, DataFusionError};
use std::collections::HashSet;
///! Logical plan types
use std::fmt::{self, Debug, Display, Formatter};
@@ -1042,6 +1043,39 @@ pub struct Projection {
pub alias: Option<String>,
}
+impl Projection {
+ /// Create a new Projection
+ pub fn try_new(
+ expr: Vec<Expr>,
+ input: Arc<LogicalPlan>,
+ alias: Option<String>,
+ ) -> Result<Self, DataFusionError> {
+ let schema = Arc::new(DFSchema::new_with_metadata(
+ exprlist_to_fields(&expr, &input)?,
+ input.schema().metadata().clone(),
+ )?);
+ Self::try_new_with_schema(expr, input, schema, alias)
+ }
+
+ /// Create a new Projection using the specified output schema
+ pub fn try_new_with_schema(
+ expr: Vec<Expr>,
+ input: Arc<LogicalPlan>,
+ schema: DFSchemaRef,
+ alias: Option<String>,
+ ) -> Result<Self, DataFusionError> {
+ if expr.len() != schema.fields().len() {
+ return Err(DataFusionError::Plan(format!("Projection has mismatch between number of expressions ({}) and number of fields in schema ({})", expr.len(), schema.fields().len())));
+ }
+ Ok(Self {
+ expr,
+ input,
+ schema,
+ alias,
+ })
+ }
+}
+
/// Aliased subquery
#[derive(Clone)]
pub struct SubqueryAlias {
@@ -1409,7 +1443,9 @@ mod tests {
use crate::logical_plan::table_scan;
use crate::{col, in_subquery, lit};
use arrow::datatypes::{DataType, Field, Schema};
+ use datafusion_common::DFSchema;
use datafusion_common::Result;
+ use std::collections::HashMap;
fn employee_schema() -> Schema {
Schema::new(vec![
@@ -1727,6 +1763,22 @@ mod tests {
);
}
+ #[test]
+ fn projection_expr_schema_mismatch() -> Result<()> {
+ let empty_schema = Arc::new(DFSchema::new_with_metadata(vec![], HashMap::new())?);
+ let p = Projection::try_new_with_schema(
+ vec![col("a")],
+ Arc::new(LogicalPlan::EmptyRelation(EmptyRelation {
+ produce_one_row: false,
+ schema: empty_schema.clone(),
+ })),
+ empty_schema,
+ None,
+ );
+ assert_eq!("Error during planning: Projection has mismatch between number of expressions (1) and number of fields in schema (0)", format!("{}", p.err().unwrap()));
+ Ok(())
+ }
+
fn test_plan() -> LogicalPlan {
let schema = Schema::new(vec![
Field::new("id", DataType::Int32, false),
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index 3b55c4851..f2f5d6002 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -341,12 +341,12 @@ pub fn from_plan(
) -> Result<LogicalPlan> {
match plan {
LogicalPlan::Projection(Projection { schema, alias, .. }) => {
- Ok(LogicalPlan::Projection(Projection {
- expr: expr.to_vec(),
- input: Arc::new(inputs[0].clone()),
- schema: schema.clone(),
- alias: alias.clone(),
- }))
+ Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
+ expr.to_vec(),
+ Arc::new(inputs[0].clone()),
+ schema.clone(),
+ alias.clone(),
+ )?))
}
LogicalPlan::Values(Values { schema, .. }) => Ok(LogicalPlan::Values(Values {
schema: schema.clone(),
diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs
index d1e3fea70..3964bee6b 100644
--- a/datafusion/optimizer/src/common_subexpr_eliminate.rs
+++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs
@@ -107,12 +107,12 @@ fn optimize(
optimizer_config,
)?;
- Ok(LogicalPlan::Projection(Projection {
- expr: new_expr.pop().unwrap(),
- input: Arc::new(new_input),
- schema: schema.clone(),
- alias: alias.clone(),
- }))
+ Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
+ new_expr.pop().unwrap(),
+ Arc::new(new_input),
+ schema.clone(),
+ alias.clone(),
+ )?))
}
LogicalPlan::Filter(Filter { predicate, input }) => {
let schema = plan.schema().as_ref().clone();
@@ -292,12 +292,12 @@ fn build_project_plan(
let mut schema = DFSchema::new_with_metadata(fields, HashMap::new())?;
schema.merge(input.schema());
- Ok(LogicalPlan::Projection(Projection {
- expr: project_exprs,
- input: Arc::new(input),
- schema: Arc::new(schema),
- alias: None,
- }))
+ Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
+ project_exprs,
+ Arc::new(input),
+ Arc::new(schema),
+ None,
+ )?))
}
#[inline]
diff --git a/datafusion/optimizer/src/limit_push_down.rs b/datafusion/optimizer/src/limit_push_down.rs
index f92061b41..66cdc144c 100644
--- a/datafusion/optimizer/src/limit_push_down.rs
+++ b/datafusion/optimizer/src/limit_push_down.rs
@@ -162,17 +162,17 @@ fn limit_push_down(
ancestor,
) => {
// Push down limit directly (projection doesn't change number of rows)
- Ok(LogicalPlan::Projection(Projection {
- expr: expr.clone(),
- input: Arc::new(limit_push_down(
+ Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
+ expr.clone(),
+ Arc::new(limit_push_down(
_optimizer,
ancestor,
input.as_ref(),
_optimizer_config,
)?),
- schema: schema.clone(),
- alias: alias.clone(),
- }))
+ schema.clone(),
+ alias.clone(),
+ )?))
}
(
LogicalPlan::Union(Union {
diff --git a/datafusion/optimizer/src/projection_push_down.rs b/datafusion/optimizer/src/projection_push_down.rs
index 1dfbd817f..aa3cdfb42 100644
--- a/datafusion/optimizer/src/projection_push_down.rs
+++ b/datafusion/optimizer/src/projection_push_down.rs
@@ -192,14 +192,12 @@ fn optimize_plan(
Ok(new_input)
} else {
let metadata = new_input.schema().metadata().clone();
- Ok(LogicalPlan::Projection(Projection {
- expr: new_expr,
- input: Arc::new(new_input),
- schema: DFSchemaRef::new(DFSchema::new_with_metadata(
- new_fields, metadata,
- )?),
- alias: alias.clone(),
- }))
+ Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
+ new_expr,
+ Arc::new(new_input),
+ DFSchemaRef::new(DFSchema::new_with_metadata(new_fields, metadata)?),
+ alias.clone(),
+ )?))
}
}
LogicalPlan::Join(Join {
@@ -538,9 +536,7 @@ mod tests {
use datafusion_expr::{
col, lit,
logical_plan::{builder::LogicalPlanBuilder, JoinType},
- max, min,
- utils::exprlist_to_fields,
- Expr,
+ max, min, Expr,
};
use std::collections::HashMap;
@@ -839,18 +835,11 @@ mod tests {
// that the Column references are unqualified (e.g. their
// relation is `None`). PlanBuilder resolves the expressions
let expr = vec![col("a"), col("b")];
- let projected_fields = exprlist_to_fields(&expr, &table_scan).unwrap();
- let projected_schema = DFSchema::new_with_metadata(
- projected_fields,
- input_schema.metadata().clone(),
- )
- .unwrap();
- let plan = LogicalPlan::Projection(Projection {
+ let plan = LogicalPlan::Projection(Projection::try_new(
expr,
- input: Arc::new(table_scan),
- schema: Arc::new(projected_schema),
- alias: None,
- });
+ Arc::new(table_scan),
+ None,
+ )?);
assert_fields_eq(&plan, vec!["a", "b"]);
diff --git a/datafusion/optimizer/src/single_distinct_to_groupby.rs
b/datafusion/optimizer/src/single_distinct_to_groupby.rs
index 67ebd4aea..1769314eb 100644
--- a/datafusion/optimizer/src/single_distinct_to_groupby.rs
+++ b/datafusion/optimizer/src/single_distinct_to_groupby.rs
@@ -141,12 +141,12 @@ fn optimize(plan: &LogicalPlan) -> Result<LogicalPlan> {
schema: final_agg_schema,
});
- Ok(LogicalPlan::Projection(Projection {
- expr: alias_expr,
- input: Arc::new(final_agg),
- schema: schema.clone(),
- alias: None,
- }))
+ Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
+ alias_expr,
+ Arc::new(final_agg),
+ schema.clone(),
+ None,
+ )?))
} else {
optimize_children(plan)
}