This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 2113e2b Sort binary (#569)
2113e2b is described below
commit 2113e2b1e3f708ae28a1ce65ef292c0b3ef7a10d
Author: Ruihang Xia <[email protected]>
AuthorDate: Tue Jul 27 06:26:12 2021 +0800
Sort binary (#569)
* impl sort fixed binary array
Signed-off-by: Ruihang Xia <[email protected]>
* remove array builder util, add test cases
Signed-off-by: Ruihang Xia <[email protected]>
* impl sort for generic binary array
Signed-off-by: Ruihang Xia <[email protected]>
* tidy
Signed-off-by: Ruihang Xia <[email protected]>
* run clippy
Signed-off-by: Ruihang Xia <[email protected]>
* test: fixed binary with different prefix
Signed-off-by: Ruihang Xia <[email protected]>
* rebase master
Signed-off-by: Ruihang Xia <[email protected]>
---
arrow/src/array/cast.rs | 10 ++
arrow/src/array/mod.rs | 6 +-
arrow/src/compute/kernels/sort.rs | 317 ++++++++++++++++++++++++++++++++++++++
arrow/src/compute/kernels/take.rs | 74 +++++++++
4 files changed, 404 insertions(+), 3 deletions(-)
diff --git a/arrow/src/array/cast.rs b/arrow/src/array/cast.rs
index 0477f28..dfc1560 100644
--- a/arrow/src/array/cast.rs
+++ b/arrow/src/array/cast.rs
@@ -59,6 +59,16 @@ pub fn as_large_list_array(arr: &ArrayRef) -> &LargeListArray {
as_generic_list_array::<i64>(arr)
}
+/// Force downcast an [`ArrayRef`] to a [`GenericBinaryArray`] with offset type `S`.
+///
+/// # Panics
+///
+/// Panics with "Unable to downcast to binary array" if the concrete type behind
+/// `arr` is not exactly `GenericBinaryArray<S>` (e.g. a `FixedSizeBinaryArray`,
+/// or a binary array whose offset type is not `S`).
+#[inline]
+pub fn as_generic_binary_array<S: BinaryOffsetSizeTrait>(
+    arr: &ArrayRef,
+) -> &GenericBinaryArray<S> {
+    let any = arr.as_any();
+    any.downcast_ref::<GenericBinaryArray<S>>()
+        .expect("Unable to downcast to binary array")
+}
+
macro_rules! array_downcast_fn {
($name: ident, $arrty: ty, $arrty_str:expr) => {
#[doc = "Force downcast ArrayRef to "]
diff --git a/arrow/src/array/mod.rs b/arrow/src/array/mod.rs
index 6a0b94a..69b65f4 100644
--- a/arrow/src/array/mod.rs
+++ b/arrow/src/array/mod.rs
@@ -273,9 +273,9 @@ pub use self::ord::{build_compare, DynComparator};
// --------------------- Array downcast helper functions ---------------------
pub use self::cast::{
- as_boolean_array, as_dictionary_array, as_generic_list_array,
as_large_list_array,
- as_largestring_array, as_list_array, as_null_array, as_primitive_array,
- as_string_array, as_struct_array,
+ as_boolean_array, as_dictionary_array, as_generic_binary_array,
+ as_generic_list_array, as_large_list_array, as_largestring_array,
as_list_array,
+ as_null_array, as_primitive_array, as_string_array, as_struct_array,
};
// ------------------------------ C Data Interface ---------------------------
diff --git a/arrow/src/compute/kernels/sort.rs b/arrow/src/compute/kernels/sort.rs
index 008661e..95afc14 100644
--- a/arrow/src/compute/kernels/sort.rs
+++ b/arrow/src/compute/kernels/sort.rs
@@ -383,6 +383,10 @@ pub fn sort_to_indices(
}
}
}
+ DataType::Binary | DataType::FixedSizeBinary(_) => {
+ sort_binary::<i32>(values, v, n, &options, limit)
+ }
+ DataType::LargeBinary => sort_binary::<i64>(values, v, n, &options,
limit),
t => {
return Err(ArrowError::ComputeError(format!(
"Sort not supported for data type {:?}",
@@ -764,6 +768,67 @@ where
}
}
+/// Sort the indices of a binary array by the raw bytes of its values.
+///
+/// `values` is either a `FixedSizeBinaryArray` or a `GenericBinaryArray<S>`
+/// (`BinaryArray` for `S = i32`, `LargeBinaryArray` for `S = i64`); values are
+/// compared lexicographically as `&[u8]`.
+///
+/// * `value_indices` - indices of the non-null slots of `values`.
+/// * `null_indices` - indices of the null slots of `values`.
+/// * `options` - sort direction and null placement.
+/// * `limit` - if `Some(n)`, at most `n` indices are returned.
+///
+/// Nulls are emitted before or after the valid values according to
+/// `options.nulls_first`; when `options.descending` is set the null indices
+/// are emitted in reverse order.
+fn sort_binary<S>(
+    values: &ArrayRef,
+    value_indices: Vec<u32>,
+    mut null_indices: Vec<u32>,
+    options: &SortOptions,
+    limit: Option<usize>,
+) -> UInt32Array
+where
+    S: BinaryOffsetSizeTrait,
+{
+    // Pair every valid index with the bytes it refers to. A fixed-size binary
+    // array is its own type; anything else must be a `GenericBinaryArray<S>`.
+    let mut valids: Vec<(u32, &[u8])> =
+        match values.as_any().downcast_ref::<FixedSizeBinaryArray>() {
+            Some(fixed) => value_indices
+                .iter()
+                .map(|&index| (index, fixed.value(index as usize)))
+                .collect(),
+            None => {
+                let generic = as_generic_binary_array::<S>(values);
+                value_indices
+                    .iter()
+                    .map(|&index| (index, generic.value(index as usize)))
+                    .collect()
+            }
+        };
+
+    // Number of indices to return.
+    let len = limit.map_or(values.len(), |limit| limit.min(values.len()));
+
+    // Up to `len` valid values can reach the output (all of them when nulls are
+    // placed last), so that many must be in order. Sorting only
+    // `len - null_count` entries would leave part of the output unsorted when
+    // `nulls_first` is false and a limit is set.
+    let sort_len = len.min(valids.len());
+    if options.descending {
+        sort_unstable_by(&mut valids, sort_len, |a, b| a.1.cmp(b.1).reverse());
+        null_indices.reverse();
+    } else {
+        sort_unstable_by(&mut valids, sort_len, |a, b| a.1.cmp(b.1));
+    }
+
+    let valid_indices = valids.into_iter().map(|(index, _)| index);
+    let indices: Vec<u32> = if options.nulls_first {
+        null_indices.into_iter().chain(valid_indices).take(len).collect()
+    } else {
+        valid_indices.chain(null_indices).take(len).collect()
+    };
+    UInt32Array::from(indices)
+}
+
/// Compare two `Array`s based on the ordering defined in
[ord](crate::array::ord).
fn cmp_array(a: &dyn Array, b: &dyn Array) -> Ordering {
let cmp_op = build_compare(a, b).unwrap();
@@ -1183,6 +1248,60 @@ mod tests {
}
}
+    /// Sort `data` with `options`/`limit` and assert the result equals
+    /// `expected_data`. The check runs on a `BinaryArray`, a `LargeBinaryArray`
+    /// and, when `fixed_length` is `Some`, a `FixedSizeBinaryArray`.
+    ///
+    /// `sort_limit` is used when `limit` is `Some`, `sort` otherwise. Only the
+    /// presence of `fixed_length` is checked; its value is not read.
+    fn test_sort_binary_arrays(
+        data: Vec<Option<Vec<u8>>>,
+        options: Option<SortOptions>,
+        limit: Option<usize>,
+        expected_data: Vec<Option<Vec<u8>>>,
+        fixed_length: Option<i32>,
+    ) {
+        /// Sort `input` (through `sort_limit` when `limit` is set) and compare.
+        fn check_sorted(
+            input: ArrayRef,
+            options: Option<SortOptions>,
+            limit: Option<usize>,
+            expected: ArrayRef,
+        ) {
+            let sorted = if limit.is_some() {
+                sort_limit(&input, options, limit).unwrap()
+            } else {
+                sort(&input, options).unwrap()
+            };
+            assert_eq!(&sorted, &expected);
+        }
+
+        /// Build a `FixedSizeBinaryArray` from optional byte vectors.
+        fn make_fixed_size_binary_array(rows: &[Option<Vec<u8>>]) -> ArrayRef {
+            Arc::new(
+                FixedSizeBinaryArray::try_from_sparse_iter(rows.iter().cloned())
+                    .unwrap(),
+            )
+        }
+
+        /// Build a variable-width binary array with offset type `S`.
+        fn make_generic_binary_array<S: BinaryOffsetSizeTrait>(
+            rows: &[Option<Vec<u8>>],
+        ) -> Arc<GenericBinaryArray<S>> {
+            let slices: Vec<Option<&[u8]>> =
+                rows.iter().map(|row| row.as_deref()).collect();
+            Arc::new(GenericBinaryArray::<S>::from_opt_vec(slices))
+        }
+
+        if fixed_length.is_some() {
+            check_sorted(
+                make_fixed_size_binary_array(&data),
+                options,
+                limit,
+                make_fixed_size_binary_array(&expected_data),
+            );
+        }
+
+        // BinaryArray
+        check_sorted(
+            make_generic_binary_array::<i32>(&data),
+            options,
+            limit,
+            make_generic_binary_array::<i32>(&expected_data),
+        );
+
+        // LargeBinaryArray
+        check_sorted(
+            make_generic_binary_array::<i64>(&data),
+            options,
+            limit,
+            make_generic_binary_array::<i64>(&expected_data),
+        );
+    }
+
#[test]
fn test_sort_to_indices_primitives() {
test_sort_to_indices_primitive_arrays::<Int8Type>(
@@ -2383,6 +2502,204 @@ mod tests {
}
#[test]
+    fn test_sort_binary() {
+        // ascending, no nulls
+        test_sort_binary_arrays(
+            vec![
+                Some(vec![0, 0, 0]),
+                Some(vec![0, 0, 5]),
+                Some(vec![0, 0, 3]),
+                Some(vec![0, 0, 7]),
+                Some(vec![0, 0, 1]),
+            ],
+            Some(SortOptions {
+                descending: false,
+                nulls_first: false,
+            }),
+            None,
+            vec![
+                Some(vec![0, 0, 0]),
+                Some(vec![0, 0, 1]),
+                Some(vec![0, 0, 3]),
+                Some(vec![0, 0, 5]),
+                Some(vec![0, 0, 7]),
+            ],
+            Some(3),
+        );
+
+        // with nulls
+        test_sort_binary_arrays(
+            vec![
+                Some(vec![0, 0, 0]),
+                None,
+                Some(vec![0, 0, 3]),
+                Some(vec![0, 0, 7]),
+                Some(vec![0, 0, 1]),
+                None,
+            ],
+            Some(SortOptions {
+                descending: false,
+                nulls_first: false,
+            }),
+            None,
+            vec![
+                Some(vec![0, 0, 0]),
+                Some(vec![0, 0, 1]),
+                Some(vec![0, 0, 3]),
+                Some(vec![0, 0, 7]),
+                None,
+                None,
+            ],
+            Some(3),
+        );
+
+        // values that differ in their first byte
+        test_sort_binary_arrays(
+            vec![
+                Some(vec![3, 5, 7]),
+                None,
+                Some(vec![1, 7, 1]),
+                Some(vec![2, 7, 3]),
+                None,
+                Some(vec![1, 4, 3]),
+            ],
+            Some(SortOptions {
+                descending: false,
+                nulls_first: false,
+            }),
+            None,
+            vec![
+                Some(vec![1, 4, 3]),
+                Some(vec![1, 7, 1]),
+                Some(vec![2, 7, 3]),
+                Some(vec![3, 5, 7]),
+                None,
+                None,
+            ],
+            Some(3),
+        );
+
+        // descending
+        test_sort_binary_arrays(
+            vec![
+                Some(vec![0, 0, 0]),
+                None,
+                Some(vec![0, 0, 3]),
+                Some(vec![0, 0, 7]),
+                Some(vec![0, 0, 1]),
+                None,
+            ],
+            Some(SortOptions {
+                descending: true,
+                nulls_first: false,
+            }),
+            None,
+            vec![
+                Some(vec![0, 0, 7]),
+                Some(vec![0, 0, 3]),
+                Some(vec![0, 0, 1]),
+                Some(vec![0, 0, 0]),
+                None,
+                None,
+            ],
+            Some(3),
+        );
+
+        // descending with nulls first
+        test_sort_binary_arrays(
+            vec![
+                Some(vec![0, 0, 0]),
+                None,
+                Some(vec![0, 0, 3]),
+                Some(vec![0, 0, 7]),
+                Some(vec![0, 0, 1]),
+                None,
+            ],
+            Some(SortOptions {
+                descending: true,
+                nulls_first: true,
+            }),
+            None,
+            vec![
+                None,
+                None,
+                Some(vec![0, 0, 7]),
+                Some(vec![0, 0, 3]),
+                Some(vec![0, 0, 1]),
+                Some(vec![0, 0, 0]),
+            ],
+            Some(3),
+        );
+
+        // nulls first
+        test_sort_binary_arrays(
+            vec![
+                Some(vec![0, 0, 0]),
+                None,
+                Some(vec![0, 0, 3]),
+                Some(vec![0, 0, 7]),
+                Some(vec![0, 0, 1]),
+                None,
+            ],
+            Some(SortOptions {
+                descending: false,
+                nulls_first: true,
+            }),
+            None,
+            vec![
+                None,
+                None,
+                Some(vec![0, 0, 0]),
+                Some(vec![0, 0, 1]),
+                Some(vec![0, 0, 3]),
+                Some(vec![0, 0, 7]),
+            ],
+            Some(3),
+        );
+
+        // limit
+        test_sort_binary_arrays(
+            vec![
+                Some(vec![0, 0, 0]),
+                None,
+                Some(vec![0, 0, 3]),
+                Some(vec![0, 0, 7]),
+                Some(vec![0, 0, 1]),
+                None,
+            ],
+            Some(SortOptions {
+                descending: false,
+                nulls_first: true,
+            }),
+            Some(4),
+            vec![None, None, Some(vec![0, 0, 0]), Some(vec![0, 0, 1])],
+            Some(3),
+        );
+
+        // limit without nulls, descending
+        test_sort_binary_arrays(
+            vec![
+                Some(vec![0, 0, 0]),
+                Some(vec![0, 0, 5]),
+                Some(vec![0, 0, 3]),
+                Some(vec![0, 0, 7]),
+                Some(vec![0, 0, 1]),
+            ],
+            Some(SortOptions {
+                descending: true,
+                nulls_first: false,
+            }),
+            Some(3),
+            vec![
+                Some(vec![0, 0, 7]),
+                Some(vec![0, 0, 5]),
+                Some(vec![0, 0, 3]),
+            ],
+            Some(3),
+        );
+
+        // var length
+        test_sort_binary_arrays(
+            vec![
+                Some(b"Hello".to_vec()),
+                None,
+                Some(b"from".to_vec()),
+                Some(b"Apache".to_vec()),
+                Some(b"Arrow-rs".to_vec()),
+                None,
+            ],
+            Some(SortOptions {
+                descending: false,
+                nulls_first: false,
+            }),
+            None,
+            vec![
+                Some(b"Apache".to_vec()),
+                Some(b"Arrow-rs".to_vec()),
+                Some(b"Hello".to_vec()),
+                Some(b"from".to_vec()),
+                None,
+                None,
+            ],
+            None,
+        );
+
+        // var length: a strict prefix sorts first, and bytes compare numerically
+        // (b'A' < b'a')
+        test_sort_binary_arrays(
+            vec![
+                Some(b"Arrow-rs".to_vec()),
+                Some(b"Arrow".to_vec()),
+                None,
+                Some(b"arrow".to_vec()),
+            ],
+            Some(SortOptions {
+                descending: false,
+                nulls_first: false,
+            }),
+            None,
+            vec![
+                Some(b"Arrow".to_vec()),
+                Some(b"Arrow-rs".to_vec()),
+                Some(b"arrow".to_vec()),
+                None,
+            ],
+            None,
+        );
+
+        // var length with limit
+        test_sort_binary_arrays(
+            vec![
+                Some(b"Hello".to_vec()),
+                None,
+                Some(b"from".to_vec()),
+                Some(b"Apache".to_vec()),
+                Some(b"Arrow-rs".to_vec()),
+                None,
+            ],
+            Some(SortOptions {
+                descending: false,
+                nulls_first: true,
+            }),
+            Some(4),
+            vec![
+                None,
+                None,
+                Some(b"Apache".to_vec()),
+                Some(b"Arrow-rs".to_vec()),
+            ],
+            None,
+        );
+    }
+
+ #[test]
fn test_lex_sort_single_column() {
let input = vec![SortColumn {
values: Arc::new(PrimitiveArray::<Int64Type>::from(vec![
diff --git a/arrow/src/compute/kernels/take.rs b/arrow/src/compute/kernels/take.rs
index f04208a..75c8f76 100644
--- a/arrow/src/compute/kernels/take.rs
+++ b/arrow/src/compute/kernels/take.rs
@@ -259,6 +259,27 @@ where
DataType::UInt64 => downcast_dict_take!(UInt64Type, values,
indices),
t => unimplemented!("Take not supported for dictionary key type
{:?}", t),
},
+ DataType::Binary => {
+ let values = values
+ .as_any()
+ .downcast_ref::<GenericBinaryArray<i32>>()
+ .unwrap();
+ Ok(Arc::new(take_binary(values, indices)?))
+ }
+ DataType::LargeBinary => {
+ let values = values
+ .as_any()
+ .downcast_ref::<GenericBinaryArray<i64>>()
+ .unwrap();
+ Ok(Arc::new(take_binary(values, indices)?))
+ }
+ DataType::FixedSizeBinary(_) => {
+ let values = values
+ .as_any()
+ .downcast_ref::<FixedSizeBinaryArray>()
+ .unwrap();
+ Ok(Arc::new(take_fixed_size_binary(values, indices)?))
+ }
t => unimplemented!("Take not supported for data type {:?}", t),
}
}
@@ -760,6 +781,59 @@ where
Ok(FixedSizeListArray::from(list_data))
}
+/// `take` implementation for variable-width binary arrays
+/// (`GenericBinaryArray<i32>` / `GenericBinaryArray<i64>`).
+///
+/// Output slot `i` is null when `indices[i]` is null or when the value it
+/// points at is null; otherwise it holds `values.value(indices[i])`.
+///
+/// Any error returned by `maybe_usize` for a non-null index is propagated.
+fn take_binary<IndexType, OffsetType>(
+    values: &GenericBinaryArray<OffsetType>,
+    indices: &PrimitiveArray<IndexType>,
+) -> Result<GenericBinaryArray<OffsetType>>
+where
+    OffsetType: BinaryOffsetSizeTrait,
+    IndexType: ArrowNumericType,
+    IndexType::Native: ToPrimitive,
+{
+    let taken = indices
+        .values()
+        .iter()
+        .enumerate()
+        .map(|(i, idx)| {
+            // The values buffer holds an unspecified value at a null index slot,
+            // so it must not be used: it could name an arbitrary row of `values`
+            // or lie past its end.
+            if indices.is_null(i) {
+                return Ok(None);
+            }
+            let idx = maybe_usize::<IndexType>(*idx)?;
+            if values.is_valid(idx) {
+                Ok(Some(values.value(idx)))
+            } else {
+                Ok(None)
+            }
+        })
+        .collect::<Result<Vec<_>>>()?;
+
+    Ok(taken.into_iter().collect::<GenericBinaryArray<OffsetType>>())
+}
+
+/// `take` implementation for `FixedSizeBinaryArray`.
+///
+/// Output slot `i` is null when `indices[i]` is null or when the value it
+/// points at is null; otherwise it holds `values.value(indices[i])`.
+/// Errors from `maybe_usize` (for non-null indices) and from
+/// `FixedSizeBinaryArray::try_from_sparse_iter` are propagated.
+///
+/// NOTE(review): only the taken values are passed to `try_from_sparse_iter`,
+/// not the width of `values`. Confirm that the result keeps the input's width
+/// when every taken slot is null or `indices` is empty.
+fn take_fixed_size_binary<IndexType>(
+    values: &FixedSizeBinaryArray,
+    indices: &PrimitiveArray<IndexType>,
+) -> Result<FixedSizeBinaryArray>
+where
+    IndexType: ArrowNumericType,
+    IndexType::Native: ToPrimitive,
+{
+    let taken = indices
+        .values()
+        .iter()
+        .enumerate()
+        .map(|(i, idx)| {
+            // The values buffer holds an unspecified value at a null index slot,
+            // so it must not be used to index into `values`.
+            if indices.is_null(i) {
+                return Ok(None);
+            }
+            let idx = maybe_usize::<IndexType>(*idx)?;
+            if values.is_valid(idx) {
+                Ok(Some(values.value(idx)))
+            } else {
+                Ok(None)
+            }
+        })
+        .collect::<Result<Vec<_>>>()?;
+
+    FixedSizeBinaryArray::try_from_sparse_iter(taken.into_iter())
+}
+
/// `take` implementation for dictionary arrays
///
/// applies `take` to the keys of the dictionary array and returns a new
dictionary array