alamb commented on code in PR #11469:
URL: https://github.com/apache/datafusion/pull/11469#discussion_r1678290570


##########
datafusion/sqllogictest/test_files/unnest.slt:
##########
@@ -556,4 +554,126 @@ physical_plan
 05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
 06)----------UnnestExec
 07)------------ProjectionExec: expr=[column3@0 as 
unnest(recursive_unnest_table.column3), column3@0 as column3]
-08)--------------MemoryExec: partitions=1, partition_sizes=[1]
\ No newline at end of file
+08)--------------MemoryExec: partitions=1, partition_sizes=[1]
+
+
+## group by unnest
+
+### without agg exprs
+query I
+select unnest(column1) c1 from unnest_table group by c1 order by c1;
+----
+1
+2
+3
+4
+5
+6
+12
+
+query II
+select unnest(column1) c1, unnest(column2) c2 from unnest_table group by c1, 
c2 order by c1, c2;

Review Comment:
   Could you also please add a test with the unnest expression used directly in 
the group by list (not a reference to the select list)?
   
   Like
   
   ```suggestion
   select unnest(column1), count(*) from unnest_table group by unnest(column1);
   ```



##########
datafusion/sql/src/select.rs:
##########
@@ -354,6 +358,102 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             .build()
     }
 
+    fn try_process_aggregate_unnest(&self, input: LogicalPlan) -> 
Result<LogicalPlan> {
+        match &input {
+            LogicalPlan::Aggregate(agg) => {
+                let (new_input, new_group_by_exprs) =
+                    self.try_process_group_by_unnest(agg)?;
+                LogicalPlanBuilder::from(new_input)
+                    .aggregate(new_group_by_exprs, agg.aggr_expr.clone())?
+                    .build()
+            }
+            LogicalPlan::Filter(filter) => match filter.input.as_ref() {
+                LogicalPlan::Aggregate(agg) => {
+                    let (new_input, new_group_by_exprs) =
+                        self.try_process_group_by_unnest(agg)?;
+                    LogicalPlanBuilder::from(new_input)
+                        .aggregate(new_group_by_exprs, agg.aggr_expr.clone())?
+                        .filter(filter.predicate.clone())?
+                        .build()
+                }
+                _ => Ok(input),
+            },
+            _ => Ok(input),
+        }
+    }
+
+    fn try_process_group_by_unnest(
+        &self,
+        agg: &Aggregate,
+    ) -> Result<(LogicalPlan, Vec<Expr>)> {
+        let mut aggr_expr_using_columns: Option<HashSet<Expr>> = None;
+
+        let input = agg.input.as_ref();
+        let group_by_exprs = &agg.group_expr;
+        let aggr_exprs = &agg.aggr_expr;
+
+        // rewrite group_by_exprs
+        let mut intermediate_plan = input.clone();
+        let mut intermediate_select_exprs = group_by_exprs.to_vec();
+
+        loop {
+            let mut unnest_columns = vec![];
+            let mut inner_projection_exprs = vec![];
+
+            let outer_projection_exprs: Vec<Expr> = intermediate_select_exprs
+                .iter()
+                .map(|expr| {
+                    transform_bottom_unnest(
+                        &intermediate_plan,
+                        &mut unnest_columns,
+                        &mut inner_projection_exprs,
+                        expr,
+                    )
+                })
+                .collect::<Result<Vec<_>>>()?
+                .into_iter()
+                .flatten()
+                .collect();
+
+            if unnest_columns.is_empty() {
+                break;
+            } else {
+                let columns = unnest_columns.into_iter().map(|col| 
col.into()).collect();
+                let unnest_options = 
UnnestOptions::new().with_preserve_nulls(false);
+
+                let mut projection_exprs = match &aggr_expr_using_columns {
+                    Some(exprs) => (*exprs).clone(),
+                    None => {
+                        let mut columns = HashSet::new();
+                        for expr in aggr_exprs {
+                            expr.apply(|expr| {
+                                if let Expr::Column(c) = expr {
+                                    columns.insert(Expr::Column(c.clone()));
+                                }
+                                Ok(TreeNodeRecursion::Continue)
+                            })
+                            // As the closure always returns Ok, this "can't" 
error
+                            .expect("Unexpected error");
+                        }
+                        aggr_expr_using_columns = Some(columns.clone());
+                        columns
+                    }
+                };
+                projection_exprs.extend(inner_projection_exprs);
+
+                let plan = LogicalPlanBuilder::from(intermediate_plan.clone())

Review Comment:
   maybe you could avoid the clone with something like
   
   ```suggestion
                   let intermediate_plan = 
LogicalPlanBuilder::from(intermediate_plan,
   ```



##########
datafusion/sql/src/select.rs:
##########
@@ -354,6 +358,102 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             .build()
     }
 
+    fn try_process_aggregate_unnest(&self, input: LogicalPlan) -> 
Result<LogicalPlan> {
+        match &input {
+            LogicalPlan::Aggregate(agg) => {
+                let (new_input, new_group_by_exprs) =
+                    self.try_process_group_by_unnest(agg)?;
+                LogicalPlanBuilder::from(new_input)
+                    .aggregate(new_group_by_exprs, agg.aggr_expr.clone())?
+                    .build()
+            }
+            LogicalPlan::Filter(filter) => match filter.input.as_ref() {
+                LogicalPlan::Aggregate(agg) => {
+                    let (new_input, new_group_by_exprs) =
+                        self.try_process_group_by_unnest(agg)?;
+                    LogicalPlanBuilder::from(new_input)
+                        .aggregate(new_group_by_exprs, agg.aggr_expr.clone())?
+                        .filter(filter.predicate.clone())?
+                        .build()
+                }
+                _ => Ok(input),
+            },
+            _ => Ok(input),
+        }
+    }
+
+    fn try_process_group_by_unnest(
+        &self,
+        agg: &Aggregate,
+    ) -> Result<(LogicalPlan, Vec<Expr>)> {
+        let mut aggr_expr_using_columns: Option<HashSet<Expr>> = None;
+
+        let input = agg.input.as_ref();
+        let group_by_exprs = &agg.group_expr;
+        let aggr_exprs = &agg.aggr_expr;
+
+        // rewrite group_by_exprs

Review Comment:
   I wonder if you could add some documentation comments to this function that 
explain what it does?
   
   Ideally such documentation could include an example showing why it is 
necessary to handle group by specially.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to