Jefffrey commented on code in PR #9213:
URL: https://github.com/apache/arrow-rs/pull/9213#discussion_r2726355737


##########
arrow-data/src/equal/run.rs:
##########
@@ -16,71 +16,149 @@
 // under the License.
 
 use crate::data::ArrayData;
+use arrow_buffer::ArrowNativeType;
+use arrow_buffer::RunEndBuffer;
+use arrow_schema::DataType;
+use num_traits::ToPrimitive;
 
 use super::equal_range;
 
-/// The current implementation of comparison of run array support physical 
comparison.
-/// Comparing run encoded array based on logical indices (`lhs_start`, 
`rhs_start`) will
-/// be time consuming as converting from logical index to physical index 
cannot be done
-/// in constant time. The current comparison compares the underlying physical 
arrays.
+/// Returns true if the two `RunEndEncoded` arrays are equal.
+///
+/// This provides a specialized implementation of equality for REE arrays that
+/// handles differences in run-encoding by iterating through the logical range.
 pub(super) fn run_equal(
     lhs: &ArrayData,
     rhs: &ArrayData,
     lhs_start: usize,
     rhs_start: usize,
     len: usize,
 ) -> bool {
-    if lhs_start != 0
-        || rhs_start != 0
-        || (lhs.len() != len && rhs.len() != len)
-        || lhs.offset() > 0
-        || rhs.offset() > 0
-    {
-        unimplemented!("Logical comparison for run array not supported.")
+    let lhs_index_type = match lhs.data_type() {
+        DataType::RunEndEncoded(f, _) => f.data_type(),
+        _ => unreachable!(),
+    };
+
+    match lhs_index_type {
+        DataType::Int16 => run_equal_inner::<i16>(lhs, rhs, lhs_start, 
rhs_start, len),
+        DataType::Int32 => run_equal_inner::<i32>(lhs, rhs, lhs_start, 
rhs_start, len),
+        DataType::Int64 => run_equal_inner::<i64>(lhs, rhs, lhs_start, 
rhs_start, len),
+        _ => unreachable!(),
     }
+}
+
+struct RunArrayData<'a, T: ArrowNativeType> {
+    run_ends: RunEndBuffer<T>,
+    values: &'a ArrayData,
+}
+
+impl<'a, T: ArrowNativeType + ToPrimitive> RunArrayData<'a, T> {
+    fn new(data: &'a ArrayData, start: usize, len: usize) -> Self {
+        debug_assert!(
+            data.child_data().len() == 2,
+            "RunEndEncoded arrays are guaranteed to have 2 children [run_ends, 
values]"
+        );
+        let run_ends_data = &data.child_data()[0];
+        let raw_run_ends_buffer = &run_ends_data.buffers()[0];
+        // SAFETY: we're reconstructing RunEndBuffer from a known valid 
RunArray
+        let run_ends = unsafe {
+            RunEndBuffer::<T>::new_unchecked(
+                raw_run_ends_buffer.clone().into(),
+                run_ends_data.offset() + data.offset() + start,
+                len,
+            )
+        };
 
-    if lhs.len() != rhs.len() {
-        return false;
+        let values = &data.child_data()[1];
+        Self { run_ends, values }
     }
 
-    let lhs_child_data = lhs.child_data();
-    let lhs_run_ends_array = &lhs_child_data[0];
-    let lhs_values_array = &lhs_child_data[1];
+    fn run_end(&self, index: usize) -> usize {
+        self.run_ends.values()[index].as_usize()
+    }
 
-    let rhs_child_data = rhs.child_data();
-    let rhs_run_ends_array = &rhs_child_data[0];
-    let rhs_values_array = &rhs_child_data[1];
+    fn get_start_end_physical_indices(&self) -> (usize, usize) {
+        let start = self.run_ends.get_start_physical_index();
+        let end = self.run_ends.get_end_physical_index();
+        (start, end)
+    }
+}
 
-    if lhs_run_ends_array.len() != rhs_run_ends_array.len() {
-        return false;
+fn run_equal_inner<T: ArrowNativeType + ToPrimitive>(
+    lhs: &ArrayData,
+    rhs: &ArrayData,
+    lhs_start: usize,
+    rhs_start: usize,
+    len: usize,
+) -> bool {
+    if len == 0 {
+        return true;
     }
 
-    if lhs_values_array.len() != rhs_values_array.len() {
-        return false;
+    let l_array = RunArrayData::<T>::new(lhs, lhs_start, len);
+    let r_array = RunArrayData::<T>::new(rhs, rhs_start, len);
+
+    let (l_start_phys, l_end_phys) = l_array.get_start_end_physical_indices();
+    let (r_start_phys, r_end_phys) = r_array.get_start_end_physical_indices();
+    let l_runs = l_end_phys - l_start_phys + 1;
+    let r_runs = r_end_phys - r_start_phys + 1;
+
+    if l_runs == r_runs {
+        // When the boundaries align perfectly, we don't need the complex 
stepping loop that calculates overlaps.
+        // Instead, we can simply treat the underlying values arrays as if 
they were standard primitive arrays.
+        let l_iter = l_array.run_ends.sliced_values();
+        let r_iter = r_array.run_ends.sliced_values();
+        let physical_match = l_iter.zip(r_iter).all(|(l_re, r_re)| l_re == 
r_re);
+
+        if physical_match {
+            // Both arrays are partitioned identically.
+            // We can just verify if the physical values in those partitions 
match.
+            return equal_range(
+                l_array.values,
+                r_array.values,
+                l_start_phys,
+                r_start_phys,
+                l_runs,
+            );
+        }
     }
 
-    // check run ends array are equal. The length of the physical array
-    // is used to validate the child arrays.
-    let run_ends_equal = equal_range(
-        lhs_run_ends_array,
-        rhs_run_ends_array,
-        lhs_start,
-        rhs_start,
-        lhs_run_ends_array.len(),
-    );
-
-    // if run ends array are not the same return early without validating
-    // values array.
-    if !run_ends_equal {
-        return false;
+    let mut l_phys = l_start_phys;
+    let mut r_phys = r_start_phys;
+    let mut processed = 0;
+    while processed < len {
+        if !equal_range(l_array.values, r_array.values, l_phys, r_phys, 1) {
+            return false;
+        }
+
+        let l_run_end = l_array.run_end(l_phys);
+        let r_run_end = r_array.run_end(r_phys);
+
+        //Calculate how many more logical elements are in the current run of 
the left and right array
+        let l_remaining_in_run = l_run_end - (l_array.run_ends.offset() + 
processed);
+        let r_remaining_in_run = r_run_end - (r_array.run_ends.offset() + 
processed);
+
+        //Calculate how many elements are left to compare in the requested 
range
+        let remaining_in_range = len - processed;
+
+        //Find the smallest of these three to determine our step size
+        //The goal is to move the logical cursor (processed) forward as far as 
possible without:
+        //Crossing the boundary of a run in the left or right array (where the 
value might change).
+        //Going past the total length we were asked to compare.
+        let step = l_remaining_in_run
+            .min(r_remaining_in_run)
+            .min(remaining_in_range);
+        processed += step;
+
+        if processed < len {

Review Comment:
   Could this `if` check be omitted? If `processed >= len`, the next loop
iteration will exit anyway.



##########
arrow-data/src/equal/run.rs:
##########
@@ -16,71 +16,149 @@
 // under the License.
 
 use crate::data::ArrayData;
+use arrow_buffer::ArrowNativeType;
+use arrow_buffer::RunEndBuffer;
+use arrow_schema::DataType;
+use num_traits::ToPrimitive;
 
 use super::equal_range;
 
-/// The current implementation of comparison of run array support physical 
comparison.
-/// Comparing run encoded array based on logical indices (`lhs_start`, 
`rhs_start`) will
-/// be time consuming as converting from logical index to physical index 
cannot be done
-/// in constant time. The current comparison compares the underlying physical 
arrays.
+/// Returns true if the two `RunEndEncoded` arrays are equal.
+///
+/// This provides a specialized implementation of equality for REE arrays that
+/// handles differences in run-encoding by iterating through the logical range.
 pub(super) fn run_equal(
     lhs: &ArrayData,
     rhs: &ArrayData,
     lhs_start: usize,
     rhs_start: usize,
     len: usize,
 ) -> bool {
-    if lhs_start != 0
-        || rhs_start != 0
-        || (lhs.len() != len && rhs.len() != len)
-        || lhs.offset() > 0
-        || rhs.offset() > 0
-    {
-        unimplemented!("Logical comparison for run array not supported.")
+    let lhs_index_type = match lhs.data_type() {
+        DataType::RunEndEncoded(f, _) => f.data_type(),
+        _ => unreachable!(),
+    };
+
+    match lhs_index_type {
+        DataType::Int16 => run_equal_inner::<i16>(lhs, rhs, lhs_start, 
rhs_start, len),
+        DataType::Int32 => run_equal_inner::<i32>(lhs, rhs, lhs_start, 
rhs_start, len),
+        DataType::Int64 => run_equal_inner::<i64>(lhs, rhs, lhs_start, 
rhs_start, len),
+        _ => unreachable!(),
     }
+}
+
+struct RunArrayData<'a, T: ArrowNativeType> {
+    run_ends: RunEndBuffer<T>,
+    values: &'a ArrayData,
+}
+
+impl<'a, T: ArrowNativeType + ToPrimitive> RunArrayData<'a, T> {
+    fn new(data: &'a ArrayData, start: usize, len: usize) -> Self {
+        debug_assert!(
+            data.child_data().len() == 2,
+            "RunEndEncoded arrays are guaranteed to have 2 children [run_ends, 
values]"
+        );
+        let run_ends_data = &data.child_data()[0];
+        let raw_run_ends_buffer = &run_ends_data.buffers()[0];
+        // SAFETY: we're reconstructing RunEndBuffer from a known valid 
RunArray
+        let run_ends = unsafe {
+            RunEndBuffer::<T>::new_unchecked(
+                raw_run_ends_buffer.clone().into(),
+                run_ends_data.offset() + data.offset() + start,
+                len,
+            )
+        };
 
-    if lhs.len() != rhs.len() {
-        return false;
+        let values = &data.child_data()[1];
+        Self { run_ends, values }
     }
 
-    let lhs_child_data = lhs.child_data();
-    let lhs_run_ends_array = &lhs_child_data[0];
-    let lhs_values_array = &lhs_child_data[1];
+    fn run_end(&self, index: usize) -> usize {
+        self.run_ends.values()[index].as_usize()
+    }
 
-    let rhs_child_data = rhs.child_data();
-    let rhs_run_ends_array = &rhs_child_data[0];
-    let rhs_values_array = &rhs_child_data[1];
+    fn get_start_end_physical_indices(&self) -> (usize, usize) {
+        let start = self.run_ends.get_start_physical_index();
+        let end = self.run_ends.get_end_physical_index();
+        (start, end)
+    }
+}
 
-    if lhs_run_ends_array.len() != rhs_run_ends_array.len() {
-        return false;
+fn run_equal_inner<T: ArrowNativeType + ToPrimitive>(
+    lhs: &ArrayData,
+    rhs: &ArrayData,
+    lhs_start: usize,
+    rhs_start: usize,
+    len: usize,
+) -> bool {
+    if len == 0 {
+        return true;
     }
 
-    if lhs_values_array.len() != rhs_values_array.len() {
-        return false;
+    let l_array = RunArrayData::<T>::new(lhs, lhs_start, len);
+    let r_array = RunArrayData::<T>::new(rhs, rhs_start, len);
+
+    let (l_start_phys, l_end_phys) = l_array.get_start_end_physical_indices();
+    let (r_start_phys, r_end_phys) = r_array.get_start_end_physical_indices();
+    let l_runs = l_end_phys - l_start_phys + 1;
+    let r_runs = r_end_phys - r_start_phys + 1;
+
+    if l_runs == r_runs {
+        // When the boundaries align perfectly, we don't need the complex 
stepping loop that calculates overlaps.

Review Comment:
   Thanks for these comments — they really helped me understand this code
👍 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to