cj-zhukov commented on code in PR #21021:
URL: https://github.com/apache/datafusion/pull/21021#discussion_r3035215097


##########
datafusion/core/src/dataframe/mod.rs:
##########
@@ -410,21 +426,123 @@ impl DataFrame {
         expr_list: impl IntoIterator<Item = impl Into<SelectExpr>>,
     ) -> Result<DataFrame> {
         let expr_list: Vec<SelectExpr> =
-            expr_list.into_iter().map(|e| e.into()).collect::<Vec<_>>();
+            expr_list.into_iter().map(|e| e.into()).collect();
 
+        // Extract expressions
         let expressions = expr_list.iter().filter_map(|e| match e {
             SelectExpr::Expression(expr) => Some(expr),
             _ => None,
         });
 
-        let window_func_exprs = find_window_exprs(expressions);
-        let plan = if window_func_exprs.is_empty() {
+        // Apply window functions first
+        let window_func_exprs = find_window_exprs(expressions.clone());
+
+        let mut plan = if window_func_exprs.is_empty() {
             self.plan
         } else {
             LogicalPlanBuilder::window_plan(self.plan, window_func_exprs)?
         };
 
-        let project_plan = LogicalPlanBuilder::from(plan).project(expr_list)?.build()?;
+        // Collect aggregate expressions
+        let aggr_exprs = find_aggregate_exprs(expressions.clone());
+
+        // Check for non-aggregate expressions
+        let has_non_aggregate_expr = expr_list.iter().any(|e| match e {
+            SelectExpr::Expression(expr) => {
+                find_aggregate_exprs(std::iter::once(expr)).is_empty()
+            }
+            SelectExpr::Wildcard(_) | SelectExpr::QualifiedWildcard(_, _) => true,
+        });
+
+        if has_non_aggregate_expr && !aggr_exprs.is_empty() {
+            return plan_err!(
+                "Column in SELECT must be in GROUP BY or an aggregate function"
+            );
+        }
+
+        // Fallback to projection
+        if matches!(plan, LogicalPlan::Aggregate(_))
+            || has_non_aggregate_expr
+            || aggr_exprs.is_empty()
+        {
+            let project_plan =
+                LogicalPlanBuilder::from(plan).project(expr_list)?.build()?;
+
+            return Ok(DataFrame {
+                session_state: self.session_state,
+                plan: project_plan,
+                projection_requires_validation: false,
+            });
+        }
+
+        // Unique name generator
+        let make_unique_name =
+            |base: String, used: &mut HashSet<String>, start: usize| {
+                let mut name = base.clone();
+                let mut counter = start;
+                while used.contains(&name) {
+                    name = format!("{base}_{counter}");
+                    counter += 1;
+                }
+                used.insert(name.clone());
+
+                name
+            };
+
+        // Aggregate stage
+        let mut aggr_map: HashMap<Expr, Expr> = HashMap::new();
+        let mut aggr_used_names = HashSet::new();
+        let aggr_exprs_with_alias: Vec<Expr> = aggr_exprs
+            .into_iter()
+            .map(|expr| {
+                let base_name = expr.name_for_alias()?;
+                let name = make_unique_name(base_name, &mut aggr_used_names, 1);
+                let aliased = expr.clone().alias(name.clone());
+                let col = Expr::Column(Column::from_name(name));
+                aggr_map.insert(expr, col);
+
+                Ok(aliased)
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        // Build aggregate plan
+        plan = LogicalPlanBuilder::from(plan)
+            .aggregate(Vec::<Expr>::new(), aggr_exprs_with_alias)?
+            .build()?;
+
+        // Rewrite expressions
+        let rewrite_expr = |expr: Expr, aggr_map: &HashMap<Expr, Expr>| -> Result<Expr> {
+            expr.transform(|e| {
+                Ok(match aggr_map.get(&e) {
+                    Some(replacement) => Transformed::yes(replacement.clone()),
+                    None => Transformed::no(e),
+                })
+            })
+            .map(|t| t.data)
+        };
+
+        // Projection stage
+        let mut rewritten_exprs = Vec::with_capacity(expr_list.len());
+        let mut projection_used_names = HashSet::new();
+        for select_expr in expr_list.into_iter() {
+            match select_expr {
+                SelectExpr::Expression(expr) => {
+                    let base_alias = expr.name_for_alias()?;
+                    let rewritten = rewrite_expr(expr, &aggr_map)?;
+                    let name =
+                        make_unique_name(base_alias, &mut projection_used_names, 1);
+                    let final_expr = rewritten.alias(name);

Review Comment:
   Makes sense, thanks



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to