This is an automated email from the ASF dual-hosted git repository.
jakevin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 137bf81a39 revert #6595 #6820 (#6827)
137bf81a39 is described below
commit 137bf81a39d4e4279a79f31aadee5bd75612017a
Author: jakevin <[email protected]>
AuthorDate: Tue Jul 4 14:59:04 2023 +0800
revert #6595 #6820 (#6827)
* revert: from_plan keep same schema Project in #6595
* revert: from_plan keep same schema Agg/Window in #6820
* revert type coercion
* add comment
---
datafusion/common/src/dfschema.rs | 8 +---
datafusion/expr/src/utils.rs | 44 +++++++++++++---------
datafusion/optimizer/src/analyzer/type_coercion.rs | 11 +++++-
3 files changed, 38 insertions(+), 25 deletions(-)
diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs
index c490852c6e..cb07f15b9d 100644
--- a/datafusion/common/src/dfschema.rs
+++ b/datafusion/common/src/dfschema.rs
@@ -384,12 +384,8 @@ impl DFSchema {
let self_fields = self.fields().iter();
let other_fields = other.fields().iter();
self_fields.zip(other_fields).all(|(f1, f2)| {
- // TODO: resolve field when exist alias
- // f1.qualifier() == f2.qualifier()
- // && f1.name() == f2.name()
- // column(t1.a) field is "t1"."a"
- // column(x) as t1.a field is ""."t1.a"
- f1.qualified_name() == f2.qualified_name()
+ f1.qualifier() == f2.qualifier()
+ && f1.name() == f2.name()
&& Self::datatype_is_semantically_equal(f1.data_type(), f2.data_type())
})
}
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index 069ce6df71..3111579246 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -724,16 +724,22 @@ where
/// // create new plan using rewritten_exprs in same position
/// let new_plan = from_plan(&plan, rewritten_exprs, new_inputs);
/// ```
+///
+/// Notice: sometimes [from_plan] will use the schema of the original plan; it doesn't change the schema!
+/// Such as `Projection/Aggregate/Window`
pub fn from_plan(
plan: &LogicalPlan,
expr: &[Expr],
inputs: &[LogicalPlan],
) -> Result<LogicalPlan> {
match plan {
- LogicalPlan::Projection(_) => Ok(LogicalPlan::Projection(Projection::try_new(
- expr.to_vec(),
- Arc::new(inputs[0].clone()),
- )?)),
+ LogicalPlan::Projection(Projection { schema, .. }) => {
+ Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
+ expr.to_vec(),
+ Arc::new(inputs[0].clone()),
+ schema.clone(),
+ )?))
+ }
LogicalPlan::Dml(DmlStatement {
table_name,
table_schema,
@@ -818,19 +824,23 @@ pub fn from_plan(
input: Arc::new(inputs[0].clone()),
})),
},
- LogicalPlan::Window(Window { window_expr, .. }) => {
- Ok(LogicalPlan::Window(Window::try_new(
- expr[0..window_expr.len()].to_vec(),
- Arc::new(inputs[0].clone()),
- )?))
- }
- LogicalPlan::Aggregate(Aggregate { group_expr, .. }) => {
- Ok(LogicalPlan::Aggregate(Aggregate::try_new(
- Arc::new(inputs[0].clone()),
- expr[0..group_expr.len()].to_vec(),
- expr[group_expr.len()..].to_vec(),
- )?))
- }
+ LogicalPlan::Window(Window {
+ window_expr,
+ schema,
+ ..
+ }) => Ok(LogicalPlan::Window(Window {
+ input: Arc::new(inputs[0].clone()),
+ window_expr: expr[0..window_expr.len()].to_vec(),
+ schema: schema.clone(),
+ })),
+ LogicalPlan::Aggregate(Aggregate {
+ group_expr, schema, ..
+ }) => Ok(LogicalPlan::Aggregate(Aggregate::try_new_with_schema(
+ Arc::new(inputs[0].clone()),
+ expr[0..group_expr.len()].to_vec(),
+ expr[group_expr.len()..].to_vec(),
+ schema.clone(),
+ )?)),
LogicalPlan::Sort(SortPlan { fetch, .. }) => Ok(LogicalPlan::Sort(SortPlan {
expr: expr.to_vec(),
input: Arc::new(inputs[0].clone()),
diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs
index 8edf734b47..5d1fef5352 100644
--- a/datafusion/optimizer/src/analyzer/type_coercion.rs
+++ b/datafusion/optimizer/src/analyzer/type_coercion.rs
@@ -43,7 +43,7 @@ use datafusion_expr::utils::from_plan;
use datafusion_expr::{
is_false, is_not_false, is_not_true, is_not_unknown, is_true, is_unknown,
type_coercion, AggregateFunction, BuiltinScalarFunction, Expr, LogicalPlan, Operator,
- WindowFrame, WindowFrameBound, WindowFrameUnits,
+ Projection, WindowFrame, WindowFrameBound, WindowFrameUnits,
};
use datafusion_expr::{ExprSchemable, Signature};
@@ -109,7 +109,14 @@ fn analyze_internal(
})
.collect::<Result<Vec<_>>>()?;
- from_plan(plan, &new_expr, &new_inputs)
+ // TODO: from_plan can't change the schema, so we need to do this here
+ match &plan {
+ LogicalPlan::Projection(_) => Ok(LogicalPlan::Projection(Projection::try_new(
+ new_expr,
+ Arc::new(new_inputs[0].clone()),
+ )?)),
+ _ => from_plan(plan, &new_expr, &new_inputs),
+ }
}
pub(crate) struct TypeCoercionRewriter {