This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 2113e2b  Sort binary (#569)
2113e2b is described below

commit 2113e2b1e3f708ae28a1ce65ef292c0b3ef7a10d
Author: Ruihang Xia <[email protected]>
AuthorDate: Tue Jul 27 06:26:12 2021 +0800

    Sort binary (#569)
    
    * impl sort fixed binary array
    
    Signed-off-by: Ruihang Xia <[email protected]>
    
    * remove array builder util, add test cases
    
    Signed-off-by: Ruihang Xia <[email protected]>
    
    * impl sort for generic binary array
    
    Signed-off-by: Ruihang Xia <[email protected]>
    
    * tidy
    
    Signed-off-by: Ruihang Xia <[email protected]>
    
    * run clippy
    
    Signed-off-by: Ruihang Xia <[email protected]>
    
    * test: fixed binary with different prefix
    
    Signed-off-by: Ruihang Xia <[email protected]>
    
    * rebase master
    
    Signed-off-by: Ruihang Xia <[email protected]>
---
 arrow/src/array/cast.rs           |  10 ++
 arrow/src/array/mod.rs            |   6 +-
 arrow/src/compute/kernels/sort.rs | 317 ++++++++++++++++++++++++++++++++++++++
 arrow/src/compute/kernels/take.rs |  74 +++++++++
 4 files changed, 404 insertions(+), 3 deletions(-)

diff --git a/arrow/src/array/cast.rs b/arrow/src/array/cast.rs
index 0477f28..dfc1560 100644
--- a/arrow/src/array/cast.rs
+++ b/arrow/src/array/cast.rs
@@ -59,6 +59,16 @@ pub fn as_large_list_array(arr: &ArrayRef) -> &LargeListArray {
     as_generic_list_array::<i64>(arr)
 }
 
+#[doc = "Force downcast ArrayRef to GenericBinaryArray"]
+#[inline]
+pub fn as_generic_binary_array<S: BinaryOffsetSizeTrait>(
+    arr: &ArrayRef,
+) -> &GenericBinaryArray<S> {
+    arr.as_any()
+        .downcast_ref::<GenericBinaryArray<S>>()
+        .expect("Unable to downcast to binary array")
+}
+
 macro_rules! array_downcast_fn {
     ($name: ident, $arrty: ty, $arrty_str:expr) => {
         #[doc = "Force downcast ArrayRef to "]
diff --git a/arrow/src/array/mod.rs b/arrow/src/array/mod.rs
index 6a0b94a..69b65f4 100644
--- a/arrow/src/array/mod.rs
+++ b/arrow/src/array/mod.rs
@@ -273,9 +273,9 @@ pub use self::ord::{build_compare, DynComparator};
 // --------------------- Array downcast helper functions ---------------------
 
 pub use self::cast::{
-    as_boolean_array, as_dictionary_array, as_generic_list_array, as_large_list_array,
-    as_largestring_array, as_list_array, as_null_array, as_primitive_array,
-    as_string_array, as_struct_array,
+    as_boolean_array, as_dictionary_array, as_generic_binary_array,
+    as_generic_list_array, as_large_list_array, as_largestring_array, as_list_array,
+    as_null_array, as_primitive_array, as_string_array, as_struct_array,
 };
 
 // ------------------------------ C Data Interface ---------------------------
diff --git a/arrow/src/compute/kernels/sort.rs b/arrow/src/compute/kernels/sort.rs
index 008661e..95afc14 100644
--- a/arrow/src/compute/kernels/sort.rs
+++ b/arrow/src/compute/kernels/sort.rs
@@ -383,6 +383,10 @@ pub fn sort_to_indices(
                 }
             }
         }
+        DataType::Binary | DataType::FixedSizeBinary(_) => {
+            sort_binary::<i32>(values, v, n, &options, limit)
+        }
+        DataType::LargeBinary => sort_binary::<i64>(values, v, n, &options, limit),
         t => {
             return Err(ArrowError::ComputeError(format!(
                 "Sort not supported for data type {:?}",
@@ -764,6 +768,67 @@ where
     }
 }
 
+fn sort_binary<S>(
+    values: &ArrayRef,
+    value_indices: Vec<u32>,
+    mut null_indices: Vec<u32>,
+    options: &SortOptions,
+    limit: Option<usize>,
+) -> UInt32Array
+where
+    S: BinaryOffsetSizeTrait,
+{
+    let mut valids: Vec<(u32, &[u8])> = values
+        .as_any()
+        .downcast_ref::<FixedSizeBinaryArray>()
+        .map_or_else(
+            || {
+                let values = as_generic_binary_array::<S>(values);
+                value_indices
+                    .iter()
+                    .copied()
+                    .map(|index| (index, values.value(index as usize)))
+                    .collect()
+            },
+            |values| {
+                value_indices
+                    .iter()
+                    .copied()
+                    .map(|index| (index, values.value(index as usize)))
+                    .collect()
+            },
+        );
+
+    let mut len = values.len();
+    let descending = options.descending;
+    let nulls_len = null_indices.len();
+
+    if let Some(limit) = limit {
+        len = limit.min(len);
+    }
+    if !descending {
+        sort_unstable_by(&mut valids, len.saturating_sub(nulls_len), |a, b| {
+            a.1.cmp(b.1)
+        });
+    } else {
+        sort_unstable_by(&mut valids, len.saturating_sub(nulls_len), |a, b| {
+            a.1.cmp(b.1).reverse()
+        });
+        null_indices.reverse();
+    }
+
+    let mut valid_indices: Vec<u32> = valids.iter().map(|tuple| tuple.0).collect();
+    if options.nulls_first {
+        null_indices.append(&mut valid_indices);
+        null_indices.truncate(len);
+        UInt32Array::from(null_indices)
+    } else {
+        valid_indices.append(&mut null_indices);
+        valid_indices.truncate(len);
+        UInt32Array::from(valid_indices)
+    }
+}
+
 /// Compare two `Array`s based on the ordering defined in [ord](crate::array::ord).
 fn cmp_array(a: &dyn Array, b: &dyn Array) -> Ordering {
     let cmp_op = build_compare(a, b).unwrap();
@@ -1183,6 +1248,60 @@ mod tests {
         }
     }
 
+    fn test_sort_binary_arrays(
+        data: Vec<Option<Vec<u8>>>,
+        options: Option<SortOptions>,
+        limit: Option<usize>,
+        expected_data: Vec<Option<Vec<u8>>>,
+        fixed_length: Option<i32>,
+    ) {
+        // Fixed size binary array
+        if fixed_length.is_some() {
+            let input = Arc::new(
+                FixedSizeBinaryArray::try_from_sparse_iter(data.iter().cloned()).unwrap(),
+            );
+            let sorted = match limit {
+                Some(_) => sort_limit(&(input as ArrayRef), options, limit).unwrap(),
+                None => sort(&(input as ArrayRef), options).unwrap(),
+            };
+            let expected = Arc::new(
+                FixedSizeBinaryArray::try_from_sparse_iter(expected_data.iter().cloned())
+                    .unwrap(),
+            ) as ArrayRef;
+
+            assert_eq!(&sorted, &expected);
+        }
+
+        // Generic size binary array
+        fn make_generic_binary_array<S: BinaryOffsetSizeTrait>(
+            data: &[Option<Vec<u8>>],
+        ) -> Arc<GenericBinaryArray<S>> {
+            Arc::new(GenericBinaryArray::<S>::from_opt_vec(
+                data.iter()
+                    .map(|binary| binary.as_ref().map(Vec::as_slice))
+                    .collect(),
+            ))
+        }
+
+        // BinaryArray
+        let input = make_generic_binary_array::<i32>(&data);
+        let sorted = match limit {
+            Some(_) => sort_limit(&(input as ArrayRef), options, limit).unwrap(),
+            None => sort(&(input as ArrayRef), options).unwrap(),
+        };
+        let expected = make_generic_binary_array::<i32>(&expected_data) as ArrayRef;
+        assert_eq!(&sorted, &expected);
+
+        // LargeBinaryArray
+        let input = make_generic_binary_array::<i64>(&data);
+        let sorted = match limit {
+            Some(_) => sort_limit(&(input as ArrayRef), options, limit).unwrap(),
+            None => sort(&(input as ArrayRef), options).unwrap(),
+        };
+        let expected = make_generic_binary_array::<i64>(&expected_data) as ArrayRef;
+        assert_eq!(&sorted, &expected);
+    }
+
     #[test]
     fn test_sort_to_indices_primitives() {
         test_sort_to_indices_primitive_arrays::<Int8Type>(
@@ -2383,6 +2502,204 @@ mod tests {
     }
 
     #[test]
+    fn test_sort_binary() {
+        test_sort_binary_arrays(
+            vec![
+                Some(vec![0, 0, 0]),
+                Some(vec![0, 0, 5]),
+                Some(vec![0, 0, 3]),
+                Some(vec![0, 0, 7]),
+                Some(vec![0, 0, 1]),
+            ],
+            Some(SortOptions {
+                descending: false,
+                nulls_first: false,
+            }),
+            None,
+            vec![
+                Some(vec![0, 0, 0]),
+                Some(vec![0, 0, 1]),
+                Some(vec![0, 0, 3]),
+                Some(vec![0, 0, 5]),
+                Some(vec![0, 0, 7]),
+            ],
+            Some(3),
+        );
+
+        // with nulls
+        test_sort_binary_arrays(
+            vec![
+                Some(vec![0, 0, 0]),
+                None,
+                Some(vec![0, 0, 3]),
+                Some(vec![0, 0, 7]),
+                Some(vec![0, 0, 1]),
+                None,
+            ],
+            Some(SortOptions {
+                descending: false,
+                nulls_first: false,
+            }),
+            None,
+            vec![
+                Some(vec![0, 0, 0]),
+                Some(vec![0, 0, 1]),
+                Some(vec![0, 0, 3]),
+                Some(vec![0, 0, 7]),
+                None,
+                None,
+            ],
+            Some(3),
+        );
+
+        test_sort_binary_arrays(
+            vec![
+                Some(vec![3, 5, 7]),
+                None,
+                Some(vec![1, 7, 1]),
+                Some(vec![2, 7, 3]),
+                None,
+                Some(vec![1, 4, 3]),
+            ],
+            Some(SortOptions {
+                descending: false,
+                nulls_first: false,
+            }),
+            None,
+            vec![
+                Some(vec![1, 4, 3]),
+                Some(vec![1, 7, 1]),
+                Some(vec![2, 7, 3]),
+                Some(vec![3, 5, 7]),
+                None,
+                None,
+            ],
+            Some(3),
+        );
+
+        // descending
+        test_sort_binary_arrays(
+            vec![
+                Some(vec![0, 0, 0]),
+                None,
+                Some(vec![0, 0, 3]),
+                Some(vec![0, 0, 7]),
+                Some(vec![0, 0, 1]),
+                None,
+            ],
+            Some(SortOptions {
+                descending: true,
+                nulls_first: false,
+            }),
+            None,
+            vec![
+                Some(vec![0, 0, 7]),
+                Some(vec![0, 0, 3]),
+                Some(vec![0, 0, 1]),
+                Some(vec![0, 0, 0]),
+                None,
+                None,
+            ],
+            Some(3),
+        );
+
+        // nulls first
+        test_sort_binary_arrays(
+            vec![
+                Some(vec![0, 0, 0]),
+                None,
+                Some(vec![0, 0, 3]),
+                Some(vec![0, 0, 7]),
+                Some(vec![0, 0, 1]),
+                None,
+            ],
+            Some(SortOptions {
+                descending: false,
+                nulls_first: true,
+            }),
+            None,
+            vec![
+                None,
+                None,
+                Some(vec![0, 0, 0]),
+                Some(vec![0, 0, 1]),
+                Some(vec![0, 0, 3]),
+                Some(vec![0, 0, 7]),
+            ],
+            Some(3),
+        );
+
+        // limit
+        test_sort_binary_arrays(
+            vec![
+                Some(vec![0, 0, 0]),
+                None,
+                Some(vec![0, 0, 3]),
+                Some(vec![0, 0, 7]),
+                Some(vec![0, 0, 1]),
+                None,
+            ],
+            Some(SortOptions {
+                descending: false,
+                nulls_first: true,
+            }),
+            Some(4),
+            vec![None, None, Some(vec![0, 0, 0]), Some(vec![0, 0, 1])],
+            Some(3),
+        );
+
+        // var length
+        test_sort_binary_arrays(
+            vec![
+                Some(b"Hello".to_vec()),
+                None,
+                Some(b"from".to_vec()),
+                Some(b"Apache".to_vec()),
+                Some(b"Arrow-rs".to_vec()),
+                None,
+            ],
+            Some(SortOptions {
+                descending: false,
+                nulls_first: false,
+            }),
+            None,
+            vec![
+                Some(b"Apache".to_vec()),
+                Some(b"Arrow-rs".to_vec()),
+                Some(b"Hello".to_vec()),
+                Some(b"from".to_vec()),
+                None,
+                None,
+            ],
+            None,
+        );
+
+        // limit
+        test_sort_binary_arrays(
+            vec![
+                Some(b"Hello".to_vec()),
+                None,
+                Some(b"from".to_vec()),
+                Some(b"Apache".to_vec()),
+                Some(b"Arrow-rs".to_vec()),
+                None,
+            ],
+            Some(SortOptions {
+                descending: false,
+                nulls_first: true,
+            }),
+            Some(4),
+            vec![
+                None,
+                None,
+                Some(b"Apache".to_vec()),
+                Some(b"Arrow-rs".to_vec()),
+            ],
+            None,
+        );
+    }
+
+    #[test]
     fn test_lex_sort_single_column() {
         let input = vec![SortColumn {
             values: Arc::new(PrimitiveArray::<Int64Type>::from(vec![
diff --git a/arrow/src/compute/kernels/take.rs b/arrow/src/compute/kernels/take.rs
index f04208a..75c8f76 100644
--- a/arrow/src/compute/kernels/take.rs
+++ b/arrow/src/compute/kernels/take.rs
@@ -259,6 +259,27 @@ where
             DataType::UInt64 => downcast_dict_take!(UInt64Type, values, indices),
             t => unimplemented!("Take not supported for dictionary key type {:?}", t),
         },
+        DataType::Binary => {
+            let values = values
+                .as_any()
+                .downcast_ref::<GenericBinaryArray<i32>>()
+                .unwrap();
+            Ok(Arc::new(take_binary(values, indices)?))
+        }
+        DataType::LargeBinary => {
+            let values = values
+                .as_any()
+                .downcast_ref::<GenericBinaryArray<i64>>()
+                .unwrap();
+            Ok(Arc::new(take_binary(values, indices)?))
+        }
+        DataType::FixedSizeBinary(_) => {
+            let values = values
+                .as_any()
+                .downcast_ref::<FixedSizeBinaryArray>()
+                .unwrap();
+            Ok(Arc::new(take_fixed_size_binary(values, indices)?))
+        }
         t => unimplemented!("Take not supported for data type {:?}", t),
     }
 }
@@ -760,6 +781,59 @@ where
     Ok(FixedSizeListArray::from(list_data))
 }
 
+fn take_binary<IndexType, OffsetType>(
+    values: &GenericBinaryArray<OffsetType>,
+    indices: &PrimitiveArray<IndexType>,
+) -> Result<GenericBinaryArray<OffsetType>>
+where
+    OffsetType: BinaryOffsetSizeTrait,
+    IndexType: ArrowNumericType,
+    IndexType::Native: ToPrimitive,
+{
+    let data_ref = values.data_ref();
+    let array_iter = indices
+        .values()
+        .iter()
+        .map(|idx| {
+            let idx = maybe_usize::<IndexType>(*idx)?;
+            if data_ref.is_valid(idx) {
+                Ok(Some(values.value(idx)))
+            } else {
+                Ok(None)
+            }
+        })
+        .collect::<Result<Vec<_>>>()?
+        .into_iter();
+
+    Ok(array_iter.collect::<GenericBinaryArray<OffsetType>>())
+}
+
+fn take_fixed_size_binary<IndexType>(
+    values: &FixedSizeBinaryArray,
+    indices: &PrimitiveArray<IndexType>,
+) -> Result<FixedSizeBinaryArray>
+where
+    IndexType: ArrowNumericType,
+    IndexType::Native: ToPrimitive,
+{
+    let data_ref = values.data_ref();
+    let array_iter = indices
+        .values()
+        .iter()
+        .map(|idx| {
+            let idx = maybe_usize::<IndexType>(*idx)?;
+            if data_ref.is_valid(idx) {
+                Ok(Some(values.value(idx)))
+            } else {
+                Ok(None)
+            }
+        })
+        .collect::<Result<Vec<_>>>()?
+        .into_iter();
+
+    FixedSizeBinaryArray::try_from_sparse_iter(array_iter)
+}
+
 /// `take` implementation for dictionary arrays
 ///
 /// applies `take` to the keys of the dictionary array and returns a new dictionary array

Reply via email to