This is an automated email from the ASF dual-hosted git repository.
jonah pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new a979f3e5d4 feat: support `unnest` in GROUP BY clause (#11469)
a979f3e5d4 is described below
commit a979f3e5d4745edf31a489185e6dda5008e6e628
Author: JasonLi <[email protected]>
AuthorDate: Wed Jul 17 09:32:36 2024 +0800
feat: support `unnest` in GROUP BY clause (#11469)
* feat: support group by unnest
* pass slt
* refactor: mv process_group_by_unnest into try_process_unnest
* chore: add some documentation comments and tests
* Avoid cloning input
* use consistent field names
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
datafusion/sql/src/select.rs | 118 ++++++++++++++++++++++-
datafusion/sqllogictest/test_files/unnest.slt | 134 +++++++++++++++++++++++++-
2 files changed, 249 insertions(+), 3 deletions(-)
diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs
index a5891e655a..84b80c3112 100644
--- a/datafusion/sql/src/select.rs
+++ b/datafusion/sql/src/select.rs
@@ -26,18 +26,20 @@ use crate::utils::{
resolve_columns, resolve_positions_to_exprs, transform_bottom_unnest,
};
+use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion_common::{not_impl_err, plan_err, DataFusionError, Result};
use datafusion_common::{Column, UnnestOptions};
use datafusion_expr::expr::Alias;
use datafusion_expr::expr_rewriter::{
normalize_col, normalize_col_with_schemas_and_ambiguity_check,
normalize_cols,
};
+use datafusion_expr::logical_plan::tree_node::unwrap_arc;
use datafusion_expr::utils::{
expand_qualified_wildcard, expand_wildcard, expr_as_column_expr,
expr_to_columns,
find_aggregate_exprs, find_window_exprs,
};
use datafusion_expr::{
- Expr, Filter, GroupingSet, LogicalPlan, LogicalPlanBuilder, Partitioning,
+ Aggregate, Expr, Filter, GroupingSet, LogicalPlan, LogicalPlanBuilder,
Partitioning,
};
use sqlparser::ast::{
Distinct, Expr as SQLExpr, GroupByExpr, NamedWindowExpr, OrderByExpr,
@@ -297,6 +299,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
input: LogicalPlan,
select_exprs: Vec<Expr>,
) -> Result<LogicalPlan> {
+ // Try process group by unnest
+ let input = self.try_process_aggregate_unnest(input)?;
+
let mut intermediate_plan = input;
let mut intermediate_select_exprs = select_exprs;
// Each expr in select_exprs can contains multiple unnest stage
@@ -354,6 +359,117 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
.build()
}
+    /// Rewrites `UNNEST` expressions appearing in the GROUP BY of an aggregate.
+    ///
+    /// If `input` is an [`Aggregate`] whose group-by expressions contain an
+    /// `UNNEST`, its input is rewritten into Projection/Unnest stages by
+    /// `try_process_group_by_unnest` and the aggregate is rebuilt on top with
+    /// the rewritten group-by expressions. A [`Filter`] directly above the
+    /// input (presumably the HAVING filter) is looked through recursively.
+    ///
+    /// Aggregates without `UNNEST` in their GROUP BY, and every other plan
+    /// node, are returned unchanged: they are neither cloned nor rebuilt.
+    fn try_process_aggregate_unnest(&self, input: LogicalPlan) -> Result<LogicalPlan> {
+        match input {
+            LogicalPlan::Aggregate(agg) => {
+                let has_unnest = agg.group_expr.iter().any(|expr| {
+                    let mut found = false;
+                    expr.apply(|e| {
+                        if matches!(e, Expr::Unnest(_)) {
+                            found = true;
+                            Ok(TreeNodeRecursion::Stop)
+                        } else {
+                            Ok(TreeNodeRecursion::Continue)
+                        }
+                    })
+                    // As the closure always returns Ok, this "can't" error
+                    .expect("Unexpected error");
+                    found
+                });
+                if !has_unnest {
+                    // Nothing to rewrite: keep the original node instead of
+                    // cloning its input and rebuilding it through the builder.
+                    return Ok(LogicalPlan::Aggregate(agg));
+                }
+
+                let agg_expr = agg.aggr_expr.clone();
+                let (new_input, new_group_by_exprs) =
+                    self.try_process_group_by_unnest(agg)?;
+                LogicalPlanBuilder::from(new_input)
+                    .aggregate(new_group_by_exprs, agg_expr)?
+                    .build()
+            }
+            LogicalPlan::Filter(mut filter) => {
+                filter.input = Arc::new(
+                    self.try_process_aggregate_unnest(unwrap_arc(filter.input))?,
+                );
+                Ok(LogicalPlan::Filter(filter))
+            }
+            _ => Ok(input),
+        }
+    }
+
+ /// Try converting Unnest(Expr) of group by to Unnest/Projection
+ /// Return the new input and group_by_exprs of Aggregate.
+ fn try_process_group_by_unnest(
+ &self,
+ agg: Aggregate,
+ ) -> Result<(LogicalPlan, Vec<Expr>)> {
+ let mut aggr_expr_using_columns: Option<HashSet<Expr>> = None;
+
+ let Aggregate {
+ input,
+ group_expr,
+ aggr_expr,
+ ..
+ } = agg;
+
+ // process unnest of group_by_exprs, and input of agg will be rewritten
+ // for example:
+ //
+ // ```
+ // Aggregate: groupBy=[[UNNEST(Column(Column { relation: Some(Bare {
table: "tab" }), name: "array_col" }))]], aggr=[[]]
+ // TableScan: tab
+ // ```
+ //
+ // will be transformed into
+ //
+ // ```
+ // Aggregate: groupBy=[[unnest(tab.array_col)]], aggr=[[]]
+ // Unnest: lists[unnest(tab.array_col)] structs[]
+ // Projection: tab.array_col AS unnest(tab.array_col)
+ // TableScan: tab
+ // ```
+ let mut intermediate_plan = unwrap_arc(input);
+ let mut intermediate_select_exprs = group_expr;
+
+ loop {
+ let mut unnest_columns = vec![];
+ let mut inner_projection_exprs = vec![];
+
+ let outer_projection_exprs: Vec<Expr> = intermediate_select_exprs
+ .iter()
+ .map(|expr| {
+ transform_bottom_unnest(
+ &intermediate_plan,
+ &mut unnest_columns,
+ &mut inner_projection_exprs,
+ expr,
+ )
+ })
+ .collect::<Result<Vec<_>>>()?
+ .into_iter()
+ .flatten()
+ .collect();
+
+ if unnest_columns.is_empty() {
+ break;
+ } else {
+ let columns = unnest_columns.into_iter().map(|col|
col.into()).collect();
+ let unnest_options =
UnnestOptions::new().with_preserve_nulls(false);
+
+ let mut projection_exprs = match &aggr_expr_using_columns {
+ Some(exprs) => (*exprs).clone(),
+ None => {
+ let mut columns = HashSet::new();
+ for expr in &aggr_expr {
+ expr.apply(|expr| {
+ if let Expr::Column(c) = expr {
+ columns.insert(Expr::Column(c.clone()));
+ }
+ Ok(TreeNodeRecursion::Continue)
+ })
+ // As the closure always returns Ok, this "can't"
error
+ .expect("Unexpected error");
+ }
+ aggr_expr_using_columns = Some(columns.clone());
+ columns
+ }
+ };
+ projection_exprs.extend(inner_projection_exprs);
+
+ intermediate_plan = LogicalPlanBuilder::from(intermediate_plan)
+ .project(projection_exprs)?
+ .unnest_columns_with_options(columns, unnest_options)?
+ .build()?;
+
+ intermediate_select_exprs = outer_projection_exprs;
+ }
+ }
+
+ Ok((intermediate_plan, intermediate_select_exprs))
+ }
+
fn plan_selection(
&self,
selection: Option<SQLExpr>,
diff --git a/datafusion/sqllogictest/test_files/unnest.slt b/datafusion/sqllogictest/test_files/unnest.slt
index 698faf87c9..93146541e1 100644
--- a/datafusion/sqllogictest/test_files/unnest.slt
+++ b/datafusion/sqllogictest/test_files/unnest.slt
@@ -500,8 +500,6 @@ select unnest(column1) from (select * from (values([1,2,3]), ([4,5,6])) limit 1
query error DataFusion error: Error during planning: Projections require unique expression names but the expression "UNNEST\(Column\(Column \{ relation: Some\(Bare \{ table: "unnest_table" \}\), name: "column1" \}\)\)" at position 0 and "UNNEST\(Column\(Column \{ relation: Some\(Bare \{ table: "unnest_table" \}\), name: "column1" \}\)\)" at position 1 have the same name. Consider aliasing \("AS"\) one of them.
select unnest(column1), unnest(column1) from unnest_table;
-statement ok
-drop table unnest_table;
## unnest list followed by unnest struct
query ???
@@ -557,3 +555,135 @@ physical_plan
06)----------UnnestExec
07)------------ProjectionExec: expr=[column3@0 as unnest(recursive_unnest_table.column3), column3@0 as column3]
08)--------------MemoryExec: partitions=1, partition_sizes=[1]
+
+## group by unnest
+
+### without agg exprs
+query I
+select unnest(column1) c1 from unnest_table group by c1 order by c1;
+----
+1
+2
+3
+4
+5
+6
+12
+
+query II
+select unnest(column1) c1, unnest(column2) c2 from unnest_table group by c1, c2 order by c1, c2;
+----
+1 7
+2 NULL
+3 NULL
+4 8
+5 9
+6 11
+12 NULL
+NULL 10
+NULL 12
+NULL 42
+NULL NULL
+
+query III
+select unnest(column1) c1, unnest(column2) c2, column3 c3 from unnest_table group by c1, c2, c3 order by c1, c2, c3;
+----
+1 7 1
+2 NULL 1
+3 NULL 1
+4 8 2
+5 9 2
+6 11 3
+12 NULL NULL
+NULL 10 2
+NULL 12 3
+NULL 42 NULL
+NULL NULL NULL
+
+### with agg exprs
+
+query IIII
+select unnest(column1) c1, unnest(column2) c2, column3 c3, count(1) from unnest_table group by c1, c2, c3 order by c1, c2, c3;
+----
+1 7 1 1
+2 NULL 1 1
+3 NULL 1 1
+4 8 2 1
+5 9 2 1
+6 11 3 1
+12 NULL NULL 1
+NULL 10 2 1
+NULL 12 3 1
+NULL 42 NULL 1
+NULL NULL NULL 1
+
+query IIII
+select unnest(column1) c1, unnest(column2) c2, column3 c3, count(column4) from unnest_table group by c1, c2, c3 order by c1, c2, c3;
+----
+1 7 1 1
+2 NULL 1 1
+3 NULL 1 1
+4 8 2 1
+5 9 2 1
+6 11 3 0
+12 NULL NULL 0
+NULL 10 2 1
+NULL 12 3 0
+NULL 42 NULL 0
+NULL NULL NULL 0
+
+query IIIII
+select unnest(column1) c1, unnest(column2) c2, column3 c3, count(column4), sum(column3) from unnest_table group by c1, c2, c3 order by c1, c2, c3;
+----
+1 7 1 1 1
+2 NULL 1 1 1
+3 NULL 1 1 1
+4 8 2 1 2
+5 9 2 1 2
+6 11 3 0 3
+12 NULL NULL 0 NULL
+NULL 10 2 1 2
+NULL 12 3 0 3
+NULL 42 NULL 0 NULL
+NULL NULL NULL 0 NULL
+
+query II
+select unnest(column1), count(*) from unnest_table group by unnest(column1) order by unnest(column1) desc;
+----
+12 1
+6 1
+5 1
+4 1
+3 1
+2 1
+1 1
+
+### group by recursive unnest list
+
+query ?
+select unnest(unnest(column2)) c2 from recursive_unnest_table group by c2 order by c2;
+----
+[1]
+[1, 1]
+[2]
+[3, 4]
+[5]
+[7, 8]
+[, 6]
+NULL
+
+query ?I
+select unnest(unnest(column2)) c2, count(column3) from recursive_unnest_table group by c2 order by c2;
+----
+[1] 1
+[1, 1] 1
+[2] 1
+[3, 4] 1
+[5] 1
+[7, 8] 1
+[, 6] 1
+NULL 1
+
+### TODO: group by unnest struct
+query error DataFusion error: Error during planning: Projection references non\-aggregate values
+select unnest(column1) c1 from nested_unnest_table group by c1.c0;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]