This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 5ffc0a87d fix: Handle sliced array in run array iterator (#3681)
5ffc0a87d is described below
commit 5ffc0a87dd5abb4f7db1172bac6ed93da95827f7
Author: askoa <[email protected]>
AuthorDate: Mon Feb 13 10:51:31 2023 -0500
fix: Handle sliced array in run array iterator (#3681)
* Handle sliced array in run array iterator
* incorporate PR comments
---------
Co-authored-by: ask <ask@local>
---
arrow-array/src/array/run_array.rs | 5 ++
arrow-array/src/run_iterator.rs | 113 +++++++++++++++++++++++++++----------
2 files changed, 88 insertions(+), 30 deletions(-)
diff --git a/arrow-array/src/array/run_array.rs b/arrow-array/src/array/run_array.rs
index 33738d649..709933e1b 100644
--- a/arrow-array/src/array/run_array.rs
+++ b/arrow-array/src/array/run_array.rs
@@ -472,6 +472,11 @@ impl<'a, R: RunEndIndexType, V> TypedRunArray<'a, R, V> {
pub fn values(&self) -> &'a V {
self.values
}
+
+ /// Returns the underlying [`RunArray`] wrapped by this [`TypedRunArray`]
+ pub fn run_array(&self) -> &'a RunArray<R> {
+ self.run_array
+ }
}
impl<'a, R: RunEndIndexType, V: Sync> Array for TypedRunArray<'a, R, V> {
diff --git a/arrow-array/src/run_iterator.rs b/arrow-array/src/run_iterator.rs
index 8bad85a9f..a79969c3c 100644
--- a/arrow-array/src/run_iterator.rs
+++ b/arrow-array/src/run_iterator.rs
@@ -42,10 +42,16 @@ where
     <&'a V as ArrayAccessor>::Item: Default,
 {
     array: TypedRunArray<'a, R, V>,
-    current_logical: usize,
-    current_physical: usize,
-    current_end_logical: usize,
-    current_end_physical: usize,
+    /// Absolute logical index (slice offset included) of the next value `next` yields.
+    current_front_logical: usize,
+    /// Physical index of the run holding the last value yielded by `next`
+    /// (initially the run of the first value; 0 for an empty array).
+    current_front_physical: usize,
+    /// Exclusive absolute logical end of the values not yet yielded from either end.
+    current_back_logical: usize,
+    /// Physical index of the run holding the last value yielded by `next_back`
+    /// (initially one past the run of the final value; 0 for an empty array).
+    current_back_physical: usize,
 }
 
 impl<'a, R, V> RunArrayIter<'a, R, V>
@@ -57,14 +63,29 @@ where
 {
-    /// create a new iterator
+    /// Create a new iterator over the logical values of `array`, respecting its
+    /// slice offset and length. An empty array yields no values.
     pub fn new(array: TypedRunArray<'a, R, V>) -> Self {
-        let logical_len = array.len();
-        let physical_len: usize = array.values().len();
+        // The iterator walks the absolute logical range [offset, offset + len).
+        let current_front_logical = array.offset();
+        let current_back_logical = array.offset() + array.len();
+        // `array.len() - 1` would underflow for an empty (or zero-length
+        // sliced) array and there is no logical index to resolve, so use an
+        // empty physical range; `next`/`next_back` then return `None`
+        // immediately because the logical range is already empty.
+        let (current_front_physical, current_back_physical) = if array.is_empty() {
+            (0, 0)
+        } else {
+            let run_array = array.run_array();
+            (
+                run_array.get_physical_index(0).unwrap(),
+                run_array.get_physical_index(array.len() - 1).unwrap() + 1,
+            )
+        };
         RunArrayIter {
             array,
-            current_logical: 0,
-            current_physical: 0,
-            current_end_logical: logical_len,
-            current_end_physical: physical_len,
+            current_front_logical,
+            current_front_physical,
+            current_back_logical,
+            current_back_physical,
         }
     }
 }
@@ -80,35 +85,37 @@ where
 
     #[inline]
     fn next(&mut self) -> Option<Self::Item> {
-        if self.current_logical == self.current_end_logical {
+        if self.current_front_logical == self.current_back_logical {
             return None;
         }
-        // If current logical index is greater than current run end index then increment
-        // the physical index.
+        // If the current logical index is greater than or equal to the current run end
+        // index then increment the physical index.
-        if self.current_logical
+        if self.current_front_logical
             >= self
                 .array
                 .run_ends()
-                .value(self.current_physical)
+                .value(self.current_front_physical)
                 .as_usize()
         {
             // As the run_ends is expected to be strictly increasing, there
             // should be at least one logical entry in one physical entry. Because of this
             // reason the next value can be accessed by incrementing physical index once.
-            self.current_physical += 1;
+            self.current_front_physical += 1;
         }
-        if self.array.values().is_null(self.current_physical) {
-            self.current_logical += 1;
+        if self.array.values().is_null(self.current_front_physical) {
+            self.current_front_logical += 1;
             Some(None)
         } else {
-            self.current_logical += 1;
+            self.current_front_logical += 1;
             // Safety:
-            // The self.current_physical is kept within bounds of self.current_logical.
-            // The self.current_logical will not go out of bounds because of the check
-            // `self.current_logical = self.current_end_logical` above.
+            // `current_front_physical` always indexes the run containing the value
+            // being returned; `current_front_logical` is kept in bounds by the
+            // `current_front_logical == current_back_logical` check above.
             unsafe {
                 Some(Some(
-                    self.array.values().value_unchecked(self.current_physical),
+                    self.array
+                        .values()
+                        .value_unchecked(self.current_front_physical),
                 ))
             }
         }
@@ -116,8 +123,8 @@ where
 
     fn size_hint(&self) -> (usize, Option<usize>) {
         (
-            self.current_end_logical - self.current_logical,
-            Some(self.current_end_logical - self.current_logical),
+            self.current_back_logical - self.current_front_logical,
+            Some(self.current_back_logical - self.current_front_logical),
         )
     }
 }
@@ -130,26 +137,26 @@ where
<&'a V as ArrayAccessor>::Item: Default,
{
fn next_back(&mut self) -> Option<Self::Item> {
- if self.current_end_logical == self.current_logical {
+ if self.current_back_logical == self.current_front_logical {
return None;
}
- self.current_end_logical -= 1;
+ self.current_back_logical -= 1; // now the logical index of the value to return
- if self.current_end_physical > 0
- && self.current_end_logical
+ if self.current_back_physical > 0
+ && self.current_back_logical
< self
.array
.run_ends()
- .value(self.current_end_physical - 1)
+ .value(self.current_back_physical - 1)
.as_usize()
{
// As the run_ends is expected to be strictly increasing, there
// should be at least one logical entry in one physical entry. Because of this
// reason the next value can be accessed by decrementing physical index once.
- self.current_end_physical -= 1;
+ self.current_back_physical -= 1;
}
- Some(if self.array.values().is_null(self.current_end_physical) {
+ Some(if self.array.values().is_null(self.current_back_physical) {
None
} else {
// Safety:
@@ -160,7 +167,7 @@ where
Some(
self.array
.values()
- .value_unchecked(self.current_end_physical),
+ .value_unchecked(self.current_back_physical),
)
}
})
@@ -184,8 +191,8 @@ mod tests {
     use crate::{
         array::{Int32Array, StringArray},
         builder::PrimitiveRunBuilder,
-        types::Int32Type,
-        Int64RunArray,
+        types::{Int16Type, Int32Type},
+        Array, Int64RunArray, PrimitiveArray, RunArray,
     };
 
     fn build_input_array(size: usize) -> Vec<Option<i32>> {
@@ -345,4 +352,63 @@ mod tests {
 
         assert_eq!(expected_vec, result_asref);
     }
+
+    #[test]
+    fn test_sliced_run_array_iterator() {
+        let total_len = 80;
+        let input_array = build_input_array(total_len);
+
+        // Encode the input_array to run array
+        let mut builder =
+            PrimitiveRunBuilder::<Int16Type, Int32Type>::with_capacity(input_array.len());
+        builder.extend(input_array.iter().copied());
+        let run_array = builder.finish();
+
+        // Test every slice length, both as a prefix (offset 0) and as a suffix
+        // (offset total_len - slice_len), iterating forwards and backwards.
+        for slice_len in 1..=total_len {
+            // test for offset = 0, slice length = slice_len
+            let sliced_run_array: RunArray<Int16Type> =
+                run_array.slice(0, slice_len).into_data().into();
+            let sliced_typed_run_array = sliced_run_array
+                .downcast::<PrimitiveArray<Int32Type>>()
+                .unwrap();
+
+            // Iterate on sliced typed run array
+            let actual: Vec<Option<i32>> = sliced_typed_run_array.into_iter().collect();
+            let expected: Vec<Option<i32>> =
+                input_array.iter().take(slice_len).copied().collect();
+            assert_eq!(expected, actual);
+
+            // Iterate backwards to exercise `next_back` on the sliced array
+            let actual_rev: Vec<Option<i32>> =
+                sliced_typed_run_array.into_iter().rev().collect();
+            let expected_rev: Vec<Option<i32>> = expected.into_iter().rev().collect();
+            assert_eq!(expected_rev, actual_rev);
+
+            // test for offset = total_len - slice_len, length = slice_len
+            let sliced_run_array: RunArray<Int16Type> = run_array
+                .slice(total_len - slice_len, slice_len)
+                .into_data()
+                .into();
+            let sliced_typed_run_array = sliced_run_array
+                .downcast::<PrimitiveArray<Int32Type>>()
+                .unwrap();
+
+            // Iterate on sliced typed run array
+            let actual: Vec<Option<i32>> = sliced_typed_run_array.into_iter().collect();
+            let expected: Vec<Option<i32>> = input_array
+                .iter()
+                .skip(total_len - slice_len)
+                .copied()
+                .collect();
+            assert_eq!(expected, actual);
+
+            // Iterate backwards to exercise `next_back` on the sliced array
+            let actual_rev: Vec<Option<i32>> =
+                sliced_typed_run_array.into_iter().rev().collect();
+            let expected_rev: Vec<Option<i32>> = expected.into_iter().rev().collect();
+            assert_eq!(expected_rev, actual_rev);
+        }
+    }
 }