alamb commented on code in PR #4862:
URL: https://github.com/apache/arrow-datafusion/pull/4862#discussion_r1067491268
##########
datafusion/expr/src/expr_rewriter.rs:
##########
@@ -525,29 +525,55 @@ pub fn coerce_plan_expr_for_schema(
plan: &LogicalPlan,
schema: &DFSchema,
) -> Result<LogicalPlan> {
- let new_expr = plan
- .expressions()
- .into_iter()
+ match plan {
+ LogicalPlan::Projection(Projection { expr, input, .. }) => {
+ let new_exprs = coerce_exprs_for_schema(expr, input.schema(),
schema)?;
+ let projection = Projection::try_new(new_exprs, input.clone())?;
+ Ok(LogicalPlan::Projection(projection))
+ }
+ _ => {
+ let exprs: Vec<Expr> = plan
+ .schema()
+ .fields()
+ .iter()
+ .map(|field| Expr::Column(field.qualified_column()))
+ .collect();
+
+ let new_exprs = coerce_exprs_for_schema(&exprs, plan.schema(),
schema)?;
+ let add_project = new_exprs.iter().any(|expr|
expr.try_into_col().is_err());
+ if add_project {
+ let projection = Projection::try_new(new_exprs,
Arc::new(plan.clone()))?;
+ Ok(LogicalPlan::Projection(projection))
+ } else {
+ Ok(plan.clone())
+ }
+ }
+ }
+}
+
+fn coerce_exprs_for_schema(
+ exprs: &[Expr],
+ src_schema: &DFSchema,
+ dst_schema: &DFSchema,
+) -> Result<Vec<Expr>> {
+ exprs
+ .iter()
.enumerate()
- .map(|(i, expr)| {
- let new_type = schema.field(i).data_type();
- if plan.schema().field(i).data_type() !=
schema.field(i).data_type() {
- match (plan, &expr) {
- (
- LogicalPlan::Projection(Projection { input, .. }),
- Expr::Alias(e, alias),
- ) => Ok(e.clone().cast_to(new_type,
input.schema())?.alias(alias)),
- _ => expr.cast_to(new_type, plan.schema()),
+ .map(|(idx, expr)| {
+ let new_type = dst_schema.field(idx).data_type();
+ if new_type != &expr.get_type(src_schema)? {
+ match expr {
+ Expr::Alias(e, alias) => {
+ let new_expr = e.clone().cast_to(new_type,
src_schema)?;
+ Ok(Expr::Alias(Box::new(new_expr), alias.clone()))
+ }
Review Comment:
I think you can write this more concisely like:
```suggestion
Expr::Alias(e, alias) => {
Ok(e.clone()
.cast_to(new_type, src_schema)?
.alias(alias))
}
```
##########
datafusion/expr/src/expr_rewriter.rs:
##########
@@ -525,29 +525,55 @@ pub fn coerce_plan_expr_for_schema(
plan: &LogicalPlan,
Review Comment:
I was somewhat confused about this at first because of the name of this
function; it turns out it is only called for coercing union schemas:
https://github.com/search?q=repo%3Aapache%2Farrow-datafusion%20coerce_plan_expr_for_schema&type=code
👍
##########
datafusion/expr/src/expr_rewriter.rs:
##########
@@ -525,29 +525,55 @@ pub fn coerce_plan_expr_for_schema(
plan: &LogicalPlan,
schema: &DFSchema,
) -> Result<LogicalPlan> {
- let new_expr = plan
- .expressions()
- .into_iter()
+ match plan {
+ LogicalPlan::Projection(Projection { expr, input, .. }) => {
+ let new_exprs = coerce_exprs_for_schema(expr, input.schema(),
schema)?;
+ let projection = Projection::try_new(new_exprs, input.clone())?;
+ Ok(LogicalPlan::Projection(projection))
+ }
+ _ => {
+ let exprs: Vec<Expr> = plan
+ .schema()
+ .fields()
+ .iter()
+ .map(|field| Expr::Column(field.qualified_column()))
+ .collect();
+
+ let new_exprs = coerce_exprs_for_schema(&exprs, plan.schema(),
schema)?;
+ let add_project = new_exprs.iter().any(|expr|
expr.try_into_col().is_err());
+ if add_project {
+ let projection = Projection::try_new(new_exprs,
Arc::new(plan.clone()))?;
+ Ok(LogicalPlan::Projection(projection))
+ } else {
+ Ok(plan.clone())
+ }
+ }
+ }
+}
+
+fn coerce_exprs_for_schema(
+ exprs: &[Expr],
Review Comment:
```suggestion
exprs: Vec<Expr>,
```
Since the caller owns the Exprs already, I think you can avoid the clones in
this function by passing in the Vec directly
##########
datafusion/expr/src/expr_rewriter.rs:
##########
@@ -525,29 +525,55 @@ pub fn coerce_plan_expr_for_schema(
plan: &LogicalPlan,
schema: &DFSchema,
) -> Result<LogicalPlan> {
- let new_expr = plan
- .expressions()
- .into_iter()
+ match plan {
+ LogicalPlan::Projection(Projection { expr, input, .. }) => {
Review Comment:
```suggestion
// special case Projection to avoid adding multiple projections
LogicalPlan::Projection(Projection { expr, input, .. }) => {
```
I didn't understand the need to special case `LogicalPlan::Projection` -- I
think it would help to explain the rationale.
For anyone who is curious I tried removing this case and got errors like the
following:
```
---- sorted_union_with_different_types_and_group_by stdout ----
thread 'sorted_union_with_different_types_and_group_by' panicked at
'assertion failed: `(left == right)`
left: `"Sort: a ASC NULLS LAST\n Union\n Projection: CAST(x.a AS
Float64) AS a\n Projection: x.a\n Aggregate: groupBy=[[x.a]],
aggr=[[]]\n SubqueryAlias: x\n Projection: Int64(1) AS a\n
EmptyRelation\n Projection: x.a\n Aggregate:
groupBy=[[x.a]], aggr=[[]]\n SubqueryAlias: x\n Projection:
Float64(1.1) AS a\n EmptyRelation"`,
right: `"Sort: a ASC NULLS LAST\n Union\n Projection: CAST(x.a AS
Float64) AS a\n Aggregate: groupBy=[[x.a]], aggr=[[]]\n
SubqueryAlias: x\n Projection: Int64(1) AS a\n
EmptyRelation\n Projection: x.a\n Aggregate: groupBy=[[x.a]],
aggr=[[]]\n SubqueryAlias: x\n Projection: Float64(1.1) AS a\n
EmptyRelation"`', datafusion/sql/tests/integration_test.rs:2175:5
stack backtrace:
0: rust_begin_unwind
at
/rustc/69f9c33d71c871fc16ac445211281c6e7a340943/library/std/src/panicking.rs:575:5
1: core::panicking::panic_fmt
at
/rustc/69f9c33d71c871fc16ac445211281c6e7a340943/library/core/src/panicking.rs:65:14
2: core::panicking::assert_failed_inner
at
/rustc/69f9c33d71c871fc16ac445211281c6e7a340943/library/core/src/panicking.rs:246:17
3: core::panicking::assert_failed
at
/rustc/69f9c33d71c871fc16ac445211281c6e7a340943/library/core/src/panicking.rs:203:5
4: integration_test::quick_test
at ./tests/integration_test.rs:2175:5
5: integration_test::sorted_union_with_different_types_and_group_by
at ./tests/integration_test.rs:1714:5
6:
integration_test::sorted_union_with_different_types_and_group_by::{{closure}}
at ./tests/integration_test.rs:1700:1
7: core::ops::function::FnOnce::call_once
at
/rustc/69f9c33d71c871fc16ac445211281c6e7a340943/library/core/src/ops/function.rs:251:5
8: core::ops::function::FnOnce::call_once
at
/rustc/69f9c33d71c871fc16ac445211281c6e7a340943/library/core/src/ops/function.rs:251:5
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose
backtrace.
```
These failures are due to another projection having been added.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]