This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 03f7cc9998 Improve unnest_column performance (#6903)
03f7cc9998 is described below

commit 03f7cc9998bb6b731e5d82547f588e19c439fb86
Author: vincev <[email protected]>
AuthorDate: Thu Jul 13 20:01:24 2023 +0200

    Improve unnest_column performance (#6903)
    
    * Improve unnest performance
    
    * Fix comment.
    
    * Fix docs generation.
    
    * Add comments to describe how `batch_from_indices` works.
    
    * Add comment to `unnest_array`.
    
    * Handle unnest for FixedSizeList.
    
    * Add link to `FixedSizeList` issue for the `take` kernel.
---
 datafusion/core/src/physical_plan/unnest.rs | 273 +++++++++++++++++++++++-----
 datafusion/core/tests/dataframe/mod.rs      |  74 +++++++-
 2 files changed, 302 insertions(+), 45 deletions(-)

diff --git a/datafusion/core/src/physical_plan/unnest.rs 
b/datafusion/core/src/physical_plan/unnest.rs
index 7f8d56847e..7a213dffeb 100644
--- a/datafusion/core/src/physical_plan/unnest.rs
+++ b/datafusion/core/src/physical_plan/unnest.rs
@@ -18,12 +18,16 @@
 //! Defines the unnest column plan for unnesting values in a column that 
contains a list
 //! type, conceptually is like joining each row with all the values in the 
list column.
 use arrow::array::{
-    new_null_array, Array, ArrayAccessor, ArrayRef, FixedSizeListArray, 
LargeListArray,
-    ListArray,
+    new_null_array, Array, ArrayAccessor, ArrayRef, ArrowPrimitiveType,
+    FixedSizeListArray, Int32Array, LargeListArray, ListArray, PrimitiveArray,
+};
+use arrow::compute::kernels;
+use arrow::datatypes::{
+    ArrowNativeType, ArrowNativeTypeOp, DataType, Int32Type, Int64Type, 
Schema, SchemaRef,
 };
-use arrow::datatypes::{Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
 use async_trait::async_trait;
+use datafusion_common::{cast::as_primitive_array, DataFusionError, Result};
 use datafusion_execution::TaskContext;
 use futures::Stream;
 use futures::StreamExt;
@@ -32,11 +36,10 @@ use std::time::Instant;
 use std::{any::Any, sync::Arc};
 
 use crate::physical_plan::{
-    coalesce_batches::concat_batches, expressions::Column, DisplayFormatType,
-    Distribution, EquivalenceProperties, ExecutionPlan, Partitioning, 
PhysicalExpr,
-    PhysicalSortExpr, RecordBatchStream, SendableRecordBatchStream, Statistics,
+    expressions::Column, DisplayFormatType, Distribution, 
EquivalenceProperties,
+    ExecutionPlan, Partitioning, PhysicalExpr, PhysicalSortExpr, 
RecordBatchStream,
+    SendableRecordBatchStream, Statistics,
 };
-use datafusion_common::{DataFusionError, Result, ScalarValue};
 
 use super::DisplayAs;
 
@@ -231,18 +234,18 @@ fn build_batch(
 ) -> Result<RecordBatch> {
     let list_array = column.evaluate(batch)?.into_array(batch.num_rows());
     match list_array.data_type() {
-        arrow::datatypes::DataType::List(_) => {
+        DataType::List(_) => {
             let list_array = 
list_array.as_any().downcast_ref::<ListArray>().unwrap();
             unnest_batch(batch, schema, column, &list_array)
         }
-        arrow::datatypes::DataType::LargeList(_) => {
+        DataType::LargeList(_) => {
             let list_array = list_array
                 .as_any()
                 .downcast_ref::<LargeListArray>()
                 .unwrap();
             unnest_batch(batch, schema, column, &list_array)
         }
-        arrow::datatypes::DataType::FixedSizeList(_, _) => {
+        DataType::FixedSizeList(_, _) => {
             let list_array = list_array
                 .as_any()
                 .downcast_ref::<FixedSizeListArray>()
@@ -264,41 +267,225 @@ fn unnest_batch<T>(
 where
     T: ArrayAccessor<Item = ArrayRef>,
 {
-    let mut batches = Vec::new();
-    let mut num_rows = 0;
-
-    for row in 0..batch.num_rows() {
-        let arrays = batch
-            .columns()
-            .iter()
-            .enumerate()
-            .map(|(col_idx, arr)| {
-                if col_idx == column.index() {
-                    // Unnest the value at the given row.
-                    if list_array.value(row).is_empty() {
-                        // If nested array is empty add an array with 1 null.
-                        Ok(new_null_array(list_array.value(row).data_type(), 
1))
-                    } else {
-                        Ok(list_array.value(row))
-                    }
+    // Create an array with the unnested values of the list array, given the 
list
+    // array:
+    //
+    //   [1], null, [2, 3, 4], null, [5, 6]
+    //
+    // the result array is:
+    //
+    //   1, null, 2, 3, 4, null, 5, 6
+    //
+    let unnested_array = unnest_array(list_array)?;
+
+    // Create an array with the lengths of each list value in the nested array.
+    // Given the nested array:
+    //
+    //   [1], null, [2, 3, 4], null, [5, 6]
+    //
+    // the result array is:
+    //
+    //   1, null, 3, null, 2
+    //
+    // Depending on the list type the result may be Int32Array or Int64Array.
+    let list_lengths = list_lengths(list_array)?;
+
+    // Create the indices for the take kernel and then use those indices to 
create
+    // the unnested record batch.
+    match list_lengths.data_type() {
+        DataType::Int32 => {
+            let list_lengths = as_primitive_array::<Int32Type>(&list_lengths)?;
+            let indices = create_take_indices(list_lengths, 
unnested_array.len());
+            batch_from_indices(batch, schema, column.index(), &unnested_array, 
&indices)
+        }
+        DataType::Int64 => {
+            let list_lengths = as_primitive_array::<Int64Type>(&list_lengths)?;
+            let indices = create_take_indices(list_lengths, 
unnested_array.len());
+            batch_from_indices(batch, schema, column.index(), &unnested_array, 
&indices)
+        }
+        dt => Err(DataFusionError::Execution(format!(
+            "Unnest: unsupported indices type {dt}"
+        ))),
+    }
+}
+
+/// Create the indices for the take kernel given an array of list values 
lengths.
+///
+/// The indices are used to duplicate column elements so that all columns have 
as
+/// many rows as the unnested array.
+///
+/// Given the nested array:
+///
+/// ```ignore
+/// [1], null, [2, 3, 4], null, [5, 6]
+/// ```
+///
+/// the `list_lengths` array contains the length of each list value:
+///
+/// ```ignore
+/// 1, null,  3, null, 2
+/// ```
+///
+/// the result indices array is:
+///
+/// ```ignore
+/// 0, 1, 2, 2, 2, 3, 4, 4
+/// ```
+///
+/// where a null value count as one element.
+fn create_take_indices<T>(
+    list_lengths: &PrimitiveArray<T>,
+    capacity: usize,
+) -> PrimitiveArray<T>
+where
+    T: ArrowPrimitiveType,
+{
+    let mut builder = PrimitiveArray::<T>::builder(capacity);
+    for row in 0..list_lengths.len() {
+        let repeat = if list_lengths.is_null(row) {
+            T::Native::ONE
+        } else {
+            list_lengths.value(row)
+        };
+
+        // Both `repeat` and `index` are positive intergers.
+        let repeat = repeat.to_usize().unwrap();
+        let index = T::Native::from_usize(row).unwrap();
+        (0..repeat).for_each(|_| builder.append_value(index));
+    }
+
+    builder.finish()
+}
+
+/// Create the final batch given the unnested column array and a `indices` 
array
+/// that is used by the take kernel to copy values.
+///
+/// For example if we have the following `RecordBatch`:
+///
+/// ```ignore
+/// c1: [1], null, [2, 3, 4], null, [5, 6]
+/// c2: 'a', 'b',  'c', null, 'd'
+/// ```
+///
+/// then the `unnested_array` contains the unnest column that will replace 
`c1` in
+/// the final batch:
+///
+/// ```ignore
+/// c1: 1, null, 2, 3, 4, null, 5, 6
+/// ```
+///
+/// And the `indices` array contains the indices that are used by `take` 
kernel to
+/// repeat the values in `c2`:
+///
+/// ```ignore
+/// 0, 1, 2, 2, 2, 3, 4, 4
+/// ```
+///
+/// so that the final batch will look like:
+///
+/// ```ignore
+/// c1: 1, null, 2, 3, 4, null, 5, 6
+/// c2: 'a', 'b', 'c', 'c', 'c', null, 'd', 'd'
+/// ```
+///
+fn batch_from_indices<T>(
+    batch: &RecordBatch,
+    schema: &SchemaRef,
+    unnest_column_idx: usize,
+    unnested_array: &ArrayRef,
+    indices: &PrimitiveArray<T>,
+) -> Result<RecordBatch>
+where
+    T: ArrowPrimitiveType,
+{
+    let arrays = batch
+        .columns()
+        .iter()
+        .enumerate()
+        .map(|(col_idx, arr)| {
+            if col_idx == unnest_column_idx {
+                Ok(unnested_array.clone())
+            } else {
+                Ok(kernels::take::take(&arr, indices, None)?)
+            }
+        })
+        .collect::<Result<Vec<_>>>()?;
+
+    Ok(RecordBatch::try_new(schema.clone(), arrays.to_vec())?)
+}
+
+/// Unnest the given list array. Given the array:
+///
+/// ```ignore
+/// [1], null, [2, 3, 4], null, [5, 6]
+/// ```
+///
+/// returns:
+///
+/// ```ignore
+/// 1, null, 2, 3, 4, null, 5, 6
+/// ```
+fn unnest_array<T>(list_array: &T) -> Result<Arc<dyn Array + 'static>>
+where
+    T: ArrayAccessor<Item = ArrayRef>,
+{
+    let elem_type = match list_array.data_type() {
+        DataType::List(f) | DataType::FixedSizeList(f, _) | 
DataType::LargeList(f) => {
+            f.data_type()
+        }
+        dt => {
+            return Err(DataFusionError::Execution(format!(
+                "Cannot unnest array of type {dt}"
+            )))
+        }
+    };
+
+    let null_row = new_null_array(elem_type, 1);
+
+    // Create a vec of ArrayRef from the list elements.
+    let arrays = (0..list_array.len())
+        .map(|row| {
+            if list_array.is_null(row) {
+                null_row.clone()
+            } else {
+                list_array.value(row)
+            }
+        })
+        .collect::<Vec<_>>();
+
+    // Create Vec<&dyn Array> from Vec<Arc<dyn Array>> for `concat`. Calling
+    // `as_ref()` in the `map` above causes the borrow checker to complain.
+    let arrays = arrays.iter().map(|a| a.as_ref()).collect::<Vec<_>>();
+
+    Ok(kernels::concat::concat(&arrays)?)
+}
+
+/// Returns an array with the lengths of each list in `list_array`. Returns 
null
+/// for a null value.
+fn list_lengths<T>(list_array: &T) -> Result<Arc<dyn Array + 'static>>
+where
+    T: ArrayAccessor<Item = ArrayRef>,
+{
+    match list_array.data_type() {
+        DataType::List(_) | DataType::LargeList(_) => {
+            Ok(kernels::length::length(list_array)?)
+        }
+        DataType::FixedSizeList(_, size) => {
+            // Handle FixedSizeList as it is not handled by the `length` 
kernel.
+            // https://github.com/apache/arrow-rs/issues/4517
+            let mut lengths = Vec::with_capacity(list_array.len());
+            for row in 0..list_array.len() {
+                if list_array.is_null(row) {
+                    lengths.push(None)
                 } else {
-                    // Number of elements to duplicate, use max(1) to handle 
null.
-                    let nested_len = list_array.value(row).len().max(1);
-                    // Duplicate rows for each value in the nested array.
-                    if arr.is_null(row) {
-                        Ok(new_null_array(arr.data_type(), nested_len))
-                    } else {
-                        let scalar = ScalarValue::try_from_array(arr, row)?;
-                        Ok(scalar.to_array_of_size(nested_len))
-                    }
+                    lengths.push(Some(*size));
                 }
-            })
-            .collect::<Result<Vec<_>>>()?;
+            }
 
-        let rb = RecordBatch::try_new(schema.clone(), arrays.to_vec())?;
-        num_rows += rb.num_rows();
-        batches.push(rb);
+            Ok(Arc::new(Int32Array::from(lengths)))
+        }
+        dt => Err(DataFusionError::Execution(format!(
+            "Invalid type {dt} for list_lengths"
+        ))),
     }
-
-    concat_batches(schema, &batches, num_rows).map_err(Into::into)
 }
diff --git a/datafusion/core/tests/dataframe/mod.rs 
b/datafusion/core/tests/dataframe/mod.rs
index 67c0363bf5..aecec35f2e 100644
--- a/datafusion/core/tests/dataframe/mod.rs
+++ b/datafusion/core/tests/dataframe/mod.rs
@@ -22,8 +22,8 @@ use arrow::datatypes::{DataType, Field, Schema};
 use arrow::util::pretty::pretty_format_batches;
 use arrow::{
     array::{
-        ArrayRef, Int32Array, Int32Builder, ListBuilder, StringArray, 
StringBuilder,
-        StructBuilder, UInt32Array, UInt32Builder,
+        ArrayRef, FixedSizeListBuilder, Int32Array, Int32Builder, ListBuilder,
+        StringArray, StringBuilder, StructBuilder, UInt32Array, UInt32Builder,
     },
     record_batch::RecordBatch,
 };
@@ -1044,6 +1044,76 @@ async fn unnest_columns() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+async fn unnest_fixed_list() -> Result<()> {
+    let mut shape_id_builder = UInt32Builder::new();
+    let mut tags_builder = FixedSizeListBuilder::new(StringBuilder::new(), 2);
+
+    for idx in 0..6 {
+        // Append shape id.
+        shape_id_builder.append_value(idx as u32 + 1);
+
+        if idx % 3 != 0 {
+            tags_builder
+                .values()
+                .append_value(format!("tag{}1", idx + 1));
+            tags_builder
+                .values()
+                .append_value(format!("tag{}2", idx + 1));
+            tags_builder.append(true);
+        } else {
+            tags_builder.values().append_null();
+            tags_builder.values().append_null();
+            tags_builder.append(false);
+        }
+    }
+
+    let batch = RecordBatch::try_from_iter(vec![
+        ("shape_id", Arc::new(shape_id_builder.finish()) as ArrayRef),
+        ("tags", Arc::new(tags_builder.finish()) as ArrayRef),
+    ])?;
+
+    let ctx = SessionContext::new();
+    ctx.register_batch("shapes", batch)?;
+    let df = ctx.table("shapes").await?;
+
+    let results = df.clone().collect().await?;
+    let expected = vec![
+        "+----------+----------------+",
+        "| shape_id | tags           |",
+        "+----------+----------------+",
+        "| 1        |                |",
+        "| 2        | [tag21, tag22] |",
+        "| 3        | [tag31, tag32] |",
+        "| 4        |                |",
+        "| 5        | [tag51, tag52] |",
+        "| 6        | [tag61, tag62] |",
+        "+----------+----------------+",
+    ];
+    assert_batches_sorted_eq!(expected, &results);
+
+    let results = df.unnest_column("tags")?.collect().await?;
+    let expected = vec![
+        "+----------+-------+",
+        "| shape_id | tags  |",
+        "+----------+-------+",
+        "| 1        |       |",
+        "| 2        | tag21 |",
+        "| 2        | tag22 |",
+        "| 3        | tag31 |",
+        "| 3        | tag32 |",
+        "| 4        |       |",
+        "| 5        | tag51 |",
+        "| 5        | tag52 |",
+        "| 6        | tag61 |",
+        "| 6        | tag62 |",
+        "+----------+-------+",
+    ];
+    assert_batches_sorted_eq!(expected, &results);
+
+    Ok(())
+}
+
 #[tokio::test]
 async fn unnest_aggregate_columns() -> Result<()> {
     const NUM_ROWS: usize = 5;

Reply via email to