alamb commented on code in PR #4862:
URL: https://github.com/apache/arrow-datafusion/pull/4862#discussion_r1067491268


##########
datafusion/expr/src/expr_rewriter.rs:
##########
@@ -525,29 +525,55 @@ pub fn coerce_plan_expr_for_schema(
     plan: &LogicalPlan,
     schema: &DFSchema,
 ) -> Result<LogicalPlan> {
-    let new_expr = plan
-        .expressions()
-        .into_iter()
+    match plan {
+        LogicalPlan::Projection(Projection { expr, input, .. }) => {
+            let new_exprs = coerce_exprs_for_schema(expr, input.schema(), 
schema)?;
+            let projection = Projection::try_new(new_exprs, input.clone())?;
+            Ok(LogicalPlan::Projection(projection))
+        }
+        _ => {
+            let exprs: Vec<Expr> = plan
+                .schema()
+                .fields()
+                .iter()
+                .map(|field| Expr::Column(field.qualified_column()))
+                .collect();
+
+            let new_exprs = coerce_exprs_for_schema(&exprs, plan.schema(), 
schema)?;
+            let add_project = new_exprs.iter().any(|expr| 
expr.try_into_col().is_err());
+            if add_project {
+                let projection = Projection::try_new(new_exprs, 
Arc::new(plan.clone()))?;
+                Ok(LogicalPlan::Projection(projection))
+            } else {
+                Ok(plan.clone())
+            }
+        }
+    }
+}
+
+fn coerce_exprs_for_schema(
+    exprs: &[Expr],
+    src_schema: &DFSchema,
+    dst_schema: &DFSchema,
+) -> Result<Vec<Expr>> {
+    exprs
+        .iter()
         .enumerate()
-        .map(|(i, expr)| {
-            let new_type = schema.field(i).data_type();
-            if plan.schema().field(i).data_type() != 
schema.field(i).data_type() {
-                match (plan, &expr) {
-                    (
-                        LogicalPlan::Projection(Projection { input, .. }),
-                        Expr::Alias(e, alias),
-                    ) => Ok(e.clone().cast_to(new_type, 
input.schema())?.alias(alias)),
-                    _ => expr.cast_to(new_type, plan.schema()),
+        .map(|(idx, expr)| {
+            let new_type = dst_schema.field(idx).data_type();
+            if new_type != &expr.get_type(src_schema)? {
+                match expr {
+                    Expr::Alias(e, alias) => {
+                        let new_expr = e.clone().cast_to(new_type, 
src_schema)?;
+                        Ok(Expr::Alias(Box::new(new_expr), alias.clone()))
+                    }

Review Comment:
   I think you can write this more concisely like:
   
   ```suggestion
                       Expr::Alias(e, alias) => {
                           Ok(e.clone()
                               .cast_to(new_type, src_schema)?
                               .alias(alias))
                       }
   ```



##########
datafusion/expr/src/expr_rewriter.rs:
##########
@@ -525,29 +525,55 @@ pub fn coerce_plan_expr_for_schema(
     plan: &LogicalPlan,

Review Comment:
   I was somewhat confused by this at first because of the name of this function.
   
   It turns out it is only called when coercing union schemas: 
https://github.com/search?q=repo%3Aapache%2Farrow-datafusion%20coerce_plan_expr_for_schema&type=code
   
   👍 
   



##########
datafusion/expr/src/expr_rewriter.rs:
##########
@@ -525,29 +525,55 @@ pub fn coerce_plan_expr_for_schema(
     plan: &LogicalPlan,
     schema: &DFSchema,
 ) -> Result<LogicalPlan> {
-    let new_expr = plan
-        .expressions()
-        .into_iter()
+    match plan {
+        LogicalPlan::Projection(Projection { expr, input, .. }) => {
+            let new_exprs = coerce_exprs_for_schema(expr, input.schema(), 
schema)?;
+            let projection = Projection::try_new(new_exprs, input.clone())?;
+            Ok(LogicalPlan::Projection(projection))
+        }
+        _ => {
+            let exprs: Vec<Expr> = plan
+                .schema()
+                .fields()
+                .iter()
+                .map(|field| Expr::Column(field.qualified_column()))
+                .collect();
+
+            let new_exprs = coerce_exprs_for_schema(&exprs, plan.schema(), 
schema)?;
+            let add_project = new_exprs.iter().any(|expr| 
expr.try_into_col().is_err());
+            if add_project {
+                let projection = Projection::try_new(new_exprs, 
Arc::new(plan.clone()))?;
+                Ok(LogicalPlan::Projection(projection))
+            } else {
+                Ok(plan.clone())
+            }
+        }
+    }
+}
+
+fn coerce_exprs_for_schema(
+    exprs: &[Expr],

Review Comment:
   ```suggestion
       exprs: Vec<Expr>,
   ```
   
   Since the caller already owns the `Expr`s, I think you can avoid the clones in this function by passing in the `Vec` directly.



##########
datafusion/expr/src/expr_rewriter.rs:
##########
@@ -525,29 +525,55 @@ pub fn coerce_plan_expr_for_schema(
     plan: &LogicalPlan,
     schema: &DFSchema,
 ) -> Result<LogicalPlan> {
-    let new_expr = plan
-        .expressions()
-        .into_iter()
+    match plan {
+        LogicalPlan::Projection(Projection { expr, input, .. }) => {

Review Comment:
   ```suggestion
           // special case Projection to avoid adding multiple projections
           LogicalPlan::Projection(Projection { expr, input, .. }) => {
   ```
   
   I didn't understand the need to special-case `LogicalPlan::Projection` -- I think it would help to explain the rationale.
   
   
   For anyone who is curious, I tried removing this case and got errors like the following:
   
   ```
   
   
   
   ---- sorted_union_with_different_types_and_group_by stdout ----
   thread 'sorted_union_with_different_types_and_group_by' panicked at 
'assertion failed: `(left == right)`
     left: `"Sort: a ASC NULLS LAST\n  Union\n    Projection: CAST(x.a AS 
Float64) AS a\n      Projection: x.a\n        Aggregate: groupBy=[[x.a]], 
aggr=[[]]\n          SubqueryAlias: x\n            Projection: Int64(1) AS a\n  
            EmptyRelation\n    Projection: x.a\n      Aggregate: 
groupBy=[[x.a]], aggr=[[]]\n        SubqueryAlias: x\n          Projection: 
Float64(1.1) AS a\n            EmptyRelation"`,
    right: `"Sort: a ASC NULLS LAST\n  Union\n    Projection: CAST(x.a AS 
Float64) AS a\n      Aggregate: groupBy=[[x.a]], aggr=[[]]\n        
SubqueryAlias: x\n          Projection: Int64(1) AS a\n            
EmptyRelation\n    Projection: x.a\n      Aggregate: groupBy=[[x.a]], 
aggr=[[]]\n        SubqueryAlias: x\n          Projection: Float64(1.1) AS a\n  
          EmptyRelation"`', datafusion/sql/tests/integration_test.rs:2175:5
   stack backtrace:
      0: rust_begin_unwind
                at 
/rustc/69f9c33d71c871fc16ac445211281c6e7a340943/library/std/src/panicking.rs:575:5
      1: core::panicking::panic_fmt
                at 
/rustc/69f9c33d71c871fc16ac445211281c6e7a340943/library/core/src/panicking.rs:65:14
      2: core::panicking::assert_failed_inner
                at 
/rustc/69f9c33d71c871fc16ac445211281c6e7a340943/library/core/src/panicking.rs:246:17
      3: core::panicking::assert_failed
                at 
/rustc/69f9c33d71c871fc16ac445211281c6e7a340943/library/core/src/panicking.rs:203:5
      4: integration_test::quick_test
                at ./tests/integration_test.rs:2175:5
      5: integration_test::sorted_union_with_different_types_and_group_by
                at ./tests/integration_test.rs:1714:5
      6: 
integration_test::sorted_union_with_different_types_and_group_by::{{closure}}
                at ./tests/integration_test.rs:1700:1
      7: core::ops::function::FnOnce::call_once
                at 
/rustc/69f9c33d71c871fc16ac445211281c6e7a340943/library/core/src/ops/function.rs:251:5
      8: core::ops::function::FnOnce::call_once
                at 
/rustc/69f9c33d71c871fc16ac445211281c6e7a340943/library/core/src/ops/function.rs:251:5
   note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose 
backtrace.
   ```
   
   These are due to an extra projection having been added.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to