This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 03f7cc9998 Improve unnest_column performance (#6903)
03f7cc9998 is described below
commit 03f7cc9998bb6b731e5d82547f588e19c439fb86
Author: vincev <[email protected]>
AuthorDate: Thu Jul 13 20:01:24 2023 +0200
Improve unnest_column performance (#6903)
* Improve unnest performance
* Fix comment.
* Fix docs generation.
* Add comments to describe how `batch_from_indices` works.
* Add comment to `unnest_array`.
* Handle unnest for FixedSizeList.
* Add link to `FixedSizeList` issue for the `take` kernel.
---
datafusion/core/src/physical_plan/unnest.rs | 273 +++++++++++++++++++++++-----
datafusion/core/tests/dataframe/mod.rs | 74 +++++++-
2 files changed, 302 insertions(+), 45 deletions(-)
diff --git a/datafusion/core/src/physical_plan/unnest.rs
b/datafusion/core/src/physical_plan/unnest.rs
index 7f8d56847e..7a213dffeb 100644
--- a/datafusion/core/src/physical_plan/unnest.rs
+++ b/datafusion/core/src/physical_plan/unnest.rs
@@ -18,12 +18,16 @@
//! Defines the unnest column plan for unnesting values in a column that
contains a list
//! type, conceptually is like joining each row with all the values in the
list column.
use arrow::array::{
- new_null_array, Array, ArrayAccessor, ArrayRef, FixedSizeListArray,
LargeListArray,
- ListArray,
+ new_null_array, Array, ArrayAccessor, ArrayRef, ArrowPrimitiveType,
+ FixedSizeListArray, Int32Array, LargeListArray, ListArray, PrimitiveArray,
+};
+use arrow::compute::kernels;
+use arrow::datatypes::{
+ ArrowNativeType, ArrowNativeTypeOp, DataType, Int32Type, Int64Type,
Schema, SchemaRef,
};
-use arrow::datatypes::{Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
+use datafusion_common::{cast::as_primitive_array, DataFusionError, Result};
use datafusion_execution::TaskContext;
use futures::Stream;
use futures::StreamExt;
@@ -32,11 +36,10 @@ use std::time::Instant;
use std::{any::Any, sync::Arc};
use crate::physical_plan::{
- coalesce_batches::concat_batches, expressions::Column, DisplayFormatType,
- Distribution, EquivalenceProperties, ExecutionPlan, Partitioning,
PhysicalExpr,
- PhysicalSortExpr, RecordBatchStream, SendableRecordBatchStream, Statistics,
+ expressions::Column, DisplayFormatType, Distribution,
EquivalenceProperties,
+ ExecutionPlan, Partitioning, PhysicalExpr, PhysicalSortExpr,
RecordBatchStream,
+ SendableRecordBatchStream, Statistics,
};
-use datafusion_common::{DataFusionError, Result, ScalarValue};
use super::DisplayAs;
@@ -231,18 +234,18 @@ fn build_batch(
) -> Result<RecordBatch> {
let list_array = column.evaluate(batch)?.into_array(batch.num_rows());
match list_array.data_type() {
- arrow::datatypes::DataType::List(_) => {
+ DataType::List(_) => {
let list_array =
list_array.as_any().downcast_ref::<ListArray>().unwrap();
unnest_batch(batch, schema, column, &list_array)
}
- arrow::datatypes::DataType::LargeList(_) => {
+ DataType::LargeList(_) => {
let list_array = list_array
.as_any()
.downcast_ref::<LargeListArray>()
.unwrap();
unnest_batch(batch, schema, column, &list_array)
}
- arrow::datatypes::DataType::FixedSizeList(_, _) => {
+ DataType::FixedSizeList(_, _) => {
let list_array = list_array
.as_any()
.downcast_ref::<FixedSizeListArray>()
@@ -264,41 +267,225 @@ fn unnest_batch<T>(
where
T: ArrayAccessor<Item = ArrayRef>,
{
- let mut batches = Vec::new();
- let mut num_rows = 0;
-
- for row in 0..batch.num_rows() {
- let arrays = batch
- .columns()
- .iter()
- .enumerate()
- .map(|(col_idx, arr)| {
- if col_idx == column.index() {
- // Unnest the value at the given row.
- if list_array.value(row).is_empty() {
- // If nested array is empty add an array with 1 null.
- Ok(new_null_array(list_array.value(row).data_type(),
1))
- } else {
- Ok(list_array.value(row))
- }
+ // Create an array with the unnested values of the list array, given the
list
+ // array:
+ //
+ // [1], null, [2, 3, 4], null, [5, 6]
+ //
+ // the result array is:
+ //
+ // 1, null, 2, 3, 4, null, 5, 6
+ //
+ let unnested_array = unnest_array(list_array)?;
+
+ // Create an array with the lengths of each list value in the nested array.
+ // Given the nested array:
+ //
+ // [1], null, [2, 3, 4], null, [5, 6]
+ //
+ // the result array is:
+ //
+ // 1, null, 3, null, 2
+ //
+ // Depending on the list type the result may be Int32Array or Int64Array.
+ let list_lengths = list_lengths(list_array)?;
+
+ // Create the indices for the take kernel and then use those indices to
create
+ // the unnested record batch.
+ match list_lengths.data_type() {
+ DataType::Int32 => {
+ let list_lengths = as_primitive_array::<Int32Type>(&list_lengths)?;
+ let indices = create_take_indices(list_lengths,
unnested_array.len());
+ batch_from_indices(batch, schema, column.index(), &unnested_array,
&indices)
+ }
+ DataType::Int64 => {
+ let list_lengths = as_primitive_array::<Int64Type>(&list_lengths)?;
+ let indices = create_take_indices(list_lengths,
unnested_array.len());
+ batch_from_indices(batch, schema, column.index(), &unnested_array,
&indices)
+ }
+ dt => Err(DataFusionError::Execution(format!(
+ "Unnest: unsupported indices type {dt}"
+ ))),
+ }
+}
+
+/// Create the indices for the take kernel given an array of list values
lengths.
+///
+/// The indices are used to duplicate column elements so that all columns have
as
+/// many rows as the unnested array.
+///
+/// Given the nested array:
+///
+/// ```ignore
+/// [1], null, [2, 3, 4], null, [5, 6]
+/// ```
+///
+/// the `list_lengths` array contains the length of each list value:
+///
+/// ```ignore
+/// 1, null, 3, null, 2
+/// ```
+///
+/// the result indices array is:
+///
+/// ```ignore
+/// 0, 1, 2, 2, 2, 3, 4, 4
+/// ```
+///
+/// where a null value counts as one element.
+fn create_take_indices<T>(
+ list_lengths: &PrimitiveArray<T>,
+ capacity: usize,
+) -> PrimitiveArray<T>
+where
+ T: ArrowPrimitiveType,
+{
+ let mut builder = PrimitiveArray::<T>::builder(capacity);
+ for row in 0..list_lengths.len() {
+ let repeat = if list_lengths.is_null(row) {
+ T::Native::ONE
+ } else {
+ list_lengths.value(row)
+ };
+
+        // Both `repeat` and `index` are positive integers.
+ let repeat = repeat.to_usize().unwrap();
+ let index = T::Native::from_usize(row).unwrap();
+ (0..repeat).for_each(|_| builder.append_value(index));
+ }
+
+ builder.finish()
+}
+
+/// Create the final batch given the unnested column array and an `indices`
array
+/// that is used by the take kernel to copy values.
+///
+/// For example if we have the following `RecordBatch`:
+///
+/// ```ignore
+/// c1: [1], null, [2, 3, 4], null, [5, 6]
+/// c2: 'a', 'b', 'c', null, 'd'
+/// ```
+///
+/// then the `unnested_array` contains the unnest column that will replace
`c1` in
+/// the final batch:
+///
+/// ```ignore
+/// c1: 1, null, 2, 3, 4, null, 5, 6
+/// ```
+///
+/// And the `indices` array contains the indices that are used by `take`
kernel to
+/// repeat the values in `c2`:
+///
+/// ```ignore
+/// 0, 1, 2, 2, 2, 3, 4, 4
+/// ```
+///
+/// so that the final batch will look like:
+///
+/// ```ignore
+/// c1: 1, null, 2, 3, 4, null, 5, 6
+/// c2: 'a', 'b', 'c', 'c', 'c', null, 'd', 'd'
+/// ```
+///
+fn batch_from_indices<T>(
+ batch: &RecordBatch,
+ schema: &SchemaRef,
+ unnest_column_idx: usize,
+ unnested_array: &ArrayRef,
+ indices: &PrimitiveArray<T>,
+) -> Result<RecordBatch>
+where
+ T: ArrowPrimitiveType,
+{
+ let arrays = batch
+ .columns()
+ .iter()
+ .enumerate()
+ .map(|(col_idx, arr)| {
+ if col_idx == unnest_column_idx {
+ Ok(unnested_array.clone())
+ } else {
+ Ok(kernels::take::take(&arr, indices, None)?)
+ }
+ })
+ .collect::<Result<Vec<_>>>()?;
+
+ Ok(RecordBatch::try_new(schema.clone(), arrays.to_vec())?)
+}
+
+/// Unnest the given list array. Given the array:
+///
+/// ```ignore
+/// [1], null, [2, 3, 4], null, [5, 6]
+/// ```
+///
+/// returns:
+///
+/// ```ignore
+/// 1, null, 2, 3, 4, null, 5, 6
+/// ```
+fn unnest_array<T>(list_array: &T) -> Result<Arc<dyn Array + 'static>>
+where
+ T: ArrayAccessor<Item = ArrayRef>,
+{
+ let elem_type = match list_array.data_type() {
+ DataType::List(f) | DataType::FixedSizeList(f, _) |
DataType::LargeList(f) => {
+ f.data_type()
+ }
+ dt => {
+ return Err(DataFusionError::Execution(format!(
+ "Cannot unnest array of type {dt}"
+ )))
+ }
+ };
+
+ let null_row = new_null_array(elem_type, 1);
+
+ // Create a vec of ArrayRef from the list elements.
+ let arrays = (0..list_array.len())
+ .map(|row| {
+ if list_array.is_null(row) {
+ null_row.clone()
+ } else {
+ list_array.value(row)
+ }
+ })
+ .collect::<Vec<_>>();
+
+ // Create Vec<&dyn Array> from Vec<Arc<dyn Array>> for `concat`. Calling
+ // `as_ref()` in the `map` above causes the borrow checker to complain.
+ let arrays = arrays.iter().map(|a| a.as_ref()).collect::<Vec<_>>();
+
+ Ok(kernels::concat::concat(&arrays)?)
+}
+
+/// Returns an array with the lengths of each list in `list_array`. Returns
null
+/// for a null value.
+fn list_lengths<T>(list_array: &T) -> Result<Arc<dyn Array + 'static>>
+where
+ T: ArrayAccessor<Item = ArrayRef>,
+{
+ match list_array.data_type() {
+ DataType::List(_) | DataType::LargeList(_) => {
+ Ok(kernels::length::length(list_array)?)
+ }
+ DataType::FixedSizeList(_, size) => {
+ // Handle FixedSizeList as it is not handled by the `length`
kernel.
+ // https://github.com/apache/arrow-rs/issues/4517
+ let mut lengths = Vec::with_capacity(list_array.len());
+ for row in 0..list_array.len() {
+ if list_array.is_null(row) {
+ lengths.push(None)
} else {
- // Number of elements to duplicate, use max(1) to handle
null.
- let nested_len = list_array.value(row).len().max(1);
- // Duplicate rows for each value in the nested array.
- if arr.is_null(row) {
- Ok(new_null_array(arr.data_type(), nested_len))
- } else {
- let scalar = ScalarValue::try_from_array(arr, row)?;
- Ok(scalar.to_array_of_size(nested_len))
- }
+ lengths.push(Some(*size));
}
- })
- .collect::<Result<Vec<_>>>()?;
+ }
- let rb = RecordBatch::try_new(schema.clone(), arrays.to_vec())?;
- num_rows += rb.num_rows();
- batches.push(rb);
+ Ok(Arc::new(Int32Array::from(lengths)))
+ }
+ dt => Err(DataFusionError::Execution(format!(
+ "Invalid type {dt} for list_lengths"
+ ))),
}
-
- concat_batches(schema, &batches, num_rows).map_err(Into::into)
}
diff --git a/datafusion/core/tests/dataframe/mod.rs
b/datafusion/core/tests/dataframe/mod.rs
index 67c0363bf5..aecec35f2e 100644
--- a/datafusion/core/tests/dataframe/mod.rs
+++ b/datafusion/core/tests/dataframe/mod.rs
@@ -22,8 +22,8 @@ use arrow::datatypes::{DataType, Field, Schema};
use arrow::util::pretty::pretty_format_batches;
use arrow::{
array::{
- ArrayRef, Int32Array, Int32Builder, ListBuilder, StringArray,
StringBuilder,
- StructBuilder, UInt32Array, UInt32Builder,
+ ArrayRef, FixedSizeListBuilder, Int32Array, Int32Builder, ListBuilder,
+ StringArray, StringBuilder, StructBuilder, UInt32Array, UInt32Builder,
},
record_batch::RecordBatch,
};
@@ -1044,6 +1044,76 @@ async fn unnest_columns() -> Result<()> {
Ok(())
}
+#[tokio::test]
+async fn unnest_fixed_list() -> Result<()> {
+ let mut shape_id_builder = UInt32Builder::new();
+ let mut tags_builder = FixedSizeListBuilder::new(StringBuilder::new(), 2);
+
+ for idx in 0..6 {
+ // Append shape id.
+ shape_id_builder.append_value(idx as u32 + 1);
+
+ if idx % 3 != 0 {
+ tags_builder
+ .values()
+ .append_value(format!("tag{}1", idx + 1));
+ tags_builder
+ .values()
+ .append_value(format!("tag{}2", idx + 1));
+ tags_builder.append(true);
+ } else {
+ tags_builder.values().append_null();
+ tags_builder.values().append_null();
+ tags_builder.append(false);
+ }
+ }
+
+ let batch = RecordBatch::try_from_iter(vec![
+ ("shape_id", Arc::new(shape_id_builder.finish()) as ArrayRef),
+ ("tags", Arc::new(tags_builder.finish()) as ArrayRef),
+ ])?;
+
+ let ctx = SessionContext::new();
+ ctx.register_batch("shapes", batch)?;
+ let df = ctx.table("shapes").await?;
+
+ let results = df.clone().collect().await?;
+ let expected = vec![
+ "+----------+----------------+",
+ "| shape_id | tags |",
+ "+----------+----------------+",
+ "| 1 | |",
+ "| 2 | [tag21, tag22] |",
+ "| 3 | [tag31, tag32] |",
+ "| 4 | |",
+ "| 5 | [tag51, tag52] |",
+ "| 6 | [tag61, tag62] |",
+ "+----------+----------------+",
+ ];
+ assert_batches_sorted_eq!(expected, &results);
+
+ let results = df.unnest_column("tags")?.collect().await?;
+ let expected = vec![
+ "+----------+-------+",
+ "| shape_id | tags |",
+ "+----------+-------+",
+ "| 1 | |",
+ "| 2 | tag21 |",
+ "| 2 | tag22 |",
+ "| 3 | tag31 |",
+ "| 3 | tag32 |",
+ "| 4 | |",
+ "| 5 | tag51 |",
+ "| 5 | tag52 |",
+ "| 6 | tag61 |",
+ "| 6 | tag62 |",
+ "+----------+-------+",
+ ];
+ assert_batches_sorted_eq!(expected, &results);
+
+ Ok(())
+}
+
#[tokio::test]
async fn unnest_aggregate_columns() -> Result<()> {
const NUM_ROWS: usize = 5;