This is an automated email from the ASF dual-hosted git repository.

jonah pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new a979f3e5d4 feat: support `unnest` in GROUP BY clause (#11469)
a979f3e5d4 is described below

commit a979f3e5d4745edf31a489185e6dda5008e6e628
Author: JasonLi <[email protected]>
AuthorDate: Wed Jul 17 09:32:36 2024 +0800

    feat: support `unnest` in GROUP BY clause (#11469)
    
    * feat: support group by unnest
    
    * pass slt
    
    * refactor: mv process_group_by_unnest into try_process_unnest
    
    * chore: add some documentation comments and tests
    
    * Avoid cloning input
    
    * use consistent field names
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 datafusion/sql/src/select.rs                  | 118 ++++++++++++++++++++++-
 datafusion/sqllogictest/test_files/unnest.slt | 134 +++++++++++++++++++++++++-
 2 files changed, 249 insertions(+), 3 deletions(-)

diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs
index a5891e655a..84b80c3112 100644
--- a/datafusion/sql/src/select.rs
+++ b/datafusion/sql/src/select.rs
@@ -26,18 +26,20 @@ use crate::utils::{
     resolve_columns, resolve_positions_to_exprs, transform_bottom_unnest,
 };
 
+use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
 use datafusion_common::{not_impl_err, plan_err, DataFusionError, Result};
 use datafusion_common::{Column, UnnestOptions};
 use datafusion_expr::expr::Alias;
 use datafusion_expr::expr_rewriter::{
     normalize_col, normalize_col_with_schemas_and_ambiguity_check, 
normalize_cols,
 };
+use datafusion_expr::logical_plan::tree_node::unwrap_arc;
 use datafusion_expr::utils::{
     expand_qualified_wildcard, expand_wildcard, expr_as_column_expr, 
expr_to_columns,
     find_aggregate_exprs, find_window_exprs,
 };
 use datafusion_expr::{
-    Expr, Filter, GroupingSet, LogicalPlan, LogicalPlanBuilder, Partitioning,
+    Aggregate, Expr, Filter, GroupingSet, LogicalPlan, LogicalPlanBuilder, 
Partitioning,
 };
 use sqlparser::ast::{
     Distinct, Expr as SQLExpr, GroupByExpr, NamedWindowExpr, OrderByExpr,
@@ -297,6 +299,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         input: LogicalPlan,
         select_exprs: Vec<Expr>,
     ) -> Result<LogicalPlan> {
+        // Try process group by unnest
+        let input = self.try_process_aggregate_unnest(input)?;
+
         let mut intermediate_plan = input;
         let mut intermediate_select_exprs = select_exprs;
         // Each expr in select_exprs can contains multiple unnest stage
@@ -354,6 +359,117 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             .build()
     }
 
+    /// Rewrite any `UNNEST(..)` found in the GROUP BY list of an aggregation.
+    ///
+    /// - A `Filter` (presumably a HAVING clause) is descended into
+    ///   recursively, and its input is replaced with the rewritten plan.
+    /// - An `Aggregate` is rebuilt on top of the input returned by
+    ///   `try_process_group_by_unnest`, using the rewritten group-by
+    ///   expressions and the original aggregate expressions.
+    /// - Any other plan is returned untouched.
+    fn try_process_aggregate_unnest(&self, input: LogicalPlan) -> Result<LogicalPlan> {
+        match input {
+            LogicalPlan::Filter(mut filter) => {
+                // Look through the filter for an aggregate underneath it.
+                let child = unwrap_arc(filter.input);
+                filter.input = Arc::new(self.try_process_aggregate_unnest(child)?);
+                Ok(LogicalPlan::Filter(filter))
+            }
+            LogicalPlan::Aggregate(aggregate) => {
+                // `try_process_group_by_unnest` consumes the aggregate, so keep
+                // a copy of the aggregate expressions for the rebuild.
+                let aggr_exprs = aggregate.aggr_expr.clone();
+                let (unnested_input, group_exprs) =
+                    self.try_process_group_by_unnest(aggregate)?;
+                LogicalPlanBuilder::from(unnested_input)
+                    .aggregate(group_exprs, aggr_exprs)?
+                    .build()
+            }
+            other => Ok(other),
+        }
+    }
+
+    /// Try converting `UNNEST(expr)` calls in the GROUP BY list of `agg` into an
+    /// explicit Projection -> Unnest pipeline below the aggregate.
+    ///
+    /// Returns the (possibly rewritten) input plan for the aggregate together
+    /// with the group-by expressions to use on top of it. If the group-by list
+    /// contains no `UNNEST`, the original input and group-by expressions are
+    /// returned unchanged.
+    ///
+    /// The rewrite is repeated until `transform_bottom_unnest` reports no more
+    /// unnest columns. This is how nested calls such as `UNNEST(UNNEST(col))`
+    /// are handled.
+    fn try_process_group_by_unnest(
+        &self,
+        agg: Aggregate,
+    ) -> Result<(LogicalPlan, Vec<Expr>)> {
+        let Aggregate {
+            input,
+            group_expr,
+            aggr_expr,
+            ..
+        } = agg;
+
+        // process unnest of group_by_exprs, and input of agg will be rewritten
+        // for example:
+        //
+        // ```
+        // Aggregate: groupBy=[[UNNEST(Column(Column { relation: Some(Bare { table: "tab" }), name: "array_col" }))]], aggr=[[]]
+        //   TableScan: tab
+        // ```
+        //
+        // will be transformed into
+        //
+        // ```
+        // Aggregate: groupBy=[[unnest(tab.array_col)]], aggr=[[]]
+        //   Unnest: lists[unnest(tab.array_col)] structs[]
+        //     Projection: tab.array_col AS unnest(tab.array_col)
+        //       TableScan: tab
+        // ```
+        let mut intermediate_plan = unwrap_arc(input);
+        let mut intermediate_select_exprs = group_expr;
+
+        // Column expressions referenced by the aggregate expressions, in first
+        // occurrence order. Computed lazily (only once an UNNEST is actually
+        // found) and reused for every further unnest level.
+        let mut aggr_expr_using_columns: Option<Vec<Expr>> = None;
+
+        loop {
+            let mut unnest_columns = vec![];
+            let mut inner_projection_exprs = vec![];
+
+            let outer_projection_exprs: Vec<Expr> = intermediate_select_exprs
+                .iter()
+                .map(|expr| {
+                    transform_bottom_unnest(
+                        &intermediate_plan,
+                        &mut unnest_columns,
+                        &mut inner_projection_exprs,
+                        expr,
+                    )
+                })
+                .collect::<Result<Vec<_>>>()?
+                .into_iter()
+                .flatten()
+                .collect();
+
+            // Nothing (left) to unnest: the current plan and exprs are final.
+            if unnest_columns.is_empty() {
+                break;
+            }
+
+            let columns = unnest_columns.into_iter().map(|col| col.into()).collect();
+            let unnest_options = UnnestOptions::new().with_preserve_nulls(false);
+
+            // The aggregate's input columns must survive the inner projection
+            // so the aggregate above can still resolve them.
+            let aggr_columns = aggr_expr_using_columns.get_or_insert_with(|| {
+                let mut columns: Vec<Expr> = vec![];
+                for expr in &aggr_expr {
+                    expr.apply(|expr| {
+                        if let Expr::Column(c) = expr {
+                            let column = Expr::Column(c.clone());
+                            if !columns.contains(&column) {
+                                columns.push(column);
+                            }
+                        }
+                        Ok(TreeNodeRecursion::Continue)
+                    })
+                    // As the closure always returns Ok, this "can't" error
+                    .expect("Unexpected error");
+                }
+                columns
+            });
+
+            // Build the inner projection in a deterministic order:
+            // - aggregate columns come first;
+            // - then the expressions produced by `transform_bottom_unnest`;
+            // - exact duplicates are skipped, e.g. when a column appears both in
+            //   an aggregate and in the group-by list.
+            // A `HashSet` would also deduplicate, but its iteration order is
+            // unspecified. That would make the generated plan (and EXPLAIN
+            // output) nondeterministic.
+            let mut projection_exprs = aggr_columns.clone();
+            for expr in inner_projection_exprs {
+                if !projection_exprs.contains(&expr) {
+                    projection_exprs.push(expr);
+                }
+            }
+
+            intermediate_plan = LogicalPlanBuilder::from(intermediate_plan)
+                .project(projection_exprs)?
+                .unnest_columns_with_options(columns, unnest_options)?
+                .build()?;
+
+            intermediate_select_exprs = outer_projection_exprs;
+        }
+
+        Ok((intermediate_plan, intermediate_select_exprs))
+    }
+
     fn plan_selection(
         &self,
         selection: Option<SQLExpr>,
diff --git a/datafusion/sqllogictest/test_files/unnest.slt b/datafusion/sqllogictest/test_files/unnest.slt
index 698faf87c9..93146541e1 100644
--- a/datafusion/sqllogictest/test_files/unnest.slt
+++ b/datafusion/sqllogictest/test_files/unnest.slt
@@ -500,8 +500,6 @@ select unnest(column1) from (select * from 
(values([1,2,3]), ([4,5,6])) limit 1
 query error DataFusion error: Error during planning: Projections require 
unique expression names but the expression "UNNEST\(Column\(Column \{ relation: 
Some\(Bare \{ table: "unnest_table" \}\), name: "column1" \}\)\)" at position 0 
and "UNNEST\(Column\(Column \{ relation: Some\(Bare \{ table: "unnest_table" 
\}\), name: "column1" \}\)\)" at position 1 have the same name. Consider 
aliasing \("AS"\) one of them.
 select unnest(column1), unnest(column1) from unnest_table;
 
-statement ok
-drop table unnest_table;
 
 ## unnest list followed by unnest struct
 query ???
@@ -557,3 +555,135 @@ physical_plan
 06)----------UnnestExec
 07)------------ProjectionExec: expr=[column3@0 as 
unnest(recursive_unnest_table.column3), column3@0 as column3]
 08)--------------MemoryExec: partitions=1, partition_sizes=[1]
+
+## group by unnest
+
+### without agg exprs
+query I
+select unnest(column1) c1 from unnest_table group by c1 order by c1;
+----
+1
+2
+3
+4
+5
+6
+12
+
+query II
+select unnest(column1) c1, unnest(column2) c2 from unnest_table group by c1, 
c2 order by c1, c2;
+----
+1 7
+2 NULL
+3 NULL
+4 8
+5 9
+6 11
+12 NULL
+NULL 10
+NULL 12
+NULL 42
+NULL NULL
+
+query III
+select unnest(column1) c1, unnest(column2) c2, column3 c3 from unnest_table 
group by c1, c2, c3 order by c1, c2, c3;
+----
+1 7 1
+2 NULL 1
+3 NULL 1
+4 8 2
+5 9 2
+6 11 3
+12 NULL NULL
+NULL 10 2
+NULL 12 3
+NULL 42 NULL
+NULL NULL NULL
+
+### with agg exprs
+
+query IIII
+select unnest(column1) c1, unnest(column2) c2, column3 c3, count(1) from 
unnest_table group by c1, c2, c3 order by c1, c2, c3;
+----
+1 7 1 1
+2 NULL 1 1
+3 NULL 1 1
+4 8 2 1
+5 9 2 1
+6 11 3 1
+12 NULL NULL 1
+NULL 10 2 1
+NULL 12 3 1
+NULL 42 NULL 1
+NULL NULL NULL 1
+
+query IIII
+select unnest(column1) c1, unnest(column2) c2, column3 c3, count(column4) from 
unnest_table group by c1, c2, c3 order by c1, c2, c3;
+----
+1 7 1 1
+2 NULL 1 1
+3 NULL 1 1
+4 8 2 1
+5 9 2 1
+6 11 3 0
+12 NULL NULL 0
+NULL 10 2 1
+NULL 12 3 0
+NULL 42 NULL 0
+NULL NULL NULL 0
+
+query IIIII
+select unnest(column1) c1, unnest(column2) c2, column3 c3, count(column4), 
sum(column3) from unnest_table group by c1, c2, c3 order by c1, c2, c3;
+----
+1 7 1 1 1
+2 NULL 1 1 1
+3 NULL 1 1 1
+4 8 2 1 2
+5 9 2 1 2
+6 11 3 0 3
+12 NULL NULL 0 NULL
+NULL 10 2 1 2
+NULL 12 3 0 3
+NULL 42 NULL 0 NULL
+NULL NULL NULL 0 NULL
+
+query II
+select unnest(column1), count(*) from unnest_table group by unnest(column1) 
order by unnest(column1) desc;
+----
+12 1
+6 1
+5 1
+4 1
+3 1
+2 1
+1 1
+
+### group by recursive unnest list
+
+query ?
+select unnest(unnest(column2)) c2 from recursive_unnest_table group by c2 
order by c2;
+----
+[1]
+[1, 1]
+[2]
+[3, 4]
+[5]
+[7, 8]
+[, 6]
+NULL
+
+query ?I
+select unnest(unnest(column2)) c2, count(column3) from recursive_unnest_table 
group by c2 order by c2;
+----
+[1] 1
+[1, 1] 1
+[2] 1
+[3, 4] 1
+[5] 1
+[7, 8] 1
+[, 6] 1
+NULL 1
+
+### TODO: group by unnest struct
+query error DataFusion error: Error during planning: Projection references 
non\-aggregate values
+select unnest(column1) c1 from nested_unnest_table group by c1.c0;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to