manishkr commented on code in PR #9213:
URL: https://github.com/apache/arrow-rs/pull/9213#discussion_r2725348529
##########
arrow-data/src/equal/run.rs:
##########
@@ -16,71 +16,148 @@
// under the License.
use crate::data::ArrayData;
+use arrow_buffer::ArrowNativeType;
+use arrow_schema::DataType;
+use num_traits::ToPrimitive;
use super::equal_range;
-/// The current implementation of comparison of run array support physical
comparison.
-/// Comparing run encoded array based on logical indices (`lhs_start`,
`rhs_start`) will
-/// be time consuming as converting from logical index to physical index
cannot be done
-/// in constant time. The current comparison compares the underlying physical
arrays.
+/// Returns true if the two `RunEndEncoded` arrays are equal.
+///
+/// This provides a specialized implementation of equality for REE arrays that
+/// handles differences in run-encoding by iterating through the logical range.
pub(super) fn run_equal(
lhs: &ArrayData,
rhs: &ArrayData,
lhs_start: usize,
rhs_start: usize,
len: usize,
) -> bool {
- if lhs_start != 0
- || rhs_start != 0
- || (lhs.len() != len && rhs.len() != len)
- || lhs.offset() > 0
- || rhs.offset() > 0
- {
- unimplemented!("Logical comparison for run array not supported.")
+ let lhs_index_type = match lhs.data_type() {
+ DataType::RunEndEncoded(f, _) => f.data_type(),
+ _ => unreachable!(),
+ };
+
+ match lhs_index_type {
+ DataType::Int16 => run_equal_inner::<i16>(lhs, rhs, lhs_start,
rhs_start, len),
+ DataType::Int32 => run_equal_inner::<i32>(lhs, rhs, lhs_start,
rhs_start, len),
+ DataType::Int64 => run_equal_inner::<i64>(lhs, rhs, lhs_start,
rhs_start, len),
+ _ => unreachable!(),
}
+}
+
+struct RunArrayData<'a, T: ArrowNativeType> {
+ run_ends: &'a [T],
+ values: &'a ArrayData,
+ abs_start: usize,
+}
- if lhs.len() != rhs.len() {
- return false;
+impl<'a, T: ArrowNativeType + ToPrimitive> RunArrayData<'a, T> {
+ fn new(data: &'a ArrayData, start: usize) -> Self {
+ debug_assert!(
+ data.child_data().len() >= 2,
+ "RunEndEncoded arrays are guaranteed to have at least 2 children
[run_ends, values]"
+ );
+ let run_ends_data = &data.child_data()[0];
+ let run_ends = &run_ends_data.buffers()[0].typed_data::<T>()
+ [run_ends_data.offset()..run_ends_data.offset() +
run_ends_data.len()];
+ let values = &data.child_data()[1];
+ Self {
+ run_ends,
+ values,
+ abs_start: data.offset() + start,
+ }
}
- let lhs_child_data = lhs.child_data();
- let lhs_run_ends_array = &lhs_child_data[0];
- let lhs_values_array = &lhs_child_data[1];
+ fn run_end(&self, index: usize) -> usize {
+ self.run_ends[index].as_usize()
+ }
- let rhs_child_data = rhs.child_data();
- let rhs_run_ends_array = &rhs_child_data[0];
- let rhs_values_array = &rhs_child_data[1];
+ fn get_physical_index(&self, logical_index: usize) -> usize {
+ let logical_index = logical_index + self.abs_start;
+ if logical_index == 0 {
+ return 0;
+ }
+ match self
+ .run_ends
+ .binary_search_by(|val| val.as_usize().cmp(&logical_index))
+ {
+ Ok(idx) => idx + 1,
+ Err(idx) => idx,
+ }
+ }
- if lhs_run_ends_array.len() != rhs_run_ends_array.len() {
- return false;
+ fn get_start_end_physical_indices(&self, len: usize) -> (usize, usize) {
+ let start = self.get_physical_index(0);
+ let end = self.get_physical_index(len - 1);
+ (start, end)
}
+}
- if lhs_values_array.len() != rhs_values_array.len() {
- return false;
+fn run_equal_inner<T: ArrowNativeType + ToPrimitive>(
+ lhs: &ArrayData,
+ rhs: &ArrayData,
+ lhs_start: usize,
+ rhs_start: usize,
+ len: usize,
+) -> bool {
+ if len == 0 {
+ return true;
+ }
+
+ let l_array = RunArrayData::<T>::new(lhs, lhs_start);
+ let r_array = RunArrayData::<T>::new(rhs, rhs_start);
+
+ let (l_start_phys, r_start_phys, l_runs, r_runs) = {
+ let (l_start, l_end) = l_array.get_start_end_physical_indices(len);
+ let (r_start, r_end) = r_array.get_start_end_physical_indices(len);
+ (l_start, r_start, l_end - l_start + 1, r_end - r_start + 1)
+ };
+
+ if l_runs == r_runs {
+ let physical_match = l_array.run_ends[l_start_phys..l_start_phys +
l_runs - 1]
+ .iter()
+ .zip(&r_array.run_ends[r_start_phys..r_start_phys + r_runs - 1])
+ .all(|(l_re, r_re)| {
+ l_re.as_usize() - l_array.abs_start == r_re.as_usize() -
r_array.abs_start
+ });
+
+ if physical_match {
+ return equal_range(
+ l_array.values,
+ r_array.values,
+ l_start_phys,
+ r_start_phys,
+ l_runs,
+ );
+ }
}
- // check run ends array are equal. The length of the physical array
- // is used to validate the child arrays.
- let run_ends_equal = equal_range(
- lhs_run_ends_array,
- rhs_run_ends_array,
- lhs_start,
- rhs_start,
- lhs_run_ends_array.len(),
- );
-
- // if run ends array are not the same return early without validating
- // values array.
- if !run_ends_equal {
- return false;
+ let mut l_phys = l_start_phys;
+ let mut r_phys = r_start_phys;
+ let mut processed = 0;
+ while processed < len {
+ if !equal_range(l_array.values, r_array.values, l_phys, r_phys, 1) {
+ return false;
+ }
+
+ let l_run_end = l_array.run_end(l_phys);
+ let r_run_end = r_array.run_end(r_phys);
+
+ let step = (l_run_end - (l_array.abs_start + processed))
+ .min(r_run_end - (r_array.abs_start + processed))
+ .min(len - processed);
+ processed += step;
+
+ if processed < len {
+ if l_array.abs_start + processed == l_run_end {
+ l_phys += 1;
+ }
+ if r_array.abs_start + processed == r_run_end {
+ r_phys += 1;
+ }
+ }
Review Comment:
Added comment to explain it. Basically it's finding minimum chunk of
sub-array that can be compared in equal_range, do you have any other idea in
mind? Thanks.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]