This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new ebe6f5398 feat: Sort kernel for `RunArray` (#3695)
ebe6f5398 is described below
commit ebe6f539844ba781553c87bdaa2dd25190047c49
Author: askoa <[email protected]>
AuthorDate: Thu Feb 23 04:52:12 2023 -0500
feat: Sort kernel for `RunArray` (#3695)
* Handle sliced array in run array iterator
* sort_to_indices for RunArray
* better loop
* sort for run array
* improve docs
* some minor tweaks
* doc fix
* format fix
* fix sort run to return all logical indices
* pr comment
* rename test function, pull sort run logic into a separate function
---------
Co-authored-by: ask <ask@local>
---
arrow-array/src/array/run_array.rs | 28 +++-
arrow-array/src/run_iterator.rs | 9 +-
arrow-ord/src/sort.rs | 331 +++++++++++++++++++++++++++++++++++++
arrow/benches/sort_kernel.rs | 21 ++-
4 files changed, 374 insertions(+), 15 deletions(-)
diff --git a/arrow-array/src/array/run_array.rs
b/arrow-array/src/array/run_array.rs
index 709933e1b..9dba3ddab 100644
--- a/arrow-array/src/array/run_array.rs
+++ b/arrow-array/src/array/run_array.rs
@@ -112,17 +112,37 @@ impl<R: RunEndIndexType> RunArray<R> {
/// Returns a reference to run_ends array
///
- /// Note: any slicing of this array is not applied to the returned array
+ /// Note: any slicing of this [`RunArray`] array is not applied to the returned array
/// and must be handled separately
pub fn run_ends(&self) -> &PrimitiveArray<R> {
&self.run_ends
}
/// Returns a reference to values array
+ ///
+ /// Note: any slicing of this [`RunArray`] array is not applied to the returned array
+ /// and must be handled separately
pub fn values(&self) -> &ArrayRef {
&self.values
}
+ /// Returns the physical index at which the array slice starts.
+ pub fn get_start_physical_index(&self) -> usize {
+ if self.offset() == 0 {
+ return 0;
+ }
+ self.get_zero_offset_physical_index(self.offset()).unwrap()
+ }
+
+ /// Returns the physical index at which the array slice ends.
+ pub fn get_end_physical_index(&self) -> usize {
+ if self.offset() + self.len() == Self::logical_len(&self.run_ends) {
+ return self.run_ends.len() - 1;
+ }
+ self.get_zero_offset_physical_index(self.offset() + self.len() - 1)
+ .unwrap()
+ }
+
/// Downcast this [`RunArray`] to a [`TypedRunArray`]
///
/// ```
@@ -230,11 +250,7 @@ impl<R: RunEndIndexType> RunArray<R> {
}
// Skip some physical indices based on offset.
- let skip_value = if self.offset() > 0 {
- self.get_zero_offset_physical_index(self.offset()).unwrap()
- } else {
- 0
- };
+ let skip_value = self.get_start_physical_index();
let mut physical_indices = vec![0; indices_len];
diff --git a/arrow-array/src/run_iterator.rs b/arrow-array/src/run_iterator.rs
index a79969c3c..fbf173b1d 100644
--- a/arrow-array/src/run_iterator.rs
+++ b/arrow-array/src/run_iterator.rs
@@ -57,13 +57,8 @@ where
{
/// create a new iterator
pub fn new(array: TypedRunArray<'a, R, V>) -> Self {
- let current_front_physical: usize =
- array.run_array().get_physical_index(0).unwrap();
- let current_back_physical: usize = array
- .run_array()
- .get_physical_index(array.len() - 1)
- .unwrap()
- + 1;
+ let current_front_physical =
array.run_array().get_start_physical_index();
+ let current_back_physical = array.run_array().get_end_physical_index()
+ 1;
RunArrayIter {
array,
current_front_logical: array.offset(),
diff --git a/arrow-ord/src/sort.rs b/arrow-ord/src/sort.rs
index 207f499ef..c4baa2283 100644
--- a/arrow-ord/src/sort.rs
+++ b/arrow-ord/src/sort.rs
@@ -18,14 +18,17 @@
//! Defines sort kernel for `ArrayRef`
use crate::ord::{build_compare, DynComparator};
+use arrow_array::builder::BufferBuilder;
use arrow_array::cast::*;
use arrow_array::types::*;
use arrow_array::*;
use arrow_buffer::{ArrowNativeType, MutableBuffer};
use arrow_data::ArrayData;
+use arrow_data::ArrayDataBuilder;
use arrow_schema::{ArrowError, DataType, IntervalUnit, TimeUnit};
use arrow_select::take::take;
use std::cmp::Ordering;
+use std::sync::Arc;
pub use arrow_schema::SortOptions;
@@ -55,6 +58,9 @@ pub fn sort(
values: &ArrayRef,
options: Option<SortOptions>,
) -> Result<ArrayRef, ArrowError> {
+ if let DataType::RunEndEncoded(_, _) = values.data_type() {
+ return sort_run(values, options, None);
+ }
let indices = sort_to_indices(values, options, None)?;
take(values.as_ref(), &indices, None)
}
@@ -94,6 +100,9 @@ pub fn sort_limit(
options: Option<SortOptions>,
limit: Option<usize>,
) -> Result<ArrayRef, ArrowError> {
+ if let DataType::RunEndEncoded(_, _) = values.data_type() {
+ return sort_run(values, options, limit);
+ }
let indices = sort_to_indices(values, options, limit)?;
take(values.as_ref(), &indices, None)
}
@@ -357,6 +366,16 @@ pub fn sort_to_indices(
sort_binary::<i32>(values, v, n, &options, limit)
}
DataType::LargeBinary => sort_binary::<i64>(values, v, n, &options,
limit),
+ DataType::RunEndEncoded(run_ends_field, _) => match
run_ends_field.data_type() {
+ DataType::Int16 => sort_run_to_indices::<Int16Type>(values,
&options, limit),
+ DataType::Int32 => sort_run_to_indices::<Int32Type>(values,
&options, limit),
+ DataType::Int64 => sort_run_to_indices::<Int64Type>(values,
&options, limit),
+ dt => {
+ return Err(ArrowError::ComputeError(format!(
+ "Inavlid run end data type: {dt}"
+ )))
+ }
+ },
t => {
return Err(ArrowError::ComputeError(format!(
"Sort not supported for data type {t:?}"
@@ -599,6 +618,194 @@ fn insert_valid_values<T>(result_slice: &mut [u32],
offset: usize, valids: &[(u3
append_valids(&mut result_slice[offset..offset + valids.len()]);
}
+// Sorts a run array and returns the sorted run array.
+// The output RunArray is encoded at the same level as the input: runs are reordered, not merged.
+// E.g. an input RunArray { run_ends = [2,4,6,8], values = [1,2,1,2] }
+// will result in output RunArray { run_ends = [2,4,6,8], values = [1,1,2,2] }
+// and not RunArray { run_ends = [4,8], values = [1,2] }
+fn sort_run(
+ values: &ArrayRef,
+ options: Option<SortOptions>,
+ limit: Option<usize>,
+) -> Result<ArrayRef, ArrowError> {
+ match values.data_type() {
+ DataType::RunEndEncoded(run_ends_field, _) => match
run_ends_field.data_type() {
+ DataType::Int16 => sort_run_downcasted::<Int16Type>(values,
options, limit),
+ DataType::Int32 => sort_run_downcasted::<Int32Type>(values,
options, limit),
+ DataType::Int64 => sort_run_downcasted::<Int64Type>(values,
options, limit),
+ dt => unreachable!("Not valid run ends data type {dt}"),
+ },
+ dt => Err(ArrowError::InvalidArgumentError(format!(
+ "Input is not a run encoded array. Input data type {dt}"
+ ))),
+ }
+}
+
+fn sort_run_downcasted<R: RunEndIndexType>(
+ values: &ArrayRef,
+ options: Option<SortOptions>,
+ limit: Option<usize>,
+) -> Result<ArrayRef, ArrowError> {
+ let run_array = values.as_any().downcast_ref::<RunArray<R>>().unwrap();
+
+ // Determine the length of output run array.
+ let output_len = if let Some(limit) = limit {
+ limit.min(run_array.len())
+ } else {
+ run_array.len()
+ };
+
+ let run_ends = run_array.run_ends();
+
+ let mut new_run_ends_builder =
BufferBuilder::<R::Native>::new(run_ends.len());
+ let mut new_run_end: usize = 0;
+ let mut new_physical_len: usize = 0;
+
+ let consume_runs = |run_length, _| {
+ new_run_end += run_length;
+ new_physical_len += 1;
+
new_run_ends_builder.append(R::Native::from_usize(new_run_end).unwrap());
+ };
+
+ let (values_indices, run_values) =
+ sort_run_inner(run_array, options, output_len, consume_runs);
+
+ let new_run_ends = unsafe {
+ // Safety:
+ // The function builds a valid run_ends array and hence need not be validated.
+ ArrayDataBuilder::new(run_array.run_ends().data_type().clone())
+ .len(new_physical_len)
+ .null_count(0)
+ .add_buffer(new_run_ends_builder.finish())
+ .build_unchecked()
+ };
+
+ // slice the sorted value indices based on limit.
+ let new_values_indices: PrimitiveArray<UInt32Type> = values_indices
+ .slice(0, new_run_ends.len())
+ .into_data()
+ .into();
+
+ let new_values = take(&run_values, &new_values_indices, None)?;
+
+ // Build sorted run array
+ let builder = ArrayDataBuilder::new(run_array.data_type().clone())
+ .len(new_run_end)
+ .add_child_data(new_run_ends)
+ .add_child_data(new_values.into_data());
+ let array_data: RunArray<R> = unsafe {
+ // Safety:
+ // This function builds a valid run array and hence can skip validation.
+ builder.build_unchecked().into()
+ };
+ Ok(Arc::new(array_data))
+}
+
+// Sort to indices for a run encoded array.
+// This function will be slow for run arrays as it decodes the physical indices to
+// logical indices and, to get the run array back, the logical indices have to be
+// encoded back to a run array.
+fn sort_run_to_indices<R: RunEndIndexType>(
+ values: &ArrayRef,
+ options: &SortOptions,
+ limit: Option<usize>,
+) -> UInt32Array {
+ let run_array = values.as_any().downcast_ref::<RunArray<R>>().unwrap();
+ let output_len = if let Some(limit) = limit {
+ limit.min(run_array.len())
+ } else {
+ run_array.len()
+ };
+ let mut result: Vec<u32> = Vec::with_capacity(output_len);
+
+ //Add all logical indices belonging to a physical index to the output
+ let consume_runs = |run_length, logical_start| {
+ result.extend(logical_start as u32..(logical_start + run_length) as
u32);
+ };
+ sort_run_inner(run_array, Some(*options), output_len, consume_runs);
+
+ UInt32Array::from(result)
+}
+
+fn sort_run_inner<R: RunEndIndexType, F>(
+ run_array: &RunArray<R>,
+ options: Option<SortOptions>,
+ output_len: usize,
+ mut consume_runs: F,
+) -> (PrimitiveArray<UInt32Type>, ArrayRef)
+where
+ F: FnMut(usize, usize),
+{
+ // slice the run_array.values based on offset and length.
+ let start_physical_index = run_array.get_start_physical_index();
+ let end_physical_index = run_array.get_end_physical_index();
+ let physical_len = end_physical_index - start_physical_index + 1;
+ let run_values = run_array.values().slice(start_physical_index,
physical_len);
+
+ // All the values have to be sorted irrespective of input limit.
+ let values_indices = sort_to_indices(&run_values, options, None).unwrap();
+
+ let mut remaining_len = output_len;
+
+ let run_ends = run_array.run_ends();
+
+ assert_eq!(
+ 0,
+ values_indices.null_count(),
+ "The output of sort_to_indices should not have null values. Its values
is {}",
+ values_indices.null_count()
+ );
+
+ // Calculate `run length` of sorted value indices.
+ // Find the `logical index` at which the run starts.
+ // Call the consumer using the run length and starting logical index.
+ for physical_index in values_indices.values() {
+ // As the values were sliced with offset = start_physical_index, it has to be added back
+ // before accessing `RunArray::run_ends`
+ let physical_index = *physical_index as usize + start_physical_index;
+
+ // calculate the run length and logical index of sorted values
+ let (run_length, logical_index_start) = unsafe {
+ // Safety:
+ // The index will be within bounds as it's in bounds of start_physical_index
+ // and len, both of which are within bounds of run_array
+ if physical_index == start_physical_index {
+ (
+ run_ends.value_unchecked(physical_index).as_usize()
+ - run_array.offset(),
+ 0,
+ )
+ } else if physical_index == end_physical_index {
+ let prev_run_end =
+ run_ends.value_unchecked(physical_index - 1).as_usize();
+ (
+ run_array.offset() + run_array.len() - prev_run_end,
+ prev_run_end - run_array.offset(),
+ )
+ } else {
+ let prev_run_end =
+ run_ends.value_unchecked(physical_index - 1).as_usize();
+ (
+ run_ends.value_unchecked(physical_index).as_usize() -
prev_run_end,
+ prev_run_end - run_array.offset(),
+ )
+ }
+ };
+ let new_run_length = run_length.min(remaining_len);
+ consume_runs(new_run_length, logical_index_start);
+ remaining_len -= new_run_length;
+
+ if remaining_len == 0 {
+ break;
+ }
+ }
+
+ if remaining_len > 0 {
+ panic!("Remaining length should be zero its values is {remaining_len}")
+ }
+ (values_indices, run_values)
+}
+
/// Sort strings
fn sort_string<Offset: OffsetSizeTrait>(
values: &ArrayRef,
@@ -1057,6 +1264,7 @@ fn sort_valids_array<T>(
#[cfg(test)]
mod tests {
use super::*;
+ use arrow_array::builder::PrimitiveRunBuilder;
use arrow_buffer::i256;
use rand::rngs::StdRng;
use rand::{Rng, RngCore, SeedableRng};
@@ -2882,6 +3090,129 @@ mod tests {
);
}
+ #[test]
+ fn test_sort_run_to_run() {
+ test_sort_run_inner(|array, sort_options, limit| {
+ sort_run(array, sort_options, limit)
+ });
+ }
+
+ #[test]
+ fn test_sort_run_to_indices() {
+ test_sort_run_inner(|array, sort_options, limit| {
+ let indices = sort_to_indices(array, sort_options, limit).unwrap();
+ take(array, &indices, None)
+ });
+ }
+
+ fn test_sort_run_inner<F>(sort_fn: F)
+ where
+ F: Fn(
+ &ArrayRef,
+ Option<SortOptions>,
+ Option<usize>,
+ ) -> Result<ArrayRef, ArrowError>,
+ {
+ // Create an input array for testing
+ let total_len = 80;
+ let vals: Vec<Option<i32>> =
+ vec![Some(1), None, Some(2), Some(3), Some(4), None, Some(5)];
+ let repeats: Vec<usize> = vec![1, 3, 2, 4];
+ let mut input_array: Vec<Option<i32>> = Vec::with_capacity(total_len);
+ for ix in 0_usize..32 {
+ let repeat: usize = repeats[ix % repeats.len()];
+ let val: Option<i32> = vals[ix % vals.len()];
+ input_array.resize(input_array.len() + repeat, val);
+ }
+
+ // create run array using input_array
+ // Encode the input_array to run array
+ let mut builder =
+ PrimitiveRunBuilder::<Int16Type,
Int32Type>::with_capacity(input_array.len());
+ builder.extend(input_array.iter().copied());
+ let run_array = builder.finish();
+
+ // slice lengths that are tested
+ let slice_lens = [
+ 1, 2, 3, 4, 5, 6, 7, 37, 38, 39, 40, 41, 42, 43, 74, 75, 76, 77,
78, 79, 80,
+ ];
+ for slice_len in slice_lens {
+ test_sort_run_inner2(
+ input_array.as_slice(),
+ &run_array,
+ 0,
+ slice_len,
+ None,
+ &sort_fn,
+ );
+ test_sort_run_inner2(
+ input_array.as_slice(),
+ &run_array,
+ total_len - slice_len,
+ slice_len,
+ None,
+ &sort_fn,
+ );
+ // Test with non zero limit
+ if slice_len > 1 {
+ test_sort_run_inner2(
+ input_array.as_slice(),
+ &run_array,
+ 0,
+ slice_len,
+ Some(slice_len / 2),
+ &sort_fn,
+ );
+ test_sort_run_inner2(
+ input_array.as_slice(),
+ &run_array,
+ total_len - slice_len,
+ slice_len,
+ Some(slice_len / 2),
+ &sort_fn,
+ );
+ }
+ }
+ }
+
+ fn test_sort_run_inner2<F>(
+ input_array: &[Option<i32>],
+ run_array: &RunArray<Int16Type>,
+ offset: usize,
+ length: usize,
+ limit: Option<usize>,
+ sort_fn: &F,
+ ) where
+ F: Fn(
+ &ArrayRef,
+ Option<SortOptions>,
+ Option<usize>,
+ ) -> Result<ArrayRef, ArrowError>,
+ {
+ // Run the sort and build actual result
+ let sliced_array = run_array.slice(offset, length);
+ let sorted_sliced_array = sort_fn(&sliced_array, None, limit).unwrap();
+ let sorted_run_array = sorted_sliced_array
+ .as_any()
+ .downcast_ref::<RunArray<Int16Type>>()
+ .unwrap();
+ let typed_run_array = sorted_run_array
+ .downcast::<PrimitiveArray<Int32Type>>()
+ .unwrap();
+ let actual: Vec<Option<i32>> = typed_run_array.into_iter().collect();
+
+ // build expected result.
+ let mut sliced_input = input_array[offset..(offset +
length)].to_owned();
+ sliced_input.sort();
+ let expected = if let Some(limit) = limit {
+ sliced_input.iter().take(limit).copied().collect()
+ } else {
+ sliced_input
+ };
+
+ assert_eq!(expected, actual)
+ }
+
#[test]
fn test_sort_string_dicts() {
test_sort_string_dict_arrays::<Int8Type>(
diff --git a/arrow/benches/sort_kernel.rs b/arrow/benches/sort_kernel.rs
index c4c6819df..43a9a84d9 100644
--- a/arrow/benches/sort_kernel.rs
+++ b/arrow/benches/sort_kernel.rs
@@ -24,8 +24,8 @@ use std::sync::Arc;
extern crate arrow;
use arrow::compute::kernels::sort::{lexsort, SortColumn};
-use arrow::compute::sort_to_indices;
-use arrow::datatypes::Int32Type;
+use arrow::compute::{sort_limit, sort_to_indices};
+use arrow::datatypes::{Int16Type, Int32Type};
use arrow::util::bench_util::*;
use arrow::{array::*, datatypes::Float32Type};
@@ -61,6 +61,10 @@ fn bench_sort_to_indices(array: &ArrayRef, limit:
Option<usize>) {
criterion::black_box(sort_to_indices(array, None, limit).unwrap());
}
+fn bench_sort_run(array: &ArrayRef, limit: Option<usize>) {
+ criterion::black_box(sort_limit(array, None, limit).unwrap());
+}
+
fn add_benchmark(c: &mut Criterion) {
let arr_a = create_f32_array(2u64.pow(10) as usize, false);
let arr_b = create_f32_array(2u64.pow(10) as usize, false);
@@ -107,6 +111,19 @@ fn add_benchmark(c: &mut Criterion) {
b.iter(|| bench_sort_to_indices(&dict_arr, None))
});
+ let run_encoded_array = Arc::new(create_primitive_run_array::<Int16Type,
Int32Type>(
+ 2u64.pow(12) as usize,
+ 2u64.pow(10) as usize,
+ )) as ArrayRef;
+
+ c.bench_function("sort primitive run to indices 2^12", |b| {
+ b.iter(|| bench_sort_to_indices(&run_encoded_array, None))
+ });
+
+ c.bench_function("sort primitive run to run 2^12", |b| {
+ b.iter(|| bench_sort_run(&run_encoded_array, None))
+ });
+
// with limit
{
let arr_a = create_f32_array(2u64.pow(12) as usize, false);