alamb commented on code in PR #7371:
URL: https://github.com/apache/arrow-datafusion/pull/7371#discussion_r1303372708


##########
datafusion/core/src/physical_plan/unnest.rs:
##########
@@ -250,126 +249,266 @@ fn build_batch(
     batch: &RecordBatch,
     schema: &SchemaRef,
     column: &Column,
+    options: &UnnestOptions,
 ) -> Result<RecordBatch> {
     let list_array = column.evaluate(batch)?.into_array(batch.num_rows());
     match list_array.data_type() {
         DataType::List(_) => {
             let list_array = 
list_array.as_any().downcast_ref::<ListArray>().unwrap();
-            unnest_batch(batch, schema, column, &list_array)
+            build_batch_generic_list::<i32, Int32Type>(
+                batch,
+                schema,
+                column.index(),
+                list_array,
+                options,
+            )
         }
         DataType::LargeList(_) => {
             let list_array = list_array
                 .as_any()
                 .downcast_ref::<LargeListArray>()
                 .unwrap();
-            unnest_batch(batch, schema, column, &list_array)
+            build_batch_generic_list::<i64, Int64Type>(
+                batch,
+                schema,
+                column.index(),
+                list_array,
+                options,
+            )
         }
         DataType::FixedSizeList(_, _) => {
             let list_array = list_array
                 .as_any()
                 .downcast_ref::<FixedSizeListArray>()
                 .unwrap();
-            unnest_batch(batch, schema, column, list_array)
+            build_batch_fixedsize_list(batch, schema, column.index(), 
list_array, options)
         }
         _ => Err(DataFusionError::Execution(format!(
             "Invalid unnest column {column}"
         ))),
     }
 }
 
-fn unnest_batch<T>(
+fn build_batch_generic_list<T: OffsetSizeTrait, P: ArrowPrimitiveType<Native = 
T>>(
     batch: &RecordBatch,
     schema: &SchemaRef,
-    column: &Column,
-    list_array: &T,
-) -> Result<RecordBatch>
-where
-    T: ArrayAccessor<Item = ArrayRef>,
-{
-    // Create an array with the unnested values of the list array, given the 
list
-    // array:
-    //
-    //   [1], null, [2, 3, 4], null, [5, 6]
-    //
-    // the result array is:
-    //
-    //   1, null, 2, 3, 4, null, 5, 6
-    //
-    let unnested_array = unnest_array(list_array)?;
-
-    // Create an array with the lengths of each list value in the nested array.
-    // Given the nested array:
-    //
-    //   [1], null, [2, 3, 4], null, [5, 6]
-    //
-    // the result array is:
-    //
-    //   1, null, 3, null, 2
-    //
-    // Depending on the list type the result may be Int32Array or Int64Array.
-    let list_lengths = kernels::length::length(list_array)?;
-
-    // Create the indices for the take kernel and then use those indices to 
create
-    // the unnested record batch.
-    match list_lengths.data_type() {
-        DataType::Int32 => {
-            let list_lengths = as_primitive_array::<Int32Type>(&list_lengths)?;
-            let indices = create_take_indices(list_lengths, 
unnested_array.len());
-            batch_from_indices(batch, schema, column.index(), &unnested_array, 
&indices)
-        }
-        DataType::Int64 => {
-            let list_lengths = as_primitive_array::<Int64Type>(&list_lengths)?;
-            let indices = create_take_indices(list_lengths, 
unnested_array.len());
-            batch_from_indices(batch, schema, column.index(), &unnested_array, 
&indices)
-        }
-        dt => Err(DataFusionError::Execution(format!(
-            "Unnest: unsupported indices type {dt}"
-        ))),
-    }
+    unnest_column_idx: usize,
+    list_array: &GenericListArray<T>,
+    options: &UnnestOptions,
+) -> Result<RecordBatch> {
+    let unnested_array = unnest_generic_list::<T, P>(list_array, options)?;
+
+    let take_indicies =
+        create_take_indicies_generic::<T, P>(list_array, unnested_array.len(), 
options);
+
+    batch_from_indices(
+        batch,
+        schema,
+        unnest_column_idx,
+        &unnested_array,
+        &take_indicies,
+    )
 }
 
-/// Create the indices for the take kernel given an array of list values 
lengths.
+/// Given this `GenericList` list_array:
+///   
+/// ```ignore
+/// [1], null, [2, 3, 4], null, [5, 6]
+/// ```
+/// Its values array is represented like this:
 ///
-/// The indices are used to duplicate column elements so that all columns have 
as
-/// many rows as the unnested array.
+/// ```ignore
+/// [1, 2, 3, 4, 5, 6]
+/// ```
+///
+/// So if there are no null values or `UnnestOptions.preserve_nulls` is false
+/// we can return the values array without any copying.
 ///
-/// Given the nested array:
+/// Otherwise we'll transform the values array using the take kernel and the following take indices:
 ///
 /// ```ignore
-/// [1], null, [2, 3, 4], null, [5, 6]
+/// 0, null, 1, 2, 3, null, 4, 5
 /// ```
 ///
-/// the `list_lengths` array contains the length of each list value:
+fn unnest_generic_list<T: OffsetSizeTrait, P: ArrowPrimitiveType<Native = T>>(
+    list_array: &GenericListArray<T>,
+    options: &UnnestOptions,
+) -> Result<Arc<dyn Array + 'static>> {
+    let values = list_array.values();
+    if list_array.null_count() == 0 || !options.preserve_nulls {
+        Ok(values.clone())

Review Comment:
   ❤️ 



##########
datafusion/core/src/physical_plan/unnest.rs:
##########
@@ -250,126 +249,266 @@ fn build_batch(
     batch: &RecordBatch,
     schema: &SchemaRef,
     column: &Column,
+    options: &UnnestOptions,
 ) -> Result<RecordBatch> {
     let list_array = column.evaluate(batch)?.into_array(batch.num_rows());
     match list_array.data_type() {
         DataType::List(_) => {
             let list_array = 
list_array.as_any().downcast_ref::<ListArray>().unwrap();
-            unnest_batch(batch, schema, column, &list_array)
+            build_batch_generic_list::<i32, Int32Type>(
+                batch,
+                schema,
+                column.index(),
+                list_array,
+                options,
+            )
         }
         DataType::LargeList(_) => {
             let list_array = list_array
                 .as_any()
                 .downcast_ref::<LargeListArray>()
                 .unwrap();
-            unnest_batch(batch, schema, column, &list_array)
+            build_batch_generic_list::<i64, Int64Type>(
+                batch,
+                schema,
+                column.index(),
+                list_array,
+                options,
+            )
         }
         DataType::FixedSizeList(_, _) => {
             let list_array = list_array
                 .as_any()
                 .downcast_ref::<FixedSizeListArray>()
                 .unwrap();
-            unnest_batch(batch, schema, column, list_array)
+            build_batch_fixedsize_list(batch, schema, column.index(), 
list_array, options)
         }
         _ => Err(DataFusionError::Execution(format!(
             "Invalid unnest column {column}"
         ))),
     }
 }
 
-fn unnest_batch<T>(
+fn build_batch_generic_list<T: OffsetSizeTrait, P: ArrowPrimitiveType<Native = 
T>>(
     batch: &RecordBatch,
     schema: &SchemaRef,
-    column: &Column,
-    list_array: &T,
-) -> Result<RecordBatch>
-where
-    T: ArrayAccessor<Item = ArrayRef>,
-{
-    // Create an array with the unnested values of the list array, given the 
list
-    // array:
-    //
-    //   [1], null, [2, 3, 4], null, [5, 6]
-    //
-    // the result array is:
-    //
-    //   1, null, 2, 3, 4, null, 5, 6
-    //
-    let unnested_array = unnest_array(list_array)?;
-
-    // Create an array with the lengths of each list value in the nested array.
-    // Given the nested array:
-    //
-    //   [1], null, [2, 3, 4], null, [5, 6]
-    //
-    // the result array is:
-    //
-    //   1, null, 3, null, 2
-    //
-    // Depending on the list type the result may be Int32Array or Int64Array.
-    let list_lengths = kernels::length::length(list_array)?;
-
-    // Create the indices for the take kernel and then use those indices to 
create
-    // the unnested record batch.
-    match list_lengths.data_type() {
-        DataType::Int32 => {
-            let list_lengths = as_primitive_array::<Int32Type>(&list_lengths)?;
-            let indices = create_take_indices(list_lengths, 
unnested_array.len());
-            batch_from_indices(batch, schema, column.index(), &unnested_array, 
&indices)
-        }
-        DataType::Int64 => {
-            let list_lengths = as_primitive_array::<Int64Type>(&list_lengths)?;
-            let indices = create_take_indices(list_lengths, 
unnested_array.len());
-            batch_from_indices(batch, schema, column.index(), &unnested_array, 
&indices)
-        }
-        dt => Err(DataFusionError::Execution(format!(
-            "Unnest: unsupported indices type {dt}"
-        ))),
-    }
+    unnest_column_idx: usize,
+    list_array: &GenericListArray<T>,
+    options: &UnnestOptions,
+) -> Result<RecordBatch> {
+    let unnested_array = unnest_generic_list::<T, P>(list_array, options)?;
+
+    let take_indicies =
+        create_take_indicies_generic::<T, P>(list_array, unnested_array.len(), 
options);
+
+    batch_from_indices(
+        batch,
+        schema,
+        unnest_column_idx,
+        &unnested_array,
+        &take_indicies,
+    )
 }
 
-/// Create the indices for the take kernel given an array of list values 
lengths.
+/// Given this `GenericList` list_array:
+///   
+/// ```ignore
+/// [1], null, [2, 3, 4], null, [5, 6]
+/// ```
+/// Its values array is represented like this:
 ///
-/// The indices are used to duplicate column elements so that all columns have 
as
-/// many rows as the unnested array.
+/// ```ignore
+/// [1, 2, 3, 4, 5, 6]
+/// ```
+///
+/// So if there are no null values or `UnnestOptions.preserve_nulls` is false
+/// we can return the values array without any copying.
 ///
-/// Given the nested array:
+/// Otherwise we'll transform the values array using the take kernel and the following take indices:
 ///
 /// ```ignore
-/// [1], null, [2, 3, 4], null, [5, 6]
+/// 0, null, 1, 2, 3, null, 4, 5
 /// ```
 ///
-/// the `list_lengths` array contains the length of each list value:
+fn unnest_generic_list<T: OffsetSizeTrait, P: ArrowPrimitiveType<Native = T>>(
+    list_array: &GenericListArray<T>,
+    options: &UnnestOptions,
+) -> Result<Arc<dyn Array + 'static>> {
+    let values = list_array.values();
+    if list_array.null_count() == 0 || !options.preserve_nulls {
+        Ok(values.clone())
+    } else {
+        let mut take_indicies_builder =
+            PrimitiveArray::<P>::builder(values.len() + 
list_array.null_count());
+        let mut take_offset = 0;
+
+        list_array.iter().for_each(|elem| match elem {
+            Some(array) => {
+                for i in 0..array.len() {
+                    // take_offset + i is always positive
+                    let take_index = P::Native::from_usize(take_offset + 
i).unwrap();
+                    take_indicies_builder.append_value(take_index);
+                }
+                take_offset += array.len();
+            }
+            None => {
+                take_indicies_builder.append_null();
+            }
+        });
+        Ok(kernels::take::take(
+            &values,
+            &take_indicies_builder.finish(),
+            None,
+        )?)
+    }
+}
+
+fn build_batch_fixedsize_list(
+    batch: &RecordBatch,
+    schema: &SchemaRef,
+    unnest_column_idx: usize,
+    list_array: &FixedSizeListArray,
+    options: &UnnestOptions,
+) -> Result<RecordBatch> {
+    let unnested_array = unnest_fixed_list(list_array, options)?;
+
+    let take_indicies =
+        create_take_indicies_fixed(list_array, unnested_array.len(), options);
+
+    batch_from_indices(
+        batch,
+        schema,
+        unnest_column_idx,
+        &unnested_array,
+        &take_indicies,
+    )
+}
+
+/// Given this `FixedSizeListArray` list_array:
+///   
+/// ```ignore
+/// [1, 2], null, [3, 4], null, [5, 6]
+/// ```
+/// Its values array is represented like this:
 ///
 /// ```ignore
-/// 1, null,  3, null, 2
+/// [1, 2, null, null, 3, 4, null, null, 5, 6]
 /// ```
 ///
-/// the result indices array is:
+/// So if there are no null values
+/// we can return the values array without any copying.
+///
+/// Otherwise we'll transform the values array using the take kernel.
+///
+/// If `UnnestOptions.preserve_nulls` is true the take indices will look like this:
 ///
 /// ```ignore
-/// 0, 1, 2, 2, 2, 3, 4, 4
+/// 0, 1, null, 4, 5, null, 8, 9
+/// ```
+/// Otherwise we drop the nulls and take indices will look like this:
+///
+/// ```ignore
+/// 0, 1, 4, 5, 8, 9
 /// ```
 ///
-/// where a null value count as one element.
-fn create_take_indices<T>(
-    list_lengths: &PrimitiveArray<T>,
+fn unnest_fixed_list(
+    list_array: &FixedSizeListArray,
+    options: &UnnestOptions,
+) -> Result<Arc<dyn Array + 'static>> {
+    let values = list_array.values();
+
+    if list_array.null_count() == 0 {
+        Ok(values.clone())
+    } else {
+        let len_without_nulls =
+            values.len() - list_array.null_count() * list_array.value_length() 
as usize;
+        let null_count = if options.preserve_nulls {
+            list_array.null_count()
+        } else {
+            0
+        };
+        let mut builder =
+            PrimitiveArray::<Int32Type>::builder(len_without_nulls + 
null_count);
+        let mut take_offset = 0;
+        let fixed_value_length = list_array.value_length() as usize;
+        list_array.iter().for_each(|elem| match elem {
+            Some(_) => {
+                for i in 0..fixed_value_length {
+                    //take_offset + i is always positive
+                    let take_index =
+                        <Int32Type as ArrowPrimitiveType>::Native::from_usize(
+                            take_offset + i,
+                        )
+                        .unwrap();
+                    builder.append_value(take_index);

Review Comment:
   This might be easier to understand
   ```suggestion
                       let take_index = take_offset + i;
                       builder.append_value(take_index as i32);
   ```



##########
datafusion/core/tests/dataframe/mod.rs:
##########
@@ -1111,7 +1092,6 @@ async fn unnest_column_nulls() -> Result<()> {
         "+------+----+",
         "| 1    | A  |",
         "| 2    | A  |",
-        "|      | B  |", // this row should not be here

Review Comment:
   🎉 



##########
datafusion/core/src/physical_plan/unnest.rs:
##########
@@ -250,126 +249,266 @@ fn build_batch(
     batch: &RecordBatch,
     schema: &SchemaRef,
     column: &Column,
+    options: &UnnestOptions,
 ) -> Result<RecordBatch> {
     let list_array = column.evaluate(batch)?.into_array(batch.num_rows());
     match list_array.data_type() {
         DataType::List(_) => {
             let list_array = 
list_array.as_any().downcast_ref::<ListArray>().unwrap();
-            unnest_batch(batch, schema, column, &list_array)
+            build_batch_generic_list::<i32, Int32Type>(
+                batch,
+                schema,
+                column.index(),
+                list_array,
+                options,
+            )
         }
         DataType::LargeList(_) => {
             let list_array = list_array
                 .as_any()
                 .downcast_ref::<LargeListArray>()
                 .unwrap();
-            unnest_batch(batch, schema, column, &list_array)
+            build_batch_generic_list::<i64, Int64Type>(
+                batch,
+                schema,
+                column.index(),
+                list_array,
+                options,
+            )
         }
         DataType::FixedSizeList(_, _) => {
             let list_array = list_array
                 .as_any()
                 .downcast_ref::<FixedSizeListArray>()
                 .unwrap();
-            unnest_batch(batch, schema, column, list_array)
+            build_batch_fixedsize_list(batch, schema, column.index(), 
list_array, options)
         }
         _ => Err(DataFusionError::Execution(format!(
             "Invalid unnest column {column}"
         ))),
     }
 }
 
-fn unnest_batch<T>(
+fn build_batch_generic_list<T: OffsetSizeTrait, P: ArrowPrimitiveType<Native = 
T>>(
     batch: &RecordBatch,
     schema: &SchemaRef,
-    column: &Column,
-    list_array: &T,
-) -> Result<RecordBatch>
-where
-    T: ArrayAccessor<Item = ArrayRef>,
-{
-    // Create an array with the unnested values of the list array, given the 
list
-    // array:
-    //
-    //   [1], null, [2, 3, 4], null, [5, 6]
-    //
-    // the result array is:
-    //
-    //   1, null, 2, 3, 4, null, 5, 6
-    //
-    let unnested_array = unnest_array(list_array)?;
-
-    // Create an array with the lengths of each list value in the nested array.
-    // Given the nested array:
-    //
-    //   [1], null, [2, 3, 4], null, [5, 6]
-    //
-    // the result array is:
-    //
-    //   1, null, 3, null, 2
-    //
-    // Depending on the list type the result may be Int32Array or Int64Array.
-    let list_lengths = kernels::length::length(list_array)?;
-
-    // Create the indices for the take kernel and then use those indices to 
create
-    // the unnested record batch.
-    match list_lengths.data_type() {
-        DataType::Int32 => {
-            let list_lengths = as_primitive_array::<Int32Type>(&list_lengths)?;
-            let indices = create_take_indices(list_lengths, 
unnested_array.len());
-            batch_from_indices(batch, schema, column.index(), &unnested_array, 
&indices)
-        }
-        DataType::Int64 => {
-            let list_lengths = as_primitive_array::<Int64Type>(&list_lengths)?;
-            let indices = create_take_indices(list_lengths, 
unnested_array.len());
-            batch_from_indices(batch, schema, column.index(), &unnested_array, 
&indices)
-        }
-        dt => Err(DataFusionError::Execution(format!(
-            "Unnest: unsupported indices type {dt}"
-        ))),
-    }
+    unnest_column_idx: usize,
+    list_array: &GenericListArray<T>,
+    options: &UnnestOptions,
+) -> Result<RecordBatch> {
+    let unnested_array = unnest_generic_list::<T, P>(list_array, options)?;
+
+    let take_indicies =
+        create_take_indicies_generic::<T, P>(list_array, unnested_array.len(), 
options);
+
+    batch_from_indices(
+        batch,
+        schema,
+        unnest_column_idx,
+        &unnested_array,
+        &take_indicies,
+    )
 }
 
-/// Create the indices for the take kernel given an array of list values 
lengths.
+/// Given this `GenericList` list_array:
+///   
+/// ```ignore
+/// [1], null, [2, 3, 4], null, [5, 6]
+/// ```
+/// Its values array is represented like this:
 ///
-/// The indices are used to duplicate column elements so that all columns have 
as
-/// many rows as the unnested array.
+/// ```ignore
+/// [1, 2, 3, 4, 5, 6]
+/// ```
+///
+/// So if there are no null values or `UnnestOptions.preserve_nulls` is false
+/// we can return the values array without any copying.
 ///
-/// Given the nested array:
+/// Otherwise we'll transform the values array using the take kernel and the following take indices:
 ///
 /// ```ignore
-/// [1], null, [2, 3, 4], null, [5, 6]
+/// 0, null, 1, 2, 3, null, 4, 5
 /// ```
 ///
-/// the `list_lengths` array contains the length of each list value:
+fn unnest_generic_list<T: OffsetSizeTrait, P: ArrowPrimitiveType<Native = T>>(
+    list_array: &GenericListArray<T>,
+    options: &UnnestOptions,
+) -> Result<Arc<dyn Array + 'static>> {
+    let values = list_array.values();
+    if list_array.null_count() == 0 || !options.preserve_nulls {
+        Ok(values.clone())

Review Comment:
   ❤️ 



##########
datafusion/core/tests/dataframe/mod.rs:
##########
@@ -1111,7 +1092,6 @@ async fn unnest_column_nulls() -> Result<()> {
         "+------+----+",
         "| 1    | A  |",
         "| 2    | A  |",
-        "|      | B  |", // this row should not be here

Review Comment:
   🎉 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to