This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 36f2db3a3 Add RunEndBuffer (#1799) (#3817)
36f2db3a3 is described below
commit 36f2db3a35e07dfbfdb6b32e457d40ef8ccfb601
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Wed Mar 8 20:17:31 2023 +0100
Add RunEndBuffer (#1799) (#3817)
* Add RunEndBuffer (#1799)
* Fix test
* Revert rename
* Format
* Clippy
* Remove unnecessary check
* Fix
* Tweak docs
* Add docs
---
arrow-array/src/array/mod.rs | 1 +
arrow-array/src/array/run_array.rs | 138 +++++---------
.../src/builder/generic_byte_run_builder.rs | 37 +---
arrow-array/src/builder/primitive_run_builder.rs | 17 +-
arrow-array/src/run_iterator.rs | 19 +-
arrow-buffer/src/buffer/mod.rs | 2 +
arrow-buffer/src/buffer/run.rs | 200 +++++++++++++++++++++
arrow-ipc/src/writer.rs | 32 ++--
arrow-ord/src/sort.rs | 14 +-
arrow-select/src/take.rs | 3 +-
10 files changed, 286 insertions(+), 177 deletions(-)
diff --git a/arrow-array/src/array/mod.rs b/arrow-array/src/array/mod.rs
index f3c35e51f..dfdaac85b 100644
--- a/arrow-array/src/array/mod.rs
+++ b/arrow-array/src/array/mod.rs
@@ -880,6 +880,7 @@ mod tests {
assert_eq!(array.null_count(), 0);
assert_eq!(array.values().len(), 1);
assert_eq!(array.values().null_count(), 1);
+ assert_eq!(array.run_ends().len(), 4);
assert_eq!(array.run_ends().values(), &[4]);
let idx = array.get_physical_indices(&[0, 1, 2,
3]).unwrap();
diff --git a/arrow-array/src/array/run_array.rs
b/arrow-array/src/array/run_array.rs
index 126aefde9..e50903f30 100644
--- a/arrow-array/src/array/run_array.rs
+++ b/arrow-array/src/array/run_array.rs
@@ -17,6 +17,7 @@
use std::any::Any;
+use arrow_buffer::buffer::RunEndBuffer;
use arrow_buffer::ArrowNativeType;
use arrow_data::{ArrayData, ArrayDataBuilder};
use arrow_schema::{ArrowError, DataType, Field};
@@ -62,7 +63,7 @@ use crate::{
pub struct RunArray<R: RunEndIndexType> {
data: ArrayData,
- run_ends: PrimitiveArray<R>,
+ run_ends: RunEndBuffer<R::Native>,
values: ArrayRef,
}
@@ -110,11 +111,8 @@ impl<R: RunEndIndexType> RunArray<R> {
Ok(array_data.into())
}
- /// Returns a reference to run_ends array
- ///
- /// Note: any slicing of this [`RunArray`] array is not applied to the
returned array
- /// and must be handled separately
- pub fn run_ends(&self) -> &PrimitiveArray<R> {
+ /// Returns a reference to [`RunEndBuffer`]
+ pub fn run_ends(&self) -> &RunEndBuffer<R::Native> {
&self.run_ends
}
@@ -128,19 +126,12 @@ impl<R: RunEndIndexType> RunArray<R> {
/// Returns the physical index at which the array slice starts.
pub fn get_start_physical_index(&self) -> usize {
- if self.offset() == 0 {
- return 0;
- }
- self.get_zero_offset_physical_index(self.offset()).unwrap()
+ self.run_ends.get_start_physical_index()
}
/// Returns the physical index at which the array slice ends.
pub fn get_end_physical_index(&self) -> usize {
- if self.offset() + self.len() == Self::logical_len(&self.run_ends) {
- return self.run_ends.len() - 1;
- }
- self.get_zero_offset_physical_index(self.offset() + self.len() - 1)
- .unwrap()
+ self.run_ends.get_end_physical_index()
}
/// Downcast this [`RunArray`] to a [`TypedRunArray`]
@@ -164,47 +155,13 @@ impl<R: RunEndIndexType> RunArray<R> {
})
}
- /// Returns index to the physical array for the given index to the logical
array.
- /// The function does not adjust the input logical index based on
`ArrayData::offset`.
- /// Performs a binary search on the run_ends array for the input index.
- #[inline]
- pub fn get_zero_offset_physical_index(&self, logical_index: usize) ->
Option<usize> {
- if logical_index >= Self::logical_len(&self.run_ends) {
- return None;
- }
- let mut st: usize = 0;
- let mut en: usize = self.run_ends.len();
- while st + 1 < en {
- let mid: usize = (st + en) / 2;
- if logical_index
- < unsafe {
- // Safety:
- // The value of mid will always be between 1 and len - 1,
- // where len is length of run ends array.
- // This is based on the fact that `st` starts with 0 and
- // `en` starts with len. The condition `st + 1 < en`
ensures
- // `st` and `en` differs atleast by two. So the value of
`mid`
- // will never be either `st` or `en`
- self.run_ends.value_unchecked(mid - 1).as_usize()
- }
- {
- en = mid
- } else {
- st = mid
- }
- }
- Some(st)
- }
-
/// Returns index to the physical array for the given index to the logical
array.
/// This function adjusts the input logical index based on
`ArrayData::offset`
/// Performs a binary search on the run_ends array for the input index.
- #[inline]
- pub fn get_physical_index(&self, logical_index: usize) -> Option<usize> {
- if logical_index >= self.len() {
- return None;
- }
- self.get_zero_offset_physical_index(logical_index + self.offset())
+ ///
+ /// The result is arbitrary if `logical_index >= self.len()`
+ pub fn get_physical_index(&self, logical_index: usize) -> usize {
+ self.run_ends.get_physical_index(logical_index)
}
/// Returns the physical indices of the input logical indices. Returns
error if any of the logical
@@ -222,6 +179,9 @@ impl<R: RunEndIndexType> RunArray<R> {
where
I: ArrowNativeType,
{
+ let len = self.run_ends().len();
+ let offset = self.run_ends().offset();
+
let indices_len = logical_indices.len();
if indices_len == 0 {
@@ -243,7 +203,7 @@ impl<R: RunEndIndexType> RunArray<R> {
// Return early if all the logical indices cannot be converted to
physical indices.
let largest_logical_index =
logical_indices[*ordered_indices.last().unwrap()].as_usize();
- if largest_logical_index >= self.len() {
+ if largest_logical_index >= len {
return Err(ArrowError::InvalidArgumentError(format!(
"Cannot convert all logical indices to physical indices. The
logical index cannot be converted is {largest_logical_index}.",
)));
@@ -259,7 +219,7 @@ impl<R: RunEndIndexType> RunArray<R> {
self.run_ends.values().iter().enumerate().skip(skip_value)
{
// Get the run end index (relative to offset) of current physical
index
- let run_end_value = run_end.as_usize() - self.offset();
+ let run_end_value = run_end.as_usize() - offset;
// All the `logical_indices` that are less than current run end
index
// belongs to current physical index.
@@ -295,7 +255,15 @@ impl<R: RunEndIndexType> From<ArrayData> for RunArray<R> {
}
}
- let run_ends = PrimitiveArray::<R>::from(data.child_data()[0].clone());
+ // Safety
+ // ArrayData is valid
+ let child = &data.child_data()[0];
+ assert_eq!(child.data_type(), &R::DATA_TYPE, "Incorrect run ends
type");
+ let run_ends = unsafe {
+ let scalar = child.buffers()[0].clone().into();
+ RunEndBuffer::new_unchecked(scalar, data.offset(), data.len())
+ };
+
let values = make_array(data.child_data()[1].clone());
Self {
data,
@@ -330,7 +298,8 @@ impl<R: RunEndIndexType> std::fmt::Debug for RunArray<R> {
writeln!(
f,
"RunArray {{run_ends: {:?}, values: {:?}}}",
- self.run_ends, self.values
+ self.run_ends.values(),
+ self.values
)
}
}
@@ -347,7 +316,7 @@ impl<R: RunEndIndexType> std::fmt::Debug for RunArray<R> {
/// .map(|&x| if x == "b" { None } else { Some(x) })
/// .collect();
/// assert_eq!(
-/// "RunArray {run_ends: PrimitiveArray<Int16>\n[\n 2,\n 3,\n 5,\n],
values: StringArray\n[\n \"a\",\n null,\n \"c\",\n]}\n",
+/// "RunArray {run_ends: [2, 3, 5], values: StringArray\n[\n \"a\",\n
null,\n \"c\",\n]}\n",
/// format!("{:?}", array)
/// );
/// ```
@@ -374,7 +343,7 @@ impl<'a, T: RunEndIndexType> FromIterator<Option<&'a str>>
for RunArray<T> {
/// let test = vec!["a", "a", "b", "c"];
/// let array: RunArray<Int16Type> = test.into_iter().collect();
/// assert_eq!(
-/// "RunArray {run_ends: PrimitiveArray<Int16>\n[\n 2,\n 3,\n 4,\n],
values: StringArray\n[\n \"a\",\n \"b\",\n \"c\",\n]}\n",
+/// "RunArray {run_ends: [2, 3, 4], values: StringArray\n[\n \"a\",\n
\"b\",\n \"c\",\n]}\n",
/// format!("{:?}", array)
/// );
/// ```
@@ -401,7 +370,7 @@ impl<'a, T: RunEndIndexType> FromIterator<&'a str> for
RunArray<T> {
///
/// let array: Int16RunArray = vec!["a", "a", "b", "c",
"c"].into_iter().collect();
/// let values: Arc<dyn Array> = Arc::new(StringArray::from(vec!["a", "b",
"c"]));
-/// assert_eq!(array.run_ends(), &Int16Array::from(vec![2, 3, 5]));
+/// assert_eq!(array.run_ends().values(), &[2, 3, 5]);
/// assert_eq!(array.values(), &values);
/// ```
pub type Int16RunArray = RunArray<Int16Type>;
@@ -416,7 +385,7 @@ pub type Int16RunArray = RunArray<Int16Type>;
///
/// let array: Int32RunArray = vec!["a", "a", "b", "c",
"c"].into_iter().collect();
/// let values: Arc<dyn Array> = Arc::new(StringArray::from(vec!["a", "b",
"c"]));
-/// assert_eq!(array.run_ends(), &Int32Array::from(vec![2, 3, 5]));
+/// assert_eq!(array.run_ends().values(), &[2, 3, 5]);
/// assert_eq!(array.values(), &values);
/// ```
pub type Int32RunArray = RunArray<Int32Type>;
@@ -431,7 +400,7 @@ pub type Int32RunArray = RunArray<Int32Type>;
///
/// let array: Int64RunArray = vec!["a", "a", "b", "c",
"c"].into_iter().collect();
/// let values: Arc<dyn Array> = Arc::new(StringArray::from(vec!["a", "b",
"c"]));
-/// assert_eq!(array.run_ends(), &Int64Array::from(vec![2, 3, 5]));
+/// assert_eq!(array.run_ends().values(), &[2, 3, 5]);
/// assert_eq!(array.values(), &values);
/// ```
pub type Int64RunArray = RunArray<Int64Type>;
@@ -480,7 +449,7 @@ impl<'a, R: RunEndIndexType, V> std::fmt::Debug for
TypedRunArray<'a, R, V> {
impl<'a, R: RunEndIndexType, V> TypedRunArray<'a, R, V> {
/// Returns the run_ends of this [`TypedRunArray`]
- pub fn run_ends(&self) -> &'a PrimitiveArray<R> {
+ pub fn run_ends(&self) -> &'a RunEndBuffer<R::Native> {
self.run_array.run_ends()
}
@@ -531,7 +500,7 @@ where
}
unsafe fn value_unchecked(&self, logical_index: usize) -> Self::Item {
- let physical_index =
self.run_array.get_physical_index(logical_index).unwrap();
+ let physical_index = self.run_array.get_physical_index(logical_index);
self.values().value_unchecked(physical_index)
}
}
@@ -563,7 +532,7 @@ mod tests {
use crate::builder::PrimitiveRunBuilder;
use crate::cast::as_primitive_array;
use crate::types::{Int16Type, Int32Type, Int8Type, UInt32Type};
- use crate::{Array, Int16Array, Int32Array, StringArray};
+ use crate::{Array, Int32Array, StringArray};
fn build_input_array(size: usize) -> Vec<Option<i32>> {
// The input array is created by shuffling and repeating
@@ -643,9 +612,10 @@ mod tests {
]);
// Construct a run_ends array:
- let run_ends_data = PrimitiveArray::<Int16Type>::from_iter_values([
- 4_i16, 6, 7, 9, 13, 18, 20, 22,
- ]);
+ let run_ends_values = [4_i16, 6, 7, 9, 13, 18, 20, 22];
+ let run_ends_data = PrimitiveArray::<Int16Type>::from_iter_values(
+ run_ends_values.iter().copied(),
+ );
// Construct a run ends encoded array from the above two
let ree_array =
@@ -659,8 +629,7 @@ mod tests {
assert_eq!(&DataType::Int8, values.data_type());
let run_ends = ree_array.run_ends();
- assert_eq!(&run_ends_data.into_data(), run_ends.data());
- assert_eq!(&DataType::Int16, run_ends.data_type());
+ assert_eq!(run_ends.values(), &run_ends_values);
}
#[test]
@@ -671,7 +640,7 @@ mod tests {
builder.append_value(22345678);
let array = builder.finish();
assert_eq!(
- "RunArray {run_ends: PrimitiveArray<Int16>\n[\n 1,\n 2,\n
3,\n], values: PrimitiveArray<UInt32>\n[\n 12345678,\n null,\n
22345678,\n]}\n",
+ "RunArray {run_ends: [1, 2, 3], values:
PrimitiveArray<UInt32>\n[\n 12345678,\n null,\n 22345678,\n]}\n",
format!("{array:?}")
);
@@ -685,7 +654,7 @@ mod tests {
assert_eq!(array.null_count(), 0);
assert_eq!(
- "RunArray {run_ends: PrimitiveArray<Int16>\n[\n 20,\n], values:
PrimitiveArray<UInt32>\n[\n 1,\n]}\n",
+ "RunArray {run_ends: [20], values: PrimitiveArray<UInt32>\n[\n
1,\n]}\n",
format!("{array:?}")
);
}
@@ -698,7 +667,7 @@ mod tests {
.map(|&x| if x == "b" { None } else { Some(x) })
.collect();
assert_eq!(
- "RunArray {run_ends: PrimitiveArray<Int16>\n[\n 2,\n 3,\n
4,\n], values: StringArray\n[\n \"a\",\n null,\n \"c\",\n]}\n",
+ "RunArray {run_ends: [2, 3, 4], values: StringArray\n[\n \"a\",\n
null,\n \"c\",\n]}\n",
format!("{array:?}")
);
@@ -707,7 +676,7 @@ mod tests {
let array: RunArray<Int16Type> = test.into_iter().collect();
assert_eq!(
- "RunArray {run_ends: PrimitiveArray<Int16>\n[\n 2,\n 3,\n
4,\n], values: StringArray\n[\n \"a\",\n \"b\",\n \"c\",\n]}\n",
+ "RunArray {run_ends: [2, 3, 4], values: StringArray\n[\n \"a\",\n
\"b\",\n \"c\",\n]}\n",
format!("{array:?}")
);
}
@@ -721,8 +690,6 @@ mod tests {
assert_eq!(array.null_count(), 0);
let run_ends = array.run_ends();
- assert_eq!(&DataType::Int16, run_ends.data_type());
- assert_eq!(0, run_ends.null_count());
assert_eq!(&[1, 2, 3, 4], run_ends.values());
}
@@ -735,9 +702,6 @@ mod tests {
assert_eq!(array.null_count(), 0);
let run_ends = array.run_ends();
- assert_eq!(&DataType::Int32, run_ends.data_type());
- assert_eq!(0, run_ends.null_count());
- assert_eq!(5, run_ends.len());
assert_eq!(&[1, 2, 3, 5, 6], run_ends.values());
let values_data = array.values();
@@ -754,7 +718,7 @@ mod tests {
assert_eq!(array.null_count(), 0);
let run_ends = array.run_ends();
- assert_eq!(1, run_ends.len());
+ assert_eq!(3, run_ends.len());
assert_eq!(&[3], run_ends.values());
let values_data = array.values();
@@ -770,16 +734,14 @@ mod tests {
[Some(1), Some(2), Some(3), Some(4)].into_iter().collect();
let array = RunArray::<Int32Type>::try_new(&run_ends,
&values).unwrap();
- assert_eq!(array.run_ends().data_type(), &DataType::Int32);
assert_eq!(array.values().data_type(), &DataType::Utf8);
assert_eq!(array.null_count(), 0);
assert_eq!(array.len(), 4);
- assert_eq!(array.run_ends.null_count(), 0);
assert_eq!(array.values().null_count(), 1);
assert_eq!(
- "RunArray {run_ends: PrimitiveArray<Int32>\n[\n 1,\n 2,\n 3,\n
4,\n], values: StringArray\n[\n \"foo\",\n \"bar\",\n null,\n
\"baz\",\n]}\n",
+ "RunArray {run_ends: [1, 2, 3, 4], values: StringArray\n[\n
\"foo\",\n \"bar\",\n null,\n \"baz\",\n]}\n",
format!("{array:?}")
);
}
@@ -788,7 +750,7 @@ mod tests {
fn test_run_array_int16_type_definition() {
let array: Int16RunArray = vec!["a", "a", "b", "c",
"c"].into_iter().collect();
let values: Arc<dyn Array> = Arc::new(StringArray::from(vec!["a", "b",
"c"]));
- assert_eq!(array.run_ends(), &Int16Array::from(vec![2, 3, 5]));
+ assert_eq!(array.run_ends().values(), &[2, 3, 5]);
assert_eq!(array.values(), &values);
}
@@ -796,7 +758,7 @@ mod tests {
fn test_run_array_empty_string() {
let array: Int16RunArray = vec!["a", "a", "", "",
"c"].into_iter().collect();
let values: Arc<dyn Array> = Arc::new(StringArray::from(vec!["a", "",
"c"]));
- assert_eq!(array.run_ends(), &Int16Array::from(vec![2, 4, 5]));
+ assert_eq!(array.run_ends().values(), &[2, 4, 5]);
assert_eq!(array.values(), &values);
}
@@ -849,9 +811,7 @@ mod tests {
}
#[test]
- #[should_panic(
- expected = "PrimitiveArray expected ArrayData with type Int64 got
Int32"
- )]
+ #[should_panic(expected = "Incorrect run ends type")]
fn test_run_array_run_ends_data_type_mismatch() {
let a = RunArray::<Int32Type>::from_iter(["32"]);
let _ = RunArray::<Int64Type>::from(a.into_data());
@@ -874,7 +834,7 @@ mod tests {
let actual = typed.value(i);
assert_eq!(*val, actual)
} else {
- let physical_ix = run_array.get_physical_index(i).unwrap();
+ let physical_ix = run_array.get_physical_index(i);
assert!(typed.values().is_null(physical_ix));
};
}
diff --git a/arrow-array/src/builder/generic_byte_run_builder.rs
b/arrow-array/src/builder/generic_byte_run_builder.rs
index c6dbb82ff..5c15b1544 100644
--- a/arrow-array/src/builder/generic_byte_run_builder.rs
+++ b/arrow-array/src/builder/generic_byte_run_builder.rs
@@ -49,10 +49,7 @@ use arrow_buffer::ArrowNativeType;
/// builder.append_null();
/// let array = builder.finish();
///
-/// assert_eq!(
-/// array.run_ends(),
-/// &Int16Array::from(vec![Some(2), Some(3), Some(5), Some(6)])
-/// );
+/// assert_eq!(array.run_ends().values(), &[2, 3, 5, 6]);
///
/// let av = array.values();
///
@@ -331,10 +328,7 @@ where
/// builder.extend([Some("def"), Some("def"), Some("abc")]);
/// let array = builder.finish();
///
-/// assert_eq!(
-/// array.run_ends(),
-/// &Int16Array::from(vec![Some(1), Some(2), Some(4), Some(5)])
-/// );
+/// assert_eq!(array.run_ends().values(), &[1, 2, 4, 5]);
///
/// // Values are polymorphic and so require a downcast.
/// let av = array.values();
@@ -370,10 +364,7 @@ pub type LargeStringRunBuilder<K> =
GenericByteRunBuilder<K, LargeUtf8Type>;
/// builder.extend([Some(b"def"), Some(b"def"), Some(b"abc")]);
/// let array = builder.finish();
///
-/// assert_eq!(
-/// array.run_ends(),
-/// &Int16Array::from(vec![Some(1), Some(2), Some(4), Some(5)])
-/// );
+/// assert_eq!(array.run_ends().values(), &[1, 2, 4, 5]);
///
/// // Values are polymorphic and so require a downcast.
/// let av = array.values();
@@ -396,11 +387,9 @@ mod tests {
use super::*;
use crate::array::Array;
- use crate::cast::as_primitive_array;
use crate::cast::as_string_array;
use crate::types::{Int16Type, Int32Type};
use crate::GenericByteArray;
- use crate::Int16Array;
use crate::Int16RunArray;
fn test_bytes_run_buider<T>(values: Vec<&T::Native>)
@@ -426,10 +415,7 @@ mod tests {
assert_eq!(array.len(), 11);
assert_eq!(array.null_count(), 0);
- assert_eq!(
- array.run_ends(),
- &Int16Array::from(vec![Some(3), Some(5), Some(7), Some(11)])
- );
+ assert_eq!(array.run_ends().values(), &[3, 5, 7, 11]);
// Values are polymorphic and so require a downcast.
let av = array.values();
@@ -475,10 +461,7 @@ mod tests {
assert_eq!(array.len(), 5);
assert_eq!(array.null_count(), 0);
- assert_eq!(
- array.run_ends(),
- &Int16Array::from(vec![Some(1), Some(2), Some(4), Some(5)])
- );
+ assert_eq!(array.run_ends().values(), &[1, 2, 4, 5]);
// Values are polymorphic and so require a downcast.
let av = array.values();
@@ -500,10 +483,7 @@ mod tests {
assert_eq!(array.len(), 8);
assert_eq!(array.null_count(), 0);
- assert_eq!(
- array.run_ends(),
- &Int16Array::from(vec![Some(1), Some(2), Some(4), Some(7),
Some(8),])
- );
+ assert_eq!(array.run_ends().values(), &[1, 2, 4, 7, 8]);
// Values are polymorphic and so require a downcast.
let av2 = array.values();
@@ -536,10 +516,7 @@ mod tests {
let array = builder.finish();
assert_eq!(array.len(), 10);
- assert_eq!(
- as_primitive_array::<Int32Type>(array.run_ends()).values(),
- &[3, 5, 8, 10]
- );
+ assert_eq!(array.run_ends().values(), &[3, 5, 8, 10]);
let str_array = as_string_array(array.values().as_ref());
assert_eq!(str_array.value(0), "a");
diff --git a/arrow-array/src/builder/primitive_run_builder.rs
b/arrow-array/src/builder/primitive_run_builder.rs
index 410662283..e7c822ee6 100644
--- a/arrow-array/src/builder/primitive_run_builder.rs
+++ b/arrow-array/src/builder/primitive_run_builder.rs
@@ -44,10 +44,7 @@ use arrow_buffer::ArrowNativeType;
/// builder.append_value(5678);
/// let array = builder.finish();
///
-/// assert_eq!(
-/// array.run_ends(),
-/// &Int16Array::from(vec![Some(3), Some(4), Some(6)])
-/// );
+/// assert_eq!(array.run_ends().values(), &[3, 4, 6]);
///
/// let av = array.values();
///
@@ -270,7 +267,7 @@ mod tests {
use crate::builder::PrimitiveRunBuilder;
use crate::cast::as_primitive_array;
use crate::types::{Int16Type, UInt32Type};
- use crate::{Array, Int16Array, UInt32Array};
+ use crate::{Array, UInt32Array};
#[test]
fn test_primitive_ree_array_builder() {
@@ -287,10 +284,7 @@ mod tests {
assert_eq!(array.null_count(), 0);
assert_eq!(array.len(), 6);
- assert_eq!(
- array.run_ends(),
- &Int16Array::from(vec![Some(3), Some(4), Some(6)])
- );
+ assert_eq!(array.run_ends().values(), &[3, 4, 6]);
let av = array.values();
@@ -313,10 +307,7 @@ mod tests {
assert_eq!(array.len(), 11);
assert_eq!(array.null_count(), 0);
- assert_eq!(
- as_primitive_array::<Int16Type>(array.run_ends()).values(),
- &[1, 3, 5, 9, 10, 11]
- );
+ assert_eq!(array.run_ends().values(), &[1, 3, 5, 9, 10, 11]);
assert_eq!(
as_primitive_array::<Int16Type>(array.values().as_ref()).values(),
&[1, 2, 5, 4, 6, 2]
diff --git a/arrow-array/src/run_iterator.rs b/arrow-array/src/run_iterator.rs
index 44cb59ac7..60022113c 100644
--- a/arrow-array/src/run_iterator.rs
+++ b/arrow-array/src/run_iterator.rs
@@ -17,9 +17,8 @@
//! Idiomatic iterator for [`RunArray`](crate::Array)
-use arrow_buffer::ArrowNativeType;
-
use crate::{array::ArrayAccessor, types::RunEndIndexType, Array,
TypedRunArray};
+use arrow_buffer::ArrowNativeType;
/// The [`RunArrayIter`] provides an idiomatic way to iterate over the run
array.
/// It returns Some(T) if there is a value or None if the value is null.
@@ -83,14 +82,11 @@ where
if self.current_front_logical == self.current_back_logical {
return None;
}
+
// If current logical index is greater than current run end index then
increment
// the physical index.
- if self.current_front_logical
- >= self
- .array
- .run_ends()
- .value(self.current_front_physical)
- .as_usize()
+ let run_ends = self.array.run_ends().values();
+ if self.current_front_logical >=
run_ends[self.current_front_physical].as_usize()
{
// As the run_ends is expected to be strictly increasing, there
// should be at least one logical entry in one physical entry.
Because of this
@@ -138,13 +134,10 @@ where
self.current_back_logical -= 1;
+ let run_ends = self.array.run_ends().values();
if self.current_back_physical > 0
&& self.current_back_logical
- < self
- .array
- .run_ends()
- .value(self.current_back_physical - 1)
- .as_usize()
+ < run_ends[self.current_back_physical - 1].as_usize()
{
// As the run_ends is expected to be strictly increasing, there
// should be at least one logical entry in one physical entry.
Because of this
diff --git a/arrow-buffer/src/buffer/mod.rs b/arrow-buffer/src/buffer/mod.rs
index f7e41260d..ed53d3361 100644
--- a/arrow-buffer/src/buffer/mod.rs
+++ b/arrow-buffer/src/buffer/mod.rs
@@ -32,3 +32,5 @@ mod boolean;
pub use boolean::*;
mod null;
pub use null::*;
+mod run;
+pub use run::*;
diff --git a/arrow-buffer/src/buffer/run.rs b/arrow-buffer/src/buffer/run.rs
new file mode 100644
index 000000000..a7c396387
--- /dev/null
+++ b/arrow-buffer/src/buffer/run.rs
@@ -0,0 +1,200 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::buffer::ScalarBuffer;
+use crate::ArrowNativeType;
+
+/// A slice-able buffer of monotonically increasing, positive integers used to store run-ends
+///
+/// See also the Arrow [Run-End encoded layout].
+///
+/// # Logical vs Physical
+///
+/// A [`RunEndBuffer`] is used to encode runs of the same value, the index of each run is
+/// called the physical index. The logical index is then the corresponding index in the logical
+/// run-encoded array, i.e. a single run of length `3`, would have the logical indices `0..3`.
+///
+/// Each value in [`RunEndBuffer::values`] is the cumulative length of all runs in the
+/// logical array, up to that physical index.
+///
+/// Consider a [`RunEndBuffer`] containing `[3, 4, 6]`. The maximum physical index is `2`,
+/// as there are `3` values, and the maximum logical index is `5`, as the maximum run end
+/// is `6`. The physical indices are therefore `[0, 0, 0, 1, 2, 2]`
+///
+/// ```text
+///           ┌─────────┐        ┌─────────┐           ┌─────────┐
+///           │    3    │        │    0    │ ─┬──────▶ │    0    │
+///           ├─────────┤        ├─────────┤  │        ├─────────┤
+///           │    4    │        │    1    │ ─┤ ┌────▶ │    1    │
+///           ├─────────┤        ├─────────┤  │ │      ├─────────┤
+///           │    6    │        │    2    │ ─┘ │ ┌──▶ │    2    │
+///           └─────────┘        ├─────────┤    │ │    └─────────┘
+///            run ends          │    3    │ ───┘ │  physical indices
+///                              ├─────────┤      │
+///                              │    4    │ ─────┤
+///                              ├─────────┤      │
+///                              │    5    │ ─────┘
+///                              └─────────┘
+///                            logical indices
+/// ```
+///
+/// # Slicing
+///
+/// In order to provide zero-copy slicing, this container stores a separate offset and length
+///
+/// For example, a [`RunEndBuffer`] containing values `[3, 6, 8]` with offset `4` and length `4`
+/// would describe the physical indices `1, 1, 2, 2`
+///
+/// As another example, a [`RunEndBuffer`] containing values `[6, 8, 9]` with offset `2` and
+/// length `5` would describe the physical indices `0, 0, 0, 0, 1`
+///
+/// [Run-End encoded layout]: https://arrow.apache.org/docs/format/Columnar.html#run-end-encoded-layout
+#[derive(Debug, Clone)]
+pub struct RunEndBuffer<E: ArrowNativeType> {
+ /// The run-end values, exposed unsliced through `values`
+ run_ends: ScalarBuffer<E>,
+ /// The logical length of the slice described by this buffer
+ len: usize,
+ /// The logical offset of the slice into the run-ends
+ offset: usize,
+}
+
+impl<E> RunEndBuffer<E>
+where
+ E: ArrowNativeType,
+{
+ /// Create a new [`RunEndBuffer`] from a [`ScalarBuffer`], an `offset` and
`len`
+ ///
+ /// # Panics
+ ///
+ /// - `buffer` does not contain strictly increasing values greater than
zero
+ /// - the last value of `buffer` is less than `offset + len`
+ pub fn new(run_ends: ScalarBuffer<E>, offset: usize, len: usize) -> Self {
+ assert!(
+ run_ends.windows(2).all(|w| w[0] < w[1]),
+ "run-ends not strictly increasing"
+ );
+
+ if len != 0 {
+ assert!(!run_ends.is_empty(), "non-empty slice but empty
run-ends");
+ let end = E::from_usize(offset.saturating_add(len)).unwrap();
+ assert!(
+ *run_ends.first().unwrap() >= E::usize_as(0),
+ "run-ends not greater than 0"
+ );
+ assert!(
+ *run_ends.last().unwrap() >= end,
+ "slice beyond bounds of run-ends"
+ );
+ }
+
+ Self {
+ run_ends,
+ offset,
+ len,
+ }
+ }
+
+ /// Create a new [`RunEndBuffer`] from an [`ScalarBuffer`], an `offset`
and `len`
+ ///
+ /// # Safety
+ ///
+ /// - `buffer` must contain strictly increasing values greater than zero
+ /// - The last value of `buffer` must be greater than or equal to `offset
+ len`
+ pub unsafe fn new_unchecked(
+ run_ends: ScalarBuffer<E>,
+ offset: usize,
+ len: usize,
+ ) -> Self {
+ Self {
+ run_ends,
+ offset,
+ len,
+ }
+ }
+
+ /// Returns the logical offset into the run-ends stored by this buffer
+ #[inline]
+ pub fn offset(&self) -> usize {
+ self.offset
+ }
+
+ /// Returns the logical length of the run-ends stored by this buffer
+ #[inline]
+ pub fn len(&self) -> usize {
+ self.len
+ }
+
+ /// Returns true if this buffer is empty
+ #[inline]
+ pub fn is_empty(&self) -> bool {
+ self.len == 0
+ }
+
+ /// Returns the values of this [`RunEndBuffer`] not including any offset
+ #[inline]
+ pub fn values(&self) -> &[E] {
+ &self.run_ends
+ }
+
+ /// Returns the maximum run-end encoded in the underlying buffer
+ #[inline]
+ pub fn max_value(&self) -> usize {
+ self.values().last().copied().unwrap_or_default().as_usize()
+ }
+
+ /// Performs a binary search to find the physical index for the given
logical index
+ ///
+ /// The result is arbitrary if `logical_index >= self.len()`
+ pub fn get_physical_index(&self, logical_index: usize) -> usize {
+ let logical_index = E::usize_as(self.offset + logical_index);
+ let cmp = |p: &E| p.partial_cmp(&logical_index).unwrap();
+
+ match self.run_ends.binary_search_by(cmp) {
+ Ok(idx) => idx + 1,
+ Err(idx) => idx,
+ }
+ }
+
+ /// Returns the physical index at which the logical array starts
+ pub fn get_start_physical_index(&self) -> usize {
+ if self.offset == 0 {
+ return 0;
+ }
+ // Fallback to binary search
+ self.get_physical_index(0)
+ }
+
+ /// Returns the physical index at which the logical array ends
+ pub fn get_end_physical_index(&self) -> usize {
+ if self.max_value() == self.offset + self.len {
+ return self.values().len() - 1;
+ }
+ // Fallback to binary search
+ self.get_physical_index(self.len - 1)
+ }
+
+ /// Slices this [`RunEndBuffer`] by the provided `offset` and `length`
+ pub fn slice(&self, offset: usize, len: usize) -> Self {
+ assert!(
+ offset.saturating_add(len) <= self.len,
+ "the length + offset of the sliced RunEndBuffer cannot exceed the
existing length"
+ );
+ Self {
+ run_ends: self.run_ends.clone(),
+ offset: self.offset + offset,
+ len,
+ }
+ }
+}
diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs
index 75c48bebc..b57692749 100644
--- a/arrow-ipc/src/writer.rs
+++ b/arrow-ipc/src/writer.rs
@@ -580,42 +580,30 @@ pub(crate) fn unslice_run_array(arr: ArrayData) ->
Result<ArrayData, ArrowError>
fn into_zero_offset_run_array<R: RunEndIndexType>(
run_array: RunArray<R>,
) -> Result<RunArray<R>, ArrowError> {
- if run_array.offset() == 0
- && run_array.len() == RunArray::<R>::logical_len(run_array.run_ends())
- {
+ let run_ends = run_array.run_ends();
+ if run_ends.offset() == 0 && run_ends.max_value() == run_ends.len() {
return Ok(run_array);
}
+
// The physical index of original run_ends array from which the
`ArrayData`is sliced.
- let start_physical_index = run_array
- .get_zero_offset_physical_index(run_array.offset())
- .unwrap();
+ let start_physical_index = run_ends.get_start_physical_index();
- // The logical length of original run_ends array until which the
`ArrayData` is sliced.
- let end_logical_index = run_array.offset() + run_array.len() - 1;
// The physical index of original run_ends array until which the
`ArrayData`is sliced.
- let end_physical_index = run_array
- .get_zero_offset_physical_index(end_logical_index)
- .unwrap();
+ let end_physical_index = run_ends.get_end_physical_index();
let physical_length = end_physical_index - start_physical_index + 1;
- // build new run_ends array by subtrating offset from run ends.
+ // build new run_ends array by subtracting offset from run ends.
+ let offset = R::Native::usize_as(run_ends.offset());
let mut builder = BufferBuilder::<R::Native>::new(physical_length);
- for ix in start_physical_index..end_physical_index {
- let run_end_value = unsafe {
- // Safety:
- // start_physical_index and end_physical_index are within
- // run_ends array bounds.
- run_array.run_ends().value_unchecked(ix).as_usize()
- };
- let run_end_value = run_end_value - run_array.offset();
- builder.append(R::Native::from_usize(run_end_value).unwrap());
+ for run_end_value in
&run_ends.values()[start_physical_index..end_physical_index] {
+ builder.append(run_end_value.sub_wrapping(offset));
}
builder.append(R::Native::from_usize(run_array.len()).unwrap());
let new_run_ends = unsafe {
// Safety:
// The function builds a valid run_ends array and hence need not be
validated.
- ArrayDataBuilder::new(run_array.run_ends().data_type().clone())
+ ArrayDataBuilder::new(R::DATA_TYPE)
.len(physical_length)
.add_buffer(builder.finish())
.build_unchecked()
diff --git a/arrow-ord/src/sort.rs b/arrow-ord/src/sort.rs
index 230eb9390..0f248ee63 100644
--- a/arrow-ord/src/sort.rs
+++ b/arrow-ord/src/sort.rs
@@ -673,7 +673,7 @@ fn sort_run_downcasted<R: RunEndIndexType>(
let new_run_ends = unsafe {
// Safety:
// The function builds a valid run_ends array and hence need not be
validated.
- ArrayDataBuilder::new(run_array.run_ends().data_type().clone())
+ ArrayDataBuilder::new(R::DATA_TYPE)
.len(new_physical_len)
.add_buffer(new_run_ends_builder.finish())
.build_unchecked()
@@ -746,7 +746,7 @@ where
let mut remaining_len = output_len;
- let run_ends = run_array.run_ends();
+ let run_ends = run_array.run_ends().values();
assert_eq!(
0,
@@ -770,22 +770,20 @@ where
// and len, both of which are within bounds of run_array
if physical_index == start_physical_index {
(
- run_ends.value_unchecked(physical_index).as_usize()
+ run_ends.get_unchecked(physical_index).as_usize()
- run_array.offset(),
0,
)
} else if physical_index == end_physical_index {
- let prev_run_end =
- run_ends.value_unchecked(physical_index - 1).as_usize();
+ let prev_run_end = run_ends.get_unchecked(physical_index -
1).as_usize();
(
run_array.offset() + run_array.len() - prev_run_end,
prev_run_end - run_array.offset(),
)
} else {
- let prev_run_end =
- run_ends.value_unchecked(physical_index - 1).as_usize();
+ let prev_run_end = run_ends.get_unchecked(physical_index -
1).as_usize();
(
- run_ends.value_unchecked(physical_index).as_usize() -
prev_run_end,
+ run_ends.get_unchecked(physical_index).as_usize() -
prev_run_end,
prev_run_end - run_array.offset(),
)
}
diff --git a/arrow-select/src/take.rs b/arrow-select/src/take.rs
index 58b5c91f1..68b22f6fe 100644
--- a/arrow-select/src/take.rs
+++ b/arrow-select/src/take.rs
@@ -2157,8 +2157,7 @@ mod tests {
let take_out = take_run(&run_array, &take_indices).unwrap();
assert_eq!(take_out.len(), 7);
-
- assert_eq!(take_out.run_ends().len(), 5);
+ assert_eq!(take_out.run_ends().len(), 7);
assert_eq!(take_out.run_ends().values(), &[1_i32, 3, 4, 5, 7]);
let take_out_values =
as_primitive_array::<Int32Type>(take_out.values());