This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 03557131d2 feat: extend `unnest` to support Struct datatype (#10429)
03557131d2 is described below
commit 03557131d2c8f2ccc428b21681472db11237212e
Author: Duong Cong Toai <[email protected]>
AuthorDate: Wed May 22 22:53:55 2024 +0200
feat: extend `unnest` to support Struct datatype (#10429)
* feat: extend unnest for struct
* compile err
* debugging
* finish basic
* chore: complete impl
* chore: clean garbage
* chore: more test
* test: fix df test
* prettify display
* fix unit test
* chore: compile err
* chore: fix physical exec err
* add sqllogic test
* chore: more doc
* chore: refactor
* fix doc
* fmt
* fix doc
* ut for recursive transform unnest
* a small integration test
* fix comment
---
datafusion/core/src/dataframe/mod.rs | 6 +-
datafusion/core/src/physical_planner.rs | 16 +-
datafusion/core/tests/data/unnest.json | 2 +
datafusion/core/tests/dataframe/mod.rs | 12 +-
datafusion/expr/src/expr_fn.rs | 9 +-
datafusion/expr/src/expr_schema.rs | 2 +-
datafusion/expr/src/logical_plan/builder.rs | 220 +++++++++++++++------
datafusion/expr/src/logical_plan/display.rs | 21 +-
datafusion/expr/src/logical_plan/plan.rs | 46 ++++-
datafusion/expr/src/logical_plan/tree_node.rs | 14 +-
.../optimizer/src/optimize_projections/mod.rs | 9 +-
datafusion/physical-plan/src/unnest.rs | 183 ++++++++++++-----
datafusion/sql/src/expr/function.rs | 6 +-
datafusion/sql/src/select.rs | 62 +++---
datafusion/sql/src/utils.rs | 187 +++++++++++++++++-
datafusion/sql/tests/sql_integration.rs | 38 ++++
datafusion/sqllogictest/test_files/unnest.slt | 81 ++++++--
17 files changed, 707 insertions(+), 207 deletions(-)
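Before the file-by-file changes, a minimal standalone sketch of what this push enables (an illustration, not part of the patch; it assumes the SQL support added in the diff below):

    use datafusion::error::Result;
    use datafusion::prelude::*;

    #[tokio::main]
    async fn main() -> Result<()> {
        let ctx = SessionContext::new();
        // Struct unnest: one input row stays one row, but each struct field
        // becomes its own output column (here three Int64 columns).
        ctx.sql("SELECT unnest(struct(1, 2, 3))").await?.show().await?;
        // List unnest: one input row becomes one output row per element.
        ctx.sql("SELECT unnest([1, 2, 3])").await?.show().await?;
        Ok(())
    }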
diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs
index 04aaf5a890..d4626134ac 100644
--- a/datafusion/core/src/dataframe/mod.rs
+++ b/datafusion/core/src/dataframe/mod.rs
@@ -263,7 +263,7 @@ impl DataFrame {
self.unnest_columns_with_options(&[column], options)
}
- /// Expand multiple list columns into a set of rows.
+ /// Expand multiple list/struct columns into a set of rows and new columns.
///
/// See also:
///
@@ -277,8 +277,8 @@ impl DataFrame {
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// let ctx = SessionContext::new();
- /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
- /// let df = df.unnest_columns(&["a", "b"])?;
+ /// let df = ctx.read_json("tests/data/unnest.json", NdJsonReadOptions::default()).await?;
+ /// let df = df.unnest_columns(&["b","c","d"])?;
/// # Ok(())
/// # }
/// ```
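For completeness, a runnable version of the updated doc example above (a sketch assuming the tests/data/unnest.json file added later in this diff is available at that relative path):

    use datafusion::error::Result;
    use datafusion::prelude::*;

    #[tokio::main]
    async fn main() -> Result<()> {
        let ctx = SessionContext::new();
        let df = ctx
            .read_json("tests/data/unnest.json", NdJsonReadOptions::default())
            .await?;
        // "b" and "c" are list columns (expanded vertically into rows), while
        // "d" is a struct column (expanded horizontally into "d.e" and "d.f").
        df.unnest_columns(&["b", "c", "d"])?.show().await?;
        Ok(())
    }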
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index 090b1d59d9..bc5818361b 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -49,7 +49,7 @@ use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGro
use crate::physical_plan::analyze::AnalyzeExec;
use crate::physical_plan::empty::EmptyExec;
use crate::physical_plan::explain::ExplainExec;
-use crate::physical_plan::expressions::{Column, PhysicalSortExpr};
+use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::physical_plan::filter::FilterExec;
use crate::physical_plan::joins::utils as join_utils;
use crate::physical_plan::joins::{
@@ -1112,24 +1112,18 @@ impl DefaultPhysicalPlanner {
Arc::new(GlobalLimitExec::new(input, *skip, *fetch))
}
LogicalPlan::Unnest(Unnest {
- columns,
+ list_type_columns,
+ struct_type_columns,
schema,
options,
..
}) => {
let input = children.one()?;
- let column_execs = columns
- .iter()
- .map(|column| {
- schema
- .index_of_column(column)
- .map(|idx| Column::new(&column.name, idx))
- })
- .collect::<Result<_>>()?;
let schema = SchemaRef::new(schema.as_ref().to_owned().into());
Arc::new(UnnestExec::new(
input,
- column_execs,
+ list_type_columns.clone(),
+ struct_type_columns.clone(),
schema,
options.clone(),
))
diff --git a/datafusion/core/tests/data/unnest.json b/datafusion/core/tests/data/unnest.json
new file mode 100644
index 0000000000..5999171c28
--- /dev/null
+++ b/datafusion/core/tests/data/unnest.json
@@ -0,0 +1,2 @@
+{"a":1, "b":[2.0, 1.3, -6.1], "c":[false, true],"d":{"e":1,"f":2}}
+{"a":2, "b":[3.0, 2.3, -7.1], "c":[false, true]}
diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs
index 009f45b280..9b7cb85614 100644
--- a/datafusion/core/tests/dataframe/mod.rs
+++ b/datafusion/core/tests/dataframe/mod.rs
@@ -1231,11 +1231,11 @@ async fn unnest_aggregate_columns() -> Result<()> {
.collect()
.await?;
let expected = [
- r#"+--------------------+"#,
- r#"| COUNT(shapes.tags) |"#,
- r#"+--------------------+"#,
- r#"| 9 |"#,
- r#"+--------------------+"#,
+ r#"+-------------+"#,
+ r#"| COUNT(tags) |"#,
+ r#"+-------------+"#,
+ r#"| 9 |"#,
+ r#"+-------------+"#,
];
assert_batches_sorted_eq!(expected, &results);
@@ -1384,7 +1384,7 @@ async fn unnest_with_redundant_columns() -> Result<()> {
let optimized_plan = df.clone().into_optimized_plan()?;
let expected = vec![
"Projection: shapes.shape_id [shape_id:UInt32]",
- " Unnest: shape_id2 [shape_id:UInt32, shape_id2:UInt32;N]",
+ " Unnest: lists[shape_id2] structs[] [shape_id:UInt32,
shape_id2:UInt32;N]",
" Aggregate: groupBy=[[shapes.shape_id]],
aggr=[[ARRAY_AGG(shapes.shape_id) AS shape_id2]] [shape_id:UInt32,
shape_id2:List(Field { name: \"item\", data_type: UInt32, nullable: true,
dict_id: 0, dict_is_ordered: false, metadata: {} });N]",
" TableScan: shapes projection=[shape_id] [shape_id:UInt32]",
];
diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs
index 64763a9736..2a2bb75f18 100644
--- a/datafusion/expr/src/expr_fn.rs
+++ b/datafusion/expr/src/expr_fn.rs
@@ -19,7 +19,7 @@
use crate::expr::{
AggregateFunction, BinaryExpr, Cast, Exists, GroupingSet, InList,
InSubquery,
- Placeholder, TryCast,
+ Placeholder, TryCast, Unnest,
};
use crate::function::{
AccumulatorArgs, AccumulatorFactoryFunction, PartitionEvaluatorFactory,
@@ -489,6 +489,13 @@ pub fn when(when: Expr, then: Expr) -> CaseBuilder {
CaseBuilder::new(None, vec![when], vec![then], None)
}
+/// Create a Unnest expression
+pub fn unnest(expr: Expr) -> Expr {
+ Expr::Unnest(Unnest {
+ expr: Box::new(expr),
+ })
+}
+
/// Convenience method to create a new user defined scalar function (UDF) with a
/// specific signature and specific return type.
///
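A minimal sketch of the new `unnest` helper in use (assuming it is re-exported from `datafusion_expr`, as the test imports later in this diff suggest):

    use datafusion_expr::{col, unnest};

    fn main() {
        // Wraps the column reference in an Expr::Unnest; later planning decides
        // whether this is a list unnest (new rows) or a struct unnest (new columns).
        let expr = unnest(col("struct_col"));
        println!("{expr:?}");
    }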
diff --git a/datafusion/expr/src/expr_schema.rs b/datafusion/expr/src/expr_schema.rs
index 8b7f30d245..01c9edff30 100644
--- a/datafusion/expr/src/expr_schema.rs
+++ b/datafusion/expr/src/expr_schema.rs
@@ -121,7 +121,7 @@ impl ExprSchemable for Expr {
Ok(field.data_type().clone())
}
DataType::Struct(_) => {
- not_impl_err!("unnest() does not support struct yet")
+ Ok(arg_data_type)
}
DataType::Null => {
not_impl_err!("unnest() does not support null yet")
diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
index 3b1b1196f1..8483525d7f 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -51,9 +51,9 @@ use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
use datafusion_common::config::FormatOptions;
use datafusion_common::display::ToStringifiedPlan;
use datafusion_common::{
- get_target_functional_dependencies, not_impl_err, plan_datafusion_err, plan_err,
- Column, DFSchema, DFSchemaRef, DataFusionError, Result, ScalarValue, TableReference,
- ToDFSchema, UnnestOptions,
+ get_target_functional_dependencies, internal_err, not_impl_err, plan_datafusion_err,
+ plan_err, Column, DFSchema, DFSchemaRef, DataFusionError, Result, ScalarValue,
+ TableReference, ToDFSchema, UnnestOptions,
};
/// Default table name for unnamed table
@@ -1592,7 +1592,53 @@ impl TableSource for LogicalTableSource {
/// Create a [`LogicalPlan::Unnest`] plan
pub fn unnest(input: LogicalPlan, columns: Vec<Column>) -> Result<LogicalPlan> {
- unnest_with_options(input, columns, UnnestOptions::new())
+ unnest_with_options(input, columns, UnnestOptions::default())
+}
+
+// Based on the data type (either a struct or a variant of list),
+// return the set of columns produced by unnesting the input column.
+// For example, given a column with name "a":
+// - List(Element) returns ["a"] with data type Element
+// - Struct(field1, field2) returns ["a.field1", "a.field2"]
+pub fn get_unnested_columns(
+ col_name: &String,
+ data_type: &DataType,
+) -> Result<Vec<(Column, Arc<Field>)>> {
+ let mut qualified_columns = Vec::with_capacity(1);
+
+ match data_type {
+ DataType::List(field)
+ | DataType::FixedSizeList(field, _)
+ | DataType::LargeList(field) => {
+ let new_field = Arc::new(Field::new(
+ col_name.clone(),
+ field.data_type().clone(),
+ // Unnesting may produce NULLs even if the list is not null.
+ // For example: unnest([1], []) -> 1, null
+ true,
+ ));
+ let column = Column::from_name(col_name);
+ // let column = Column::from((None, &new_field));
+ qualified_columns.push((column, new_field));
+ }
+ DataType::Struct(fields) => {
+ qualified_columns.extend(fields.iter().map(|f| {
+ let new_name = format!("{}.{}", col_name, f.name());
+ let column = Column::from_name(&new_name);
+ let new_field = f.as_ref().clone().with_name(new_name);
+ // let column = Column::from((None, &f));
+ (column, Arc::new(new_field))
+ }))
+ }
+ _ => {
+ return internal_err!(
+ "trying to unnest on invalid data type {:?}",
+ data_type
+ );
+ }
+ };
+ Ok(qualified_columns)
}
/// Create a [`LogicalPlan::Unnest`] plan with options
@@ -1601,41 +1647,59 @@ pub fn unnest_with_options(
columns: Vec<Column>,
options: UnnestOptions,
) -> Result<LogicalPlan> {
- // Extract the type of the nested field in the list.
- let mut unnested_fields: HashMap<usize, _> = HashMap::with_capacity(columns.len());
- // Add qualifiers to the columns.
- let mut qualified_columns = Vec::with_capacity(columns.len());
- for c in &columns {
- let index = input.schema().index_of_column(c)?;
- let (unnest_qualifier, unnest_field) = input.schema().qualified_field(index);
- let unnested_field = match unnest_field.data_type() {
- DataType::List(field)
- | DataType::FixedSizeList(field, _)
- | DataType::LargeList(field) => Arc::new(Field::new(
- unnest_field.name(),
- field.data_type().clone(),
- // Unnesting may produce NULLs even if the list is not null.
- // For example: unnset([1], []) -> 1, null
- true,
- )),
- _ => {
- // If the unnest field is not a list type return the input plan.
- return Ok(input);
- }
- };
- qualified_columns.push(Column::from((unnest_qualifier, &unnested_field)));
- unnested_fields.insert(index, unnested_field);
- }
+ let mut list_columns = Vec::with_capacity(columns.len());
+ let mut struct_columns = Vec::with_capacity(columns.len());
+ let column_by_original_index = columns
+ .iter()
+ .map(|c| Ok((input.schema().index_of_column(c)?, c)))
+ .collect::<Result<HashMap<usize, &Column>>>()?;
- // Update the schema with the unnest column types changed to contain the nested types.
let input_schema = input.schema();
+
+ let mut dependency_indices = vec![];
+ // Transform input schema into new schema
+ // e.g. int, unnest([]int), unnest(struct(varchar,varchar))
+ // becomes int, int, varchar, varchar
let fields = input_schema
.iter()
.enumerate()
- .map(|(index, (q, f))| match unnested_fields.get(&index) {
- Some(unnested_field) => (q.cloned(), unnested_field.clone()),
- None => (q.cloned(), f.clone()),
+ .map(|(index, (original_qualifier, original_field))| {
+ match column_by_original_index.get(&index) {
+ Some(&column_to_unnest) => {
+ let flatten_columns = get_unnested_columns(
+ &column_to_unnest.name,
+ original_field.data_type(),
+ )?;
+ match original_field.data_type() {
+ DataType::List(_)
+ | DataType::FixedSizeList(_, _)
+ | DataType::LargeList(_) => list_columns.push(index),
+ DataType::Struct(_) => struct_columns.push(index),
+ _ => {
+ panic!(
+ "not reachable, should be caught by get_unnested_columns"
+ )
+ }
+ }
+ // new columns dependent on the same original index
+ dependency_indices
+ .extend(std::iter::repeat(index).take(flatten_columns.len()));
+ Ok(flatten_columns
+ .iter()
+ .map(|col: &(Column, Arc<Field>)| {
+ (col.0.relation.to_owned(), col.1.to_owned())
+ })
+ .collect())
+ }
+ None => {
+ dependency_indices.push(index);
+ Ok(vec![(original_qualifier.cloned(), original_field.clone())])
+ }
+ }
})
+ .collect::<Result<Vec<_>>>()?
+ .into_iter()
+ .flatten()
.collect::<Vec<_>>();
let metadata = input_schema.metadata().clone();
@@ -1643,9 +1707,13 @@ pub fn unnest_with_options(
// We can use the existing functional dependencies:
let deps = input_schema.functional_dependencies().clone();
let schema = Arc::new(df_schema.with_functional_dependencies(deps)?);
+
Ok(LogicalPlan::Unnest(Unnest {
input: Arc::new(input),
- columns: qualified_columns,
+ exec_columns: columns,
+ list_type_columns: list_columns,
+ struct_type_columns: struct_columns,
+ dependency_indices,
schema,
options,
}))
@@ -2074,13 +2142,13 @@ mod tests {
#[test]
fn plan_builder_unnest() -> Result<()> {
- // Unnesting a simple column should return the child plan.
- let plan = nested_table_scan("test_table")?
- .unnest_column("scalar")?
- .build()?;
-
- let expected = "TableScan: test_table";
- assert_eq!(expected, format!("{plan:?}"));
+ // Cannot unnest on a scalar column
+ let err = nested_table_scan("test_table")?
+ .unnest_column("scalar")
+ .unwrap_err();
+ assert!(err
+ .to_string()
+ .starts_with("Internal error: trying to unnest on invalid data
type UInt32"));
// Unnesting the strings list.
let plan = nested_table_scan("test_table")?
@@ -2088,36 +2156,65 @@ mod tests {
.build()?;
let expected = "\
- Unnest: test_table.strings\
+ Unnest: lists[test_table.strings] structs[]\
\n TableScan: test_table";
assert_eq!(expected, format!("{plan:?}"));
// Check unnested field is a scalar
- let field = plan
- .schema()
- .field_with_name(Some(&TableReference::bare("test_table")),
"strings")
- .unwrap();
+ let field = plan.schema().field_with_name(None, "strings").unwrap();
assert_eq!(&DataType::Utf8, field.data_type());
- // Unnesting multiple fields.
+ // Unnesting the singular struct column results in 2 new columns, one for each subfield
+ let plan = nested_table_scan("test_table")?
+ .unnest_column("struct_singular")?
+ .build()?;
+
+ let expected = "\
+ Unnest: lists[] structs[test_table.struct_singular]\
+ \n TableScan: test_table";
+ assert_eq!(expected, format!("{plan:?}"));
+
+ for field_name in &["a", "b"] {
+ // Check unnested struct field is a scalar
+ let field = plan
+ .schema()
+ .field_with_name(None, &format!("struct_singular.{}",
field_name))
+ .unwrap();
+ assert_eq!(&DataType::UInt32, field.data_type());
+ }
+
+ // Unnesting multiple fields in separate plans
let plan = nested_table_scan("test_table")?
.unnest_column("strings")?
.unnest_column("structs")?
+ .unnest_column("struct_singular")?
.build()?;
let expected = "\
- Unnest: test_table.structs\
- \n Unnest: test_table.strings\
- \n TableScan: test_table";
+ Unnest: lists[] structs[test_table.struct_singular]\
+ \n Unnest: lists[test_table.structs] structs[]\
+ \n Unnest: lists[test_table.strings] structs[]\
+ \n TableScan: test_table";
assert_eq!(expected, format!("{plan:?}"));
// Check unnested struct list field should be a struct.
- let field = plan
- .schema()
- .field_with_name(Some(&TableReference::bare("test_table")),
"structs")
- .unwrap();
+ let field = plan.schema().field_with_name(None, "structs").unwrap();
assert!(matches!(field.data_type(), DataType::Struct(_)));
+ // Unnesting multiple fields at the same time
+ let cols = vec!["strings", "structs", "struct_singular"]
+ .into_iter()
+ .map(|c| c.into())
+ .collect();
+ let plan = nested_table_scan("test_table")?
+ .unnest_columns_with_options(cols, UnnestOptions::default())?
+ .build()?;
+
+ let expected = "\
+ Unnest: lists[test_table.strings, test_table.structs] structs[test_table.struct_singular]\
+ \n TableScan: test_table";
+ assert_eq!(expected, format!("{plan:?}"));
+
// Unnesting missing column should fail.
let plan = nested_table_scan("test_table")?.unnest_column("missing");
assert!(plan.is_err());
@@ -2126,8 +2223,9 @@ mod tests {
}
fn nested_table_scan(table_name: &str) -> Result<LogicalPlanBuilder> {
- // Create a schema with a scalar field, a list of strings, and a list of structs.
- let struct_field = Field::new_struct(
+ // Create a schema with a scalar field, a list of strings, a list of structs,
+ // and a singular struct
+ let struct_field_in_list = Field::new_struct(
"item",
vec![
Field::new("a", DataType::UInt32, false),
@@ -2139,7 +2237,15 @@ mod tests {
let schema = Schema::new(vec![
Field::new("scalar", DataType::UInt32, false),
Field::new_list("strings", string_field, false),
- Field::new_list("structs", struct_field, false),
+ Field::new_list("structs", struct_field_in_list.clone(), false),
+ Field::new(
+ "struct_singular",
+ DataType::Struct(Fields::from(vec![
+ Field::new("a", DataType::UInt32, false),
+ Field::new("b", DataType::UInt32, false),
+ ])),
+ false,
+ ),
]);
table_scan(Some(table_name), &schema, None)
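To make the new helper concrete, a sketch of `get_unnested_columns` mapping a struct type to its post-unnest output columns (the column name "s" is an arbitrary example):

    use arrow::datatypes::{DataType, Field, Fields};
    use datafusion_common::Result;
    use datafusion_expr::builder::get_unnested_columns;

    fn main() -> Result<()> {
        let struct_type = DataType::Struct(Fields::from(vec![
            Field::new("a", DataType::UInt32, false),
            Field::new("b", DataType::UInt32, false),
        ]));
        // A struct yields one "<name>.<field>" column per field, so this prints
        // "s.a -> UInt32" and "s.b -> UInt32"; a list type would instead keep
        // the name "s" with the list's element data type.
        for (column, field) in get_unnested_columns(&"s".to_string(), &struct_type)? {
            println!("{column} -> {}", field.data_type());
        }
        Ok(())
    }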
diff --git a/datafusion/expr/src/logical_plan/display.rs b/datafusion/expr/src/logical_plan/display.rs
index 3a2ed9ffc2..f3765fb184 100644
--- a/datafusion/expr/src/logical_plan/display.rs
+++ b/datafusion/expr/src/logical_plan/display.rs
@@ -30,7 +30,7 @@ use crate::dml::CopyTo;
use arrow::datatypes::Schema;
use datafusion_common::display::GraphvizBuilder;
use datafusion_common::tree_node::{TreeNodeRecursion, TreeNodeVisitor};
-use datafusion_common::DataFusionError;
+use datafusion_common::{Column, DataFusionError};
use serde_json::json;
/// Formats plans with a single line per node. For example:
@@ -638,10 +638,25 @@ impl<'a, 'b> PgJsonVisitor<'a, 'b> {
"Node Type": "DescribeTable"
})
}
- LogicalPlan::Unnest(Unnest { columns, .. }) => {
+ LogicalPlan::Unnest(Unnest {
+ input: plan,
+ list_type_columns: list_col_indices,
+ struct_type_columns: struct_col_indices,
+ ..
+ }) => {
+ let input_columns = plan.schema().columns();
+ let list_type_columns = list_col_indices
+ .iter()
+ .map(|i| &input_columns[*i])
+ .collect::<Vec<&Column>>();
+ let struct_type_columns = struct_col_indices
+ .iter()
+ .map(|i| &input_columns[*i])
+ .collect::<Vec<&Column>>();
json!({
"Node Type": "Unnest",
- "Column": expr_vec_fmt!(columns),
+ "ListColumn": expr_vec_fmt!(list_type_columns),
+ "StructColumn": expr_vec_fmt!(struct_type_columns),
})
}
}
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index 42f3e1f163..97592c05ab 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -667,12 +667,12 @@ impl LogicalPlan {
LogicalPlan::DescribeTable(_) => Ok(self),
LogicalPlan::Unnest(Unnest {
input,
- columns,
- schema: _,
+ exec_columns,
options,
+ ..
}) => {
// Update schema with unnested column type.
- unnest_with_options(unwrap_arc(input), columns, options)
+ unnest_with_options(unwrap_arc(input), exec_columns, options)
}
}
}
@@ -1017,11 +1017,15 @@ impl LogicalPlan {
}
LogicalPlan::DescribeTable(_) => Ok(self.clone()),
LogicalPlan::Unnest(Unnest {
- columns, options, ..
+ exec_columns: columns,
+ options,
+ ..
}) => {
// Update schema with unnested column type.
let input = inputs.swap_remove(0);
- unnest_with_options(input, columns.clone(), options.clone())
+ let new_plan =
+ unnest_with_options(input, columns.clone(), options.clone())?;
+ Ok(new_plan)
}
}
}
@@ -1790,8 +1794,23 @@ impl LogicalPlan {
LogicalPlan::DescribeTable(DescribeTable { .. }) => {
write!(f, "DescribeTable")
}
- LogicalPlan::Unnest(Unnest { columns, .. }) => {
- write!(f, "Unnest: {}", expr_vec_fmt!(columns))
+ LogicalPlan::Unnest(Unnest {
+ input: plan,
+ list_type_columns: list_col_indices,
+ struct_type_columns: struct_col_indices, .. }) => {
+ let input_columns = plan.schema().columns();
+ let list_type_columns = list_col_indices
+ .iter()
+ .map(|i| &input_columns[*i])
+ .collect::<Vec<&Column>>();
+ let struct_type_columns = struct_col_indices
+ .iter()
+ .map(|i| &input_columns[*i])
+ .collect::<Vec<&Column>>();
+ // get items from input_columns indexed by list_col_indices
+ write!(f, "Unnest: lists[{}] structs[{}]",
+ expr_vec_fmt!(list_type_columns),
+ expr_vec_fmt!(struct_type_columns))
}
}
}
@@ -2783,8 +2802,17 @@ pub enum Partitioning {
pub struct Unnest {
/// The incoming logical plan
pub input: Arc<LogicalPlan>,
- /// The columns to unnest
- pub columns: Vec<Column>,
+ /// Columns to run unnest on; each can be a List or Struct column
+ pub exec_columns: Vec<Column>,
+ /// Refers to the indices (in the input schema) of columns
+ /// that have type list, to run unnest on
+ pub list_type_columns: Vec<usize>,
+ /// Refers to the indices (in the input schema) of columns
+ /// that have type struct, to run unnest on
+ pub struct_type_columns: Vec<usize>,
+ /// Items aligned with the output columns,
+ /// representing which column in the input schema each output column depends on
+ pub dependency_indices: Vec<usize>,
/// The output schema, containing the unnested field column.
pub schema: DFSchemaRef,
/// Options
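To illustrate the bookkeeping in the reworked Unnest node, hypothetical field values for an input schema [int_col, list_col: List(Int64), struct_col: Struct(a, b)] whose unnest output is [int_col, list_col, struct_col.a, struct_col.b]:

    use datafusion_common::Column;

    fn main() {
        let exec_columns = vec![Column::from_name("list_col"), Column::from_name("struct_col")];
        let list_type_columns: Vec<usize> = vec![1]; // input index of list_col
        let struct_type_columns: Vec<usize> = vec![2]; // input index of struct_col
        // One entry per OUTPUT column, pointing at the input column it depends
        // on; struct_col expands into two output columns, so index 2 repeats.
        let dependency_indices: Vec<usize> = vec![0, 1, 2, 2];
        println!("{exec_columns:?} {list_type_columns:?} {struct_type_columns:?} {dependency_indices:?}");
    }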
diff --git a/datafusion/expr/src/logical_plan/tree_node.rs b/datafusion/expr/src/logical_plan/tree_node.rs
index ea1f1c3c85..215b2cb4d4 100644
--- a/datafusion/expr/src/logical_plan/tree_node.rs
+++ b/datafusion/expr/src/logical_plan/tree_node.rs
@@ -313,13 +313,19 @@ impl TreeNode for LogicalPlan {
}
LogicalPlan::Unnest(Unnest {
input,
- columns,
+ exec_columns: input_columns,
+ list_type_columns,
+ struct_type_columns,
+ dependency_indices,
schema,
options,
}) => rewrite_arc(input, f)?.update_data(|input| {
LogicalPlan::Unnest(Unnest {
input,
- columns,
+ exec_columns: input_columns,
+ dependency_indices,
+ list_type_columns,
+ struct_type_columns,
schema,
options,
})
@@ -492,7 +498,9 @@ impl LogicalPlan {
LogicalPlan::TableScan(TableScan { filters, .. }) => {
filters.iter().apply_until_stop(f)
}
- LogicalPlan::Unnest(Unnest { columns, .. }) => {
+ LogicalPlan::Unnest(unnest) => {
+ let columns = unnest.exec_columns.clone();
+
let exprs = columns
.iter()
.map(|c| Expr::Column(c.clone()))
diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs
index 49b52aa53a..af51814c96 100644
--- a/datafusion/optimizer/src/optimize_projections/mod.rs
+++ b/datafusion/optimizer/src/optimize_projections/mod.rs
@@ -30,6 +30,7 @@ use datafusion_common::{
JoinType, Result,
};
use datafusion_expr::expr::Alias;
+use datafusion_expr::Unnest;
use datafusion_expr::{
logical_plan::LogicalPlan, projection_schema, Aggregate, Distinct, Expr,
Projection,
TableScan, Window,
@@ -289,7 +290,6 @@ fn optimize_projections(
LogicalPlan::Sort(_)
| LogicalPlan::Filter(_)
| LogicalPlan::Repartition(_)
- | LogicalPlan::Unnest(_)
| LogicalPlan::Union(_)
| LogicalPlan::SubqueryAlias(_)
| LogicalPlan::Distinct(Distinct::On(_)) => {
@@ -399,6 +399,13 @@ fn optimize_projections(
"OptimizeProjection: should have handled in the match
statement above"
);
}
+ LogicalPlan::Unnest(Unnest {
+ dependency_indices, ..
+ }) => {
+ vec![RequiredIndicies::new_from_indices(
+ dependency_indices.clone(),
+ )]
+ }
};
// Required indices are currently ordered (child0, child1, ...)
diff --git a/datafusion/physical-plan/src/unnest.rs b/datafusion/physical-plan/src/unnest.rs
index 06dd8230d3..a8151fe022 100644
--- a/datafusion/physical-plan/src/unnest.rs
+++ b/datafusion/physical-plan/src/unnest.rs
@@ -23,8 +23,8 @@ use std::{any::Any, sync::Arc};
use super::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};
use super::{DisplayAs, ExecutionPlanProperties, PlanProperties};
use crate::{
- expressions::Column, DisplayFormatType, Distribution, ExecutionPlan, PhysicalExpr,
- RecordBatchStream, SendableRecordBatchStream,
+ DisplayFormatType, Distribution, ExecutionPlan, RecordBatchStream,
+ SendableRecordBatchStream,
};
use arrow::array::{
@@ -36,18 +36,24 @@ use arrow::compute::kernels::zip::zip;
use arrow::compute::{cast, is_not_null, kernels, sum};
use arrow::datatypes::{DataType, Int64Type, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
-use arrow_array::{Int64Array, Scalar};
+use arrow_array::{Int64Array, Scalar, StructArray};
use arrow_ord::cmp::lt;
-use datafusion_common::{exec_datafusion_err, exec_err, Result, UnnestOptions};
+use datafusion_common::{
+ exec_datafusion_err, exec_err, internal_err, Result, UnnestOptions,
+};
use datafusion_execution::TaskContext;
+use datafusion_expr::ColumnarValue;
use datafusion_physical_expr::EquivalenceProperties;
use async_trait::async_trait;
use futures::{Stream, StreamExt};
+use hashbrown::HashSet;
use log::trace;
-/// Unnest the given columns by joining the row with each value in the
-/// nested type.
+/// Unnest the given columns (either with type struct or list).
+/// For list unnesting, each row is vertically transformed into multiple rows.
+/// For struct unnesting, each column is horizontally transformed into multiple columns.
+/// Thus the original RecordBatch with dimensions (n x m) may end up with dimensions (n' x m').
///
/// See [`UnnestOptions`] for more details and an example.
#[derive(Debug)]
@@ -56,8 +62,10 @@ pub struct UnnestExec {
input: Arc<dyn ExecutionPlan>,
/// The schema once the unnest is applied
schema: SchemaRef,
- /// The unnest columns
- columns: Vec<Column>,
+ /// indices of the list-typed columns in the input schema
+ list_column_indices: Vec<usize>,
+ /// indices of the struct-typed columns in the input schema
+ struct_column_indices: Vec<usize>,
/// Options
options: UnnestOptions,
/// Execution metrics
@@ -70,15 +78,18 @@ impl UnnestExec {
/// Create a new [UnnestExec].
pub fn new(
input: Arc<dyn ExecutionPlan>,
- columns: Vec<Column>,
+ list_column_indices: Vec<usize>,
+ struct_column_indices: Vec<usize>,
schema: SchemaRef,
options: UnnestOptions,
) -> Self {
let cache = Self::compute_properties(&input, schema.clone());
+
UnnestExec {
input,
schema,
- columns,
+ list_column_indices,
+ struct_column_indices,
options,
metrics: Default::default(),
cache,
@@ -137,7 +148,8 @@ impl ExecutionPlan for UnnestExec {
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(UnnestExec::new(
children[0].clone(),
- self.columns.clone(),
+ self.list_column_indices.clone(),
+ self.struct_column_indices.clone(),
self.schema.clone(),
self.options.clone(),
)))
@@ -158,7 +170,8 @@ impl ExecutionPlan for UnnestExec {
Ok(Box::pin(UnnestStream {
input,
schema: self.schema.clone(),
- columns: self.columns.clone(),
+ list_type_columns: self.list_column_indices.clone(),
+ struct_column_indices: self.struct_column_indices.iter().copied().collect(),
options: self.options.clone(),
metrics,
}))
@@ -214,7 +227,8 @@ struct UnnestStream {
/// Unnested schema
schema: Arc<Schema>,
/// The unnest columns
- columns: Vec<Column>,
+ list_type_columns: Vec<usize>,
+ struct_column_indices: HashSet<usize>,
/// Options
options: UnnestOptions,
/// Metrics
@@ -251,8 +265,13 @@ impl UnnestStream {
.map(|maybe_batch| match maybe_batch {
Some(Ok(batch)) => {
let timer = self.metrics.elapsed_compute.timer();
- let result =
- build_batch(&batch, &self.schema, &self.columns, &self.options);
+ let result = build_batch(
+ &batch,
+ &self.schema,
+ &self.list_type_columns,
+ &self.struct_column_indices,
+ &self.options,
+ );
self.metrics.input_batches.add(1);
self.metrics.input_rows.add(batch.num_rows());
if let Ok(ref batch) = result {
@@ -279,48 +298,105 @@ impl UnnestStream {
}
}
-/// For each row in a `RecordBatch`, some list columns need to be unnested.
-/// We will expand the values in each list into multiple rows,
+/// Given a set of struct column indices to flatten,
+/// try converting the corresponding columns in the input into multiple subfield columns.
+/// For example
+/// struct_col: [a: struct(item: int, name: string), b: int]
+/// with a batch
+/// {a: {item: 1, name: "a"}, b: 2},
+/// {a: {item: 3, name: "b"}, b: 4}
+/// will be converted into
+/// {a.item: 1, a.name: "a", b: 2},
+/// {a.item: 3, a.name: "b", b: 4}
+fn flatten_struct_cols(
+ input_batch: &[Arc<dyn Array>],
+ schema: &SchemaRef,
+ struct_column_indices: &HashSet<usize>,
+) -> Result<RecordBatch> {
+ // horizontal expansion because of struct unnest
+ let columns_expanded = input_batch
+ .iter()
+ .enumerate()
+ .map(|(idx, column_data)| match struct_column_indices.get(&idx) {
+ Some(_) => match column_data.data_type() {
+ DataType::Struct(_) => {
+ let struct_arr =
+
column_data.as_any().downcast_ref::<StructArray>().unwrap();
+ Ok(struct_arr.columns().to_vec())
+ }
+ data_type => internal_err!(
+ "expecting column {} from input plan to be a struct, got
{:?}",
+ idx,
+ data_type
+ ),
+ },
+ None => Ok(vec![column_data.clone()]),
+ })
+ .collect::<Result<Vec<_>>>()?
+ .into_iter()
+ .flatten()
+ .collect();
+ Ok(RecordBatch::try_new(schema.clone(), columns_expanded)?)
+}
+
+/// For each row in a `RecordBatch`, some list/struct columns need to be unnested.
+/// - For list columns: We will expand the values in each list into multiple rows,
/// taking the longest length among these lists, and shorter lists are padded with NULLs.
-//
+/// - For struct columns: We will expand the struct columns into multiple subfield columns.
/// For columns that don't need to be unnested, repeat their values until reaching the longest length.
fn build_batch(
batch: &RecordBatch,
schema: &SchemaRef,
- columns: &[Column],
+ list_type_columns: &[usize],
+ struct_column_indices: &HashSet<usize>,
options: &UnnestOptions,
) -> Result<RecordBatch> {
- let list_arrays: Vec<ArrayRef> = columns
- .iter()
- .map(|column| column.evaluate(batch)?.into_array(batch.num_rows()))
- .collect::<Result<_>>()?;
+ let transformed = match list_type_columns.len() {
+ 0 => flatten_struct_cols(batch.columns(), schema, struct_column_indices),
+ _ => {
+ let list_arrays: Vec<ArrayRef> = list_type_columns
+ .iter()
+ .map(|index| {
+ ColumnarValue::Array(batch.column(*index).clone())
+ .into_array(batch.num_rows())
+ })
+ .collect::<Result<_>>()?;
+
+ let longest_length = find_longest_length(&list_arrays, options)?;
+ let unnested_length = longest_length.as_primitive::<Int64Type>();
+ let total_length = if unnested_length.is_empty() {
+ 0
+ } else {
+ sum(unnested_length).ok_or_else(|| {
+ exec_datafusion_err!("Failed to calculate the total
unnested length")
+ })? as usize
+ };
+ if total_length == 0 {
+ return Ok(RecordBatch::new_empty(schema.clone()));
+ }
- let longest_length = find_longest_length(&list_arrays, options)?;
- let unnested_length = longest_length.as_primitive::<Int64Type>();
- let total_length = if unnested_length.is_empty() {
- 0
- } else {
- sum(unnested_length).ok_or_else(|| {
- exec_datafusion_err!("Failed to calculate the total unnested
length")
- })? as usize
+ // Unnest all the list arrays
+ let unnested_arrays =
+ unnest_list_arrays(&list_arrays, unnested_length, total_length)?;
+ let unnested_array_map: HashMap<_, _> = unnested_arrays
+ .into_iter()
+ .zip(list_type_columns.iter())
+ .map(|(array, column)| (*column, array))
+ .collect();
+
+ // Create the take indices array for other columns
+ let take_indicies = create_take_indicies(unnested_length, total_length);
+
+ // vertical expansion because of list unnest
+ let ret = flatten_list_cols_from_indices(
+ batch,
+ &unnested_array_map,
+ &take_indicies,
+ )?;
+ flatten_struct_cols(&ret, schema, struct_column_indices)
+ }
};
- if total_length == 0 {
- return Ok(RecordBatch::new_empty(schema.clone()));
- }
-
- // Unnest all the list arrays
- let unnested_arrays =
- unnest_list_arrays(&list_arrays, unnested_length, total_length)?;
- let unnested_array_map: HashMap<_, _> = unnested_arrays
- .into_iter()
- .zip(columns.iter())
- .map(|(array, column)| (column.index(), array))
- .collect();
-
- // Create the take indices array for other columns
- let take_indicies = create_take_indicies(unnested_length, total_length);
-
- batch_from_indices(batch, schema, &unnested_array_map, &take_indicies)
+ transformed
}
/// Find the longest list length among the given list arrays for each row.
@@ -505,7 +581,8 @@ fn unnest_list_array(
)?)
}
-/// Creates take indicies that will be used to expand all columns except for the unnest [`columns`](UnnestExec::columns).
+/// Creates take indices that will be used to expand all columns except for the list-type
+/// [`columns`](UnnestExec::list_column_indices) that are being unnested.
/// Every column value needs to be repeated multiple times according to the length array.
///
/// If the length array looks like this:
@@ -568,12 +645,11 @@ fn create_take_indicies(
/// c2: 'a', 'b', 'c', 'c', 'c', null, 'd', 'd'
/// ```
///
-fn batch_from_indices(
+fn flatten_list_cols_from_indices(
batch: &RecordBatch,
- schema: &SchemaRef,
unnested_list_arrays: &HashMap<usize, ArrayRef>,
indices: &PrimitiveArray<Int64Type>,
-) -> Result<RecordBatch> {
+) -> Result<Vec<Arc<dyn Array>>> {
let arrays = batch
.columns()
.iter()
@@ -583,8 +659,7 @@ fn batch_from_indices(
None => Ok(kernels::take::take(arr, indices, None)?),
})
.collect::<Result<Vec<_>>>()?;
-
- Ok(RecordBatch::try_new(schema.clone(), arrays.to_vec())?)
+ Ok(arrays)
}
#[cfg(test)]
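As a standalone sketch of the "horizontal" expansion performed by flatten_struct_cols above, using plain arrow APIs (field names borrowed from the "d" column of unnest.json; this is an illustration, not the executor code itself):

    use std::sync::Arc;

    use arrow::array::{Array, ArrayRef, Int64Array, StructArray};
    use arrow::datatypes::{DataType, Field};

    fn main() {
        let e: ArrayRef = Arc::new(Int64Array::from(vec![1, 3]));
        let f: ArrayRef = Arc::new(Int64Array::from(vec![2, 4]));
        let struct_arr = StructArray::from(vec![
            (Arc::new(Field::new("e", DataType::Int64, false)), e),
            (Arc::new(Field::new("f", DataType::Int64, false)), f),
        ]);
        // Row count is unchanged; each struct field becomes its own column,
        // i.e. the (n x m) -> (n x m') expansion described in the doc comment.
        let flattened: Vec<ArrayRef> = struct_arr.columns().to_vec();
        assert_eq!(flattened.len(), 2);
        assert_eq!(flattened[0].len(), struct_arr.len());
    }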
diff --git a/datafusion/sql/src/expr/function.rs b/datafusion/sql/src/expr/function.rs
index 7abe5ecdae..1f8492b9ba 100644
--- a/datafusion/sql/src/expr/function.rs
+++ b/datafusion/sql/src/expr/function.rs
@@ -358,10 +358,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
match arg.get_type(schema)? {
DataType::List(_)
| DataType::LargeList(_)
- | DataType::FixedSizeList(_, _) => Ok(()),
- DataType::Struct(_) => {
- not_impl_err!("unnest() does not support struct yet")
- }
+ | DataType::FixedSizeList(_, _)
+ | DataType::Struct(_) => Ok(()),
DataType::Null => {
not_impl_err!("unnest() does not support null yet")
}
diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs
index 730e84cd09..d2cd1bcf3a 100644
--- a/datafusion/sql/src/select.rs
+++ b/datafusion/sql/src/select.rs
@@ -20,14 +20,14 @@ use std::sync::Arc;
use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
use crate::utils::{
- check_columns_satisfy_exprs, extract_aliases, rebase_expr, resolve_aliases_to_exprs,
- resolve_columns, resolve_positions_to_exprs,
+ check_columns_satisfy_exprs, extract_aliases, rebase_expr,
+ recursive_transform_unnest, resolve_aliases_to_exprs, resolve_columns,
+ resolve_positions_to_exprs,
};
-use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{not_impl_err, plan_err, DataFusionError, Result};
use datafusion_common::{Column, UnnestOptions};
-use datafusion_expr::expr::{Alias, Unnest};
+use datafusion_expr::expr::Alias;
use datafusion_expr::expr_rewriter::{
normalize_col, normalize_col_with_schemas_and_ambiguity_check,
normalize_cols,
};
@@ -298,47 +298,29 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
select_exprs: Vec<Expr>,
) -> Result<LogicalPlan> {
let mut unnest_columns = vec![];
+ // Columns used for the projection before the unnest happens,
+ // including both non-unnest and unnest columns
let mut inner_projection_exprs = vec![];
- let outer_projection_exprs = select_exprs
+ // The exprs returned here may differ from the originals in inner_projection_exprs.
+ // For example:
+ // - unnest(struct_col) will be transformed into unnest(struct_col).field1, unnest(struct_col).field2
+ // - unnest(array_col) will be transformed into unnest(array_col).element
+ // - unnest(array_col) + 1 will be transformed into unnest(array_col).element + 1
+ let outer_projection_exprs: Vec<Expr> = select_exprs
.into_iter()
.map(|expr| {
- let Transformed {
- data: transformed_expr,
- transformed,
- tnr: _,
- } = expr.transform_up(|expr: Expr| {
- if let Expr::Unnest(Unnest { expr: ref arg }) = expr {
- let column_name = expr.display_name()?;
- unnest_columns.push(column_name.clone());
- // Add alias for the argument expression, to avoid naming conflicts with other expressions
- // in the select list. For example: `select unnest(col1), col1 from t`.
- inner_projection_exprs
- .push(arg.clone().alias(column_name.clone()));
- Ok(Transformed::yes(Expr::Column(Column::from_name(
- column_name,
- ))))
- } else {
- Ok(Transformed::no(expr))
- }
- })?;
-
- if !transformed {
- if matches!(&transformed_expr, Expr::Column(_)) {
- inner_projection_exprs.push(transformed_expr.clone());
- Ok(transformed_expr)
- } else {
- // We need to evaluate the expr in the inner projection,
- // outer projection just select its name
- let column_name = transformed_expr.display_name()?;
- inner_projection_exprs.push(transformed_expr);
- Ok(Expr::Column(Column::from_name(column_name)))
- }
- } else {
- Ok(transformed_expr)
- }
+ recursive_transform_unnest(
+ &input,
+ &mut unnest_columns,
+ &mut inner_projection_exprs,
+ expr,
+ )
})
- .collect::<Result<Vec<_>>>()?;
+ .collect::<Result<Vec<_>>>()?
+ .into_iter()
+ .flatten()
+ .collect();
// Do the final projection
if unnest_columns.is_empty() {
diff --git a/datafusion/sql/src/utils.rs b/datafusion/sql/src/utils.rs
index 2c50d3af1f..4ae486ef1a 100644
--- a/datafusion/sql/src/utils.rs
+++ b/datafusion/sql/src/utils.rs
@@ -26,9 +26,10 @@ use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{
exec_err, internal_err, plan_err, Column, DataFusionError, Result, ScalarValue,
};
-use datafusion_expr::expr::{Alias, GroupingSet, WindowFunction};
+use datafusion_expr::builder::get_unnested_columns;
+use datafusion_expr::expr::{Alias, GroupingSet, Unnest, WindowFunction};
use datafusion_expr::utils::{expr_as_column_expr, find_column_exprs};
-use datafusion_expr::{expr_vec_fmt, Expr, LogicalPlan};
+use datafusion_expr::{expr_vec_fmt, Expr, ExprSchemable, LogicalPlan};
use sqlparser::ast::Ident;
/// Make a best-effort attempt at resolving all columns in the expression tree
@@ -255,3 +256,185 @@ pub(crate) fn normalize_ident(id: Ident) -> String {
None => id.value.to_ascii_lowercase(),
}
}
+
+/// The context is that we want to rewrite unnest() into InnerProjection->Unnest->OuterProjection.
+/// Given an expression which contains an unnest expr as one of its children,
+/// try to transform it depending on the unnest type:
+/// - For a list column: unnest(col) with type list -> unnest(col) with type list::item
+/// - For a struct column: unnest(struct(field1, field2)) -> unnest(struct).field1, unnest(struct).field2
+/// The transformed exprs will be used in the outer projection.
+pub(crate) fn recursive_transform_unnest(
+ input: &LogicalPlan,
+ unnest_placeholder_columns: &mut Vec<String>,
+ inner_projection_exprs: &mut Vec<Expr>,
+ original_expr: Expr,
+) -> Result<Vec<Expr>> {
+ let mut transform =
+ |unnest_expr: &Expr, expr_in_unnest: &Expr| -> Result<Vec<Expr>> {
+ // Full context: we are trying to plan the execution as InnerProjection->Unnest->OuterProjection.
+ // Inside the unnest execution, each column in the inner projection
+ // will be transformed into new columns. Thus we need to keep track of these
+ // placeholder column names.
+ let placeholder_name = unnest_expr.display_name()?;
+
+ unnest_placeholder_columns.push(placeholder_name.clone());
+ // Add alias for the argument expression, to avoid naming conflicts
+ // with other expressions in the select list. For example: `select unnest(col1), col1 from t`.
+ // This extra projection is used for the unnest transformation.
+ inner_projection_exprs
+ .push(expr_in_unnest.clone().alias(placeholder_name.clone()));
+ let schema = input.schema();
+
+ let (data_type, _) = expr_in_unnest.data_type_and_nullable(schema)?;
+
+ let outer_projection_columns =
+ get_unnested_columns(&placeholder_name, &data_type)?;
+ let expr = outer_projection_columns
+ .iter()
+ .map(|col| Expr::Column(col.0.clone()))
+ .collect::<Vec<_>>();
+ Ok(expr)
+ };
+ // The transformed exprs may be either the same as, or different from, the original exprs.
+ // For example:
+ // - unnest(struct_col) will be transformed into unnest(struct_col).field1, unnest(struct_col).field2
+ // - unnest(array_col) will be transformed into unnest(array_col)
+ // - unnest(array_col) + 1 will be transformed into unnest(array_col) + 1
+
+ // Specifically handle the root-level unnest expr; this is the only place
+ // where unnest on a struct can be handled
+ if let Expr::Unnest(Unnest { expr: ref arg }) = original_expr {
+ return transform(&original_expr, arg);
+ }
+ let Transformed {
+ data: transformed_expr,
+ transformed,
+ tnr: _,
+ } = original_expr.transform_up(|expr: Expr| {
+ if let Expr::Unnest(Unnest { expr: ref arg }) = expr {
+ let (data_type, _) = expr.data_type_and_nullable(input.schema())?;
+ if let DataType::Struct(_) = data_type {
+ return internal_err!("unnest on struct can ony be applied
at the root level of select expression");
+ }
+ let transformed_exprs = transform(&expr, arg)?;
+ Ok(Transformed::yes(transformed_exprs[0].clone()))
+ } else {
+ Ok(Transformed::no(expr))
+ }
+ })?;
+
+ if !transformed {
+ if matches!(&transformed_expr, Expr::Column(_)) {
+ inner_projection_exprs.push(transformed_expr.clone());
+ Ok(vec![transformed_expr])
+ } else {
+ // We need to evaluate the expr in the inner projection;
+ // the outer projection just selects its name
+ let column_name = transformed_expr.display_name()?;
+ inner_projection_exprs.push(transformed_expr);
+ Ok(vec![Expr::Column(Column::from_name(column_name))])
+ }
+ } else {
+ Ok(vec![transformed_expr])
+ }
+}
+
+// Unit tests for recursive_transform_unnest
+#[cfg(test)]
+mod tests {
+ use std::{ops::Add, sync::Arc};
+
+ use arrow::datatypes::{DataType as ArrowDataType, Field, Schema};
+ use arrow_schema::Fields;
+ use datafusion_common::{DFSchema, Result};
+ use datafusion_expr::{col, lit, unnest, EmptyRelation, LogicalPlan};
+
+ use crate::utils::recursive_transform_unnest;
+
+ #[test]
+ fn test_recursive_transform_unnest() -> Result<()> {
+ let schema = Schema::new(vec![
+ Field::new(
+ "struct_col",
+ ArrowDataType::Struct(Fields::from(vec![
+ Field::new("field1", ArrowDataType::Int32, false),
+ Field::new("field2", ArrowDataType::Int32, false),
+ ])),
+ false,
+ ),
+ Field::new(
+ "array_col",
+ ArrowDataType::List(Arc::new(Field::new(
+ "item",
+ ArrowDataType::Int64,
+ true,
+ ))),
+ true,
+ ),
+ Field::new("int_col", ArrowDataType::Int32, false),
+ ]);
+
+ let dfschema = DFSchema::try_from(schema)?;
+
+ let input = LogicalPlan::EmptyRelation(EmptyRelation {
+ produce_one_row: false,
+ schema: Arc::new(dfschema),
+ });
+
+ let mut unnest_placeholder_columns = vec![];
+ let mut inner_projection_exprs = vec![];
+
+ // unnest(struct_col)
+ let original_expr = unnest(col("struct_col"));
+ let transformed_exprs = recursive_transform_unnest(
+ &input,
+ &mut unnest_placeholder_columns,
+ &mut inner_projection_exprs,
+ original_expr,
+ )?;
+ assert_eq!(
+ transformed_exprs,
+ vec![
+ col("unnest(struct_col).field1"),
+ col("unnest(struct_col).field2"),
+ ]
+ );
+ assert_eq!(unnest_placeholder_columns, vec!["unnest(struct_col)"]);
+ // still reference struct_col in original schema but with alias,
+ // to avoid colliding with the projection on the column itself if any
+ assert_eq!(
+ inner_projection_exprs,
+ vec![col("struct_col").alias("unnest(struct_col)"),]
+ );
+
+ // unnest(array_col) + 1
+ let original_expr = unnest(col("array_col")).add(lit(1i64));
+ let transformed_exprs = recursive_transform_unnest(
+ &input,
+ &mut unnest_placeholder_columns,
+ &mut inner_projection_exprs,
+ original_expr,
+ )?;
+ assert_eq!(
+ unnest_placeholder_columns,
+ vec!["unnest(struct_col)", "unnest(array_col)"]
+ );
+ // only transform the unnest children
+ assert_eq!(
+ transformed_exprs,
+ vec![col("unnest(array_col)").add(lit(1i64))]
+ );
+
+ // keep appending to the current vector
+ // still reference array_col in original schema but with alias,
+ // to avoid colliding with the projection on the column itself if any
+ assert_eq!(
+ inner_projection_exprs,
+ vec![
+ col("struct_col").alias("unnest(struct_col)"),
+ col("array_col").alias("unnest(array_col)")
+ ]
+ );
+
+ Ok(())
+ }
+}
diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs
index bbedaca6a8..cca96b6eb9 100644
--- a/datafusion/sql/tests/sql_integration.rs
+++ b/datafusion/sql/tests/sql_integration.rs
@@ -2905,6 +2905,21 @@ impl ContextProvider for MockContextProvider {
Field::new("Id", DataType::UInt32, false),
Field::new("lower", DataType::UInt32, false),
])),
+ "unnest_table" => Ok(Schema::new(vec![
+ Field::new(
+ "array_col",
+ DataType::List(Arc::new(Field::new("item",
DataType::Int64, true))),
+ false,
+ ),
+ Field::new(
+ "struct_col",
+ DataType::Struct(Fields::from(vec![
+ Field::new("field1", DataType::Int64, true),
+ Field::new("field2", DataType::Utf8, true),
+ ])),
+ false,
+ ),
+ ])),
_ => plan_err!("No table named: {} found", name.table()),
};
@@ -4715,6 +4730,29 @@ fn roundtrip_crossjoin() -> Result<()> {
Ok(())
}
+#[test]
+fn test_unnest_logical_plan() -> Result<()> {
+ let query = "select unnest(struct_col), unnest(array_col), struct_col,
array_col from unnest_table";
+
+ let dialect = GenericDialect {};
+ let statement = Parser::new(&dialect)
+ .try_with_sql(query)?
+ .parse_statement()?;
+
+ let context = MockContextProvider::default();
+ let sql_to_rel = SqlToRel::new(&context);
+ let plan = sql_to_rel.sql_statement_to_plan(statement).unwrap();
+
+ let expected = "Projection: unnest(unnest_table.struct_col).field1,
unnest(unnest_table.struct_col).field2, unnest(unnest_table.array_col),
unnest_table.struct_col, unnest_table.array_col\
+ \n Unnest: lists[unnest(unnest_table.array_col)]
structs[unnest(unnest_table.struct_col)]\
+ \n Projection: unnest_table.struct_col AS
unnest(unnest_table.struct_col), unnest_table.array_col AS
unnest(unnest_table.array_col), unnest_table.struct_col, unnest_table.array_col\
+ \n TableScan: unnest_table";
+
+ assert_eq!(format!("{plan:?}"), expected);
+
+ Ok(())
+}
+
#[cfg(test)]
#[ctor::ctor]
fn init() {
diff --git a/datafusion/sqllogictest/test_files/unnest.slt b/datafusion/sqllogictest/test_files/unnest.slt
index ca7e73cb87..7b7249d6d5 100644
--- a/datafusion/sqllogictest/test_files/unnest.slt
+++ b/datafusion/sqllogictest/test_files/unnest.slt
@@ -22,12 +22,19 @@
statement ok
CREATE TABLE unnest_table
AS VALUES
- ([1,2,3], [7], 1, [13, 14]),
- ([4,5], [8,9,10], 2, [15, 16]),
- ([6], [11,12], 3, null),
- ([12], [null, 42, null], null, null),
+ ([1,2,3], [7], 1, [13, 14], struct(1,2)),
+ ([4,5], [8,9,10], 2, [15, 16], struct(3,4)),
+ ([6], [11,12], 3, null, null),
+ ([12], [null, 42, null], null, null, struct(7,8)),
-- null array to verify the `preserve_nulls` option
- (null, null, 4, [17, 18])
+ (null, null, 4, [17, 18], null)
+;
+
+statement ok
+CREATE TABLE nested_unnest_table
+AS VALUES
+ (struct('a', 'b', struct('c')), (struct('a', 'b', [10,20]))),
+ (struct('d', 'e', struct('f')), (struct('x', 'y', [30,40, 50])))
;
## Basic unnest expression in select list
@@ -38,7 +45,13 @@ select unnest([1,2,3]);
2
3
-## Basic unnest expression in from clause
+## Basic unnest struct expression in select list
+query III
+select unnest(struct(1,2,3));
+----
+1 2 3
+
+## Basic unnest list expression in from clause
query I
select * from unnest([1,2,3]);
----
@@ -46,6 +59,20 @@ select * from unnest([1,2,3]);
2
3
+## Basic unnest struct expression in from clause
+query III
+select * from unnest(struct(1,2,3));
+----
+1 2 3
+
+## Multiple unnest expression in from clause
+query IIII
+select * from unnest(struct(1,2,3)),unnest([4,5,6]);
+----
+1 2 3 4
+1 2 3 5
+1 2 3 6
+
## Unnest null in select list
query error DataFusion error: This feature is not implemented: unnest\(\) does not support null yet
@@ -145,10 +172,6 @@ select array_remove(column1, 4), unnest(column2), column3 * 10 from unnest_table
[12] NULL NULL
-## Unnest column with scalars
-query error DataFusion error: Error during planning: unnest\(\) can only be applied to array, struct and null
-select unnest(column3) from unnest_table;
-
## Unnest doesn't work with untyped nulls
query error DataFusion error: This feature is not implemented: unnest\(\) does not support null yet
select unnest(null) from unnest_table;
@@ -233,12 +256,16 @@ select * from unnest([], NULL::int[]);
## Unnest struct expression in select list
-query error DataFusion error: This feature is not implemented: unnest\(\) does not support struct yet
+query ?
select unnest(struct(null));
+----
+NULL
## Unnest struct expression in from clause
-query error DataFusion error: This feature is not implemented: unnest\(\) does not support struct yet
+query ?
select * from unnest(struct(null));
+----
+NULL
## Unnest array expression
@@ -288,6 +315,18 @@ select unnest(array_remove(column1, 12)) from unnest_table;
5
6
+## unnest struct-typed column and list-typed column at the same time
+query I?II?
+select unnest(column1), column1, unnest(column5), column5 from unnest_table;
+----
+1 [1, 2, 3] 1 2 {c0: 1, c1: 2}
+2 [1, 2, 3] 1 2 {c0: 1, c1: 2}
+3 [1, 2, 3] 1 2 {c0: 1, c1: 2}
+4 [4, 5] 3 4 {c0: 3, c1: 4}
+5 [4, 5] 3 4 {c0: 3, c1: 4}
+6 [6] NULL NULL NULL
+12 [12] 7 8 {c0: 7, c1: 8}
+
## Unnest in from clause with alias
query I
@@ -383,8 +422,26 @@ select unnest(array_remove(column1, 3)) - 1 as c1, column3 from unnest_table;
5 3
11 NULL
+## unnest for nested struct(struct)
+query TT?
+select unnest(column1) from nested_unnest_table;
+----
+a b {c0: c}
+d e {c0: f}
+
+## unnest for nested(struct(list))
+query TT?
+select unnest(column2) from nested_unnest_table;
+----
+a b [10, 20]
+x y [30, 40, 50]
+
query error DataFusion error: type_coercion\ncaused by\nThis feature is not implemented: Unnest should be rewritten to LogicalPlan::Unnest before type coercion
select sum(unnest(generate_series(1,10)));
+## TODO: support unnest as a child expr
+query error DataFusion error: Internal error: unnest on struct can only be applied at the root level of select expression
+select arrow_typeof(unnest(column5)) from unnest_table;
+
statement ok
drop table unnest_table;