This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new ebe6f5398 feat: Sort kernel for `RunArray` (#3695)
ebe6f5398 is described below

commit ebe6f539844ba781553c87bdaa2dd25190047c49
Author: askoa <[email protected]>
AuthorDate: Thu Feb 23 04:52:12 2023 -0500

    feat: Sort kernel for `RunArray` (#3695)
    
    * Handle sliced array in run array iterator
    
    * sort_to_indices for RunArray
    
    * better loop
    
    * sort for run array
    
    * improve docs
    
    * some minor tweaks
    
    * doc fix
    
    * format fix
    
    * fix sort run to return all logical indices
    
    * pr comment
    
    * rename test function, pull sort run logic into a separate function
    
    ---------
    
    Co-authored-by: ask <ask@local>
---
 arrow-array/src/array/run_array.rs |  28 +++-
 arrow-array/src/run_iterator.rs    |   9 +-
 arrow-ord/src/sort.rs              | 331 +++++++++++++++++++++++++++++++++++++
 arrow/benches/sort_kernel.rs       |  21 ++-
 4 files changed, 374 insertions(+), 15 deletions(-)

diff --git a/arrow-array/src/array/run_array.rs 
b/arrow-array/src/array/run_array.rs
index 709933e1b..9dba3ddab 100644
--- a/arrow-array/src/array/run_array.rs
+++ b/arrow-array/src/array/run_array.rs
@@ -112,17 +112,37 @@ impl<R: RunEndIndexType> RunArray<R> {
 
     /// Returns a reference to run_ends array
     ///
-    /// Note: any slicing of this array is not applied to the returned array
+    /// Note: any slicing of this [`RunArray`] array is not applied to the 
returned array
     /// and must be handled separately
     pub fn run_ends(&self) -> &PrimitiveArray<R> {
         &self.run_ends
     }
 
     /// Returns a reference to values array
+    ///
+    /// Note: any slicing of this [`RunArray`] array is not applied to the 
returned array
+    /// and must be handled separately
     pub fn values(&self) -> &ArrayRef {
         &self.values
     }
 
+    /// Returns the physical index at which the array slice starts.
+    pub fn get_start_physical_index(&self) -> usize {
+        if self.offset() == 0 {
+            return 0;
+        }
+        self.get_zero_offset_physical_index(self.offset()).unwrap()
+    }
+
+    /// Returns the physical index at which the array slice ends.
+    pub fn get_end_physical_index(&self) -> usize {
+        if self.offset() + self.len() == Self::logical_len(&self.run_ends) {
+            return self.run_ends.len() - 1;
+        }
+        self.get_zero_offset_physical_index(self.offset() + self.len() - 1)
+            .unwrap()
+    }
+
     /// Downcast this [`RunArray`] to a [`TypedRunArray`]
     ///
     /// ```
@@ -230,11 +250,7 @@ impl<R: RunEndIndexType> RunArray<R> {
         }
 
         // Skip some physical indices based on offset.
-        let skip_value = if self.offset() > 0 {
-            self.get_zero_offset_physical_index(self.offset()).unwrap()
-        } else {
-            0
-        };
+        let skip_value = self.get_start_physical_index();
 
         let mut physical_indices = vec![0; indices_len];
 
diff --git a/arrow-array/src/run_iterator.rs b/arrow-array/src/run_iterator.rs
index a79969c3c..fbf173b1d 100644
--- a/arrow-array/src/run_iterator.rs
+++ b/arrow-array/src/run_iterator.rs
@@ -57,13 +57,8 @@ where
 {
     /// create a new iterator
     pub fn new(array: TypedRunArray<'a, R, V>) -> Self {
-        let current_front_physical: usize =
-            array.run_array().get_physical_index(0).unwrap();
-        let current_back_physical: usize = array
-            .run_array()
-            .get_physical_index(array.len() - 1)
-            .unwrap()
-            + 1;
+        let current_front_physical = 
array.run_array().get_start_physical_index();
+        let current_back_physical = array.run_array().get_end_physical_index() 
+ 1;
         RunArrayIter {
             array,
             current_front_logical: array.offset(),
diff --git a/arrow-ord/src/sort.rs b/arrow-ord/src/sort.rs
index 207f499ef..c4baa2283 100644
--- a/arrow-ord/src/sort.rs
+++ b/arrow-ord/src/sort.rs
@@ -18,14 +18,17 @@
 //! Defines sort kernel for `ArrayRef`
 
 use crate::ord::{build_compare, DynComparator};
+use arrow_array::builder::BufferBuilder;
 use arrow_array::cast::*;
 use arrow_array::types::*;
 use arrow_array::*;
 use arrow_buffer::{ArrowNativeType, MutableBuffer};
 use arrow_data::ArrayData;
+use arrow_data::ArrayDataBuilder;
 use arrow_schema::{ArrowError, DataType, IntervalUnit, TimeUnit};
 use arrow_select::take::take;
 use std::cmp::Ordering;
+use std::sync::Arc;
 
 pub use arrow_schema::SortOptions;
 
@@ -55,6 +58,9 @@ pub fn sort(
     values: &ArrayRef,
     options: Option<SortOptions>,
 ) -> Result<ArrayRef, ArrowError> {
+    if let DataType::RunEndEncoded(_, _) = values.data_type() {
+        return sort_run(values, options, None);
+    }
     let indices = sort_to_indices(values, options, None)?;
     take(values.as_ref(), &indices, None)
 }
@@ -94,6 +100,9 @@ pub fn sort_limit(
     options: Option<SortOptions>,
     limit: Option<usize>,
 ) -> Result<ArrayRef, ArrowError> {
+    if let DataType::RunEndEncoded(_, _) = values.data_type() {
+        return sort_run(values, options, limit);
+    }
     let indices = sort_to_indices(values, options, limit)?;
     take(values.as_ref(), &indices, None)
 }
@@ -357,6 +366,16 @@ pub fn sort_to_indices(
             sort_binary::<i32>(values, v, n, &options, limit)
         }
         DataType::LargeBinary => sort_binary::<i64>(values, v, n, &options, 
limit),
+        DataType::RunEndEncoded(run_ends_field, _) => match 
run_ends_field.data_type() {
+            DataType::Int16 => sort_run_to_indices::<Int16Type>(values, 
&options, limit),
+            DataType::Int32 => sort_run_to_indices::<Int32Type>(values, 
&options, limit),
+            DataType::Int64 => sort_run_to_indices::<Int64Type>(values, 
&options, limit),
+            dt => {
+                return Err(ArrowError::ComputeError(format!(
+                    "Inavlid run end data type: {dt}"
+                )))
+            }
+        },
         t => {
             return Err(ArrowError::ComputeError(format!(
                 "Sort not supported for data type {t:?}"
@@ -599,6 +618,194 @@ fn insert_valid_values<T>(result_slice: &mut [u32], 
offset: usize, valids: &[(u3
     append_valids(&mut result_slice[offset..offset + valids.len()]);
 }
 
+// Sort run array and return sorted run array.
+// The output RunArray will be encoded at the same level as input run array.
+// For example, an input RunArray { run_ends = [2,4,6,8], values = [1,2,1,2] }
+// will result in output RunArray { run_ends = [2,4,6,8], values = [1,1,2,2] }
+// and not RunArray { run_ends = [4,8], values = [1,2] }
+fn sort_run(
+    values: &ArrayRef,
+    options: Option<SortOptions>,
+    limit: Option<usize>,
+) -> Result<ArrayRef, ArrowError> {
+    match values.data_type() {
+        DataType::RunEndEncoded(run_ends_field, _) => match 
run_ends_field.data_type() {
+            DataType::Int16 => sort_run_downcasted::<Int16Type>(values, 
options, limit),
+            DataType::Int32 => sort_run_downcasted::<Int32Type>(values, 
options, limit),
+            DataType::Int64 => sort_run_downcasted::<Int64Type>(values, 
options, limit),
+            dt => unreachable!("Not valid run ends data type {dt}"),
+        },
+        dt => Err(ArrowError::InvalidArgumentError(format!(
+            "Input is not a run encoded array. Input data type {dt}"
+        ))),
+    }
+}
+
+fn sort_run_downcasted<R: RunEndIndexType>(
+    values: &ArrayRef,
+    options: Option<SortOptions>,
+    limit: Option<usize>,
+) -> Result<ArrayRef, ArrowError> {
+    let run_array = values.as_any().downcast_ref::<RunArray<R>>().unwrap();
+
+    // Determine the length of output run array.
+    let output_len = if let Some(limit) = limit {
+        limit.min(run_array.len())
+    } else {
+        run_array.len()
+    };
+
+    let run_ends = run_array.run_ends();
+
+    let mut new_run_ends_builder = 
BufferBuilder::<R::Native>::new(run_ends.len());
+    let mut new_run_end: usize = 0;
+    let mut new_physical_len: usize = 0;
+
+    let consume_runs = |run_length, _| {
+        new_run_end += run_length;
+        new_physical_len += 1;
+        
new_run_ends_builder.append(R::Native::from_usize(new_run_end).unwrap());
+    };
+
+    let (values_indices, run_values) =
+        sort_run_inner(run_array, options, output_len, consume_runs);
+
+    let new_run_ends = unsafe {
+        // Safety:
+        // The function builds a valid run_ends array and hence need not be 
validated.
+        ArrayDataBuilder::new(run_array.run_ends().data_type().clone())
+            .len(new_physical_len)
+            .null_count(0)
+            .add_buffer(new_run_ends_builder.finish())
+            .build_unchecked()
+    };
+
+    // slice the sorted value indices based on limit.
+    let new_values_indices: PrimitiveArray<UInt32Type> = values_indices
+        .slice(0, new_run_ends.len())
+        .into_data()
+        .into();
+
+    let new_values = take(&run_values, &new_values_indices, None)?;
+
+    // Build sorted run array
+    let builder = ArrayDataBuilder::new(run_array.data_type().clone())
+        .len(new_run_end)
+        .add_child_data(new_run_ends)
+        .add_child_data(new_values.into_data());
+    let array_data: RunArray<R> = unsafe {
+        // Safety:
+        //  This function builds a valid run array and hence can skip 
validation.
+        builder.build_unchecked().into()
+    };
+    Ok(Arc::new(array_data))
+}
+
+// Sort to indices for run encoded array.
+// This function will be slow for run array as it decodes the physical indices 
to
+// logical indices and to get the run array back, the logical indices have to be
+// encoded back to run array.
+fn sort_run_to_indices<R: RunEndIndexType>(
+    values: &ArrayRef,
+    options: &SortOptions,
+    limit: Option<usize>,
+) -> UInt32Array {
+    let run_array = values.as_any().downcast_ref::<RunArray<R>>().unwrap();
+    let output_len = if let Some(limit) = limit {
+        limit.min(run_array.len())
+    } else {
+        run_array.len()
+    };
+    let mut result: Vec<u32> = Vec::with_capacity(output_len);
+
+    //Add all logical indices belonging to a physical index to the output
+    let consume_runs = |run_length, logical_start| {
+        result.extend(logical_start as u32..(logical_start + run_length) as 
u32);
+    };
+    sort_run_inner(run_array, Some(*options), output_len, consume_runs);
+
+    UInt32Array::from(result)
+}
+
+fn sort_run_inner<R: RunEndIndexType, F>(
+    run_array: &RunArray<R>,
+    options: Option<SortOptions>,
+    output_len: usize,
+    mut consume_runs: F,
+) -> (PrimitiveArray<UInt32Type>, ArrayRef)
+where
+    F: FnMut(usize, usize),
+{
+    // slice the run_array.values based on offset and length.
+    let start_physical_index = run_array.get_start_physical_index();
+    let end_physical_index = run_array.get_end_physical_index();
+    let physical_len = end_physical_index - start_physical_index + 1;
+    let run_values = run_array.values().slice(start_physical_index, 
physical_len);
+
+    // All the values have to be sorted irrespective of input limit.
+    let values_indices = sort_to_indices(&run_values, options, None).unwrap();
+
+    let mut remaining_len = output_len;
+
+    let run_ends = run_array.run_ends();
+
+    assert_eq!(
+        0,
+        values_indices.null_count(),
+        "The output of sort_to_indices should not have null values. Its values 
is {}",
+        values_indices.null_count()
+    );
+
+    // Calculate `run length` of sorted value indices.
+    // Find the `logical index` at which the run starts.
+    // Call the consumer using the run length and starting logical index.
+    for physical_index in values_indices.values() {
+        // As the values were sliced with offset = start_physical_index, it 
has to be added back
+        // before accessing `RunArray::run_ends`
+        let physical_index = *physical_index as usize + start_physical_index;
+
+        // calculate the run length and logical index of sorted values
+        let (run_length, logical_index_start) = unsafe {
+            // Safety:
+            // The index will be within bounds as its in bounds of 
start_physical_index
+            // and len, both of which are within bounds of run_array
+            if physical_index == start_physical_index {
+                (
+                    run_ends.value_unchecked(physical_index).as_usize()
+                        - run_array.offset(),
+                    0,
+                )
+            } else if physical_index == end_physical_index {
+                let prev_run_end =
+                    run_ends.value_unchecked(physical_index - 1).as_usize();
+                (
+                    run_array.offset() + run_array.len() - prev_run_end,
+                    prev_run_end - run_array.offset(),
+                )
+            } else {
+                let prev_run_end =
+                    run_ends.value_unchecked(physical_index - 1).as_usize();
+                (
+                    run_ends.value_unchecked(physical_index).as_usize() - 
prev_run_end,
+                    prev_run_end - run_array.offset(),
+                )
+            }
+        };
+        let new_run_length = run_length.min(remaining_len);
+        consume_runs(new_run_length, logical_index_start);
+        remaining_len -= new_run_length;
+
+        if remaining_len == 0 {
+            break;
+        }
+    }
+
+    if remaining_len > 0 {
+        panic!("Remaining length should be zero its values is {remaining_len}")
+    }
+    (values_indices, run_values)
+}
+
 /// Sort strings
 fn sort_string<Offset: OffsetSizeTrait>(
     values: &ArrayRef,
@@ -1057,6 +1264,7 @@ fn sort_valids_array<T>(
 #[cfg(test)]
 mod tests {
     use super::*;
+    use arrow_array::builder::PrimitiveRunBuilder;
     use arrow_buffer::i256;
     use rand::rngs::StdRng;
     use rand::{Rng, RngCore, SeedableRng};
@@ -2882,6 +3090,129 @@ mod tests {
         );
     }
 
+    #[test]
+    fn test_sort_run_to_run() {
+        test_sort_run_inner(|array, sort_options, limit| {
+            sort_run(array, sort_options, limit)
+        });
+    }
+
+    #[test]
+    fn test_sort_run_to_indices() {
+        test_sort_run_inner(|array, sort_options, limit| {
+            let indices = sort_to_indices(array, sort_options, limit).unwrap();
+            take(array, &indices, None)
+        });
+    }
+
+    fn test_sort_run_inner<F>(sort_fn: F)
+    where
+        F: Fn(
+            &ArrayRef,
+            Option<SortOptions>,
+            Option<usize>,
+        ) -> Result<ArrayRef, ArrowError>,
+    {
+        // Create an input array for testing
+        let total_len = 80;
+        let vals: Vec<Option<i32>> =
+            vec![Some(1), None, Some(2), Some(3), Some(4), None, Some(5)];
+        let repeats: Vec<usize> = vec![1, 3, 2, 4];
+        let mut input_array: Vec<Option<i32>> = Vec::with_capacity(total_len);
+        for ix in 0_usize..32 {
+            let repeat: usize = repeats[ix % repeats.len()];
+            let val: Option<i32> = vals[ix % vals.len()];
+            input_array.resize(input_array.len() + repeat, val);
+        }
+
+        // create run array using input_array
+        // Encode the input_array to run array
+        let mut builder =
+            PrimitiveRunBuilder::<Int16Type, 
Int32Type>::with_capacity(input_array.len());
+        builder.extend(input_array.iter().copied());
+        let run_array = builder.finish();
+
+        // slice lengths that are tested
+        let slice_lens = [
+            1, 2, 3, 4, 5, 6, 7, 37, 38, 39, 40, 41, 42, 43, 74, 75, 76, 77, 
78, 79, 80,
+        ];
+        for slice_len in slice_lens {
+            test_sort_run_inner2(
+                input_array.as_slice(),
+                &run_array,
+                0,
+                slice_len,
+                None,
+                &sort_fn,
+            );
+            test_sort_run_inner2(
+                input_array.as_slice(),
+                &run_array,
+                total_len - slice_len,
+                slice_len,
+                None,
+                &sort_fn,
+            );
+            // Test with non zero limit
+            if slice_len > 1 {
+                test_sort_run_inner2(
+                    input_array.as_slice(),
+                    &run_array,
+                    0,
+                    slice_len,
+                    Some(slice_len / 2),
+                    &sort_fn,
+                );
+                test_sort_run_inner2(
+                    input_array.as_slice(),
+                    &run_array,
+                    total_len - slice_len,
+                    slice_len,
+                    Some(slice_len / 2),
+                    &sort_fn,
+                );
+            }
+        }
+    }
+
+    fn test_sort_run_inner2<F>(
+        input_array: &[Option<i32>],
+        run_array: &RunArray<Int16Type>,
+        offset: usize,
+        length: usize,
+        limit: Option<usize>,
+        sort_fn: &F,
+    ) where
+        F: Fn(
+            &ArrayRef,
+            Option<SortOptions>,
+            Option<usize>,
+        ) -> Result<ArrayRef, ArrowError>,
+    {
+        // Run the sort and build actual result
+        let sliced_array = run_array.slice(offset, length);
+        let sorted_sliced_array = sort_fn(&sliced_array, None, limit).unwrap();
+        let sorted_run_array = sorted_sliced_array
+            .as_any()
+            .downcast_ref::<RunArray<Int16Type>>()
+            .unwrap();
+        let typed_run_array = sorted_run_array
+            .downcast::<PrimitiveArray<Int32Type>>()
+            .unwrap();
+        let actual: Vec<Option<i32>> = typed_run_array.into_iter().collect();
+
+        // build expected result.
+        let mut sliced_input = input_array[offset..(offset + 
length)].to_owned();
+        sliced_input.sort();
+        let expected = if let Some(limit) = limit {
+            sliced_input.iter().take(limit).copied().collect()
+        } else {
+            sliced_input
+        };
+
+        assert_eq!(expected, actual)
+    }
+
     #[test]
     fn test_sort_string_dicts() {
         test_sort_string_dict_arrays::<Int8Type>(
diff --git a/arrow/benches/sort_kernel.rs b/arrow/benches/sort_kernel.rs
index c4c6819df..43a9a84d9 100644
--- a/arrow/benches/sort_kernel.rs
+++ b/arrow/benches/sort_kernel.rs
@@ -24,8 +24,8 @@ use std::sync::Arc;
 extern crate arrow;
 
 use arrow::compute::kernels::sort::{lexsort, SortColumn};
-use arrow::compute::sort_to_indices;
-use arrow::datatypes::Int32Type;
+use arrow::compute::{sort_limit, sort_to_indices};
+use arrow::datatypes::{Int16Type, Int32Type};
 use arrow::util::bench_util::*;
 use arrow::{array::*, datatypes::Float32Type};
 
@@ -61,6 +61,10 @@ fn bench_sort_to_indices(array: &ArrayRef, limit: 
Option<usize>) {
     criterion::black_box(sort_to_indices(array, None, limit).unwrap());
 }
 
+fn bench_sort_run(array: &ArrayRef, limit: Option<usize>) {
+    criterion::black_box(sort_limit(array, None, limit).unwrap());
+}
+
 fn add_benchmark(c: &mut Criterion) {
     let arr_a = create_f32_array(2u64.pow(10) as usize, false);
     let arr_b = create_f32_array(2u64.pow(10) as usize, false);
@@ -107,6 +111,19 @@ fn add_benchmark(c: &mut Criterion) {
         b.iter(|| bench_sort_to_indices(&dict_arr, None))
     });
 
+    let run_encoded_array = Arc::new(create_primitive_run_array::<Int16Type, 
Int32Type>(
+        2u64.pow(12) as usize,
+        2u64.pow(10) as usize,
+    )) as ArrayRef;
+
+    c.bench_function("sort primitive run to indices 2^12", |b| {
+        b.iter(|| bench_sort_to_indices(&run_encoded_array, None))
+    });
+
+    c.bench_function("sort primitive run to run 2^12", |b| {
+        b.iter(|| bench_sort_run(&run_encoded_array, None))
+    });
+
     // with limit
     {
         let arr_a = create_f32_array(2u64.pow(12) as usize, false);

Reply via email to