alamb commented on code in PR #6903:
URL: https://github.com/apache/arrow-datafusion/pull/6903#discussion_r1262637708
##########
datafusion/core/src/physical_plan/unnest.rs:
##########
@@ -264,41 +267,224 @@ fn unnest_batch<T>(
where
T: ArrayAccessor<Item = ArrayRef>,
{
- let mut batches = Vec::new();
- let mut num_rows = 0;
-
- for row in 0..batch.num_rows() {
- let arrays = batch
- .columns()
- .iter()
- .enumerate()
- .map(|(col_idx, arr)| {
- if col_idx == column.index() {
- // Unnest the value at the given row.
- if list_array.value(row).is_empty() {
- // If nested array is empty add an array with 1 null.
- Ok(new_null_array(list_array.value(row).data_type(),
1))
- } else {
- Ok(list_array.value(row))
- }
+ // Create an array with the unnested values of the list array, given the
list
+ // array:
+ //
+ // [1], null, [2, 3, 4], null, [5, 6]
+ //
+ // the result array is:
+ //
+ // 1, null, 2, 3, 4, null, 5, 6
+ //
+ let unnested_array = unnest_array(list_array)?;
+
+ // Create an array with the lengths of each list value in the nested array.
+ // Given the nested array:
+ //
+ // [1], null, [2, 3, 4], null, [5, 6]
+ //
+ // the result array is:
+ //
+ // 1, null, 3, null, 2
+ //
+ // Depending on the list type the result may be Int32Array or Int64Array.
+ let list_lengths = list_lengths(list_array)?;
+
+ // Create the indices for the take kernel and then use those indices to
create
+ // the unnested record batch.
+ match list_lengths.data_type() {
+ DataType::Int32 => {
+ let list_lengths = as_primitive_array::<Int32Type>(&list_lengths)?;
+ let indices = create_take_indices(list_lengths,
unnested_array.len());
+ batch_from_indices(batch, schema, column.index(), &unnested_array,
&indices)
+ }
+ DataType::Int64 => {
+ let list_lengths = as_primitive_array::<Int64Type>(&list_lengths)?;
+ let indices = create_take_indices(list_lengths,
unnested_array.len());
+ batch_from_indices(batch, schema, column.index(), &unnested_array,
&indices)
+ }
+ dt => Err(DataFusionError::Execution(format!(
+ "Unnest: unsupported indices type {dt}"
+ ))),
+ }
+}
+
+/// Create the indices for the take kernel given an array of list values
lengths.
+///
+/// The indices are used to duplicate column elements so that all columns have
as
+/// many rows as the unnested array.
+///
+/// Given the nested array:
+///
+/// ```ignore
+/// [1], null, [2, 3, 4], null, [5, 6]
+/// ```
+///
+/// the `list_lengths` array contains the length of each list value:
+///
+/// ```ignore
+/// 1, null, 3, null, 2
+/// ```
+///
+/// the result indices array is:
+///
+/// ```ignore
+/// 0, 1, 2, 2, 2, 3, 4, 4
+/// ```
+///
+/// where a null value count as one element.
+fn create_take_indices<T>(
+ list_lengths: &PrimitiveArray<T>,
+ capacity: usize,
+) -> PrimitiveArray<T>
+where
+ T: ArrowPrimitiveType,
+{
+ let mut builder = PrimitiveArray::<T>::builder(capacity);
+ for row in 0..list_lengths.len() {
+ let repeat = if list_lengths.is_null(row) {
+ T::Native::ONE
+ } else {
+ list_lengths.value(row)
+ };
+
+ // Both `repeat` and `index` are positive intergers.
+ let repeat = repeat.to_usize().unwrap();
+ let index = T::Native::from_usize(row).unwrap();
+ (0..repeat).for_each(|_| builder.append_value(index));
+ }
+
+ builder.finish()
+}
+
+/// Create the final batch given the unnested column array and a `indices`
array
+/// that is used by the take kernel to copy values.
+///
+/// For example if we have the following `RecordBatch`:
+///
+/// ```ignore
+/// c1: [1], null, [2, 3, 4], null, [5, 6]
+/// c2: 'a', 'b', 'c', null, 'd'
+/// ```
+///
+/// then the `unnested_array` contains the unnest column that will replace
`c1` in
+/// the final batch:
+///
+/// ```ignore
+/// c1: 1, null, 2, 3, 4, null, 5, 6
+/// ```
+///
+/// And the `indices` array contains the indices that are used by `take`
kernel to
+/// repeat the values in `c2`:
+///
+/// ```ignore
+/// 0, 1, 2, 2, 2, 3, 4, 4
+/// ```
+///
+/// so that the final batch will look like:
+///
+/// ```ignore
+/// c1: 1, null, 2, 3, 4, null, 5, 6
+/// c2: 'a', 'b', 'c', 'c', 'c', null, 'd', 'd'
+/// ```
+///
+fn batch_from_indices<T>(
+ batch: &RecordBatch,
+ schema: &SchemaRef,
+ unnest_column_idx: usize,
+ unnested_array: &ArrayRef,
+ indices: &PrimitiveArray<T>,
+) -> Result<RecordBatch>
+where
+ T: ArrowPrimitiveType,
+{
+ let arrays = batch
+ .columns()
+ .iter()
+ .enumerate()
+ .map(|(col_idx, arr)| {
+ if col_idx == unnest_column_idx {
+ Ok(unnested_array.clone())
+ } else {
+ Ok(kernels::take::take(&arr, indices, None)?)
+ }
+ })
+ .collect::<Result<Vec<_>>>()?;
+
+ Ok(RecordBatch::try_new(schema.clone(), arrays.to_vec())?)
+}
+
+/// Unnest the given list array. Given the array:
+///
+/// ```ignore
+/// [1], null, [2, 3, 4], null, [5, 6]
+/// ```
+///
+/// returns:
+///
+/// ```ignore
+/// 1, null, 2, 3, 4, null, 5, 6
+/// ```
+fn unnest_array<T>(list_array: &T) -> Result<Arc<dyn Array + 'static>>
+where
+ T: ArrayAccessor<Item = ArrayRef>,
+{
+ let elem_type = match list_array.data_type() {
+ DataType::List(f) | DataType::FixedSizeList(f, _) |
DataType::LargeList(f) => {
+ f.data_type()
+ }
+ dt => {
+ return Err(DataFusionError::Execution(format!(
+ "Cannot unnest array of type {dt}"
+ )))
+ }
+ };
+
+ let null_row = new_null_array(elem_type, 1);
+
+ // Create a vec of ArrayRef from the list elements.
+ let arrays = (0..list_array.len())
+ .map(|row| {
+ if list_array.is_null(row) {
+ null_row.clone()
+ } else {
+ list_array.value(row)
+ }
+ })
+ .collect::<Vec<_>>();
+
+ // Create Vec<&dyn Array> from Vec<Arc<dyn Array>> for `concat`. Calling
+ // `as_ref()` in the `map` above causes the borrow checker to complain.
+ let arrays = arrays.iter().map(|a| a.as_ref()).collect::<Vec<_>>();
+
+ Ok(kernels::concat::concat(&arrays)?)
+}
+
+/// Returns an array with the lengths of each list in `list_array`. Returns
null
+/// for a null value.
+fn list_lengths<T>(list_array: &T) -> Result<Arc<dyn Array + 'static>>
+where
+ T: ArrayAccessor<Item = ArrayRef>,
+{
+ match list_array.data_type() {
+ DataType::List(_) | DataType::LargeList(_) => {
+ Ok(kernels::length::length(list_array)?)
+ }
+ DataType::FixedSizeList(_, size) => {
+ // Handle FixedSizeList as it is not handled by the `length`
kernel.
Review Comment:
Filed https://github.com/apache/arrow-rs/issues/4517 to trach this
```suggestion
// Handle FixedSizeList as it is not handled by the `length`
kernel.
// https://github.com/apache/arrow-rs/issues/4517
```
##########
datafusion/core/src/physical_plan/unnest.rs:
##########
@@ -264,41 +267,224 @@ fn unnest_batch<T>(
where
T: ArrayAccessor<Item = ArrayRef>,
{
- let mut batches = Vec::new();
- let mut num_rows = 0;
-
- for row in 0..batch.num_rows() {
- let arrays = batch
- .columns()
- .iter()
- .enumerate()
- .map(|(col_idx, arr)| {
- if col_idx == column.index() {
- // Unnest the value at the given row.
- if list_array.value(row).is_empty() {
- // If nested array is empty add an array with 1 null.
- Ok(new_null_array(list_array.value(row).data_type(),
1))
- } else {
- Ok(list_array.value(row))
- }
+ // Create an array with the unnested values of the list array, given the
list
+ // array:
+ //
+ // [1], null, [2, 3, 4], null, [5, 6]
+ //
+ // the result array is:
+ //
+ // 1, null, 2, 3, 4, null, 5, 6
+ //
+ let unnested_array = unnest_array(list_array)?;
+
+ // Create an array with the lengths of each list value in the nested array.
+ // Given the nested array:
+ //
+ // [1], null, [2, 3, 4], null, [5, 6]
+ //
+ // the result array is:
+ //
+ // 1, null, 3, null, 2
+ //
+ // Depending on the list type the result may be Int32Array or Int64Array.
+ let list_lengths = list_lengths(list_array)?;
+
+ // Create the indices for the take kernel and then use those indices to
create
+ // the unnested record batch.
+ match list_lengths.data_type() {
+ DataType::Int32 => {
+ let list_lengths = as_primitive_array::<Int32Type>(&list_lengths)?;
+ let indices = create_take_indices(list_lengths,
unnested_array.len());
+ batch_from_indices(batch, schema, column.index(), &unnested_array,
&indices)
+ }
+ DataType::Int64 => {
+ let list_lengths = as_primitive_array::<Int64Type>(&list_lengths)?;
+ let indices = create_take_indices(list_lengths,
unnested_array.len());
+ batch_from_indices(batch, schema, column.index(), &unnested_array,
&indices)
+ }
+ dt => Err(DataFusionError::Execution(format!(
+ "Unnest: unsupported indices type {dt}"
+ ))),
+ }
+}
+
+/// Create the indices for the take kernel given an array of list values
lengths.
+///
+/// The indices are used to duplicate column elements so that all columns have
as
+/// many rows as the unnested array.
+///
+/// Given the nested array:
+///
+/// ```ignore
+/// [1], null, [2, 3, 4], null, [5, 6]
+/// ```
+///
+/// the `list_lengths` array contains the length of each list value:
+///
+/// ```ignore
+/// 1, null, 3, null, 2
+/// ```
+///
+/// the result indices array is:
+///
+/// ```ignore
+/// 0, 1, 2, 2, 2, 3, 4, 4
+/// ```
+///
+/// where a null value count as one element.
+fn create_take_indices<T>(
+ list_lengths: &PrimitiveArray<T>,
+ capacity: usize,
+) -> PrimitiveArray<T>
+where
+ T: ArrowPrimitiveType,
+{
+ let mut builder = PrimitiveArray::<T>::builder(capacity);
+ for row in 0..list_lengths.len() {
+ let repeat = if list_lengths.is_null(row) {
+ T::Native::ONE
+ } else {
+ list_lengths.value(row)
+ };
+
+ // Both `repeat` and `index` are positive intergers.
+ let repeat = repeat.to_usize().unwrap();
+ let index = T::Native::from_usize(row).unwrap();
+ (0..repeat).for_each(|_| builder.append_value(index));
+ }
+
+ builder.finish()
+}
+
+/// Create the final batch given the unnested column array and a `indices`
array
+/// that is used by the take kernel to copy values.
+///
+/// For example if we have the following `RecordBatch`:
+///
+/// ```ignore
+/// c1: [1], null, [2, 3, 4], null, [5, 6]
+/// c2: 'a', 'b', 'c', null, 'd'
+/// ```
+///
+/// then the `unnested_array` contains the unnest column that will replace
`c1` in
+/// the final batch:
+///
+/// ```ignore
+/// c1: 1, null, 2, 3, 4, null, 5, 6
+/// ```
+///
+/// And the `indices` array contains the indices that are used by `take`
kernel to
+/// repeat the values in `c2`:
+///
+/// ```ignore
+/// 0, 1, 2, 2, 2, 3, 4, 4
+/// ```
+///
+/// so that the final batch will look like:
+///
+/// ```ignore
+/// c1: 1, null, 2, 3, 4, null, 5, 6
+/// c2: 'a', 'b', 'c', 'c', 'c', null, 'd', 'd'
+/// ```
+///
+fn batch_from_indices<T>(
+ batch: &RecordBatch,
+ schema: &SchemaRef,
+ unnest_column_idx: usize,
+ unnested_array: &ArrayRef,
+ indices: &PrimitiveArray<T>,
+) -> Result<RecordBatch>
+where
+ T: ArrowPrimitiveType,
+{
+ let arrays = batch
+ .columns()
+ .iter()
+ .enumerate()
+ .map(|(col_idx, arr)| {
+ if col_idx == unnest_column_idx {
+ Ok(unnested_array.clone())
+ } else {
+ Ok(kernels::take::take(&arr, indices, None)?)
+ }
+ })
+ .collect::<Result<Vec<_>>>()?;
+
+ Ok(RecordBatch::try_new(schema.clone(), arrays.to_vec())?)
+}
+
+/// Unnest the given list array. Given the array:
+///
+/// ```ignore
+/// [1], null, [2, 3, 4], null, [5, 6]
+/// ```
+///
+/// returns:
+///
+/// ```ignore
+/// 1, null, 2, 3, 4, null, 5, 6
+/// ```
+fn unnest_array<T>(list_array: &T) -> Result<Arc<dyn Array + 'static>>
+where
+ T: ArrayAccessor<Item = ArrayRef>,
+{
+ let elem_type = match list_array.data_type() {
+ DataType::List(f) | DataType::FixedSizeList(f, _) |
DataType::LargeList(f) => {
+ f.data_type()
+ }
+ dt => {
+ return Err(DataFusionError::Execution(format!(
+ "Cannot unnest array of type {dt}"
+ )))
+ }
+ };
+
+ let null_row = new_null_array(elem_type, 1);
+
+ // Create a vec of ArrayRef from the list elements.
+ let arrays = (0..list_array.len())
+ .map(|row| {
+ if list_array.is_null(row) {
+ null_row.clone()
+ } else {
+ list_array.value(row)
+ }
+ })
+ .collect::<Vec<_>>();
+
+ // Create Vec<&dyn Array> from Vec<Arc<dyn Array>> for `concat`. Calling
+ // `as_ref()` in the `map` above causes the borrow checker to complain.
+ let arrays = arrays.iter().map(|a| a.as_ref()).collect::<Vec<_>>();
+
+ Ok(kernels::concat::concat(&arrays)?)
+}
+
+/// Returns an array with the lengths of each list in `list_array`. Returns
null
+/// for a null value.
+fn list_lengths<T>(list_array: &T) -> Result<Arc<dyn Array + 'static>>
+where
+ T: ArrayAccessor<Item = ArrayRef>,
+{
+ match list_array.data_type() {
+ DataType::List(_) | DataType::LargeList(_) => {
+ Ok(kernels::length::length(list_array)?)
+ }
+ DataType::FixedSizeList(_, size) => {
+ // Handle FixedSizeList as it is not handled by the `length`
kernel.
Review Comment:
Filed https://github.com/apache/arrow-rs/issues/4517 to track this
```suggestion
// Handle FixedSizeList as it is not handled by the `length`
kernel.
// https://github.com/apache/arrow-rs/issues/4517
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]