This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 8bbebe21c9 Add API to make `unnest` consistent with DuckDB/ClickHouse,
add option for preserve_nulls, update docs (#7168)
8bbebe21c9 is described below
commit 8bbebe21c93b1a5d54e8a21de68f0e17a3aeb0f8
Author: Andrew Lamb <[email protected]>
AuthorDate: Thu Aug 10 08:20:19 2023 -0500
Add API to make `unnest` consistent with DuckDB/ClickHouse, add option for
preserve_nulls, update docs (#7168)
* Make unnest consistent with DuckDB/ClickHouse, add option for
preserve_nulls, update docs
* fix typo
* commit file
* Update datafusion/core/src/dataframe.rs
* Fix logic
* fix doc test
---
datafusion/common/src/lib.rs | 2 +
datafusion/common/src/unnest.rs | 90 ++++++++++++++
datafusion/core/src/dataframe.rs | 22 +++-
datafusion/core/src/physical_plan/unnest.rs | 23 +++-
datafusion/core/src/physical_planner.rs | 4 +-
datafusion/core/tests/dataframe/mod.rs | 183 +++++++++++++++++++++++++++-
datafusion/expr/src/logical_plan/builder.rs | 26 +++-
datafusion/expr/src/logical_plan/plan.rs | 6 +-
datafusion/expr/src/utils.rs | 8 +-
9 files changed, 354 insertions(+), 10 deletions(-)
diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs
index b831d3d0ca..50875f6a59 100644
--- a/datafusion/common/src/lib.rs
+++ b/datafusion/common/src/lib.rs
@@ -35,6 +35,7 @@ pub mod stats;
mod table_reference;
pub mod test_util;
pub mod tree_node;
+mod unnest;
pub mod utils;
pub use column::Column;
@@ -52,6 +53,7 @@ pub use scalar::{ScalarType, ScalarValue};
pub use schema_reference::{OwnedSchemaReference, SchemaReference};
pub use stats::{ColumnStatistics, Statistics};
pub use table_reference::{OwnedTableReference, ResolvedTableReference,
TableReference};
+pub use unnest::UnnestOptions;
pub use utils::project_schema;
/// Downcast an Arrow Array to a concrete type, return an
`DataFusionError::Internal` if the cast is
diff --git a/datafusion/common/src/unnest.rs b/datafusion/common/src/unnest.rs
new file mode 100644
index 0000000000..fd92267f9b
--- /dev/null
+++ b/datafusion/common/src/unnest.rs
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`UnnestOptions`] for unnesting structured types
+
+/// Options for unnesting a column that contains a list type,
+/// replicating the values of the other, non-nested columns.
+///
+/// Conceptually this operation is like joining each row with all the
+/// values in the list column.
+///
+/// If `preserve_nulls` is false, nulls and empty lists
+/// from the input column are not carried through to the output. This
+/// is the default behavior for other systems such as ClickHouse and
+/// DuckDB
+///
+/// If `preserve_nulls` is true (the default), nulls from the input
+/// column are carried through to the output.
+///
+/// # Examples
+///
+/// ## `Unnest(c1)`, preserve_nulls: false
+/// ```text
+/// ┌─────────┐ ┌─────┐ ┌─────────┐ ┌─────┐
+/// │ {1, 2} │ │ A │ Unnest │ 1 │ │ A │
+/// ├─────────┤ ├─────┤ ├─────────┤ ├─────┤
+/// │ null │ │ B │ │ 2 │ │ A │
+/// ├─────────┤ ├─────┤ ────────────▶ ├─────────┤ ├─────┤
+/// │ {} │ │ D │ │ 3 │ │ E │
+/// ├─────────┤ ├─────┤ └─────────┘ └─────┘
+/// │ {3} │ │ E │ c1 c2
+/// └─────────┘ └─────┘
+/// c1 c2
+/// ```
+///
+/// ## `Unnest(c1)`, preserve_nulls: true
+/// ```text
+/// ┌─────────┐ ┌─────┐ ┌─────────┐ ┌─────┐
+/// │ {1, 2} │ │ A │ Unnest │ 1 │ │ A │
+/// ├─────────┤ ├─────┤ ├─────────┤ ├─────┤
+/// │ null │ │ B │ │ 2 │ │ A │
+/// ├─────────┤ ├─────┤ ────────────▶ ├─────────┤ ├─────┤
+/// │ {} │ │ D │ │ null │ │ B │
+/// ├─────────┤ ├─────┤ ├─────────┤ ├─────┤
+/// │ {3} │ │ E │ │ 3 │ │ E │
+/// └─────────┘ └─────┘ └─────────┘ └─────┘
+/// c1 c2 c1 c2
+/// ```
+#[derive(Debug, Clone, PartialEq, PartialOrd, Hash, Eq)]
+pub struct UnnestOptions {
+ /// Should nulls in the input be preserved? Defaults to true
+ pub preserve_nulls: bool,
+}
+
+impl Default for UnnestOptions {
+ fn default() -> Self {
+ Self {
+ // default to true to maintain backwards compatible behavior
+ preserve_nulls: true,
+ }
+ }
+}
+
+impl UnnestOptions {
+ /// Create a new [`UnnestOptions`] with default values
+ pub fn new() -> Self {
+ Default::default()
+ }
+
+ /// Set the behavior with nulls in the input as described on
+ /// [`Self`]
+ pub fn with_preserve_nulls(mut self, preserve_nulls: bool) -> Self {
+ self.preserve_nulls = preserve_nulls;
+ self
+ }
+}
diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs
index 8d4ad6cc50..5b1983f567 100644
--- a/datafusion/core/src/dataframe.rs
+++ b/datafusion/core/src/dataframe.rs
@@ -24,7 +24,7 @@ use arrow::array::{Array, ArrayRef, Int64Array, StringArray};
use arrow::compute::{cast, concat};
use arrow::datatypes::{DataType, Field};
use async_trait::async_trait;
-use datafusion_common::{DataFusionError, SchemaError};
+use datafusion_common::{DataFusionError, SchemaError, UnnestOptions};
use parquet::file::properties::WriterProperties;
use datafusion_common::{Column, DFSchema, ScalarValue};
@@ -178,6 +178,11 @@ impl DataFrame {
/// Expand each list element of a column to multiple rows.
///
+ /// See also:
+ ///
+ /// 1. [`UnnestOptions`] documentation for the behavior of `unnest`
+ /// 2. [`Self::unnest_column_with_options`]
+ ///
/// ```
/// # use datafusion::prelude::*;
/// # use datafusion::error::Result;
@@ -190,8 +195,21 @@ impl DataFrame {
/// # }
/// ```
pub fn unnest_column(self, column: &str) -> Result<DataFrame> {
+ self.unnest_column_with_options(column, UnnestOptions::new())
+ }
+
+ /// Expand each list element of a column to multiple rows, with
+ /// behavior controlled by [`UnnestOptions`].
+ ///
+ /// Please see the documentation on [`UnnestOptions`] for more
+ /// details about the meaning of unnest.
+ pub fn unnest_column_with_options(
+ self,
+ column: &str,
+ options: UnnestOptions,
+ ) -> Result<DataFrame> {
let plan = LogicalPlanBuilder::from(self.plan)
- .unnest_column(column)?
+ .unnest_column_with_options(column, options)?
.build()?;
Ok(DataFrame::new(self.session_state, plan))
}
diff --git a/datafusion/core/src/physical_plan/unnest.rs
b/datafusion/core/src/physical_plan/unnest.rs
index 7a213dffeb..b022cf751f 100644
--- a/datafusion/core/src/physical_plan/unnest.rs
+++ b/datafusion/core/src/physical_plan/unnest.rs
@@ -27,6 +27,7 @@ use arrow::datatypes::{
};
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
+use datafusion_common::UnnestOptions;
use datafusion_common::{cast::as_primitive_array, DataFusionError, Result};
use datafusion_execution::TaskContext;
use futures::Stream;
@@ -43,7 +44,10 @@ use crate::physical_plan::{
use super::DisplayAs;
-/// Unnest the given column by joining the row with each value in the nested
type.
+/// Unnest the given column by joining the row with each value in the
+/// nested type.
+///
+/// See [`UnnestOptions`] for more details and an example.
#[derive(Debug)]
pub struct UnnestExec {
/// Input execution plan
@@ -52,15 +56,23 @@ pub struct UnnestExec {
schema: SchemaRef,
/// The unnest column
column: Column,
+ /// Options controlling the unnest behavior (see [`UnnestOptions`])
+ options: UnnestOptions,
}
impl UnnestExec {
/// Create a new [UnnestExec].
- pub fn new(input: Arc<dyn ExecutionPlan>, column: Column, schema:
SchemaRef) -> Self {
+ pub fn new(
+ input: Arc<dyn ExecutionPlan>,
+ column: Column,
+ schema: SchemaRef,
+ options: UnnestOptions,
+ ) -> Self {
UnnestExec {
input,
schema,
column,
+ options,
}
}
}
@@ -107,6 +119,7 @@ impl ExecutionPlan for UnnestExec {
children[0].clone(),
self.column.clone(),
self.schema.clone(),
+ self.options.clone(),
)))
}
@@ -133,6 +146,12 @@ impl ExecutionPlan for UnnestExec {
) -> Result<SendableRecordBatchStream> {
let input = self.input.execute(partition, context)?;
+ if !self.options.preserve_nulls {
+ return Err(DataFusionError::NotImplemented(
+ "Unnest with preserve_nulls=false".to_string(),
+ ));
+ }
+
Ok(Box::pin(UnnestStream {
input,
schema: self.schema.clone(),
diff --git a/datafusion/core/src/physical_planner.rs
b/datafusion/core/src/physical_planner.rs
index 172304165a..6b868b9b24 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -1164,12 +1164,12 @@ impl DefaultPhysicalPlanner {
Ok(Arc::new(GlobalLimitExec::new(input, *skip, *fetch)))
}
- LogicalPlan::Unnest(Unnest { input, column, schema }) => {
+ LogicalPlan::Unnest(Unnest { input, column, schema, options })
=> {
let input = self.create_initial_plan(input,
session_state).await?;
let column_exec = schema.index_of_column(column)
.map(|idx| Column::new(&column.name, idx))?;
let schema =
SchemaRef::new(schema.as_ref().to_owned().into());
- Ok(Arc::new(UnnestExec::new(input, column_exec, schema)))
+ Ok(Arc::new(UnnestExec::new(input, column_exec, schema,
options.clone())))
}
LogicalPlan::Ddl(ddl) => {
// There is no default plan for DDl statements --
diff --git a/datafusion/core/tests/dataframe/mod.rs
b/datafusion/core/tests/dataframe/mod.rs
index e382176525..bfdb2bda1b 100644
--- a/datafusion/core/tests/dataframe/mod.rs
+++ b/datafusion/core/tests/dataframe/mod.rs
@@ -37,7 +37,7 @@ use datafusion::prelude::JoinType;
use datafusion::prelude::{CsvReadOptions, ParquetReadOptions};
use datafusion::test_util::parquet_test_data;
use datafusion::{assert_batches_eq, assert_batches_sorted_eq};
-use datafusion_common::{DataFusionError, ScalarValue};
+use datafusion_common::{DataFusionError, ScalarValue, UnnestOptions};
use datafusion_execution::config::SessionConfig;
use datafusion_expr::expr::{GroupingSet, Sort};
use datafusion_expr::Expr::Wildcard;
@@ -1044,6 +1044,82 @@ async fn unnest_columns() -> Result<()> {
Ok(())
}
+#[tokio::test]
+async fn unnest_column_preserve_nulls_not_supported() -> Result<()> {
+ // Unnest with preserve_nulls=false is not yet supported
+ let options = UnnestOptions::new().with_preserve_nulls(false);
+
+ let results = table_with_lists_and_nulls()
+ .await?
+ .clone()
+ .unnest_column_with_options("list", options)?
+ .collect()
+ .await;
+
+ assert_eq!(
+ results.unwrap_err().to_string(),
+ "This feature is not implemented: Unnest with preserve_nulls=false"
+ );
+ Ok(())
+}
+#[tokio::test]
+#[ignore] // https://github.com/apache/arrow-datafusion/issues/7087
+async fn unnest_column_nulls() -> Result<()> {
+ let df = table_with_lists_and_nulls().await?;
+ let results = df.clone().collect().await?;
+ let expected = vec![
+ "+--------+----+",
+ "| list | id |",
+ "+--------+----+",
+ "| [1, 2] | A |",
+ "| | B |",
+ "| [] | C |",
+ "| [3] | D |",
+ "+--------+----+",
+ ];
+ assert_batches_eq!(expected, &results);
+
+ // Unnest, preserving nulls (row with B is preserved)
+ let options = UnnestOptions::new().with_preserve_nulls(true);
+
+ let results = df
+ .clone()
+ .unnest_column_with_options("list", options)?
+ .collect()
+ .await?;
+ let expected = vec![
+ "+------+----+",
+ "| list | id |",
+ "+------+----+",
+ "| 1 | A |",
+ "| 2 | A |",
+ "| | B |",
+ "| 3 | D |",
+ "+------+----+",
+ ];
+ assert_batches_eq!(expected, &results);
+
+ // NOTE: the expected output below is incorrect; with preserve_nulls=false the B row should be dropped
+ let options = UnnestOptions::new().with_preserve_nulls(false);
+ let results = df
+ .unnest_column_with_options("list", options)?
+ .collect()
+ .await?;
+ let expected = vec![
+ "+------+----+",
+ "| list | id |",
+ "+------+----+",
+ "| 1 | A |",
+ "| 2 | A |",
+ "| | B |", // this row should not be here
+ "| 3 | D |",
+ "+------+----+",
+ ];
+ assert_batches_eq!(expected, &results);
+
+ Ok(())
+}
+
#[tokio::test]
async fn unnest_fixed_list() -> Result<()> {
let mut shape_id_builder = UInt32Builder::new();
@@ -1114,6 +1190,77 @@ async fn unnest_fixed_list() -> Result<()> {
Ok(())
}
+#[tokio::test]
+#[ignore] // https://github.com/apache/arrow-datafusion/issues/7087
+async fn unnest_fixed_list_nonull() -> Result<()> {
+ let mut shape_id_builder = UInt32Builder::new();
+ let mut tags_builder = FixedSizeListBuilder::new(StringBuilder::new(), 2);
+
+ for idx in 0..6 {
+ // Append shape id.
+ shape_id_builder.append_value(idx as u32 + 1);
+
+ tags_builder
+ .values()
+ .append_value(format!("tag{}1", idx + 1));
+ tags_builder
+ .values()
+ .append_value(format!("tag{}2", idx + 1));
+ tags_builder.append(true);
+ }
+
+ let batch = RecordBatch::try_from_iter(vec![
+ ("shape_id", Arc::new(shape_id_builder.finish()) as ArrayRef),
+ ("tags", Arc::new(tags_builder.finish()) as ArrayRef),
+ ])?;
+
+ let ctx = SessionContext::new();
+ ctx.register_batch("shapes", batch)?;
+ let df = ctx.table("shapes").await?;
+
+ let results = df.clone().collect().await?;
+ let expected = vec![
+ "+----------+----------------+",
+ "| shape_id | tags |",
+ "+----------+----------------+",
+ "| 1 | [tag11, tag12] |",
+ "| 2 | [tag21, tag22] |",
+ "| 3 | [tag31, tag32] |",
+ "| 4 | [tag41, tag42] |",
+ "| 5 | [tag51, tag52] |",
+ "| 6 | [tag61, tag62] |",
+ "+----------+----------------+",
+ ];
+ assert_batches_sorted_eq!(expected, &results);
+
+ let options = UnnestOptions::new().with_preserve_nulls(true);
+ let results = df
+ .unnest_column_with_options("tags", options)?
+ .collect()
+ .await?;
+ let expected = vec![
+ "+----------+-------+",
+ "| shape_id | tags |",
+ "+----------+-------+",
+ "| 1 | tag11 |",
+ "| 1 | tag12 |",
+ "| 2 | tag21 |",
+ "| 2 | tag22 |",
+ "| 3 | tag31 |",
+ "| 3 | tag32 |",
+ "| 4 | tag41 |",
+ "| 4 | tag42 |",
+ "| 5 | tag51 |",
+ "| 5 | tag52 |",
+ "| 6 | tag61 |",
+ "| 6 | tag62 |",
+ "+----------+-------+",
+ ];
+ assert_batches_sorted_eq!(expected, &results);
+
+ Ok(())
+}
+
#[tokio::test]
async fn unnest_aggregate_columns() -> Result<()> {
const NUM_ROWS: usize = 5;
@@ -1294,6 +1441,40 @@ async fn table_with_nested_types(n: usize) ->
Result<DataFrame> {
ctx.table("shapes").await
}
+/// A data frame that contains a list of integers and string IDs
+async fn table_with_lists_and_nulls() -> Result<DataFrame> {
+ let mut list_builder = ListBuilder::new(UInt32Builder::new());
+ let mut id_builder = StringBuilder::new();
+
+ // [1, 2], A
+ list_builder.values().append_value(1);
+ list_builder.values().append_value(2);
+ list_builder.append(true);
+ id_builder.append_value("A");
+
+ // NULL, B
+ list_builder.append(false);
+ id_builder.append_value("B");
+
+ // [], C
+ list_builder.append(true);
+ id_builder.append_value("C");
+
+ // [3], D
+ list_builder.values().append_value(3);
+ list_builder.append(true);
+ id_builder.append_value("D");
+
+ let batch = RecordBatch::try_from_iter(vec![
+ ("list", Arc::new(list_builder.finish()) as ArrayRef),
+ ("id", Arc::new(id_builder.finish()) as ArrayRef),
+ ])?;
+
+ let ctx = SessionContext::new();
+ ctx.register_batch("shapes", batch)?;
+ ctx.table("shapes").await
+}
+
pub async fn register_alltypes_tiny_pages_parquet(ctx: &SessionContext) ->
Result<()> {
let testdata = parquet_test_data();
ctx.register_parquet(
diff --git a/datafusion/expr/src/logical_plan/builder.rs
b/datafusion/expr/src/logical_plan/builder.rs
index 176459db14..f89be03f79 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -41,6 +41,7 @@ use crate::{
};
use arrow::datatypes::{DataType, Schema, SchemaRef};
use datafusion_common::plan_err;
+use datafusion_common::UnnestOptions;
use datafusion_common::{
display::ToStringifiedPlan, Column, DFField, DFSchema, DFSchemaRef,
DataFusionError,
FunctionalDependencies, OwnedTableReference, Result, ScalarValue,
TableReference,
@@ -1036,6 +1037,19 @@ impl LogicalPlanBuilder {
pub fn unnest_column(self, column: impl Into<Column>) -> Result<Self> {
Ok(Self::from(unnest(self.plan, column.into())?))
}
+
+ /// Unnest the given column using the provided [`UnnestOptions`]
+ pub fn unnest_column_with_options(
+ self,
+ column: impl Into<Column>,
+ options: UnnestOptions,
+ ) -> Result<Self> {
+ Ok(Self::from(unnest_with_options(
+ self.plan,
+ column.into(),
+ options,
+ )?))
+ }
}
/// Creates a schema for a join operation.
@@ -1379,8 +1393,17 @@ impl TableSource for LogicalTableSource {
}
}
-/// Create an unnest plan.
+/// Create a [`LogicalPlan::Unnest`] plan
pub fn unnest(input: LogicalPlan, column: Column) -> Result<LogicalPlan> {
+ unnest_with_options(input, column, UnnestOptions::new())
+}
+
+/// Create a [`LogicalPlan::Unnest`] plan with options
+pub fn unnest_with_options(
+ input: LogicalPlan,
+ column: Column,
+ options: UnnestOptions,
+) -> Result<LogicalPlan> {
let unnest_field = input.schema().field_from_column(&column)?;
// Extract the type of the nested field in the list.
@@ -1423,6 +1446,7 @@ pub fn unnest(input: LogicalPlan, column: Column) ->
Result<LogicalPlan> {
input: Arc::new(input),
column: unnested_field.qualified_column(),
schema,
+ options,
}))
}
diff --git a/datafusion/expr/src/logical_plan/plan.rs
b/datafusion/expr/src/logical_plan/plan.rs
index 0181f4b567..3557745ed3 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -40,6 +40,7 @@ use datafusion_common::tree_node::{
use datafusion_common::{
aggregate_functional_dependencies, plan_err, Column, DFField, DFSchema,
DFSchemaRef,
DataFusionError, FunctionalDependencies, OwnedTableReference, Result,
ScalarValue,
+ UnnestOptions,
};
// backwards compatibility
pub use datafusion_common::display::{PlanType, StringifiedPlan,
ToStringifiedPlan};
@@ -1850,7 +1851,8 @@ pub enum Partitioning {
DistributeBy(Vec<Expr>),
}
-/// Unnest a column that contains a nested list type.
+/// Unnest a column that contains a nested list type. See
+/// [`UnnestOptions`] for more details.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Unnest {
/// The incoming logical plan
@@ -1859,6 +1861,8 @@ pub struct Unnest {
pub column: Column,
/// The output schema, containing the unnested field column.
pub schema: DFSchemaRef,
+ /// Options controlling the unnest behavior (see [`UnnestOptions`])
+ pub options: UnnestOptions,
}
#[cfg(test)]
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index 8940eeed01..76061194ed 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -988,7 +988,12 @@ pub fn from_plan(
Ok(plan.clone())
}
LogicalPlan::DescribeTable(_) => Ok(plan.clone()),
- LogicalPlan::Unnest(Unnest { column, schema, .. }) => {
+ LogicalPlan::Unnest(Unnest {
+ column,
+ schema,
+ options,
+ ..
+ }) => {
// Update schema with unnested column type.
let input = Arc::new(inputs[0].clone());
let nested_field = input.schema().field_from_column(column)?;
@@ -1018,6 +1023,7 @@ pub fn from_plan(
input,
column: column.clone(),
schema,
+ options: options.clone(),
}))
}
}