Jefffrey commented on code in PR #9213:
URL: https://github.com/apache/arrow-rs/pull/9213#discussion_r2724224031
##########
arrow-data/src/equal/run.rs:
##########
@@ -16,71 +16,148 @@
// under the License.
use crate::data::ArrayData;
+use arrow_buffer::ArrowNativeType;
+use arrow_schema::DataType;
+use num_traits::ToPrimitive;
use super::equal_range;
-/// The current implementation of comparison of run array support physical
comparison.
-/// Comparing run encoded array based on logical indices (`lhs_start`,
`rhs_start`) will
-/// be time consuming as converting from logical index to physical index
cannot be done
-/// in constant time. The current comparison compares the underlying physical
arrays.
+/// Returns true if the two `RunEndEncoded` arrays are equal.
+///
+/// This provides a specialized implementation of equality for REE arrays that
+/// handles differences in run-encoding by iterating through the logical range.
pub(super) fn run_equal(
lhs: &ArrayData,
rhs: &ArrayData,
lhs_start: usize,
rhs_start: usize,
len: usize,
) -> bool {
- if lhs_start != 0
- || rhs_start != 0
- || (lhs.len() != len && rhs.len() != len)
- || lhs.offset() > 0
- || rhs.offset() > 0
- {
- unimplemented!("Logical comparison for run array not supported.")
+ let lhs_index_type = match lhs.data_type() {
+ DataType::RunEndEncoded(f, _) => f.data_type(),
+ _ => unreachable!(),
+ };
+
+ match lhs_index_type {
+ DataType::Int16 => run_equal_inner::<i16>(lhs, rhs, lhs_start,
rhs_start, len),
+ DataType::Int32 => run_equal_inner::<i32>(lhs, rhs, lhs_start,
rhs_start, len),
+ DataType::Int64 => run_equal_inner::<i64>(lhs, rhs, lhs_start,
rhs_start, len),
+ _ => unreachable!(),
}
+}
+
+struct RunArrayData<'a, T: ArrowNativeType> {
+ run_ends: &'a [T],
+ values: &'a ArrayData,
+ abs_start: usize,
+}
- if lhs.len() != rhs.len() {
- return false;
+impl<'a, T: ArrowNativeType + ToPrimitive> RunArrayData<'a, T> {
+ fn new(data: &'a ArrayData, start: usize) -> Self {
+ debug_assert!(
+ data.child_data().len() >= 2,
+ "RunEndEncoded arrays are guaranteed to have at least 2 children
[run_ends, values]"
Review Comment:
```suggestion
data.child_data().len() == 2,
"RunEndEncoded arrays are guaranteed to have 2 children
[run_ends, values]"
```
##########
arrow-data/src/equal/run.rs:
##########
@@ -16,71 +16,148 @@
// under the License.
use crate::data::ArrayData;
+use arrow_buffer::ArrowNativeType;
+use arrow_schema::DataType;
+use num_traits::ToPrimitive;
use super::equal_range;
-/// The current implementation of comparison of run array support physical
comparison.
-/// Comparing run encoded array based on logical indices (`lhs_start`,
`rhs_start`) will
-/// be time consuming as converting from logical index to physical index
cannot be done
-/// in constant time. The current comparison compares the underlying physical
arrays.
+/// Returns true if the two `RunEndEncoded` arrays are equal.
+///
+/// This provides a specialized implementation of equality for REE arrays that
+/// handles differences in run-encoding by iterating through the logical range.
pub(super) fn run_equal(
lhs: &ArrayData,
rhs: &ArrayData,
lhs_start: usize,
rhs_start: usize,
len: usize,
) -> bool {
- if lhs_start != 0
- || rhs_start != 0
- || (lhs.len() != len && rhs.len() != len)
- || lhs.offset() > 0
- || rhs.offset() > 0
- {
- unimplemented!("Logical comparison for run array not supported.")
+ let lhs_index_type = match lhs.data_type() {
+ DataType::RunEndEncoded(f, _) => f.data_type(),
+ _ => unreachable!(),
+ };
+
+ match lhs_index_type {
+ DataType::Int16 => run_equal_inner::<i16>(lhs, rhs, lhs_start,
rhs_start, len),
+ DataType::Int32 => run_equal_inner::<i32>(lhs, rhs, lhs_start,
rhs_start, len),
+ DataType::Int64 => run_equal_inner::<i64>(lhs, rhs, lhs_start,
rhs_start, len),
+ _ => unreachable!(),
}
+}
+
+struct RunArrayData<'a, T: ArrowNativeType> {
+ run_ends: &'a [T],
+ values: &'a ArrayData,
+ abs_start: usize,
+}
- if lhs.len() != rhs.len() {
- return false;
+impl<'a, T: ArrowNativeType + ToPrimitive> RunArrayData<'a, T> {
+ fn new(data: &'a ArrayData, start: usize) -> Self {
+ debug_assert!(
+ data.child_data().len() >= 2,
+ "RunEndEncoded arrays are guaranteed to have at least 2 children
[run_ends, values]"
+ );
+ let run_ends_data = &data.child_data()[0];
+ let run_ends = &run_ends_data.buffers()[0].typed_data::<T>()
Review Comment:
I do wonder if we're better off doing something like this:
```rust
let raw_run_ends_buffer = &run_ends_data.buffers()[0];
// SAFETY: we're reconstructing RunEndBuffer from a known valid
// RunArray
let run_ends = unsafe {
RunEndBuffer::<T>::new_unchecked(
raw_run_ends_buffer.clone().into(),
run_ends_data.offset(),
run_ends_data.len(),
)
};
```
This way we can reuse the code to search for physical indices from
`RunEndBuffer`, at the cost of a clone of `Buffer` (which is just an `Arc` and
usize + ptr) 🤔
##########
arrow-data/src/equal/run.rs:
##########
@@ -16,71 +16,148 @@
// under the License.
use crate::data::ArrayData;
+use arrow_buffer::ArrowNativeType;
+use arrow_schema::DataType;
+use num_traits::ToPrimitive;
use super::equal_range;
-/// The current implementation of comparison of run array support physical
comparison.
-/// Comparing run encoded array based on logical indices (`lhs_start`,
`rhs_start`) will
-/// be time consuming as converting from logical index to physical index
cannot be done
-/// in constant time. The current comparison compares the underlying physical
arrays.
+/// Returns true if the two `RunEndEncoded` arrays are equal.
+///
+/// This provides a specialized implementation of equality for REE arrays that
+/// handles differences in run-encoding by iterating through the logical range.
pub(super) fn run_equal(
lhs: &ArrayData,
rhs: &ArrayData,
lhs_start: usize,
rhs_start: usize,
len: usize,
) -> bool {
- if lhs_start != 0
- || rhs_start != 0
- || (lhs.len() != len && rhs.len() != len)
- || lhs.offset() > 0
- || rhs.offset() > 0
- {
- unimplemented!("Logical comparison for run array not supported.")
+ let lhs_index_type = match lhs.data_type() {
+ DataType::RunEndEncoded(f, _) => f.data_type(),
+ _ => unreachable!(),
+ };
+
+ match lhs_index_type {
+ DataType::Int16 => run_equal_inner::<i16>(lhs, rhs, lhs_start,
rhs_start, len),
+ DataType::Int32 => run_equal_inner::<i32>(lhs, rhs, lhs_start,
rhs_start, len),
+ DataType::Int64 => run_equal_inner::<i64>(lhs, rhs, lhs_start,
rhs_start, len),
+ _ => unreachable!(),
}
+}
+
+struct RunArrayData<'a, T: ArrowNativeType> {
+ run_ends: &'a [T],
+ values: &'a ArrayData,
+ abs_start: usize,
+}
- if lhs.len() != rhs.len() {
- return false;
+impl<'a, T: ArrowNativeType + ToPrimitive> RunArrayData<'a, T> {
+ fn new(data: &'a ArrayData, start: usize) -> Self {
+ debug_assert!(
+ data.child_data().len() >= 2,
+ "RunEndEncoded arrays are guaranteed to have at least 2 children
[run_ends, values]"
+ );
+ let run_ends_data = &data.child_data()[0];
+ let run_ends = &run_ends_data.buffers()[0].typed_data::<T>()
+ [run_ends_data.offset()..run_ends_data.offset() +
run_ends_data.len()];
+ let values = &data.child_data()[1];
+ Self {
+ run_ends,
+ values,
+ abs_start: data.offset() + start,
+ }
}
- let lhs_child_data = lhs.child_data();
- let lhs_run_ends_array = &lhs_child_data[0];
- let lhs_values_array = &lhs_child_data[1];
+ fn run_end(&self, index: usize) -> usize {
+ self.run_ends[index].as_usize()
+ }
- let rhs_child_data = rhs.child_data();
- let rhs_run_ends_array = &rhs_child_data[0];
- let rhs_values_array = &rhs_child_data[1];
+ fn get_physical_index(&self, logical_index: usize) -> usize {
+ let logical_index = logical_index + self.abs_start;
+ if logical_index == 0 {
+ return 0;
+ }
+ match self
+ .run_ends
+ .binary_search_by(|val| val.as_usize().cmp(&logical_index))
+ {
+ Ok(idx) => idx + 1,
+ Err(idx) => idx,
+ }
+ }
- if lhs_run_ends_array.len() != rhs_run_ends_array.len() {
- return false;
+ fn get_start_end_physical_indices(&self, len: usize) -> (usize, usize) {
+ let start = self.get_physical_index(0);
+ let end = self.get_physical_index(len - 1);
+ (start, end)
}
+}
- if lhs_values_array.len() != rhs_values_array.len() {
- return false;
+fn run_equal_inner<T: ArrowNativeType + ToPrimitive>(
+ lhs: &ArrayData,
+ rhs: &ArrayData,
+ lhs_start: usize,
+ rhs_start: usize,
+ len: usize,
+) -> bool {
+ if len == 0 {
+ return true;
+ }
+
+ let l_array = RunArrayData::<T>::new(lhs, lhs_start);
+ let r_array = RunArrayData::<T>::new(rhs, rhs_start);
+
+ let (l_start_phys, r_start_phys, l_runs, r_runs) = {
+ let (l_start, l_end) = l_array.get_start_end_physical_indices(len);
+ let (r_start, r_end) = r_array.get_start_end_physical_indices(len);
+ (l_start, r_start, l_end - l_start + 1, r_end - r_start + 1)
+ };
+
+ if l_runs == r_runs {
+ let physical_match = l_array.run_ends[l_start_phys..l_start_phys +
l_runs - 1]
+ .iter()
+ .zip(&r_array.run_ends[r_start_phys..r_start_phys + r_runs - 1])
+ .all(|(l_re, r_re)| {
+ l_re.as_usize() - l_array.abs_start == r_re.as_usize() -
r_array.abs_start
+ });
+
+ if physical_match {
+ return equal_range(
+ l_array.values,
+ r_array.values,
+ l_start_phys,
+ r_start_phys,
+ l_runs,
+ );
+ }
}
- // check run ends array are equal. The length of the physical array
- // is used to validate the child arrays.
- let run_ends_equal = equal_range(
- lhs_run_ends_array,
- rhs_run_ends_array,
- lhs_start,
- rhs_start,
- lhs_run_ends_array.len(),
- );
-
- // if run ends array are not the same return early without validating
- // values array.
- if !run_ends_equal {
- return false;
+ let mut l_phys = l_start_phys;
+ let mut r_phys = r_start_phys;
+ let mut processed = 0;
+ while processed < len {
+ if !equal_range(l_array.values, r_array.values, l_phys, r_phys, 1) {
+ return false;
+ }
+
+ let l_run_end = l_array.run_end(l_phys);
+ let r_run_end = r_array.run_end(r_phys);
+
+ let step = (l_run_end - (l_array.abs_start + processed))
+ .min(r_run_end - (r_array.abs_start + processed))
+ .min(len - processed);
+ processed += step;
+
+ if processed < len {
+ if l_array.abs_start + processed == l_run_end {
+ l_phys += 1;
+ }
+ if r_array.abs_start + processed == r_run_end {
+ r_phys += 1;
+ }
+ }
Review Comment:
Can we explain these steps? They aren't obvious to me,
especially the chained `min` calls.
##########
arrow-data/src/equal/run.rs:
##########
@@ -16,71 +16,148 @@
// under the License.
use crate::data::ArrayData;
+use arrow_buffer::ArrowNativeType;
+use arrow_schema::DataType;
+use num_traits::ToPrimitive;
use super::equal_range;
-/// The current implementation of comparison of run array support physical
comparison.
-/// Comparing run encoded array based on logical indices (`lhs_start`,
`rhs_start`) will
-/// be time consuming as converting from logical index to physical index
cannot be done
-/// in constant time. The current comparison compares the underlying physical
arrays.
+/// Returns true if the two `RunEndEncoded` arrays are equal.
+///
+/// This provides a specialized implementation of equality for REE arrays that
+/// handles differences in run-encoding by iterating through the logical range.
pub(super) fn run_equal(
lhs: &ArrayData,
rhs: &ArrayData,
lhs_start: usize,
rhs_start: usize,
len: usize,
) -> bool {
- if lhs_start != 0
- || rhs_start != 0
- || (lhs.len() != len && rhs.len() != len)
- || lhs.offset() > 0
- || rhs.offset() > 0
- {
- unimplemented!("Logical comparison for run array not supported.")
+ let lhs_index_type = match lhs.data_type() {
+ DataType::RunEndEncoded(f, _) => f.data_type(),
+ _ => unreachable!(),
+ };
+
+ match lhs_index_type {
+ DataType::Int16 => run_equal_inner::<i16>(lhs, rhs, lhs_start,
rhs_start, len),
+ DataType::Int32 => run_equal_inner::<i32>(lhs, rhs, lhs_start,
rhs_start, len),
+ DataType::Int64 => run_equal_inner::<i64>(lhs, rhs, lhs_start,
rhs_start, len),
+ _ => unreachable!(),
}
+}
+
+struct RunArrayData<'a, T: ArrowNativeType> {
+ run_ends: &'a [T],
+ values: &'a ArrayData,
+ abs_start: usize,
+}
- if lhs.len() != rhs.len() {
- return false;
+impl<'a, T: ArrowNativeType + ToPrimitive> RunArrayData<'a, T> {
+ fn new(data: &'a ArrayData, start: usize) -> Self {
+ debug_assert!(
+ data.child_data().len() >= 2,
+ "RunEndEncoded arrays are guaranteed to have at least 2 children
[run_ends, values]"
+ );
+ let run_ends_data = &data.child_data()[0];
+ let run_ends = &run_ends_data.buffers()[0].typed_data::<T>()
+ [run_ends_data.offset()..run_ends_data.offset() +
run_ends_data.len()];
+ let values = &data.child_data()[1];
+ Self {
+ run_ends,
+ values,
+ abs_start: data.offset() + start,
+ }
}
- let lhs_child_data = lhs.child_data();
- let lhs_run_ends_array = &lhs_child_data[0];
- let lhs_values_array = &lhs_child_data[1];
+ fn run_end(&self, index: usize) -> usize {
+ self.run_ends[index].as_usize()
+ }
- let rhs_child_data = rhs.child_data();
- let rhs_run_ends_array = &rhs_child_data[0];
- let rhs_values_array = &rhs_child_data[1];
+ fn get_physical_index(&self, logical_index: usize) -> usize {
+ let logical_index = logical_index + self.abs_start;
+ if logical_index == 0 {
+ return 0;
+ }
+ match self
+ .run_ends
+ .binary_search_by(|val| val.as_usize().cmp(&logical_index))
+ {
+ Ok(idx) => idx + 1,
+ Err(idx) => idx,
+ }
+ }
- if lhs_run_ends_array.len() != rhs_run_ends_array.len() {
- return false;
+ fn get_start_end_physical_indices(&self, len: usize) -> (usize, usize) {
+ let start = self.get_physical_index(0);
+ let end = self.get_physical_index(len - 1);
+ (start, end)
}
+}
- if lhs_values_array.len() != rhs_values_array.len() {
- return false;
+fn run_equal_inner<T: ArrowNativeType + ToPrimitive>(
+ lhs: &ArrayData,
+ rhs: &ArrayData,
+ lhs_start: usize,
+ rhs_start: usize,
+ len: usize,
+) -> bool {
+ if len == 0 {
+ return true;
+ }
+
+ let l_array = RunArrayData::<T>::new(lhs, lhs_start);
+ let r_array = RunArrayData::<T>::new(rhs, rhs_start);
+
+ let (l_start_phys, r_start_phys, l_runs, r_runs) = {
+ let (l_start, l_end) = l_array.get_start_end_physical_indices(len);
+ let (r_start, r_end) = r_array.get_start_end_physical_indices(len);
+ (l_start, r_start, l_end - l_start + 1, r_end - r_start + 1)
+ };
+
+ if l_runs == r_runs {
+ let physical_match = l_array.run_ends[l_start_phys..l_start_phys +
l_runs - 1]
+ .iter()
+ .zip(&r_array.run_ends[r_start_phys..r_start_phys + r_runs - 1])
+ .all(|(l_re, r_re)| {
+ l_re.as_usize() - l_array.abs_start == r_re.as_usize() -
r_array.abs_start
+ });
Review Comment:
```suggestion
let l_iter = l_array.run_ends.iter().skip(l_start_phys).take(l_runs);
let r_iter = r_array.run_ends.iter().skip(r_start_phys).take(r_runs);
let physical_match = l_iter.zip(r_iter).all(|(l_re, r_re)| {
l_re.as_usize() - l_array.abs_start == r_re.as_usize() -
r_array.abs_start
});
```
It was a bit confusing to see `l_runs` calculated as `l_end - l_start + 1`
but then here we essentially rebuild `l_end` because we left it behind in the
scope above. It is cleaner this way using the iterator API.
Also, could we leave an explanation of what this check means?
`physical_match` doesn't exactly describe what check is being done here 🤔
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]