This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 27992680d5 feat(arrow-select): `concat` kernel will merge dictionary
values for list of dictionaries (#6893)
27992680d5 is described below
commit 27992680d5677b629d7b4ca11d3f94c2fe905f1a
Author: Raz Luvaton <[email protected]>
AuthorDate: Sat Jan 4 12:24:48 2025 +0200
feat(arrow-select): `concat` kernel will merge dictionary values for list
of dictionaries (#6893)
* feat(arrow-select): make list of dictionary merge dictionary keys
TODO:
- [ ] Add support to nested lists
- [ ] Add more tests
- [ ] Fix failing test
* fix concat lists of dictionaries
* format
* remove unused import
* improve test helper
* feat: add merge offset buffers into one
* format
* add reproduction tst
* recommit
* fix clippy
* fix clippy
* fix clippy
* improve offsets code according to code review
* use concat dictionaries
* add specialize code to concat lists to be able to use the concat
dictionary logic
* remove the use of ArrayData
---
arrow-buffer/src/buffer/offset.rs | 52 +++++++++++
arrow-select/src/concat.rs | 191 +++++++++++++++++++++++++++++++++++---
2 files changed, 232 insertions(+), 11 deletions(-)
diff --git a/arrow-buffer/src/buffer/offset.rs b/arrow-buffer/src/buffer/offset.rs
index a6be2b67af..164af6f01d 100644
--- a/arrow-buffer/src/buffer/offset.rs
+++ b/arrow-buffer/src/buffer/offset.rs
@@ -133,6 +133,38 @@ impl<O: ArrowNativeType> OffsetBuffer<O> {
Self(out.into())
}
+ /// Get an Iterator over the lengths of this [`OffsetBuffer`]
+ ///
+ /// ```
+ /// # use arrow_buffer::{OffsetBuffer, ScalarBuffer};
+ /// let offsets = OffsetBuffer::<_>::new(ScalarBuffer::<i32>::from(vec![0, 1, 4, 9]));
+ /// assert_eq!(offsets.lengths().collect::<Vec<usize>>(), vec![1, 3, 5]);
+ /// ```
+ ///
+ /// Empty [`OffsetBuffer`] will return an empty iterator
+ /// ```
+ /// # use arrow_buffer::OffsetBuffer;
+ /// let offsets = OffsetBuffer::<i32>::new_empty();
+ /// assert_eq!(offsets.lengths().count(), 0);
+ /// ```
+ ///
+ /// This can be used to merge multiple [`OffsetBuffer`]s to one
+ /// ```
+ /// # use arrow_buffer::{OffsetBuffer, ScalarBuffer};
+ ///
+ /// let buffer1 = OffsetBuffer::<i32>::from_lengths([2, 6, 3, 7, 2]);
+ /// let buffer2 = OffsetBuffer::<i32>::from_lengths([1, 3, 5, 7, 9]);
+ ///
+ /// let merged = OffsetBuffer::<i32>::from_lengths(
+ /// vec![buffer1, buffer2].iter().flat_map(|x| x.lengths())
+ /// );
+ ///
+ /// assert_eq!(merged.lengths().collect::<Vec<_>>(), &[2, 6, 3, 7, 2, 1, 3, 5, 7, 9]);
+ /// ```
+ pub fn lengths(&self) -> impl ExactSizeIterator<Item = usize> + '_ {
+ self.0.windows(2).map(|x| x[1].as_usize() - x[0].as_usize())
+ }
+
/// Free up unused memory.
pub fn shrink_to_fit(&mut self) {
self.0.shrink_to_fit();
@@ -244,4 +276,24 @@ mod tests {
fn from_lengths_usize_overflow() {
OffsetBuffer::<i32>::from_lengths([usize::MAX, 1]);
}
+
+ #[test]
+ fn get_lengths() {
+ let offsets = OffsetBuffer::<i32>::new(ScalarBuffer::<i32>::from(vec![0, 1, 4, 9]));
+ assert_eq!(offsets.lengths().collect::<Vec<usize>>(), vec![1, 3, 5]);
+ }
+
+ #[test]
+ fn get_lengths_should_be_with_fixed_size() {
+ let offsets = OffsetBuffer::<i32>::new(ScalarBuffer::<i32>::from(vec![0, 1, 4, 9]));
+ let iter = offsets.lengths();
+ assert_eq!(iter.size_hint(), (3, Some(3)));
+ assert_eq!(iter.len(), 3);
+ }
+
+ #[test]
+ fn get_lengths_from_empty_offset_buffer_should_be_empty_iterator() {
+ let offsets = OffsetBuffer::<i32>::new_empty();
+ assert_eq!(offsets.lengths().collect::<Vec<usize>>(), vec![]);
+ }
}
diff --git a/arrow-select/src/concat.rs b/arrow-select/src/concat.rs
index 129b90ee04..4855e0087c 100644
--- a/arrow-select/src/concat.rs
+++ b/arrow-select/src/concat.rs
@@ -34,9 +34,9 @@ use crate::dictionary::{merge_dictionary_values, should_merge_dictionary_values}
use arrow_array::cast::AsArray;
use arrow_array::types::*;
use arrow_array::*;
-use arrow_buffer::{ArrowNativeType, BooleanBufferBuilder, NullBuffer};
+use arrow_buffer::{ArrowNativeType, BooleanBufferBuilder, NullBuffer, OffsetBuffer};
use arrow_data::transform::{Capacities, MutableArrayData};
-use arrow_schema::{ArrowError, DataType, SchemaRef};
+use arrow_schema::{ArrowError, DataType, FieldRef, SchemaRef};
use std::sync::Arc;
fn binary_capacity<T: ByteArrayType>(arrays: &[&dyn Array]) -> Capacities {
@@ -129,6 +129,54 @@ fn concat_dictionaries<K: ArrowDictionaryKeyType>(
Ok(Arc::new(array))
}
+fn concat_lists<OffsetSize: OffsetSizeTrait>(
+ arrays: &[&dyn Array],
+ field: &FieldRef,
+) -> Result<ArrayRef, ArrowError> {
+ let mut output_len = 0;
+ let mut list_has_nulls = false;
+
+ let lists = arrays
+ .iter()
+ .map(|x| x.as_list::<OffsetSize>())
+ .inspect(|l| {
+ output_len += l.len();
+ list_has_nulls |= l.null_count() != 0;
+ })
+ .collect::<Vec<_>>();
+
+ let lists_nulls = list_has_nulls.then(|| {
+ let mut nulls = BooleanBufferBuilder::new(output_len);
+ for l in &lists {
+ match l.nulls() {
+ Some(n) => nulls.append_buffer(n.inner()),
+ None => nulls.append_n(l.len(), true),
+ }
+ }
+ NullBuffer::new(nulls.finish())
+ });
+
+ let values: Vec<&dyn Array> = lists
+ .iter()
+ .map(|x| x.values().as_ref())
+ .collect::<Vec<_>>();
+
+ let concatenated_values = concat(values.as_slice())?;
+
+ // Merge value offsets from the lists
+ let value_offset_buffer =
+ OffsetBuffer::<OffsetSize>::from_lengths(lists.iter().flat_map(|x| x.offsets().lengths()));
+
+ let array = GenericListArray::<OffsetSize>::try_new(
+ Arc::clone(field),
+ value_offset_buffer,
+ concatenated_values,
+ lists_nulls,
+ )?;
+
+ Ok(Arc::new(array))
+}
+
macro_rules! dict_helper {
($t:ty, $arrays:expr) => {
return Ok(Arc::new(concat_dictionaries::<$t>($arrays)?) as _)
@@ -163,14 +211,20 @@ pub fn concat(arrays: &[&dyn Array]) -> Result<ArrayRef, ArrowError> {
"It is not possible to concatenate arrays of different data types.".to_string(),
));
}
- if let DataType::Dictionary(k, _) = d {
- downcast_integer! {
- k.as_ref() => (dict_helper, arrays),
- _ => unreachable!("illegal dictionary key type {k}")
- };
- } else {
- let capacity = get_capacity(arrays, d);
- concat_fallback(arrays, capacity)
+
+ match d {
+ DataType::Dictionary(k, _) => {
+ downcast_integer! {
+ k.as_ref() => (dict_helper, arrays),
+ _ => unreachable!("illegal dictionary key type {k}")
+ }
+ }
+ DataType::List(field) => concat_lists::<i32>(arrays, field),
+ DataType::LargeList(field) => concat_lists::<i64>(arrays, field),
+ _ => {
+ let capacity = get_capacity(arrays, d);
+ concat_fallback(arrays, capacity)
+ }
}
}
@@ -228,8 +282,9 @@ pub fn concat_batches<'a>(
#[cfg(test)]
mod tests {
use super::*;
- use arrow_array::builder::StringDictionaryBuilder;
+ use arrow_array::builder::{GenericListBuilder, StringDictionaryBuilder};
use arrow_schema::{Field, Schema};
+ use std::fmt::Debug;
#[test]
fn test_concat_empty_vec() {
@@ -851,4 +906,118 @@ mod tests {
assert_eq!(array.null_count(), 10);
assert_eq!(array.logical_null_count(), 10);
}
+
+ #[test]
+ fn concat_dictionary_list_array_simple() {
+ let scalars = vec![
+ create_single_row_list_of_dict(vec![Some("a")]),
+ create_single_row_list_of_dict(vec![Some("a")]),
+ create_single_row_list_of_dict(vec![Some("b")]),
+ ];
+
+ let arrays = scalars
+ .iter()
+ .map(|a| a as &(dyn Array))
+ .collect::<Vec<_>>();
+ let concat_res = concat(arrays.as_slice()).unwrap();
+
+ let expected_list = create_list_of_dict(vec![
+ // Row 1
+ Some(vec![Some("a")]),
+ Some(vec![Some("a")]),
+ Some(vec![Some("b")]),
+ ]);
+
+ let list = concat_res.as_list::<i32>();
+
+ // Assert that the list is equal to the expected list
+ list.iter().zip(expected_list.iter()).for_each(|(a, b)| {
+ assert_eq!(a, b);
+ });
+
+ assert_dictionary_has_unique_values::<_, StringArray>(
+ list.values().as_dictionary::<Int32Type>(),
+ );
+ }
+
+ #[test]
+ fn concat_many_dictionary_list_arrays() {
+ let number_of_unique_values = 8;
+ let scalars = (0..80000)
+ .map(|i| {
+ create_single_row_list_of_dict(vec![Some(
+ (i % number_of_unique_values).to_string(),
+ )])
+ })
+ .collect::<Vec<_>>();
+
+ let arrays = scalars
+ .iter()
+ .map(|a| a as &(dyn Array))
+ .collect::<Vec<_>>();
+ let concat_res = concat(arrays.as_slice()).unwrap();
+
+ let expected_list = create_list_of_dict(
+ (0..80000)
+ .map(|i| Some(vec![Some((i % number_of_unique_values).to_string())]))
+ .collect::<Vec<_>>(),
+ );
+
+ let list = concat_res.as_list::<i32>();
+
+ // Assert that the list is equal to the expected list
+ list.iter().zip(expected_list.iter()).for_each(|(a, b)| {
+ assert_eq!(a, b);
+ });
+
+ assert_dictionary_has_unique_values::<_, StringArray>(
+ list.values().as_dictionary::<Int32Type>(),
+ );
+ }
+
+ fn create_single_row_list_of_dict(
+ list_items: Vec<Option<impl AsRef<str>>>,
+ ) -> GenericListArray<i32> {
+ let rows = list_items.into_iter().map(Some).collect();
+
+ create_list_of_dict(vec![rows])
+ }
+
+ fn create_list_of_dict(
+ rows: Vec<Option<Vec<Option<impl AsRef<str>>>>>,
+ ) -> GenericListArray<i32> {
+ let mut builder =
+ GenericListBuilder::<i32, _>::new(StringDictionaryBuilder::<Int32Type>::new());
+
+ for row in rows {
+ builder.append_option(row);
+ }
+
+ builder.finish()
+ }
+
+ fn assert_dictionary_has_unique_values<'a, K, V>(array: &'a DictionaryArray<K>)
+ where
+ K: ArrowDictionaryKeyType,
+ V: Sync + Send + 'static,
+ &'a V: ArrayAccessor + IntoIterator,
+
+ <&'a V as ArrayAccessor>::Item: Default + Clone + PartialEq + Debug + Ord,
+ <&'a V as IntoIterator>::Item: Clone + PartialEq + Debug + Ord,
+ {
+ let dict = array.downcast_dict::<V>().unwrap();
+ let mut values = dict.values().into_iter().collect::<Vec<_>>();
+
+ // remove duplicates must be sorted first so we can compare
+ values.sort();
+
+ let mut unique_values = values.clone();
+
+ unique_values.dedup();
+
+ assert_eq!(
+ values, unique_values,
+ "There are duplicates in the value list (the value list here is sorted which is only for the assertion)"
+ );
+ }
}