This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ffc0a87d fix: Handle sliced array in run array iterator (#3681)
5ffc0a87d is described below

commit 5ffc0a87dd5abb4f7db1172bac6ed93da95827f7
Author: askoa <[email protected]>
AuthorDate: Mon Feb 13 10:51:31 2023 -0500

    fix: Handle sliced array in run array iterator (#3681)
    
    * Handle sliced array in run array iterator
    
    * incorporate PR comments
    
    ---------
    
    Co-authored-by: ask <ask@local>
---
 arrow-array/src/array/run_array.rs |   5 ++
 arrow-array/src/run_iterator.rs    | 113 +++++++++++++++++++++++++++----------
 2 files changed, 88 insertions(+), 30 deletions(-)

diff --git a/arrow-array/src/array/run_array.rs b/arrow-array/src/array/run_array.rs
index 33738d649..709933e1b 100644
--- a/arrow-array/src/array/run_array.rs
+++ b/arrow-array/src/array/run_array.rs
@@ -472,6 +472,11 @@ impl<'a, R: RunEndIndexType, V> TypedRunArray<'a, R, V> {
     pub fn values(&self) -> &'a V {
         self.values
     }
+
+    /// Returns the run array of this [`TypedRunArray`]
+    pub fn run_array(&self) -> &'a RunArray<R> {
+        self.run_array
+    }
 }
 
 impl<'a, R: RunEndIndexType, V: Sync> Array for TypedRunArray<'a, R, V> {
diff --git a/arrow-array/src/run_iterator.rs b/arrow-array/src/run_iterator.rs
index 8bad85a9f..a79969c3c 100644
--- a/arrow-array/src/run_iterator.rs
+++ b/arrow-array/src/run_iterator.rs
@@ -42,10 +42,10 @@ where
     <&'a V as ArrayAccessor>::Item: Default,
 {
     array: TypedRunArray<'a, R, V>,
-    current_logical: usize,
-    current_physical: usize,
-    current_end_logical: usize,
-    current_end_physical: usize,
+    current_front_logical: usize,
+    current_front_physical: usize,
+    current_back_logical: usize,
+    current_back_physical: usize,
 }
 
 impl<'a, R, V> RunArrayIter<'a, R, V>
@@ -57,14 +57,19 @@ where
 {
     /// create a new iterator
     pub fn new(array: TypedRunArray<'a, R, V>) -> Self {
-        let logical_len = array.len();
-        let physical_len: usize = array.values().len();
+        let current_front_physical: usize =
+            array.run_array().get_physical_index(0).unwrap();
+        let current_back_physical: usize = array
+            .run_array()
+            .get_physical_index(array.len() - 1)
+            .unwrap()
+            + 1;
         RunArrayIter {
             array,
-            current_logical: 0,
-            current_physical: 0,
-            current_end_logical: logical_len,
-            current_end_physical: physical_len,
+            current_front_logical: array.offset(),
+            current_front_physical,
+            current_back_logical: array.offset() + array.len(),
+            current_back_physical,
         }
     }
 }
@@ -80,35 +85,37 @@ where
 
     #[inline]
     fn next(&mut self) -> Option<Self::Item> {
-        if self.current_logical == self.current_end_logical {
+        if self.current_front_logical == self.current_back_logical {
             return None;
         }
         // If current logical index is greater than current run end index then increment
         // the physical index.
-        if self.current_logical
+        if self.current_front_logical
             >= self
                 .array
                 .run_ends()
-                .value(self.current_physical)
+                .value(self.current_front_physical)
                 .as_usize()
         {
             // As the run_ends is expected to be strictly increasing, there
             // should be at least one logical entry in one physical entry. Because of this
             // reason the next value can be accessed by incrementing physical index once.
-            self.current_physical += 1;
+            self.current_front_physical += 1;
         }
-        if self.array.values().is_null(self.current_physical) {
-            self.current_logical += 1;
+        if self.array.values().is_null(self.current_front_physical) {
+            self.current_front_logical += 1;
             Some(None)
         } else {
-            self.current_logical += 1;
+            self.current_front_logical += 1;
             // Safety:
             // The self.current_physical is kept within bounds of self.current_logical.
             // The self.current_logical will not go out of bounds because of the check
             // `self.current_logical = self.current_end_logical` above.
             unsafe {
                 Some(Some(
-                    self.array.values().value_unchecked(self.current_physical),
+                    self.array
+                        .values()
+                        .value_unchecked(self.current_front_physical),
                 ))
             }
         }
@@ -116,8 +123,8 @@ where
 
     fn size_hint(&self) -> (usize, Option<usize>) {
         (
-            self.current_end_logical - self.current_logical,
-            Some(self.current_end_logical - self.current_logical),
+            self.current_back_logical - self.current_front_logical,
+            Some(self.current_back_logical - self.current_front_logical),
         )
     }
 }
@@ -130,26 +137,26 @@ where
     <&'a V as ArrayAccessor>::Item: Default,
 {
     fn next_back(&mut self) -> Option<Self::Item> {
-        if self.current_end_logical == self.current_logical {
+        if self.current_back_logical == self.current_front_logical {
             return None;
         }
 
-        self.current_end_logical -= 1;
+        self.current_back_logical -= 1;
 
-        if self.current_end_physical > 0
-            && self.current_end_logical
+        if self.current_back_physical > 0
+            && self.current_back_logical
                 < self
                     .array
                     .run_ends()
-                    .value(self.current_end_physical - 1)
+                    .value(self.current_back_physical - 1)
                     .as_usize()
         {
             // As the run_ends is expected to be strictly increasing, there
             // should be at least one logical entry in one physical entry. Because of this
             // reason the next value can be accessed by decrementing physical index once.
-            self.current_end_physical -= 1;
+            self.current_back_physical -= 1;
         }
-        Some(if self.array.values().is_null(self.current_end_physical) {
+        Some(if self.array.values().is_null(self.current_back_physical) {
             None
         } else {
             // Safety:
@@ -160,7 +167,7 @@ where
                 Some(
                     self.array
                         .values()
-                        .value_unchecked(self.current_end_physical),
+                        .value_unchecked(self.current_back_physical),
                 )
             }
         })
@@ -184,8 +191,8 @@ mod tests {
     use crate::{
         array::{Int32Array, StringArray},
         builder::PrimitiveRunBuilder,
-        types::Int32Type,
-        Int64RunArray,
+        types::{Int16Type, Int32Type},
+        Array, Int64RunArray, PrimitiveArray, RunArray,
     };
 
     fn build_input_array(size: usize) -> Vec<Option<i32>> {
@@ -345,4 +352,50 @@ mod tests {
 
         assert_eq!(expected_vec, result_asref);
     }
+
+    #[test]
+    fn test_sliced_run_array_iterator() {
+        let total_len = 80;
+        let input_array = build_input_array(total_len);
+
+        // Encode the input_array to run array
+        let mut builder =
+            PrimitiveRunBuilder::<Int16Type, Int32Type>::with_capacity(input_array.len());
+        builder.extend(input_array.iter().copied());
+        let run_array = builder.finish();
+
+        // test for all slice lengths.
+        for slice_len in 1..=total_len {
+            // test for offset = 0, slice length = slice_len
+            let sliced_run_array: RunArray<Int16Type> =
+                run_array.slice(0, slice_len).into_data().into();
+            let sliced_typed_run_array = sliced_run_array
+                .downcast::<PrimitiveArray<Int32Type>>()
+                .unwrap();
+
+            // Iterate on sliced typed run array
+            let actual: Vec<Option<i32>> = sliced_typed_run_array.into_iter().collect();
+            let expected: Vec<Option<i32>> =
+                input_array.iter().take(slice_len).copied().collect();
+            assert_eq!(expected, actual);
+
+            // test for offset = total_len - slice_len, length = slice_len
+            let sliced_run_array: RunArray<Int16Type> = run_array
+                .slice(total_len - slice_len, slice_len)
+                .into_data()
+                .into();
+            let sliced_typed_run_array = sliced_run_array
+                .downcast::<PrimitiveArray<Int32Type>>()
+                .unwrap();
+
+            // Iterate on sliced typed run array
+            let actual: Vec<Option<i32>> = sliced_typed_run_array.into_iter().collect();
+            let expected: Vec<Option<i32>> = input_array
+                .iter()
+                .skip(total_len - slice_len)
+                .copied()
+                .collect();
+            assert_eq!(expected, actual);
+        }
+    }
 }

Reply via email to