This is an automated email from the ASF dual-hosted git repository.
jakevin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new d37351aa61 fix: from_plan generate Agg/Window can be with different schema. (#6820)
d37351aa61 is described below
commit d37351aa610c2c68e4cc2a57a5c89d6f0cfcedf5
Author: jakevin <[email protected]>
AuthorDate: Sun Jul 2 21:08:11 2023 +0800
fix: from_plan generate Agg/Window can be with different schema. (#6820)
* fix: from_plan generate Agg can be with different schema.
* fix: from_plan generate Window can be with different schema.
---
datafusion/expr/src/logical_plan/builder.rs | 15 +++++----------
datafusion/expr/src/logical_plan/plan.rs | 20 +++++++++++++++++--
datafusion/expr/src/utils.rs | 30 +++++++++++++----------------
3 files changed, 36 insertions(+), 29 deletions(-)
diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
index 61d540049c..9ddf6231c5 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -825,17 +825,11 @@ impl LogicalPlanBuilder {
window_expr: impl IntoIterator<Item = impl Into<Expr>>,
) -> Result<Self> {
let window_expr = normalize_cols(window_expr, &self.plan)?;
- let all_expr = window_expr.iter();
- validate_unique_names("Windows", all_expr.clone())?;
- let mut window_fields: Vec<DFField> = self.plan.schema().fields().clone();
- window_fields.extend_from_slice(&exprlist_to_fields(all_expr, &self.plan)?);
- let metadata = self.plan.schema().metadata().clone();
-
- Ok(Self::from(LogicalPlan::Window(Window {
- input: Arc::new(self.plan),
+ validate_unique_names("Windows", &window_expr)?;
+ Ok(Self::from(LogicalPlan::Window(Window::try_new(
window_expr,
- schema: Arc::new(DFSchema::new_with_metadata(window_fields, metadata)?),
- })))
+ Arc::new(self.plan),
+ )?)))
}
/// Apply an aggregate: grouping on the `group_expr` expressions
@@ -1229,6 +1223,7 @@ pub fn project(
plan: LogicalPlan,
expr: impl IntoIterator<Item = impl Into<Expr>>,
) -> Result<LogicalPlan> {
+ // TODO: move it into analyzer
let input_schema = plan.schema();
let mut projected_expr = vec![];
for e in expr {
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index cf94052252..e058708701 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -35,8 +35,8 @@ use datafusion_common::tree_node::{
Transformed, TreeNode, TreeNodeVisitor, VisitRecursion,
};
use datafusion_common::{
- plan_err, Column, DFSchema, DFSchemaRef, DataFusionError, OwnedTableReference,
- Result, ScalarValue,
+ plan_err, Column, DFField, DFSchema, DFSchemaRef, DataFusionError,
+ OwnedTableReference, Result, ScalarValue,
};
use std::collections::{HashMap, HashSet};
use std::fmt::{self, Debug, Display, Formatter};
@@ -1400,6 +1400,22 @@ pub struct Window {
pub schema: DFSchemaRef,
}
+impl Window {
+ /// Create a new window operator.
+ pub fn try_new(window_expr: Vec<Expr>, input: Arc<LogicalPlan>) -> Result<Self> {
+ let mut window_fields: Vec<DFField> = input.schema().fields().clone();
+ window_fields
+ .extend_from_slice(&exprlist_to_fields(window_expr.iter(), input.as_ref())?);
+ let metadata = input.schema().metadata().clone();
+
+ Ok(Window {
+ input,
+ window_expr,
+ schema: Arc::new(DFSchema::new_with_metadata(window_fields, metadata)?),
+ })
+ }
+}
+
/// Produces rows from a table provider by reference or from the context
#[derive(Clone)]
pub struct TableScan {
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index 61b0db53fb..069ce6df71 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -818,23 +818,19 @@ pub fn from_plan(
input: Arc::new(inputs[0].clone()),
})),
},
- LogicalPlan::Window(Window {
- window_expr,
- schema,
- ..
- }) => Ok(LogicalPlan::Window(Window {
- input: Arc::new(inputs[0].clone()),
- window_expr: expr[0..window_expr.len()].to_vec(),
- schema: schema.clone(),
- })),
- LogicalPlan::Aggregate(Aggregate {
- group_expr, schema, ..
- }) => Ok(LogicalPlan::Aggregate(Aggregate::try_new_with_schema(
- Arc::new(inputs[0].clone()),
- expr[0..group_expr.len()].to_vec(),
- expr[group_expr.len()..].to_vec(),
- schema.clone(),
- )?)),
+ LogicalPlan::Window(Window { window_expr, .. }) => {
+ Ok(LogicalPlan::Window(Window::try_new(
+ expr[0..window_expr.len()].to_vec(),
+ Arc::new(inputs[0].clone()),
+ )?))
+ }
+ LogicalPlan::Aggregate(Aggregate { group_expr, .. }) => {
+ Ok(LogicalPlan::Aggregate(Aggregate::try_new(
+ Arc::new(inputs[0].clone()),
+ expr[0..group_expr.len()].to_vec(),
+ expr[group_expr.len()..].to_vec(),
+ )?))
+ }
LogicalPlan::Sort(SortPlan { fetch, .. }) => Ok(LogicalPlan::Sort(SortPlan {
expr: expr.to_vec(),
input: Arc::new(inputs[0].clone()),