alamb commented on code in PR #11469:
URL: https://github.com/apache/datafusion/pull/11469#discussion_r1678290570
##########
datafusion/sqllogictest/test_files/unnest.slt:
##########
@@ -556,4 +554,126 @@ physical_plan
 05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
 06)----------UnnestExec
 07)------------ProjectionExec: expr=[column3@0 as unnest(recursive_unnest_table.column3), column3@0 as column3]
-08)--------------MemoryExec: partitions=1, partition_sizes=[1]
\ No newline at end of file
+08)--------------MemoryExec: partitions=1, partition_sizes=[1]
+
+
+## group by unnest
+
+### without agg exprs
+query I
+select unnest(column1) c1 from unnest_table group by c1 order by c1;
+----
+1
+2
+3
+4
+5
+6
+12
+
+query II
+select unnest(column1) c1, unnest(column2) c2 from unnest_table group by c1, c2 order by c1, c2;

Review Comment:
Could you also please add a test with the grouping expression directly in the group by list (not a reference to the select list)? Something like:
```suggestion
select unnest(column1), count(*) from unnest_table group by unnest(column1);
```

##########
datafusion/sql/src/select.rs:
##########
@@ -354,6 +358,102 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             .build()
     }
 
+    fn try_process_aggregate_unnest(&self, input: LogicalPlan) -> Result<LogicalPlan> {
+        match &input {
+            LogicalPlan::Aggregate(agg) => {
+                let (new_input, new_group_by_exprs) =
+                    self.try_process_group_by_unnest(agg)?;
+                LogicalPlanBuilder::from(new_input)
+                    .aggregate(new_group_by_exprs, agg.aggr_expr.clone())?
+                    .build()
+            }
+            LogicalPlan::Filter(filter) => match filter.input.as_ref() {
+                LogicalPlan::Aggregate(agg) => {
+                    let (new_input, new_group_by_exprs) =
+                        self.try_process_group_by_unnest(agg)?;
+                    LogicalPlanBuilder::from(new_input)
+                        .aggregate(new_group_by_exprs, agg.aggr_expr.clone())?
+                        .filter(filter.predicate.clone())?
+                        .build()
+                }
+                _ => Ok(input),
+            },
+            _ => Ok(input),
+        }
+    }
+
+    fn try_process_group_by_unnest(
+        &self,
+        agg: &Aggregate,
+    ) -> Result<(LogicalPlan, Vec<Expr>)> {
+        let mut aggr_expr_using_columns: Option<HashSet<Expr>> = None;
+
+        let input = agg.input.as_ref();
+        let group_by_exprs = &agg.group_expr;
+        let aggr_exprs = &agg.aggr_expr;
+
+        // rewrite group_by_exprs
+        let mut intermediate_plan = input.clone();
+        let mut intermediate_select_exprs = group_by_exprs.to_vec();
+
+        loop {
+            let mut unnest_columns = vec![];
+            let mut inner_projection_exprs = vec![];
+
+            let outer_projection_exprs: Vec<Expr> = intermediate_select_exprs
+                .iter()
+                .map(|expr| {
+                    transform_bottom_unnest(
+                        &intermediate_plan,
+                        &mut unnest_columns,
+                        &mut inner_projection_exprs,
+                        expr,
+                    )
+                })
+                .collect::<Result<Vec<_>>>()?
+                .into_iter()
+                .flatten()
+                .collect();
+
+            if unnest_columns.is_empty() {
+                break;
+            } else {
+                let columns = unnest_columns.into_iter().map(|col| col.into()).collect();
+                let unnest_options = UnnestOptions::new().with_preserve_nulls(false);
+
+                let mut projection_exprs = match &aggr_expr_using_columns {
+                    Some(exprs) => (*exprs).clone(),
+                    None => {
+                        let mut columns = HashSet::new();
+                        for expr in aggr_exprs {
+                            expr.apply(|expr| {
+                                if let Expr::Column(c) = expr {
+                                    columns.insert(Expr::Column(c.clone()));
+                                }
+                                Ok(TreeNodeRecursion::Continue)
+                            })
+                            // As the closure always returns Ok, this "can't" error
+                            .expect("Unexpected error");
+                        }
+                        aggr_expr_using_columns = Some(columns.clone());
+                        columns
+                    }
+                };
+                projection_exprs.extend(inner_projection_exprs);
+
+                let plan = LogicalPlanBuilder::from(intermediate_plan.clone())

Review Comment:
maybe you could avoid the clone with something like
```suggestion
                let intermediate_plan = LogicalPlanBuilder::from(intermediate_plan,
```
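
To illustrate the shape I mean: move the plan into the builder and rebind the variable to the builder's output, instead of cloning it on every pass through the loop. A standalone toy sketch of that pattern (the `Plan`/`Builder` types below are made up for the example, not DataFusion's actual `LogicalPlanBuilder` API):

```rust
/// Toy stand-ins for a plan and a consuming builder (illustrative only).
#[derive(Debug)]
struct Plan {
    steps: Vec<String>,
}

struct Builder {
    plan: Plan,
}

impl Builder {
    /// Takes the plan by value, so the caller never needs to clone it.
    fn from(plan: Plan) -> Self {
        Builder { plan }
    }

    fn step(mut self, name: &str) -> Self {
        self.plan.steps.push(name.to_string());
        self
    }

    fn build(self) -> Plan {
        self.plan
    }
}

fn main() {
    let mut plan = Plan {
        steps: vec!["scan".to_string()],
    };
    for i in 0..3 {
        // Move `plan` into the builder and rebind it to the result:
        // same effect as the cloning version, minus the per-iteration copy.
        plan = Builder::from(plan).step(&format!("unnest_{i}")).build();
    }
    println!("{plan:?}");
}
```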

##########
datafusion/sql/src/select.rs:
##########
@@ -354,6 +358,102 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             .build()
     }
 
+    fn try_process_aggregate_unnest(&self, input: LogicalPlan) -> Result<LogicalPlan> {
+        match &input {
+            LogicalPlan::Aggregate(agg) => {
+                let (new_input, new_group_by_exprs) =
+                    self.try_process_group_by_unnest(agg)?;
+                LogicalPlanBuilder::from(new_input)
+                    .aggregate(new_group_by_exprs, agg.aggr_expr.clone())?
+                    .build()
+            }
+            LogicalPlan::Filter(filter) => match filter.input.as_ref() {
+                LogicalPlan::Aggregate(agg) => {
+                    let (new_input, new_group_by_exprs) =
+                        self.try_process_group_by_unnest(agg)?;
+                    LogicalPlanBuilder::from(new_input)
+                        .aggregate(new_group_by_exprs, agg.aggr_expr.clone())?
+                        .filter(filter.predicate.clone())?
+                        .build()
+                }
+                _ => Ok(input),
+            },
+            _ => Ok(input),
+        }
+    }
+
+    fn try_process_group_by_unnest(
+        &self,
+        agg: &Aggregate,
+    ) -> Result<(LogicalPlan, Vec<Expr>)> {
+        let mut aggr_expr_using_columns: Option<HashSet<Expr>> = None;
+
+        let input = agg.input.as_ref();
+        let group_by_exprs = &agg.group_expr;
+        let aggr_exprs = &agg.aggr_expr;
+
+        // rewrite group_by_exprs

Review Comment:
I wonder if you could add some documentation comments to this function that explain what it does. Ideally such documentation would include an example showing why it is necessary to handle group by specially.
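
One possible shape for such a doc comment, just as a sketch (the wording, the example query taken from the suggestion above, and the plan rendering are illustrative, not the planner's exact output):

```rust
/// Rewrites `unnest(...)` expressions that appear in the GROUP BY list.
///
/// `unnest` is not an ordinary scalar expression (it changes the number of
/// rows), so it cannot be evaluated inside `Aggregate`. Instead it is pushed
/// into a projection + unnest step below the aggregate, and the group-by
/// expressions are rewritten to refer to the unnested output columns.
///
/// For example, a query such as
///
/// ```sql
/// select unnest(column1), count(*) from unnest_table group by unnest(column1);
/// ```
///
/// is planned roughly as
///
/// ```text
/// Aggregate: groupBy=[[unnest(unnest_table.column1)]], aggr=[[count(*)]]
///   Unnest: unnest(unnest_table.column1)
///     Projection: unnest_table.column1 AS unnest(unnest_table.column1), ...
///       TableScan: unnest_table
/// ```
fn try_process_group_by_unnest( /* ... */ ) { /* ... */ }
```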