This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 34eda15b73 feat: support `unnest` multiple arrays (#10044)
34eda15b73 is described below

commit 34eda15b73a9e278af8844b30ed2f1c21c10359c
Author: Jonah Gao <[email protected]>
AuthorDate: Mon Apr 15 19:05:01 2024 +0800

    feat: support `unnest` multiple arrays (#10044)
    
    * Impl find_longest_length
    
    * impl unnesting multi columns
    
    * Change plans
    
    * remove println
    
    * fix tests
    
    * simplify unnested fields
    
    * update doc and tests
    
    * more tests
    
    * add test
    
    * fix comment
    
    * Add test for untyped null
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 datafusion-cli/Cargo.lock                          |   1 +
 datafusion/core/src/physical_planner.rs            |   9 +-
 datafusion/expr/src/expr.rs                        |  20 +-
 datafusion/expr/src/expr_rewriter/mod.rs           |   6 +-
 datafusion/expr/src/expr_schema.rs                 |   8 +-
 datafusion/expr/src/logical_plan/builder.rs        |  77 ++-
 datafusion/expr/src/logical_plan/display.rs        |   4 +-
 datafusion/expr/src/logical_plan/plan.rs           |  56 +-
 datafusion/expr/src/logical_plan/tree_node.rs      |  27 +-
 datafusion/expr/src/tree_node.rs                   |   2 +-
 datafusion/physical-plan/Cargo.toml                |   1 +
 datafusion/physical-plan/src/unnest.rs             | 651 ++++++++++++---------
 datafusion/proto/src/logical_plan/from_proto.rs    |   7 +-
 datafusion/proto/src/logical_plan/to_proto.rs      |   4 +-
 .../proto/tests/cases/roundtrip_logical_plan.rs    |   2 +-
 datafusion/sql/src/expr/function.rs                |  22 +-
 datafusion/sql/src/relation/mod.rs                 |  21 +-
 datafusion/sql/src/select.rs                       |  11 +-
 datafusion/sqllogictest/test_files/unnest.slt      |  90 ++-
 19 files changed, 577 insertions(+), 442 deletions(-)

diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index 9e192b0be0..7ddc0af430 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -1373,6 +1373,7 @@ dependencies = [
  "arrow",
  "arrow-array",
  "arrow-buffer",
+ "arrow-ord",
  "arrow-schema",
  "async-trait",
  "chrono",
diff --git a/datafusion/core/src/physical_planner.rs 
b/datafusion/core/src/physical_planner.rs
index c25523c5ae..f5e937bb56 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -1168,12 +1168,13 @@ impl DefaultPhysicalPlanner {
 
                     Ok(Arc::new(GlobalLimitExec::new(input, *skip, *fetch)))
                 }
-                LogicalPlan::Unnest(Unnest { input, column, schema, options }) 
=> {
+                LogicalPlan::Unnest(Unnest { input, columns, schema, options 
}) => {
                     let input = self.create_initial_plan(input, 
session_state).await?;
-                    let column_exec = schema.index_of_column(column)
-                        .map(|idx| Column::new(&column.name, idx))?;
+                    let column_execs = columns.iter().map(|column| {
+                        schema.index_of_column(column).map(|idx| 
Column::new(&column.name, idx))
+                    }).collect::<Result<_>>()?;
                     let schema = 
SchemaRef::new(schema.as_ref().to_owned().into());
-                    Ok(Arc::new(UnnestExec::new(input, column_exec, schema, 
options.clone())))
+                    Ok(Arc::new(UnnestExec::new(input, column_execs, schema, 
options.clone())))
                 }
                 LogicalPlan::Ddl(ddl) => {
                     // There is no default plan for DDl statements --
diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs
index c7c50d8719..cffb58dadd 100644
--- a/datafusion/expr/src/expr.rs
+++ b/datafusion/expr/src/expr.rs
@@ -186,7 +186,16 @@ pub enum Expr {
 
 #[derive(Clone, PartialEq, Eq, Hash, Debug)]
 pub struct Unnest {
-    pub exprs: Vec<Expr>,
+    pub expr: Box<Expr>,
+}
+
+impl Unnest {
+    /// Create a new Unnest expression.
+    pub fn new(expr: Expr) -> Self {
+        Self {
+            expr: Box::new(expr),
+        }
+    }
 }
 
 /// Alias expression
@@ -1567,8 +1576,8 @@ impl fmt::Display for Expr {
                 }
             },
             Expr::Placeholder(Placeholder { id, .. }) => write!(f, "{id}"),
-            Expr::Unnest(Unnest { exprs }) => {
-                write!(f, "UNNEST({exprs:?})")
+            Expr::Unnest(Unnest { expr }) => {
+                write!(f, "UNNEST({expr:?})")
             }
         }
     }
@@ -1757,7 +1766,10 @@ fn create_name(e: &Expr) -> Result<String> {
                 }
             }
         }
-        Expr::Unnest(Unnest { exprs }) => create_function_name("unnest", 
false, exprs),
+        Expr::Unnest(Unnest { expr }) => {
+            let expr_name = create_name(expr)?;
+            Ok(format!("unnest({expr_name})"))
+        }
         Expr::ScalarFunction(fun) => create_function_name(fun.name(), false, 
&fun.args),
         Expr::WindowFunction(WindowFunction {
             fun,
diff --git a/datafusion/expr/src/expr_rewriter/mod.rs 
b/datafusion/expr/src/expr_rewriter/mod.rs
index d678fe7ee3..c11619fc0e 100644
--- a/datafusion/expr/src/expr_rewriter/mod.rs
+++ b/datafusion/expr/src/expr_rewriter/mod.rs
@@ -82,13 +82,13 @@ pub fn normalize_col_with_schemas_and_ambiguity_check(
     using_columns: &[HashSet<Column>],
 ) -> Result<Expr> {
     // Normalize column inside Unnest
-    if let Expr::Unnest(Unnest { exprs }) = expr {
+    if let Expr::Unnest(Unnest { expr }) = expr {
         let e = normalize_col_with_schemas_and_ambiguity_check(
-            exprs[0].clone(),
+            expr.as_ref().clone(),
             schemas,
             using_columns,
         )?;
-        return Ok(Expr::Unnest(Unnest { exprs: vec![e] }));
+        return Ok(Expr::Unnest(Unnest { expr: Box::new(e) }));
     }
 
     expr.transform(&|expr| {
diff --git a/datafusion/expr/src/expr_schema.rs 
b/datafusion/expr/src/expr_schema.rs
index 39892d9e0c..466fd13ce2 100644
--- a/datafusion/expr/src/expr_schema.rs
+++ b/datafusion/expr/src/expr_schema.rs
@@ -115,12 +115,8 @@ impl ExprSchemable for Expr {
             Expr::Case(case) => case.when_then_expr[0].1.get_type(schema),
             Expr::Cast(Cast { data_type, .. })
             | Expr::TryCast(TryCast { data_type, .. }) => 
Ok(data_type.clone()),
-            Expr::Unnest(Unnest { exprs }) => {
-                let arg_data_types = exprs
-                    .iter()
-                    .map(|e| e.get_type(schema))
-                    .collect::<Result<Vec<_>>>()?;
-                let arg_data_type = arg_data_types[0].clone();
+            Expr::Unnest(Unnest { expr }) => {
+                let arg_data_type = expr.get_type(schema)?;
                 // Unnest's output type is the inner type of the list
                 match arg_data_type{
                     DataType::List(field) | DataType::LargeList(field) | 
DataType::FixedSizeList(field, _) =>{
diff --git a/datafusion/expr/src/logical_plan/builder.rs 
b/datafusion/expr/src/logical_plan/builder.rs
index beac5a7f4e..f7c0fbac53 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -1112,7 +1112,7 @@ impl LogicalPlanBuilder {
 
     /// Unnest the given column.
     pub fn unnest_column(self, column: impl Into<Column>) -> Result<Self> {
-        Ok(Self::from(unnest(self.plan, column.into())?))
+        Ok(Self::from(unnest(self.plan, vec![column.into()])?))
     }
 
     /// Unnest the given column given [`UnnestOptions`]
@@ -1123,10 +1123,21 @@ impl LogicalPlanBuilder {
     ) -> Result<Self> {
         Ok(Self::from(unnest_with_options(
             self.plan,
-            column.into(),
+            vec![column.into()],
             options,
         )?))
     }
+
+    /// Unnest the given columns with the given [`UnnestOptions`]
+    pub fn unnest_columns_with_options(
+        self,
+        columns: Vec<Column>,
+        options: UnnestOptions,
+    ) -> Result<Self> {
+        Ok(Self::from(unnest_with_options(
+            self.plan, columns, options,
+        )?))
+    }
 }
 pub fn change_redundant_column(fields: &Fields) -> Vec<Field> {
     let mut name_map = HashMap::new();
@@ -1534,44 +1545,50 @@ impl TableSource for LogicalTableSource {
 }
 
 /// Create a [`LogicalPlan::Unnest`] plan
-pub fn unnest(input: LogicalPlan, column: Column) -> Result<LogicalPlan> {
-    unnest_with_options(input, column, UnnestOptions::new())
+pub fn unnest(input: LogicalPlan, columns: Vec<Column>) -> Result<LogicalPlan> 
{
+    unnest_with_options(input, columns, UnnestOptions::new())
 }
 
 /// Create a [`LogicalPlan::Unnest`] plan with options
 pub fn unnest_with_options(
     input: LogicalPlan,
-    column: Column,
+    columns: Vec<Column>,
     options: UnnestOptions,
 ) -> Result<LogicalPlan> {
-    let (unnest_qualifier, unnest_field) =
-        input.schema().qualified_field_from_column(&column)?;
-
     // Extract the type of the nested field in the list.
-    let unnested_field = match unnest_field.data_type() {
-        DataType::List(field)
-        | DataType::FixedSizeList(field, _)
-        | DataType::LargeList(field) => Arc::new(Field::new(
-            unnest_field.name(),
-            field.data_type().clone(),
-            unnest_field.is_nullable(),
-        )),
-        _ => {
-            // If the unnest field is not a list type return the input plan.
-            return Ok(input);
-        }
-    };
+    let mut unnested_fields: HashMap<usize, _> = 
HashMap::with_capacity(columns.len());
+    // Add qualifiers to the columns.
+    let mut qualified_columns = Vec::with_capacity(columns.len());
+    for c in &columns {
+        let index = input.schema().index_of_column(c)?;
+        let (unnest_qualifier, unnest_field) = 
input.schema().qualified_field(index);
+        let unnested_field = match unnest_field.data_type() {
+            DataType::List(field)
+            | DataType::FixedSizeList(field, _)
+            | DataType::LargeList(field) => Arc::new(Field::new(
+                unnest_field.name(),
+                field.data_type().clone(),
+                // Unnesting may produce NULLs even if the list is not null.
+                // For example: unnest([1], []) -> 1, null
+                true,
+            )),
+            _ => {
+                // If the unnest field is not a list type return the input 
plan.
+                return Ok(input);
+            }
+        };
+        qualified_columns.push(Column::from((unnest_qualifier, 
unnested_field.as_ref())));
+        unnested_fields.insert(index, unnested_field);
+    }
 
-    // Update the schema with the unnest column type changed to contain the 
nested type.
+    // Update the schema with the unnest column types changed to contain the 
nested types.
     let input_schema = input.schema();
     let fields = input_schema
         .iter()
-        .map(|(q, f)| {
-            if f.as_ref() == unnest_field && q == unnest_qualifier {
-                (unnest_qualifier.cloned(), unnested_field.clone())
-            } else {
-                (q.cloned(), f.clone())
-            }
+        .enumerate()
+        .map(|(index, (q, f))| match unnested_fields.get(&index) {
+            Some(unnested_field) => (q.cloned(), unnested_field.clone()),
+            None => (q.cloned(), f.clone()),
         })
         .collect::<Vec<_>>();
 
@@ -1580,11 +1597,9 @@ pub fn unnest_with_options(
     // We can use the existing functional dependencies:
     let deps = input_schema.functional_dependencies().clone();
     let schema = Arc::new(df_schema.with_functional_dependencies(deps)?);
-    let column = Column::from((unnest_qualifier, unnested_field.as_ref()));
-
     Ok(LogicalPlan::Unnest(Unnest {
         input: Arc::new(input),
-        column,
+        columns: qualified_columns,
         schema,
         options,
     }))
diff --git a/datafusion/expr/src/logical_plan/display.rs 
b/datafusion/expr/src/logical_plan/display.rs
index edc3afd55d..3a2ed9ffc2 100644
--- a/datafusion/expr/src/logical_plan/display.rs
+++ b/datafusion/expr/src/logical_plan/display.rs
@@ -638,10 +638,10 @@ impl<'a, 'b> PgJsonVisitor<'a, 'b> {
                     "Node Type": "DescribeTable"
                 })
             }
-            LogicalPlan::Unnest(Unnest { column, .. }) => {
+            LogicalPlan::Unnest(Unnest { columns, .. }) => {
                 json!({
                     "Node Type": "Unnest",
-                    "Column": format!("{}", column)
+                    "Column": expr_vec_fmt!(columns),
                 })
             }
         }
diff --git a/datafusion/expr/src/logical_plan/plan.rs 
b/datafusion/expr/src/logical_plan/plan.rs
index 91c8670f38..dbff504601 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -24,7 +24,7 @@ use std::sync::Arc;
 
 use super::dml::CopyTo;
 use super::DdlStatement;
-use crate::builder::change_redundant_column;
+use crate::builder::{change_redundant_column, unnest_with_options};
 use crate::expr::{Alias, Placeholder, Sort as SortExpr, WindowFunction};
 use crate::expr_rewriter::{create_col_from_scalar_expr, normalize_cols};
 use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor};
@@ -807,51 +807,11 @@ impl LogicalPlan {
             }
             LogicalPlan::DescribeTable(_) => Ok(self.clone()),
             LogicalPlan::Unnest(Unnest {
-                column,
-                schema,
-                options,
-                ..
+                columns, options, ..
             }) => {
                 // Update schema with unnested column type.
-                let input = Arc::new(inputs.swap_remove(0));
-                let (nested_qualifier, nested_field) =
-                    input.schema().qualified_field_from_column(column)?;
-                let (unnested_qualifier, unnested_field) =
-                    schema.qualified_field_from_column(column)?;
-                let qualifiers_and_fields = input
-                    .schema()
-                    .iter()
-                    .map(|(qualifier, field)| {
-                        if qualifier.eq(&nested_qualifier)
-                            && field.as_ref() == nested_field
-                        {
-                            (
-                                unnested_qualifier.cloned(),
-                                Arc::new(unnested_field.clone()),
-                            )
-                        } else {
-                            (qualifier.cloned(), field.clone())
-                        }
-                    })
-                    .collect::<Vec<_>>();
-
-                let schema = Arc::new(
-                    DFSchema::new_with_metadata(
-                        qualifiers_and_fields,
-                        input.schema().metadata().clone(),
-                    )?
-                    // We can use the existing functional dependencies as is:
-                    .with_functional_dependencies(
-                        input.schema().functional_dependencies().clone(),
-                    )?,
-                );
-
-                Ok(LogicalPlan::Unnest(Unnest {
-                    input,
-                    column: column.clone(),
-                    schema,
-                    options: options.clone(),
-                }))
+                let input = inputs.swap_remove(0);
+                unnest_with_options(input, columns.clone(), options.clone())
             }
         }
     }
@@ -1581,8 +1541,8 @@ impl LogicalPlan {
                     LogicalPlan::DescribeTable(DescribeTable { .. }) => {
                         write!(f, "DescribeTable")
                     }
-                    LogicalPlan::Unnest(Unnest { column, .. }) => {
-                        write!(f, "Unnest: {column}")
+                    LogicalPlan::Unnest(Unnest { columns, .. }) => {
+                        write!(f, "Unnest: {}", expr_vec_fmt!(columns))
                     }
                 }
             }
@@ -2556,8 +2516,8 @@ pub enum Partitioning {
 pub struct Unnest {
     /// The incoming logical plan
     pub input: Arc<LogicalPlan>,
-    /// The column to unnest
-    pub column: Column,
+    /// The columns to unnest
+    pub columns: Vec<Column>,
     /// The output schema, containing the unnested field column.
     pub schema: DFSchemaRef,
     /// Options
diff --git a/datafusion/expr/src/logical_plan/tree_node.rs 
b/datafusion/expr/src/logical_plan/tree_node.rs
index 3644f89e8b..48f047c070 100644
--- a/datafusion/expr/src/logical_plan/tree_node.rs
+++ b/datafusion/expr/src/logical_plan/tree_node.rs
@@ -311,13 +311,13 @@ impl TreeNode for LogicalPlan {
             }
             LogicalPlan::Unnest(Unnest {
                 input,
-                column,
+                columns,
                 schema,
                 options,
             }) => rewrite_arc(input, f)?.update_data(|input| {
                 LogicalPlan::Unnest(Unnest {
                     input,
-                    column,
+                    columns,
                     schema,
                     options,
                 })
@@ -507,8 +507,12 @@ impl LogicalPlan {
             LogicalPlan::TableScan(TableScan { filters, .. }) => {
                 filters.iter().apply_until_stop(f)
             }
-            LogicalPlan::Unnest(Unnest { column, .. }) => {
-                f(&Expr::Column(column.clone()))
+            LogicalPlan::Unnest(Unnest { columns, .. }) => {
+                let exprs = columns
+                    .iter()
+                    .map(|c| Expr::Column(c.clone()))
+                    .collect::<Vec<_>>();
+                exprs.iter().apply_until_stop(f)
             }
             LogicalPlan::Distinct(Distinct::On(DistinctOn {
                 on_expr,
@@ -706,20 +710,6 @@ impl LogicalPlan {
                         fetch,
                     })
                 }),
-            LogicalPlan::Unnest(Unnest {
-                input,
-                column,
-                schema,
-                options,
-            }) => f(Expr::Column(column))?.map_data(|column| match column {
-                Expr::Column(column) => Ok(LogicalPlan::Unnest(Unnest {
-                    input,
-                    column,
-                    schema,
-                    options,
-                })),
-                _ => internal_err!("Transformation should return Column"),
-            })?,
             LogicalPlan::Distinct(Distinct::On(DistinctOn {
                 on_expr,
                 select_expr,
@@ -744,6 +734,7 @@ impl LogicalPlan {
             }),
             // plans without expressions
             LogicalPlan::EmptyRelation(_)
+            | LogicalPlan::Unnest(_)
             | LogicalPlan::RecursiveQuery(_)
             | LogicalPlan::Subquery(_)
             | LogicalPlan::SubqueryAlias(_)
diff --git a/datafusion/expr/src/tree_node.rs b/datafusion/expr/src/tree_node.rs
index 85097f6249..35fec509c9 100644
--- a/datafusion/expr/src/tree_node.rs
+++ b/datafusion/expr/src/tree_node.rs
@@ -36,6 +36,7 @@ impl TreeNode for Expr {
     ) -> Result<TreeNodeRecursion> {
         let children = match self {
             Expr::Alias(Alias{expr,..})
+            | Expr::Unnest(Unnest{expr})
             | Expr::Not(expr)
             | Expr::IsNotNull(expr)
             | Expr::IsTrue(expr)
@@ -60,7 +61,6 @@ impl TreeNode for Expr {
                     GetFieldAccess::NamedStructField { .. } => vec![expr],
                 }
             }
-            Expr::Unnest(Unnest { exprs }) |
             Expr::GroupingSet(GroupingSet::Rollup(exprs))
             | Expr::GroupingSet(GroupingSet::Cube(exprs)) => 
exprs.iter().collect(),
             Expr::ScalarFunction (ScalarFunction{ args, .. } )  => {
diff --git a/datafusion/physical-plan/Cargo.toml 
b/datafusion/physical-plan/Cargo.toml
index 6a78bd596a..6863f26460 100644
--- a/datafusion/physical-plan/Cargo.toml
+++ b/datafusion/physical-plan/Cargo.toml
@@ -39,6 +39,7 @@ ahash = { version = "0.8", default-features = false, features 
= [
 arrow = { workspace = true }
 arrow-array = { workspace = true }
 arrow-buffer = { workspace = true }
+arrow-ord = { workspace = true }
 arrow-schema = { workspace = true }
 async-trait = { workspace = true }
 chrono = { workspace = true }
diff --git a/datafusion/physical-plan/src/unnest.rs 
b/datafusion/physical-plan/src/unnest.rs
index 6ea1b3c40c..45b848112b 100644
--- a/datafusion/physical-plan/src/unnest.rs
+++ b/datafusion/physical-plan/src/unnest.rs
@@ -15,8 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Defines the unnest column plan for unnesting values in a column that 
contains a list
-//! type, conceptually is like joining each row with all the values in the 
list column.
+//! Defines a plan for unnesting values in columns that contain a list type.
+
+use std::collections::HashMap;
 use std::{any::Any, sync::Arc};
 
 use super::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};
@@ -27,15 +28,17 @@ use crate::{
 };
 
 use arrow::array::{
-    Array, ArrayRef, ArrowPrimitiveType, FixedSizeListArray, GenericListArray,
-    LargeListArray, ListArray, OffsetSizeTrait, PrimitiveArray,
-};
-use arrow::compute::kernels;
-use arrow::datatypes::{
-    ArrowNativeType, DataType, Int32Type, Int64Type, Schema, SchemaRef,
+    Array, ArrayRef, AsArray, FixedSizeListArray, LargeListArray, ListArray,
+    PrimitiveArray,
 };
+use arrow::compute::kernels::length::length;
+use arrow::compute::kernels::zip::zip;
+use arrow::compute::{cast, is_not_null, kernels, sum};
+use arrow::datatypes::{DataType, Int64Type, Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
-use datafusion_common::{exec_err, Result, UnnestOptions};
+use arrow_array::{Int64Array, Scalar};
+use arrow_ord::cmp::lt;
+use datafusion_common::{exec_datafusion_err, exec_err, Result, UnnestOptions};
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::EquivalenceProperties;
 
@@ -43,7 +46,7 @@ use async_trait::async_trait;
 use futures::{Stream, StreamExt};
 use log::trace;
 
-/// Unnest the given column by joining the row with each value in the
+/// Unnest the given columns by joining the row with each value in the
 /// nested type.
 ///
 /// See [`UnnestOptions`] for more details and an example.
@@ -53,8 +56,8 @@ pub struct UnnestExec {
     input: Arc<dyn ExecutionPlan>,
     /// The schema once the unnest is applied
     schema: SchemaRef,
-    /// The unnest column
-    column: Column,
+    /// The unnest columns
+    columns: Vec<Column>,
     /// Options
     options: UnnestOptions,
     /// Execution metrics
@@ -67,7 +70,7 @@ impl UnnestExec {
     /// Create a new [UnnestExec].
     pub fn new(
         input: Arc<dyn ExecutionPlan>,
-        column: Column,
+        columns: Vec<Column>,
         schema: SchemaRef,
         options: UnnestOptions,
     ) -> Self {
@@ -75,7 +78,7 @@ impl UnnestExec {
         UnnestExec {
             input,
             schema,
-            column,
+            columns,
             options,
             metrics: Default::default(),
             cache,
@@ -134,7 +137,7 @@ impl ExecutionPlan for UnnestExec {
     ) -> Result<Arc<dyn ExecutionPlan>> {
         Ok(Arc::new(UnnestExec::new(
             children[0].clone(),
-            self.column.clone(),
+            self.columns.clone(),
             self.schema.clone(),
             self.options.clone(),
         )))
@@ -155,7 +158,7 @@ impl ExecutionPlan for UnnestExec {
         Ok(Box::pin(UnnestStream {
             input,
             schema: self.schema.clone(),
-            column: self.column.clone(),
+            columns: self.columns.clone(),
             options: self.options.clone(),
             metrics,
         }))
@@ -210,8 +213,8 @@ struct UnnestStream {
     input: SendableRecordBatchStream,
     /// Unnested schema
     schema: Arc<Schema>,
-    /// The unnest column
-    column: Column,
+    /// The unnest columns
+    columns: Vec<Column>,
     /// Options
     options: UnnestOptions,
     /// Metrics
@@ -249,7 +252,7 @@ impl UnnestStream {
                 Some(Ok(batch)) => {
                     let timer = self.metrics.elapsed_compute.timer();
                     let result =
-                        build_batch(&batch, &self.schema, &self.column, 
&self.options);
+                        build_batch(&batch, &self.schema, &self.columns, 
&self.options);
                     self.metrics.input_batches.add(1);
                     self.metrics.input_rows.add(batch.num_rows());
                     if let Ok(ref batch) = result {
@@ -276,270 +279,265 @@ impl UnnestStream {
     }
 }
 
+/// For each row in a `RecordBatch`, some list columns need to be unnested.
+/// We will expand the values in each list into multiple rows,
+/// taking the longest length among these lists, and shorter lists are padded 
with NULLs.
+///
+/// For columns that don't need to be unnested, repeat their values until 
reaching the longest length.
 fn build_batch(
     batch: &RecordBatch,
     schema: &SchemaRef,
-    column: &Column,
+    columns: &[Column],
     options: &UnnestOptions,
 ) -> Result<RecordBatch> {
-    let list_array = column.evaluate(batch)?.into_array(batch.num_rows())?;
-    match list_array.data_type() {
-        DataType::List(_) => {
-            let list_array = 
list_array.as_any().downcast_ref::<ListArray>().unwrap();
-            build_batch_generic_list::<i32, Int32Type>(
-                batch,
-                schema,
-                column.index(),
-                list_array,
-                options,
-            )
-        }
-        DataType::LargeList(_) => {
-            let list_array = list_array
-                .as_any()
-                .downcast_ref::<LargeListArray>()
-                .unwrap();
-            build_batch_generic_list::<i64, Int64Type>(
-                batch,
-                schema,
-                column.index(),
-                list_array,
-                options,
-            )
-        }
-        DataType::FixedSizeList(_, _) => {
-            let list_array = list_array
-                .as_any()
-                .downcast_ref::<FixedSizeListArray>()
-                .unwrap();
-            build_batch_fixedsize_list(batch, schema, column.index(), 
list_array, options)
-        }
-        _ => exec_err!("Invalid unnest column {column}"),
+    let list_arrays: Vec<ArrayRef> = columns
+        .iter()
+        .map(|column| column.evaluate(batch)?.into_array(batch.num_rows()))
+        .collect::<Result<_>>()?;
+
+    let longest_length = find_longest_length(&list_arrays, options)?;
+    let unnested_length = longest_length.as_primitive::<Int64Type>();
+    let total_length = if unnested_length.is_empty() {
+        0
+    } else {
+        sum(unnested_length).ok_or_else(|| {
+            exec_datafusion_err!("Failed to calculate the total unnested 
length")
+        })? as usize
+    };
+    if total_length == 0 {
+        return Ok(RecordBatch::new_empty(schema.clone()));
     }
-}
 
-fn build_batch_generic_list<T: OffsetSizeTrait, P: ArrowPrimitiveType<Native = 
T>>(
-    batch: &RecordBatch,
-    schema: &SchemaRef,
-    unnest_column_idx: usize,
-    list_array: &GenericListArray<T>,
-    options: &UnnestOptions,
-) -> Result<RecordBatch> {
-    let unnested_array = unnest_generic_list::<T, P>(list_array, options)?;
-
-    let take_indicies =
-        create_take_indicies_generic::<T, P>(list_array, unnested_array.len(), 
options);
-
-    batch_from_indices(
-        batch,
-        schema,
-        unnest_column_idx,
-        &unnested_array,
-        &take_indicies,
-    )
+    // Unnest all the list arrays
+    let unnested_arrays =
+        unnest_list_arrays(&list_arrays, unnested_length, total_length)?;
+    let unnested_array_map: HashMap<_, _> = unnested_arrays
+        .into_iter()
+        .zip(columns.iter())
+        .map(|(array, column)| (column.index(), array))
+        .collect();
+
+    // Create the take indices array for other columns
+    let take_indicies = create_take_indicies(unnested_length, total_length);
+
+    batch_from_indices(batch, schema, &unnested_array_map, &take_indicies)
 }
 
-/// Given this `GenericList` list_array:
+/// Find the longest list length among the given list arrays for each row.
+///
+/// For example if we have the following two list arrays:
 ///
 /// ```ignore
-/// [1], null, [2, 3, 4], null, [5, 6]
+/// l1: [1, 2, 3], null, [], [3]
+/// l2: [4,5], [], null, [6, 7]
 /// ```
-/// Its values array is represented like this:
+///
+/// If `preserve_nulls` is false, the longest length array will be:
 ///
 /// ```ignore
-/// [1, 2, 3, 4, 5, 6]
+/// longest_length: [3, 0, 0, 2]
 /// ```
 ///
-/// So if there are no null values or `UnnestOptions.preserve_nulls` is false
-/// we can return the values array without any copying.
+/// whereas if `preserve_nulls` is true, the longest length array will be:
 ///
-/// Otherwise we'll transfrom the values array using the take kernel and the 
following take indicies:
 ///
 /// ```ignore
-/// 0, null, 1, 2, 3, null, 4, 5
+/// longest_length: [3, 1, 1, 2]
 /// ```
 ///
-fn unnest_generic_list<T: OffsetSizeTrait, P: ArrowPrimitiveType<Native = T>>(
-    list_array: &GenericListArray<T>,
+fn find_longest_length(
+    list_arrays: &[ArrayRef],
     options: &UnnestOptions,
-) -> Result<Arc<dyn Array + 'static>> {
-    let values = list_array.values();
-    if list_array.null_count() == 0 {
-        return Ok(values.clone());
+) -> Result<ArrayRef> {
+    // The length of a NULL list
+    let null_length = if options.preserve_nulls {
+        Scalar::new(Int64Array::from_value(1, 1))
+    } else {
+        Scalar::new(Int64Array::from_value(0, 1))
+    };
+    let list_lengths: Vec<ArrayRef> = list_arrays
+        .iter()
+        .map(|list_array| {
+            let mut length_array = length(list_array)?;
+            // Make sure length arrays have the same type. Int64 is the most 
general one.
+            length_array = cast(&length_array, &DataType::Int64)?;
+            length_array =
+                zip(&is_not_null(&length_array)?, &length_array, 
&null_length)?;
+            Ok(length_array)
+        })
+        .collect::<Result<_>>()?;
+
+    let longest_length = list_lengths.iter().skip(1).try_fold(
+        list_lengths[0].clone(),
+        |longest, current| {
+            let is_lt = lt(&longest, &current)?;
+            zip(&is_lt, &current, &longest)
+        },
+    )?;
+    Ok(longest_length)
+}
+
+/// Trait defining common methods used for unnesting, implemented by list 
array types.
+trait ListArrayType: Array {
+    /// Returns a reference to the values of this list.
+    fn values(&self) -> &ArrayRef;
+
+    /// Returns the start and end offset of the values for the given row.
+    fn value_offsets(&self, row: usize) -> (i64, i64);
+}
+
+impl ListArrayType for ListArray {
+    fn values(&self) -> &ArrayRef {
+        self.values()
     }
 
-    let mut take_indicies_builder =
-        PrimitiveArray::<P>::builder(values.len() + list_array.null_count());
-    let offsets = list_array.value_offsets();
-    for row in 0..list_array.len() {
-        if list_array.is_null(row) {
-            if options.preserve_nulls {
-                take_indicies_builder.append_null();
-            }
-        } else {
-            let start = offsets[row].as_usize();
-            let end = offsets[row + 1].as_usize();
-            for idx in start..end {
-                
take_indicies_builder.append_value(P::Native::from_usize(idx).unwrap());
-            }
-        }
+    fn value_offsets(&self, row: usize) -> (i64, i64) {
+        let offsets = self.value_offsets();
+        (offsets[row].into(), offsets[row + 1].into())
     }
-    Ok(kernels::take::take(
-        &values,
-        &take_indicies_builder.finish(),
-        None,
-    )?)
 }
 
-fn build_batch_fixedsize_list(
-    batch: &RecordBatch,
-    schema: &SchemaRef,
-    unnest_column_idx: usize,
-    list_array: &FixedSizeListArray,
-    options: &UnnestOptions,
-) -> Result<RecordBatch> {
-    let unnested_array = unnest_fixed_list(list_array, options)?;
-
-    let take_indicies =
-        create_take_indicies_fixed(list_array, unnested_array.len(), options);
-
-    batch_from_indices(
-        batch,
-        schema,
-        unnest_column_idx,
-        &unnested_array,
-        &take_indicies,
-    )
+impl ListArrayType for LargeListArray {
+    fn values(&self) -> &ArrayRef {
+        self.values()
+    }
+
+    fn value_offsets(&self, row: usize) -> (i64, i64) {
+        let offsets = self.value_offsets();
+        (offsets[row], offsets[row + 1])
+    }
 }
 
-/// Given this `FixedSizeListArray` list_array:
+impl ListArrayType for FixedSizeListArray {
+    fn values(&self) -> &ArrayRef {
+        self.values()
+    }
+
+    fn value_offsets(&self, row: usize) -> (i64, i64) {
+        let start = self.value_offset(row) as i64;
+        (start, start + self.value_length() as i64)
+    }
+}
+
+/// Unnest multiple list arrays according to the length array.
+fn unnest_list_arrays(
+    list_arrays: &[ArrayRef],
+    length_array: &PrimitiveArray<Int64Type>,
+    capacity: usize,
+) -> Result<Vec<ArrayRef>> {
+    let typed_arrays = list_arrays
+        .iter()
+        .map(|list_array| match list_array.data_type() {
+            DataType::List(_) => Ok(list_array.as_list::<i32>() as &dyn 
ListArrayType),
+            DataType::LargeList(_) => {
+                Ok(list_array.as_list::<i64>() as &dyn ListArrayType)
+            }
+            DataType::FixedSizeList(_, _) => {
+                Ok(list_array.as_fixed_size_list() as &dyn ListArrayType)
+            }
+            other => exec_err!("Invalid unnest datatype {other }"),
+        })
+        .collect::<Result<Vec<_>>>()?;
+
+    // If there is only one list column to unnest and it doesn't contain any 
NULL lists,
+    // we can return the values array directly without any copying.
+    if typed_arrays.len() == 1 && typed_arrays[0].null_count() == 0 {
+        Ok(vec![typed_arrays[0].values().clone()])
+    } else {
+        typed_arrays
+            .iter()
+            .map(|list_array| unnest_list_array(*list_array, length_array, 
capacity))
+            .collect::<Result<_>>()
+    }
+}
+
+/// Unnest a list array according to the target length array.
 ///
-/// ```ignore
-/// [1, 2], null, [3, 4], null, [5, 6]
-/// ```
-/// Its values array is represented like this:
+/// Consider a list array like this:
 ///
 /// ```ignore
-/// [1, 2, null, null 3, 4, null, null, 5, 6]
+/// [1], [2, 3, 4], null, [5], [],
 /// ```
 ///
-/// So if there are no null values
-/// we can return the values array without any copying.
-///
-/// Otherwise we'll transfrom the values array using the take kernel.
-///
-/// If `UnnestOptions.preserve_nulls` is true the take indicies will look like 
this:
+/// and the length array is:
 ///
 /// ```ignore
-/// 0, 1, null, 4, 5, null, 8, 9
+/// [2, 3, 2, 1, 2]
 /// ```
-/// Otherwise we drop the nulls and take indicies will look like this:
+///
+/// If the length of a certain list is less than the target length, pad with 
NULLs.
+/// So the unnested array will look like this:
 ///
 /// ```ignore
-/// 0, 1, 4, 5, 8, 9
+/// [1, null, 2, 3, 4, null, null, 5, null, null]
 /// ```
 ///
-fn unnest_fixed_list(
-    list_array: &FixedSizeListArray,
-    options: &UnnestOptions,
-) -> Result<Arc<dyn Array + 'static>> {
+fn unnest_list_array(
+    list_array: &dyn ListArrayType,
+    length_array: &PrimitiveArray<Int64Type>,
+    capacity: usize,
+) -> Result<ArrayRef> {
     let values = list_array.values();
-
-    if list_array.null_count() == 0 {
-        Ok(values.clone())
-    } else {
-        let len_without_nulls =
-            values.len() - list_array.null_count() * list_array.value_length() 
as usize;
-        let null_count = if options.preserve_nulls {
-            list_array.null_count()
-        } else {
-            0
-        };
-        let mut builder =
-            PrimitiveArray::<Int32Type>::builder(len_without_nulls + 
null_count);
-        let mut take_offset = 0;
-        let fixed_value_length = list_array.value_length() as usize;
-        list_array.iter().for_each(|elem| match elem {
-            Some(_) => {
-                for i in 0..fixed_value_length {
-                    //take_offset + i is always positive
-                    let take_index = take_offset + i;
-                    builder.append_value(take_index as i32);
-                }
-                take_offset += fixed_value_length;
-            }
-            None => {
-                if options.preserve_nulls {
-                    builder.append_null();
-                }
-                take_offset += fixed_value_length;
+    let mut take_indicies_builder = 
PrimitiveArray::<Int64Type>::builder(capacity);
+    for row in 0..list_array.len() {
+        let mut value_length = 0;
+        if !list_array.is_null(row) {
+            let (start, end) = list_array.value_offsets(row);
+            value_length = end - start;
+            for i in start..end {
+                take_indicies_builder.append_value(i)
             }
-        });
-        Ok(kernels::take::take(&values, &builder.finish(), None)?)
+        }
+        let target_length = length_array.value(row);
+        debug_assert!(
+            value_length <= target_length,
+            "value length is beyond the longest length"
+        );
+        // Pad with NULL values
+        for _ in value_length..target_length {
+            take_indicies_builder.append_null();
+        }
     }
+    Ok(kernels::take::take(
+        &values,
+        &take_indicies_builder.finish(),
+        None,
+    )?)
 }
 
-/// Creates take indicies to be used to expand all other column's data.
-/// Every column value needs to be repeated as many times as many elements 
there is in each corresponding array value.
+/// Creates take indicies that will be used to expand all columns except for 
the unnest [`columns`](UnnestExec::columns).
+/// Every column value needs to be repeated multiple times according to the 
length array.
 ///
-/// If the column being unnested looks like this:
+/// If the length array looks like this:
 ///
 /// ```ignore
-/// [1], null, [2, 3, 4], null, [5, 6]
+/// [2, 3, 1]
 /// ```
-/// Then `create_take_indicies_generic` will return an array like this
+/// Then `create_take_indicies` will return an array like this
 ///
 /// ```ignore
-/// [1, null, 2, 2, 2, null, 4, 4]
+/// [0, 0, 1, 1, 1, 2]
 /// ```
 ///
-fn create_take_indicies_generic<T: OffsetSizeTrait, P: 
ArrowPrimitiveType<Native = T>>(
-    list_array: &GenericListArray<T>,
+fn create_take_indicies(
+    length_array: &PrimitiveArray<Int64Type>,
     capacity: usize,
-    options: &UnnestOptions,
-) -> PrimitiveArray<P> {
-    let mut builder = PrimitiveArray::<P>::builder(capacity);
-    let null_repeat: usize = if options.preserve_nulls { 1 } else { 0 };
-
-    for row in 0..list_array.len() {
-        let repeat = if list_array.is_null(row) {
-            null_repeat
-        } else {
-            list_array.value(row).len()
-        };
-
-        // `index` is a positive interger.
-        let index = P::Native::from_usize(row).unwrap();
-        (0..repeat).for_each(|_| builder.append_value(index));
+) -> PrimitiveArray<Int64Type> {
+    // `find_longest_length()` guarantees this.
+    debug_assert!(
+        length_array.null_count() == 0,
+        "length array should not contain nulls"
+    );
+    let mut builder = PrimitiveArray::<Int64Type>::builder(capacity);
+    for (index, repeat) in length_array.iter().enumerate() {
+        // The length array should not contain nulls, so unwrap is safe
+        let repeat = repeat.unwrap();
+        (0..repeat).for_each(|_| builder.append_value(index as i64));
     }
-
     builder.finish()
 }
 
-fn create_take_indicies_fixed(
-    list_array: &FixedSizeListArray,
-    capacity: usize,
-    options: &UnnestOptions,
-) -> PrimitiveArray<Int32Type> {
-    let mut builder = PrimitiveArray::<Int32Type>::builder(capacity);
-    let null_repeat: usize = if options.preserve_nulls { 1 } else { 0 };
-
-    for row in 0..list_array.len() {
-        let repeat = if list_array.is_null(row) {
-            null_repeat
-        } else {
-            list_array.value_length() as usize
-        };
-
-        // `index` is a positive interger.
-        let index = <Int32Type as 
ArrowPrimitiveType>::Native::from_usize(row).unwrap();
-        (0..repeat).for_each(|_| builder.append_value(index));
-    }
-
-    builder.finish()
-}
-
-/// Create the final batch given the unnested column array and a `indices` 
array
+/// Create the final batch given the unnested column arrays and a `indices` 
array
 /// that is used by the take kernel to copy values.
 ///
 /// For example if we have the following `RecordBatch`:
@@ -549,8 +547,8 @@ fn create_take_indicies_fixed(
 /// c2: 'a', 'b',  'c', null, 'd'
 /// ```
 ///
-/// then the `unnested_array` contains the unnest column that will replace 
`c1` in
-/// the final batch:
+/// then the `unnested_list_arrays` contains the unnest column that will 
replace `c1` in
+/// the final batch if `preserve_nulls` is true:
 ///
 /// ```ignore
 /// c1: 1, null, 2, 3, 4, null, 5, 6
@@ -570,26 +568,19 @@ fn create_take_indicies_fixed(
 /// c2: 'a', 'b', 'c', 'c', 'c', null, 'd', 'd'
 /// ```
 ///
-fn batch_from_indices<T>(
+fn batch_from_indices(
     batch: &RecordBatch,
     schema: &SchemaRef,
-    unnest_column_idx: usize,
-    unnested_array: &ArrayRef,
-    indices: &PrimitiveArray<T>,
-) -> Result<RecordBatch>
-where
-    T: ArrowPrimitiveType,
-{
+    unnested_list_arrays: &HashMap<usize, ArrayRef>,
+    indices: &PrimitiveArray<Int64Type>,
+) -> Result<RecordBatch> {
     let arrays = batch
         .columns()
         .iter()
         .enumerate()
-        .map(|(col_idx, arr)| {
-            if col_idx == unnest_column_idx {
-                Ok(unnested_array.clone())
-            } else {
-                Ok(kernels::take::take(&arr, indices, None)?)
-            }
+        .map(|(col_idx, arr)| match unnested_list_arrays.get(&col_idx) {
+            Some(unnested_array) => Ok(unnested_array.clone()),
+            None => Ok(kernels::take::take(arr, indices, None)?),
         })
         .collect::<Result<Vec<_>>>()?;
 
@@ -599,51 +590,51 @@ where
 #[cfg(test)]
 mod tests {
     use super::*;
-    use arrow::{
-        array::AsArray,
-        datatypes::{DataType, Field},
-    };
-    use arrow_array::StringArray;
+    use arrow::datatypes::{DataType, Field};
+    use arrow_array::{GenericListArray, OffsetSizeTrait, StringArray};
     use arrow_buffer::{BooleanBufferBuilder, NullBuffer, OffsetBuffer};
 
-    // Create a ListArray with the following list values:
+    // Create a GenericListArray with the following list values:
     //  [A, B, C], [], NULL, [D], NULL, [NULL, F]
-    fn make_test_array() -> ListArray {
+    fn make_generic_array<OffsetSize>() -> GenericListArray<OffsetSize>
+    where
+        OffsetSize: OffsetSizeTrait,
+    {
         let mut values = vec![];
-        let mut offsets = vec![0];
-        let mut valid = BooleanBufferBuilder::new(2);
+        let mut offsets: Vec<OffsetSize> = vec![OffsetSize::zero()];
+        let mut valid = BooleanBufferBuilder::new(6);
 
         // [A, B, C]
         values.extend_from_slice(&[Some("A"), Some("B"), Some("C")]);
-        offsets.push(values.len() as i32);
+        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
         valid.append(true);
 
         // []
-        offsets.push(values.len() as i32);
+        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
         valid.append(true);
 
         // NULL with non-zero value length
         // Issue https://github.com/apache/arrow-datafusion/issues/9932
         values.push(Some("?"));
-        offsets.push(values.len() as i32);
+        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
         valid.append(false);
 
         // [D]
         values.push(Some("D"));
-        offsets.push(values.len() as i32);
+        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
         valid.append(true);
 
         // Another NULL with zero value length
-        offsets.push(values.len() as i32);
+        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
         valid.append(false);
 
         // [NULL, F]
         values.extend_from_slice(&[None, Some("F")]);
-        offsets.push(values.len() as i32);
+        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
         valid.append(true);
 
         let field = Arc::new(Field::new("item", DataType::Utf8, true));
-        ListArray::new(
+        GenericListArray::<OffsetSize>::new(
             field,
             OffsetBuffer::new(offsets.into()),
             Arc::new(StringArray::from(values)),
@@ -651,43 +642,141 @@ mod tests {
         )
     }
 
-    #[test]
-    fn test_unnest_generic_list() -> datafusion_common::Result<()> {
-        let list_array = make_test_array();
-
-        // Test with preserve_nulls = false
-        let options = UnnestOptions {
-            preserve_nulls: false,
-        };
-        let unnested_array =
-            unnest_generic_list::<i32, Int32Type>(&list_array, &options)?;
-        let strs = 
unnested_array.as_string::<i32>().iter().collect::<Vec<_>>();
-        assert_eq!(
-            strs,
-            vec![Some("A"), Some("B"), Some("C"), Some("D"), None, Some("F")]
-        );
+    // Create a FixedSizeListArray with the following list values:
+    //  [A, B], NULL, [C, D], NULL, [NULL, F], [NULL, NULL]
+    fn make_fixed_list() -> FixedSizeListArray {
+        let values = Arc::new(StringArray::from_iter([
+            Some("A"),
+            Some("B"),
+            None,
+            None,
+            Some("C"),
+            Some("D"),
+            None,
+            None,
+            None,
+            Some("F"),
+            None,
+            None,
+        ]));
+        let field = Arc::new(Field::new("item", DataType::Utf8, true));
+        let valid = NullBuffer::from(vec![true, false, true, false, true, 
true]);
+        FixedSizeListArray::new(field, 2, values, Some(valid))
+    }
 
-        // Test with preserve_nulls = true
-        let options = UnnestOptions {
-            preserve_nulls: true,
-        };
-        let unnested_array =
-            unnest_generic_list::<i32, Int32Type>(&list_array, &options)?;
+    fn verify_unnest_list_array(
+        list_array: &dyn ListArrayType,
+        lengths: Vec<i64>,
+        expected: Vec<Option<&str>>,
+    ) -> datafusion_common::Result<()> {
+        let length_array = Int64Array::from(lengths);
+        let unnested_array = unnest_list_array(list_array, &length_array, 3 * 
6)?;
         let strs = 
unnested_array.as_string::<i32>().iter().collect::<Vec<_>>();
-        assert_eq!(
-            strs,
+        assert_eq!(strs, expected);
+        Ok(())
+    }
+
+    #[test]
+    fn test_unnest_list_array() -> datafusion_common::Result<()> {
+        // [A, B, C], [], NULL, [D], NULL, [NULL, F]
+        let list_array = make_generic_array::<i32>();
+        verify_unnest_list_array(
+            &list_array,
+            vec![3, 2, 1, 2, 0, 3],
             vec![
                 Some("A"),
                 Some("B"),
                 Some("C"),
                 None,
+                None,
+                None,
                 Some("D"),
                 None,
                 None,
-                Some("F")
-            ]
+                Some("F"),
+                None,
+            ],
+        )?;
+
+        // [A, B], NULL, [C, D], NULL, [NULL, F], [NULL, NULL]
+        let list_array = make_fixed_list();
+        verify_unnest_list_array(
+            &list_array,
+            vec![3, 1, 2, 0, 2, 3],
+            vec![
+                Some("A"),
+                Some("B"),
+                None,
+                None,
+                Some("C"),
+                Some("D"),
+                None,
+                Some("F"),
+                None,
+                None,
+                None,
+            ],
+        )?;
+
+        Ok(())
+    }
+
+    fn verify_longest_length(
+        list_arrays: &[ArrayRef],
+        preserve_nulls: bool,
+        expected: Vec<i64>,
+    ) -> datafusion_common::Result<()> {
+        let options = UnnestOptions { preserve_nulls };
+        let longest_length = find_longest_length(list_arrays, &options)?;
+        let expected_array = Int64Array::from(expected);
+        assert_eq!(
+            longest_length
+                .as_any()
+                .downcast_ref::<Int64Array>()
+                .unwrap(),
+            &expected_array
         );
+        Ok(())
+    }
+
+    #[test]
+    fn test_longest_list_length() -> datafusion_common::Result<()> {
+        // Test with single ListArray
+        //  [A, B, C], [], NULL, [D], NULL, [NULL, F]
+        let list_array = Arc::new(make_generic_array::<i32>()) as ArrayRef;
+        verify_longest_length(&[list_array.clone()], false, vec![3, 0, 0, 1, 
0, 2])?;
+        verify_longest_length(&[list_array.clone()], true, vec![3, 0, 1, 1, 1, 
2])?;
+
+        // Test with single LargeListArray
+        //  [A, B, C], [], NULL, [D], NULL, [NULL, F]
+        let list_array = Arc::new(make_generic_array::<i64>()) as ArrayRef;
+        verify_longest_length(&[list_array.clone()], false, vec![3, 0, 0, 1, 
0, 2])?;
+        verify_longest_length(&[list_array.clone()], true, vec![3, 0, 1, 1, 1, 
2])?;
+
+        // Test with single FixedSizeListArray
+        //  [A, B], NULL, [C, D], NULL, [NULL, F], [NULL, NULL]
+        let list_array = Arc::new(make_fixed_list()) as ArrayRef;
+        verify_longest_length(&[list_array.clone()], false, vec![2, 0, 2, 0, 
2, 2])?;
+        verify_longest_length(&[list_array.clone()], true, vec![2, 1, 2, 1, 2, 
2])?;
+
+        // Test with multiple list arrays
+        //  [A, B, C], [], NULL, [D], NULL, [NULL, F]
+        //  [A, B], NULL, [C, D], NULL, [NULL, F], [NULL, NULL]
+        let list1 = Arc::new(make_generic_array::<i32>()) as ArrayRef;
+        let list2 = Arc::new(make_fixed_list()) as ArrayRef;
+        let list_arrays = vec![list1.clone(), list2.clone()];
+        verify_longest_length(&list_arrays, false, vec![3, 0, 2, 1, 2, 2])?;
+        verify_longest_length(&list_arrays, true, vec![3, 1, 2, 1, 2, 2])?;
 
         Ok(())
     }
+
+    #[test]
+    fn test_create_take_indicies() -> datafusion_common::Result<()> {
+        let length_array = Int64Array::from(vec![2, 3, 1]);
+        let take_indicies = create_take_indicies(&length_array, 6);
+        let expected = Int64Array::from(vec![0, 0, 1, 1, 1, 2]);
+        assert_eq!(take_indicies, expected);
+        Ok(())
+    }
 }
diff --git a/datafusion/proto/src/logical_plan/from_proto.rs 
b/datafusion/proto/src/logical_plan/from_proto.rs
index 4e61385bb5..e66bd1a5f0 100644
--- a/datafusion/proto/src/logical_plan/from_proto.rs
+++ b/datafusion/proto/src/logical_plan/from_proto.rs
@@ -1257,8 +1257,11 @@ pub fn parse_expr(
             parse_required_expr(negative.expr.as_deref(), registry, "expr", 
codec)?,
         ))),
         ExprType::Unnest(unnest) => {
-            let exprs = parse_exprs(&unnest.exprs, registry, codec)?;
-            Ok(Expr::Unnest(Unnest { exprs }))
+            let mut exprs = parse_exprs(&unnest.exprs, registry, codec)?;
+            if exprs.len() != 1 {
+                return Err(proto_error("Unnest must have exactly one 
expression"));
+            }
+            Ok(Expr::Unnest(Unnest::new(exprs.swap_remove(0))))
         }
         ExprType::InList(in_list) => Ok(Expr::InList(InList::new(
             Box::new(parse_required_expr(
diff --git a/datafusion/proto/src/logical_plan/to_proto.rs 
b/datafusion/proto/src/logical_plan/to_proto.rs
index 2680bc15e1..4916b4bed9 100644
--- a/datafusion/proto/src/logical_plan/to_proto.rs
+++ b/datafusion/proto/src/logical_plan/to_proto.rs
@@ -963,9 +963,9 @@ pub fn serialize_expr(
                 expr_type: Some(ExprType::Negative(expr)),
             }
         }
-        Expr::Unnest(Unnest { exprs }) => {
+        Expr::Unnest(Unnest { expr }) => {
             let expr = protobuf::Unnest {
-                exprs: serialize_exprs(exprs, codec)?,
+                exprs: vec![serialize_expr(expr.as_ref(), codec)?],
             };
             protobuf::LogicalExprNode {
                 expr_type: Some(ExprType::Unnest(expr)),
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs 
b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index e680a1b2ff..eee15008fb 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -1599,7 +1599,7 @@ fn roundtrip_inlist() {
 #[test]
 fn roundtrip_unnest() {
     let test_expr = Expr::Unnest(Unnest {
-        exprs: vec![lit(1), lit(2), lit(3)],
+        expr: Box::new(col("col")),
     });
 
     let ctx = SessionContext::new();
diff --git a/datafusion/sql/src/expr/function.rs 
b/datafusion/sql/src/expr/function.rs
index 501c51c4be..c225afec58 100644
--- a/datafusion/sql/src/expr/function.rs
+++ b/datafusion/sql/src/expr/function.rs
@@ -119,10 +119,14 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
 
         // Build Unnest expression
         if name.eq("unnest") {
-            let exprs =
+            let mut exprs =
                 self.function_args_to_expr(args.clone(), schema, 
planner_context)?;
-            Self::check_unnest_args(&exprs, schema)?;
-            return Ok(Expr::Unnest(Unnest { exprs }));
+            if exprs.len() != 1 {
+                return plan_err!("unnest() requires exactly one argument");
+            }
+            let expr = exprs.swap_remove(0);
+            Self::check_unnest_arg(&expr, schema)?;
+            return Ok(Expr::Unnest(Unnest::new(expr)));
         }
 
         // next, scalar built-in
@@ -342,17 +346,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             .collect::<Result<Vec<Expr>>>()
     }
 
-    pub(crate) fn check_unnest_args(args: &[Expr], schema: &DFSchema) -> 
Result<()> {
-        // Currently only one argument is supported
-        let arg = match args.len() {
-            0 => {
-                return plan_err!("unnest() requires at least one argument");
-            }
-            1 => &args[0],
-            _ => {
-                return not_impl_err!("unnest() does not support multiple 
arguments yet");
-            }
-        };
+    pub(crate) fn check_unnest_arg(arg: &Expr, schema: &DFSchema) -> 
Result<()> {
         // Check argument type, array types are supported
         match arg.get_type(schema)? {
             DataType::List(_)
diff --git a/datafusion/sql/src/relation/mod.rs 
b/datafusion/sql/src/relation/mod.rs
index 1e01205ba6..9380e569f2 100644
--- a/datafusion/sql/src/relation/mod.rs
+++ b/datafusion/sql/src/relation/mod.rs
@@ -105,15 +105,24 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 // Unnest table factor has empty input
                 let schema = DFSchema::empty();
                 let input = LogicalPlanBuilder::empty(true).build()?;
-                let exprs = array_exprs
+                // Unnest table factor can have multiple arguments.
+                // We treat each argument as a separate unnest expression.
+                let unnest_exprs = array_exprs
                     .into_iter()
-                    .map(|expr| {
-                        self.sql_expr_to_logical_expr(expr, &schema, 
planner_context)
+                    .map(|sql_expr| {
+                        let expr = self.sql_expr_to_logical_expr(
+                            sql_expr,
+                            &schema,
+                            planner_context,
+                        )?;
+                        Self::check_unnest_arg(&expr, &schema)?;
+                        Ok(Expr::Unnest(Unnest::new(expr)))
                     })
                     .collect::<Result<Vec<_>>>()?;
-                Self::check_unnest_args(&exprs, &schema)?;
-                let unnest_expr = Expr::Unnest(Unnest { exprs });
-                let logical_plan = self.try_process_unnest(input, 
vec![unnest_expr])?;
+                if unnest_exprs.is_empty() {
+                    return plan_err!("UNNEST must have at least one argument");
+                }
+                let logical_plan = self.try_process_unnest(input, 
unnest_exprs)?;
                 (logical_plan, alias)
             }
             TableFactor::UNNEST { .. } => {
diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs
index 1bfd60a8ce..30eacdb44c 100644
--- a/datafusion/sql/src/select.rs
+++ b/datafusion/sql/src/select.rs
@@ -294,13 +294,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                     transformed,
                     tnr: _,
                 } = expr.transform_up_mut(&mut |expr: Expr| {
-                    if let Expr::Unnest(Unnest { ref exprs }) = expr {
+                    if let Expr::Unnest(Unnest { expr: ref arg }) = expr {
                         let column_name = expr.display_name()?;
                         unnest_columns.push(column_name.clone());
                         // Add alias for the argument expression, to avoid 
naming conflicts with other expressions
                         // in the select list. For example: `select 
unnest(col1), col1 from t`.
                         inner_projection_exprs
-                            .push(exprs[0].clone().alias(column_name.clone()));
+                            .push(arg.clone().alias(column_name.clone()));
                         Ok(Transformed::yes(Expr::Column(Column::from_name(
                             column_name,
                         ))))
@@ -332,15 +332,12 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 .project(inner_projection_exprs)?
                 .build()
         } else {
-            if unnest_columns.len() > 1 {
-                return not_impl_err!("Only support single unnest expression 
for now");
-            }
-            let unnest_column = unnest_columns.pop().unwrap();
+            let columns = unnest_columns.into_iter().map(|col| 
col.into()).collect();
             // Set preserve_nulls to false to ensure compatibility with DuckDB 
and PostgreSQL
             let unnest_options = 
UnnestOptions::new().with_preserve_nulls(false);
             LogicalPlanBuilder::from(input)
                 .project(inner_projection_exprs)?
-                .unnest_column_with_options(unnest_column, unnest_options)?
+                .unnest_columns_with_options(columns, unnest_options)?
                 .project(outer_projection_exprs)?
                 .build()
         }
diff --git a/datafusion/sqllogictest/test_files/unnest.slt 
b/datafusion/sqllogictest/test_files/unnest.slt
index 5c178bb392..38207fa7d1 100644
--- a/datafusion/sqllogictest/test_files/unnest.slt
+++ b/datafusion/sqllogictest/test_files/unnest.slt
@@ -22,12 +22,12 @@
 statement ok
 CREATE TABLE unnest_table
 AS VALUES
-    ([1,2,3], [7], 1),
-    ([4,5], [8,9,10], 2),
-    ([6], [11,12], 3),
-    ([12], [null, 42, null], null),
+    ([1,2,3], [7], 1, [13, 14]),
+    ([4,5], [8,9,10], 2, [15, 16]),
+    ([6], [11,12], 3, null),
+    ([12], [null, 42, null], null, null),
     -- null array to verify the `preserve_nulls` option
-    (null, null, 4)
+    (null, null, 4, [17, 18])
 ;
 
 ## Basic unnest expression in select list
@@ -93,6 +93,20 @@ NULL
 42
 NULL
 
+## Unnest single column and filter out null lists
+query I
+select unnest(column2) from unnest_table where column2 is not null;
+----
+7
+8
+9
+10
+11
+12
+NULL
+42
+NULL
+
 ## Unnest with additional column
 ## Issue: https://github.com/apache/arrow-datafusion/issues/9349
 query II
@@ -135,9 +149,48 @@ select array_remove(column1, 4), unnest(column2), column3 
* 10 from unnest_table
 query error DataFusion error: Error during planning: unnest\(\) can only be 
applied to array, struct and null
 select unnest(column3) from unnest_table;
 
+## Unnest doesn't work with untyped nulls
+query error DataFusion error: This feature is not implemented: unnest\(\) does 
not support null yet
+select unnest(null) from unnest_table;
+
 ## Multiple unnest functions in selection
-query error DataFusion error: This feature is not implemented: Only support 
single unnest expression for now
-select unnest(column1), unnest(column2) from unnest_table;
+query ?I
+select unnest([]), unnest(NULL::int[]);
+----
+
+query III
+select
+    unnest(column1),
+    unnest(arrow_cast(column2, 'LargeList(Int64)')),
+    unnest(arrow_cast(column4, 'FixedSizeList(2, Int64)'))
+from unnest_table where column4 is not null;
+----
+1 7 13
+2 NULL 14
+3 NULL NULL
+4 8 15
+5 9 16
+NULL 10 NULL
+NULL NULL 17
+NULL NULL 18
+
+query IIII
+select 
+    unnest(column1), unnest(column2) + 2, 
+    column3 * 10, unnest(array_remove(column1, '4')) 
+from unnest_table;
+----
+1 9 10 1
+2 NULL 10 2
+3 NULL 10 3
+4 10 20 5
+5 11 20 NULL
+NULL 12 20 NULL
+6 13 30 6
+NULL 14 30 NULL
+12 NULL NULL 12
+NULL 44 NULL NULL
+NULL NULL NULL NULL
 
 ## Unnest scalar in select list
 query error DataFusion error: Error during planning: unnest\(\) can only be 
applied to array, struct and null
@@ -149,7 +202,7 @@ select * from unnest(1);
 
 
 ## Unnest empty expression in select list
-query error DataFusion error: Error during planning: unnest\(\) requires at 
least one argument
+query error DataFusion error: Error during planning: unnest\(\) requires 
exactly one argument
 select unnest();
 
 ## Unnest empty expression in from clause
@@ -157,13 +210,26 @@ query error DataFusion error: SQL error: 
ParserError\("Expected an expression:,
 select * from unnest();
 
 
-## Unnest multiple expressions in select list
-query error DataFusion error: This feature is not implemented: unnest\(\) does 
not support multiple arguments yet
+## Unnest multiple expressions in select list. This form is only allowed in a 
query's FROM clause.
+query error DataFusion error: Error during planning: unnest\(\) requires 
exactly one argument
 select unnest([1,2], [2,3]);
 
 ## Unnest multiple expressions in from clause
-query error DataFusion error: This feature is not implemented: unnest\(\) does 
not support multiple arguments yet
-select * from unnest([1,2], [2,3]);
+query ITII
+select * from unnest(
+    [1,2],
+    arrow_cast(['a','b', 'c'], 'LargeList(Utf8)'),
+    arrow_cast([4, NULL], 'FixedSizeList(2, Int64)'),
+    NULL::int[]
+) as t(a, b, c, d);
+----
+1 a 4 NULL
+2 b NULL NULL
+NULL c NULL NULL
+
+query ?I
+select * from unnest([], NULL::int[]);
+----
 
 
 ## Unnest struct expression in select list

Reply via email to