This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 8bbebe21c9 Add API to make `unnest` consistent with DuckDB/ClickHouse, 
add option for preserve_nulls, update docs (#7168)
8bbebe21c9 is described below

commit 8bbebe21c93b1a5d54e8a21de68f0e17a3aeb0f8
Author: Andrew Lamb <[email protected]>
AuthorDate: Thu Aug 10 08:20:19 2023 -0500

    Add API to make `unnest` consistent with DuckDB/ClickHouse, add option for 
preserve_nulls, update docs (#7168)
    
    * Make unnest consistent with DuckDB/ClickHouse, add option for 
preserve_nulls, update docs
    
    * fix typo
    
    * commit file
    
    * Update datafusion/core/src/dataframe.rs
    
    * Fix logic
    
    * fix doc test
---
 datafusion/common/src/lib.rs                |   2 +
 datafusion/common/src/unnest.rs             |  90 ++++++++++++++
 datafusion/core/src/dataframe.rs            |  22 +++-
 datafusion/core/src/physical_plan/unnest.rs |  23 +++-
 datafusion/core/src/physical_planner.rs     |   4 +-
 datafusion/core/tests/dataframe/mod.rs      | 183 +++++++++++++++++++++++++++-
 datafusion/expr/src/logical_plan/builder.rs |  26 +++-
 datafusion/expr/src/logical_plan/plan.rs    |   6 +-
 datafusion/expr/src/utils.rs                |   8 +-
 9 files changed, 354 insertions(+), 10 deletions(-)

diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs
index b831d3d0ca..50875f6a59 100644
--- a/datafusion/common/src/lib.rs
+++ b/datafusion/common/src/lib.rs
@@ -35,6 +35,7 @@ pub mod stats;
 mod table_reference;
 pub mod test_util;
 pub mod tree_node;
+mod unnest;
 pub mod utils;
 
 pub use column::Column;
@@ -52,6 +53,7 @@ pub use scalar::{ScalarType, ScalarValue};
 pub use schema_reference::{OwnedSchemaReference, SchemaReference};
 pub use stats::{ColumnStatistics, Statistics};
 pub use table_reference::{OwnedTableReference, ResolvedTableReference, 
TableReference};
+pub use unnest::UnnestOptions;
 pub use utils::project_schema;
 
 /// Downcast an Arrow Array to a concrete type, return an 
`DataFusionError::Internal` if the cast is
diff --git a/datafusion/common/src/unnest.rs b/datafusion/common/src/unnest.rs
new file mode 100644
index 0000000000..fd92267f9b
--- /dev/null
+++ b/datafusion/common/src/unnest.rs
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`UnnestOptions`] for unnesting structured types
+
+/// Options for unnesting a column that contains a list type,
+/// replicating values in the other, non-nested columns.
+///
+/// Conceptually this operation is like joining each row with all the
+/// values in the list column.
+///
+/// If `preserve_nulls` is false, nulls and empty lists
+/// from the input column are not carried through to the output. This
+/// is the default behavior for other systems such as ClickHouse and
+/// DuckDB
+///
+/// If `preserve_nulls` is true (the default), nulls from the input
+/// column are carried through to the output.
+///
+/// # Examples
+///
+/// ## `Unnest(c1)`, preserve_nulls: false
+/// ```text
+///      ┌─────────┐ ┌─────┐                ┌─────────┐ ┌─────┐
+///      │ {1, 2}  │ │  A  │   Unnest       │    1    │ │  A  │
+///      ├─────────┤ ├─────┤                ├─────────┤ ├─────┤
+///      │  null   │ │  B  │                │    2    │ │  A  │
+///      ├─────────┤ ├─────┤ ────────────▶  ├─────────┤ ├─────┤
+///      │   {}    │ │  D  │                │    3    │ │  E  │
+///      ├─────────┤ ├─────┤                └─────────┘ └─────┘
+///      │   {3}   │ │  E  │                    c1        c2
+///      └─────────┘ └─────┘
+///        c1         c2
+/// ```
+///
+/// ## `Unnest(c1)`, preserve_nulls: true
+/// ```text
+///      ┌─────────┐ ┌─────┐                ┌─────────┐ ┌─────┐
+///      │ {1, 2}  │ │  A  │   Unnest       │    1    │ │  A  │
+///      ├─────────┤ ├─────┤                ├─────────┤ ├─────┤
+///      │  null   │ │  B  │                │    2    │ │  A  │
+///      ├─────────┤ ├─────┤ ────────────▶  ├─────────┤ ├─────┤
+///      │   {}    │ │  D  │                │  null   │ │  B  │
+///      ├─────────┤ ├─────┤                ├─────────┤ ├─────┤
+///      │   {3}   │ │  E  │                │    3    │ │  E  │
+///      └─────────┘ └─────┘                └─────────┘ └─────┘
+///        c1         c2                        c1        c2
+/// ```
+#[derive(Debug, Clone, PartialEq, PartialOrd, Hash, Eq)]
+pub struct UnnestOptions {
+    /// Should nulls in the input be preserved? Defaults to true
+    pub preserve_nulls: bool,
+}
+
+impl Default for UnnestOptions {
+    fn default() -> Self {
+        Self {
+            // default to true to maintain backwards compatible behavior
+            preserve_nulls: true,
+        }
+    }
+}
+
+impl UnnestOptions {
+    /// Create a new [`UnnestOptions`] with default values
+    pub fn new() -> Self {
+        Default::default()
+    }
+
+    /// Set the behavior with nulls in the input as described on
+    /// [`Self`]
+    pub fn with_preserve_nulls(mut self, preserve_nulls: bool) -> Self {
+        self.preserve_nulls = preserve_nulls;
+        self
+    }
+}
diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs
index 8d4ad6cc50..5b1983f567 100644
--- a/datafusion/core/src/dataframe.rs
+++ b/datafusion/core/src/dataframe.rs
@@ -24,7 +24,7 @@ use arrow::array::{Array, ArrayRef, Int64Array, StringArray};
 use arrow::compute::{cast, concat};
 use arrow::datatypes::{DataType, Field};
 use async_trait::async_trait;
-use datafusion_common::{DataFusionError, SchemaError};
+use datafusion_common::{DataFusionError, SchemaError, UnnestOptions};
 use parquet::file::properties::WriterProperties;
 
 use datafusion_common::{Column, DFSchema, ScalarValue};
@@ -178,6 +178,11 @@ impl DataFrame {
 
     /// Expand each list element of a column to multiple rows.
     ///
+    /// See also:
+    ///
+    /// 1. [`UnnestOptions`] documentation for the behavior of `unnest`
+    /// 2. [`Self::unnest_column_with_options`]
+    ///
     /// ```
     /// # use datafusion::prelude::*;
     /// # use datafusion::error::Result;
@@ -190,8 +195,21 @@ impl DataFrame {
     /// # }
     /// ```
     pub fn unnest_column(self, column: &str) -> Result<DataFrame> {
+        self.unnest_column_with_options(column, UnnestOptions::new())
+    }
+
+    /// Expand each list element of a column to multiple rows, with
+    /// behavior controlled by [`UnnestOptions`].
+    ///
+    /// Please see the documentation on [`UnnestOptions`] for more
+    /// details about the meaning of unnest.
+    pub fn unnest_column_with_options(
+        self,
+        column: &str,
+        options: UnnestOptions,
+    ) -> Result<DataFrame> {
         let plan = LogicalPlanBuilder::from(self.plan)
-            .unnest_column(column)?
+            .unnest_column_with_options(column, options)?
             .build()?;
         Ok(DataFrame::new(self.session_state, plan))
     }
diff --git a/datafusion/core/src/physical_plan/unnest.rs 
b/datafusion/core/src/physical_plan/unnest.rs
index 7a213dffeb..b022cf751f 100644
--- a/datafusion/core/src/physical_plan/unnest.rs
+++ b/datafusion/core/src/physical_plan/unnest.rs
@@ -27,6 +27,7 @@ use arrow::datatypes::{
 };
 use arrow::record_batch::RecordBatch;
 use async_trait::async_trait;
+use datafusion_common::UnnestOptions;
 use datafusion_common::{cast::as_primitive_array, DataFusionError, Result};
 use datafusion_execution::TaskContext;
 use futures::Stream;
@@ -43,7 +44,10 @@ use crate::physical_plan::{
 
 use super::DisplayAs;
 
-/// Unnest the given column by joining the row with each value in the nested 
type.
+/// Unnest the given column by joining the row with each value in the
+/// nested type.
+///
+/// See [`UnnestOptions`] for more details and an example.
 #[derive(Debug)]
 pub struct UnnestExec {
     /// Input execution plan
@@ -52,15 +56,23 @@ pub struct UnnestExec {
     schema: SchemaRef,
     /// The unnest column
     column: Column,
+    /// Options controlling unnest behavior, see [`UnnestOptions`]
+    options: UnnestOptions,
 }
 
 impl UnnestExec {
     /// Create a new [UnnestExec].
-    pub fn new(input: Arc<dyn ExecutionPlan>, column: Column, schema: 
SchemaRef) -> Self {
+    pub fn new(
+        input: Arc<dyn ExecutionPlan>,
+        column: Column,
+        schema: SchemaRef,
+        options: UnnestOptions,
+    ) -> Self {
         UnnestExec {
             input,
             schema,
             column,
+            options,
         }
     }
 }
@@ -107,6 +119,7 @@ impl ExecutionPlan for UnnestExec {
             children[0].clone(),
             self.column.clone(),
             self.schema.clone(),
+            self.options.clone(),
         )))
     }
 
@@ -133,6 +146,12 @@ impl ExecutionPlan for UnnestExec {
     ) -> Result<SendableRecordBatchStream> {
         let input = self.input.execute(partition, context)?;
 
+        if !self.options.preserve_nulls {
+            return Err(DataFusionError::NotImplemented(
+                "Unnest with preserve_nulls=false".to_string(),
+            ));
+        }
+
         Ok(Box::pin(UnnestStream {
             input,
             schema: self.schema.clone(),
diff --git a/datafusion/core/src/physical_planner.rs 
b/datafusion/core/src/physical_planner.rs
index 172304165a..6b868b9b24 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -1164,12 +1164,12 @@ impl DefaultPhysicalPlanner {
 
                     Ok(Arc::new(GlobalLimitExec::new(input, *skip, *fetch)))
                 }
-                LogicalPlan::Unnest(Unnest { input, column, schema }) => {
+                LogicalPlan::Unnest(Unnest { input, column, schema, options }) 
=> {
                     let input = self.create_initial_plan(input, 
session_state).await?;
                     let column_exec = schema.index_of_column(column)
                         .map(|idx| Column::new(&column.name, idx))?;
                     let schema = 
SchemaRef::new(schema.as_ref().to_owned().into());
-                    Ok(Arc::new(UnnestExec::new(input, column_exec, schema)))
+                    Ok(Arc::new(UnnestExec::new(input, column_exec, schema, 
options.clone())))
                 }
                 LogicalPlan::Ddl(ddl) => {
                     // There is no default plan for DDl statements --
diff --git a/datafusion/core/tests/dataframe/mod.rs 
b/datafusion/core/tests/dataframe/mod.rs
index e382176525..bfdb2bda1b 100644
--- a/datafusion/core/tests/dataframe/mod.rs
+++ b/datafusion/core/tests/dataframe/mod.rs
@@ -37,7 +37,7 @@ use datafusion::prelude::JoinType;
 use datafusion::prelude::{CsvReadOptions, ParquetReadOptions};
 use datafusion::test_util::parquet_test_data;
 use datafusion::{assert_batches_eq, assert_batches_sorted_eq};
-use datafusion_common::{DataFusionError, ScalarValue};
+use datafusion_common::{DataFusionError, ScalarValue, UnnestOptions};
 use datafusion_execution::config::SessionConfig;
 use datafusion_expr::expr::{GroupingSet, Sort};
 use datafusion_expr::Expr::Wildcard;
@@ -1044,6 +1044,82 @@ async fn unnest_columns() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+async fn unnest_column_preserve_nulls_not_supported() -> Result<()> {
+    // Unnest with preserve_nulls=false is not yet supported
+    let options = UnnestOptions::new().with_preserve_nulls(false);
+
+    let results = table_with_lists_and_nulls()
+        .await?
+        .clone()
+        .unnest_column_with_options("list", options)?
+        .collect()
+        .await;
+
+    assert_eq!(
+        results.unwrap_err().to_string(),
+        "This feature is not implemented: Unnest with preserve_nulls=false"
+    );
+    Ok(())
+}
+#[tokio::test]
+#[ignore] // https://github.com/apache/arrow-datafusion/issues/7087
+async fn unnest_column_nulls() -> Result<()> {
+    let df = table_with_lists_and_nulls().await?;
+    let results = df.clone().collect().await?;
+    let expected = vec![
+        "+--------+----+",
+        "| list   | id |",
+        "+--------+----+",
+        "| [1, 2] | A  |",
+        "|        | B  |",
+        "| []     | C  |",
+        "| [3]    | D  |",
+        "+--------+----+",
+    ];
+    assert_batches_eq!(expected, &results);
+
+    // Unnest, preserving nulls (row with B is preserved)
+    let options = UnnestOptions::new().with_preserve_nulls(true);
+
+    let results = df
+        .clone()
+        .unnest_column_with_options("list", options)?
+        .collect()
+        .await?;
+    let expected = vec![
+        "+------+----+",
+        "| list | id |",
+        "+------+----+",
+        "| 1    | A  |",
+        "| 2    | A  |",
+        "|      | B  |",
+        "| 3    | D  |",
+        "+------+----+",
+    ];
+    assert_batches_eq!(expected, &results);
+
+    // NOTE: the expected output below is incorrect (row B should be dropped)
+    let options = UnnestOptions::new().with_preserve_nulls(false);
+    let results = df
+        .unnest_column_with_options("list", options)?
+        .collect()
+        .await?;
+    let expected = vec![
+        "+------+----+",
+        "| list | id |",
+        "+------+----+",
+        "| 1    | A  |",
+        "| 2    | A  |",
+        "|      | B  |", // this row should not be here
+        "| 3    | D  |",
+        "+------+----+",
+    ];
+    assert_batches_eq!(expected, &results);
+
+    Ok(())
+}
+
 #[tokio::test]
 async fn unnest_fixed_list() -> Result<()> {
     let mut shape_id_builder = UInt32Builder::new();
@@ -1114,6 +1190,77 @@ async fn unnest_fixed_list() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+#[ignore] // https://github.com/apache/arrow-datafusion/issues/7087
+async fn unnest_fixed_list_nonull() -> Result<()> {
+    let mut shape_id_builder = UInt32Builder::new();
+    let mut tags_builder = FixedSizeListBuilder::new(StringBuilder::new(), 2);
+
+    for idx in 0..6 {
+        // Append shape id.
+        shape_id_builder.append_value(idx as u32 + 1);
+
+        tags_builder
+            .values()
+            .append_value(format!("tag{}1", idx + 1));
+        tags_builder
+            .values()
+            .append_value(format!("tag{}2", idx + 1));
+        tags_builder.append(true);
+    }
+
+    let batch = RecordBatch::try_from_iter(vec![
+        ("shape_id", Arc::new(shape_id_builder.finish()) as ArrayRef),
+        ("tags", Arc::new(tags_builder.finish()) as ArrayRef),
+    ])?;
+
+    let ctx = SessionContext::new();
+    ctx.register_batch("shapes", batch)?;
+    let df = ctx.table("shapes").await?;
+
+    let results = df.clone().collect().await?;
+    let expected = vec![
+        "+----------+----------------+",
+        "| shape_id | tags           |",
+        "+----------+----------------+",
+        "| 1        | [tag11, tag12] |",
+        "| 2        | [tag21, tag22] |",
+        "| 3        | [tag31, tag32] |",
+        "| 4        | [tag41, tag42] |",
+        "| 5        | [tag51, tag52] |",
+        "| 6        | [tag61, tag62] |",
+        "+----------+----------------+",
+    ];
+    assert_batches_sorted_eq!(expected, &results);
+
+    let options = UnnestOptions::new().with_preserve_nulls(true);
+    let results = df
+        .unnest_column_with_options("tags", options)?
+        .collect()
+        .await?;
+    let expected = vec![
+        "+----------+-------+",
+        "| shape_id | tags  |",
+        "+----------+-------+",
+        "| 1        | tag11 |",
+        "| 1        | tag12 |",
+        "| 2        | tag21 |",
+        "| 2        | tag22 |",
+        "| 3        | tag31 |",
+        "| 3        | tag32 |",
+        "| 4        | tag41 |",
+        "| 4        | tag42 |",
+        "| 5        | tag51 |",
+        "| 5        | tag52 |",
+        "| 6        | tag61 |",
+        "| 6        | tag62 |",
+        "+----------+-------+",
+    ];
+    assert_batches_sorted_eq!(expected, &results);
+
+    Ok(())
+}
+
 #[tokio::test]
 async fn unnest_aggregate_columns() -> Result<()> {
     const NUM_ROWS: usize = 5;
@@ -1294,6 +1441,40 @@ async fn table_with_nested_types(n: usize) -> 
Result<DataFrame> {
     ctx.table("shapes").await
 }
 
+/// A data frame that contains a list of integers and string IDs
+async fn table_with_lists_and_nulls() -> Result<DataFrame> {
+    let mut list_builder = ListBuilder::new(UInt32Builder::new());
+    let mut id_builder = StringBuilder::new();
+
+    // [1, 2],  A
+    list_builder.values().append_value(1);
+    list_builder.values().append_value(2);
+    list_builder.append(true);
+    id_builder.append_value("A");
+
+    // NULL, B
+    list_builder.append(false);
+    id_builder.append_value("B");
+
+    // [],  C
+    list_builder.append(true);
+    id_builder.append_value("C");
+
+    // [3], D
+    list_builder.values().append_value(3);
+    list_builder.append(true);
+    id_builder.append_value("D");
+
+    let batch = RecordBatch::try_from_iter(vec![
+        ("list", Arc::new(list_builder.finish()) as ArrayRef),
+        ("id", Arc::new(id_builder.finish()) as ArrayRef),
+    ])?;
+
+    let ctx = SessionContext::new();
+    ctx.register_batch("shapes", batch)?;
+    ctx.table("shapes").await
+}
+
 pub async fn register_alltypes_tiny_pages_parquet(ctx: &SessionContext) -> 
Result<()> {
     let testdata = parquet_test_data();
     ctx.register_parquet(
diff --git a/datafusion/expr/src/logical_plan/builder.rs 
b/datafusion/expr/src/logical_plan/builder.rs
index 176459db14..f89be03f79 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -41,6 +41,7 @@ use crate::{
 };
 use arrow::datatypes::{DataType, Schema, SchemaRef};
 use datafusion_common::plan_err;
+use datafusion_common::UnnestOptions;
 use datafusion_common::{
     display::ToStringifiedPlan, Column, DFField, DFSchema, DFSchemaRef, 
DataFusionError,
     FunctionalDependencies, OwnedTableReference, Result, ScalarValue, 
TableReference,
@@ -1036,6 +1037,19 @@ impl LogicalPlanBuilder {
     pub fn unnest_column(self, column: impl Into<Column>) -> Result<Self> {
         Ok(Self::from(unnest(self.plan, column.into())?))
     }
+
+    /// Unnest the given column, with behavior controlled by [`UnnestOptions`]
+    pub fn unnest_column_with_options(
+        self,
+        column: impl Into<Column>,
+        options: UnnestOptions,
+    ) -> Result<Self> {
+        Ok(Self::from(unnest_with_options(
+            self.plan,
+            column.into(),
+            options,
+        )?))
+    }
 }
 
 /// Creates a schema for a join operation.
@@ -1379,8 +1393,17 @@ impl TableSource for LogicalTableSource {
     }
 }
 
-/// Create an unnest plan.
+/// Create a [`LogicalPlan::Unnest`] plan
 pub fn unnest(input: LogicalPlan, column: Column) -> Result<LogicalPlan> {
+    unnest_with_options(input, column, UnnestOptions::new())
+}
+
+/// Create a [`LogicalPlan::Unnest`] plan with options
+pub fn unnest_with_options(
+    input: LogicalPlan,
+    column: Column,
+    options: UnnestOptions,
+) -> Result<LogicalPlan> {
     let unnest_field = input.schema().field_from_column(&column)?;
 
     // Extract the type of the nested field in the list.
@@ -1423,6 +1446,7 @@ pub fn unnest(input: LogicalPlan, column: Column) -> 
Result<LogicalPlan> {
         input: Arc::new(input),
         column: unnested_field.qualified_column(),
         schema,
+        options,
     }))
 }
 
diff --git a/datafusion/expr/src/logical_plan/plan.rs 
b/datafusion/expr/src/logical_plan/plan.rs
index 0181f4b567..3557745ed3 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -40,6 +40,7 @@ use datafusion_common::tree_node::{
 use datafusion_common::{
     aggregate_functional_dependencies, plan_err, Column, DFField, DFSchema, 
DFSchemaRef,
     DataFusionError, FunctionalDependencies, OwnedTableReference, Result, 
ScalarValue,
+    UnnestOptions,
 };
 // backwards compatibility
 pub use datafusion_common::display::{PlanType, StringifiedPlan, 
ToStringifiedPlan};
@@ -1850,7 +1851,8 @@ pub enum Partitioning {
     DistributeBy(Vec<Expr>),
 }
 
-/// Unnest a column that contains a nested list type.
+/// Unnest a column that contains a nested list type. See
+/// [`UnnestOptions`] for more details.
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
 pub struct Unnest {
     /// The incoming logical plan
@@ -1859,6 +1861,8 @@ pub struct Unnest {
     pub column: Column,
     /// The output schema, containing the unnested field column.
     pub schema: DFSchemaRef,
+    /// Options controlling unnest behavior, see [`UnnestOptions`]
+    pub options: UnnestOptions,
 }
 
 #[cfg(test)]
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index 8940eeed01..76061194ed 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -988,7 +988,12 @@ pub fn from_plan(
             Ok(plan.clone())
         }
         LogicalPlan::DescribeTable(_) => Ok(plan.clone()),
-        LogicalPlan::Unnest(Unnest { column, schema, .. }) => {
+        LogicalPlan::Unnest(Unnest {
+            column,
+            schema,
+            options,
+            ..
+        }) => {
             // Update schema with unnested column type.
             let input = Arc::new(inputs[0].clone());
             let nested_field = input.schema().field_from_column(column)?;
@@ -1018,6 +1023,7 @@ pub fn from_plan(
                 input,
                 column: column.clone(),
                 schema,
+                options: options.clone(),
             }))
         }
     }

Reply via email to