Jefffrey commented on code in PR #21021:
URL: https://github.com/apache/datafusion/pull/21021#discussion_r3117340415
##########
datafusion/core/tests/dataframe/mod.rs:
##########
@@ -1046,26 +1049,28 @@ async fn test_aggregate_with_union() -> Result<()> {
let df1 = df
.clone()
- // GROUP BY `c1`
- .aggregate(vec![col("c1")], vec![min(col("c2"))])?
- // SELECT `c1` , min(c2) as `result`
- .select(vec![col("c1"), min(col("c2")).alias("result")])?;
+ // GROUP BY c1, compute min(c2) as result
+ .aggregate(vec![col("c1")], vec![min(col("c2")).alias("result")])?
Review Comment:
Do these existing tests need to be changed because the new functionality
interferes with them? Ideally we don't want any regressions caused by this new
(somewhat niche?) feature.
##########
datafusion/core/src/dataframe/mod.rs:
##########
@@ -410,21 +426,128 @@ impl DataFrame {
expr_list: impl IntoIterator<Item = impl Into<SelectExpr>>,
) -> Result<DataFrame> {
let expr_list: Vec<SelectExpr> =
- expr_list.into_iter().map(|e| e.into()).collect::<Vec<_>>();
+ expr_list.into_iter().map(|e| e.into()).collect();
+ // Extract expressions
let expressions = expr_list.iter().filter_map(|e| match e {
SelectExpr::Expression(expr) => Some(expr),
_ => None,
});
- let window_func_exprs = find_window_exprs(expressions);
- let plan = if window_func_exprs.is_empty() {
+ // Apply window functions first
+ let window_func_exprs = find_window_exprs(expressions.clone());
+
+ let mut plan = if window_func_exprs.is_empty() {
self.plan
} else {
LogicalPlanBuilder::window_plan(self.plan, window_func_exprs)?
};
- let project_plan = LogicalPlanBuilder::from(plan).project(expr_list)?.build()?;
+ // Collect aggregate expressions
+ let aggr_exprs = find_aggregate_exprs(expressions.clone());
+
+ // Check for non-aggregate expressions
+ let has_non_aggregate_expr = expr_list.iter().any(|e| match e {
+ SelectExpr::Expression(expr) => {
+ find_aggregate_exprs(std::iter::once(expr)).is_empty()
+ }
+ SelectExpr::Wildcard(_) | SelectExpr::QualifiedWildcard(_, _) => true,
+ });
+
+ if has_non_aggregate_expr && !aggr_exprs.is_empty() {
+ return plan_err!(
+ "Column in SELECT must be in GROUP BY or an aggregate function"
+ );
+ }
+
+ // Fallback to projection
+ if matches!(plan, LogicalPlan::Aggregate(_))
+ || has_non_aggregate_expr
+ || aggr_exprs.is_empty()
+ {
+ let project_plan =
+ LogicalPlanBuilder::from(plan).project(expr_list)?.build()?;
+
+ return Ok(DataFrame {
+ session_state: self.session_state,
+ plan: project_plan,
+ projection_requires_validation: false,
+ });
+ }
+
+ // Unique name generator
Review Comment:
There's quite a bit of logic introduced here that I'm trying to wrap my head
around. Is my understanding correct that we're introducing an aggregate node,
giving the aggregate expressions unique names, and then applying a projection on
top based on these unique generated names?
Is it feasible to simplify this by passing all the aggregate exprs directly
into a new aggregate node and then applying a projection of `*` on top? Or
does that miss an edge case?
For example:
```
df.select(count(1))
Project *
Aggregate count(1)
```
##########
datafusion/core/src/dataframe/mod.rs:
##########
@@ -410,21 +426,128 @@ impl DataFrame {
expr_list: impl IntoIterator<Item = impl Into<SelectExpr>>,
) -> Result<DataFrame> {
let expr_list: Vec<SelectExpr> =
- expr_list.into_iter().map(|e| e.into()).collect::<Vec<_>>();
+ expr_list.into_iter().map(|e| e.into()).collect();
+ // Extract expressions
let expressions = expr_list.iter().filter_map(|e| match e {
SelectExpr::Expression(expr) => Some(expr),
_ => None,
});
- let window_func_exprs = find_window_exprs(expressions);
- let plan = if window_func_exprs.is_empty() {
+ // Apply window functions first
+ let window_func_exprs = find_window_exprs(expressions.clone());
+
+ let mut plan = if window_func_exprs.is_empty() {
self.plan
} else {
LogicalPlanBuilder::window_plan(self.plan, window_func_exprs)?
};
- let project_plan = LogicalPlanBuilder::from(plan).project(expr_list)?.build()?;
+ // Collect aggregate expressions
+ let aggr_exprs = find_aggregate_exprs(expressions.clone());
+
+ // Check for non-aggregate expressions
+ let has_non_aggregate_expr = expr_list.iter().any(|e| match e {
+ SelectExpr::Expression(expr) => {
+ find_aggregate_exprs(std::iter::once(expr)).is_empty()
+ }
+ SelectExpr::Wildcard(_) | SelectExpr::QualifiedWildcard(_, _) => true,
+ });
+
+ if has_non_aggregate_expr && !aggr_exprs.is_empty() {
+ return plan_err!(
+ "Column in SELECT must be in GROUP BY or an aggregate function"
+ );
+ }
+
+ // Fallback to projection
+ if matches!(plan, LogicalPlan::Aggregate(_))
+ || has_non_aggregate_expr
+ || aggr_exprs.is_empty()
+ {
Review Comment:
I find these checks a little confusing. Why does the plan already being an
aggregate mean we fall back to a projection? And does checking
`has_non_aggregate_expr` matter here when all we really care about is the
presence of aggregate expressions (`aggr_exprs.is_empty()` here)?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]