alamb commented on code in PR #10429: URL: https://github.com/apache/datafusion/pull/10429#discussion_r1608917787
########## datafusion/sqllogictest/test_files/unnest.slt: ########## @@ -288,6 +308,18 @@ select unnest(array_remove(column1, 12)) from unnest_table; 5 6 +## unnest struct-typed column and list-typed column at the same time +query I?II? +select unnest(column1), column1, unnest(column5), column5 from unnest_table; +---- +1 [1, 2, 3] 1 2 {c0: 1, c1: 2} +2 [1, 2, 3] 1 2 {c0: 1, c1: 2} +3 [1, 2, 3] 1 2 {c0: 1, c1: 2} +4 [4, 5] 3 4 {c0: 3, c1: 4} +5 [4, 5] 3 4 {c0: 3, c1: 4} +6 [6] NULL NULL NULL +12 [12] 7 8 {c0: 7, c1: 8} + Review Comment: Can we please add some other tests: 1. Test the output type (e.g. `select arrow_typeof(unnest(column1))`) 2. Test nested structs (I tested it works well, but I think we should have some coverage) Nested structs ```sql > create or replace table t as values (struct('a', 'b', struct('c'))), (struct('d', 'e', struct('f'))); 0 row(s) fetched. Elapsed 0.031 seconds. > select * from t; +-----------------------------+ | column1 | +-----------------------------+ | {c0: a, c1: b, c2: {c0: c}} | | {c0: d, c1: e, c2: {c0: f}} | +-----------------------------+ 2 row(s) fetched. Elapsed 0.006 seconds. > select unnest(column1) from t; +----------------------+----------------------+----------------------+ | unnest(t.column1).c0 | unnest(t.column1).c1 | unnest(t.column1).c2 | +----------------------+----------------------+----------------------+ | a | b | {c0: c} | | d | e | {c0: f} | +----------------------+----------------------+----------------------+ 2 row(s) fetched. Elapsed 0.013 seconds. ``` And ```sql > create or replace table t as values (struct('a', 'b', [1,2,3])), (struct('x', 'y', [10,20])); 0 row(s) fetched. Elapsed 0.010 seconds. 
> select unnest(column1) from t; +----------------------+----------------------+----------------------+ | unnest(t.column1).c0 | unnest(t.column1).c1 | unnest(t.column1).c2 | +----------------------+----------------------+----------------------+ | a | b | [1, 2, 3] | | x | y | [10, 20] | +----------------------+----------------------+----------------------+ 2 row(s) fetched. Elapsed 0.008 seconds. ``` ########## datafusion/expr/src/expr_schema.rs: ########## @@ -123,7 +123,8 @@ impl ExprSchemable for Expr { Ok(field.data_type().clone()) } DataType::Struct(_) => { - not_impl_err!("unnest() does not support struct yet") + // TODO: this is not correct, because unnest(struct) wll result into multiple data_type Review Comment: When do we end up with an `unnest` **Expr** (vs a `LogicalPlan::Unnest`)? Could we simply return `not_yet_implemented` error here? ########## datafusion/sqllogictest/test_files/unnest.slt: ########## @@ -288,6 +308,18 @@ select unnest(array_remove(column1, 12)) from unnest_table; 5 6 +## unnest struct-typed column and list-typed column at the same time +query I?II? +select unnest(column1), column1, unnest(column5), column5 from unnest_table; +---- +1 [1, 2, 3] 1 2 {c0: 1, c1: 2} +2 [1, 2, 3] 1 2 {c0: 1, c1: 2} +3 [1, 2, 3] 1 2 {c0: 1, c1: 2} +4 [4, 5] 3 4 {c0: 3, c1: 4} +5 [4, 5] 3 4 {c0: 3, c1: 4} +6 [6] NULL NULL NULL +12 [12] 7 8 {c0: 7, c1: 8} + Review Comment: Interestingly the output type doesn't seem to work (which probably makes sense as unnest needs to be converted to a LogicalPlan). 
I don't think we have to fix this in the PR (but adding coverage would be good) ```sql > select unnest(column1) from t; +----------------------+----------------------+----------------------+ | unnest(t.column1).c0 | unnest(t.column1).c1 | unnest(t.column1).c2 | +----------------------+----------------------+----------------------+ | a | b | [1, 2, 3] | | x | y | [10, 20] | +----------------------+----------------------+----------------------+ 2 row(s) fetched. > select arrow_typeof(unnest(column1)) from t; Internal error: unnest on struct can ony be applied at the root level of select expression. This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker ``` ########## datafusion/expr/src/logical_plan/builder.rs: ########## @@ -1592,7 +1592,47 @@ impl TableSource for LogicalTableSource { /// Create a [`LogicalPlan::Unnest`] plan pub fn unnest(input: LogicalPlan, columns: Vec<Column>) -> Result<LogicalPlan> { - unnest_with_options(input, columns, UnnestOptions::new()) + unnest_with_options(input, columns, UnnestOptions::default()) +} + +pub fn get_unnested_columns( Review Comment: Could you please add some documentation explaining what this function does? Also, I wonder if it needs to be `pub` or if we could leave it `pub(crate)` or not pub? 
########## datafusion/sql/src/utils.rs: ########## @@ -255,3 +256,188 @@ pub(crate) fn normalize_ident(id: Ident) -> String { None => id.value.to_ascii_lowercase(), } } + +/// The context is we want to rewrite unnest() into InnerProjection->Unnest->OuterProjection +/// Given an expression which contains unnest expr as one of its children, +/// Try transform depends on unnest type +/// - For list column: unnest(col) with type list -> unnest(col) with type list::item +/// - For struct column: unnest(struct(field1, field2)) -> unnest(struct).field1, unnest(struct).field2 +/// The transformed exprs will be used in the outer projection +pub(crate) fn recursive_transform_unnest( + input: &LogicalPlan, + unnest_placeholder_columns: &mut Vec<String>, + inner_projection_exprs: &mut Vec<Expr>, + original_expr: Expr, +) -> Result<Vec<Expr>> { + let mut transform = + |unnest_expr: &Expr, expr_in_unnest: &Expr| -> Result<Vec<Expr>> { + // Full context, we are trying to plan the execution as InnerProjection->Unnest->OuterProjection + // inside unnest execution, each column inside the inner projection + // will be transformed into new columns. Thus we need to keep track of these placeholding column names + let placeholder_name = unnest_expr.display_name()?; + + unnest_placeholder_columns.push(placeholder_name.clone()); + // Add alias for the argument expression, to avoid naming conflicts + // with other expressions in the select list. For example: `select unnest(col1), col1 from t`. 
+ // this extra projection is used to unnest transforming + inner_projection_exprs + .push(expr_in_unnest.clone().alias(placeholder_name.clone())); + let schema = input.schema(); + + let (data_type, _) = expr_in_unnest.data_type_and_nullable(schema)?; + + let outer_projection_columns = + get_unnested_columns(&placeholder_name, &data_type)?; + let expr = outer_projection_columns + .iter() + .map(|col| Expr::Column(col.0.clone())) + .collect::<Vec<_>>(); + Ok(expr) + }; + // expr transformed maybe either the same, or different from the originals exprs + // for example: + // - unnest(struct_col) will be transformed into unnest(struct_col).field1, unnest(struct_col).field2 + // - unnest(array_col) will be transformed into unnest(array_col) + // - unnest(array_col) + 1 will be transformed into unnest(array_col) + 1 + + // Specifically handle root level unnest expr, this is the only place + // unnest on struct can be handled + if let Expr::Unnest(Unnest { expr: ref arg }) = original_expr { + return transform(&original_expr, arg); + } + let Transformed { + data: transformed_expr, + transformed, + tnr: _, + } = original_expr.transform_up(|expr: Expr| { + if let Expr::Unnest(Unnest { expr: ref arg }) = expr { + let (data_type, _) = expr.data_type_and_nullable(input.schema())?; + if let DataType::Struct(_) = data_type { + return internal_err!("unnest on struct can ony be applied at the root level of select expression"); + } + let transformed_exprs = transform(&expr, arg)?; + Ok(Transformed::yes(transformed_exprs[0].clone())) + } else { + Ok(Transformed::no(expr)) + } + })?; + + if !transformed { + if matches!(&transformed_expr, Expr::Column(_)) { + inner_projection_exprs.push(transformed_expr.clone()); + Ok(vec![transformed_expr]) + } else { + // We need to evaluate the expr in the inner projection, + // outer projection just select its name + let column_name = transformed_expr.display_name()?; + inner_projection_exprs.push(transformed_expr); + 
Ok(vec![Expr::Column(Column::from_name(column_name))]) + } + } else { + Ok(vec![transformed_expr]) + } +} + +// write test for recursive_transform_unnest +#[cfg(test)] +mod tests { + use std::{ops::Add, sync::Arc}; + + use super::*; + use arrow::datatypes::{DataType as ArrowDataType, Field, Schema}; + use arrow_schema::Fields; + use datafusion_common::DFSchema; + use datafusion_expr::EmptyRelation; + + #[test] + fn test_recursive_transform_unnest() -> Result<()> { + let schema = Schema::new(vec![ + Field::new( + "struct_col", + ArrowDataType::Struct(Fields::from(vec![ + Field::new("field1", ArrowDataType::Int32, false), + Field::new("field2", ArrowDataType::Int32, false), + ])), + false, + ), + Field::new( + "array_col", + ArrowDataType::List(Arc::new(Field::new("item", DataType::Int64, true))), + true, + ), + Field::new("int_col", ArrowDataType::Int32, false), + ]); + + let dfschema = DFSchema::try_from(schema)?; + + let input = LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: false, + schema: Arc::new(dfschema), + }); + + let mut unnest_placeholder_columns = vec![]; + let mut inner_projection_exprs = vec![]; + + // unnest(struct_col) + let original_expr = Expr::Unnest(Unnest { + expr: Box::new(Expr::Column(Column::from_name("struct_col"))), Review Comment: I think you can make this a lot more concise using the [expr_fn](https://docs.rs/datafusion/latest/datafusion/logical_expr/expr_fn/index.html) API, like: ```suggestion expr: Box::new(col("struct_col")) ``` In fact maybe we should add an expr_fn for unnest ```rust /// Call `unnest` on the expression fn unnest(arg: Expr) -> Expr { Expr::Unnest(Unnest { expr: Box::new(arg) }) } ``` ########## datafusion/sql/src/utils.rs: ########## @@ -255,3 +256,188 @@ pub(crate) fn normalize_ident(id: Ident) -> String { None => id.value.to_ascii_lowercase(), } } + +/// The context is we want to rewrite unnest() into InnerProjection->Unnest->OuterProjection +/// Given an expression which contains unnest expr as one 
of its children, +/// Try transform depends on unnest type +/// - For list column: unnest(col) with type list -> unnest(col) with type list::item +/// - For struct column: unnest(struct(field1, field2)) -> unnest(struct).field1, unnest(struct).field2 +/// The transformed exprs will be used in the outer projection +pub(crate) fn recursive_transform_unnest( + input: &LogicalPlan, + unnest_placeholder_columns: &mut Vec<String>, + inner_projection_exprs: &mut Vec<Expr>, + original_expr: Expr, +) -> Result<Vec<Expr>> { + let mut transform = + |unnest_expr: &Expr, expr_in_unnest: &Expr| -> Result<Vec<Expr>> { + // Full context, we are trying to plan the execution as InnerProjection->Unnest->OuterProjection + // inside unnest execution, each column inside the inner projection + // will be transformed into new columns. Thus we need to keep track of these placeholding column names + let placeholder_name = unnest_expr.display_name()?; + + unnest_placeholder_columns.push(placeholder_name.clone()); + // Add alias for the argument expression, to avoid naming conflicts + // with other expressions in the select list. For example: `select unnest(col1), col1 from t`. 
+ // this extra projection is used to unnest transforming + inner_projection_exprs + .push(expr_in_unnest.clone().alias(placeholder_name.clone())); + let schema = input.schema(); + + let (data_type, _) = expr_in_unnest.data_type_and_nullable(schema)?; + + let outer_projection_columns = + get_unnested_columns(&placeholder_name, &data_type)?; + let expr = outer_projection_columns + .iter() + .map(|col| Expr::Column(col.0.clone())) + .collect::<Vec<_>>(); + Ok(expr) + }; + // expr transformed maybe either the same, or different from the originals exprs + // for example: + // - unnest(struct_col) will be transformed into unnest(struct_col).field1, unnest(struct_col).field2 + // - unnest(array_col) will be transformed into unnest(array_col) + // - unnest(array_col) + 1 will be transformed into unnest(array_col) + 1 + + // Specifically handle root level unnest expr, this is the only place + // unnest on struct can be handled + if let Expr::Unnest(Unnest { expr: ref arg }) = original_expr { + return transform(&original_expr, arg); + } + let Transformed { + data: transformed_expr, + transformed, + tnr: _, + } = original_expr.transform_up(|expr: Expr| { + if let Expr::Unnest(Unnest { expr: ref arg }) = expr { + let (data_type, _) = expr.data_type_and_nullable(input.schema())?; + if let DataType::Struct(_) = data_type { + return internal_err!("unnest on struct can ony be applied at the root level of select expression"); + } + let transformed_exprs = transform(&expr, arg)?; + Ok(Transformed::yes(transformed_exprs[0].clone())) + } else { + Ok(Transformed::no(expr)) + } + })?; + + if !transformed { + if matches!(&transformed_expr, Expr::Column(_)) { + inner_projection_exprs.push(transformed_expr.clone()); + Ok(vec![transformed_expr]) + } else { + // We need to evaluate the expr in the inner projection, + // outer projection just select its name + let column_name = transformed_expr.display_name()?; + inner_projection_exprs.push(transformed_expr); + 
Ok(vec![Expr::Column(Column::from_name(column_name))]) + } + } else { + Ok(vec![transformed_expr]) + } +} + +// write test for recursive_transform_unnest +#[cfg(test)] +mod tests { + use std::{ops::Add, sync::Arc}; + + use super::*; + use arrow::datatypes::{DataType as ArrowDataType, Field, Schema}; + use arrow_schema::Fields; + use datafusion_common::DFSchema; + use datafusion_expr::EmptyRelation; + + #[test] + fn test_recursive_transform_unnest() -> Result<()> { + let schema = Schema::new(vec![ + Field::new( + "struct_col", + ArrowDataType::Struct(Fields::from(vec![ + Field::new("field1", ArrowDataType::Int32, false), + Field::new("field2", ArrowDataType::Int32, false), + ])), + false, + ), + Field::new( + "array_col", + ArrowDataType::List(Arc::new(Field::new("item", DataType::Int64, true))), + true, + ), + Field::new("int_col", ArrowDataType::Int32, false), + ]); + + let dfschema = DFSchema::try_from(schema)?; + + let input = LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: false, + schema: Arc::new(dfschema), + }); + + let mut unnest_placeholder_columns = vec![]; + let mut inner_projection_exprs = vec![]; + + // unnest(struct_col) + let original_expr = Expr::Unnest(Unnest { + expr: Box::new(Expr::Column(Column::from_name("struct_col"))), + }); + let transformed_exprs = recursive_transform_unnest( + &input, + &mut unnest_placeholder_columns, + &mut inner_projection_exprs, + original_expr, + )?; + assert_eq!( + transformed_exprs, + vec![ + Expr::Column(Column::from_name("unnest(struct_col).field1")), + Expr::Column(Column::from_name("unnest(struct_col).field2")), + ] + ); + assert_eq!(unnest_placeholder_columns, vec!["unnest(struct_col)"]); + // still reference struct_col in original schema but with alias, + // to avoid colliding with the projection on the column itself if any + assert_eq!( + inner_projection_exprs, + vec![ + Expr::Column(Column::from_name("struct_col")).alias("unnest(struct_col)"), + ] + ); + + // unnest(array_col ) + 1 + let 
original_expr = Expr::Unnest(Unnest { + expr: Box::new(Expr::Column(Column::from_name("array_col"))), + }) + .add(Expr::Literal(ScalarValue::Int64(Some(1)))); Review Comment: ```suggestion .add(lit(1i64)); ``` There are similar simplifications we could do below too -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org