This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 034678b67 Add `Projection::try_new` and `Projection::try_new_with_schema` (#2900)
034678b67 is described below

commit 034678b67a154c3ece2b34b687e151ca0c5cfa5e
Author: Andy Grove <[email protected]>
AuthorDate: Thu Jul 14 11:01:32 2022 -0600

    Add `Projection::try_new` and `Projection::try_new_with_schema` (#2900)
---
 datafusion/expr/src/logical_plan/builder.rs        | 50 +++++++-------------
 datafusion/expr/src/logical_plan/plan.rs           | 54 +++++++++++++++++++++-
 datafusion/expr/src/utils.rs                       | 12 ++---
 .../optimizer/src/common_subexpr_eliminate.rs      | 24 +++++-----
 datafusion/optimizer/src/limit_push_down.rs        | 12 ++---
 datafusion/optimizer/src/projection_push_down.rs   | 33 +++++--------
 .../optimizer/src/single_distinct_to_groupby.rs    | 12 ++---
 7 files changed, 110 insertions(+), 87 deletions(-)

diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
index 05cc842f1..9eb379142 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -335,8 +335,6 @@ impl LogicalPlanBuilder {
                 .iter()
                 .all(|c| input.schema().field_from_column(c).is_ok()) =>
             {
-                let input_schema = input.schema();
-
                 let missing_exprs = missing_cols
                     .iter()
                     .map(|c| normalize_col(Expr::Column(c.clone()), &input))
@@ -344,17 +342,9 @@ impl LogicalPlanBuilder {
 
                 expr.extend(missing_exprs);
 
-                let new_schema = DFSchema::new_with_metadata(
-                    exprlist_to_fields(&expr, &input)?,
-                    input_schema.metadata().clone(),
-                )?;
-
-                Ok(LogicalPlan::Projection(Projection {
-                    expr,
-                    input,
-                    schema: DFSchemaRef::new(new_schema),
-                    alias,
-                }))
+                Ok(LogicalPlan::Projection(Projection::try_new(
+                    expr, input, alias,
+                )?))
             }
             _ => {
                 let new_inputs = curr_plan
@@ -416,17 +406,12 @@ impl LogicalPlanBuilder {
             .iter()
             .map(|f| Expr::Column(f.qualified_column()))
             .collect();
-        let new_schema = DFSchema::new_with_metadata(
-            exprlist_to_fields(&new_expr, &self.plan)?,
-            schema.metadata().clone(),
-        )?;
 
-        Ok(Self::from(LogicalPlan::Projection(Projection {
-            expr: new_expr,
-            input: Arc::new(sort_plan),
-            schema: DFSchemaRef::new(new_schema),
-            alias: None,
-        })))
+        Ok(Self::from(LogicalPlan::Projection(Projection::try_new(
+            new_expr,
+            Arc::new(sort_plan),
+            None,
+        )?)))
     }
 
     /// Apply a union, preserving duplicate rows
@@ -884,12 +869,9 @@ pub fn project_with_column_index_alias(
             x => x.alias(schema.field(i).name()),
         })
         .collect::<Vec<_>>();
-    Ok(LogicalPlan::Projection(Projection {
-        expr: alias_expr,
-        input,
-        schema,
-        alias,
-    }))
+    Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
+        alias_expr, input, schema, alias,
+    )?))
 }
 
 /// Union two logical plans with an optional alias.
@@ -983,12 +965,12 @@ pub fn project_with_alias(
         None => input_schema,
     };
 
-    Ok(LogicalPlan::Projection(Projection {
-        expr: projected_expr,
-        input: Arc::new(plan.clone()),
-        schema: DFSchemaRef::new(schema),
+    Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
+        projected_expr,
+        Arc::new(plan.clone()),
+        DFSchemaRef::new(schema),
         alias,
-    }))
+    )?))
 }
 
 /// Create a LogicalPlanBuilder representing a scan of a table with the provided name and schema.
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index b0c011449..93c18f4b9 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -17,9 +17,10 @@
 
 use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor};
 use crate::logical_plan::extension::UserDefinedLogicalNode;
+use crate::utils::exprlist_to_fields;
 use crate::{Expr, TableProviderFilterPushDown, TableSource};
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
-use datafusion_common::{Column, DFSchemaRef, DataFusionError};
+use datafusion_common::{Column, DFSchema, DFSchemaRef, DataFusionError};
 use std::collections::HashSet;
 ///! Logical plan types
 use std::fmt::{self, Debug, Display, Formatter};
@@ -1042,6 +1043,39 @@ pub struct Projection {
     pub alias: Option<String>,
 }
 
+impl Projection {
+    /// Create a new Projection
+    pub fn try_new(
+        expr: Vec<Expr>,
+        input: Arc<LogicalPlan>,
+        alias: Option<String>,
+    ) -> Result<Self, DataFusionError> {
+        let schema = Arc::new(DFSchema::new_with_metadata(
+            exprlist_to_fields(&expr, &input)?,
+            input.schema().metadata().clone(),
+        )?);
+        Self::try_new_with_schema(expr, input, schema, alias)
+    }
+
+    /// Create a new Projection using the specified output schema
+    pub fn try_new_with_schema(
+        expr: Vec<Expr>,
+        input: Arc<LogicalPlan>,
+        schema: DFSchemaRef,
+        alias: Option<String>,
+    ) -> Result<Self, DataFusionError> {
+        if expr.len() != schema.fields().len() {
+            return Err(DataFusionError::Plan(format!("Projection has mismatch between number of expressions ({}) and number of fields in schema ({})", expr.len(), schema.fields().len())));
+        }
+        Ok(Self {
+            expr,
+            input,
+            schema,
+            alias,
+        })
+    }
+}
+
 /// Aliased subquery
 #[derive(Clone)]
 pub struct SubqueryAlias {
@@ -1409,7 +1443,9 @@ mod tests {
     use crate::logical_plan::table_scan;
     use crate::{col, in_subquery, lit};
     use arrow::datatypes::{DataType, Field, Schema};
+    use datafusion_common::DFSchema;
     use datafusion_common::Result;
+    use std::collections::HashMap;
 
     fn employee_schema() -> Schema {
         Schema::new(vec![
@@ -1727,6 +1763,22 @@ mod tests {
         );
     }
 
+    #[test]
+    fn projection_expr_schema_mismatch() -> Result<()> {
+        let empty_schema = Arc::new(DFSchema::new_with_metadata(vec![], HashMap::new())?);
+        let p = Projection::try_new_with_schema(
+            vec![col("a")],
+            Arc::new(LogicalPlan::EmptyRelation(EmptyRelation {
+                produce_one_row: false,
+                schema: empty_schema.clone(),
+            })),
+            empty_schema,
+            None,
+        );
+        assert_eq!("Error during planning: Projection has mismatch between number of expressions (1) and number of fields in schema (0)", format!("{}", p.err().unwrap()));
+        Ok(())
+    }
+
     fn test_plan() -> LogicalPlan {
         let schema = Schema::new(vec![
             Field::new("id", DataType::Int32, false),
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index 3b55c4851..f2f5d6002 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -341,12 +341,12 @@ pub fn from_plan(
 ) -> Result<LogicalPlan> {
     match plan {
         LogicalPlan::Projection(Projection { schema, alias, .. }) => {
-            Ok(LogicalPlan::Projection(Projection {
-                expr: expr.to_vec(),
-                input: Arc::new(inputs[0].clone()),
-                schema: schema.clone(),
-                alias: alias.clone(),
-            }))
+            Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
+                expr.to_vec(),
+                Arc::new(inputs[0].clone()),
+                schema.clone(),
+                alias.clone(),
+            )?))
         }
         LogicalPlan::Values(Values { schema, .. }) => Ok(LogicalPlan::Values(Values {
             schema: schema.clone(),
diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs
index d1e3fea70..3964bee6b 100644
--- a/datafusion/optimizer/src/common_subexpr_eliminate.rs
+++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs
@@ -107,12 +107,12 @@ fn optimize(
                 optimizer_config,
             )?;
 
-            Ok(LogicalPlan::Projection(Projection {
-                expr: new_expr.pop().unwrap(),
-                input: Arc::new(new_input),
-                schema: schema.clone(),
-                alias: alias.clone(),
-            }))
+            Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
+                new_expr.pop().unwrap(),
+                Arc::new(new_input),
+                schema.clone(),
+                alias.clone(),
+            )?))
         }
         LogicalPlan::Filter(Filter { predicate, input }) => {
             let schema = plan.schema().as_ref().clone();
@@ -292,12 +292,12 @@ fn build_project_plan(
     let mut schema = DFSchema::new_with_metadata(fields, HashMap::new())?;
     schema.merge(input.schema());
 
-    Ok(LogicalPlan::Projection(Projection {
-        expr: project_exprs,
-        input: Arc::new(input),
-        schema: Arc::new(schema),
-        alias: None,
-    }))
+    Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
+        project_exprs,
+        Arc::new(input),
+        Arc::new(schema),
+        None,
+    )?))
 }
 
 #[inline]
diff --git a/datafusion/optimizer/src/limit_push_down.rs b/datafusion/optimizer/src/limit_push_down.rs
index f92061b41..66cdc144c 100644
--- a/datafusion/optimizer/src/limit_push_down.rs
+++ b/datafusion/optimizer/src/limit_push_down.rs
@@ -162,17 +162,17 @@ fn limit_push_down(
             ancestor,
         ) => {
             // Push down limit directly (projection doesn't change number of rows)
-            Ok(LogicalPlan::Projection(Projection {
-                expr: expr.clone(),
-                input: Arc::new(limit_push_down(
+            Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
+                expr.clone(),
+                Arc::new(limit_push_down(
                     _optimizer,
                     ancestor,
                     input.as_ref(),
                     _optimizer_config,
                 )?),
-                schema: schema.clone(),
-                alias: alias.clone(),
-            }))
+                schema.clone(),
+                alias.clone(),
+            )?))
         }
         (
             LogicalPlan::Union(Union {
diff --git a/datafusion/optimizer/src/projection_push_down.rs b/datafusion/optimizer/src/projection_push_down.rs
index 1dfbd817f..aa3cdfb42 100644
--- a/datafusion/optimizer/src/projection_push_down.rs
+++ b/datafusion/optimizer/src/projection_push_down.rs
@@ -192,14 +192,12 @@ fn optimize_plan(
                 Ok(new_input)
             } else {
                 let metadata = new_input.schema().metadata().clone();
-                Ok(LogicalPlan::Projection(Projection {
-                    expr: new_expr,
-                    input: Arc::new(new_input),
-                    schema: DFSchemaRef::new(DFSchema::new_with_metadata(
-                        new_fields, metadata,
-                    )?),
-                    alias: alias.clone(),
-                }))
+                Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
+                    new_expr,
+                    Arc::new(new_input),
+                    DFSchemaRef::new(DFSchema::new_with_metadata(new_fields, metadata)?),
+                    alias.clone(),
+                )?))
             }
         }
         LogicalPlan::Join(Join {
@@ -538,9 +536,7 @@ mod tests {
     use datafusion_expr::{
         col, lit,
         logical_plan::{builder::LogicalPlanBuilder, JoinType},
-        max, min,
-        utils::exprlist_to_fields,
-        Expr,
+        max, min, Expr,
     };
     use std::collections::HashMap;
 
@@ -839,18 +835,11 @@ mod tests {
         // that the Column references are unqualified (e.g. their
         // relation is `None`). PlanBuilder resolves the expressions
         let expr = vec![col("a"), col("b")];
-        let projected_fields = exprlist_to_fields(&expr, &table_scan).unwrap();
-        let projected_schema = DFSchema::new_with_metadata(
-            projected_fields,
-            input_schema.metadata().clone(),
-        )
-        .unwrap();
-        let plan = LogicalPlan::Projection(Projection {
+        let plan = LogicalPlan::Projection(Projection::try_new(
             expr,
-            input: Arc::new(table_scan),
-            schema: Arc::new(projected_schema),
-            alias: None,
-        });
+            Arc::new(table_scan),
+            None,
+        )?);
 
         assert_fields_eq(&plan, vec!["a", "b"]);
 
diff --git a/datafusion/optimizer/src/single_distinct_to_groupby.rs b/datafusion/optimizer/src/single_distinct_to_groupby.rs
index 67ebd4aea..1769314eb 100644
--- a/datafusion/optimizer/src/single_distinct_to_groupby.rs
+++ b/datafusion/optimizer/src/single_distinct_to_groupby.rs
@@ -141,12 +141,12 @@ fn optimize(plan: &LogicalPlan) -> Result<LogicalPlan> {
                     schema: final_agg_schema,
                 });
 
-                Ok(LogicalPlan::Projection(Projection {
-                    expr: alias_expr,
-                    input: Arc::new(final_agg),
-                    schema: schema.clone(),
-                    alias: None,
-                }))
+                Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
+                    alias_expr,
+                    Arc::new(final_agg),
+                    schema.clone(),
+                    None,
+                )?))
             } else {
                 optimize_children(plan)
             }

Reply via email to