This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 36f2db3a3 Add RunEndBuffer (#1799) (#3817)
36f2db3a3 is described below

commit 36f2db3a35e07dfbfdb6b32e457d40ef8ccfb601
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Wed Mar 8 20:17:31 2023 +0100

    Add RunEndBuffer (#1799) (#3817)
    
    * Add RunEndBuffer (#1799)
    
    * Fix test
    
    * Revert rename
    
    * Format
    
    * Clippy
    
    * Remove unnecessary check
    
    * Fix
    
    * Tweak docs
    
    * Add docs
---
 arrow-array/src/array/mod.rs                       |   1 +
 arrow-array/src/array/run_array.rs                 | 138 +++++---------
 .../src/builder/generic_byte_run_builder.rs        |  37 +---
 arrow-array/src/builder/primitive_run_builder.rs   |  17 +-
 arrow-array/src/run_iterator.rs                    |  19 +-
 arrow-buffer/src/buffer/mod.rs                     |   2 +
 arrow-buffer/src/buffer/run.rs                     | 200 +++++++++++++++++++++
 arrow-ipc/src/writer.rs                            |  32 ++--
 arrow-ord/src/sort.rs                              |  14 +-
 arrow-select/src/take.rs                           |   3 +-
 10 files changed, 286 insertions(+), 177 deletions(-)

diff --git a/arrow-array/src/array/mod.rs b/arrow-array/src/array/mod.rs
index f3c35e51f..dfdaac85b 100644
--- a/arrow-array/src/array/mod.rs
+++ b/arrow-array/src/array/mod.rs
@@ -880,6 +880,7 @@ mod tests {
                     assert_eq!(array.null_count(), 0);
                     assert_eq!(array.values().len(), 1);
                     assert_eq!(array.values().null_count(), 1);
+                    assert_eq!(array.run_ends().len(), 4);
                     assert_eq!(array.run_ends().values(), &[4]);
 
                     let idx = array.get_physical_indices(&[0, 1, 2, 
3]).unwrap();
diff --git a/arrow-array/src/array/run_array.rs 
b/arrow-array/src/array/run_array.rs
index 126aefde9..e50903f30 100644
--- a/arrow-array/src/array/run_array.rs
+++ b/arrow-array/src/array/run_array.rs
@@ -17,6 +17,7 @@
 
 use std::any::Any;
 
+use arrow_buffer::buffer::RunEndBuffer;
 use arrow_buffer::ArrowNativeType;
 use arrow_data::{ArrayData, ArrayDataBuilder};
 use arrow_schema::{ArrowError, DataType, Field};
@@ -62,7 +63,7 @@ use crate::{
 
 pub struct RunArray<R: RunEndIndexType> {
     data: ArrayData,
-    run_ends: PrimitiveArray<R>,
+    run_ends: RunEndBuffer<R::Native>,
     values: ArrayRef,
 }
 
@@ -110,11 +111,8 @@ impl<R: RunEndIndexType> RunArray<R> {
         Ok(array_data.into())
     }
 
-    /// Returns a reference to run_ends array
-    ///
-    /// Note: any slicing of this [`RunArray`] array is not applied to the 
returned array
-    /// and must be handled separately
-    pub fn run_ends(&self) -> &PrimitiveArray<R> {
+    /// Returns a reference to [`RunEndBuffer`]
+    pub fn run_ends(&self) -> &RunEndBuffer<R::Native> {
         &self.run_ends
     }
 
@@ -128,19 +126,12 @@ impl<R: RunEndIndexType> RunArray<R> {
 
     /// Returns the physical index at which the array slice starts.
     pub fn get_start_physical_index(&self) -> usize {
-        if self.offset() == 0 {
-            return 0;
-        }
-        self.get_zero_offset_physical_index(self.offset()).unwrap()
+        self.run_ends.get_start_physical_index()
     }
 
     /// Returns the physical index at which the array slice ends.
     pub fn get_end_physical_index(&self) -> usize {
-        if self.offset() + self.len() == Self::logical_len(&self.run_ends) {
-            return self.run_ends.len() - 1;
-        }
-        self.get_zero_offset_physical_index(self.offset() + self.len() - 1)
-            .unwrap()
+        self.run_ends.get_end_physical_index()
     }
 
     /// Downcast this [`RunArray`] to a [`TypedRunArray`]
@@ -164,47 +155,13 @@ impl<R: RunEndIndexType> RunArray<R> {
         })
     }
 
-    /// Returns index to the physical array for the given index to the logical 
array.
-    /// The function does not adjust the input logical index based on 
`ArrayData::offset`.
-    /// Performs a binary search on the run_ends array for the input index.
-    #[inline]
-    pub fn get_zero_offset_physical_index(&self, logical_index: usize) -> 
Option<usize> {
-        if logical_index >= Self::logical_len(&self.run_ends) {
-            return None;
-        }
-        let mut st: usize = 0;
-        let mut en: usize = self.run_ends.len();
-        while st + 1 < en {
-            let mid: usize = (st + en) / 2;
-            if logical_index
-                < unsafe {
-                    // Safety:
-                    // The value of mid will always be between 1 and len - 1,
-                    // where len is length of run ends array.
-                    // This is based on the fact that `st` starts with 0 and
-                    // `en` starts with len. The condition `st + 1 < en` 
ensures
-                    // `st` and `en` differs atleast by two. So the value of 
`mid`
-                    // will never be either `st` or `en`
-                    self.run_ends.value_unchecked(mid - 1).as_usize()
-                }
-            {
-                en = mid
-            } else {
-                st = mid
-            }
-        }
-        Some(st)
-    }
-
     /// Returns index to the physical array for the given index to the logical 
array.
     /// This function adjusts the input logical index based on 
`ArrayData::offset`
     /// Performs a binary search on the run_ends array for the input index.
-    #[inline]
-    pub fn get_physical_index(&self, logical_index: usize) -> Option<usize> {
-        if logical_index >= self.len() {
-            return None;
-        }
-        self.get_zero_offset_physical_index(logical_index + self.offset())
+    ///
+    /// The result is arbitrary if `logical_index >= self.len()`
+    pub fn get_physical_index(&self, logical_index: usize) -> usize {
+        self.run_ends.get_physical_index(logical_index)
     }
 
     /// Returns the physical indices of the input logical indices. Returns 
error if any of the logical
@@ -222,6 +179,9 @@ impl<R: RunEndIndexType> RunArray<R> {
     where
         I: ArrowNativeType,
     {
+        let len = self.run_ends().len();
+        let offset = self.run_ends().offset();
+
         let indices_len = logical_indices.len();
 
         if indices_len == 0 {
@@ -243,7 +203,7 @@ impl<R: RunEndIndexType> RunArray<R> {
         // Return early if all the logical indices cannot be converted to 
physical indices.
         let largest_logical_index =
             logical_indices[*ordered_indices.last().unwrap()].as_usize();
-        if largest_logical_index >= self.len() {
+        if largest_logical_index >= len {
             return Err(ArrowError::InvalidArgumentError(format!(
                 "Cannot convert all logical indices to physical indices. The 
logical index cannot be converted is {largest_logical_index}.",
             )));
@@ -259,7 +219,7 @@ impl<R: RunEndIndexType> RunArray<R> {
             self.run_ends.values().iter().enumerate().skip(skip_value)
         {
             // Get the run end index (relative to offset) of current physical 
index
-            let run_end_value = run_end.as_usize() - self.offset();
+            let run_end_value = run_end.as_usize() - offset;
 
             // All the `logical_indices` that are less than current run end 
index
             // belongs to current physical index.
@@ -295,7 +255,15 @@ impl<R: RunEndIndexType> From<ArrayData> for RunArray<R> {
             }
         }
 
-        let run_ends = PrimitiveArray::<R>::from(data.child_data()[0].clone());
+        // Safety
+        // ArrayData is valid
+        let child = &data.child_data()[0];
+        assert_eq!(child.data_type(), &R::DATA_TYPE, "Incorrect run ends 
type");
+        let run_ends = unsafe {
+            let scalar = child.buffers()[0].clone().into();
+            RunEndBuffer::new_unchecked(scalar, data.offset(), data.len())
+        };
+
         let values = make_array(data.child_data()[1].clone());
         Self {
             data,
@@ -330,7 +298,8 @@ impl<R: RunEndIndexType> std::fmt::Debug for RunArray<R> {
         writeln!(
             f,
             "RunArray {{run_ends: {:?}, values: {:?}}}",
-            self.run_ends, self.values
+            self.run_ends.values(),
+            self.values
         )
     }
 }
@@ -347,7 +316,7 @@ impl<R: RunEndIndexType> std::fmt::Debug for RunArray<R> {
 ///     .map(|&x| if x == "b" { None } else { Some(x) })
 ///     .collect();
 /// assert_eq!(
-///     "RunArray {run_ends: PrimitiveArray<Int16>\n[\n  2,\n  3,\n  5,\n], 
values: StringArray\n[\n  \"a\",\n  null,\n  \"c\",\n]}\n",
+///     "RunArray {run_ends: [2, 3, 5], values: StringArray\n[\n  \"a\",\n  
null,\n  \"c\",\n]}\n",
 ///     format!("{:?}", array)
 /// );
 /// ```
@@ -374,7 +343,7 @@ impl<'a, T: RunEndIndexType> FromIterator<Option<&'a str>> 
for RunArray<T> {
 /// let test = vec!["a", "a", "b", "c"];
 /// let array: RunArray<Int16Type> = test.into_iter().collect();
 /// assert_eq!(
-///     "RunArray {run_ends: PrimitiveArray<Int16>\n[\n  2,\n  3,\n  4,\n], 
values: StringArray\n[\n  \"a\",\n  \"b\",\n  \"c\",\n]}\n",
+///     "RunArray {run_ends: [2, 3, 4], values: StringArray\n[\n  \"a\",\n  
\"b\",\n  \"c\",\n]}\n",
 ///     format!("{:?}", array)
 /// );
 /// ```
@@ -401,7 +370,7 @@ impl<'a, T: RunEndIndexType> FromIterator<&'a str> for 
RunArray<T> {
 ///
 /// let array: Int16RunArray = vec!["a", "a", "b", "c", 
"c"].into_iter().collect();
 /// let values: Arc<dyn Array> = Arc::new(StringArray::from(vec!["a", "b", 
"c"]));
-/// assert_eq!(array.run_ends(), &Int16Array::from(vec![2, 3, 5]));
+/// assert_eq!(array.run_ends().values(), &[2, 3, 5]);
 /// assert_eq!(array.values(), &values);
 /// ```
 pub type Int16RunArray = RunArray<Int16Type>;
@@ -416,7 +385,7 @@ pub type Int16RunArray = RunArray<Int16Type>;
 ///
 /// let array: Int32RunArray = vec!["a", "a", "b", "c", 
"c"].into_iter().collect();
 /// let values: Arc<dyn Array> = Arc::new(StringArray::from(vec!["a", "b", 
"c"]));
-/// assert_eq!(array.run_ends(), &Int32Array::from(vec![2, 3, 5]));
+/// assert_eq!(array.run_ends().values(), &[2, 3, 5]);
 /// assert_eq!(array.values(), &values);
 /// ```
 pub type Int32RunArray = RunArray<Int32Type>;
@@ -431,7 +400,7 @@ pub type Int32RunArray = RunArray<Int32Type>;
 ///
 /// let array: Int64RunArray = vec!["a", "a", "b", "c", 
"c"].into_iter().collect();
 /// let values: Arc<dyn Array> = Arc::new(StringArray::from(vec!["a", "b", 
"c"]));
-/// assert_eq!(array.run_ends(), &Int64Array::from(vec![2, 3, 5]));
+/// assert_eq!(array.run_ends().values(), &[2, 3, 5]);
 /// assert_eq!(array.values(), &values);
 /// ```
 pub type Int64RunArray = RunArray<Int64Type>;
@@ -480,7 +449,7 @@ impl<'a, R: RunEndIndexType, V> std::fmt::Debug for 
TypedRunArray<'a, R, V> {
 
 impl<'a, R: RunEndIndexType, V> TypedRunArray<'a, R, V> {
     /// Returns the run_ends of this [`TypedRunArray`]
-    pub fn run_ends(&self) -> &'a PrimitiveArray<R> {
+    pub fn run_ends(&self) -> &'a RunEndBuffer<R::Native> {
         self.run_array.run_ends()
     }
 
@@ -531,7 +500,7 @@ where
     }
 
     unsafe fn value_unchecked(&self, logical_index: usize) -> Self::Item {
-        let physical_index = 
self.run_array.get_physical_index(logical_index).unwrap();
+        let physical_index = self.run_array.get_physical_index(logical_index);
         self.values().value_unchecked(physical_index)
     }
 }
@@ -563,7 +532,7 @@ mod tests {
     use crate::builder::PrimitiveRunBuilder;
     use crate::cast::as_primitive_array;
     use crate::types::{Int16Type, Int32Type, Int8Type, UInt32Type};
-    use crate::{Array, Int16Array, Int32Array, StringArray};
+    use crate::{Array, Int32Array, StringArray};
 
     fn build_input_array(size: usize) -> Vec<Option<i32>> {
         // The input array is created by shuffling and repeating
@@ -643,9 +612,10 @@ mod tests {
         ]);
 
         // Construct a run_ends array:
-        let run_ends_data = PrimitiveArray::<Int16Type>::from_iter_values([
-            4_i16, 6, 7, 9, 13, 18, 20, 22,
-        ]);
+        let run_ends_values = [4_i16, 6, 7, 9, 13, 18, 20, 22];
+        let run_ends_data = PrimitiveArray::<Int16Type>::from_iter_values(
+            run_ends_values.iter().copied(),
+        );
 
         // Construct a run ends encoded array from the above two
         let ree_array =
@@ -659,8 +629,7 @@ mod tests {
         assert_eq!(&DataType::Int8, values.data_type());
 
         let run_ends = ree_array.run_ends();
-        assert_eq!(&run_ends_data.into_data(), run_ends.data());
-        assert_eq!(&DataType::Int16, run_ends.data_type());
+        assert_eq!(run_ends.values(), &run_ends_values);
     }
 
     #[test]
@@ -671,7 +640,7 @@ mod tests {
         builder.append_value(22345678);
         let array = builder.finish();
         assert_eq!(
-            "RunArray {run_ends: PrimitiveArray<Int16>\n[\n  1,\n  2,\n  
3,\n], values: PrimitiveArray<UInt32>\n[\n  12345678,\n  null,\n  
22345678,\n]}\n",
+            "RunArray {run_ends: [1, 2, 3], values: 
PrimitiveArray<UInt32>\n[\n  12345678,\n  null,\n  22345678,\n]}\n",
             format!("{array:?}")
         );
 
@@ -685,7 +654,7 @@ mod tests {
         assert_eq!(array.null_count(), 0);
 
         assert_eq!(
-            "RunArray {run_ends: PrimitiveArray<Int16>\n[\n  20,\n], values: 
PrimitiveArray<UInt32>\n[\n  1,\n]}\n",
+            "RunArray {run_ends: [20], values: PrimitiveArray<UInt32>\n[\n  
1,\n]}\n",
             format!("{array:?}")
         );
     }
@@ -698,7 +667,7 @@ mod tests {
             .map(|&x| if x == "b" { None } else { Some(x) })
             .collect();
         assert_eq!(
-            "RunArray {run_ends: PrimitiveArray<Int16>\n[\n  2,\n  3,\n  
4,\n], values: StringArray\n[\n  \"a\",\n  null,\n  \"c\",\n]}\n",
+            "RunArray {run_ends: [2, 3, 4], values: StringArray\n[\n  \"a\",\n 
 null,\n  \"c\",\n]}\n",
             format!("{array:?}")
         );
 
@@ -707,7 +676,7 @@ mod tests {
 
         let array: RunArray<Int16Type> = test.into_iter().collect();
         assert_eq!(
-            "RunArray {run_ends: PrimitiveArray<Int16>\n[\n  2,\n  3,\n  
4,\n], values: StringArray\n[\n  \"a\",\n  \"b\",\n  \"c\",\n]}\n",
+            "RunArray {run_ends: [2, 3, 4], values: StringArray\n[\n  \"a\",\n 
 \"b\",\n  \"c\",\n]}\n",
             format!("{array:?}")
         );
     }
@@ -721,8 +690,6 @@ mod tests {
         assert_eq!(array.null_count(), 0);
 
         let run_ends = array.run_ends();
-        assert_eq!(&DataType::Int16, run_ends.data_type());
-        assert_eq!(0, run_ends.null_count());
         assert_eq!(&[1, 2, 3, 4], run_ends.values());
     }
 
@@ -735,9 +702,6 @@ mod tests {
         assert_eq!(array.null_count(), 0);
 
         let run_ends = array.run_ends();
-        assert_eq!(&DataType::Int32, run_ends.data_type());
-        assert_eq!(0, run_ends.null_count());
-        assert_eq!(5, run_ends.len());
         assert_eq!(&[1, 2, 3, 5, 6], run_ends.values());
 
         let values_data = array.values();
@@ -754,7 +718,7 @@ mod tests {
         assert_eq!(array.null_count(), 0);
 
         let run_ends = array.run_ends();
-        assert_eq!(1, run_ends.len());
+        assert_eq!(3, run_ends.len());
         assert_eq!(&[3], run_ends.values());
 
         let values_data = array.values();
@@ -770,16 +734,14 @@ mod tests {
             [Some(1), Some(2), Some(3), Some(4)].into_iter().collect();
 
         let array = RunArray::<Int32Type>::try_new(&run_ends, 
&values).unwrap();
-        assert_eq!(array.run_ends().data_type(), &DataType::Int32);
         assert_eq!(array.values().data_type(), &DataType::Utf8);
 
         assert_eq!(array.null_count(), 0);
         assert_eq!(array.len(), 4);
-        assert_eq!(array.run_ends.null_count(), 0);
         assert_eq!(array.values().null_count(), 1);
 
         assert_eq!(
-            "RunArray {run_ends: PrimitiveArray<Int32>\n[\n  1,\n  2,\n  3,\n  
4,\n], values: StringArray\n[\n  \"foo\",\n  \"bar\",\n  null,\n  
\"baz\",\n]}\n",
+            "RunArray {run_ends: [1, 2, 3, 4], values: StringArray\n[\n  
\"foo\",\n  \"bar\",\n  null,\n  \"baz\",\n]}\n",
             format!("{array:?}")
         );
     }
@@ -788,7 +750,7 @@ mod tests {
     fn test_run_array_int16_type_definition() {
         let array: Int16RunArray = vec!["a", "a", "b", "c", 
"c"].into_iter().collect();
         let values: Arc<dyn Array> = Arc::new(StringArray::from(vec!["a", "b", 
"c"]));
-        assert_eq!(array.run_ends(), &Int16Array::from(vec![2, 3, 5]));
+        assert_eq!(array.run_ends().values(), &[2, 3, 5]);
         assert_eq!(array.values(), &values);
     }
 
@@ -796,7 +758,7 @@ mod tests {
     fn test_run_array_empty_string() {
         let array: Int16RunArray = vec!["a", "a", "", "", 
"c"].into_iter().collect();
         let values: Arc<dyn Array> = Arc::new(StringArray::from(vec!["a", "", 
"c"]));
-        assert_eq!(array.run_ends(), &Int16Array::from(vec![2, 4, 5]));
+        assert_eq!(array.run_ends().values(), &[2, 4, 5]);
         assert_eq!(array.values(), &values);
     }
 
@@ -849,9 +811,7 @@ mod tests {
     }
 
     #[test]
-    #[should_panic(
-        expected = "PrimitiveArray expected ArrayData with type Int64 got 
Int32"
-    )]
+    #[should_panic(expected = "Incorrect run ends type")]
     fn test_run_array_run_ends_data_type_mismatch() {
         let a = RunArray::<Int32Type>::from_iter(["32"]);
         let _ = RunArray::<Int64Type>::from(a.into_data());
@@ -874,7 +834,7 @@ mod tests {
                 let actual = typed.value(i);
                 assert_eq!(*val, actual)
             } else {
-                let physical_ix = run_array.get_physical_index(i).unwrap();
+                let physical_ix = run_array.get_physical_index(i);
                 assert!(typed.values().is_null(physical_ix));
             };
         }
diff --git a/arrow-array/src/builder/generic_byte_run_builder.rs 
b/arrow-array/src/builder/generic_byte_run_builder.rs
index c6dbb82ff..5c15b1544 100644
--- a/arrow-array/src/builder/generic_byte_run_builder.rs
+++ b/arrow-array/src/builder/generic_byte_run_builder.rs
@@ -49,10 +49,7 @@ use arrow_buffer::ArrowNativeType;
 /// builder.append_null();
 /// let array = builder.finish();
 ///
-/// assert_eq!(
-///     array.run_ends(),
-///     &Int16Array::from(vec![Some(2), Some(3), Some(5), Some(6)])
-/// );
+/// assert_eq!(array.run_ends().values(), &[2, 3, 5, 6]);
 ///
 /// let av = array.values();
 ///
@@ -331,10 +328,7 @@ where
 /// builder.extend([Some("def"), Some("def"), Some("abc")]);
 /// let array = builder.finish();
 ///
-/// assert_eq!(
-///   array.run_ends(),
-///   &Int16Array::from(vec![Some(1), Some(2), Some(4), Some(5)])
-/// );
+/// assert_eq!(array.run_ends().values(), &[1, 2, 4, 5]);
 ///
 /// // Values are polymorphic and so require a downcast.
 /// let av = array.values();
@@ -370,10 +364,7 @@ pub type LargeStringRunBuilder<K> = 
GenericByteRunBuilder<K, LargeUtf8Type>;
 /// builder.extend([Some(b"def"), Some(b"def"), Some(b"abc")]);
 /// let array = builder.finish();
 ///
-/// assert_eq!(
-///   array.run_ends(),
-///   &Int16Array::from(vec![Some(1), Some(2), Some(4), Some(5)])
-/// );
+/// assert_eq!(array.run_ends().values(), &[1, 2, 4, 5]);
 ///
 /// // Values are polymorphic and so require a downcast.
 /// let av = array.values();
@@ -396,11 +387,9 @@ mod tests {
     use super::*;
 
     use crate::array::Array;
-    use crate::cast::as_primitive_array;
     use crate::cast::as_string_array;
     use crate::types::{Int16Type, Int32Type};
     use crate::GenericByteArray;
-    use crate::Int16Array;
     use crate::Int16RunArray;
 
     fn test_bytes_run_buider<T>(values: Vec<&T::Native>)
@@ -426,10 +415,7 @@ mod tests {
         assert_eq!(array.len(), 11);
         assert_eq!(array.null_count(), 0);
 
-        assert_eq!(
-            array.run_ends(),
-            &Int16Array::from(vec![Some(3), Some(5), Some(7), Some(11)])
-        );
+        assert_eq!(array.run_ends().values(), &[3, 5, 7, 11]);
 
         // Values are polymorphic and so require a downcast.
         let av = array.values();
@@ -475,10 +461,7 @@ mod tests {
         assert_eq!(array.len(), 5);
         assert_eq!(array.null_count(), 0);
 
-        assert_eq!(
-            array.run_ends(),
-            &Int16Array::from(vec![Some(1), Some(2), Some(4), Some(5)])
-        );
+        assert_eq!(array.run_ends().values(), &[1, 2, 4, 5]);
 
         // Values are polymorphic and so require a downcast.
         let av = array.values();
@@ -500,10 +483,7 @@ mod tests {
         assert_eq!(array.len(), 8);
         assert_eq!(array.null_count(), 0);
 
-        assert_eq!(
-            array.run_ends(),
-            &Int16Array::from(vec![Some(1), Some(2), Some(4), Some(7), 
Some(8),])
-        );
+        assert_eq!(array.run_ends().values(), &[1, 2, 4, 7, 8]);
 
         // Values are polymorphic and so require a downcast.
         let av2 = array.values();
@@ -536,10 +516,7 @@ mod tests {
         let array = builder.finish();
 
         assert_eq!(array.len(), 10);
-        assert_eq!(
-            as_primitive_array::<Int32Type>(array.run_ends()).values(),
-            &[3, 5, 8, 10]
-        );
+        assert_eq!(array.run_ends().values(), &[3, 5, 8, 10]);
 
         let str_array = as_string_array(array.values().as_ref());
         assert_eq!(str_array.value(0), "a");
diff --git a/arrow-array/src/builder/primitive_run_builder.rs 
b/arrow-array/src/builder/primitive_run_builder.rs
index 410662283..e7c822ee6 100644
--- a/arrow-array/src/builder/primitive_run_builder.rs
+++ b/arrow-array/src/builder/primitive_run_builder.rs
@@ -44,10 +44,7 @@ use arrow_buffer::ArrowNativeType;
 /// builder.append_value(5678);
 /// let array = builder.finish();
 ///
-/// assert_eq!(
-///     array.run_ends(),
-///     &Int16Array::from(vec![Some(3), Some(4), Some(6)])
-/// );
+/// assert_eq!(array.run_ends().values(), &[3, 4, 6]);
 ///
 /// let av = array.values();
 ///
@@ -270,7 +267,7 @@ mod tests {
     use crate::builder::PrimitiveRunBuilder;
     use crate::cast::as_primitive_array;
     use crate::types::{Int16Type, UInt32Type};
-    use crate::{Array, Int16Array, UInt32Array};
+    use crate::{Array, UInt32Array};
 
     #[test]
     fn test_primitive_ree_array_builder() {
@@ -287,10 +284,7 @@ mod tests {
         assert_eq!(array.null_count(), 0);
         assert_eq!(array.len(), 6);
 
-        assert_eq!(
-            array.run_ends(),
-            &Int16Array::from(vec![Some(3), Some(4), Some(6)])
-        );
+        assert_eq!(array.run_ends().values(), &[3, 4, 6]);
 
         let av = array.values();
 
@@ -313,10 +307,7 @@ mod tests {
 
         assert_eq!(array.len(), 11);
         assert_eq!(array.null_count(), 0);
-        assert_eq!(
-            as_primitive_array::<Int16Type>(array.run_ends()).values(),
-            &[1, 3, 5, 9, 10, 11]
-        );
+        assert_eq!(array.run_ends().values(), &[1, 3, 5, 9, 10, 11]);
         assert_eq!(
             as_primitive_array::<Int16Type>(array.values().as_ref()).values(),
             &[1, 2, 5, 4, 6, 2]
diff --git a/arrow-array/src/run_iterator.rs b/arrow-array/src/run_iterator.rs
index 44cb59ac7..60022113c 100644
--- a/arrow-array/src/run_iterator.rs
+++ b/arrow-array/src/run_iterator.rs
@@ -17,9 +17,8 @@
 
 //! Idiomatic iterator for [`RunArray`](crate::Array)
 
-use arrow_buffer::ArrowNativeType;
-
 use crate::{array::ArrayAccessor, types::RunEndIndexType, Array, 
TypedRunArray};
+use arrow_buffer::ArrowNativeType;
 
 /// The [`RunArrayIter`] provides an idiomatic way to iterate over the run 
array.
 /// It returns Some(T) if there is a value or None if the value is null.
@@ -83,14 +82,11 @@ where
         if self.current_front_logical == self.current_back_logical {
             return None;
         }
+
         // If current logical index is greater than current run end index then 
increment
         // the physical index.
-        if self.current_front_logical
-            >= self
-                .array
-                .run_ends()
-                .value(self.current_front_physical)
-                .as_usize()
+        let run_ends = self.array.run_ends().values();
+        if self.current_front_logical >= 
run_ends[self.current_front_physical].as_usize()
         {
             // As the run_ends is expected to be strictly increasing, there
             // should be at least one logical entry in one physical entry. 
Because of this
@@ -138,13 +134,10 @@ where
 
         self.current_back_logical -= 1;
 
+        let run_ends = self.array.run_ends().values();
         if self.current_back_physical > 0
             && self.current_back_logical
-                < self
-                    .array
-                    .run_ends()
-                    .value(self.current_back_physical - 1)
-                    .as_usize()
+                < run_ends[self.current_back_physical - 1].as_usize()
         {
             // As the run_ends is expected to be strictly increasing, there
             // should be at least one logical entry in one physical entry. 
Because of this
diff --git a/arrow-buffer/src/buffer/mod.rs b/arrow-buffer/src/buffer/mod.rs
index f7e41260d..ed53d3361 100644
--- a/arrow-buffer/src/buffer/mod.rs
+++ b/arrow-buffer/src/buffer/mod.rs
@@ -32,3 +32,5 @@ mod boolean;
 pub use boolean::*;
 mod null;
 pub use null::*;
+mod run;
+pub use run::*;
diff --git a/arrow-buffer/src/buffer/run.rs b/arrow-buffer/src/buffer/run.rs
new file mode 100644
index 000000000..a7c396387
--- /dev/null
+++ b/arrow-buffer/src/buffer/run.rs
@@ -0,0 +1,200 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::buffer::ScalarBuffer;
+use crate::ArrowNativeType;
+
+/// A slice-able buffer of monotonically increasing, positive integers used to 
store run-ends
+///
+/// # Logical vs Physical
+///
+/// A [`RunEndBuffer`] is used to encode runs of the same value, the index of 
each run is
+/// called the physical index. The logical index is then the corresponding 
index in the logical
+/// run-encoded array, i.e. a single run of length `3`, would have the logical 
indices `0..3`.
+///
+/// Each value in [`RunEndBuffer::values`] is the cumulative length of all 
runs in the
+/// logical array, up to that physical index.
+///
+/// Consider a [`RunEndBuffer`] containing `[3, 4, 6]`. The maximum physical 
index is `2`,
+/// as there are `3` values, and the maximum logical index is `5`, as the 
maximum run end
+/// is `6`. The physical indices are therefore `[0, 0, 0, 1, 2, 2]`
+///
+/// ```text
+///     ┌─────────┐        ┌─────────┐           ┌─────────┐
+///     │    3    │        │    0    │ ─┬──────▶ │    0    │
+///     ├─────────┤        ├─────────┤  │        ├─────────┤
+///     │    4    │        │    1    │ ─┤ ┌────▶ │    1    │
+///     ├─────────┤        ├─────────┤  │ │      ├─────────┤
+///     │    6    │        │    2    │ ─┘ │ ┌──▶ │    2    │
+///     └─────────┘        ├─────────┤    │ │    └─────────┘
+///      run ends          │    3    │ ───┘ │  physical indices
+///                        ├─────────┤      │
+///                        │    4    │ ─────┤
+///                        ├─────────┤      │
+///                        │    5    │ ─────┘
+///                        └─────────┘
+///                      logical indices
+/// ```
+///
+/// # Slicing
+///
+/// In order to provide zero-copy slicing, this container stores a separate 
offset and length
+///
+/// For example, a [`RunEndBuffer`] containing values `[3, 6, 8]` with offset 
and length `4` would
+/// describe the physical indices `1, 1, 2, 2`
+///
+/// For example, a [`RunEndBuffer`] containing values `[6, 8, 9]` with offset 
`2` and length `5`
+/// would describe the physical indices `0, 0, 0, 0, 1`
+///
+/// [Run-End encoded layout]: 
https://arrow.apache.org/docs/format/Columnar.html#run-end-encoded-layout
+#[derive(Debug, Clone)]
+pub struct RunEndBuffer<E: ArrowNativeType> {
+    run_ends: ScalarBuffer<E>,
+    len: usize,
+    offset: usize,
+}
+
+impl<E> RunEndBuffer<E>
+where
+    E: ArrowNativeType,
+{
+    /// Create a new [`RunEndBuffer`] from a [`ScalarBuffer`], an `offset` and 
`len`
+    ///
+    /// # Panics
+    ///
+    /// - `run_ends` does not contain strictly increasing values greater than 
zero
+    /// - the last value of `run_ends` is less than `offset + len`
+    pub fn new(run_ends: ScalarBuffer<E>, offset: usize, len: usize) -> Self {
+        assert!(
+            run_ends.windows(2).all(|w| w[0] < w[1]),
+            "run-ends not strictly increasing"
+        );
+
+        if len != 0 {
+            assert!(!run_ends.is_empty(), "non-empty slice but empty 
run-ends");
+            let end = E::from_usize(offset.saturating_add(len)).unwrap();
+            assert!(
+                *run_ends.first().unwrap() >= E::usize_as(0),
+                "run-ends not greater than 0"
+            );
+            assert!(
+                *run_ends.last().unwrap() >= end,
+                "slice beyond bounds of run-ends"
+            );
+        }
+
+        Self {
+            run_ends,
+            offset,
+            len,
+        }
+    }
+
+    /// Create a new [`RunEndBuffer`] from a [`ScalarBuffer`], an `offset` 
and `len`
+    ///
+    /// # Safety
+    ///
+    /// - `run_ends` must contain strictly increasing values greater than zero
+    /// - The last value of `run_ends` must be greater than or equal to `offset 
+ len`
+    pub unsafe fn new_unchecked(
+        run_ends: ScalarBuffer<E>,
+        offset: usize,
+        len: usize,
+    ) -> Self {
+        Self {
+            run_ends,
+            offset,
+            len,
+        }
+    }
+
+    /// Returns the logical offset into the run-ends stored by this buffer
+    #[inline]
+    pub fn offset(&self) -> usize {
+        self.offset
+    }
+
+    /// Returns the logical length of the run-ends stored by this buffer
+    #[inline]
+    pub fn len(&self) -> usize {
+        self.len
+    }
+
+    /// Returns true if this buffer is empty
+    #[inline]
+    pub fn is_empty(&self) -> bool {
+        self.len == 0
+    }
+
+    /// Returns the values of this [`RunEndBuffer`] not including any offset
+    #[inline]
+    pub fn values(&self) -> &[E] {
+        &self.run_ends
+    }
+
+    /// Returns the maximum run-end encoded in the underlying buffer
+    #[inline]
+    pub fn max_value(&self) -> usize {
+        self.values().last().copied().unwrap_or_default().as_usize()
+    }
+
+    /// Performs a binary search to find the physical index for the given 
logical index
+    ///
+    /// The result is arbitrary if `logical_index >= self.len()`
+    pub fn get_physical_index(&self, logical_index: usize) -> usize {
+        let logical_index = E::usize_as(self.offset + logical_index);
+        let cmp = |p: &E| p.partial_cmp(&logical_index).unwrap();
+
+        match self.run_ends.binary_search_by(cmp) {
+            Ok(idx) => idx + 1,
+            Err(idx) => idx,
+        }
+    }
+
+    /// Returns the physical index at which the logical array starts
+    pub fn get_start_physical_index(&self) -> usize {
+        if self.offset == 0 {
+            return 0;
+        }
+        // Fallback to binary search
+        self.get_physical_index(0)
+    }
+
+    /// Returns the physical index at which the logical array ends
+    pub fn get_end_physical_index(&self) -> usize {
+        if self.max_value() == self.offset + self.len {
+            return self.values().len() - 1;
+        }
+        // Fallback to binary search
+        self.get_physical_index(self.len - 1)
+    }
+
+    /// Slices this [`RunEndBuffer`] by the provided `offset` and `length`
+    pub fn slice(&self, offset: usize, len: usize) -> Self {
+        assert!(
+            offset.saturating_add(len) <= self.len,
+            "the length + offset of the sliced RunEndBuffer cannot exceed the 
existing length"
+        );
+        Self {
+            run_ends: self.run_ends.clone(),
+            offset: self.offset + offset,
+            len,
+        }
+    }
+}
diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs
index 75c48bebc..b57692749 100644
--- a/arrow-ipc/src/writer.rs
+++ b/arrow-ipc/src/writer.rs
@@ -580,42 +580,30 @@ pub(crate) fn unslice_run_array(arr: ArrayData) -> Result<ArrayData, ArrowError>
 fn into_zero_offset_run_array<R: RunEndIndexType>(
     run_array: RunArray<R>,
 ) -> Result<RunArray<R>, ArrowError> {
-    if run_array.offset() == 0
-        && run_array.len() == RunArray::<R>::logical_len(run_array.run_ends())
-    {
+    let run_ends = run_array.run_ends();
+    if run_ends.offset() == 0 && run_ends.max_value() == run_ends.len() {
         return Ok(run_array);
     }
+
     // The physical index of original run_ends array from which the `ArrayData`is sliced.
-    let start_physical_index = run_array
-        .get_zero_offset_physical_index(run_array.offset())
-        .unwrap();
+    let start_physical_index = run_ends.get_start_physical_index();
 
-    // The logical length of original run_ends array until which the `ArrayData` is sliced.
-    let end_logical_index = run_array.offset() + run_array.len() - 1;
     // The physical index of original run_ends array until which the `ArrayData`is sliced.
-    let end_physical_index = run_array
-        .get_zero_offset_physical_index(end_logical_index)
-        .unwrap();
+    let end_physical_index = run_ends.get_end_physical_index();
 
     let physical_length = end_physical_index - start_physical_index + 1;
 
-    // build new run_ends array by subtrating offset from run ends.
+    // build new run_ends array by subtracting offset from run ends.
+    let offset = R::Native::usize_as(run_ends.offset());
     let mut builder = BufferBuilder::<R::Native>::new(physical_length);
-    for ix in start_physical_index..end_physical_index {
-        let run_end_value = unsafe {
-            // Safety:
-            // start_physical_index and end_physical_index are within
-            // run_ends array bounds.
-            run_array.run_ends().value_unchecked(ix).as_usize()
-        };
-        let run_end_value = run_end_value - run_array.offset();
-        builder.append(R::Native::from_usize(run_end_value).unwrap());
+    for run_end_value in &run_ends.values()[start_physical_index..end_physical_index] {
+        builder.append(run_end_value.sub_wrapping(offset));
     }
     builder.append(R::Native::from_usize(run_array.len()).unwrap());
     let new_run_ends = unsafe {
         // Safety:
         // The function builds a valid run_ends array and hence need not be validated.
-        ArrayDataBuilder::new(run_array.run_ends().data_type().clone())
+        ArrayDataBuilder::new(R::DATA_TYPE)
             .len(physical_length)
             .add_buffer(builder.finish())
             .build_unchecked()
diff --git a/arrow-ord/src/sort.rs b/arrow-ord/src/sort.rs
index 230eb9390..0f248ee63 100644
--- a/arrow-ord/src/sort.rs
+++ b/arrow-ord/src/sort.rs
@@ -673,7 +673,7 @@ fn sort_run_downcasted<R: RunEndIndexType>(
     let new_run_ends = unsafe {
         // Safety:
         // The function builds a valid run_ends array and hence need not be validated.
-        ArrayDataBuilder::new(run_array.run_ends().data_type().clone())
+        ArrayDataBuilder::new(R::DATA_TYPE)
             .len(new_physical_len)
             .add_buffer(new_run_ends_builder.finish())
             .build_unchecked()
@@ -746,7 +746,7 @@ where
 
     let mut remaining_len = output_len;
 
-    let run_ends = run_array.run_ends();
+    let run_ends = run_array.run_ends().values();
 
     assert_eq!(
         0,
@@ -770,22 +770,20 @@ where
             // and len, both of which are within bounds of run_array
             if physical_index == start_physical_index {
                 (
-                    run_ends.value_unchecked(physical_index).as_usize()
+                    run_ends.get_unchecked(physical_index).as_usize()
                         - run_array.offset(),
                     0,
                 )
             } else if physical_index == end_physical_index {
-                let prev_run_end =
-                    run_ends.value_unchecked(physical_index - 1).as_usize();
+                let prev_run_end = run_ends.get_unchecked(physical_index - 1).as_usize();
                 (
                     run_array.offset() + run_array.len() - prev_run_end,
                     prev_run_end - run_array.offset(),
                 )
             } else {
-                let prev_run_end =
-                    run_ends.value_unchecked(physical_index - 1).as_usize();
+                let prev_run_end = run_ends.get_unchecked(physical_index - 1).as_usize();
                 (
-                    run_ends.value_unchecked(physical_index).as_usize() - prev_run_end,
+                    run_ends.get_unchecked(physical_index).as_usize() - prev_run_end,
                     prev_run_end - run_array.offset(),
                 )
             }
diff --git a/arrow-select/src/take.rs b/arrow-select/src/take.rs
index 58b5c91f1..68b22f6fe 100644
--- a/arrow-select/src/take.rs
+++ b/arrow-select/src/take.rs
@@ -2157,8 +2157,7 @@ mod tests {
         let take_out = take_run(&run_array, &take_indices).unwrap();
 
         assert_eq!(take_out.len(), 7);
-
-        assert_eq!(take_out.run_ends().len(), 5);
+        assert_eq!(take_out.run_ends().len(), 7);
         assert_eq!(take_out.run_ends().values(), &[1_i32, 3, 4, 5, 7]);
 
         let take_out_values = as_primitive_array::<Int32Type>(take_out.values());


Reply via email to