alamb commented on code in PR #6796: URL: https://github.com/apache/arrow-datafusion/pull/6796#discussion_r1279366021
########## datafusion/core/tests/sqllogictests/test_files/array.slt: ########## @@ -1794,9 +1802,150 @@ select make_array(f0) from fixed_size_list_array ---- [[1, 2], [3, 4]] +## Unnest -### Delete tables +# Set target partitions to 1 for deterministic results +statement ok +set datafusion.execution.target_partitions = 1; Review Comment: Another way to get deterministic results rather than changing `target_partitions` here is to use [`rowsort`](https://github.com/apache/arrow-datafusion/blob/main/datafusion/core/tests/sqllogictests/README.md#sqllogictests) sort_mode So instead of ``` query ??? ``` Use ``` query ??? rowsort ``` Which will sort the output after execution ########## datafusion/core/tests/sqllogictests/test_files/array.slt: ########## @@ -1794,9 +1802,150 @@ select make_array(f0) from fixed_size_list_array ---- [[1, 2], [3, 4]] +## Unnest -### Delete tables +# Set target partitions to 1 for deterministic results +statement ok +set datafusion.execution.target_partitions = 1; + +query ?? +select unnest(make_array(1,2,3)), + unnest(make_array(4,5)) +; +---- +1 4 +2 5 +3 NULL + +query ??? +select unnest(make_array(1,2,3)), + unnest(make_array(4,5)), + unnest(make_array(6,7,8,9)) +; +---- +1 4 6 +2 5 7 +3 NULL 8 +NULL NULL 9 + +query ??? 
+select unnest(make_array(1,2,3,4,5)), + unnest(make_array(6,7)), + unnest(make_array(8,9,10,11,22,33)) +; +---- +1 6 8 +2 7 9 +3 NULL 10 +4 NULL 11 +5 NULL 22 +NULL NULL 33 + +# Select From + +query IIIII +select * from unnest( + make_array(1), + make_array(2,3), + make_array(4,5,6), + make_array(7,8), + make_array(9) +); +---- +1 2 4 7 9 +NULL 3 5 8 NULL +NULL NULL 6 NULL NULL + +query I +select * from unnest(make_array(1,2,3)) as data +---- +1 +2 +3 + +query II +select * from unnest(make_array(1,2,3),make_array(7,6,5,4)) as data(a,b) order by b +---- +NULL 4 +3 5 +2 6 +1 7 + +query ?T?I +select * from arrays_unnest; +---- +[1, 2] A [1, 2] 3 +NULL B [4] 5 +[3] C [6, 7, 8] 9 + +# TODO: Unnest columns fails +query error DataFusion error: SQL error: TokenizerError\("Unterminated string literal at Line: 2, Column 95"\) +caused by +Internal error: UNNEST only supports list type\. This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker Review Comment: Can we please change this to `NotImplemented` rather than `InternalError` (so people don't file bug reports :) )? ########## datafusion/expr/src/expr_schema.rs: ########## @@ -84,6 +85,16 @@ impl ExprSchemable for Expr { .collect::<Result<Vec<_>>>()?; Ok((fun.return_type)(&data_types)?.as_ref().clone()) } + Expr::Unnest(Unnest { array_exprs }) => { + let data_types = array_exprs + .iter() + .map(|e| e.get_type(schema)) + .collect::<Result<Vec<_>>>()?; + // Consider data types are the same for now + // TODO: Return correct data type for inconsistent data types Review Comment: Does this mean that all arguments should be the same type? Maybe we can return a "Not yet implemented" error if they are inconsistent. ########## datafusion/core/src/datasource/listing/helpers.rs: ########## @@ -90,6 +90,7 @@ pub fn expr_applicable_for_cols(col_names: &[String], expr: &Expr) -> bool { | Expr::GroupingSet(_) | Expr::Case { .. 
} => VisitRecursion::Continue, + Expr::Unnest { .. } => todo!("Unnest not implemented yet"), Review Comment: Can we please change this to return `DataFusionError::NotImplemented` rather than `todo!()` (which will panic)? ########## datafusion/expr/src/expr_schema.rs: ########## @@ -84,6 +85,16 @@ impl ExprSchemable for Expr { .collect::<Result<Vec<_>>>()?; Ok((fun.return_type)(&data_types)?.as_ref().clone()) } + Expr::Unnest(Unnest { array_exprs }) => { + let data_types = array_exprs + .iter() + .map(|e| e.get_type(schema)) + .collect::<Result<Vec<_>>>()?; + // Consider data types are the same for now + // TODO: Return correct data type for inconsistent data types + let return_type = data_types.first().unwrap(); Review Comment: This panics when there are no arguments -- can we please make it return a real error instead? ```rust DataFusion CLI v28.0.0 ❯ select unnest(); thread 'main' panicked at 'called `Option::unwrap()` on a `None` value', /Users/alamb/Software/arrow-datafusion2/datafusion/expr/src/expr_schema.rs:95:54 stack backtrace: 0: rust_begin_unwind ``` ########## datafusion/expr/src/expr.rs: ########## @@ -931,6 +948,40 @@ impl Expr { pub fn contains_outer(&self) -> bool { !find_out_reference_exprs(self).is_empty() } + + pub fn flatten(&self) -> Self { Review Comment: Can we please add doc comments about what this function does? Bonus points for an example. Perhaps also point out it is the opposite of `unnest`? 
########## datafusion/expr/src/logical_plan/builder.rs: ########## @@ -1038,6 +1042,114 @@ impl LogicalPlanBuilder { pub fn unnest_column(self, column: impl Into<Column>) -> Result<Self> { Ok(Self::from(unnest(self.plan, column.into())?)) } + + pub fn join_unnest_plans( + unnest_plans: Vec<LogicalPlan>, + columns_name: Vec<String>, + ) -> Result<LogicalPlan> { + // Add row_number for each unnested array + let window_func_expr = Expr::WindowFunction(expr::WindowFunction::new( + WindowFunction::BuiltInWindowFunction(BuiltInWindowFunction::RowNumber), + vec![], + vec![], + vec![], + WindowFrame::new(false), + )); + let window_func_exprs = vec![window_func_expr.clone()]; + + let window_plans = unnest_plans + .into_iter() + .map(|plan| LogicalPlanBuilder::window_plan(plan, window_func_exprs.clone())) + .collect::<Result<Vec<LogicalPlan>>>()?; + + // Create alias for row number + let row_numbers_name: Vec<String> = (0..columns_name.len()) + .map(|idx| format!("rn{}", idx)) + .collect(); + let project_exprs: Vec<Vec<Expr>> = columns_name + .iter() + .zip(row_numbers_name.iter()) + .map(|(col_name, row_number_name)| { + vec![ + ident(col_name), + window_func_expr.clone().alias(row_number_name), + ] + }) + .collect(); + let project_plans = window_plans + .iter() + .zip(project_exprs.into_iter()) + .map(|(plan, expr)| { + LogicalPlanBuilder::from(plan.clone()) + .project(expr)? 
+ .build() + }) + .collect::<Result<Vec<LogicalPlan>>>()?; + + // Wrap each unnested array into a subquery + let subqueries_alias: Vec<String> = (0..project_plans.len()) + .map(|idx| format!("sq{}", idx)) + .collect(); + let subqueries_alias_plan = project_plans + .into_iter() + .zip(subqueries_alias.iter()) + .map(|(plan, alias)| { + LogicalPlanBuilder::from(plan).alias(alias.clone())?.build() + }) + .collect::<Result<Vec<LogicalPlan>>>()?; + + // Create alias for columns to apply join + let columns_to_join_on = subqueries_alias + .iter() + .zip(row_numbers_name.iter()) + .map(|(alias, rn)| col(format!("{}.{}", alias, rn))) + .collect::<Vec<Expr>>(); + + let (join_plan, _) = subqueries_alias_plan + .iter() + .zip(columns_to_join_on.iter()) + .skip(1) + .fold( + Ok(( + subqueries_alias_plan[0].clone(), + columns_to_join_on[0].clone(), + )), + |result: Result<(LogicalPlan, Expr)>, (right_plan, right_column)| { + result.and_then(|(left_plan, left_column)| { + let plan = LogicalPlanBuilder::from(left_plan) + .join( + right_plan.clone(), + JoinType::Full, + (Vec::<Column>::new(), Vec::<Column>::new()), + Some(left_column.eq(right_column.clone())), + )? + .build()?; + Ok((plan, right_column.clone())) + }) + }, + )?; + + // Select unnested array only, row_number is not needed + let selected_exprs: Vec<Expr> = subqueries_alias + .into_iter() + .zip(columns_name.into_iter()) + .map(|(sq, col_name)| col(format!("{sq}.{col_name}"))) + .collect(); + + LogicalPlanBuilder::from(join_plan) + .project(selected_exprs)? + .build() + } + + pub fn unnest_arrays(self, array_exprs: Vec<Expr>) -> Result<Self> { Review Comment: likewise this public API should also have documentation I think ########## datafusion/core/src/datasource/listing/helpers.rs: ########## @@ -90,6 +90,7 @@ pub fn expr_applicable_for_cols(col_names: &[String], expr: &Expr) -> bool { | Expr::GroupingSet(_) | Expr::Case { .. } => VisitRecursion::Continue, + Expr::Unnest { .. 
} => todo!("Unnest not implemented yet"), Review Comment: It would also be fine and conservative to set `is_applicable = false` for unnest and return `Stop` ########## datafusion/sql/tests/sql_integration.rs: ########## @@ -2962,36 +2962,37 @@ fn cte_with_no_column_names() { #[test] fn cte_with_column_names() { - let sql = "WITH \ - numbers(a, b, c) AS ( \ - SELECT 1, 2, 3 \ - ) \ - SELECT * FROM numbers;"; + let sql = r#"WITH + numbers(a, b, c) AS ( + SELECT 1, 2, 3 + ) + SELECT * FROM numbers;"#; - let expected = "Projection: numbers.a, numbers.b, numbers.c\ - \n SubqueryAlias: numbers\ - \n Projection: Int64(1) AS a, Int64(2) AS b, Int64(3) AS c\ - \n Projection: Int64(1), Int64(2), Int64(3)\ - \n EmptyRelation"; + let expected = r#"Projection: numbers.a, numbers.b, numbers.c + SubqueryAlias: numbers + Projection: Int64(1) AS a, Int64(2) AS b, Int64(3) AS c + Projection: Int64(1), Int64(2), Int64(3) + EmptyRelation"#; - quick_test(sql, expected) + quick_test(sql, expected); } #[test] fn cte_with_column_aliases_precedence() { Review Comment: Are the changes in this test related to unnest? Maybe we could break them out into their own PR for review. ########## datafusion/expr/src/logical_plan/builder.rs: ########## @@ -1431,6 +1543,122 @@ pub fn unnest(input: LogicalPlan, column: Column) -> Result<LogicalPlan> { })) } +/// Create unnest plan from arrays. +pub fn build_unnest_plans( Review Comment: Should this be public? ########## datafusion/expr/src/logical_plan/builder.rs: ########## @@ -1038,6 +1042,114 @@ impl LogicalPlanBuilder { pub fn unnest_column(self, column: impl Into<Column>) -> Result<Self> { Ok(Self::from(unnest(self.plan, column.into())?)) } + + pub fn join_unnest_plans( Review Comment: Can we please add some doc comments about what this function does and what its arguments are? 
########## datafusion/proto/src/logical_plan/to_proto.rs: ########## @@ -717,6 +717,7 @@ impl TryFrom<&Expr> for protobuf::LogicalExprNode { .to_string(), )) } + Expr::Unnest(..) => todo!("Unnest not supported"), Review Comment: Also should return an error rather than panic ########## datafusion/optimizer/src/optimizer.rs: ########## @@ -422,6 +424,11 @@ fn assert_schema_is_the_same( prev_plan: &LogicalPlan, new_plan: &LogicalPlan, ) -> Result<()> { + if rule_name == "unnest_expressions" { Review Comment: This doesn't seem right to me -- maybe there is a bug in the output schema that `Unnest` reports? ########## datafusion/expr/src/expr_schema.rs: ########## @@ -84,6 +85,16 @@ impl ExprSchemable for Expr { .collect::<Result<Vec<_>>>()?; Ok((fun.return_type)(&data_types)?.as_ref().clone()) } + Expr::Unnest(Unnest { array_exprs }) => { + let data_types = array_exprs + .iter() + .map(|e| e.get_type(schema)) + .collect::<Result<Vec<_>>>()?; + // Consider data types are the same for now + // TODO: Return correct data type for inconsistent data types + let return_type = data_types.first().unwrap(); Review Comment: This panics when there are no arguments -- can we please make it return a real error instead? ```rust DataFusion CLI v28.0.0 ❯ select unnest(); thread 'main' panicked at 'called `Option::unwrap()` on a `None` value', /Users/alamb/Software/arrow-datafusion2/datafusion/expr/src/expr_schema.rs:95:54 stack backtrace: 0: rust_begin_unwind ``` ########## datafusion/optimizer/src/unnest_expressions.rs: ########## @@ -0,0 +1,80 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use datafusion_expr::expr::Alias; +use datafusion_expr::expr::Unnest; +use datafusion_expr::Expr; +use datafusion_expr::LogicalPlan; +use datafusion_expr::LogicalPlanBuilder; + +use crate::{OptimizerConfig, OptimizerRule}; +use datafusion_common::Result; + +#[derive(Default)] +pub struct UnnestExpressions {} + +impl OptimizerRule for UnnestExpressions { + fn name(&self) -> &str { + "unnest_expressions" + } + + fn try_optimize( + &self, + plan: &LogicalPlan, + _config: &dyn OptimizerConfig, + ) -> Result<Option<LogicalPlan>> { + Ok(Some(Self::optimize_internal(plan)?)) + } +} + +impl UnnestExpressions { + pub fn new() -> Self { + Self::default() + } + + fn optimize_internal(plan: &LogicalPlan) -> Result<LogicalPlan> { + if let LogicalPlan::Projection(_) = plan { Review Comment: You might be able to use https://docs.rs/datafusion/latest/datafusion/logical_expr/utils/fn.inspect_expr_pre.html or some other TreeVisit methods to recursively walk the expression trees -- or maybe you only want to walk the top level ones 🤔 ########## datafusion/optimizer/src/unnest_expressions.rs: ########## @@ -0,0 +1,80 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use datafusion_expr::expr::Alias; +use datafusion_expr::expr::Unnest; +use datafusion_expr::Expr; +use datafusion_expr::LogicalPlan; +use datafusion_expr::LogicalPlanBuilder; + +use crate::{OptimizerConfig, OptimizerRule}; +use datafusion_common::Result; + +#[derive(Default)] +pub struct UnnestExpressions {} + +impl OptimizerRule for UnnestExpressions { + fn name(&self) -> &str { + "unnest_expressions" + } + + fn try_optimize( + &self, + plan: &LogicalPlan, + _config: &dyn OptimizerConfig, + ) -> Result<Option<LogicalPlan>> { + Ok(Some(Self::optimize_internal(plan)?)) + } +} + +impl UnnestExpressions { + pub fn new() -> Self { + Self::default() + } + + fn optimize_internal(plan: &LogicalPlan) -> Result<LogicalPlan> { + if let LogicalPlan::Projection(_) = plan { Review Comment: You might be able to use https://docs.rs/datafusion/latest/datafusion/logical_expr/utils/fn.inspect_expr_pre.html or some other TreeVisit methods to recursively walk the expression trees -- or maybe you only want to walk the top level ones 🤔 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
