This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 34eda15b73 feat: support `unnest` multiple arrays (#10044)
34eda15b73 is described below
commit 34eda15b73a9e278af8844b30ed2f1c21c10359c
Author: Jonah Gao <[email protected]>
AuthorDate: Mon Apr 15 19:05:01 2024 +0800
feat: support `unnest` multiple arrays (#10044)
* Impl find_longest_length
* impl unnesting multi columns
* Change plans
* remove println
* fix tests
* simplify unnested fields
* update doc and tests
* more tests
* add test
* fix comment
* Add test for untyped null
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
datafusion-cli/Cargo.lock | 1 +
datafusion/core/src/physical_planner.rs | 9 +-
datafusion/expr/src/expr.rs | 20 +-
datafusion/expr/src/expr_rewriter/mod.rs | 6 +-
datafusion/expr/src/expr_schema.rs | 8 +-
datafusion/expr/src/logical_plan/builder.rs | 77 ++-
datafusion/expr/src/logical_plan/display.rs | 4 +-
datafusion/expr/src/logical_plan/plan.rs | 56 +-
datafusion/expr/src/logical_plan/tree_node.rs | 27 +-
datafusion/expr/src/tree_node.rs | 2 +-
datafusion/physical-plan/Cargo.toml | 1 +
datafusion/physical-plan/src/unnest.rs | 651 ++++++++++++---------
datafusion/proto/src/logical_plan/from_proto.rs | 7 +-
datafusion/proto/src/logical_plan/to_proto.rs | 4 +-
.../proto/tests/cases/roundtrip_logical_plan.rs | 2 +-
datafusion/sql/src/expr/function.rs | 22 +-
datafusion/sql/src/relation/mod.rs | 21 +-
datafusion/sql/src/select.rs | 11 +-
datafusion/sqllogictest/test_files/unnest.slt | 90 ++-
19 files changed, 577 insertions(+), 442 deletions(-)
diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index 9e192b0be0..7ddc0af430 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -1373,6 +1373,7 @@ dependencies = [
"arrow",
"arrow-array",
"arrow-buffer",
+ "arrow-ord",
"arrow-schema",
"async-trait",
"chrono",
diff --git a/datafusion/core/src/physical_planner.rs
b/datafusion/core/src/physical_planner.rs
index c25523c5ae..f5e937bb56 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -1168,12 +1168,13 @@ impl DefaultPhysicalPlanner {
Ok(Arc::new(GlobalLimitExec::new(input, *skip, *fetch)))
}
- LogicalPlan::Unnest(Unnest { input, column, schema, options })
=> {
+ LogicalPlan::Unnest(Unnest { input, columns, schema, options
}) => {
let input = self.create_initial_plan(input,
session_state).await?;
- let column_exec = schema.index_of_column(column)
- .map(|idx| Column::new(&column.name, idx))?;
+ let column_execs = columns.iter().map(|column| {
+ schema.index_of_column(column).map(|idx|
Column::new(&column.name, idx))
+ }).collect::<Result<_>>()?;
let schema =
SchemaRef::new(schema.as_ref().to_owned().into());
- Ok(Arc::new(UnnestExec::new(input, column_exec, schema,
options.clone())))
+ Ok(Arc::new(UnnestExec::new(input, column_execs, schema,
options.clone())))
}
LogicalPlan::Ddl(ddl) => {
// There is no default plan for DDl statements --
diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs
index c7c50d8719..cffb58dadd 100644
--- a/datafusion/expr/src/expr.rs
+++ b/datafusion/expr/src/expr.rs
@@ -186,7 +186,16 @@ pub enum Expr {
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct Unnest {
- pub exprs: Vec<Expr>,
+ pub expr: Box<Expr>,
+}
+
+impl Unnest {
+ /// Create a new Unnest expression.
+ pub fn new(expr: Expr) -> Self {
+ Self {
+ expr: Box::new(expr),
+ }
+ }
}
/// Alias expression
@@ -1567,8 +1576,8 @@ impl fmt::Display for Expr {
}
},
Expr::Placeholder(Placeholder { id, .. }) => write!(f, "{id}"),
- Expr::Unnest(Unnest { exprs }) => {
- write!(f, "UNNEST({exprs:?})")
+ Expr::Unnest(Unnest { expr }) => {
+ write!(f, "UNNEST({expr:?})")
}
}
}
@@ -1757,7 +1766,10 @@ fn create_name(e: &Expr) -> Result<String> {
}
}
}
- Expr::Unnest(Unnest { exprs }) => create_function_name("unnest",
false, exprs),
+ Expr::Unnest(Unnest { expr }) => {
+ let expr_name = create_name(expr)?;
+ Ok(format!("unnest({expr_name})"))
+ }
Expr::ScalarFunction(fun) => create_function_name(fun.name(), false,
&fun.args),
Expr::WindowFunction(WindowFunction {
fun,
diff --git a/datafusion/expr/src/expr_rewriter/mod.rs
b/datafusion/expr/src/expr_rewriter/mod.rs
index d678fe7ee3..c11619fc0e 100644
--- a/datafusion/expr/src/expr_rewriter/mod.rs
+++ b/datafusion/expr/src/expr_rewriter/mod.rs
@@ -82,13 +82,13 @@ pub fn normalize_col_with_schemas_and_ambiguity_check(
using_columns: &[HashSet<Column>],
) -> Result<Expr> {
// Normalize column inside Unnest
- if let Expr::Unnest(Unnest { exprs }) = expr {
+ if let Expr::Unnest(Unnest { expr }) = expr {
let e = normalize_col_with_schemas_and_ambiguity_check(
- exprs[0].clone(),
+ expr.as_ref().clone(),
schemas,
using_columns,
)?;
- return Ok(Expr::Unnest(Unnest { exprs: vec![e] }));
+ return Ok(Expr::Unnest(Unnest { expr: Box::new(e) }));
}
expr.transform(&|expr| {
diff --git a/datafusion/expr/src/expr_schema.rs
b/datafusion/expr/src/expr_schema.rs
index 39892d9e0c..466fd13ce2 100644
--- a/datafusion/expr/src/expr_schema.rs
+++ b/datafusion/expr/src/expr_schema.rs
@@ -115,12 +115,8 @@ impl ExprSchemable for Expr {
Expr::Case(case) => case.when_then_expr[0].1.get_type(schema),
Expr::Cast(Cast { data_type, .. })
| Expr::TryCast(TryCast { data_type, .. }) =>
Ok(data_type.clone()),
- Expr::Unnest(Unnest { exprs }) => {
- let arg_data_types = exprs
- .iter()
- .map(|e| e.get_type(schema))
- .collect::<Result<Vec<_>>>()?;
- let arg_data_type = arg_data_types[0].clone();
+ Expr::Unnest(Unnest { expr }) => {
+ let arg_data_type = expr.get_type(schema)?;
// Unnest's output type is the inner type of the list
match arg_data_type{
DataType::List(field) | DataType::LargeList(field) |
DataType::FixedSizeList(field, _) =>{
diff --git a/datafusion/expr/src/logical_plan/builder.rs
b/datafusion/expr/src/logical_plan/builder.rs
index beac5a7f4e..f7c0fbac53 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -1112,7 +1112,7 @@ impl LogicalPlanBuilder {
/// Unnest the given column.
pub fn unnest_column(self, column: impl Into<Column>) -> Result<Self> {
- Ok(Self::from(unnest(self.plan, column.into())?))
+ Ok(Self::from(unnest(self.plan, vec![column.into()])?))
}
/// Unnest the given column given [`UnnestOptions`]
@@ -1123,10 +1123,21 @@ impl LogicalPlanBuilder {
) -> Result<Self> {
Ok(Self::from(unnest_with_options(
self.plan,
- column.into(),
+ vec![column.into()],
options,
)?))
}
+
+ /// Unnest the given columns with the given [`UnnestOptions`]
+ pub fn unnest_columns_with_options(
+ self,
+ columns: Vec<Column>,
+ options: UnnestOptions,
+ ) -> Result<Self> {
+ Ok(Self::from(unnest_with_options(
+ self.plan, columns, options,
+ )?))
+ }
}
pub fn change_redundant_column(fields: &Fields) -> Vec<Field> {
let mut name_map = HashMap::new();
@@ -1534,44 +1545,50 @@ impl TableSource for LogicalTableSource {
}
/// Create a [`LogicalPlan::Unnest`] plan
-pub fn unnest(input: LogicalPlan, column: Column) -> Result<LogicalPlan> {
- unnest_with_options(input, column, UnnestOptions::new())
+pub fn unnest(input: LogicalPlan, columns: Vec<Column>) -> Result<LogicalPlan>
{
+ unnest_with_options(input, columns, UnnestOptions::new())
}
/// Create a [`LogicalPlan::Unnest`] plan with options
pub fn unnest_with_options(
input: LogicalPlan,
- column: Column,
+ columns: Vec<Column>,
options: UnnestOptions,
) -> Result<LogicalPlan> {
- let (unnest_qualifier, unnest_field) =
- input.schema().qualified_field_from_column(&column)?;
-
// Extract the type of the nested field in the list.
- let unnested_field = match unnest_field.data_type() {
- DataType::List(field)
- | DataType::FixedSizeList(field, _)
- | DataType::LargeList(field) => Arc::new(Field::new(
- unnest_field.name(),
- field.data_type().clone(),
- unnest_field.is_nullable(),
- )),
- _ => {
- // If the unnest field is not a list type return the input plan.
- return Ok(input);
- }
- };
+ let mut unnested_fields: HashMap<usize, _> =
HashMap::with_capacity(columns.len());
+ // Add qualifiers to the columns.
+ let mut qualified_columns = Vec::with_capacity(columns.len());
+ for c in &columns {
+ let index = input.schema().index_of_column(c)?;
+ let (unnest_qualifier, unnest_field) =
input.schema().qualified_field(index);
+ let unnested_field = match unnest_field.data_type() {
+ DataType::List(field)
+ | DataType::FixedSizeList(field, _)
+ | DataType::LargeList(field) => Arc::new(Field::new(
+ unnest_field.name(),
+ field.data_type().clone(),
+ // Unnesting may produce NULLs even if the list is not null.
+ // For example: unnest([1], []) -> 1, null
+ true,
+ )),
+ _ => {
+ // If the unnest field is not a list type return the input
plan.
+ return Ok(input);
+ }
+ };
+ qualified_columns.push(Column::from((unnest_qualifier,
unnested_field.as_ref())));
+ unnested_fields.insert(index, unnested_field);
+ }
- // Update the schema with the unnest column type changed to contain the
nested type.
+ // Update the schema with the unnest column types changed to contain the
nested types.
let input_schema = input.schema();
let fields = input_schema
.iter()
- .map(|(q, f)| {
- if f.as_ref() == unnest_field && q == unnest_qualifier {
- (unnest_qualifier.cloned(), unnested_field.clone())
- } else {
- (q.cloned(), f.clone())
- }
+ .enumerate()
+ .map(|(index, (q, f))| match unnested_fields.get(&index) {
+ Some(unnested_field) => (q.cloned(), unnested_field.clone()),
+ None => (q.cloned(), f.clone()),
})
.collect::<Vec<_>>();
@@ -1580,11 +1597,9 @@ pub fn unnest_with_options(
// We can use the existing functional dependencies:
let deps = input_schema.functional_dependencies().clone();
let schema = Arc::new(df_schema.with_functional_dependencies(deps)?);
- let column = Column::from((unnest_qualifier, unnested_field.as_ref()));
-
Ok(LogicalPlan::Unnest(Unnest {
input: Arc::new(input),
- column,
+ columns: qualified_columns,
schema,
options,
}))
diff --git a/datafusion/expr/src/logical_plan/display.rs
b/datafusion/expr/src/logical_plan/display.rs
index edc3afd55d..3a2ed9ffc2 100644
--- a/datafusion/expr/src/logical_plan/display.rs
+++ b/datafusion/expr/src/logical_plan/display.rs
@@ -638,10 +638,10 @@ impl<'a, 'b> PgJsonVisitor<'a, 'b> {
"Node Type": "DescribeTable"
})
}
- LogicalPlan::Unnest(Unnest { column, .. }) => {
+ LogicalPlan::Unnest(Unnest { columns, .. }) => {
json!({
"Node Type": "Unnest",
- "Column": format!("{}", column)
+ "Column": expr_vec_fmt!(columns),
})
}
}
diff --git a/datafusion/expr/src/logical_plan/plan.rs
b/datafusion/expr/src/logical_plan/plan.rs
index 91c8670f38..dbff504601 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -24,7 +24,7 @@ use std::sync::Arc;
use super::dml::CopyTo;
use super::DdlStatement;
-use crate::builder::change_redundant_column;
+use crate::builder::{change_redundant_column, unnest_with_options};
use crate::expr::{Alias, Placeholder, Sort as SortExpr, WindowFunction};
use crate::expr_rewriter::{create_col_from_scalar_expr, normalize_cols};
use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor};
@@ -807,51 +807,11 @@ impl LogicalPlan {
}
LogicalPlan::DescribeTable(_) => Ok(self.clone()),
LogicalPlan::Unnest(Unnest {
- column,
- schema,
- options,
- ..
+ columns, options, ..
}) => {
// Update schema with unnested column type.
- let input = Arc::new(inputs.swap_remove(0));
- let (nested_qualifier, nested_field) =
- input.schema().qualified_field_from_column(column)?;
- let (unnested_qualifier, unnested_field) =
- schema.qualified_field_from_column(column)?;
- let qualifiers_and_fields = input
- .schema()
- .iter()
- .map(|(qualifier, field)| {
- if qualifier.eq(&nested_qualifier)
- && field.as_ref() == nested_field
- {
- (
- unnested_qualifier.cloned(),
- Arc::new(unnested_field.clone()),
- )
- } else {
- (qualifier.cloned(), field.clone())
- }
- })
- .collect::<Vec<_>>();
-
- let schema = Arc::new(
- DFSchema::new_with_metadata(
- qualifiers_and_fields,
- input.schema().metadata().clone(),
- )?
- // We can use the existing functional dependencies as is:
- .with_functional_dependencies(
- input.schema().functional_dependencies().clone(),
- )?,
- );
-
- Ok(LogicalPlan::Unnest(Unnest {
- input,
- column: column.clone(),
- schema,
- options: options.clone(),
- }))
+ let input = inputs.swap_remove(0);
+ unnest_with_options(input, columns.clone(), options.clone())
}
}
}
@@ -1581,8 +1541,8 @@ impl LogicalPlan {
LogicalPlan::DescribeTable(DescribeTable { .. }) => {
write!(f, "DescribeTable")
}
- LogicalPlan::Unnest(Unnest { column, .. }) => {
- write!(f, "Unnest: {column}")
+ LogicalPlan::Unnest(Unnest { columns, .. }) => {
+ write!(f, "Unnest: {}", expr_vec_fmt!(columns))
}
}
}
@@ -2556,8 +2516,8 @@ pub enum Partitioning {
pub struct Unnest {
/// The incoming logical plan
pub input: Arc<LogicalPlan>,
- /// The column to unnest
- pub column: Column,
+ /// The columns to unnest
+ pub columns: Vec<Column>,
/// The output schema, containing the unnested field column.
pub schema: DFSchemaRef,
/// Options
diff --git a/datafusion/expr/src/logical_plan/tree_node.rs
b/datafusion/expr/src/logical_plan/tree_node.rs
index 3644f89e8b..48f047c070 100644
--- a/datafusion/expr/src/logical_plan/tree_node.rs
+++ b/datafusion/expr/src/logical_plan/tree_node.rs
@@ -311,13 +311,13 @@ impl TreeNode for LogicalPlan {
}
LogicalPlan::Unnest(Unnest {
input,
- column,
+ columns,
schema,
options,
}) => rewrite_arc(input, f)?.update_data(|input| {
LogicalPlan::Unnest(Unnest {
input,
- column,
+ columns,
schema,
options,
})
@@ -507,8 +507,12 @@ impl LogicalPlan {
LogicalPlan::TableScan(TableScan { filters, .. }) => {
filters.iter().apply_until_stop(f)
}
- LogicalPlan::Unnest(Unnest { column, .. }) => {
- f(&Expr::Column(column.clone()))
+ LogicalPlan::Unnest(Unnest { columns, .. }) => {
+ let exprs = columns
+ .iter()
+ .map(|c| Expr::Column(c.clone()))
+ .collect::<Vec<_>>();
+ exprs.iter().apply_until_stop(f)
}
LogicalPlan::Distinct(Distinct::On(DistinctOn {
on_expr,
@@ -706,20 +710,6 @@ impl LogicalPlan {
fetch,
})
}),
- LogicalPlan::Unnest(Unnest {
- input,
- column,
- schema,
- options,
- }) => f(Expr::Column(column))?.map_data(|column| match column {
- Expr::Column(column) => Ok(LogicalPlan::Unnest(Unnest {
- input,
- column,
- schema,
- options,
- })),
- _ => internal_err!("Transformation should return Column"),
- })?,
LogicalPlan::Distinct(Distinct::On(DistinctOn {
on_expr,
select_expr,
@@ -744,6 +734,7 @@ impl LogicalPlan {
}),
// plans without expressions
LogicalPlan::EmptyRelation(_)
+ | LogicalPlan::Unnest(_)
| LogicalPlan::RecursiveQuery(_)
| LogicalPlan::Subquery(_)
| LogicalPlan::SubqueryAlias(_)
diff --git a/datafusion/expr/src/tree_node.rs b/datafusion/expr/src/tree_node.rs
index 85097f6249..35fec509c9 100644
--- a/datafusion/expr/src/tree_node.rs
+++ b/datafusion/expr/src/tree_node.rs
@@ -36,6 +36,7 @@ impl TreeNode for Expr {
) -> Result<TreeNodeRecursion> {
let children = match self {
Expr::Alias(Alias{expr,..})
+ | Expr::Unnest(Unnest{expr})
| Expr::Not(expr)
| Expr::IsNotNull(expr)
| Expr::IsTrue(expr)
@@ -60,7 +61,6 @@ impl TreeNode for Expr {
GetFieldAccess::NamedStructField { .. } => vec![expr],
}
}
- Expr::Unnest(Unnest { exprs }) |
Expr::GroupingSet(GroupingSet::Rollup(exprs))
| Expr::GroupingSet(GroupingSet::Cube(exprs)) =>
exprs.iter().collect(),
Expr::ScalarFunction (ScalarFunction{ args, .. } ) => {
diff --git a/datafusion/physical-plan/Cargo.toml
b/datafusion/physical-plan/Cargo.toml
index 6a78bd596a..6863f26460 100644
--- a/datafusion/physical-plan/Cargo.toml
+++ b/datafusion/physical-plan/Cargo.toml
@@ -39,6 +39,7 @@ ahash = { version = "0.8", default-features = false, features
= [
arrow = { workspace = true }
arrow-array = { workspace = true }
arrow-buffer = { workspace = true }
+arrow-ord = { workspace = true }
arrow-schema = { workspace = true }
async-trait = { workspace = true }
chrono = { workspace = true }
diff --git a/datafusion/physical-plan/src/unnest.rs
b/datafusion/physical-plan/src/unnest.rs
index 6ea1b3c40c..45b848112b 100644
--- a/datafusion/physical-plan/src/unnest.rs
+++ b/datafusion/physical-plan/src/unnest.rs
@@ -15,8 +15,9 @@
// specific language governing permissions and limitations
// under the License.
-//! Defines the unnest column plan for unnesting values in a column that
contains a list
-//! type, conceptually is like joining each row with all the values in the
list column.
+//! Define a plan for unnesting values in columns that contain a list type.
+
+use std::collections::HashMap;
use std::{any::Any, sync::Arc};
use super::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};
@@ -27,15 +28,17 @@ use crate::{
};
use arrow::array::{
- Array, ArrayRef, ArrowPrimitiveType, FixedSizeListArray, GenericListArray,
- LargeListArray, ListArray, OffsetSizeTrait, PrimitiveArray,
-};
-use arrow::compute::kernels;
-use arrow::datatypes::{
- ArrowNativeType, DataType, Int32Type, Int64Type, Schema, SchemaRef,
+ Array, ArrayRef, AsArray, FixedSizeListArray, LargeListArray, ListArray,
+ PrimitiveArray,
};
+use arrow::compute::kernels::length::length;
+use arrow::compute::kernels::zip::zip;
+use arrow::compute::{cast, is_not_null, kernels, sum};
+use arrow::datatypes::{DataType, Int64Type, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
-use datafusion_common::{exec_err, Result, UnnestOptions};
+use arrow_array::{Int64Array, Scalar};
+use arrow_ord::cmp::lt;
+use datafusion_common::{exec_datafusion_err, exec_err, Result, UnnestOptions};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::EquivalenceProperties;
@@ -43,7 +46,7 @@ use async_trait::async_trait;
use futures::{Stream, StreamExt};
use log::trace;
-/// Unnest the given column by joining the row with each value in the
+/// Unnest the given columns by joining the row with each value in the
/// nested type.
///
/// See [`UnnestOptions`] for more details and an example.
@@ -53,8 +56,8 @@ pub struct UnnestExec {
input: Arc<dyn ExecutionPlan>,
/// The schema once the unnest is applied
schema: SchemaRef,
- /// The unnest column
- column: Column,
+ /// The unnest columns
+ columns: Vec<Column>,
/// Options
options: UnnestOptions,
/// Execution metrics
@@ -67,7 +70,7 @@ impl UnnestExec {
/// Create a new [UnnestExec].
pub fn new(
input: Arc<dyn ExecutionPlan>,
- column: Column,
+ columns: Vec<Column>,
schema: SchemaRef,
options: UnnestOptions,
) -> Self {
@@ -75,7 +78,7 @@ impl UnnestExec {
UnnestExec {
input,
schema,
- column,
+ columns,
options,
metrics: Default::default(),
cache,
@@ -134,7 +137,7 @@ impl ExecutionPlan for UnnestExec {
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(UnnestExec::new(
children[0].clone(),
- self.column.clone(),
+ self.columns.clone(),
self.schema.clone(),
self.options.clone(),
)))
@@ -155,7 +158,7 @@ impl ExecutionPlan for UnnestExec {
Ok(Box::pin(UnnestStream {
input,
schema: self.schema.clone(),
- column: self.column.clone(),
+ columns: self.columns.clone(),
options: self.options.clone(),
metrics,
}))
@@ -210,8 +213,8 @@ struct UnnestStream {
input: SendableRecordBatchStream,
/// Unnested schema
schema: Arc<Schema>,
- /// The unnest column
- column: Column,
+ /// The unnest columns
+ columns: Vec<Column>,
/// Options
options: UnnestOptions,
/// Metrics
@@ -249,7 +252,7 @@ impl UnnestStream {
Some(Ok(batch)) => {
let timer = self.metrics.elapsed_compute.timer();
let result =
- build_batch(&batch, &self.schema, &self.column,
&self.options);
+ build_batch(&batch, &self.schema, &self.columns,
&self.options);
self.metrics.input_batches.add(1);
self.metrics.input_rows.add(batch.num_rows());
if let Ok(ref batch) = result {
@@ -276,270 +279,265 @@ impl UnnestStream {
}
}
+/// For each row in a `RecordBatch`, some list columns need to be unnested.
+/// We will expand the values in each list into multiple rows,
+/// taking the longest length among these lists, and shorter lists are padded
with NULLs.
+///
+/// For columns that don't need to be unnested, repeat their values until
reaching the longest length.
fn build_batch(
batch: &RecordBatch,
schema: &SchemaRef,
- column: &Column,
+ columns: &[Column],
options: &UnnestOptions,
) -> Result<RecordBatch> {
- let list_array = column.evaluate(batch)?.into_array(batch.num_rows())?;
- match list_array.data_type() {
- DataType::List(_) => {
- let list_array =
list_array.as_any().downcast_ref::<ListArray>().unwrap();
- build_batch_generic_list::<i32, Int32Type>(
- batch,
- schema,
- column.index(),
- list_array,
- options,
- )
- }
- DataType::LargeList(_) => {
- let list_array = list_array
- .as_any()
- .downcast_ref::<LargeListArray>()
- .unwrap();
- build_batch_generic_list::<i64, Int64Type>(
- batch,
- schema,
- column.index(),
- list_array,
- options,
- )
- }
- DataType::FixedSizeList(_, _) => {
- let list_array = list_array
- .as_any()
- .downcast_ref::<FixedSizeListArray>()
- .unwrap();
- build_batch_fixedsize_list(batch, schema, column.index(),
list_array, options)
- }
- _ => exec_err!("Invalid unnest column {column}"),
+ let list_arrays: Vec<ArrayRef> = columns
+ .iter()
+ .map(|column| column.evaluate(batch)?.into_array(batch.num_rows()))
+ .collect::<Result<_>>()?;
+
+ let longest_length = find_longest_length(&list_arrays, options)?;
+ let unnested_length = longest_length.as_primitive::<Int64Type>();
+ let total_length = if unnested_length.is_empty() {
+ 0
+ } else {
+ sum(unnested_length).ok_or_else(|| {
+ exec_datafusion_err!("Failed to calculate the total unnested
length")
+ })? as usize
+ };
+ if total_length == 0 {
+ return Ok(RecordBatch::new_empty(schema.clone()));
}
-}
-fn build_batch_generic_list<T: OffsetSizeTrait, P: ArrowPrimitiveType<Native =
T>>(
- batch: &RecordBatch,
- schema: &SchemaRef,
- unnest_column_idx: usize,
- list_array: &GenericListArray<T>,
- options: &UnnestOptions,
-) -> Result<RecordBatch> {
- let unnested_array = unnest_generic_list::<T, P>(list_array, options)?;
-
- let take_indicies =
- create_take_indicies_generic::<T, P>(list_array, unnested_array.len(),
options);
-
- batch_from_indices(
- batch,
- schema,
- unnest_column_idx,
- &unnested_array,
- &take_indicies,
- )
+ // Unnest all the list arrays
+ let unnested_arrays =
+ unnest_list_arrays(&list_arrays, unnested_length, total_length)?;
+ let unnested_array_map: HashMap<_, _> = unnested_arrays
+ .into_iter()
+ .zip(columns.iter())
+ .map(|(array, column)| (column.index(), array))
+ .collect();
+
+ // Create the take indices array for other columns
+ let take_indicies = create_take_indicies(unnested_length, total_length);
+
+ batch_from_indices(batch, schema, &unnested_array_map, &take_indicies)
}
-/// Given this `GenericList` list_array:
+/// Find the longest list length among the given list arrays for each row.
+///
+/// For example if we have the following two list arrays:
///
/// ```ignore
-/// [1], null, [2, 3, 4], null, [5, 6]
+/// l1: [1, 2, 3], null, [], [3]
+/// l2: [4,5], [], null, [6, 7]
/// ```
-/// Its values array is represented like this:
+///
+/// If `preserve_nulls` is false, the longest length array will be:
///
/// ```ignore
-/// [1, 2, 3, 4, 5, 6]
+/// longest_length: [3, 0, 0, 2]
/// ```
///
-/// So if there are no null values or `UnnestOptions.preserve_nulls` is false
-/// we can return the values array without any copying.
+/// whereas if `preserve_nulls` is true, the longest length array will be:
///
-/// Otherwise we'll transfrom the values array using the take kernel and the
following take indicies:
///
/// ```ignore
-/// 0, null, 1, 2, 3, null, 4, 5
+/// longest_length: [3, 1, 1, 2]
/// ```
///
-fn unnest_generic_list<T: OffsetSizeTrait, P: ArrowPrimitiveType<Native = T>>(
- list_array: &GenericListArray<T>,
+fn find_longest_length(
+ list_arrays: &[ArrayRef],
options: &UnnestOptions,
-) -> Result<Arc<dyn Array + 'static>> {
- let values = list_array.values();
- if list_array.null_count() == 0 {
- return Ok(values.clone());
+) -> Result<ArrayRef> {
+ // The length of a NULL list
+ let null_length = if options.preserve_nulls {
+ Scalar::new(Int64Array::from_value(1, 1))
+ } else {
+ Scalar::new(Int64Array::from_value(0, 1))
+ };
+ let list_lengths: Vec<ArrayRef> = list_arrays
+ .iter()
+ .map(|list_array| {
+ let mut length_array = length(list_array)?;
+ // Make sure length arrays have the same type. Int64 is the most
general one.
+ length_array = cast(&length_array, &DataType::Int64)?;
+ length_array =
+ zip(&is_not_null(&length_array)?, &length_array,
&null_length)?;
+ Ok(length_array)
+ })
+ .collect::<Result<_>>()?;
+
+ let longest_length = list_lengths.iter().skip(1).try_fold(
+ list_lengths[0].clone(),
+ |longest, current| {
+ let is_lt = lt(&longest, &current)?;
+ zip(&is_lt, &current, &longest)
+ },
+ )?;
+ Ok(longest_length)
+}
+
+/// Trait defining common methods used for unnesting, implemented by list
array types.
+trait ListArrayType: Array {
+ /// Returns a reference to the values of this list.
+ fn values(&self) -> &ArrayRef;
+
+ /// Returns the start and end offset of the values for the given row.
+ fn value_offsets(&self, row: usize) -> (i64, i64);
+}
+
+impl ListArrayType for ListArray {
+ fn values(&self) -> &ArrayRef {
+ self.values()
}
- let mut take_indicies_builder =
- PrimitiveArray::<P>::builder(values.len() + list_array.null_count());
- let offsets = list_array.value_offsets();
- for row in 0..list_array.len() {
- if list_array.is_null(row) {
- if options.preserve_nulls {
- take_indicies_builder.append_null();
- }
- } else {
- let start = offsets[row].as_usize();
- let end = offsets[row + 1].as_usize();
- for idx in start..end {
-
take_indicies_builder.append_value(P::Native::from_usize(idx).unwrap());
- }
- }
+ fn value_offsets(&self, row: usize) -> (i64, i64) {
+ let offsets = self.value_offsets();
+ (offsets[row].into(), offsets[row + 1].into())
}
- Ok(kernels::take::take(
- &values,
- &take_indicies_builder.finish(),
- None,
- )?)
}
-fn build_batch_fixedsize_list(
- batch: &RecordBatch,
- schema: &SchemaRef,
- unnest_column_idx: usize,
- list_array: &FixedSizeListArray,
- options: &UnnestOptions,
-) -> Result<RecordBatch> {
- let unnested_array = unnest_fixed_list(list_array, options)?;
-
- let take_indicies =
- create_take_indicies_fixed(list_array, unnested_array.len(), options);
-
- batch_from_indices(
- batch,
- schema,
- unnest_column_idx,
- &unnested_array,
- &take_indicies,
- )
+impl ListArrayType for LargeListArray {
+ fn values(&self) -> &ArrayRef {
+ self.values()
+ }
+
+ fn value_offsets(&self, row: usize) -> (i64, i64) {
+ let offsets = self.value_offsets();
+ (offsets[row], offsets[row + 1])
+ }
}
-/// Given this `FixedSizeListArray` list_array:
+impl ListArrayType for FixedSizeListArray {
+ fn values(&self) -> &ArrayRef {
+ self.values()
+ }
+
+ fn value_offsets(&self, row: usize) -> (i64, i64) {
+ let start = self.value_offset(row) as i64;
+ (start, start + self.value_length() as i64)
+ }
+}
+
+/// Unnest multiple list arrays according to the length array.
+fn unnest_list_arrays(
+ list_arrays: &[ArrayRef],
+ length_array: &PrimitiveArray<Int64Type>,
+ capacity: usize,
+) -> Result<Vec<ArrayRef>> {
+ let typed_arrays = list_arrays
+ .iter()
+ .map(|list_array| match list_array.data_type() {
+ DataType::List(_) => Ok(list_array.as_list::<i32>() as &dyn
ListArrayType),
+ DataType::LargeList(_) => {
+ Ok(list_array.as_list::<i64>() as &dyn ListArrayType)
+ }
+ DataType::FixedSizeList(_, _) => {
+ Ok(list_array.as_fixed_size_list() as &dyn ListArrayType)
+ }
+ other => exec_err!("Invalid unnest datatype {other }"),
+ })
+ .collect::<Result<Vec<_>>>()?;
+
+ // If there is only one list column to unnest and it doesn't contain any
NULL lists,
+ // we can return the values array directly without any copying.
+ if typed_arrays.len() == 1 && typed_arrays[0].null_count() == 0 {
+ Ok(vec![typed_arrays[0].values().clone()])
+ } else {
+ typed_arrays
+ .iter()
+ .map(|list_array| unnest_list_array(*list_array, length_array,
capacity))
+ .collect::<Result<_>>()
+ }
+}
+
+/// Unnest a list array according to the target length array.
///
-/// ```ignore
-/// [1, 2], null, [3, 4], null, [5, 6]
-/// ```
-/// Its values array is represented like this:
+/// Consider a list array like this:
///
/// ```ignore
-/// [1, 2, null, null 3, 4, null, null, 5, 6]
+/// [1], [2, 3, 4], null, [5], [],
/// ```
///
-/// So if there are no null values
-/// we can return the values array without any copying.
-///
-/// Otherwise we'll transfrom the values array using the take kernel.
-///
-/// If `UnnestOptions.preserve_nulls` is true the take indicies will look like
this:
+/// and the length array is:
///
/// ```ignore
-/// 0, 1, null, 4, 5, null, 8, 9
+/// [2, 3, 2, 1, 2]
/// ```
-/// Otherwise we drop the nulls and take indicies will look like this:
+///
+/// If the length of a certain list is less than the target length, pad with
NULLs.
+/// So the unnested array will look like this:
///
/// ```ignore
-/// 0, 1, 4, 5, 8, 9
+/// [1, null, 2, 3, 4, null, null, 5, null, null]
/// ```
///
-fn unnest_fixed_list(
- list_array: &FixedSizeListArray,
- options: &UnnestOptions,
-) -> Result<Arc<dyn Array + 'static>> {
+fn unnest_list_array(
+ list_array: &dyn ListArrayType,
+ length_array: &PrimitiveArray<Int64Type>,
+ capacity: usize,
+) -> Result<ArrayRef> {
let values = list_array.values();
-
- if list_array.null_count() == 0 {
- Ok(values.clone())
- } else {
- let len_without_nulls =
- values.len() - list_array.null_count() * list_array.value_length()
as usize;
- let null_count = if options.preserve_nulls {
- list_array.null_count()
- } else {
- 0
- };
- let mut builder =
- PrimitiveArray::<Int32Type>::builder(len_without_nulls +
null_count);
- let mut take_offset = 0;
- let fixed_value_length = list_array.value_length() as usize;
- list_array.iter().for_each(|elem| match elem {
- Some(_) => {
- for i in 0..fixed_value_length {
- //take_offset + i is always positive
- let take_index = take_offset + i;
- builder.append_value(take_index as i32);
- }
- take_offset += fixed_value_length;
- }
- None => {
- if options.preserve_nulls {
- builder.append_null();
- }
- take_offset += fixed_value_length;
+ let mut take_indicies_builder =
PrimitiveArray::<Int64Type>::builder(capacity);
+ for row in 0..list_array.len() {
+ let mut value_length = 0;
+ if !list_array.is_null(row) {
+ let (start, end) = list_array.value_offsets(row);
+ value_length = end - start;
+ for i in start..end {
+ take_indicies_builder.append_value(i)
}
- });
- Ok(kernels::take::take(&values, &builder.finish(), None)?)
+ }
+ let target_length = length_array.value(row);
+ debug_assert!(
+ value_length <= target_length,
+ "value length is beyond the longest length"
+ );
+ // Pad with NULL values
+ for _ in value_length..target_length {
+ take_indicies_builder.append_null();
+ }
}
+ Ok(kernels::take::take(
+ &values,
+ &take_indicies_builder.finish(),
+ None,
+ )?)
}
-/// Creates take indicies to be used to expand all other column's data.
-/// Every column value needs to be repeated as many times as many elements
there is in each corresponding array value.
+/// Creates take indicies that will be used to expand all columns except for
the unnest [`columns`](UnnestExec::columns).
+/// Every column value needs to be repeated multiple times according to the
length array.
///
-/// If the column being unnested looks like this:
+/// If the length array looks like this:
///
/// ```ignore
-/// [1], null, [2, 3, 4], null, [5, 6]
+/// [2, 3, 1]
/// ```
-/// Then `create_take_indicies_generic` will return an array like this
+/// Then `create_take_indicies` will return an array like this
///
/// ```ignore
-/// [1, null, 2, 2, 2, null, 4, 4]
+/// [0, 0, 1, 1, 1, 2]
/// ```
///
-fn create_take_indicies_generic<T: OffsetSizeTrait, P:
ArrowPrimitiveType<Native = T>>(
- list_array: &GenericListArray<T>,
+fn create_take_indicies(
+ length_array: &PrimitiveArray<Int64Type>,
capacity: usize,
- options: &UnnestOptions,
-) -> PrimitiveArray<P> {
- let mut builder = PrimitiveArray::<P>::builder(capacity);
- let null_repeat: usize = if options.preserve_nulls { 1 } else { 0 };
-
- for row in 0..list_array.len() {
- let repeat = if list_array.is_null(row) {
- null_repeat
- } else {
- list_array.value(row).len()
- };
-
- // `index` is a positive interger.
- let index = P::Native::from_usize(row).unwrap();
- (0..repeat).for_each(|_| builder.append_value(index));
+) -> PrimitiveArray<Int64Type> {
+ // `find_longest_length()` guarantees this.
+ debug_assert!(
+ length_array.null_count() == 0,
+ "length array should not contain nulls"
+ );
+ let mut builder = PrimitiveArray::<Int64Type>::builder(capacity);
+ for (index, repeat) in length_array.iter().enumerate() {
+ // The length array should not contain nulls, so unwrap is safe
+ let repeat = repeat.unwrap();
+ (0..repeat).for_each(|_| builder.append_value(index as i64));
}
-
builder.finish()
}
-fn create_take_indicies_fixed(
- list_array: &FixedSizeListArray,
- capacity: usize,
- options: &UnnestOptions,
-) -> PrimitiveArray<Int32Type> {
- let mut builder = PrimitiveArray::<Int32Type>::builder(capacity);
- let null_repeat: usize = if options.preserve_nulls { 1 } else { 0 };
-
- for row in 0..list_array.len() {
- let repeat = if list_array.is_null(row) {
- null_repeat
- } else {
- list_array.value_length() as usize
- };
-
- // `index` is a positive interger.
- let index = <Int32Type as
ArrowPrimitiveType>::Native::from_usize(row).unwrap();
- (0..repeat).for_each(|_| builder.append_value(index));
- }
-
- builder.finish()
-}
-
-/// Create the final batch given the unnested column array and a `indices`
array
+/// Create the final batch given the unnested column arrays and an `indices`
array
/// that is used by the take kernel to copy values.
///
/// For example if we have the following `RecordBatch`:
@@ -549,8 +547,8 @@ fn create_take_indicies_fixed(
/// c2: 'a', 'b', 'c', null, 'd'
/// ```
///
-/// then the `unnested_array` contains the unnest column that will replace
`c1` in
-/// the final batch:
+/// then the `unnested_list_arrays` contains the unnest column that will
replace `c1` in
+/// the final batch if `preserve_nulls` is true:
///
/// ```ignore
/// c1: 1, null, 2, 3, 4, null, 5, 6
@@ -570,26 +568,19 @@ fn create_take_indicies_fixed(
/// c2: 'a', 'b', 'c', 'c', 'c', null, 'd', 'd'
/// ```
///
-fn batch_from_indices<T>(
+fn batch_from_indices(
batch: &RecordBatch,
schema: &SchemaRef,
- unnest_column_idx: usize,
- unnested_array: &ArrayRef,
- indices: &PrimitiveArray<T>,
-) -> Result<RecordBatch>
-where
- T: ArrowPrimitiveType,
-{
+ unnested_list_arrays: &HashMap<usize, ArrayRef>,
+ indices: &PrimitiveArray<Int64Type>,
+) -> Result<RecordBatch> {
let arrays = batch
.columns()
.iter()
.enumerate()
- .map(|(col_idx, arr)| {
- if col_idx == unnest_column_idx {
- Ok(unnested_array.clone())
- } else {
- Ok(kernels::take::take(&arr, indices, None)?)
- }
+ .map(|(col_idx, arr)| match unnested_list_arrays.get(&col_idx) {
+ Some(unnested_array) => Ok(unnested_array.clone()),
+ None => Ok(kernels::take::take(arr, indices, None)?),
})
.collect::<Result<Vec<_>>>()?;
@@ -599,51 +590,51 @@ where
#[cfg(test)]
mod tests {
use super::*;
- use arrow::{
- array::AsArray,
- datatypes::{DataType, Field},
- };
- use arrow_array::StringArray;
+ use arrow::datatypes::{DataType, Field};
+ use arrow_array::{GenericListArray, OffsetSizeTrait, StringArray};
use arrow_buffer::{BooleanBufferBuilder, NullBuffer, OffsetBuffer};
- // Create a ListArray with the following list values:
+ // Create a GenericListArray with the following list values:
// [A, B, C], [], NULL, [D], NULL, [NULL, F]
- fn make_test_array() -> ListArray {
+ fn make_generic_array<OffsetSize>() -> GenericListArray<OffsetSize>
+ where
+ OffsetSize: OffsetSizeTrait,
+ {
let mut values = vec![];
- let mut offsets = vec![0];
- let mut valid = BooleanBufferBuilder::new(2);
+ let mut offsets: Vec<OffsetSize> = vec![OffsetSize::zero()];
+ let mut valid = BooleanBufferBuilder::new(6);
// [A, B, C]
values.extend_from_slice(&[Some("A"), Some("B"), Some("C")]);
- offsets.push(values.len() as i32);
+ offsets.push(OffsetSize::from_usize(values.len()).unwrap());
valid.append(true);
// []
- offsets.push(values.len() as i32);
+ offsets.push(OffsetSize::from_usize(values.len()).unwrap());
valid.append(true);
// NULL with non-zero value length
// Issue https://github.com/apache/arrow-datafusion/issues/9932
values.push(Some("?"));
- offsets.push(values.len() as i32);
+ offsets.push(OffsetSize::from_usize(values.len()).unwrap());
valid.append(false);
// [D]
values.push(Some("D"));
- offsets.push(values.len() as i32);
+ offsets.push(OffsetSize::from_usize(values.len()).unwrap());
valid.append(true);
// Another NULL with zero value length
- offsets.push(values.len() as i32);
+ offsets.push(OffsetSize::from_usize(values.len()).unwrap());
valid.append(false);
// [NULL, F]
values.extend_from_slice(&[None, Some("F")]);
- offsets.push(values.len() as i32);
+ offsets.push(OffsetSize::from_usize(values.len()).unwrap());
valid.append(true);
let field = Arc::new(Field::new("item", DataType::Utf8, true));
- ListArray::new(
+ GenericListArray::<OffsetSize>::new(
field,
OffsetBuffer::new(offsets.into()),
Arc::new(StringArray::from(values)),
@@ -651,43 +642,141 @@ mod tests {
)
}
- #[test]
- fn test_unnest_generic_list() -> datafusion_common::Result<()> {
- let list_array = make_test_array();
-
- // Test with preserve_nulls = false
- let options = UnnestOptions {
- preserve_nulls: false,
- };
- let unnested_array =
- unnest_generic_list::<i32, Int32Type>(&list_array, &options)?;
- let strs =
unnested_array.as_string::<i32>().iter().collect::<Vec<_>>();
- assert_eq!(
- strs,
- vec![Some("A"), Some("B"), Some("C"), Some("D"), None, Some("F")]
- );
+ // Create a FixedSizeListArray with the following list values:
+ // [A, B], NULL, [C, D], NULL, [NULL, F], [NULL, NULL]
+ fn make_fixed_list() -> FixedSizeListArray {
+ let values = Arc::new(StringArray::from_iter([
+ Some("A"),
+ Some("B"),
+ None,
+ None,
+ Some("C"),
+ Some("D"),
+ None,
+ None,
+ None,
+ Some("F"),
+ None,
+ None,
+ ]));
+ let field = Arc::new(Field::new("item", DataType::Utf8, true));
+ let valid = NullBuffer::from(vec![true, false, true, false, true,
true]);
+ FixedSizeListArray::new(field, 2, values, Some(valid))
+ }
- // Test with preserve_nulls = true
- let options = UnnestOptions {
- preserve_nulls: true,
- };
- let unnested_array =
- unnest_generic_list::<i32, Int32Type>(&list_array, &options)?;
+ fn verify_unnest_list_array(
+ list_array: &dyn ListArrayType,
+ lengths: Vec<i64>,
+ expected: Vec<Option<&str>>,
+ ) -> datafusion_common::Result<()> {
+ let length_array = Int64Array::from(lengths);
+ let unnested_array = unnest_list_array(list_array, &length_array, 3 *
6)?;
let strs =
unnested_array.as_string::<i32>().iter().collect::<Vec<_>>();
- assert_eq!(
- strs,
+ assert_eq!(strs, expected);
+ Ok(())
+ }
+
+ #[test]
+ fn test_unnest_list_array() -> datafusion_common::Result<()> {
+ // [A, B, C], [], NULL, [D], NULL, [NULL, F]
+ let list_array = make_generic_array::<i32>();
+ verify_unnest_list_array(
+ &list_array,
+ vec![3, 2, 1, 2, 0, 3],
vec![
Some("A"),
Some("B"),
Some("C"),
None,
+ None,
+ None,
Some("D"),
None,
None,
- Some("F")
- ]
+ Some("F"),
+ None,
+ ],
+ )?;
+
+ // [A, B], NULL, [C, D], NULL, [NULL, F], [NULL, NULL]
+ let list_array = make_fixed_list();
+ verify_unnest_list_array(
+ &list_array,
+ vec![3, 1, 2, 0, 2, 3],
+ vec![
+ Some("A"),
+ Some("B"),
+ None,
+ None,
+ Some("C"),
+ Some("D"),
+ None,
+ Some("F"),
+ None,
+ None,
+ None,
+ ],
+ )?;
+
+ Ok(())
+ }
+
+ fn verify_longest_length(
+ list_arrays: &[ArrayRef],
+ preserve_nulls: bool,
+ expected: Vec<i64>,
+ ) -> datafusion_common::Result<()> {
+ let options = UnnestOptions { preserve_nulls };
+ let longest_length = find_longest_length(list_arrays, &options)?;
+ let expected_array = Int64Array::from(expected);
+ assert_eq!(
+ longest_length
+ .as_any()
+ .downcast_ref::<Int64Array>()
+ .unwrap(),
+ &expected_array
);
+ Ok(())
+ }
+
+ #[test]
+ fn test_longest_list_length() -> datafusion_common::Result<()> {
+ // Test with single ListArray
+ // [A, B, C], [], NULL, [D], NULL, [NULL, F]
+ let list_array = Arc::new(make_generic_array::<i32>()) as ArrayRef;
+ verify_longest_length(&[list_array.clone()], false, vec![3, 0, 0, 1,
0, 2])?;
+ verify_longest_length(&[list_array.clone()], true, vec![3, 0, 1, 1, 1,
2])?;
+
+ // Test with single LargeListArray
+ // [A, B, C], [], NULL, [D], NULL, [NULL, F]
+ let list_array = Arc::new(make_generic_array::<i64>()) as ArrayRef;
+ verify_longest_length(&[list_array.clone()], false, vec![3, 0, 0, 1,
0, 2])?;
+ verify_longest_length(&[list_array.clone()], true, vec![3, 0, 1, 1, 1,
2])?;
+
+ // Test with single FixedSizeListArray
+ // [A, B], NULL, [C, D], NULL, [NULL, F], [NULL, NULL]
+ let list_array = Arc::new(make_fixed_list()) as ArrayRef;
+ verify_longest_length(&[list_array.clone()], false, vec![2, 0, 2, 0,
2, 2])?;
+ verify_longest_length(&[list_array.clone()], true, vec![2, 1, 2, 1, 2,
2])?;
+
+ // Test with multiple list arrays
+ // [A, B, C], [], NULL, [D], NULL, [NULL, F]
+ // [A, B], NULL, [C, D], NULL, [NULL, F], [NULL, NULL]
+ let list1 = Arc::new(make_generic_array::<i32>()) as ArrayRef;
+ let list2 = Arc::new(make_fixed_list()) as ArrayRef;
+ let list_arrays = vec![list1.clone(), list2.clone()];
+ verify_longest_length(&list_arrays, false, vec![3, 0, 2, 1, 2, 2])?;
+ verify_longest_length(&list_arrays, true, vec![3, 1, 2, 1, 2, 2])?;
Ok(())
}
+
+ #[test]
+ fn test_create_take_indicies() -> datafusion_common::Result<()> {
+ let length_array = Int64Array::from(vec![2, 3, 1]);
+ let take_indicies = create_take_indicies(&length_array, 6);
+ let expected = Int64Array::from(vec![0, 0, 1, 1, 1, 2]);
+ assert_eq!(take_indicies, expected);
+ Ok(())
+ }
}
diff --git a/datafusion/proto/src/logical_plan/from_proto.rs
b/datafusion/proto/src/logical_plan/from_proto.rs
index 4e61385bb5..e66bd1a5f0 100644
--- a/datafusion/proto/src/logical_plan/from_proto.rs
+++ b/datafusion/proto/src/logical_plan/from_proto.rs
@@ -1257,8 +1257,11 @@ pub fn parse_expr(
parse_required_expr(negative.expr.as_deref(), registry, "expr",
codec)?,
))),
ExprType::Unnest(unnest) => {
- let exprs = parse_exprs(&unnest.exprs, registry, codec)?;
- Ok(Expr::Unnest(Unnest { exprs }))
+ let mut exprs = parse_exprs(&unnest.exprs, registry, codec)?;
+ if exprs.len() != 1 {
+ return Err(proto_error("Unnest must have exactly one
expression"));
+ }
+ Ok(Expr::Unnest(Unnest::new(exprs.swap_remove(0))))
}
ExprType::InList(in_list) => Ok(Expr::InList(InList::new(
Box::new(parse_required_expr(
diff --git a/datafusion/proto/src/logical_plan/to_proto.rs
b/datafusion/proto/src/logical_plan/to_proto.rs
index 2680bc15e1..4916b4bed9 100644
--- a/datafusion/proto/src/logical_plan/to_proto.rs
+++ b/datafusion/proto/src/logical_plan/to_proto.rs
@@ -963,9 +963,9 @@ pub fn serialize_expr(
expr_type: Some(ExprType::Negative(expr)),
}
}
- Expr::Unnest(Unnest { exprs }) => {
+ Expr::Unnest(Unnest { expr }) => {
let expr = protobuf::Unnest {
- exprs: serialize_exprs(exprs, codec)?,
+ exprs: vec![serialize_expr(expr.as_ref(), codec)?],
};
protobuf::LogicalExprNode {
expr_type: Some(ExprType::Unnest(expr)),
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index e680a1b2ff..eee15008fb 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -1599,7 +1599,7 @@ fn roundtrip_inlist() {
#[test]
fn roundtrip_unnest() {
let test_expr = Expr::Unnest(Unnest {
- exprs: vec![lit(1), lit(2), lit(3)],
+ expr: Box::new(col("col")),
});
let ctx = SessionContext::new();
diff --git a/datafusion/sql/src/expr/function.rs
b/datafusion/sql/src/expr/function.rs
index 501c51c4be..c225afec58 100644
--- a/datafusion/sql/src/expr/function.rs
+++ b/datafusion/sql/src/expr/function.rs
@@ -119,10 +119,14 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
// Build Unnest expression
if name.eq("unnest") {
- let exprs =
+ let mut exprs =
self.function_args_to_expr(args.clone(), schema,
planner_context)?;
- Self::check_unnest_args(&exprs, schema)?;
- return Ok(Expr::Unnest(Unnest { exprs }));
+ if exprs.len() != 1 {
+ return plan_err!("unnest() requires exactly one argument");
+ }
+ let expr = exprs.swap_remove(0);
+ Self::check_unnest_arg(&expr, schema)?;
+ return Ok(Expr::Unnest(Unnest::new(expr)));
}
// next, scalar built-in
@@ -342,17 +346,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
.collect::<Result<Vec<Expr>>>()
}
- pub(crate) fn check_unnest_args(args: &[Expr], schema: &DFSchema) ->
Result<()> {
- // Currently only one argument is supported
- let arg = match args.len() {
- 0 => {
- return plan_err!("unnest() requires at least one argument");
- }
- 1 => &args[0],
- _ => {
- return not_impl_err!("unnest() does not support multiple
arguments yet");
- }
- };
+ pub(crate) fn check_unnest_arg(arg: &Expr, schema: &DFSchema) ->
Result<()> {
// Check argument type, array types are supported
match arg.get_type(schema)? {
DataType::List(_)
diff --git a/datafusion/sql/src/relation/mod.rs
b/datafusion/sql/src/relation/mod.rs
index 1e01205ba6..9380e569f2 100644
--- a/datafusion/sql/src/relation/mod.rs
+++ b/datafusion/sql/src/relation/mod.rs
@@ -105,15 +105,24 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
// Unnest table factor has empty input
let schema = DFSchema::empty();
let input = LogicalPlanBuilder::empty(true).build()?;
- let exprs = array_exprs
+ // Unnest table factor can have multiple arguments.
+ // We treat each argument as a separate unnest expression.
+ let unnest_exprs = array_exprs
.into_iter()
- .map(|expr| {
- self.sql_expr_to_logical_expr(expr, &schema,
planner_context)
+ .map(|sql_expr| {
+ let expr = self.sql_expr_to_logical_expr(
+ sql_expr,
+ &schema,
+ planner_context,
+ )?;
+ Self::check_unnest_arg(&expr, &schema)?;
+ Ok(Expr::Unnest(Unnest::new(expr)))
})
.collect::<Result<Vec<_>>>()?;
- Self::check_unnest_args(&exprs, &schema)?;
- let unnest_expr = Expr::Unnest(Unnest { exprs });
- let logical_plan = self.try_process_unnest(input,
vec![unnest_expr])?;
+ if unnest_exprs.is_empty() {
+ return plan_err!("UNNEST must have at least one argument");
+ }
+ let logical_plan = self.try_process_unnest(input,
unnest_exprs)?;
(logical_plan, alias)
}
TableFactor::UNNEST { .. } => {
diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs
index 1bfd60a8ce..30eacdb44c 100644
--- a/datafusion/sql/src/select.rs
+++ b/datafusion/sql/src/select.rs
@@ -294,13 +294,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
transformed,
tnr: _,
} = expr.transform_up_mut(&mut |expr: Expr| {
- if let Expr::Unnest(Unnest { ref exprs }) = expr {
+ if let Expr::Unnest(Unnest { expr: ref arg }) = expr {
let column_name = expr.display_name()?;
unnest_columns.push(column_name.clone());
// Add alias for the argument expression, to avoid
naming conflicts with other expressions
// in the select list. For example: `select
unnest(col1), col1 from t`.
inner_projection_exprs
- .push(exprs[0].clone().alias(column_name.clone()));
+ .push(arg.clone().alias(column_name.clone()));
Ok(Transformed::yes(Expr::Column(Column::from_name(
column_name,
))))
@@ -332,15 +332,12 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
.project(inner_projection_exprs)?
.build()
} else {
- if unnest_columns.len() > 1 {
- return not_impl_err!("Only support single unnest expression
for now");
- }
- let unnest_column = unnest_columns.pop().unwrap();
+ let columns = unnest_columns.into_iter().map(|col|
col.into()).collect();
// Set preserve_nulls to false to ensure compatibility with DuckDB
and PostgreSQL
let unnest_options =
UnnestOptions::new().with_preserve_nulls(false);
LogicalPlanBuilder::from(input)
.project(inner_projection_exprs)?
- .unnest_column_with_options(unnest_column, unnest_options)?
+ .unnest_columns_with_options(columns, unnest_options)?
.project(outer_projection_exprs)?
.build()
}
diff --git a/datafusion/sqllogictest/test_files/unnest.slt
b/datafusion/sqllogictest/test_files/unnest.slt
index 5c178bb392..38207fa7d1 100644
--- a/datafusion/sqllogictest/test_files/unnest.slt
+++ b/datafusion/sqllogictest/test_files/unnest.slt
@@ -22,12 +22,12 @@
statement ok
CREATE TABLE unnest_table
AS VALUES
- ([1,2,3], [7], 1),
- ([4,5], [8,9,10], 2),
- ([6], [11,12], 3),
- ([12], [null, 42, null], null),
+ ([1,2,3], [7], 1, [13, 14]),
+ ([4,5], [8,9,10], 2, [15, 16]),
+ ([6], [11,12], 3, null),
+ ([12], [null, 42, null], null, null),
-- null array to verify the `preserve_nulls` option
- (null, null, 4)
+ (null, null, 4, [17, 18])
;
## Basic unnest expression in select list
@@ -93,6 +93,20 @@ NULL
42
NULL
+## Unnest single column and filter out null lists
+query I
+select unnest(column2) from unnest_table where column2 is not null;
+----
+7
+8
+9
+10
+11
+12
+NULL
+42
+NULL
+
## Unnest with additional column
## Issue: https://github.com/apache/arrow-datafusion/issues/9349
query II
@@ -135,9 +149,48 @@ select array_remove(column1, 4), unnest(column2), column3
* 10 from unnest_table
query error DataFusion error: Error during planning: unnest\(\) can only be
applied to array, struct and null
select unnest(column3) from unnest_table;
+## Unnest doesn't work with untyped nulls
+query error DataFusion error: This feature is not implemented: unnest\(\) does
not support null yet
+select unnest(null) from unnest_table;
+
## Multiple unnest functions in selection
-query error DataFusion error: This feature is not implemented: Only support
single unnest expression for now
-select unnest(column1), unnest(column2) from unnest_table;
+query ?I
+select unnest([]), unnest(NULL::int[]);
+----
+
+query III
+select
+ unnest(column1),
+ unnest(arrow_cast(column2, 'LargeList(Int64)')),
+ unnest(arrow_cast(column4, 'FixedSizeList(2, Int64)'))
+from unnest_table where column4 is not null;
+----
+1 7 13
+2 NULL 14
+3 NULL NULL
+4 8 15
+5 9 16
+NULL 10 NULL
+NULL NULL 17
+NULL NULL 18
+
+query IIII
+select
+ unnest(column1), unnest(column2) + 2,
+ column3 * 10, unnest(array_remove(column1, '4'))
+from unnest_table;
+----
+1 9 10 1
+2 NULL 10 2
+3 NULL 10 3
+4 10 20 5
+5 11 20 NULL
+NULL 12 20 NULL
+6 13 30 6
+NULL 14 30 NULL
+12 NULL NULL 12
+NULL 44 NULL NULL
+NULL NULL NULL NULL
## Unnest scalar in select list
query error DataFusion error: Error during planning: unnest\(\) can only be
applied to array, struct and null
@@ -149,7 +202,7 @@ select * from unnest(1);
## Unnest empty expression in select list
-query error DataFusion error: Error during planning: unnest\(\) requires at
least one argument
+query error DataFusion error: Error during planning: unnest\(\) requires
exactly one argument
select unnest();
## Unnest empty expression in from clause
@@ -157,13 +210,26 @@ query error DataFusion error: SQL error:
ParserError\("Expected an expression:,
select * from unnest();
-## Unnest multiple expressions in select list
-query error DataFusion error: This feature is not implemented: unnest\(\) does
not support multiple arguments yet
+## Unnest multiple expressions in select list. This form is only allowed in a
query's FROM clause.
+query error DataFusion error: Error during planning: unnest\(\) requires
exactly one argument
select unnest([1,2], [2,3]);
## Unnest multiple expressions in from clause
-query error DataFusion error: This feature is not implemented: unnest\(\) does
not support multiple arguments yet
-select * from unnest([1,2], [2,3]);
+query ITII
+select * from unnest(
+ [1,2],
+ arrow_cast(['a','b', 'c'], 'LargeList(Utf8)'),
+ arrow_cast([4, NULL], 'FixedSizeList(2, Int64)'),
+ NULL::int[]
+) as t(a, b, c, d);
+----
+1 a 4 NULL
+2 b NULL NULL
+NULL c NULL NULL
+
+query ?I
+select * from unnest([], NULL::int[]);
+----
## Unnest struct expression in select list